9.8.2018.0630
大石头 编写于 2018-06-30 11:15:32
X
using System;
using System.Diagnostics;
using System.Threading;
using NewLife.Collections;
using NewLife.Log;

namespace NewLife.Threading
{
    /// <summary>轻量级线程池。无等待和调度逻辑,直接创建线程竞争处理器资源</summary>
    public class ThreadPoolX : DisposeBase
    {
        #region 全局线程池助手
        /// <summary>带异常处理的线程池任务调度,不允许异常抛出,以免造成应用程序退出</summary>
        /// <param name="callback"></param>
        [DebuggerHidden]
        public static void QueueUserWorkItem(Action callback)
        {
            if (callback == null) return;

            //ThreadPool.UnsafeQueueUserWorkItem(s =>
            //{
            //    try
            //    {
            //        callback();
            //    }
            //    catch (Exception ex)
            //    {
            //        XTrace.WriteException(ex);
            //    }
            //}, null);

            Instance.QueueWorkItem(callback);
        }

        /// <summary>带异常处理的线程池任务调度,不允许异常抛出,以免造成应用程序退出</summary>
        /// <param name="callback"></param>
        /// <param name="state"></param>
        [DebuggerHidden]
        public static void QueueUserWorkItem<T>(Action<T> callback, T state)
        {
            if (callback == null) return;

            Instance.QueueWorkItem(() => callback(state));
        }
        #endregion

        #region 静态实例
        /// <summary>静态实例</summary>
        public static ThreadPoolX Instance { get; } = new ThreadPoolX();
        #endregion

        #region 属性
        /// <summary>内部池</summary>
        public ObjectPool<ThreadItem> Pool { get; }

        #endregion

        #region 构造
        /// <summary>实例化</summary>
        public ThreadPoolX()
        {
            var pool = new MyThreadPool
            {
                Name = nameof(ThreadPoolX),
                Host = this,

                Min = Environment.ProcessorCount,
                Max = 1000,

                IdleTime = 30,
                AllIdleTime = 120,
            };
            Pool = pool;
        }

        class MyThreadPool : ObjectPool<ThreadItem>
        {
            public ThreadPoolX Host { get; set; }

            /// <summary>创建实例</summary>
            /// <returns></returns>
            protected override ThreadItem OnCreate() => new ThreadItem(Host);
        }
        #endregion

        #region 方法
        /// <summary>把委托放入线程池执行</summary>
        /// <param name="callback"></param>
        public void QueueWorkItem(Action callback)
        {
            if (callback == null) return;

            var ti = Pool.Get();
            ti.Execute(callback);
        }
        #endregion
    }

    /// <summary>线程任务项</summary>
    public class ThreadItem : DisposeBase
    {
        #region 属性
        /// <summary>线程</summary>
        public Thread Thread { get; private set; }

        /// <summary>主机线程池</summary>
        public ThreadPoolX Host { get; }

        /// <summary>活跃</summary>
        public Boolean Active { get; private set; }
        #endregion

        #region 构造
        /// <summary>实例化</summary>
        public ThreadItem(ThreadPoolX host) => Host = host ?? throw new ArgumentNullException(nameof(host));

        /// <summary>销毁</summary>
        /// <param name="disposing"></param>
        protected override void OnDispose(Boolean disposing)
        {
            base.OnDispose(disposing);

            try
            {
                Active = false;
                waitForTimer?.Set();

                var th = Thread;
                if (th != null && th.IsAlive) th.Abort();
            }
            catch { }
        }
        #endregion

        #region 方法
        /// <summary>执行委托</summary>
        /// <param name="callback"></param>
        public void Execute(Action callback)
        {
            var th = Thread;
            if (th == null)
            {
                th = Thread = new Thread(Work)
                {
                    Name = "P",
                    IsBackground = true,
                    //Priority = ThreadPriority.AboveNormal,
                };
                waitForTimer = new AutoResetEvent(false);

                Active = true;
                th.Start();
            }

            _callback = callback;

            waitForTimer.Set();
        }

        private Action _callback;
        private AutoResetEvent waitForTimer;
        private void Work()
        {
            while (Active)
            {
                try
                {
                    _callback?.Invoke();
                }
                catch (ThreadAbortException) { break; }
                catch (ThreadInterruptedException) { break; }
                catch (Exception ex)
                {
                    XTrace.WriteException(ex);
                }

                _callback = null;

                // 回到线程池里
                Host.Pool.Put(this);

                waitForTimer.Reset();
                waitForTimer.WaitOne();
            }
        }
        #endregion
    }
}