解决 args 变量没有赋值导致的参数传递失败的问题。by Soar360
大石头 authored at 2023-09-09 07:16:48
7.47 KiB
X
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using NewLife.Log;
using NewLife.Threading;

namespace XCode
{
    /// <summary>实体队列</summary>
    public class EntityQueue
    {
        #region 属性
        /// <summary>需要近实时保存的实体队列</summary>
        private ConcurrentDictionary<IEntity, IEntity> Entities { get; set; } = new ConcurrentDictionary<IEntity, IEntity>();

        /// <summary>需要延迟保存的实体队列</summary>
        private ConcurrentDictionary<IEntity, DateTime> DelayEntities { get; } = new ConcurrentDictionary<IEntity, DateTime>();

        /// <summary>调试开关,默认false</summary>
        public Boolean Debug { get; set; }

        /// <summary>数据会话,分表分库时使用</summary>
        public IEntitySession Session { get; }

        /// <summary>是否仅插入。默认false</summary>
        public Boolean InsertOnly { get; set; }

        /// <summary>
        /// 是否显示SQL
        /// </summary>
        public Boolean? ShowSQL { get; set; }

        /// <summary>周期。默认1000毫秒,根据繁忙程度动态调节,尽量靠近每次持久化1000个对象</summary>
        public Int32 Period { get; set; } = 1000;

        /// <summary>最大个数,超过该个数时,进入队列将产生堵塞。默认10000</summary>
        public Int32 MaxEntity { get; set; } = 10_000;

        /// <summary>保存速度,每秒保存多少个实体</summary>
        public Int32 Speed { get; private set; }

        private TimerX _Timer;
        #endregion

        #region 构造
        /// <summary>实例化实体队列</summary>
        public EntityQueue(IEntitySession session) => Session = session;
        #endregion

        #region 方法
        /// <summary>添加实体对象进入队列</summary>
        /// <param name="entity">实体对象</param>
        /// <param name="msDelay">延迟保存的时间</param>
        /// <returns>返回是否添加成功,实体对象已存在于队列中则返回false</returns>
        public Boolean Add(IEntity entity, Int32 msDelay)
        {
            // 首次使用时初始化定时器
            if (_Timer == null)
            {
                lock (this)
                {
                    if (_Timer == null)
                    {
                        _Timer = new TimerX(Work, null, Period, Period, "EQ") { Async = true };
                        //_Timer.Scheduler.Log = XTrace.Log;
                        _Timer.CanExecute = () => DelayEntities.Any() || Entities.Any();
                    }
                }
            }

            var rs = false;
            if (msDelay <= 0)
                rs = Entities.TryAdd(entity, entity);
            else
                rs = DelayEntities.TryAdd(entity, TimerX.Now.AddMilliseconds(msDelay));
            if (!rs) return false;

            Interlocked.Increment(ref _count);

            // 超过最大值时,堵塞一段时间,等待消费完成
            while (_count >= MaxEntity)
            {
                Thread.Sleep(100);
            }

            return true;
        }

        /// <summary>当前缓存个数</summary>
        private Int32 _count;

        private void Work(Object state)
        {
            var list = new List<IEntity>();
            var n = 0;

            // 检查是否有延迟保存
            var ds = DelayEntities;
            if (ds.Any())
            {
                var now = TimerX.Now;
                foreach (var item in ds)
                {
                    if (item.Value < now) list.Add(item.Key);

                    n++;
                }
                // 从列表删除过期
                foreach (var item in list)
                {
                    ds.Remove(item);
                }
            }

            // 检查是否有近实时保存
            var es = Entities;
            if (es.Any())
            {
                // 为了速度,不拷贝,直接创建一个新的集合
                Entities = new ConcurrentDictionary<IEntity, IEntity>();
                list.AddRange(es.Keys);

                n += es.Count;
            }

            if (list.Count > 0)
            {
                Interlocked.Add(ref _count, -list.Count);

                Process(list);
            }
        }

        private void Process(Object state)
        {
            var list = state as ICollection<IEntity>;
            var ss = Session;

            var speed = Speed;
            if (Debug || list.Count > 100_000)
            {
                var cost = speed == 0 ? 0 : list.Count * 1000 / speed;
                XTrace.WriteLine($"实体队列[{ss.TableName}/{ss.ConnName}]\t保存 {list.Count:n0}\t预测耗时 {cost:n0}ms");
            }

            var dss = ss.Dal.Session;
            var old = dss.ShowSQL;
            if (ShowSQL != null) dss.ShowSQL = ShowSQL.Value;

            var sw = Stopwatch.StartNew();

            // 分批
            var batchSize = 10_000;
            for (var i = 0; i < list.Count;)
            {
                var batch = list.Skip(i).Take(batchSize).ToList();

                try
                {
                    OnProcess(batch);
                }
                catch (Exception ex)
                {
                    OnError(batch, ex);
                }

                i += batch.Count;
            }

            sw.Stop();

            dss.ShowSQL = old;

            // 根据繁忙程度动态调节
            // 大于1000个对象时,说明需要加快持久化间隔,缩小周期
            // 小于1000个对象时,说明持久化太快了,加大周期
            var p = Period;
            if (list.Count > 1000)
                p = p * 1000 / list.Count;
            else
                p = p * 1000 / list.Count;

            // 最小间隔
            if (p < 500) p = 500;
            // 最大间隔
            if (p > 5000) p = 5000;

            if (p != Period)
            {
                Period = p;
                _Timer.Period = p;
            }

            var ms = sw.Elapsed.TotalMilliseconds;
            Speed = ms == 0 ? 0 : (Int32)(list.Count * 1000 / ms);
            if (Debug || list.Count > 10000)
            {
                XTrace.WriteLine($"实体队列[{ss.TableName}/{ss.ConnName}]\t保存 {list.Count:n0}\t耗时 {ms:n0}ms\t速度 {speed:n0}tps\t周期 {p:n0}ms");
            }

            // 马上再来一次,以便于连续处理数据
            _Timer.SetNext(-1);
        }

        /// <summary>处理一批数据。插入或更新</summary>
        /// <param name="batch"></param>
        protected virtual void OnProcess(IList<IEntity> batch)
        {
            var ss = Session;

            // 实体队列SaveAsync异步保存时,如果只插入表,直接走批量Insert,而不是Upsert
            if (InsertOnly || ss.Table.InsertOnly)
                batch.Insert(null, ss);
            else
                batch.SaveWithoutValid(null, ss);
        }

        /// <summary>发生错误</summary>
        /// <param name="list"></param>
        /// <param name="ex"></param>
        protected virtual void OnError(IList<IEntity> list, Exception ex) => XTrace.WriteException(ex);
        #endregion
    }
}