using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using NewLife.Log;
using NewLife.Threading;
namespace XCode
{
/// <summary>实体队列</summary>
public class EntityQueue
{
#region 属性
/// <summary>需要近实时保存的实体队列</summary>
private ConcurrentDictionary<IEntity, IEntity> Entities { get; set; } = new ConcurrentDictionary<IEntity, IEntity>();
/// <summary>需要延迟保存的实体队列</summary>
private ConcurrentDictionary<IEntity, DateTime> DelayEntities { get; } = new ConcurrentDictionary<IEntity, DateTime>();
/// <summary>调试开关,默认false</summary>
public Boolean Debug { get; set; }
/// <summary>数据会话,分表分库时使用</summary>
public IEntitySession Session { get; }
/// <summary>是否仅插入。默认false</summary>
public Boolean InsertOnly { get; set; }
/// <summary>
/// 是否显示SQL
/// </summary>
public Boolean? ShowSQL { get; set; }
/// <summary>周期。默认1000毫秒,根据繁忙程度动态调节,尽量靠近每次持久化1000个对象</summary>
public Int32 Period { get; set; } = 1000;
/// <summary>最大个数,超过该个数时,进入队列将产生堵塞。默认10000</summary>
public Int32 MaxEntity { get; set; } = 10_000;
/// <summary>保存速度,每秒保存多少个实体</summary>
public Int32 Speed { get; private set; }
private TimerX _Timer;
#endregion
#region 构造
/// <summary>实例化实体队列</summary>
public EntityQueue(IEntitySession session) => Session = session;
#endregion
#region 方法
/// <summary>添加实体对象进入队列</summary>
/// <param name="entity">实体对象</param>
/// <param name="msDelay">延迟保存的时间</param>
/// <returns>返回是否添加成功,实体对象已存在于队列中则返回false</returns>
public Boolean Add(IEntity entity, Int32 msDelay)
{
// 首次使用时初始化定时器
if (_Timer == null)
{
lock (this)
{
if (_Timer == null)
{
_Timer = new TimerX(Work, null, Period, Period, "EQ") { Async = true };
//_Timer.Scheduler.Log = XTrace.Log;
_Timer.CanExecute = () => DelayEntities.Any() || Entities.Any();
}
}
}
var rs = false;
if (msDelay <= 0)
rs = Entities.TryAdd(entity, entity);
else
rs = DelayEntities.TryAdd(entity, TimerX.Now.AddMilliseconds(msDelay));
if (!rs) return false;
Interlocked.Increment(ref _count);
// 超过最大值时,堵塞一段时间,等待消费完成
while (_count >= MaxEntity)
{
Thread.Sleep(100);
}
return true;
}
/// <summary>当前缓存个数</summary>
private Int32 _count;
private void Work(Object state)
{
var list = new List<IEntity>();
var n = 0;
// 检查是否有延迟保存
var ds = DelayEntities;
if (ds.Any())
{
var now = TimerX.Now;
foreach (var item in ds)
{
if (item.Value < now) list.Add(item.Key);
n++;
}
// 从列表删除过期
foreach (var item in list)
{
ds.Remove(item);
}
}
// 检查是否有近实时保存
var es = Entities;
if (es.Any())
{
// 为了速度,不拷贝,直接创建一个新的集合
Entities = new ConcurrentDictionary<IEntity, IEntity>();
list.AddRange(es.Keys);
n += es.Count;
}
if (list.Count > 0)
{
Interlocked.Add(ref _count, -list.Count);
Process(list);
}
}
private void Process(Object state)
{
var list = state as ICollection<IEntity>;
var ss = Session;
var speed = Speed;
if (Debug || list.Count > 100_000)
{
var cost = speed == 0 ? 0 : list.Count * 1000 / speed;
XTrace.WriteLine($"实体队列[{ss.TableName}/{ss.ConnName}]\t保存 {list.Count:n0}\t预测耗时 {cost:n0}ms");
}
var dss = ss.Dal.Session;
var old = dss.ShowSQL;
if (ShowSQL != null) dss.ShowSQL = ShowSQL.Value;
var sw = Stopwatch.StartNew();
// 分批
var batchSize = 10_000;
for (var i = 0; i < list.Count;)
{
var batch = list.Skip(i).Take(batchSize).ToList();
try
{
OnProcess(batch);
}
catch (Exception ex)
{
OnError(batch, ex);
}
i += batch.Count;
}
sw.Stop();
dss.ShowSQL = old;
// 根据繁忙程度动态调节
// 大于1000个对象时,说明需要加快持久化间隔,缩小周期
// 小于1000个对象时,说明持久化太快了,加大周期
var p = Period;
if (list.Count > 1000)
p = p * 1000 / list.Count;
else
p = p * 1000 / list.Count;
// 最小间隔
if (p < 500) p = 500;
// 最大间隔
if (p > 5000) p = 5000;
if (p != Period)
{
Period = p;
_Timer.Period = p;
}
var ms = sw.Elapsed.TotalMilliseconds;
Speed = ms == 0 ? 0 : (Int32)(list.Count * 1000 / ms);
if (Debug || list.Count > 10000)
{
XTrace.WriteLine($"实体队列[{ss.TableName}/{ss.ConnName}]\t保存 {list.Count:n0}\t耗时 {ms:n0}ms\t速度 {speed:n0}tps\t周期 {p:n0}ms");
}
// 马上再来一次,以便于连续处理数据
_Timer.SetNext(-1);
}
/// <summary>处理一批数据。插入或更新</summary>
/// <param name="batch"></param>
protected virtual void OnProcess(IList<IEntity> batch)
{
var ss = Session;
// 实体队列SaveAsync异步保存时,如果只插入表,直接走批量Insert,而不是Upsert
if (InsertOnly || ss.Table.InsertOnly)
batch.Insert(null, ss);
else
batch.SaveWithoutValid(null, ss);
}
/// <summary>发生错误</summary>
/// <param name="list"></param>
/// <param name="ex"></param>
protected virtual void OnError(IList<IEntity> list, Exception ex) => XTrace.WriteException(ex);
#endregion
}
}
|