Redis
大石头 authored at 2024-10-14 10:36:03
17.73 KiB
Stardust
using NewLife;
using NewLife.Log;
using NewLife.Remoting;
using NewLife.Remoting.Models;
using NewLife.Security;
using NewLife.Serialization;
using Stardust.Data;
using Stardust.Data.Configs;
using Stardust.Data.Nodes;
using Stardust.Data.Platform;
using Stardust.Models;
using XCode;

namespace Stardust.Server.Services;

public class RegistryService
{
    private readonly AppQueueService _queue;
    private readonly AppOnlineService _appOnline;
    private readonly IPasswordProvider _passwordProvider;
    private readonly ITracer _tracer;

    public RegistryService(AppQueueService queue, AppOnlineService appOnline, IPasswordProvider passwordProvider, ITracer tracer)
    {
        _queue = queue;
        _appOnline = appOnline;
        _passwordProvider = passwordProvider;
        _tracer = tracer;
    }

    /// <summary>应用鉴权</summary>
    /// <param name="app"></param>
    /// <param name="secret"></param>
    /// <param name="ip"></param>
    /// <param name="clientId"></param>
    /// <returns></returns>
    /// <exception cref="ApiException"></exception>
    public Boolean Auth(App app, String secret, String ip, String clientId)
    {
        if (app == null) return false;

        // 检查黑白名单
        if (!app.MatchIp(ip))
            throw new ApiException(403, $"应用[{app.Name}]禁止{ip}访问!");
        if (app.Project != null && !app.Project.MatchIp(ip))
            throw new ApiException(403, $"项目[{app.Project}]禁止{ip}访问!");

        // 检查应用有效性
        if (!app.Enable) throw new ApiException(403, $"应用[{app.Name}]已禁用!");

        // 未设置密钥,直接通过
        if (app.Secret.IsNullOrEmpty()) return true;
        if (app.Secret == secret) return true;
        if (secret.IsNullOrEmpty() || !_passwordProvider.Verify(app.Secret, secret))
        {
            app.WriteHistory("应用鉴权", false, "密钥校验失败", null, ip, clientId);
            return false;
        }

        return true;
    }

    /// <summary>应用注册</summary>
    /// <param name="name"></param>
    /// <param name="secret"></param>
    /// <param name="autoRegister"></param>
    /// <param name="ip"></param>
    /// <returns></returns>
    public App Register(String name, String secret, Boolean autoRegister, String ip, String clientId)
    {
        // 查找应用
        var app = App.FindByName(name);
        // 查找或创建应用,避免多线程创建冲突
        app ??= App.GetOrAdd(name, App.FindByName, k => new App
        {
            Name = name,
            Secret = Rand.NextString(16),
            Enable = autoRegister,
        });

        app.WriteHistory("应用注册", true, $"[{app.Name}]注册成功", null, ip, clientId);

        return app;
    }

    public App Login(App app, AppModel model, String ip, StarServerSetting setting)
    {
        // 设置默认项目
        if (app.ProjectId == 0 || app.ProjectName == "默认")
        {
            var project = GalaxyProject.FindByName(model.Project);
            if (project != null)
                app.ProjectId = project.Id;
        }

        if (app.DisplayName.IsNullOrEmpty()) app.DisplayName = model.AppName;

        app.LastLogin = DateTime.Now;
        app.LastIP = ip;
        app.UpdateIP = ip;
        app.Update();

        // 登录历史
        app.WriteHistory("应用鉴权", true, $"[{app.DisplayName}/{app.Name}]鉴权成功 " + model.ToJson(false, false, false), model.Version, ip, model.ClientId);

        return app;
    }

    /// <summary>激活应用。更新在线信息和关联节点</summary>
    /// <param name="app"></param>
    /// <param name="inf"></param>
    /// <param name="ip"></param>
    /// <param name="clientId"></param>
    /// <param name="token"></param>
    /// <returns></returns>
    public AppOnline SetOnline(App app, AppModel inf, String ip, String clientId, String token)
    {
        if (app == null) return null;

        if (app.DisplayName.IsNullOrEmpty()) app.DisplayName = inf.AppName;
        app.UpdateIP = ip;
        app.Update();

        app.WriteHistory(nameof(Register), true, inf.ToJson(), inf.Version, ip, clientId);

        if (!inf.ClientId.IsNullOrEmpty()) clientId = inf.ClientId;

        // 更新在线记录
        var (online, _) = _appOnline.GetOnline(app, clientId, token, inf?.IP, ip);
        if (online != null)
        {
            // 关联节点,根据NodeCode匹配,如果未匹配上,则在未曾关联节点时才使用IP匹配
            var node = Node.FindByCode(inf.NodeCode);
            if (node == null && online.NodeId == 0) node = Node.SearchByIP(inf.IP).FirstOrDefault();
            if (node != null) online.NodeId = node.ID;

            if (!inf.Version.IsNullOrEmpty()) online.Version = inf.Version;
        }
        online.Update();

        return online;
    }

    public (AppService, Boolean changed) RegisterService(App app, Service service, PublishServiceInfo model, String ip)
    {
        var clientId = model.ClientId;
        var localIp = clientId;
        if (!localIp.IsNullOrEmpty())
        {
            var p = localIp.IndexOf('@');
            if (p > 0) localIp = localIp[..p];
        }

        // 单例部署服务,每个节点只有一个实例,使用本地IP作为唯一标识,无需进程ID,减少应用服务关联数
        if (service.Singleton) clientId = localIp;

        // 所有服务
        var services = AppService.FindAllByService(service.Id);
        var changed = false;
        var svc = services.FirstOrDefault(e => e.AppId == app.Id && (e.Client == clientId || service.Singleton && !localIp.IsNullOrEmpty() && e.Client.StartsWith($"{localIp}@")));
        if (svc == null)
        {
            svc = new AppService
            {
                AppId = app.Id,
                ServiceId = service.Id,
                ServiceName = model.ServiceName,
                Client = clientId,

                CreateIP = ip,
            };
            services.Add(svc);

            changed = true;
            WriteHistory(app, "RegisterService", true, $"注册服务[{model.ServiceName}] {model.ClientId}", ip, clientId);
        }
        else
        {
            if (!svc.Enable)
            {
                svc.Enable = app.AutoActive;

                if (svc.Enable) changed = true;
            }
        }

        // 节点信息
        var olt = AppOnline.FindByClient(model.ClientId);
        if (olt != null) svc.NodeId = olt.NodeId;

        // 作用域
        if (service.UseScope)
            svc.Scope = AppRule.CheckScope(-1, ip, localIp);

        svc.Enable = app.AutoActive;
        svc.PingCount++;
        svc.Tag = model.Tag;
        svc.Version = model.Version;
        svc.OriginAddress = model.Address;

        // 地址处理。本地任意地址,更换为IP地址
        var serverAddress = "";
        if (service.Extranet && ip != "127.0.0.1" && ip != "::1")
        {
            serverAddress = ip;
        }
        else
        {
            serverAddress = model.IP;
            if (serverAddress.IsNullOrEmpty()) serverAddress = localIp;
            if (serverAddress.IsNullOrEmpty()) serverAddress = ip;
        }

        var urls = new List<String>();
        foreach (var item in serverAddress.Split(","))
        {
            var addrs = model.Address
                ?.Replace("://*", $"://{item}")
                ?.Replace("://+", $"://{item}")
                .Replace("://0.0.0.0", $"://{item}")
                .Replace("://[::]", $"://{item}")
                .Split(",");
            if (addrs != null)
            {
                foreach (var elm in addrs)
                {
                    var url = elm;
                    if (url.StartsWithIgnoreCase("http://", "https://")) url = new Uri(url).ToString().TrimEnd('/');
                    if (!urls.Contains(url)) urls.Add(url);
                }
            }
        }

        if (service.Address.IsNullOrEmpty())
        {
            if (!model.ExternalAddress.IsNullOrEmpty())
                svc.Address = model.ExternalAddress;
            else
                svc.Address = urls.Take(10).Join(",");
        }
        else
        {
            // 地址模版
            var addr = service.Address.Replace("{LocalIP}", localIp).Replace("{IP}", ip);
            if (addr.Contains("{Port}"))
            {
                var port = 0;
                foreach (var item in urls)
                {
                    var p = item.IndexOf(":", "http://".Length);
                    if (p >= 0)
                    {
                        port = item[(p + 1)..].TrimEnd('/').ToInt();
                        if (port > 0) break;
                    }
                }
                if (port > 0) addr = addr.Replace("{Port}", port + "");
            }
            svc.Address = addr;
        }

        // 无需健康监测,直接标记为健康
        if (!model.Health.IsNullOrEmpty()) service.HealthAddress = model.Health;
        if (!service.HealthCheck || service.HealthAddress.IsNullOrEmpty()) svc.Healthy = true;

        svc.Save();

        service.Providers = services.Count;
        service.Save();

        // 如果有改变,异步监测健康状况
        if (changed && service.HealthCheck && !service.HealthAddress.IsNullOrEmpty())
        {
            _ = Task.Run(() => HealthCheck(svc));
        }

        return (svc, changed);
    }

    public async Task HealthCheck(AppService svc)
    {
        var url = svc.Service?.HealthAddress;
        if (url.IsNullOrEmpty()) return;

        try
        {
            if (!url.StartsWithIgnoreCase("http://", "https://"))
            {
                var ss = svc.Address.Split(',');
                var uri = new Uri(new Uri(ss[0]), url);
                url = uri.ToString();
            }

            var http = _tracer.CreateHttpClient();
            var rs = await http.GetStringAsync(url);

            svc.Healthy = true;
            svc.CheckResult = rs;
        }
        catch (Exception ex)
        {
            svc.Healthy = false;
            svc.CheckResult = ex.ToString();

            XTrace.WriteLine("HealthCheck: {0}", url);
            XTrace.Log.Error(ex.Message);
        }

        svc.CheckTimes++;
        svc.LastCheck = DateTime.Now;
        svc.Update();
    }

    public (AppService, Boolean changed) UnregisterService(App app, Service info, PublishServiceInfo model, String ip)
    {
        // 单例部署服务,每个节点只有一个实例,使用本地IP作为唯一标识,无需进程ID,减少应用服务关联数
        var clientId = model.ClientId;
        if (info.Singleton && !clientId.IsNullOrEmpty())
        {
            var p = clientId.IndexOf('@');
            if (p > 0) clientId = clientId[..p];
        }

        // 所有服务
        var services = AppService.FindAllByService(info.Id);
        var changed = false;
        var svc = services.FirstOrDefault(e => e.AppId == app.Id && e.Client == clientId);
        if (svc != null)
        {
            //svc.Delete();
            svc.Enable = false;
            svc.Healthy = false;
            svc.Update();

            services.Remove(svc);

            changed = true;
            WriteHistory(app, "UnregisterService", true, $"服务[{model.ServiceName}]下线 {svc.Client}", ip, svc.Client);
        }

        info.Providers = services.Count;
        info.Save();

        return (svc, changed);
    }

    public ServiceModel[] ResolveService(Service service, ConsumeServiceInfo model, String scope)
    {
        var list = new List<ServiceModel>();
        var tags = model.Tag?.Split(",");

        // 该服务所有生产
        var services = AppService.FindAllByService(service.Id);
        foreach (var item in services)
        {
            // 启用,匹配规则,健康
            if (item.Enable && item.Healthy && item.Match(model.MinVersion, scope, tags))
            {
                list.Add(item.ToModel());
            }
        }

        return list.ToArray();
    }

    private void WriteHistory(App app, String action, Boolean success, String remark, String ip, String clientId)
    {
        var olt = AppOnline.FindByClient(clientId);
        var version = olt?.Version;

        var hi = AppHistory.Create(app, action, success, remark, version, Environment.MachineName, ip);
        hi.Client = clientId;
        hi.Insert();
    }

    public AppOnline Ping(App app, AppInfo inf, String ip, String clientId, String token)
    {
        if (app == null) return null;

        // 更新在线记录
        var (online, _) = _appOnline.GetOnline(app, clientId, token, inf?.IP, ip);
        if (online != null)
        {
            //online.Version = app.Version;
            online.Fill(app, inf);
            online.SaveAsync();
        }

        // 保存性能数据
        AppMeter.WriteData(app, inf, "Ping", clientId, ip);

        return online;
    }

    private static Int32 _totalCommands;
    private static IList<AppCommand> _commands;
    private static DateTime _nextTime;

    public CommandModel[] AcquireAppCommands(Int32 appId)
    {
        // 缓存最近1000个未执行命令,用于快速过滤,避免大量节点在线时频繁查询命令表
        if (_nextTime < DateTime.Now || _totalCommands != AppCommand.Meta.Count)
        {
            _totalCommands = AppCommand.Meta.Count;
            _commands = AppCommand.AcquireCommands(-1, 1000);
            _nextTime = DateTime.Now.AddMinutes(1);
        }

        // 是否有本节点
        if (!_commands.Any(e => e.AppId == appId)) return null;

        using var span = _tracer?.NewSpan(nameof(AcquireAppCommands), new { appId });

        var cmds = AppCommand.AcquireCommands(appId, 100);
        if (cmds.Count == 0) return null;

        var rs = new List<CommandModel>();
        foreach (var item in cmds)
        {
            if (item.Times > 10 || item.Expire.Year > 2000 && item.Expire < DateTime.Now)
                item.Status = CommandStatus.取消;
            else
            {
                if (item.Status == CommandStatus.处理中 && item.UpdateTime.AddMinutes(10) < DateTime.Now) continue;

                item.Times++;
                item.Status = CommandStatus.处理中;
                rs.Add(item.ToModel());
            }
            item.UpdateTime = DateTime.Now;
        }
        cmds.Update(false);

        return rs.ToArray();
    }

    /// <summary>向应用发送命令</summary>
    /// <param name="app"></param>
    /// <param name="model"></param>
    /// <param name="user">创建者</param>
    /// <returns></returns>
    public async Task<AppCommand> SendCommand(App app, CommandInModel model, String user)
    {
        //if (model.Code.IsNullOrEmpty()) throw new ArgumentNullException(nameof(model.Code), "必须指定应用");
        if (model.Command.IsNullOrEmpty()) throw new ArgumentNullException(nameof(model.Command));

        var cmd = new AppCommand
        {
            AppId = app.Id,
            Command = model.Command,
            Argument = model.Argument,
            //Expire = model.Expire,
            TraceId = DefaultSpan.Current?.TraceId,

            CreateUser = user,
        };
        if (model.StartTime > 0) cmd.StartTime = DateTime.Now.AddSeconds(model.StartTime);
        if (model.Expire > 0) cmd.Expire = DateTime.Now.AddSeconds(model.Expire);
        cmd.Insert();

        // 分发命令给该应用的所有实例
        var cmdModel = cmd.ToModel();
        foreach (var item in AppOnline.FindAllByApp(app.Id))
        {
            _queue.Publish(app.Name, item.Client, cmdModel);
        }

        // 挂起等待。借助redis队列,等待响应
        if (model.Timeout > 0)
        {
            var q = _queue.GetReplyQueue(cmd.Id);
            var reply = await q.TakeOneAsync(model.Timeout);
            if (reply != null)
            {
                // 埋点
                using var span = _tracer?.NewSpan($"mq:AppCommandReply", reply);

                if (reply.Status == CommandStatus.错误)
                    throw new Exception($"命令错误!{reply.Data}");
                else if (reply.Status == CommandStatus.取消)
                    throw new Exception($"命令已取消!{reply.Data}");
            }
        }

        return cmd;
    }

    public async Task<AppCommand> SendCommand(App app, String command, String argument, String user = null)
    {
        var model = new CommandInModel
        {
            Command = command,
            Argument = argument,
        };
        return await SendCommand(app, model, user);
    }

    public AppCommand CommandReply(App app, CommandReplyModel model)
    {
        var cmd = AppCommand.FindById((Int32)model.Id);
        if (cmd == null) return null;

        // 防止越权
        if (cmd.AppId != app.Id) throw new ApiException(403, $"[{app}]越权访问[{cmd.AppName}]的服务");

        cmd.Status = model.Status;
        cmd.Result = model.Data;
        cmd.Update();

        // 推入服务响应队列,让服务调用方得到响应
        _queue.Reply(model);

        return cmd;
    }

    /// <summary>通知该服务的所有消费者,服务信息有变更</summary>
    /// <param name="service"></param>
    /// <param name="command"></param>
    /// <param name="user"></param>
    public async Task NotifyConsumers(Service service, String command, String user = null)
    {
        var list = AppConsume.FindAllByService(service.Id);
        if (list.Count == 0) return;

        var appIds = list.Select(e => e.AppId).Distinct().ToArray();
        var arguments = new { service.Name, service.Address }.ToJson();

        using var span = _tracer?.NewSpan(nameof(NotifyConsumers), $"{command} appIds={appIds.Join()} user={user} arguments={arguments}");

        var ts = new List<Task>();
        foreach (var item in appIds)
        {
            var app = App.FindById(item);
            if (app != null) ts.Add(SendCommand(app, command, arguments, user));
        }

        await Task.WhenAll(ts);
    }
}