[fix]修正单机多应用注册相同服务时只能注册一次的BUG
智能大石头 authored at 2024-03-26 23:13:58
14.46 KiB
Stardust
using NewLife;
using NewLife.Log;
using NewLife.Remoting;
using NewLife.Security;
using NewLife.Serialization;
using Stardust.Data;
using Stardust.Data.Configs;
using Stardust.Data.Nodes;
using Stardust.Models;
using Stardust.Server.Models;

namespace Stardust.Server.Services;

public class RegistryService
{
    private readonly AppQueueService _queue;
    private readonly AppOnlineService _appOnline;
    private readonly ITracer _tracer;

    public RegistryService(AppQueueService queue, AppOnlineService appOnline, ITracer tracer)
    {
        _queue = queue;
        _appOnline = appOnline;
        _tracer = tracer;
    }

    public AppOnline Register(App app, AppModel inf, String ip, String clientId, String token)
    {
        if (app == null) return null;

        if (app.DisplayName.IsNullOrEmpty()) app.DisplayName = inf.AppName;
        app.UpdateIP = ip;
        app.Update();

        app.WriteHistory("Register", true, inf.ToJson(), inf.Version, ip, clientId);

        if (!inf.ClientId.IsNullOrEmpty()) clientId = inf.ClientId;

        // 更新在线记录
        var (online, _) = _appOnline.GetOnline(app, clientId, token, inf?.IP, ip);
        if (online != null)
        {
            // 关联节点,根据NodeCode匹配,如果未匹配上,则在未曾关联节点时才使用IP匹配
            var node = Node.FindByCode(inf.NodeCode);
            if (node == null && online.NodeId == 0) node = Node.SearchByIP(inf.IP).FirstOrDefault();
            if (node != null) online.NodeId = node.ID;

            if (!inf.Version.IsNullOrEmpty()) online.Version = inf.Version;
        }
        online.Update();

        //// 根据节点IP规则,自动创建节点
        //if (online.NodeId == 0)
        //{
        //    var node = GetOrAddNode(inf, online.IP, ip);
        //    if (node != null)
        //    {
        //        online.NodeId = node.ID;
        //        online.SaveAsync();
        //    }
        //}

        return online;
    }

    //public Node GetOrAddNode(AppModel inf, String localIp, String ip)
    //{
    //    // 根据节点IP规则,自动创建节点
    //    var rule = NodeResolver.Instance.Match(null, localIp);
    //    if (rule != null && rule.NewNode)
    //    {
    //        using var span = _tracer?.NewSpan("AddNodeForApp", rule);

    //        var nodes = Node.SearchByIP(localIp);
    //        if (nodes.Count == 0)
    //        {
    //            var node = new Node
    //            {
    //                Code = Rand.NextString(8),
    //                Name = rule.Name,
    //                ProductCode = inf?.AppName ?? "App",
    //                Category = rule.Category,
    //                IP = localIp,
    //                Version = inf?.Version,
    //                Enable = true,
    //            };
    //            if (node.Name.IsNullOrEmpty()) node.Name = inf?.AppName;
    //            if (node.Name.IsNullOrEmpty()) node.Name = node.Code;
    //            node.Insert();

    //            node.WriteHistory("AppAddNode", true, inf.ToJson(), ip);

    //            return node;
    //        }
    //    }

    //    return null;
    //}

    public (AppService, Boolean changed) RegisterService(App app, Service service, PublishServiceInfo model, String ip)
    {
        var clientId = model.ClientId;
        var localIp = clientId;
        if (!localIp.IsNullOrEmpty())
        {
            var p = localIp.IndexOf('@');
            if (p > 0) localIp = localIp[..p];
        }

        // 单例部署服务,每个节点只有一个实例,使用本地IP作为唯一标识,无需进程ID,减少应用服务关联数
        if (service.Singleton) clientId = localIp;

        // 所有服务
        var services = AppService.FindAllByService(service.Id);
        var changed = false;
        var svc = services.FirstOrDefault(e => e.AppId == app.Id && (e.Client == clientId || service.Singleton && !localIp.IsNullOrEmpty() && e.Client.StartsWith($"{localIp}@")));
        if (svc == null)
        {
            svc = new AppService
            {
                AppId = app.Id,
                ServiceId = service.Id,
                ServiceName = model.ServiceName,
                Client = clientId,

                CreateIP = ip,
            };
            services.Add(svc);

            changed = true;
            WriteHistory(app, "RegisterService", true, $"注册服务[{model.ServiceName}] {model.ClientId}", ip, clientId);
        }
        else
        {
            if (!svc.Enable)
            {
                svc.Enable = app.AutoActive;

                if (svc.Enable) changed = true;
            }
        }

        // 节点信息
        var olt = AppOnline.FindByClient(model.ClientId);
        if (olt != null) svc.NodeId = olt.NodeId;

        // 作用域
        if (service.UseScope)
            svc.Scope = AppRule.CheckScope(-1, ip, localIp);

        svc.Enable = app.AutoActive;
        svc.PingCount++;
        svc.Tag = model.Tag;
        svc.Version = model.Version;
        svc.OriginAddress = model.Address;

        // 地址处理。本地任意地址,更换为IP地址
        var serverAddress = "";
        if (service.Extranet)
        {
            serverAddress = ip;
        }
        else
        {
            serverAddress = model.IP;
            if (serverAddress.IsNullOrEmpty()) serverAddress = localIp;
            if (serverAddress.IsNullOrEmpty()) serverAddress = ip;
        }

        var urls = new List<String>();
        foreach (var item in serverAddress.Split(","))
        {
            var addrs = model.Address
                ?.Replace("://*", $"://{item}")
                ?.Replace("://+", $"://{item}")
                .Replace("://0.0.0.0", $"://{item}")
                .Replace("://[::]", $"://{item}")
                .Split(",");
            if (addrs != null)
            {
                foreach (var elm in addrs)
                {
                    var url = elm;
                    if (url.StartsWithIgnoreCase("http://", "https://")) url = new Uri(url).ToString().TrimEnd('/');
                    if (!urls.Contains(url)) urls.Add(url);
                }
            }
        }

        if (service.Address.IsNullOrEmpty())
        {
            if (!model.ExternalAddress.IsNullOrEmpty())
                svc.Address = model.ExternalAddress;
            else
                svc.Address = urls.Take(10).Join(",");
        }
        else
        {
            // 地址模版
            var addr = service.Address.Replace("{LocalIP}", localIp).Replace("{IP}", ip);
            if (addr.Contains("{Port}"))
            {
                var port = 0;
                foreach (var item in urls)
                {
                    var p = item.IndexOf(":", "http://".Length);
                    if (p >= 0)
                    {
                        port = item[(p + 1)..].TrimEnd('/').ToInt();
                        if (port > 0) break;
                    }
                }
                if (port > 0) addr = addr.Replace("{Port}", port + "");
            }
            svc.Address = addr;
        }

        // 无需健康监测,直接标记为健康
        if (!model.Health.IsNullOrEmpty()) service.HealthAddress = model.Health;
        if (!service.HealthCheck || service.HealthAddress.IsNullOrEmpty()) svc.Healthy = true;

        svc.Save();

        service.Providers = services.Count;
        service.Save();

        // 如果有改变,异步监测健康状况
        if (changed && service.HealthCheck && !service.HealthAddress.IsNullOrEmpty())
        {
            _ = Task.Run(() => HealthCheck(svc));
        }

        return (svc, changed);
    }

    public async Task HealthCheck(AppService svc)
    {
        var url = svc.Service?.HealthAddress;
        if (url.IsNullOrEmpty()) return;

        try
        {
            if (!url.StartsWithIgnoreCase("http://", "https://"))
            {
                var ss = svc.Address.Split(',');
                var uri = new Uri(new Uri(ss[0]), url);
                url = uri.ToString();
            }

            var http = _tracer.CreateHttpClient();
            var rs = await http.GetStringAsync(url);

            svc.Healthy = true;
            svc.CheckResult = rs;
        }
        catch (Exception ex)
        {
            svc.Healthy = false;
            svc.CheckResult = ex.ToString();

            XTrace.WriteLine("HealthCheck: {0}", url);
            XTrace.Log.Error(ex.Message);
        }

        svc.CheckTimes++;
        svc.LastCheck = DateTime.Now;
        svc.Update();
    }

    public (AppService, Boolean changed) UnregisterService(App app, Service info, PublishServiceInfo model, String ip)
    {
        // 单例部署服务,每个节点只有一个实例,使用本地IP作为唯一标识,无需进程ID,减少应用服务关联数
        var clientId = model.ClientId;
        if (info.Singleton && !clientId.IsNullOrEmpty())
        {
            var p = clientId.IndexOf('@');
            if (p > 0) clientId = clientId[..p];
        }

        // 所有服务
        var services = AppService.FindAllByService(info.Id);
        var changed = false;
        var svc = services.FirstOrDefault(e => e.AppId == app.Id && e.Client == clientId);
        if (svc != null)
        {
            //svc.Delete();
            svc.Enable = false;
            svc.Healthy = false;
            svc.Update();

            services.Remove(svc);

            changed = true;
            WriteHistory(app, "UnregisterService", true, $"服务[{model.ServiceName}]下线 {svc.Client}", ip, svc.Client);
        }

        info.Providers = services.Count;
        info.Save();

        return (svc, changed);
    }

    public ServiceModel[] ResolveService(Service service, ConsumeServiceInfo model, String scope)
    {
        var list = new List<ServiceModel>();
        var tags = model.Tag?.Split(",");

        // 该服务所有生产
        var services = AppService.FindAllByService(service.Id);
        foreach (var item in services)
        {
            // 启用,匹配规则,健康
            if (item.Enable && item.Healthy && item.Match(model.MinVersion, scope, tags))
            {
                list.Add(item.ToModel());
            }
        }

        return list.ToArray();
    }

    private void WriteHistory(App app, String action, Boolean success, String remark, String ip, String clientId)
    {
        var olt = AppOnline.FindByClient(clientId);
        var version = olt?.Version;

        var hi = AppHistory.Create(app, action, success, remark, version, Environment.MachineName, ip);
        hi.Client = clientId;
        hi.Insert();
    }

    public AppOnline Ping(App app, AppInfo inf, String ip, String clientId, String token)
    {
        if (app == null) return null;

        // 更新在线记录
        var (online, _) = _appOnline.GetOnline(app, clientId, token, inf?.IP, ip);
        if (online != null)
        {
            //online.Version = app.Version;
            online.Fill(app, inf);
            online.SaveAsync();
        }

        // 保存性能数据
        AppMeter.WriteData(app, inf, "Ping", clientId, ip);

        return online;
    }

    /// <summary>向应用发送命令</summary>
    /// <param name="app"></param>
    /// <param name="model"></param>
    /// <param name="user">创建者</param>
    /// <returns></returns>
    public async Task<AppCommand> SendCommand(App app, CommandInModel model, String user)
    {
        //if (model.Code.IsNullOrEmpty()) throw new ArgumentNullException(nameof(model.Code), "必须指定应用");
        if (model.Command.IsNullOrEmpty()) throw new ArgumentNullException(nameof(model.Command));

        var cmd = new AppCommand
        {
            AppId = app.Id,
            Command = model.Command,
            Argument = model.Argument,
            //Expire = model.Expire,
            TraceId = DefaultSpan.Current?.TraceId,

            CreateUser = user,
        };
        if (model.Expire > 0) cmd.Expire = DateTime.Now.AddSeconds(model.Expire);
        cmd.Insert();

        // 分发命令给该应用的所有实例
        var cmdModel = cmd.ToModel();
        foreach (var item in AppOnline.FindAllByApp(app.Id))
        {
            _queue.Publish(app.Name, item.Client, cmdModel);
        }

        // 挂起等待。借助redis队列,等待响应
        if (model.Timeout > 0)
        {
            var q = _queue.GetReplyQueue(cmd.Id);
            var reply = await q.TakeOneAsync(model.Timeout);
            if (reply != null)
            {
                // 埋点
                using var span = _tracer?.NewSpan($"mq:AppCommandReply", reply);

                if (reply.Status == CommandStatus.错误)
                    throw new Exception($"命令错误!{reply.Data}");
                else if (reply.Status == CommandStatus.取消)
                    throw new Exception($"命令已取消!{reply.Data}");
            }
        }

        return cmd;
    }

    public async Task<AppCommand> SendCommand(App app, String command, String argument, String user = null)
    {
        var model = new CommandInModel
        {
            Command = command,
            Argument = argument,
        };
        return await SendCommand(app, model, user);
    }

    public AppCommand CommandReply(App app, CommandReplyModel model)
    {
        var cmd = AppCommand.FindById(model.Id);
        if (cmd == null) return null;

        // 防止越权
        if (cmd.AppId != app.Id) throw new ApiException(403, $"[{app}]越权访问[{cmd.AppName}]的服务");

        cmd.Status = model.Status;
        cmd.Result = model.Data;
        cmd.Update();

        // 推入服务响应队列,让服务调用方得到响应
        _queue.Reply(model);

        return cmd;
    }

    /// <summary>通知该服务的所有消费者,服务信息有变更</summary>
    /// <param name="service"></param>
    /// <param name="command"></param>
    /// <param name="user"></param>
    public async Task NotifyConsumers(Service service, String command, String user = null)
    {
        var list = AppConsume.FindAllByService(service.Id);
        if (list.Count == 0) return;

        var appIds = list.Select(e => e.AppId).Distinct().ToArray();
        var arguments = new { service.Name, service.Address }.ToJson();

        using var span = _tracer?.NewSpan(nameof(NotifyConsumers), $"{command} appIds={appIds.Join()} user={user} arguments={arguments}");

        var ts = new List<Task>();
        foreach (var item in appIds)
        {
            var app = App.FindById(item);
            if (app != null) ts.Add(SendCommand(app, command, arguments, user));
        }

        await Task.WhenAll(ts);
    }
}