发布0115
大石头 authored at 2020-01-15 01:11:37
8.54 KiB
X
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Collections;
using NewLife.Log;

namespace NewLife.Threading
{
    /// <summary>轻量级线程池。无等待和调度逻辑,直接创建线程竞争处理器资源</summary>
    public class ThreadPoolX : DisposeBase
    {
        #region 全局线程池助手
        static ThreadPoolX()
        {
            // 在这个同步异步大量混合使用的时代,需要更多的初始线程来屏蔽各种对TPL的不合理使用
            ThreadPool.GetMinThreads(out var wt, out var pt);
            if (wt < 32) ThreadPool.SetMinThreads(32, 32);
        }

        /// <summary>初始化线程池
        /// </summary>
        public static void Init() { }

        /// <summary>带异常处理的线程池任务调度,不允许异常抛出,以免造成应用程序退出</summary>
        /// <param name="callback"></param>
        [DebuggerHidden]
        public static void QueueUserWorkItem(Action callback)
        {
            if (callback == null) return;

            ThreadPool.UnsafeQueueUserWorkItem(s =>
            {
                try
                {
                    callback();
                }
                catch (Exception ex)
                {
                    XTrace.WriteException(ex);
                }
            }, null);

            //Instance.QueueWorkItem(callback);
        }

        /// <summary>带异常处理的线程池任务调度,不允许异常抛出,以免造成应用程序退出</summary>
        /// <param name="callback"></param>
        /// <param name="state"></param>
        [DebuggerHidden]
        public static void QueueUserWorkItem<T>(Action<T> callback, T state)
        {
            if (callback == null) return;

            ThreadPool.UnsafeQueueUserWorkItem(s =>
            {
                try
                {
                    callback(state);
                }
                catch (Exception ex)
                {
                    XTrace.WriteException(ex);
                }
            }, null);

            //Instance.QueueWorkItem(() => callback(state));
        }
        #endregion

        #region 静态实例
        private static ThreadPoolX _Instance;
        /// <summary>静态实例</summary>
        public static ThreadPoolX Instance
        {
            get
            {
                if (_Instance == null)
                {
                    lock (typeof(ThreadPoolX))
                    {
                        if (_Instance == null) _Instance = new ThreadPoolX();
                    }
                }

                return _Instance;
            }
        }
        #endregion

        #region 属性
        /// <summary>内部池</summary>
        public ObjectPool<ThreadItem> Pool { get; }
        #endregion

        #region 构造
        /// <summary>实例化</summary>
        public ThreadPoolX()
        {
            var pool = new MyThreadPool
            {
                Name = nameof(ThreadPoolX),
                Host = this,

                Min = Environment.ProcessorCount,
                Max = 1000,

                IdleTime = 30,
                AllIdleTime = 120,
            };
            Pool = pool;
        }

        class MyThreadPool : ObjectPool<ThreadItem>
        {
            public ThreadPoolX Host { get; set; }

            /// <summary>创建实例</summary>
            /// <returns></returns>
            protected override ThreadItem OnCreate() => new ThreadItem(Host);
        }
        #endregion

        #region 方法
        /// <summary>把委托放入线程池执行</summary>
        /// <param name="callback"></param>
        public void QueueWorkItem(Action callback)
        {
            if (callback == null) return;

            var ti = Pool.Get();
            ti.Execute(callback);
        }

        /// <summary>在线程池中异步执行任务</summary>
        /// <param name="function"></param>
        /// <returns></returns>
        public Task QueueTask(Action function)
        {
            if (function == null) return null;

            return QueueTask<Object>(token => { function(); return null; }, CancellationToken.None);
        }

        /// <summary>在线程池中异步执行任务</summary>
        /// <typeparam name="TResult"></typeparam>
        /// <param name="function"></param>
        /// <returns></returns>
        public Task<TResult> QueueTask<TResult>(Func<TResult> function)
        {
            if (function == null) return null;

            return QueueTask(token => function(), CancellationToken.None);
        }

        /// <summary>在线程池中异步执行任务</summary>
        /// <typeparam name="TResult"></typeparam>
        /// <param name="function"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public Task<TResult> QueueTask<TResult>(Func<CancellationToken, TResult> function, CancellationToken cancellationToken)
        {
            if (function == null) return null;

            var source = new TaskCompletionSource<TResult>();

            var ti = Pool.Get();
            ti.Execute(() =>
            {
                try
                {
                    var rs = function(cancellationToken);
                    source.SetResult(rs);
                }
                catch (Exception ex)
                {
                    source.SetException(ex);
                }
            });

            return source.Task;
        }
        #endregion
    }

    /// <summary>线程任务项</summary>
    public class ThreadItem : DisposeBase
    {
        #region 属性
        /// <summary>编号</summary>
        public Int32 ID { get; private set; }

        /// <summary>线程</summary>
        public Thread Thread { get; private set; }

        /// <summary>主机线程池</summary>
        public ThreadPoolX Host { get; }

        /// <summary>活跃</summary>
        public Boolean Active { get; private set; }
        #endregion

        #region 构造
        /// <summary>实例化</summary>
        public ThreadItem(ThreadPoolX host)
        {
            Host = host ?? throw new ArgumentNullException(nameof(host));

            var th = Thread = new Thread(Work)
            {
                Name = "P",
                IsBackground = true,
                //Priority = ThreadPriority.AboveNormal,
            };
            waitForTimer = new AutoResetEvent(false);
            ID = th.ManagedThreadId;

            Active = true;
            th.Start();
        }

        /// <summary>销毁</summary>
        /// <param name="disposing"></param>
        protected override void Dispose(Boolean disposing)
        {
            base.Dispose(disposing);

            try
            {
                Active = false;
                waitForTimer?.Set();

                var th = Thread;
                if (th != null && th.IsAlive) th.Abort();
            }
            catch { }
        }

        /// <summary>已重载。</summary>
        /// <returns></returns>
        public override String ToString() => "P" + ID;
        #endregion

        #region 方法
        /// <summary>执行委托</summary>
        /// <param name="callback"></param>
        public void Execute(Action callback)
        {
            _callback = callback;
            _state = 1;

            waitForTimer.Set();
        }

        private Action _callback;
        private AutoResetEvent waitForTimer;
        private Int32 _state;
        private void Work()
        {
            while (Active)
            {
                try
                {
                    _callback?.Invoke();
                }
                catch (ThreadAbortException) { break; }
                catch (ThreadInterruptedException) { break; }
                catch (Exception ex)
                {
                    XTrace.WriteException(ex);
                }

                _callback = null;

                // 回到线程池里
                if (Interlocked.CompareExchange(ref _state, 0, 1) == 1 && !Host.Pool.Put(this)) break;

                // 不能重置,如果外面先Set,这里再WaitOne,同样得到信号
                //waitForTimer.Reset();
                waitForTimer.WaitOne();
            }

            // 销毁
            Dispose();
        }
        #endregion
    }
}