优化ETL过滤模块
大石头 编写于 2017-08-29 17:11:46
X
using System;
using System.Collections.Generic;
using System.Diagnostics;
using NewLife.Log;
using XCode.Membership;

/*
 * 数据抽取流程:
 *      Start   启动检查抽取器和抽取设置
 *      Process 大循环处理
 *          克隆一份抽取配置,抽取时会滑动到下一批
 *          Fetch   抽取一批数据,并滑动配置
 *              ProcessList 处理列表,可异步调用
 *                  OnProcess   处理列表,异步调用前,先新增配置项,以免失败
 *                      ProcessItem 处理实体
 *                      OnError     处理实体异常
 *                  OnSync      同步列表
 *                      SyncItem    同步实体
 *                          GetItem     查找或新建目标对象
 *                          SaveItem    保存目标对象
 *                  ProcessFinished 处理完成,保存统计,异步时修改配置项为成功
 *              OnError     处理列表异常
 *      Stop    停止处理
 */

namespace XCode.Transform
{
    /// <summary>数据分批统计</summary>
    /// <typeparam name="TSource">源实体类</typeparam>
    public class ETL<TSource> : ETL
        where TSource : Entity<TSource>, new()
    {
        #region 构造
        /// <summary>实例化数据抽取器</summary>
        public ETL() : base(Entity<TSource>.Meta.Factory) { }
        #endregion
    }

    /// <summary>数据抽取转换处理</summary>
    /// <remarks>
    /// ETL数据抽取可以独立使用,也可以继承扩展。
    /// 同步数据或数据分批统计。
    /// </remarks>
    public class ETL
    {
        #region 属性
        /// <summary>名称</summary>
        public String Name { get; set; }

        /// <summary>数据源抽取器</summary>
        public IExtracter Extracter { get; set; }

        /// <summary>最大错误数,连续发生多个错误时停止</summary>
        public Int32 MaxError { get; set; }

        /// <summary>当前累计连续错误次数</summary>
        private Int32 _Error;

        /// <summary>统计</summary>
        public IETLStat Stat { get; set; }

        public List<IETLModule> Modules { get; set; } = new List<IETLModule>();
        #endregion

        #region 构造
        /// <summary>实例化数据抽取器</summary>
        public ETL()
        {
            Name = GetType().Name.TrimEnd("Worker");
        }

        /// <summary>实例化数据抽取器</summary>
        /// <param name="source"></param>
        public ETL(IEntityOperate source) : this()
        {
            Extracter = new TimeExtracter { Factory = source };
        }
        #endregion

        #region 开始停止
        /// <summary>开始</summary>
        public virtual void Start()
        {
            Modules.Start();

            var ext = Extracter;
            if (ext == null) throw new ArgumentNullException(nameof(Extracter), "没有设置数据抽取器");

            //if (ext.Setting == null) ext.Setting = new ExtractSetting();
            ext.Init();

            if (Stat == null) Stat = new ETLStat();
        }

        /// <summary>停止</summary>
        public virtual void Stop()
        {
            _Inited = false;

            Modules.Stop();
        }
        #endregion

        #region 数据转换
        private Boolean _Inited;
        /// <summary>每一轮启动时</summary>
        /// <param name="set"></param>
        /// <returns></returns>
        protected virtual Boolean Init(IExtractSetting set)
        {
            WriteLog("开始处理{0},区间({1} + {3:n0}, {2})", Name, set.Start, set.End, set.Row);

            Modules.Init();

            return true;
        }

        /// <summary>抽取并处理一批数据</summary>
        /// <returns>返回抽取数据行数,没有数据返回0,初始化或配置失败返回-1</returns>
        public virtual Int32 Process()
        {
            if (!Modules.Processing()) { _Inited = false; return -1; }

            var set = Extracter.Setting;

            if (!_Inited)
            {
                if (!Init(set)) return -1;
                _Inited = true;
            }

            // 拷贝配置,支持多线程
            var set2 = set.Clone();

            var ext = Extracter;
            IList<IEntity> list = null;
            try
            {
                var sw = Stopwatch.StartNew();

                // 分批抽取
                list = Fetch(ext);
                if (list == null || list.Count == 0) return 0;
                sw.Stop();

                // 批量处理
                ProcessList(list, set2, sw.Elapsed.TotalMilliseconds);
            }
            catch (Exception ex)
            {
                ex = OnError(list, set2, ex);
                if (ex != null) throw ex;
            }

            Modules.Processed();

            return list == null ? 0 : list.Count;
        }

        /// <summary>抽取一批数据</summary>
        /// <param name="extracter"></param>
        /// <returns></returns>
        protected virtual IList<IEntity> Fetch(IExtracter extracter)
        {
            return extracter?.Fetch();
        }

        /// <summary>处理列表,传递批次配置,支持多线程和异步</summary>
        /// <remarks>
        /// 子类可以根据需要重载该方法,实现异步处理。
        /// 异步处理之前,需要先保存配置
        /// </remarks>
        /// <param name="list">实体列表</param>
        /// <param name="set">本批次配置</param>
        /// <param name="fetchCost">抽取数据耗时</param>
        protected virtual void ProcessList(IList<IEntity> list, IExtractSetting set, Double fetchCost)
        {
            var sw = Stopwatch.StartNew();

            var count = OnProcess(list, set);

            sw.Stop();

            OnFinished(list, set, count, fetchCost, sw.Elapsed.TotalMilliseconds);
        }

        /// <summary>处理列表</summary>
        /// <param name="list">实体列表</param>
        /// <param name="set">本批次配置</param>
        protected virtual Int32 OnProcess(IList<IEntity> list, IExtractSetting set)
        {
            var count = 0;
            foreach (var source in list)
            {
                try
                {
                    ProcessItem(source);

                    count++;
                }
                catch (Exception ex)
                {
                    ex = OnError(source, set, ex);
                    if (ex != null) throw ex;
                }
            }

            return count;
        }

        /// <summary>处理完成</summary>
        /// <param name="list">实体列表</param>
        /// <param name="set">本批次配置</param>
        /// <param name="success">成功行数</param>
        /// <param name="fetchCost">抽取数据耗时</param>
        /// <param name="processCost">处理数据耗时</param>
        protected virtual void OnFinished(IList<IEntity> list, IExtractSetting set, Int32 success, Double fetchCost, Double processCost)
        {
            // 累计错误清零
            _Error = 0;

            var ext = Extracter;
            var start = set.Start;
            var end = set.End;
            var row = set.Row;

            var st = Stat;
            var total = list.Count;
            st.Total += total;
            st.Success += success;
            st.Times++;

            st.Speed = processCost <= 0 ? 0 : (Int32)(total * 1000 / processCost);
            st.FetchSpeed = fetchCost <= 0 ? 0 : (Int32)(total * 1000 / fetchCost);

            if (ext is TimeExtracter time) end = time.ActualEnd;
            var ends = end > DateTime.MinValue && end < DateTime.MaxValue ? ", {0}".F(end) : "";
            WriteLog("共处理{0}行,区间({1}, {2}{3}),抓取{4:n0}ms,{5:n0}qps,处理{6:n0}ms,{7:n0}tps", total, start, row, ends, fetchCost, st.FetchSpeed, processCost, st.Speed);

            Modules.OnFinished(list, set, success, fetchCost, processCost);
        }

        /// <summary>处理单行数据</summary>
        /// <remarks>打开AutoSave时,上层ProcessList会自动保存数据</remarks>
        /// <param name="source">源实体</param>
        /// <returns></returns>
        protected virtual IEntity ProcessItem(IEntity source)
        {
            return source;
        }

        private Exception _lastError;
        /// <summary>遇到错误时如何处理</summary>
        /// <param name="source"></param>
        /// <param name="set">本批次配置</param>
        /// <param name="ex"></param>
        /// <returns></returns>
        protected virtual Exception OnError(Object source, IExtractSetting set, Exception ex)
        {
            Modules.OnError(source, set, ex);

            // 处理单个实体时的异常,会被外层捕获,需要判断跳过
            if (_lastError == ex) return ex;

            ex = ex?.GetTrue();
            if (ex == null) return null;

            _Error++;
            if (MaxError > 0 && _Error >= MaxError) return _lastError = ex;

            // 跳过错误时,把错误记下来
            var st = Stat;
            st.Error++;
            st.Message = ex.Message;

            WriteError(ex.ToString());

            return null;
        }
        #endregion

        #region 日志
        /// <summary>日志</summary>
        public NewLife.Log.ILog Log { get; set; } = Logger.Null;

        /// <summary>数据库日志提供者</summary>
        public LogProvider Provider { get; set; }

        /// <summary>写日志</summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        public void WriteLog(String format, params Object[] args)
        {
            Log?.Info(Name + " " + format, args);

            Provider?.WriteLog(Name, "处理", format.F(args));
        }

        /// <summary>写错误日志</summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        public void WriteError(String format, params Object[] args)
        {
            Log?.Error(Name + " " + format, args);

            Provider?.WriteLog(Name, "错误", format.F(args));
        }
        #endregion
    }
}