正在保存时,其它线程可以继续写入队列
大石头 authored at 2018-06-06 18:29:26
6.95 KiB
X
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using NewLife;
using NewLife.Log;
using NewLife.Threading;
using XCode.DataAccessLayer;

namespace XCode
{
    /// <summary>实体队列</summary>
    public class EntityQueue
    {
        #region 属性
        /// <summary>需要近实时保存的实体队列</summary>
        private ConcurrentDictionary<IEntity, IEntity> Entities { get; set; } = new ConcurrentDictionary<IEntity, IEntity>();

        /// <summary>需要延迟保存的实体队列</summary>
        private ConcurrentDictionary<IEntity, DateTime> DelayEntities { get; } = new ConcurrentDictionary<IEntity, DateTime>();

        /// <summary>调试开关,默认false</summary>
        public Boolean Debug { get; set; }

        /// <summary>数据访问</summary>
        public IEntitySession Session { get; }

        /// <summary>周期。默认1000毫秒,根据繁忙程度动态调节,尽量靠近每次持久化1000个对象</summary>
        public Int32 Period { get; set; } = 1000;

        /// <summary>最大个数,超过该个数时,进入队列将产生堵塞。默认10000</summary>
        public Int32 MaxEntity { get; set; } = 10_000;

        /// <summary>保存速度,每秒保存多少个实体</summary>
        public Int32 Speed { get; private set; }

        ///// <summary>完成事件。</summary>
        //public event EventHandler<EventArgs<IEntity, Int32>> Completed;

        /// <summary>错误发生时</summary>
        public event EventHandler<EventArgs<Exception>> Error;

        private TimerX _Timer;
        #endregion

        #region 构造
        /// <summary>实例化实体队列</summary>
        public EntityQueue(IEntitySession session)
        {
            Session = session;
            _Timer = new TimerX(Work, null, Period, Period, "EQ") { Async = true };
        }
        #endregion

        #region 方法
        /// <summary>添加实体对象进入队列</summary>
        /// <param name="entity">实体对象</param>
        /// <param name="msDelay">延迟保存的时间</param>
        /// <returns>返回是否添加成功,实体对象已存在于队列中则返回false</returns>
        public Boolean Add(IEntity entity, Int32 msDelay)
        {
            var rs = false;
            if (msDelay <= 0)
                rs = Entities.TryAdd(entity, entity);
            else
                rs = DelayEntities.TryAdd(entity, TimerX.Now.AddMilliseconds(msDelay));
            if (!rs) return false;

            // 超过最大值时,堵塞一段时间,等待消费完成
            while (_count >= MaxEntity)
            {
                Thread.Sleep(100);
            }

            return true;
        }

        /// <summary>当前缓存个数</summary>
        private Int32 _count;

        private void Work(Object state)
        {
            var list = new List<IEntity>();
            var n = 0;

            // 检查是否有延迟保存
            var ds = DelayEntities;
            if (ds.Any())
            {
                var now = TimerX.Now;
                foreach (var item in ds)
                {
                    if (item.Value < now) list.Add(item.Key);

                    n++;
                }
                // 从列表删除过期
                foreach (var item in list)
                {
                    ds.Remove(item);
                }

                //n += ds.Count;
            }

            // 检查是否有近实时保存
            var es = Entities;
            if (es.Any())
            {
                // 为了速度,不拷贝,直接创建一个新的集合
                Entities = new ConcurrentDictionary<IEntity, IEntity>();
                list.AddRange(es.Keys);

                n += es.Count;
            }

            _count = n;

            if (list.Count > 0)
            {
                _count -= list.Count;

                Process(list);
            }
        }

        private void Process(Object state)
        {
            var list = state as ICollection<IEntity>;
            var ss = Session;
            var dal = ss.Dal;
            var useTrans = dal.DbType == DatabaseType.SQLite;

            var speed = Speed;
            if (Debug || list.Count > 100_000)
            {
                var cost = speed == 0 ? 0 : list.Count * 1000 / speed;
                XTrace.WriteLine($"实体队列[{ss.TableName}/{ss.ConnName}]\t保存 {list.Count:n0}\t预测耗时 {cost:n0}ms");
            }

            var rs = new List<Int32>();
            var sw = Stopwatch.StartNew();

            // 开启事务保存
            if (useTrans) dal.BeginTransaction();
            try
            {
                // 禁用自动关闭连接
                dal.Session.SetAutoClose(false);

                foreach (var item in list)
                {
                    try
                    {
                        // 加入队列时已经Valid一次,这里不需要再次Valid
                        rs.Add(item.SaveWithoutValid());
                    }
                    catch (Exception ex)
                    {
                        if (Error != null)
                            Error(this, new EventArgs<Exception>(ex));
                        else
                            XTrace.WriteException(ex);
                    }
                }

                if (useTrans) dal.Commit();
            }
            catch
            {
                if (useTrans) dal.Rollback();
                throw;
            }
            finally
            {
                sw.Stop();
                dal.Session.SetAutoClose(null);
            }

            // 根据繁忙程度动态调节
            // 大于1000个对象时,说明需要加快持久化间隔,缩小周期
            // 小于1000个对象时,说明持久化太快了,加大周期
            var p = Period;
            if (list.Count > 1000)
                p = p * 1000 / list.Count;
            else
                p = p * 1000 / list.Count;

            // 最小间隔
            if (p < 500) p = 500;
            // 最大间隔
            if (p > 5000) p = 5000;

            if (p != Period)
            {
                Period = p;
                _Timer.Period = p;
            }

            var ms = sw.Elapsed.TotalMilliseconds;
            Speed = ms == 0 ? 0 : (Int32)(list.Count * 1000 / ms);
            if (Debug || list.Count > 10000)
            {
                XTrace.WriteLine($"实体队列[{ss.TableName}/{ss.ConnName}]\t保存 {list.Count:n0}\t耗时 {ms:n0}ms\t速度 {speed:n0}tps\t周期 {p:n0}ms");
            }

            // 马上再来一次,以便于连续处理数据
            _Timer.SetNext(-1);
        }
        #endregion
    }
}