保存失败时,不能简单写回去,可能因为数据库原因导致重复执行,再次错误
大石头 authored at 2018-08-23 15:47:11
7.09 KiB
X
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using NewLife;
using NewLife.Log;
using NewLife.Threading;

namespace XCode
{
    /// <summary>实体队列</summary>
    public class EntityQueue
    {
        #region 属性
        /// <summary>需要近实时保存的实体队列</summary>
        private ConcurrentDictionary<IEntity, IEntity> Entities { get; set; } = new ConcurrentDictionary<IEntity, IEntity>();

        /// <summary>需要延迟保存的实体队列</summary>
        private ConcurrentDictionary<IEntity, DateTime> DelayEntities { get; } = new ConcurrentDictionary<IEntity, DateTime>();

        /// <summary>调试开关,默认false</summary>
        public Boolean Debug { get; set; }

        /// <summary>数据访问</summary>
        public IEntitySession Session { get; }

        /// <summary>周期。默认1000毫秒,根据繁忙程度动态调节,尽量靠近每次持久化1000个对象</summary>
        public Int32 Period { get; set; } = 1000;

        /// <summary>最大个数,超过该个数时,进入队列将产生堵塞。默认10000</summary>
        public Int32 MaxEntity { get; set; } = 10_000;

        /// <summary>保存速度,每秒保存多少个实体</summary>
        public Int32 Speed { get; private set; }

        ///// <summary>完成事件。</summary>
        //public event EventHandler<EventArgs<IEntity, Int32>> Completed;

        /// <summary>错误发生时</summary>
        public event EventHandler<EventArgs<Exception>> Error;

        private TimerX _Timer;
        #endregion

        #region 构造
        /// <summary>实例化实体队列</summary>
        public EntityQueue(IEntitySession session)
        {
            Session = session;
        }
        #endregion

        #region 方法
        /// <summary>添加实体对象进入队列</summary>
        /// <param name="entity">实体对象</param>
        /// <param name="msDelay">延迟保存的时间</param>
        /// <returns>返回是否添加成功,实体对象已存在于队列中则返回false</returns>
        public Boolean Add(IEntity entity, Int32 msDelay)
        {
            // 首次使用时初始化定时器
            if (_Timer == null)
            {
                lock (this)
                {
                    if (_Timer == null)
                    {
                        _Timer = new TimerX(Work, null, Period, Period, "EQ") { Async = true };
                        _Timer.CanExecute = () => DelayEntities.Any() || Entities.Any();
                    }
                }
            }

            var rs = false;
            if (msDelay <= 0)
                rs = Entities.TryAdd(entity, entity);
            else
                rs = DelayEntities.TryAdd(entity, TimerX.Now.AddMilliseconds(msDelay));
            if (!rs) return false;

            Interlocked.Increment(ref _count);

            // 超过最大值时,堵塞一段时间,等待消费完成
            while (_count >= MaxEntity)
            {
                Thread.Sleep(100);
            }

            return true;
        }

        /// <summary>当前缓存个数</summary>
        private Int32 _count;

        private void Work(Object state)
        {
            var list = new List<IEntity>();
            var n = 0;

            // 检查是否有延迟保存
            var ds = DelayEntities;
            if (ds.Any())
            {
                var now = TimerX.Now;
                foreach (var item in ds)
                {
                    if (item.Value < now) list.Add(item.Key);

                    n++;
                }
                // 从列表删除过期
                foreach (var item in list)
                {
                    ds.Remove(item);
                }

                //n += ds.Count;
            }

            // 检查是否有近实时保存
            var es = Entities;
            if (es.Any())
            {
                // 为了速度,不拷贝,直接创建一个新的集合
                Entities = new ConcurrentDictionary<IEntity, IEntity>();
                list.AddRange(es.Keys);

                n += es.Count;
            }

            //_count = n;

            if (list.Count > 0)
            {
                //_count -= list.Count;
                Interlocked.Add(ref _count, -list.Count);

                Process(list);
            }
        }

        private void Process(Object state)
        {
            var list = state as ICollection<IEntity>;
            var ss = Session;
            var dal = ss.Dal;

            var speed = Speed;
            if (Debug || list.Count > 100_000)
            {
                var cost = speed == 0 ? 0 : list.Count * 1000 / speed;
                XTrace.WriteLine($"实体队列[{ss.TableName}/{ss.ConnName}]\t保存 {list.Count:n0}\t预测耗时 {cost:n0}ms");
            }

            var sw = Stopwatch.StartNew();

            // 分批
            var batchSize = 5000;
            for (var i = 0; i < list.Count();)
            {
                var batch = list.Skip(i).Take(batchSize).ToList();

                try
                {
                    batch.SaveWithoutValid();
                }
                catch (Exception ex)
                {
                    //// 保存失败,写回去
                    //foreach (var entity in list)
                    //{
                    //    Entities.TryAdd(entity, entity);
                    //}
                    //Interlocked.Add(ref _count, list.Count);

                    if (Error != null)
                        Error(this, new EventArgs<Exception>(ex));
                    else
                        XTrace.WriteException(ex);
                }

                i += batch.Count;
            }

            sw.Stop();

            // 根据繁忙程度动态调节
            // 大于1000个对象时,说明需要加快持久化间隔,缩小周期
            // 小于1000个对象时,说明持久化太快了,加大周期
            var p = Period;
            if (list.Count > 1000)
                p = p * 1000 / list.Count;
            else
                p = p * 1000 / list.Count;

            // 最小间隔
            if (p < 500) p = 500;
            // 最大间隔
            if (p > 5000) p = 5000;

            if (p != Period)
            {
                Period = p;
                _Timer.Period = p;
            }

            var ms = sw.Elapsed.TotalMilliseconds;
            Speed = ms == 0 ? 0 : (Int32)(list.Count * 1000 / ms);
            if (Debug || list.Count > 10000)
            {
                XTrace.WriteLine($"实体队列[{ss.TableName}/{ss.ConnName}]\t保存 {list.Count:n0}\t耗时 {ms:n0}ms\t速度 {speed:n0}tps\t周期 {p:n0}ms");
            }

            // 马上再来一次,以便于连续处理数据
            _Timer.SetNext(-1);
        }
        #endregion
    }
}