[fix]修正UdpServer在接收广播时连续启动接收的错误,在StarAgent中,此时可能收到广播包,SocketFlags是Broadcast,需要清空,否则报错“参考的对象类型不支持尝试的操作”; 无需设置SocketOptionName.PacketInformation,在ReceiveMessageFromAsync时会自动设置,并且支持ipv6;
石头 authored at 2024-10-10 00:36:00 石头 committed at 2024-10-10 00:45:43
8.38 KiB
X
using System.Collections.Concurrent;
using System.Diagnostics;
using NewLife.Log;
using NewLife.Security;
using NewLife.Threading;

namespace NewLife.Model;

/// <summary>延迟队列。缓冲合并对象,批量处理</summary>
/// <remarks>
/// 借助实体字典,缓冲实体对象,定期给字典换新,实现批量处理。
/// 
/// 有可能外部拿到对象后,正在修改,内部恰巧执行批量处理,导致外部的部分修改未能得到处理。
/// 解决办法是增加一个提交机制,外部用完后提交修改,内部需要处理时,等待一个时间。
/// </remarks>
public class DeferredQueue : DisposeBase
{
    #region 属性
    /// <summary>名称</summary>
    public String Name { get; set; }

    private volatile ConcurrentDictionary<String, Object> _Entities = new();
    /// <summary>实体字典</summary>
    public ConcurrentDictionary<String, Object> Entities => _Entities;

    /// <summary>跟踪数。达到该值时输出跟踪日志,默认1000</summary>
    public Int32 TraceCount { get; set; } = 1000;

    /// <summary>周期。默认10_000毫秒</summary>
    public Int32 Period { get; set; } = 10_000;

    /// <summary>最大个数。超过该个数时,进入队列将产生堵塞。默认10_000_000</summary>
    public Int32 MaxEntity { get; set; } = 10_000_000;

    /// <summary>批大小。默认5_000</summary>
    public Int32 BatchSize { get; set; } = 5_000;

    /// <summary>等待借出对象确认修改的时间,默认3000ms</summary>
    public Int32 WaitForBusy { get; set; } = 3_000;

    /// <summary>保存速度,每秒保存多少个实体</summary>
    public Int32 Speed { get; private set; }

    /// <summary>是否异步处理。默认true表示异步处理,共用DQ定时调度;false表示同步处理,独立线程</summary>
    public Boolean Async { get; set; } = true;

    private Int32 _Times;
    /// <summary>合并保存的总次数</summary>
    public Int32 Times => _Times;

    /// <summary>批次处理成功时</summary>
    public Action<IList<Object>>? Finish;

    /// <summary>批次处理失败时</summary>
    public Action<IList<Object>, Exception>? Error;
    #endregion

    #region 构造
    /// <summary>实例化</summary>
    public DeferredQueue() => Name = GetType().Name.TrimEnd("Queue", "Actor", "Cache");

    /// <summary>销毁。统计队列销毁时保存数据</summary>
    /// <param name="disposing"></param>
    protected override void Dispose(Boolean disposing)
    {
        base.Dispose(disposing);

        _Timer.TryDispose();
        _Entities?.Clear();
    }

    /// <summary>初始化</summary>
    public void Init()
    {
        // 首次使用时初始化定时器
        if (_Timer == null)
        {
            lock (this)
            {
                _Timer ??= OnInit();
            }
        }
    }

    /// <summary>初始化</summary>
    protected virtual TimerX OnInit()
    {
        // 为了避免多队列并发,首次执行时间随机错开
        var p = Period;
        if (p > 1000) p = Rand.Next(1000, p);

        var name = Async ? "DQ" : Name;

        var timer = new TimerX(Work, null, p, Period, name)
        {
            Async = Async,
            //CanExecute = () => _Entities.Any()
        };

        // 独立调度时加大最大耗时告警
        if (!Async) timer.Scheduler.MaxCost = 30_000;

        return timer;
    }
    #endregion

    #region 方法
    /// <summary>尝试添加</summary>
    /// <param name="key"></param>
    /// <param name="value"></param>
    /// <returns></returns>
    public virtual Boolean TryAdd(String key, Object value)
    {
        Interlocked.Increment(ref _Times);

        Init();

        if (!_Entities.TryAdd(key, value)) return false;

        Interlocked.Increment(ref _count);

        // 超过最大值时,堵塞一段时间,等待消费完成
        CheckMax();

        return true;
    }

    /// <summary>获取 或 添加 实体对象,在外部修改对象值</summary>
    /// <remarks>
    /// 外部正在修改对象时,内部不允许执行批量处理
    /// </remarks>
    /// <typeparam name="T"></typeparam>
    /// <param name="key"></param>
    /// <param name="valueFactory"></param>
    /// <returns></returns>
    public virtual T? GetOrAdd<T>(String key, Func<String, T>? valueFactory = null) where T : class, new()
    {
        Interlocked.Increment(ref _Times);

        Init();

        Object? entity;
        while (!_Entities.TryGetValue(key, out entity))
        {
            if (entity == null)
            {
                if (valueFactory != null)
                    entity = valueFactory(key);
                else
                    entity = new T();
            }
            if (_Entities.TryAdd(key, entity))
            {
                Interlocked.Increment(ref _count);
                break;
            }
        }

        // 超过最大值时,堵塞一段时间,等待消费完成
        CheckMax();

        // 增加繁忙数
        Interlocked.Increment(ref _busy);

        return entity as T;
    }

    private void CheckMax()
    {
        if (_count < MaxEntity) return;

        using var span = DefaultTracer.Instance?.NewError("MaxQueueOverflow", $"延迟队列[{Name}]超过上限{MaxEntity:n0}");

        // 超过最大值时,堵塞一段时间,等待消费完成
        var t = WaitForBusy * 5;
        while (t > 0)
        {
            if (_count < MaxEntity) return;

            Thread.Sleep(100);
            t -= 100;
        }

        throw new InvalidOperationException($"The existing data amount [{_count: n0}] exceeds the maximum data amount [{MaxEntity: n0}]");
    }

    /// <summary>等待确认修改的借出对象数</summary>
    private volatile Int32 _busy;

    /// <summary>提交对象的修改,外部不再使用该对象</summary>
    /// <param name="key"></param>
    public virtual void Commit(String key)
    {
        // 减少繁忙数
        if (_busy > 0) Interlocked.Decrement(ref _busy);
    }

    /// <summary>当前缓存个数</summary>
    private Int32 _count;
    private TimerX? _Timer;

    private void Work(Object? state)
    {
        var es = _Entities;
        if (!es.Any()) return;

        _Entities = new ConcurrentDictionary<String, Object>();
        var times = _Times;

        Interlocked.Add(ref _count, -es.Count);
        Interlocked.Add(ref _Times, -times);

        // 检查繁忙数,等待外部未完成的修改
        var t = WaitForBusy;
        while (_busy > 0 && t > 0)
        {
            Thread.Sleep(100);
            t -= 100;
        }
        //_busy = 0;

        // 先取出来
        var list = es.Values.ToList();

        //if (list.Count > TraceCount)
        //{
        //    var cost = Speed == 0 ? 0 : list.Count * 1000 / Speed;
        //    XTrace.WriteLine($"延迟队列[{Name}]\t保存 {list.Count:n0}\t预测 {cost:n0}ms\t次数 {times:n0}");
        //}

        var sw = Stopwatch.StartNew();
        var total = ProcessAll(list);
        sw.Stop();

        var ms = sw.Elapsed.TotalMilliseconds;
        Speed = ms == 0 ? 0 : (Int32)(list.Count * 1000 / ms);
        if (list.Count >= TraceCount)
        {
            var sp = ms == 0 ? 0 : (Int32)(times * 1000 / ms);
            XTrace.WriteLine($"延迟队列[{Name}]\t保存 {list.Count:n0}\t耗时 {ms:n0}ms\t速度 {Speed:n0}tps\t次数 {times:n0}\t速度 {sp:n0}tps\t成功 {total:n0}");
        }
    }

    /// <summary>定时处理全部数据</summary>
    /// <param name="list"></param>
    protected virtual Int32 ProcessAll(ICollection<Object> list)
    {
        var total = 0;
        // 分批
        for (var i = 0; i < list.Count;)
        {
            var batch = list.Skip(i).Take(BatchSize).ToList();

            try
            {
                total += Process(batch);

                Finish?.Invoke(batch);
            }
            catch (Exception ex)
            {
                OnError(batch, ex);
            }

            i += batch.Count;
        }

        return total;
    }

    /// <summary>处理一批</summary>
    /// <param name="list"></param>
    public virtual Int32 Process(IList<Object> list) => 0;

    /// <summary>发生错误</summary>
    /// <param name="list"></param>
    /// <param name="ex"></param>
    protected virtual void OnError(IList<Object> list, Exception ex)
    {
        if (Error != null)
            Error(list, ex);
        else
            XTrace.WriteException(ex);
    }
    #endregion
}