// beta
// 大石头 编写于 2024-06-20 19:38:10 大石头 提交于 2024-06-20 19:38:25
// AntJob
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using NewLife;
using NewLife.Caching;
using NewLife.Data;
using NewLife.Log;
using NewLife.Serialization;
using XCode;
using XCode.Membership;

namespace AntJob.Data.Entity;

/// <summary>Job: a schedulable unit of work that is split into <see cref="JobTask"/> slices.</summary>
public partial class Job : EntityBase<Job>
{
    #region Object operations
    static Job()
    {
        // Register the counter fields as accumulating (additional) fields
        var df = Meta.Factory.AdditionalFields;
        df.Add(__.Total);
        df.Add(__.Success);
        df.Add(__.Error);
        df.Add(__.Times);

        // Interceptor modules: UserModule, TimeModule, IPModule
        Meta.Modules.Add<UserModule>();
        Meta.Modules.Add<TimeModule>();
        Meta.Modules.Add<IPModule>();
    }

    /// <summary>Validates the entity before saving; validation failures are reported by throwing.</summary>
    /// <param name="isNew">Whether this save is an insert</param>
    public override void Valid(Boolean isNew)
    {
        // Nothing changed, nothing to validate
        if (!HasDirty) return;

        // Fallback defaults for parameters left at zero
        if (Step == 0) Step = 5;
        if (MaxRetain == 0) MaxRetain = 3;
        if (MaxIdle == 0) MaxIdle = GetDefaultIdle();

        if (isNew)
        {
            // Defaults for a new job, unless the caller explicitly assigned a value
            if (!Dirtys[nameof(MaxRetry)]) MaxRetry = 10;
            if (!Dirtys[nameof(MaxTime)]) MaxTime = 600;
            if (!Dirtys[nameof(ErrorDelay)]) ErrorDelay = 60;
            if (!Dirtys[nameof(MaxIdle)]) MaxIdle = GetDefaultIdle();
        }

        // The owning app's JobCount is refreshed in OnInsert, after the row exists.
        // Counting here, before the insert, would always miss the new job.
    }

    /// <summary>Default value for <c>MaxIdle</c>.</summary>
    /// <returns><c>Step + 600</c> for timed jobs, otherwise 3600.</returns>
    /// <remarks>
    /// NOTE(review): a former comment described the timed case as "step plus one minute".
    /// The code adds 600 (ten minutes, if the unit is seconds like <c>Step</c>). The value is kept; TODO confirm intent.
    /// </remarks>
    private Int32 GetDefaultIdle() => Mode == JobModes.Time ? Step + 600 : 3600;

    /// <summary>Inserts the job, then refreshes the owning app's job count.</summary>
    /// <returns>Result of the base insert</returns>
    protected override Int32 OnInsert()
    {
        var rs = base.OnInsert();

        UpdateAppJobCount();

        return rs;
    }

    /// <summary>Deletes the job, then refreshes the owning app's job count.</summary>
    /// <returns>Result of the base delete</returns>
    protected override Int32 OnDelete()
    {
        var rs = base.OnDelete();

        UpdateAppJobCount();

        return rs;
    }

    /// <summary>Recounts the jobs of the owning app (if any) and saves the app via <c>SaveAsync</c>.</summary>
    private void UpdateAppJobCount()
    {
        var app = App;
        if (app == null) return;

        app.JobCount = FindCountByAppID(app.ID);
        app.SaveAsync();
    }
    #endregion

    #region Extended queries
    /// <summary>Finds a job by its ID.</summary>
    /// <param name="id">Job ID</param>
    /// <returns>null when <paramref name="id"/> is not positive; otherwise the single-object cache lookup result</returns>
    public static Job FindByID(Int32 id)
    {
        if (id <= 0) return null;

        // Served from the single-object cache
        return Meta.SingleCache[id];
    }

    /// <summary>Finds a job by app and name.</summary>
    /// <param name="appid">App ID</param>
    /// <param name="name">Job name</param>
    /// <returns>The matching job, or null</returns>
    public static Job FindByAppIDAndName(Int32 appid, String name) => Find(_.AppID == appid & _.Name == name);

    /// <summary>Lists the jobs of an app.</summary>
    /// <param name="appid">App ID; 0 yields an empty list</param>
    /// <returns>Jobs belonging to the app</returns>
    public static IList<Job> FindAllByAppID(Int32 appid)
    {
        if (appid == 0) return new List<Job>();

        // Use the entity cache while the table is small (Meta.Session.Count below 1000)
        if (Meta.Session.Count < 1000) return Meta.Cache.FindAll(e => e.AppID == appid);

        return FindAll(_.AppID == appid);
    }

    /// <summary>Lists the jobs of an app, always querying the database and bypassing the cache.</summary>
    /// <param name="appid">App ID; 0 yields an empty list</param>
    /// <returns>Jobs belonging to the app</returns>
    public static IList<Job> FindAllByAppID2(Int32 appid)
    {
        if (appid == 0) return new List<Job>();

        return FindAll(_.AppID == appid);
    }

    /// <summary>Counts the jobs of an app.</summary>
    /// <param name="appid">App ID; 0 yields 0</param>
    /// <returns>Number of jobs</returns>
    public static Int32 FindCountByAppID(Int32 appid)
    {
        if (appid == 0) return 0;

        return (Int32)FindCount(_.AppID == appid);
    }
    #endregion

    #region Advanced query
    /// <summary>Advanced search.</summary>
    /// <param name="id">Job ID; ignored when not positive</param>
    /// <param name="appid">App ID; ignored when not positive</param>
    /// <param name="start">Lower bound of the creation-time range</param>
    /// <param name="end">Upper bound of the creation-time range</param>
    /// <param name="mode">Job mode; ignored when not positive</param>
    /// <param name="key">Substring to match in the job name; ignored when empty</param>
    /// <param name="p">Paging parameters</param>
    /// <returns>Matching jobs</returns>
    public static IEnumerable<Job> Search(Int32 id, Int32 appid, DateTime start, DateTime end, Int32 mode, String key, PageParameter p)
    {
        var exp = new WhereExpression();

        if (id > 0) exp &= _.ID == id;
        if (appid > 0) exp &= _.AppID == appid;
        if (mode > 0) exp &= _.Mode == mode;
        if (!key.IsNullOrEmpty()) exp &= _.Name.Contains(key);
        exp &= _.CreateTime.Between(start, end);

        return FindAll(exp, p);
    }
    #endregion

    #region Business operations
    /// <summary>Whether the job is configured well enough to be scheduled.</summary>
    /// <returns>
    /// Data/Time modes: a start time after the year 2000 and a positive step.
    /// Message mode: a non-empty topic to consume. Any other mode: false.
    /// </returns>
    public Boolean IsReady() => Mode switch
    {
        JobModes.Data or JobModes.Time => Start.Year > 2000 && Step > 0,
        // A message job can only consume once it knows its topic
        JobModes.Message => !Topic.IsNullOrEmpty(),
        _ => false,
    };

    /// <summary>Resets the job's time window so it starts working again, then saves.</summary>
    /// <param name="days">
    /// When non-negative, Start is reset to this many days before today.
    /// When negative, <paramref name="stime"/> and <paramref name="etime"/> are used instead.
    /// </param>
    /// <param name="stime">New start time (used only when <paramref name="days"/> is negative)</param>
    /// <param name="etime">New end time (used only when <paramref name="days"/> is negative)</param>
    public void ResetTime(Int32 days, DateTime stime, DateTime etime)
    {
        if (days >= 0)
        {
            Start = DateTime.Now.Date.AddDays(-days);
        }
        else
        {
            // stime may be DateTime.MinValue, meaning "no start time"
            Start = stime;
            End = etime;
        }

        Save();
    }

    /// <summary>Clears the job's counters (Total, Success, Times, Speed, Error) and saves.</summary>
    public void ResetOther()
    {
        Total = 0;
        Success = 0;
        Times = 0;
        Speed = 0;
        Error = 0;

        Save();
    }

    /// <summary>Deletes this job's expired task slices.</summary>
    /// <returns>0 when nothing qualifies; otherwise the result of <c>JobTask.DeleteByID</c></returns>
    public Int32 DeleteItems()
    {
        // Prune only once the job has accumulated more than 1000 task rows
        var count = JobTask.FindCountByJobId(ID);
        if (count <= 1000) return 0;

        // Retention window in days: MaxRetain, falling back to 3
        var days = MaxRetain > 0 ? MaxRetain : 3;
        var last = JobTask.FindLastByJobId(ID, DateTime.Now.AddDays(-days));
        if (last == null) return 0;

        return JobTask.DeleteByID(ID, last.ID);
    }

    /// <summary>Converts the job to its transport model.</summary>
    /// <returns>A full model when enabled; a minimal model (Name, Enable, Start) when disabled</returns>
    public JobModel ToModel()
    {
        // A disabled job only exposes the minimal fields. Start must still be included:
        // without it the client's startup validation fails, and with a disabled job the server errors and cannot start.
        if (!Enable) return new JobModel { Name = Name, Enable = Enable, Start = Start };

        return new JobModel
        {
            Name = Name,
            ClassName = ClassName,
            Enable = Enable,

            Start = Start,
            End = End,
            Topic = Topic,
            Data = Data,

            Offset = Offset,
            Step = Step,
            BatchSize = BatchSize,
            MaxTask = MaxTask,

            Mode = Mode,
        };
    }
    #endregion

    #region Acquire tasks
    /// <summary>Process-wide sequence number identifying slice batches; incremented atomically.</summary>
    private static Int32 _idxBatch;

    /// <summary>Stamps a task with its claimant and a fresh batch sequence number.</summary>
    /// <remarks>ProcessID receives the batch sequence (<see cref="_idxBatch"/>); the caller's pid goes into Client.</remarks>
    private static void MarkAcquired(JobTask task, String server, String ip, Int32 pid)
    {
        var now = DateTime.Now;
        task.Server = server;
        task.ProcessID = Interlocked.Increment(ref _idxBatch);
        task.Client = $"{ip}@{pid}";
        task.CreateTime = now;
        task.UpdateTime = now;
    }

    /// <summary>Splits new time slices off the job and hands them out as tasks.</summary>
    /// <param name="server">Server requesting the tasks</param>
    /// <param name="ip">IP of the requester</param>
    /// <param name="pid">Process ID of the requester</param>
    /// <param name="count">Maximum number of tasks to create</param>
    /// <param name="cache">Cache used to detect duplicate slices</param>
    /// <returns>The newly created tasks; empty when the job is disabled or nothing can be split</returns>
    public IList<JobTask> Acquire(String server, String ip, Int32 pid, Int32 count, ICache cache)
    {
        var list = new List<JobTask>();

        if (!Enable) return list;

        var step = Step;
        if (step <= 0) step = 30;

        // NOTE: there is no per-job lock here; duplicate slices from concurrent callers are
        // filtered by the cache.Add check below.
        using var ts = Meta.CreateTrans();
        var start = Start;
        for (var i = 0; i < count; i++)
        {
            if (!TrySplit(start, step, out var end)) break;

            // Create the new slice
            var ti = new JobTask
            {
                AppID = AppID,
                JobID = ID,
                Start = start,
                End = end,
                Data = Data,
                BatchSize = BatchSize,
                Status = JobStatus.就绪,
            };
            MarkAcquired(ti, server, ip, pid);

            // Duplicate-slice guard: only the caller whose cache.Add succeeds for this start time
            // (expiry 30, presumably seconds) inserts the slice
            var key = $"job:task:{ID}:{start:yyyyMMddHHmmss}";
            if (cache.Add(key, ti, 30))
            {
                ti.Insert();
                list.Add(ti);
            }
            else
            {
                var ti2 = cache.Get<JobTask>(key);
                XTrace.WriteLine("[{0}]重复切片:{1}", key, ti2?.ToJson());
                using var span = DefaultTracer.Instance?.NewSpan($"job:AcquireDuplicate", ti2);
            }

            // Advance the job past this slice, including past a duplicate one
            Start = end;
            start = end;
        }

        if (list.Count > 0)
        {
            // Tasks were inserted one at a time above because each needs its generated ID
            UpdateTime = DateTime.Now;
            Save();
            ts.Commit();
        }

        return list;
    }

    /// <summary>Tries to cut the next time slice starting at <paramref name="start"/>.</summary>
    /// <param name="start">Slice start</param>
    /// <param name="step">Slice length in seconds; non-positive falls back to 30</param>
    /// <param name="end">Slice end when successful, otherwise <see cref="DateTime.MinValue"/></param>
    /// <returns>Whether a non-empty slice could be cut</returns>
    public Boolean TrySplit(DateTime start, Int32 step, out DateTime end)
    {
        // "Now" is shifted back by Offset seconds; extraction must not go past it
        var now = DateTime.Now.AddSeconds(-Offset);
        // Drop the milliseconds
        now = now.Trim();

        end = DateTime.MinValue;

        // Nothing to split once the start has caught up with "now"
        if (start >= now) return false;

        if (step <= 0) step = 30;

        // Slices are exactly one step long, unless clipped by the job's End
        end = start.AddSeconds(step);
        if (End.Year > 2000 && end > End) end = End;

        // Non-timed jobs only hand out complete slices that lie entirely in the past
        if (Mode != JobModes.Time)
        {
            if (end > now) return false;
        }

        // Reject empty or inverted ranges
        if (start >= end) return false;

        return true;
    }

    /// <summary>Re-issues historical tasks that failed, were cancelled, or were interrupted.</summary>
    /// <param name="server">Server requesting the tasks</param>
    /// <param name="ip">IP of the requester</param>
    /// <param name="pid">Process ID of the requester</param>
    /// <param name="count">Maximum number of tasks to return</param>
    /// <param name="cache">Cache object (not used by this method)</param>
    /// <returns>The re-issued tasks</returns>
    public IList<JobTask> AcquireOld(String server, String ip, Int32 pid, Int32 count, ICache cache)
    {
        using var ts = Meta.CreateTrans();
        var list = new List<JobTask>();

        // Failed/cancelled tasks, using a cutoff of now minus ErrorDelay seconds and MaxRetry
        if (ErrorDelay > 0)
        {
            var dt = DateTime.Now.AddSeconds(-ErrorDelay);
            var list2 = JobTask.Search(ID, dt, MaxRetry, new[] { JobStatus.错误, JobStatus.取消 }, count);
            if (list2.Count > 0) list.AddRange(list2);
        }

        // Interrupted tasks, i.e. still unfinished after MaxTime seconds (default 600)
        if (MaxTime > 0 && list.Count < count)
        {
            var dt = DateTime.Now.AddSeconds(-MaxTime);
            var list2 = JobTask.Search(ID, dt, MaxRetry, new[] { JobStatus.就绪, JobStatus.抽取中, JobStatus.处理中 }, count - list.Count);
            if (list2.Count > 0) list.AddRange(list2);
        }

        if (list.Count > 0)
        {
            // Re-stamp the claimant; the status is deliberately left unchanged
            foreach (var ti in list)
            {
                MarkAcquired(ti, server, ip, pid);
            }
            list.Save();
        }

        ts.Commit();

        return list;
    }

    /// <summary>Consumes pending messages of a topic and packs them into tasks.</summary>
    /// <param name="topic">Topic to consume</param>
    /// <param name="server">Server requesting the tasks</param>
    /// <param name="ip">IP of the requester</param>
    /// <param name="pid">Process ID of the requester</param>
    /// <param name="count">Maximum number of tasks to create (typically the number of idle threads)</param>
    /// <param name="cache">Cache holding the message dedup keys</param>
    /// <returns>The newly created tasks</returns>
    public IList<JobTask> AcquireMessage(String topic, String server, String ip, Int32 pid, Int32 count, ICache cache)
    {
        // Remember the topic this job consumes
        if (Topic != topic)
        {
            Topic = topic;
            SaveAsync();
        }

        var list = new List<JobTask>();

        if (!Enable) return list;

        // Throttle: with no known pending messages, re-check at most every 2 minutes
        var now = DateTime.Now;
        if (MessageCount == 0 && UpdateTime.AddMinutes(2) > now) return list;

        using var ts = Meta.CreateTrans();

        // A non-positive batch size would produce no tasks, yet the fetched messages would still be deleted below
        var size = BatchSize;
        if (size <= 0) size = 1;

        // Fetch up to count * size messages: one batch per requested task
        var msgs = AppMessage.GetTopic(AppID, topic, now, count * size);
        if (msgs.Count > 0)
        {
            for (var i = 0; i < msgs.Count; i += size)
            {
                var msgList = msgs.Skip(i).Take(size).ToList();

                // Create a task carrying this batch of message payloads
                var ti = new JobTask
                {
                    AppID = AppID,
                    JobID = ID,
                    Data = msgList.Select(e => e.Data).ToJson(),
                    MsgCount = msgList.Count,
                    BatchSize = size,
                    Status = JobStatus.就绪,
                };
                MarkAcquired(ti, server, ip, pid);

                ti.Insert();

                // Drop the consumed messages from the dedup cache. The key uses the message payload
                // (e.Data), not the entity's ToString().
                // NOTE(review): assumes producers build the same key from the payload — TODO confirm against the producer.
                cache.Remove(msgList.Select(e => $"antjob:{AppID}:{topic}:{e.Data}").ToArray());

                list.Add(ti);
            }

            // Delete the consumed messages in bulk
            msgs.Delete();
        }

        // Refresh this job's pending message count
        MessageCount = AppMessage.FindCountByAppIDAndTopic(AppID, topic);
        UpdateTime = now;
        Save();

        // Topic drained: refresh the owning app's message count (by the app's ID, not the job's)
        if (MessageCount == 0)
        {
            var app = App;
            if (app != null)
            {
                app.MessageCount = AppMessage.FindCountByAppID(app.ID);
                app.SaveAsync();
            }
        }

        ts.Commit();

        return list;
    }
    #endregion
}