diff --git a/Stardust.Server/Services/RedisService.cs b/Stardust.Server/Services/RedisService.cs
index 204a6e9..088cfbc 100644
--- a/Stardust.Server/Services/RedisService.cs
+++ b/Stardust.Server/Services/RedisService.cs
@@ -6,6 +6,7 @@ using NewLife.Serialization;
using NewLife.Threading;
using Stardust.Data.Models;
using Stardust.Data.Nodes;
+using XCode.Membership;
namespace Stardust.Server.Services;
@@ -25,9 +26,9 @@ public class RedisService : IHostedService, IRedisService
private TimerX _traceQueue;
private readonly ICache _cache = new MemoryCache();
private readonly ITracer _tracer;
- private readonly ILog _log;
+ private readonly NewLife.Log.ILog _log;
- public RedisService(ITracer tracer, ILog log)
+ public RedisService(ITracer tracer, NewLife.Log.ILog log)
{
_tracer = tracer;
_log = log;
@@ -140,6 +141,9 @@ public class RedisService : IHostedService, IRedisService
// 扫描队列
if (node.ScanQueue && dbs != null && dbs.Length > 0) ScanQueue(node, dbs);
+
+ // 自动发现集群和哨兵节点
+ DiscoverNodes(node, rds);
}
private void ScanQueue(RedisNode node, RedisDbEntry[] dbs)
@@ -364,4 +368,278 @@ public class RedisService : IHostedService, IRedisService
break;
}
}
+
+ /// <summary>自动发现Redis集群和哨兵节点</summary>
+ /// <param name="node">当前Redis节点</param>
+ /// <param name="rds">Redis客户端</param>
+ private void DiscoverNodes(RedisNode node, FullRedis rds)
+ {
+ if (node.Mode.IsNullOrEmpty()) return;
+
+ try
+ {
+ if (node.Mode.EqualIgnoreCase("cluster"))
+ {
+ DiscoverClusterNodes(node, rds);
+ }
+ else if (node.Mode.EqualIgnoreCase("sentinel"))
+ {
+ DiscoverSentinelNodes(node, rds);
+ }
+ }
+ catch (Exception ex)
+ {
+ XTrace.WriteException(ex);
+ }
+ }
+
+ /// <summary>发现集群节点</summary>
+ /// <param name="node">当前Redis节点</param>
+ /// <param name="rds">Redis客户端</param>
+ private void DiscoverClusterNodes(RedisNode node, FullRedis rds)
+ {
+ using var span = _tracer?.NewSpan("RedisService-DiscoverClusterNodes", node);
+
+ // 执行 CLUSTER NODES 命令获取集群节点信息
+ var result = rds.Execute(null, (r, k) => r.Execute<String>("CLUSTER", "NODES"));
+ if (result.IsNullOrEmpty()) return;
+
+ var lines = result.Split('\n').Where(line => !line.IsNullOrWhiteSpace());
+ foreach (var line in lines)
+ {
+ // 每行格式: <id> <ip:port@cport> <flags> <master> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ... <slot>
+ var parts = line.Split(' ', StringSplitOptions.RemoveEmptyEntries);
+ if (parts.Length < 3) continue;
+
+ // 解析地址信息 ip:port@cport 或 ip:port
+ var address = parts[1];
+ var atPos = address.IndexOf('@');
+ if (atPos > 0) address = address[..atPos];
+
+ // flags 字段用于判断是否为当前节点(包含 "myself")
+ var flags = parts[2];
+ if (flags.IndexOf("myself", StringComparison.OrdinalIgnoreCase) >= 0) continue;
+
+ // 检查节点是否已存在
+ var existingNode = RedisNode.FindByServer(address);
+ if (existingNode != null) continue;
+
+ // 生成长度受控的节点名称,避免超过 RedisNode.Name 的最大长度(50)
+ var name = !node.Name.IsNullOrEmpty() ? $"{node.Name}-{address}" : address;
+ if (name.Length > 50) name = name[..50];
+
+ // 创建新节点,捕获唯一索引冲突
+ try
+ {
+ var newNode = new RedisNode
+ {
+ Name = name,
+ Category = node.Category,
+ Server = address,
+ UserName = node.UserName,
+ Password = node.Password,
+ ProjectId = node.ProjectId,
+ Enable = true,
+ ScanQueue = node.ScanQueue,
+ WebHook = node.WebHook,
+ AlarmMemoryRate = node.AlarmMemoryRate,
+ AlarmConnections = node.AlarmConnections,
+ AlarmSpeed = node.AlarmSpeed,
+ AlarmInputKbps = node.AlarmInputKbps,
+ AlarmOutputKbps = node.AlarmOutputKbps,
+ };
+ newNode.Insert();
+
+ XTrace.WriteLine("自动添加集群节点: {0}", address);
+ WriteLog("DiscoverClusterNodes", true, $"自动添加集群节点 [{address}] 从 [{node.Server}]");
+ }
+ catch (Exception ex)
+ {
+ // 可能是并发插入导致的唯一索引冲突,忽略
+ if (!ex.Message.Contains("duplicate") && !ex.Message.Contains("唯一"))
+ {
+ XTrace.WriteException(ex);
+ }
+ }
+ }
+ }
+
+ /// <summary>发现哨兵节点</summary>
+ /// <param name="node">当前Redis节点</param>
+ /// <param name="rds">Redis客户端</param>
+ private void DiscoverSentinelNodes(RedisNode node, FullRedis rds)
+ {
+ using var span = _tracer?.NewSpan("RedisService-DiscoverSentinelNodes", node);
+
+ try
+ {
+ // 获取所有master
+ var masters = rds.Execute(null, (r, k) => r.Execute<Object[]>("SENTINEL", "MASTERS"));
+ if (masters != null && masters.Length > 0)
+ {
+ DiscoverSentinelMasters(node, rds, masters);
+ }
+ }
+ catch (Exception ex)
+ {
+ XTrace.WriteException(ex);
+ }
+ }
+
+ /// <summary>发现哨兵主从节点</summary>
+ /// <param name="node">当前Redis节点</param>
+ /// <param name="rds">Redis客户端</param>
+ /// <param name="masters">主节点列表</param>
+ private void DiscoverSentinelMasters(RedisNode node, FullRedis rds, Object[] masters)
+ {
+ var masterObjs = masters.Where(m => m is Object[] arr && arr.Length > 0);
+ foreach (var masterObj in masterObjs)
+ {
+ var master = (Object[])masterObj;
+
+ // 解析master信息,格式为key-value对
+ var masterInfo = ParseRedisArray(master);
+ if (!masterInfo.TryGetValue("name", out var masterName)) continue;
+
+ // 添加master节点
+ if (masterInfo.TryGetValue("ip", out var ip) && masterInfo.TryGetValue("port", out var port))
+ {
+ var address = $"{ip}:{port}";
+ AddRedisNode(node, address, "master");
+ }
+
+ // 获取该master的所有slaves
+ try
+ {
+ var slaves = rds.Execute(null, (r, k) => r.Execute<Object[]>("SENTINEL", "SLAVES", masterName));
+ if (slaves != null && slaves.Length > 0)
+ {
+ ProcessSentinelList(node, slaves, "slave");
+ }
+ }
+ catch (Exception ex)
+ {
+ XTrace.Log.Debug("获取从节点失败 [{0}]: {1}", masterName, ex.Message);
+ }
+
+ // 获取该master的哨兵节点
+ try
+ {
+ var sentinels = rds.Execute(null, (r, k) => r.Execute<Object[]>("SENTINEL", "SENTINELS", masterName));
+ if (sentinels != null && sentinels.Length > 0)
+ {
+ ProcessSentinelList(node, sentinels, "sentinel");
+ }
+ }
+ catch (Exception ex)
+ {
+ XTrace.Log.Debug("获取哨兵节点失败 [{0}]: {1}", masterName, ex.Message);
+ }
+ }
+ }
+
+ /// <summary>处理哨兵节点列表</summary>
+ /// <param name="node">当前Redis节点</param>
+ /// <param name="list">节点列表</param>
+ /// <param name="role">角色</param>
+ private void ProcessSentinelList(RedisNode node, Object[] list, String role)
+ {
+ var items = list.Where(item => item is Object[] arr && arr.Length > 0);
+ foreach (var itemObj in items)
+ {
+ var item = (Object[])itemObj;
+ var info = ParseRedisArray(item);
+ if (info.TryGetValue("ip", out var ip) && info.TryGetValue("port", out var port))
+ {
+ var address = $"{ip}:{port}";
+ AddRedisNode(node, address, role);
+ }
+ }
+ }
+
+ /// <summary>解析Redis返回的数组为字典</summary>
+ /// <param name="array">Redis数组</param>
+ /// <returns>字典</returns>
+ private static IDictionary<String, String> ParseRedisArray(Object[] array)
+ {
+ var dict = new Dictionary<String, String>();
+
+ // Redis 返回的数组应该是偶数长度(key-value对)
+ if (array.Length % 2 != 0)
+ {
+ XTrace.WriteLine("警告: Redis 返回的数组长度为奇数 {0},最后一个元素将被忽略", array.Length);
+ }
+
+ for (var i = 0; i < array.Length - 1; i += 2)
+ {
+ var key = array[i]?.ToString();
+ var value = array[i + 1]?.ToString();
+ if (!key.IsNullOrEmpty() && !value.IsNullOrEmpty())
+ {
+ dict[key] = value;
+ }
+ }
+ return dict;
+ }
+
+ /// <summary>添加Redis节点</summary>
+ /// <param name="parentNode">父节点</param>
+ /// <param name="address">节点地址</param>
+ /// <param name="role">节点角色</param>
+ private void AddRedisNode(RedisNode parentNode, String address, String role)
+ {
+ // 跳过当前节点
+ if (address.EqualIgnoreCase(parentNode.Server)) return;
+
+ // 检查节点是否已存在
+ var existingNode = RedisNode.FindByServer(address);
+ if (existingNode != null) return;
+
+ // 生成长度受控的节点名称,避免超过 RedisNode.Name 的最大长度(50)
+ var name = !parentNode.Name.IsNullOrEmpty() ? $"{parentNode.Name}-{role}-{address}" : $"{role}-{address}";
+ if (name.Length > 50) name = name[..50];
+
+ // 创建新节点,捕获唯一索引冲突
+ try
+ {
+ var newNode = new RedisNode
+ {
+ Name = name,
+ Category = parentNode.Category,
+ Server = address,
+ UserName = parentNode.UserName,
+ Password = parentNode.Password,
+ ProjectId = parentNode.ProjectId,
+ Enable = true,
+ ScanQueue = parentNode.ScanQueue,
+ WebHook = parentNode.WebHook,
+ AlarmMemoryRate = parentNode.AlarmMemoryRate,
+ AlarmConnections = parentNode.AlarmConnections,
+ AlarmSpeed = parentNode.AlarmSpeed,
+ AlarmInputKbps = parentNode.AlarmInputKbps,
+ AlarmOutputKbps = parentNode.AlarmOutputKbps,
+ };
+ newNode.Insert();
+
+ XTrace.WriteLine("自动添加{0}节点: {1}", role, address);
+ WriteLog("DiscoverSentinelNodes", true, $"自动添加{role}节点 [{address}] 从 [{parentNode.Server}]");
+ }
+ catch (Exception ex)
+ {
+ // 可能是并发插入导致的唯一索引冲突,忽略
+ if (!ex.Message.Contains("duplicate") && !ex.Message.Contains("唯一"))
+ {
+ XTrace.WriteException(ex);
+ }
+ }
+ }
+
+ /// <summary>写日志</summary>
+ /// <param name="action">操作</param>
+ /// <param name="success">是否成功</param>
+ /// <param name="remark">备注</param>
+ private static void WriteLog(String action, Boolean success, String remark)
+ {
+ LogProvider.Provider?.WriteLog("RedisNode", action, success, remark);
+ }
}
\ No newline at end of file