清空队列时,把等待任务设为取消,避免死等
大石头 authored at 2019-07-10 12:24:34
6.76 KiB
X
using System;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Log;
using NewLife.Threading;

namespace NewLife.Net.Handlers
{
    /// <summary>消息匹配队列接口。用于把响应数据包配对到请求包</summary>
    public interface IMatchQueue
    {
        /// <summary>加入请求队列</summary>
        /// <param name="owner">拥有者</param>
        /// <param name="request">请求消息</param>
        /// <param name="msTimeout">超时取消时间</param>
        /// <param name="source">任务源</param>
        Task<Object> Add(Object owner, Object request, Int32 msTimeout, TaskCompletionSource<Object> source);

        /// <summary>检查请求队列是否有匹配该响应的请求</summary>
        /// <param name="owner">拥有者</param>
        /// <param name="response">响应消息</param>
        /// <param name="result">任务结果</param>
        /// <param name="callback">用于检查匹配的回调</param>
        /// <returns></returns>
        Boolean Match(Object owner, Object response, Object result, Func<Object, Object, Boolean> callback);

        /// <summary>清空队列</summary>
        void Clear();
    }

    /// <summary>消息匹配队列。子类可重载以自定义请求响应匹配逻辑</summary>
    public class DefaultMatchQueue : IMatchQueue
    {
        private readonly ItemWrap[] Items = new ItemWrap[256];
        private Int32 _Count;
        private TimerX _Timer;

        /// <summary>加入请求队列</summary>
        /// <param name="owner">拥有者</param>
        /// <param name="request">请求的数据</param>
        /// <param name="msTimeout">超时取消时间</param>
        /// <param name="source">任务源</param>
        public virtual Task<Object> Add(Object owner, Object request, Int32 msTimeout, TaskCompletionSource<Object> source)
        {
            var now = TimerX.Now;

            // 控制超时时间,默认15秒
            if (msTimeout <= 10 || msTimeout >= 600_000) msTimeout = 15_000;

            var qi = new Item
            {
                Owner = owner,
                Request = request,
                EndTime = now.AddMilliseconds(msTimeout),
                Source = source,
            };

            // 加入队列
            var items = Items;
            var i = 0;
            for (i = 0; i < items.Length; ++i)
            {
                if (Interlocked.CompareExchange(ref items[i].Value, qi, null) == null) break;
            }
            if (i >= items.Length) throw new XException("匹配队列已满[{0}]", items.Length);

            Interlocked.Increment(ref _Count);

            if (_Timer == null)
            {
                lock (this)
                {
                    if (_Timer == null)
                    {
                        _Timer = new TimerX(Check, null, 1000, 1000, "Match")
                        {
                            Async = true,
                            CanExecute = () => _Count > 0
                        };
                    }
                }
            }

            return source?.Task;
        }

        /// <summary>检查请求队列是否有匹配该响应的请求</summary>
        /// <param name="owner">拥有者</param>
        /// <param name="response">响应消息</param>
        /// <param name="result">任务结果</param>
        /// <param name="callback">用于检查匹配的回调</param>
        /// <returns></returns>
        public virtual Boolean Match(Object owner, Object response, Object result, Func<Object, Object, Boolean> callback)
        {
            if (_Count <= 0) return false;

            // 直接遍历,队列不会很长
            var qs = Items;
            for (var i = 0; i < qs.Length; i++)
            {
                var qi = qs[i].Value;
                if (qi == null) continue;

                var src = qi.Source;
                if (src != null && qi.Owner == owner && callback(qi.Request, response))
                {
                    qs[i].Value = null;
                    Interlocked.Decrement(ref _Count);

                    // 异步设置完成结果,否则可能会在当前线程恢复上层await,导致堵塞当前任务
                    if (!src.Task.IsCompleted) Task.Factory.StartNew(() => src.TrySetResult(result));
                    //if (!src.Task.IsCompleted) src.TrySetResult(result);

                    return true;
                }
            }

            if (Setting.Current.Debug)
                XTrace.WriteLine("MatchQueue.Check 失败 [{0}] result={1} Items={2}", response, result, _Count);

            return false;
        }

        /// <summary>定时检查发送队列,超时未收到响应则重发</summary>
        /// <param name="state"></param>
        void Check(Object state)
        {
            if (_Count <= 0) return;

            var now = TimerX.Now;
            // 直接遍历,队列不会很长
            var qs = Items;
            for (var i = 0; i < qs.Length; i++)
            {
                var qi = qs[i].Value;
                if (qi == null) continue;

                // 过期取消
                var src = qi.Source;
                if (src != null && qi.EndTime <= now)
                {
                    qs[i].Value = null;
                    Interlocked.Decrement(ref _Count);

                    // 当前在线程池里面
                    if (!src.Task.IsCompleted) src.TrySetCanceled();
                    //if (!src.Task.IsCompleted) Task.Factory.StartNew(() => src.TrySetCanceled());
                }
            }
        }

        /// <summary>清空队列</summary>
        public virtual void Clear()
        {
            var qs = Items;
            for (var i = 0; i < qs.Length; ++i)
            {
                var qi = qs[i].Value;
                if (qi == null) continue;

                qs[i].Value = null;

                // 过期取消
                var src = qi.Source;
                if (src != null)
                {
                    Interlocked.Decrement(ref _Count);

                    // 当前在线程池里面
                    if (!src.Task.IsCompleted) src.TrySetCanceled();
                    //if (!src.Task.IsCompleted) Task.Factory.StartNew(() => src.TrySetCanceled());
                }
            }
            _Count = 0;
        }

        class Item
        {
            public Object Owner { get; set; }
            public Object Request { get; set; }
            public DateTime EndTime { get; set; }
            public TaskCompletionSource<Object> Source { get; set; }
        }
        struct ItemWrap
        {
            public Item Value;
        }
    }
}