NewLife/NewLife.RocketMQ

Merge pull request #102 from NewLifeX/copilot/vscode1757042790060

[WIP] 仅向Master节点发送消息,BrokerInfo增加IsMaster标记
Stone authored at 2025-09-05 11:27:52 GitHub committed at 2025-09-05 11:27:52
9d1f975
Tree
2 Parent(s) 1de05a2 + b88204a
Summary: 3 changed files with 12 additions and 2 deletions.
Modified +3 -0
Modified +7 -1
Modified +2 -1
Modified +3 -0
diff --git a/NewLife.RocketMQ/Common/BrokerInfo.cs b/NewLife.RocketMQ/Common/BrokerInfo.cs
index 6bb2e0b..26cbe70 100644
--- a/NewLife.RocketMQ/Common/BrokerInfo.cs
+++ b/NewLife.RocketMQ/Common/BrokerInfo.cs
@@ -35,6 +35,9 @@ public class BrokerInfo
 
     /// <summary>主题同步标记</summary>
     public Int32 TopicSynFlag { get; set; }
+
+    /// <summary>是否主节点</summary>
+    public Boolean IsMaster { get; set; }
     #endregion
 
     #region 相等
Modified +7 -1
diff --git a/NewLife.RocketMQ/NameClient.cs b/NewLife.RocketMQ/NameClient.cs
index 62a9877..b3778bf 100644
--- a/NewLife.RocketMQ/NameClient.cs
+++ b/NewLife.RocketMQ/NameClient.cs
@@ -109,12 +109,18 @@ public class NameClient : ClusterClient
                     var name = item["brokerName"] + "";
                     var cluster = item["cluster"] + "";
                     if (item["brokerAddrs"] is IDictionary<String, Object> addrs)
+                    {
+                        // key==0为Master
+                        var addresses = addrs.Select(e => e.Value + "").ToArray();
+                        var isMaster = addrs.ContainsKey("0");
                         list.Add(new BrokerInfo
                         {
                             Name = name,
                             Cluster = cluster,
-                            Addresses = addrs.Select(e => e.Value + "").ToArray()
+                            Addresses = addresses,
+                            IsMaster = isMaster
                         });
+                    }
                 }
             }
 
Modified +2 -1
diff --git a/NewLife.RocketMQ/Producer.cs b/NewLife.RocketMQ/Producer.cs
index da317dd..897d291 100644
--- a/NewLife.RocketMQ/Producer.cs
+++ b/NewLife.RocketMQ/Producer.cs
@@ -583,7 +583,8 @@ public class Producer : MqBase
         var lb = LoadBalance;
         if (!lb.Ready)
         {
-            var list = Brokers.Where(e => e.Permission.HasFlag(Permissions.Write) && e.WriteQueueNums > 0).ToList();
+            // 只选择主节点且可写
+            var list = Brokers.Where(e => e.IsMaster && e.Permission.HasFlag(Permissions.Write) && e.WriteQueueNums > 0).ToList();
             if (list.Count == 0) return null;
 
             var total = list.Sum(e => e.WriteQueueNums);