rollback e3cbf02
Soar360 编写于 2024-05-29 11:50:17
AntJob
using AntJob.Data;
using AntJob.Handlers;
using AntJob.Providers;
using NewLife;
using NewLife.Log;
using NewLife.Model;
using NewLife.Reflection;
using NewLife.Threading;
using Stardust.Registry;

namespace AntJob;

/// <summary>作业调度器</summary>
public class Scheduler : DisposeBase
{
    #region 属性
    /// <summary>处理器集合</summary>
    public List<Handler> Handlers { get; } = [];

    /// <summary>作业提供者</summary>
    public IJobProvider Provider { get; set; }

    /// <summary>服务提供者</summary>
    public IServiceProvider ServiceProvider { get; set; }
    #endregion

    #region 构造
    /// <summary>销毁</summary>
    /// <param name="disposing"></param>
    protected override void Dispose(Boolean disposing)
    {
        base.Dispose(disposing);

        Stop();
    }
    #endregion

    #region 处理器
    /// <summary>添加处理器</summary>
    /// <param name="handler"></param>
    public void AddHandler(Handler handler) => Handlers.Add(handler);

    /// <summary>按类型添加处理器,支持依赖注入</summary>
    /// <typeparam name="T"></typeparam>
    public void AddHandler<T>() where T : Handler
    {
        var services = ObjectContainer.Current;
        var prv = ObjectContainer.Provider;
        services.AddTransient<T>();

        Handlers.Add(prv.GetService<T>());
    }
    #endregion

    #region 核心方法
    /// <summary>加入调度中心,从注册中心获取地址,自动识别RPC/Http</summary>
    /// <param name="server"></param>
    /// <param name="appId"></param>
    /// <param name="secret"></param>
    /// <param name="debug"></param>
    /// <returns></returns>
    public IJobProvider Join(String server, String appId, String secret, Boolean debug = false)
    {
        var registry = ServiceProvider?.GetService<IRegistry>();
        if (registry != null)
        {
            var svrs = registry.ResolveAddressAsync("AntServer").Result;
            if (svrs != null && svrs.Length > 0) server = svrs.Join();
        }

        if (server.IsNullOrEmpty()) return null;

        // 根据地址决定用Http还是RPC
        var servers = server.Split(",");
        if (servers.Any(e => e.StartsWithIgnoreCase("http://", "https://")))
        {
            var http = new HttpJobProvider
            {
                Debug = debug,
                Server = server,
                AppId = appId,
                Secret = secret,
            };

            // 如果有注册中心,则使用注册中心的服务发现
            if (registry != null)
            {
                //http.Client = registry.CreateForService("AntServer") as ApiHttpClient;
                //http.Client.RoundRobin = false;
            }

            Provider = http;
        }
        else
        {
            var rpc = new NetworkJobProvider
            {
                Debug = debug,
                Server = server,
                AppId = appId,
                Secret = secret,
            };

            Provider = rpc;
        }

        return Provider;
    }

    /// <summary>开始</summary>
    public void Start()
    {
        // 检查本地添加的处理器
        var hs = Handlers;
        if (hs.Count == 0) throw new ArgumentNullException(nameof(Handlers), "没有可用处理器");

        // 埋点
        Tracer ??= ServiceProvider?.GetService<ITracer>();
        using var span = Tracer?.NewSpan("job:SchedulerStart");

        // 启动作业提供者
        var prv = Provider;
        prv ??= Provider = new FileJobProvider();
        prv.Schedule ??= this;

        if (prv is ITracerFeature tf) tf.Tracer = Tracer;
        if (prv is ILogFeature lf) lf.Log = Log;

        prv.Start();

        // 获取本应用在调度中心管理的所有作业
        var jobs = prv.GetJobs();
        if (jobs == null || jobs.Length == 0) throw new Exception("调度中心没有可用作业");

        // 输出日志
        WriteLog($"启动任务调度引擎[{prv}],作业[{hs.Count}]项,定时{Period}秒");

        // 设置日志
        foreach (var handler in hs)
        {
            handler.Schedule = this;
            handler.Provider = prv;

            // 查找作业参数,分配给处理器
            var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
            if (job != null && job.Mode == 0) job.Mode = handler.Mode;
            handler.Job = job;

            handler.Tracer = Tracer;
            handler.Log = Log;
            handler.Start();
        }

        // 定时执行
        if (Period > 0) _timer = new TimerX(Loop, null, 100, Period * 1000, "Job") { Async = true };
    }

    /// <summary>停止</summary>
    public void Stop()
    {
        using var span = Tracer?.NewSpan("job:SchedulerStop");

        _timer.TryDispose();
        _timer = null;

        Provider?.Stop();

        foreach (var handler in Handlers)
        {
            handler.Stop("SchedulerStop");
        }
    }

    /// <summary>任务调度</summary>
    /// <returns></returns>
    public Boolean Process()
    {
        var prv = Provider;

        // 查询所有处理器
        var hs = Handlers;

        // 拿到处理器对应的作业
        var jobs = prv.GetJobs();
        if (jobs == null) return false;

        // 运行时动态往集合里面加处理器,为了配合Sql+C#
        CheckHandlers(prv, jobs, hs);

        var flag = false;
        // 遍历处理器,给空闲的增加任务
        foreach (var handler in hs)
        {
            var job = jobs.FirstOrDefault(e => e.Name == handler.Name);
            // 找不到或者已停用
            if (job == null || !job.Enable)
            {
                if (handler.Active) handler.Stop("ConfigChanged");
                continue;
            }

            // 可能外部添加的Worker并不完整
            handler.Schedule = this;
            handler.Provider = prv;

            // 更新作业参数,并启动处理器
            handler.Job = job;
            if (job.Mode == 0) job.Mode = handler.Mode;
            if (!handler.Active) handler.Start();

            // 如果正在处理任务数没达到最大并行度,则继续安排任务
            var max = job.MaxTask;
            if (prv is NetworkJobProvider nprv)
            {
                // 如果是网络提供者,则根据在线节点数平分并行度
                var ps = nprv.Peers;
                if (ps != null && ps.Length > 0)
                {
                    max = max < ps.Length ? 1 : (Int32)Math.Round((Double)max / ps.Length);
                }
            }
            var count = max - handler.Busy;
            if (count > 0)
            {
                // 循环申请任务,喂饱处理器
                var ts = handler.Acquire(count);

                // 送给处理器处理
                for (var i = 0; i < count && ts != null && i < ts.Length; i++)
                {
                    // 准备就绪,增加Busy,避免超额分配
                    handler.Prepare(ts[i]);

                    // 使用线程池调度,避免Task排队影响使用
                    ThreadPool.QueueUserWorkItem(s => handler.Process(s as ITask), ts[i]);
                }

                if (ts != null && ts.Length > 0) flag = true;
            }
        }

        return flag;
    }

    private void CheckHandlers(IJobProvider provider, IList<IJob> jobs, IList<Handler> handlers)
    {
        foreach (var job in jobs)
        {
            var handler = handlers.FirstOrDefault(e => e.Name == job.Name);
            if (handler == null && job.Enable && !job.ClassName.IsNullOrEmpty())
            {
                using var span = Tracer?.NewSpan($"job:NewHandler", job);

                WriteLog("发现未知作业[{0}]@[{1}]", job.Name, job.ClassName);
                try
                {
                    // 实例化一个处理器
                    var type = Type.GetType(job.ClassName) ?? (handlers.Where(e => e.GetType().FullName == job.ClassName)?.FirstOrDefault()?.GetType());
                    if (type != null)
                    {
                        handler = type.CreateInstance() as Handler;
                        if (handler != null)
                        {
                            WriteLog("添加新作业[{0}]@[{1}]", job.Name, job.ClassName);

                            handler.Name = job.Name;
                            handler.Schedule = this;
                            handler.Provider = provider;

                            if (handler is MessageHandler messageHandler && !job.Topic.IsNullOrEmpty()) messageHandler.Topic = job.Topic;

                            handler.Log = Log;
                            handler.Tracer = Tracer;
                            handler.Start();

                            handlers.Add(handler);
                        }
                    }
                }
                catch (Exception ex)
                {
                    span?.SetError(ex, null);
                    XTrace.WriteException(ex);
                }
            }
        }
    }

    /// <summary>已完成</summary>
    /// <param name="ctx"></param>
    internal protected virtual void OnFinish(JobContext ctx) => _timer?.SetNext(-1);
    #endregion

    #region 定时调度
    /// <summary>定时轮询周期。默认5秒</summary>
    public Int32 Period { get; set; } = 5;

    private TimerX _timer;

    private void Loop(Object state)
    {
        // 任务调度
        var rs = Process();

        // 如果有数据,马上开始下一轮
        if (rs) TimerX.Current.SetNext(-1);
    }
    #endregion

    #region 日志
    /// <summary>性能跟踪器</summary>
    public ITracer Tracer { get; set; }

    /// <summary>日志</summary>
    public ILog Log { get; set; } = Logger.Null;

    /// <summary>写日志</summary>
    /// <param name="format"></param>
    /// <param name="args"></param>
    public void WriteLog(String format, params Object[] args) => Log?.Info(format, args);
    #endregion
}