NewLife/Stardust

[WIP] Add automatic node addition for Redis monitoring (#127)

* Initial plan

* 实现Redis集群和哨兵节点自动发现功能

Co-authored-by: nnhy <506367+nnhy@users.noreply.github.com>

* Changes before error encountered

Co-authored-by: nnhy <506367+nnhy@users.noreply.github.com>

* 修复代码审查问题:名称长度限制、并发冲突、哨兵节点发现、显式过滤

Co-authored-by: nnhy <506367+nnhy@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: nnhy <506367+nnhy@users.noreply.github.com>
Copilot authored at 2026-02-13 08:31:20 GitHub committed at 2026-02-13 08:31:20
a7e3a5b
Tree
1 Parent(s) 9d525c6
Summary: 1 changed files with 280 additions and 2 deletions.
Modified +280 -2
Modified +280 -2
diff --git a/Stardust.Server/Services/RedisService.cs b/Stardust.Server/Services/RedisService.cs
index 204a6e9..088cfbc 100644
--- a/Stardust.Server/Services/RedisService.cs
+++ b/Stardust.Server/Services/RedisService.cs
@@ -6,6 +6,7 @@ using NewLife.Serialization;
 using NewLife.Threading;
 using Stardust.Data.Models;
 using Stardust.Data.Nodes;
+using XCode.Membership;
 
 namespace Stardust.Server.Services;
 
@@ -25,9 +26,9 @@ public class RedisService : IHostedService, IRedisService
     private TimerX _traceQueue;
     private readonly ICache _cache = new MemoryCache();
     private readonly ITracer _tracer;
-    private readonly ILog _log;
+    private readonly NewLife.Log.ILog _log;
 
-    public RedisService(ITracer tracer, ILog log)
+    public RedisService(ITracer tracer, NewLife.Log.ILog log)
     {
         _tracer = tracer;
         _log = log;
@@ -140,6 +141,9 @@ public class RedisService : IHostedService, IRedisService
 
         // 扫描队列
         if (node.ScanQueue && dbs != null && dbs.Length > 0) ScanQueue(node, dbs);
+
+        // 自动发现集群和哨兵节点
+        DiscoverNodes(node, rds);
     }
 
     private void ScanQueue(RedisNode node, RedisDbEntry[] dbs)
@@ -364,4 +368,278 @@ public class RedisService : IHostedService, IRedisService
                 break;
         }
     }
+
+    /// <summary>自动发现Redis集群和哨兵节点</summary>
+    /// <param name="node">当前Redis节点</param>
+    /// <param name="rds">Redis客户端</param>
+    private void DiscoverNodes(RedisNode node, FullRedis rds)
+    {
+        if (node.Mode.IsNullOrEmpty()) return;
+
+        try
+        {
+            if (node.Mode.EqualIgnoreCase("cluster"))
+            {
+                DiscoverClusterNodes(node, rds);
+            }
+            else if (node.Mode.EqualIgnoreCase("sentinel"))
+            {
+                DiscoverSentinelNodes(node, rds);
+            }
+        }
+        catch (Exception ex)
+        {
+            XTrace.WriteException(ex);
+        }
+    }
+
+    /// <summary>发现集群节点</summary>
+    /// <param name="node">当前Redis节点</param>
+    /// <param name="rds">Redis客户端</param>
+    private void DiscoverClusterNodes(RedisNode node, FullRedis rds)
+    {
+        using var span = _tracer?.NewSpan("RedisService-DiscoverClusterNodes", node);
+
+        // 执行 CLUSTER NODES 命令获取集群节点信息
+        var result = rds.Execute(null, (r, k) => r.Execute<String>("CLUSTER", "NODES"));
+        if (result.IsNullOrEmpty()) return;
+
+        var lines = result.Split('\n').Where(line => !line.IsNullOrWhiteSpace());
+        foreach (var line in lines)
+        {
+            // 每行格式: <id> <ip:port@cport> <flags> <master> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ... <slot>
+            var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
+            if (parts.Length < 3) continue;
+
+            // 解析地址信息 ip:port@cport 或 ip:port
+            var address = parts[1];
+            var atPos = address.IndexOf('@');
+            if (atPos > 0) address = address[..atPos];
+
+            // flags 字段用于判断是否为当前节点(包含 "myself")
+            var flags = parts[2];
+            if (flags.IndexOf("myself", StringComparison.OrdinalIgnoreCase) >= 0) continue;
+
+            // 检查节点是否已存在
+            var existingNode = RedisNode.FindByServer(address);
+            if (existingNode != null) continue;
+
+            // 生成长度受控的节点名称,避免超过 RedisNode.Name 的最大长度(50)
+            var name = !node.Name.IsNullOrEmpty() ? $"{node.Name}-{address}" : address;
+            if (name.Length > 50) name = name[..50];
+
+            // 创建新节点,捕获唯一索引冲突
+            try
+            {
+                var newNode = new RedisNode
+                {
+                    Name = name,
+                    Category = node.Category,
+                    Server = address,
+                    UserName = node.UserName,
+                    Password = node.Password,
+                    ProjectId = node.ProjectId,
+                    Enable = true,
+                    ScanQueue = node.ScanQueue,
+                    WebHook = node.WebHook,
+                    AlarmMemoryRate = node.AlarmMemoryRate,
+                    AlarmConnections = node.AlarmConnections,
+                    AlarmSpeed = node.AlarmSpeed,
+                    AlarmInputKbps = node.AlarmInputKbps,
+                    AlarmOutputKbps = node.AlarmOutputKbps,
+                };
+                newNode.Insert();
+
+                XTrace.WriteLine("自动添加集群节点: {0}", address);
+                WriteLog("DiscoverClusterNodes", true, $"自动添加集群节点 [{address}] 从 [{node.Server}]");
+            }
+            catch (Exception ex)
+            {
+                // 可能是并发插入导致的唯一索引冲突,忽略
+                if (!ex.Message.Contains("duplicate") && !ex.Message.Contains("唯一"))
+                {
+                    XTrace.WriteException(ex);
+                }
+            }
+        }
+    }
+
+    /// <summary>发现哨兵节点</summary>
+    /// <param name="node">当前Redis节点</param>
+    /// <param name="rds">Redis客户端</param>
+    private void DiscoverSentinelNodes(RedisNode node, FullRedis rds)
+    {
+        using var span = _tracer?.NewSpan("RedisService-DiscoverSentinelNodes", node);
+
+        try
+        {
+            // 获取所有master
+            var masters = rds.Execute(null, (r, k) => r.Execute<Object[]>("SENTINEL", "MASTERS"));
+            if (masters != null && masters.Length > 0)
+            {
+                DiscoverSentinelMasters(node, rds, masters);
+            }
+        }
+        catch (Exception ex)
+        {
+            XTrace.WriteException(ex);
+        }
+    }
+
+    /// <summary>发现哨兵主从节点</summary>
+    /// <param name="node">当前Redis节点</param>
+    /// <param name="rds">Redis客户端</param>
+    /// <param name="masters">主节点列表</param>
+    private void DiscoverSentinelMasters(RedisNode node, FullRedis rds, Object[] masters)
+    {
+        var masterObjs = masters.Where(m => m is Object[] arr && arr.Length > 0);
+        foreach (var masterObj in masterObjs)
+        {
+            var master = (Object[])masterObj;
+
+            // 解析master信息,格式为key-value对
+            var masterInfo = ParseRedisArray(master);
+            if (!masterInfo.TryGetValue("name", out var masterName)) continue;
+
+            // 添加master节点
+            if (masterInfo.TryGetValue("ip", out var ip) && masterInfo.TryGetValue("port", out var port))
+            {
+                var address = $"{ip}:{port}";
+                AddRedisNode(node, address, "master");
+            }
+
+            // 获取该master的所有slaves
+            try
+            {
+                var slaves = rds.Execute(null, (r, k) => r.Execute<Object[]>("SENTINEL", "SLAVES", masterName));
+                if (slaves != null && slaves.Length > 0)
+                {
+                    ProcessSentinelList(node, slaves, "slave");
+                }
+            }
+            catch (Exception ex)
+            {
+                XTrace.Log.Debug("获取从节点失败 [{0}]: {1}", masterName, ex.Message);
+            }
+
+            // 获取该master的哨兵节点
+            try
+            {
+                var sentinels = rds.Execute(null, (r, k) => r.Execute<Object[]>("SENTINEL", "SENTINELS", masterName));
+                if (sentinels != null && sentinels.Length > 0)
+                {
+                    ProcessSentinelList(node, sentinels, "sentinel");
+                }
+            }
+            catch (Exception ex)
+            {
+                XTrace.Log.Debug("获取哨兵节点失败 [{0}]: {1}", masterName, ex.Message);
+            }
+        }
+    }
+
+    /// <summary>处理哨兵节点列表</summary>
+    /// <param name="node">当前Redis节点</param>
+    /// <param name="list">节点列表</param>
+    /// <param name="role">角色</param>
+    private void ProcessSentinelList(RedisNode node, Object[] list, String role)
+    {
+        var items = list.Where(item => item is Object[] arr && arr.Length > 0);
+        foreach (var itemObj in items)
+        {
+            var item = (Object[])itemObj;
+            var info = ParseRedisArray(item);
+            if (info.TryGetValue("ip", out var ip) && info.TryGetValue("port", out var port))
+            {
+                var address = $"{ip}:{port}";
+                AddRedisNode(node, address, role);
+            }
+        }
+    }
+
+    /// <summary>解析Redis返回的数组为字典</summary>
+    /// <param name="array">Redis数组</param>
+    /// <returns>字典</returns>
+    private static IDictionary<String, String> ParseRedisArray(Object[] array)
+    {
+        var dict = new Dictionary<String, String>();
+        
+        // Redis 返回的数组应该是偶数长度(key-value对)
+        if (array.Length % 2 != 0)
+        {
+            XTrace.WriteLine("警告: Redis 返回的数组长度为奇数 {0},最后一个元素将被忽略", array.Length);
+        }
+
+        for (var i = 0; i < array.Length - 1; i += 2)
+        {
+            var key = array[i]?.ToString();
+            var value = array[i + 1]?.ToString();
+            if (!key.IsNullOrEmpty() && !value.IsNullOrEmpty())
+            {
+                dict[key] = value;
+            }
+        }
+        return dict;
+    }
+
+    /// <summary>添加Redis节点</summary>
+    /// <param name="parentNode">父节点</param>
+    /// <param name="address">节点地址</param>
+    /// <param name="role">节点角色</param>
+    private void AddRedisNode(RedisNode parentNode, String address, String role)
+    {
+        // 跳过当前节点
+        if (address.EqualIgnoreCase(parentNode.Server)) return;
+
+        // 检查节点是否已存在
+        var existingNode = RedisNode.FindByServer(address);
+        if (existingNode != null) return;
+
+        // 生成长度受控的节点名称,避免超过 RedisNode.Name 的最大长度(50)
+        var name = !parentNode.Name.IsNullOrEmpty() ? $"{parentNode.Name}-{role}-{address}" : $"{role}-{address}";
+        if (name.Length > 50) name = name[..50];
+
+        // 创建新节点,捕获唯一索引冲突
+        try
+        {
+            var newNode = new RedisNode
+            {
+                Name = name,
+                Category = parentNode.Category,
+                Server = address,
+                UserName = parentNode.UserName,
+                Password = parentNode.Password,
+                ProjectId = parentNode.ProjectId,
+                Enable = true,
+                ScanQueue = parentNode.ScanQueue,
+                WebHook = parentNode.WebHook,
+                AlarmMemoryRate = parentNode.AlarmMemoryRate,
+                AlarmConnections = parentNode.AlarmConnections,
+                AlarmSpeed = parentNode.AlarmSpeed,
+                AlarmInputKbps = parentNode.AlarmInputKbps,
+                AlarmOutputKbps = parentNode.AlarmOutputKbps,
+            };
+            newNode.Insert();
+
+            XTrace.WriteLine("自动添加{0}节点: {1}", role, address);
+            WriteLog("DiscoverSentinelNodes", true, $"自动添加{role}节点 [{address}] 从 [{parentNode.Server}]");
+        }
+        catch (Exception ex)
+        {
+            // 可能是并发插入导致的唯一索引冲突,忽略
+            if (!ex.Message.Contains("duplicate") && !ex.Message.Contains("唯一"))
+            {
+                XTrace.WriteException(ex);
+            }
+        }
+    }
+
+    /// <summary>写日志</summary>
+    /// <param name="action">操作</param>
+    /// <param name="success">是否成功</param>
+    /// <param name="remark">备注</param>
+    private static void WriteLog(String action, Boolean success, String remark)
+    {
+        LogProvider.Provider?.WriteLog("RedisNode", action, success, remark);
+    }
 }
\ No newline at end of file