Merge branch 'master' into business
大石头 编写于 2024-10-29 10:27:40
AntJob
using System.Diagnostics;
using System.Net;
using AntJob.Data.Entity;
using NewLife;
using NewLife.Caching;
using NewLife.Log;
using NewLife.Model;
using NewLife.Remoting;
using NewLife.Threading;
using Stardust.Registry;
using XCode;

namespace AntJob.Server;

public class Worker : IHostedService
{
    private readonly IRegistry _registry;
    private readonly ICacheProvider _cacheProvider;
    private readonly IServiceProvider _provider;
    private readonly AntJobSetting _setting;
    private readonly ITracer _tracer;

    public Worker(ICacheProvider cacheProvider, IServiceProvider provider, AntJobSetting setting, ITracer tracer)
    {
        _cacheProvider = cacheProvider;
        _provider = provider;
        _setting = setting;
        _tracer = tracer;
        _registry = provider.GetService<IRegistry>();
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        InitData();

        //var set = AntJobSetting.Current;
        var set = _setting;

        // 实例化RPC服务端,指定端口,指定ServiceProvider,用于依赖注入获取接口服务层
        var server = new ApiServer(set.Port)
        {
            ServiceProvider = _provider,
            ShowError = true,

            Tracer = _tracer,
            Log = XTrace.Log,
        };

        server.Register<AntService>();

        // 本地结点
        AntService.Local = new IPEndPoint(NetHelper.MyIP(), set.Port);

        //// 数据缓存,也用于全局锁,支持MemoryCache和Redis
        //if (_cacheProvider.Cache is not FullRedis && !set.RedisCache.IsNullOrEmpty())
        //{
        //    var redis = new Redis { Timeout = 5_000 + 1_000 };
        //    redis.Init(set.RedisCache);

        //    _cacheProvider.Cache = redis;
        //}

        server.Start();

        _clearOnlineTimer = new TimerX(ClearOnline, null, 1000, 10 * 1000) { Async = true };
        _clearItemTimer = new TimerX(ClearItems, null, 10_000, 3600_000) { Async = true };

        // 启用星尘注册中心,向注册中心注册服务,服务消费者将自动更新服务端地址列表
        if (_registry != null) await _registry.RegisterAsync("AntServer", $"tcp://*:{server.Port}");
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _clearOnlineTimer.TryDispose();
        _clearItemTimer.TryDispose();

        return Task.CompletedTask;
    }

    private static void InitData()
    {
        var set = NewLife.Setting.Current;
        if (set.IsNew)
        {
            set.DataPath = @"..\Data";

            set.Save();
        }

        var set2 = XCodeSetting.Current;
        if (set2.IsNew)
        {
            set2.Debug = true;
            set2.ShowSQL = false;
            set2.TraceSQLTime = 3000;
            //set2.SQLiteDbPath = @"..\Data";

            set2.Save();
        }

        _ = EntityFactory.InitAllAsync();
    }

    #region 清理过时
    private TimerX _clearOnlineTimer;

    //每10s清除一次UpdateTime超10分钟未更新的
    private static void ClearOnline(Object state)
    {
        var ls = AppOnline.GetOnlines(10);
        foreach (var item in ls)
        {
            item.Delete();
        }
    }
    #endregion

    #region 清理任务项
    private TimerX _clearItemTimer;

    private static void ClearItems(Object state)
    {
        // 遍历所有作业
        var p = 0;
        var rs = 0;
        var sw = Stopwatch.StartNew();
        while (true)
        {
            var list = Job.FindAll(null, null, null, p, 1000);
            if (list.Count == 0) break;

            foreach (var job in list)
            {
                try
                {
                    rs += job.DeleteItems();
                }
                catch (Exception ex)
                {
                    XTrace.WriteException(ex);
                }
            }

            if (list.Count < 1000) break;
            p += list.Count;
        }

        // 删除作业已不存在的任务
        rs += JobTask.DeleteNoJob();

        if (rs > 0)
        {
            sw.Stop();
            var ms = sw.Elapsed.TotalMilliseconds;
            var speed = rs * 1000 / ms;
            XTrace.WriteLine("共删除作业项[{0:n0}]行,耗时{1:n0}ms,速度{2:n0}tps", rs, ms, speed);
        }
    }
    #endregion
}