优化ETL过滤模块
大石头 编写于 2017-08-29 17:11:46
X
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using NewLife;
using NewLife.Log;
using NewLife.Threading;
using XCode.DataAccessLayer;

namespace XCode
{
    /// <summary>实体队列</summary>
    public class EntityQueue
    {
        #region 属性
        /// <summary>需要近实时保存的实体队列</summary>
        private ICollection<IEntity> Entities { get; set; }

        /// <summary>需要延迟保存的实体队列</summary>
        private IDictionary<IEntity, DateTime> DelayEntities { get; } = new Dictionary<IEntity, DateTime>();

        /// <summary>调试开关,默认false</summary>
        public Boolean Debug { get; set; }

        /// <summary>数据访问</summary>
        public DAL Dal { get; set; }

        /// <summary>周期。默认1000毫秒,根据繁忙程度动态调节,尽量靠近每次持久化1000个对象</summary>
        public Int32 Period { get; set; }

        /// <summary>完成事件。</summary>
        public event EventHandler<EventArgs<IEntity, Int32>> Completed;

        private TimerX _Timer;
        #endregion

        #region 构造
        /// <summary>实例化实体队列</summary>
        public EntityQueue()
        {
            Entities = new HashSet<IEntity>();

            Period = 1000;
        }
        #endregion

        #region 方法
        /// <summary>添加实体对象进入队列</summary>
        /// <param name="entity">实体对象</param>
        /// <param name="msDelay">延迟保存的时间</param>
        /// <returns>返回是否添加成功,实体对象已存在于队列中则返回false</returns>
        public Boolean Add(IEntity entity, Int32 msDelay)
        {
            if (msDelay <= 0)
            {
                // 避免重复加入队列
                var es = Entities;
                if (es.Contains(entity)) return false;

                lock (this)
                {
                    es = Entities;
                    // 避免重复加入队列
                    if (es.Contains(entity)) return false;

                    es.Add(entity);
                }
            }
            else
            {
                var dic = DelayEntities;
                if (dic.ContainsKey(entity)) return false;

                lock (dic)
                {
                    if (dic.ContainsKey(entity)) return false;

                    dic[entity] = DateTime.Now.AddMilliseconds(msDelay);
                }
            }

            // 放到锁里面,避免重入
            if (_Timer == null)
            {
                lock (this)
                {
                    if (_Timer == null) _Timer = new TimerX(Work, null, Period, Period, "EQ");
                }
            }

            return true;
        }

        private void Work(Object state)
        {
            //if (_Running) return;

            var list = new List<IEntity>();
            // 检查是否有延迟保存
            var dic = DelayEntities;
            if (dic.Count > 0)
            {
                lock (dic)
                {
                    var now = DateTime.Now;
                    foreach (var item in dic)
                    {
                        if (item.Value < now) list.Add(item.Key);
                    }
                    // 从列表删除过期
                    foreach (var item in list)
                    {
                        dic.Remove(item);
                    }
                }
            }

            // 检查是否有近实时保存
            var es = Entities;
            if (es.Count > 0)
            {
                lock (this)
                {
                    // 为了速度,不拷贝,直接创建一个新的集合
                    es = Entities;
                    if (es.Count > 0)
                    {
                        Entities = new HashSet<IEntity>();
                        list.AddRange(es);
                    }
                }
            }

            //_Running = true;
            //Task.Factory.StartNew(Process, list).LogException();
            if (list.Count > 0) Process(list);
        }

        //private Boolean _Running;
        private void Process(Object state)
        {
            var list = state as ICollection<IEntity>;
            var dal = Dal;

            if (Debug) XTrace.WriteLine("实体队列[{0}]\t准备持久化{1}个对象", dal.ConnName, list.Count);

            var rs = new List<Int32>();
            var sw = new Stopwatch();
            if (Debug) sw.Start();

            // 开启事务保存
            dal.BeginTransaction();
            try
            {
                foreach (var item in list)
                {
                    //rs.Add(item.Save());
                    // 加入队列时已经Valid一次,这里不需要再次Valid
                    rs.Add(item.SaveWithoutValid());
                }

                dal.Commit();

                if (Debug) sw.Stop();
            }
            catch
            {
                dal.Rollback();
                throw;
            }
            finally
            {
                //_Running = false;
            }

            // 根据繁忙程度动态调节
            // 大于1000个对象时,说明需要加快持久化间隔,缩小周期
            // 小于1000个对象时,说明持久化太快了,加大周期
            var p = Period;
            if (list.Count > 1000)
                p = p * 1000 / list.Count;
            else
                p = p * 1000 / list.Count;

            // 最小间隔100毫秒
            if (p < 100) p = 100;
            // 最大间隔3秒
            if (p > 3000) p = 3000;

            if (p != Period)
            {
                Period = p;
                _Timer.Period = p;
            }

            if (Debug) XTrace.WriteLine("实体队列[{0}]\t共耗时 {1:n0}毫秒\t周期 {2:n0}毫秒", dal.ConnName, sw.ElapsedMilliseconds, p);

            if (Completed != null)
            {
                var k = 0;
                foreach (var item in list)
                {
                    Completed(this, new EventArgs<IEntity, Int32>(item, rs[k++]));
                }
            }
        }
        #endregion
    }
}