using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Data;
using NewLife.Log;
using NewLife.Model;
using NewLife.Reflection;
using NewLife.Security;
using XCode;
using XCode.DataAccessLayer;
namespace FeiXian.Client
{
delegate String GetBatchSqlDelegate(IDataTable table, IDataColumn[] columns, ICollection<String> updateColumns, ICollection<String> addColumns, IEnumerable<IExtend> list);
/// <summary>数据模拟</summary>
public class DataSimulate
{
#region 属性
/// <summary>实体工厂</summary>
public IEntityFactory Factory { get; set; }
/// <summary>事务提交的批大小</summary>
public Int32 BatchSize { get; set; } = 1000;
/// <summary>并发线程数</summary>
public Int32 Threads { get; set; } = 1;
/// <summary>直接执行SQL</summary>
public Boolean UseSql { get; set; }
/// <summary>批量插入</summary>
public Boolean BatchInsert { get; set; }
/// <summary>分数</summary>
public Int32 Score { get; private set; }
/// <summary>总数</summary>
public Int32 Total { get; private set; }
#endregion
#region 方法
/// <summary>开始执行</summary>
/// <param name="count"></param>
public void Run(Int32 count)
{
var set = XCode.Setting.Current;
set.TraceSQLTime = 0;
var fact = Factory;
var session = fact.Session;
//var pst = fact.Persistence;
var conn = fact.Table.ConnName;
// 禁用自动关闭连接,避免频繁开关连接影响性能
session.Dal.Db.AutoClose = false;
// 关闭SQL日志
session.Dal.Db.ShowSQL = false;
// 不必获取自增返回值
fact.AutoIdentity = false;
// 使用Sql
GetBatchSqlDelegate pst = null;
if (UseSql)
{
var method = session.Dal.Session.GetType().GetMethod("GetBatchSql", BindingFlags.Instance | BindingFlags.NonPublic);
if (method == null) throw new InvalidOperationException($"数据库[{session.Dal.DbType}]不支持获取批量Sql");
pst = method.As<GetBatchSqlDelegate>(session.Dal.Session);
}
Console.WriteLine();
WriteLog("数据模拟 Count={0:n0} Threads={1} BatchSize={2} UseSql={3} BatchInsert={4}", count, Threads, BatchSize, UseSql, BatchInsert);
// 预热数据表
WriteLog("{0} 已有数据:{1:n0}", fact.TableName, session.Count);
Total = 0;
// 准备数据
var list = new List<IEntity>();
var qs = new List<String>();
var ths = Threads;
var cpu = Environment.ProcessorCount;
WriteLog("正在准备数据[CPU={0}]:", cpu);
var sw = Stopwatch.StartNew();
Parallel.For(0, cpu, n =>
{
try
{
var es = new List<IEntity>();
var k = 0;
for (var i = n; i < count; i += cpu, k++)
{
if (k % BatchSize == 0) Console.Write(".");
var e = fact.Create();
foreach (var item in fact.Fields)
{
if (item.IsIdentity) continue;
if (item.Type == typeof(Int32))
e[item.Name] = Rand.Next();
else if (item.Type == typeof(String))
e[item.Name] = Rand.NextString(8);
else if (item.Type == typeof(DateTime))
e[item.Name] = DateTime.Now.AddSeconds(Rand.Next(-10000, 10000));
}
es.Add(e);
}
lock (list)
{
list.AddRange(es);
}
}
catch (Exception ex)
{
XTrace.WriteException(ex);
}
});
if (UseSql)
{
var columns = fact.Fields.Select(e => e.Field).Where(e => !e.Identity).ToArray();
for (int i = 0; i < list.Count;)
{
var es = list.Skip(i).Take(BatchSize).ToList();
var sql = pst(session.Table, columns, null, null, es);
qs.Add(sql);
i += es.Count;
}
}
sw.Stop();
Console.WriteLine();
var ms = sw.Elapsed.TotalMilliseconds;
WriteLog("耗时:{0:n0}ms / {1:n0}tps", ms, list.Count * 1000L / ms);
// 反向工程建表建库
Parallel.For(0, ths, n =>
{
try
{
//fact.ConnName = conn + n;
fact.FindCount();
}
catch (Exception ex)
{
XTrace.WriteException(ex);
}
});
var useTrans = session.Dal.DbType == DatabaseType.SQLite;
Console.WriteLine();
WriteLog("正在准备写入[Threads={0}]:", ths);
sw.Restart();
Parallel.For(0, ths, n =>
{
//fact.ConnName = conn + n;
//WriteLog("[{0}] 正在准备写入:{1}", n, fact.ConnName);
var sw2 = Stopwatch.StartNew();
var err = "";
var k = 0;
try
{
var dal = session.Dal;
if (UseSql)
{
for (var i = n; i < qs.Count; i += ths)
{
dal.Session.Execute(qs[i]);
k += BatchSize;
}
}
else
{
EntityTransaction tr = null;
var es = new List<IEntity>();
for (var i = n; i < list.Count; i += ths, k++)
{
if (k % BatchSize == 0)
{
Console.Write(".");
if (!BatchInsert)
{
tr?.Commit();
if (useTrans) tr = session.CreateTrans();
}
else
{
// 批量插入
//es.Insert(true);
es.BatchInsert(null, null);
es.Clear();
}
}
if (BatchInsert)
es.Add(list[i]);
else
list[i].Insert();
}
tr?.Commit();
//es.Insert(true);
es.BatchInsert(null, null);
}
}
catch (Exception ex)
{
err = ex.GetTrue()?.Message;
}
finally
{
Total += k;
sw2.Stop();
WriteLog("[{0}] 写入完成!{1:n0}tps {2}", n, k * 1000L / sw2.Elapsed.TotalMilliseconds, err);
}
});
sw.Stop();
Console.WriteLine();
WriteLog("数据写入完毕!");
ms = sw.Elapsed.TotalMilliseconds;
var speed = list.Count * 1000L / ms;
WriteLog("{2}插入{3:n0}行数据,耗时:{0:n0}ms 速度:{1:n0}tps", ms, speed, session.Dal.DbType, Total);
Score = (Int32)speed;
session.ClearCache("SqlInsert");
//var t = session.Count;
//Thread.Sleep(100);
WriteLog("{0} 共有数据:{1:n0}", fact.TableName, fact.FindCount());
}
#endregion
#region 日志
/// <summary>日志</summary>
public ILog Log { get; set; } = Logger.Null;
/// <summary>写日志</summary>
/// <param name="format"></param>
/// <param name="args"></param>
public void WriteLog(String format, params Object[] args)
{
Log?.Info(format, args);
}
#endregion
}
}
|