v2.0.2018.1227   升级XCode,全新的批量写入
大石头 authored at 2018-12-27 23:22:14
9.02 KiB
FeiXian
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Data;
using NewLife.Log;
using NewLife.Model;
using NewLife.Reflection;
using NewLife.Security;
using XCode;
using XCode.DataAccessLayer;

namespace FeiXian.Client
{
    delegate String GetBatchSqlDelegate(IDataTable table, IDataColumn[] columns, ICollection<String> updateColumns, ICollection<String> addColumns, IEnumerable<IExtend> list);

    /// <summary>数据模拟</summary>
    public class DataSimulate
    {
        #region 属性
        /// <summary>实体工厂</summary>
        public IEntityFactory Factory { get; set; }

        /// <summary>事务提交的批大小</summary>
        public Int32 BatchSize { get; set; } = 1000;

        /// <summary>并发线程数</summary>
        public Int32 Threads { get; set; } = 1;

        /// <summary>直接执行SQL</summary>
        public Boolean UseSql { get; set; }

        /// <summary>批量插入</summary>
        public Boolean BatchInsert { get; set; }

        /// <summary>分数</summary>
        public Int32 Score { get; private set; }

        /// <summary>总数</summary>
        public Int32 Total { get; private set; }
        #endregion

        #region 方法
        /// <summary>开始执行</summary>
        /// <param name="count"></param>
        public void Run(Int32 count)
        {
            var set = XCode.Setting.Current;
            set.TraceSQLTime = 0;

            var fact = Factory;
            var session = fact.Session;
            //var pst = fact.Persistence;
            var conn = fact.Table.ConnName;

            // 禁用自动关闭连接,避免频繁开关连接影响性能
            session.Dal.Db.AutoClose = false;
            // 关闭SQL日志
            session.Dal.Db.ShowSQL = false;
            // 不必获取自增返回值
            fact.AutoIdentity = false;

            // 使用Sql
            GetBatchSqlDelegate pst = null;
            if (UseSql)
            {
                var method = session.Dal.Session.GetType().GetMethod("GetBatchSql", BindingFlags.Instance | BindingFlags.NonPublic);
                if (method == null) throw new InvalidOperationException($"数据库[{session.Dal.DbType}]不支持获取批量Sql");

                pst = method.As<GetBatchSqlDelegate>(session.Dal.Session);
            }

            Console.WriteLine();
            WriteLog("数据模拟 Count={0:n0} Threads={1} BatchSize={2} UseSql={3} BatchInsert={4}", count, Threads, BatchSize, UseSql, BatchInsert);

            // 预热数据表
            WriteLog("{0} 已有数据:{1:n0}", fact.TableName, session.Count);

            Total = 0;

            // 准备数据
            var list = new List<IEntity>();
            var qs = new List<String>();

            var ths = Threads;
            var cpu = Environment.ProcessorCount;
            WriteLog("正在准备数据[CPU={0}]:", cpu);
            var sw = Stopwatch.StartNew();
            Parallel.For(0, cpu, n =>
            {
                try
                {
                    var es = new List<IEntity>();
                    var k = 0;
                    for (var i = n; i < count; i += cpu, k++)
                    {
                        if (k % BatchSize == 0) Console.Write(".");

                        var e = fact.Create();
                        foreach (var item in fact.Fields)
                        {
                            if (item.IsIdentity) continue;

                            if (item.Type == typeof(Int32))
                                e[item.Name] = Rand.Next();
                            else if (item.Type == typeof(String))
                                e[item.Name] = Rand.NextString(8);
                            else if (item.Type == typeof(DateTime))
                                e[item.Name] = DateTime.Now.AddSeconds(Rand.Next(-10000, 10000));
                        }
                        es.Add(e);
                    }
                    lock (list)
                    {
                        list.AddRange(es);
                    }
                }
                catch (Exception ex)
                {
                    XTrace.WriteException(ex);
                }
            });
            if (UseSql)
            {
                var columns = fact.Fields.Select(e => e.Field).Where(e => !e.Identity).ToArray();
                for (int i = 0; i < list.Count;)
                {
                    var es = list.Skip(i).Take(BatchSize).ToList();
                    var sql = pst(session.Table, columns, null, null, es);
                    qs.Add(sql);

                    i += es.Count;
                }
            }
            sw.Stop();
            Console.WriteLine();
            var ms = sw.Elapsed.TotalMilliseconds;
            WriteLog("耗时:{0:n0}ms / {1:n0}tps", ms, list.Count * 1000L / ms);

            // 反向工程建表建库
            Parallel.For(0, ths, n =>
            {
                try
                {
                    //fact.ConnName = conn + n;
                    fact.FindCount();
                }
                catch (Exception ex)
                {
                    XTrace.WriteException(ex);
                }
            });

            var useTrans = session.Dal.DbType == DatabaseType.SQLite;

            Console.WriteLine();
            WriteLog("正在准备写入[Threads={0}]:", ths);
            sw.Restart();
            Parallel.For(0, ths, n =>
            {
                //fact.ConnName = conn + n;
                //WriteLog("[{0}] 正在准备写入:{1}", n, fact.ConnName);

                var sw2 = Stopwatch.StartNew();
                var err = "";
                var k = 0;
                try
                {
                    var dal = session.Dal;
                    if (UseSql)
                    {
                        for (var i = n; i < qs.Count; i += ths)
                        {
                            dal.Session.Execute(qs[i]);
                            k += BatchSize;
                        }
                    }
                    else
                    {
                        EntityTransaction tr = null;
                        var es = new List<IEntity>();
                        for (var i = n; i < list.Count; i += ths, k++)
                        {
                            if (k % BatchSize == 0)
                            {
                                Console.Write(".");

                                if (!BatchInsert)
                                {
                                    tr?.Commit();
                                    if (useTrans) tr = session.CreateTrans();
                                }
                                else
                                {
                                    // 批量插入
                                    //es.Insert(true);
                                    es.BatchInsert(null, null);
                                    es.Clear();
                                }
                            }

                            if (BatchInsert)
                                es.Add(list[i]);
                            else
                                list[i].Insert();
                        }
                        tr?.Commit();
                        //es.Insert(true);
                        es.BatchInsert(null, null);
                    }
                }
                catch (Exception ex)
                {
                    err = ex.GetTrue()?.Message;
                }
                finally
                {
                    Total += k;
                    sw2.Stop();
                    WriteLog("[{0}] 写入完成!{1:n0}tps {2}", n, k * 1000L / sw2.Elapsed.TotalMilliseconds, err);
                }
            });

            sw.Stop();
            Console.WriteLine();
            WriteLog("数据写入完毕!");
            ms = sw.Elapsed.TotalMilliseconds;
            var speed = list.Count * 1000L / ms;
            WriteLog("{2}插入{3:n0}行数据,耗时:{0:n0}ms 速度:{1:n0}tps", ms, speed, session.Dal.DbType, Total);

            Score = (Int32)speed;

            session.ClearCache("SqlInsert");
            //var t = session.Count;
            //Thread.Sleep(100);
            WriteLog("{0} 共有数据:{1:n0}", fact.TableName, fact.FindCount());
        }
        #endregion

        #region 日志
        /// <summary>日志</summary>
        public ILog Log { get; set; } = Logger.Null;

        /// <summary>写日志</summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        public void WriteLog(String format, params Object[] args)
        {
            Log?.Info(format, args);
        }
        #endregion
    }
}