NewLife/NewLife.RocketMQ

[improv]改进阿里云rocketmq对接。在过去两年时间里,阿里云rocketmq做了升级,导致某些指令兼容性没那么好,这里统一做兼容处理。阿里云rmq的网络架构非常特殊,在vpc内网时,就当作普通rmq使用,没有特别之处。在公网时,获取得到的broker实际上是网关,然后获取消费组状态时,得到的却是内网broker状态,这里修改代码强行通过,但是消费时仍然得到不支持lite pull的错误。
智能大石头 编写于 2024-04-11 00:49:12
共计: 修改10个文件,增加270行、删除263行。
修改 +4 -4
修改 +34 -44
修改 +60 -47
修改 +3 -0
修改 +6 -8
修改 +28 -25
修改 +1 -1
修改 +6 -4
修改 +56 -57
修改 +72 -73
修改 +4 -4
diff --git a/NewLife.RocketMQ/BrokerClient.cs b/NewLife.RocketMQ/BrokerClient.cs
index 2353d5d..c3189bb 100644
--- a/NewLife.RocketMQ/BrokerClient.cs
+++ b/NewLife.RocketMQ/BrokerClient.cs
@@ -99,15 +99,15 @@ public class BrokerClient : ClusterClient
             // 生产者 和 消费者 略有不同
             if (cfg is Producer pd)
             {
-                body.ProducerDataSet = new[] {
+                body.ProducerDataSet = [
                 new ProducerData { GroupName = pd.Group },
                 new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" },
-            };
-                body.ConsumerDataSet = new ConsumerData[] { };
+            ];
+                body.ConsumerDataSet = [];
             }
             else if (cfg is Consumer cm)
             {
-                body.ProducerDataSet = new[] { new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" } };
+                body.ProducerDataSet = [new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" }];
                 body.ConsumerDataSet = cm.Data.ToArray();
             }
 
修改 +34 -44
diff --git a/NewLife.RocketMQ/Consumer.cs b/NewLife.RocketMQ/Consumer.cs
index 29086db..c34eda3 100644
--- a/NewLife.RocketMQ/Consumer.cs
+++ b/NewLife.RocketMQ/Consumer.cs
@@ -80,50 +80,36 @@ public class Consumer : MqBase
 
     /// <summary>启动</summary>
     /// <returns></returns>
-    public override Boolean Start()
+    protected override void OnStart()
     {
-        if (Active) return true;
-
         WriteLog("正在准备消费 {0}", Topic);
 
-        using var span = Tracer?.NewSpan($"mq:{Topic}:Start");
-        try
+        var list = Data;
+        if (list == null)
         {
-            var list = Data;
-            if (list == null)
+            // 建立消费者数据,用于心跳
+            var sd = new SubscriptionData
             {
-                // 建立消费者数据,用于心跳
-                var sd = new SubscriptionData
-                {
-                    Topic = Topic,
-                    TagsSet = Tags
-                };
-                var cd = new ConsumerData
-                {
-                    GroupName = Group,
-                    ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET",
-                    MessageModel = MessageModel.ToString().ToUpper(),
-                    SubscriptionDataSet = new[] { sd },
-                };
-
-                list = new List<ConsumerData> { cd };
-
-                Data = list;
-            }
+                Topic = Topic,
+                TagsSet = Tags
+            };
+            var cd = new ConsumerData
+            {
+                GroupName = Group,
+                ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET",
+                MessageModel = MessageModel.ToString().ToUpper(),
+                SubscriptionDataSet = [sd],
+            };
 
-            if (!base.Start()) return false;
+            list = [cd];
 
-            // 默认自动开始调度
-            if (AutoSchedule) StartSchedule();
+            Data = list;
         }
-        catch (Exception ex)
-        {
-            span?.SetError(ex, null);
 
-            throw;
-        }
+        base.OnStart();
 
-        return true;
+        // 默认自动开始调度
+        if (AutoSchedule) StartSchedule();
     }
 
     /// <summary>
@@ -133,7 +119,7 @@ public class Consumer : MqBase
     {
         if (!Active) return;
 
-        using var span = Tracer?.NewSpan($"mq:{Topic}:Stop");
+        using var span = Tracer?.NewSpan($"mq:{Name}:Stop");
         try
         {
             // 停止并保存偏移
@@ -207,7 +193,7 @@ public class Consumer : MqBase
         else
         {
             pr.Status = PullStatus.Unknown;
-            Log.Warn("响应编号:{0} 响应备注:{1} 序列编号:{2} 序列偏移量:{3}", rs.Header.Code, rs.Header.Remark, mq.QueueId, offset);
+            Log.Warn("[{0}]{1} 序列编号:{2} 序列偏移量:{3}", (ResponseCode)rs.Header.Code, rs.Header.Remark, mq.QueueId, offset);
         }
 
         pr.Read(rs.Header?.ExtFields);
@@ -331,7 +317,7 @@ public class Consumer : MqBase
         // 在所有Broker上查询
         foreach (var item in Brokers)
         {
-            using var span = Tracer?.NewSpan($"mq:{Topic}:GetConsumers", item.Name);
+            using var span = Tracer?.NewSpan($"mq:{Name}:GetConsumers", item.Name);
             try
             {
                 var bk = GetBroker(item.Name);
@@ -476,7 +462,7 @@ public class Consumer : MqBase
                                 DefaultSpan.Current = null;
 
                                 // 性能埋点
-                                using var span = Tracer?.NewSpan($"mq:{Topic}:Consume", pr.Messages);
+                                using var span = Tracer?.NewSpan($"mq:{Name}:Consume", pr.Messages);
                                 try
                                 {
                                     // 触发消费
@@ -512,7 +498,7 @@ public class Consumer : MqBase
 
                             break;
                         case PullStatus.Unknown:
-                            Log.Error("未知响应类型消息序列[{1}]偏移量{0}", st.Offset, st.Queue.QueueId);
+                            Log.Error("未知响应类型消息,序列[{1}]偏移量{0}", st.Offset, st.Queue.QueueId);
                             break;
                         default:
                             break;
@@ -587,7 +573,8 @@ public class Consumer : MqBase
 
     class QueueStore
     {
-        [XmlIgnore] public MessageQueue Queue { get; set; }
+        [XmlIgnore]
+        public MessageQueue Queue { get; set; }
         public Int64 Offset { get; set; } = -1;
         public Int64 CommitOffset { get; set; } = -1;
 
@@ -602,6 +589,7 @@ public class Consumer : MqBase
         /// <returns></returns>
         public override Int32 GetHashCode() => (Queue == null ? 0 : Queue.GetHashCode()) ^ Offset.GetHashCode();
 
+        public override String ToString() => Queue?.ToString();
         #endregion
     }
 
@@ -682,7 +670,7 @@ public class Consumer : MqBase
         var str = dic.Join(";", e => $"{e.Key}[{e.Value}]");
         WriteLog("消费重新平衡,当前消费者负责queue分片:{0}", str);
 
-        using var span = Tracer?.NewSpan($"mq:{Topic}:Rebalance", str);
+        using var span = Tracer?.NewSpan($"mq:{Name}:Rebalance", str);
 
         _Queues = rs.ToArray();
         await InitOffsetAsync();
@@ -710,7 +698,7 @@ public class Consumer : MqBase
         if (_checking) return;
         _checking = true;
 
-        using var span = Tracer?.NewSpan($"mq:{Topic}:CheckGroup");
+        using var span = Tracer?.NewSpan($"mq:{Name}:CheckGroup");
         try
         {
             var rs = await Rebalance();
@@ -766,7 +754,8 @@ public class Consumer : MqBase
             if (store.Offset >= 0) continue;
 
             var item = offsetTables.FirstOrDefault(t => t.Key.BrokerName == store.Queue.BrokerName && t.Key.QueueId == store.Queue.QueueId);
-            var offsetTable = item.Value;
+            //!! 阿里云公网版RocketMQ,消费者状态返回的是真正brokerName,而前面Broker得到的是网关名,导致这里无法匹配
+            var offsetTable = item.Value ?? new OffsetWrapperModel();
             if (neverConsumed)
             {
                 var offset = 0L;
@@ -937,10 +926,11 @@ public class Consumer : MqBase
         ci.Properties = dic;
 
         var sd = new SubscriptionData { Topic = Topic, };
-        ci.SubscriptionSet = new[] { sd };
+        ci.SubscriptionSet = [sd];
 
         var sb = new StringBuilder();
         sb.Append('{');
+        if (_Queues != null)
         {
             sb.Append("\"mqTable\":{");
             for (var i = 0; i < _Queues.Length; i++)
修改 +60 -47
diff --git a/NewLife.RocketMQ/MqBase.cs b/NewLife.RocketMQ/MqBase.cs
index f165d5f..ea15684 100644
--- a/NewLife.RocketMQ/MqBase.cs
+++ b/NewLife.RocketMQ/MqBase.cs
@@ -10,42 +10,19 @@ namespace NewLife.RocketMQ.Client;
 public abstract class MqBase : DisposeBase
 {
     #region 属性
+    /// <summary>名称</summary>
+    public String Name { get; set; }
+
     /// <summary>名称服务器地址</summary>
     public String NameServerAddress { get; set; }
 
-    private String _group = "DEFAULT_PRODUCER";
     /// <summary>消费组</summary>
     /// <remarks>阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]</remarks>
-    public String Group
-    {
-        get
-        {
-            // 阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]
-            var ins = Aliyun?.InstanceId;
-            return ins.IsNullOrEmpty() ? _group : $"{ins}%{_group}";
-        }
-        set
-        {
-            _group = value;
-        }
-    }
+    public String Group { get; set; } = "DEFAULT_PRODUCER";
 
-    private String _topic = "TBW102";
     /// <summary>主题</summary>
     /// <remarks>阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]</remarks>
-    public String Topic
-    {
-        get
-        {
-            // 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]
-            var ins = Aliyun?.InstanceId;
-            return ins.IsNullOrEmpty() ? _topic : $"{ins}%{_topic}";
-        }
-        set
-        {
-            _topic = value;
-        }
-    }
+    public String Topic { get; set; } = "TBW102";
 
     /// <summary>本地IP地址</summary>
     public String ClientIP { get; set; } = NetHelper.MyIP() + "";
@@ -91,6 +68,9 @@ public abstract class MqBase : DisposeBase
     /// <summary>性能跟踪</summary>
     public ITracer Tracer { get; set; } = DefaultTracer.Instance;
 
+    private String _group;
+    private String _topic;
+
     /// <summary>名称服务器</summary>
     protected NameClient _NameServer;
     #endregion
@@ -143,7 +123,7 @@ public abstract class MqBase : DisposeBase
 
     /// <summary>友好字符串</summary>
     /// <returns></returns>
-    public override String ToString() => Group;
+    public override String ToString() => _group;
     #endregion
 
     #region 基础方法
@@ -151,28 +131,63 @@ public abstract class MqBase : DisposeBase
     /// <param name="setting"></param>
     public virtual void Configure(MqSetting setting)
     {
-        NameServerAddress = setting.NameServer;
-        Topic = setting.Topic;
-        Group = setting.Group;
+        if (!setting.NameServer.IsNullOrEmpty()) NameServerAddress = setting.NameServer;
+        if (!setting.Topic.IsNullOrEmpty()) Topic = setting.Topic;
+        if (!setting.Group.IsNullOrEmpty()) Group = setting.Group;
+
+        Aliyun ??= new AliyunOptions();
+        if (!setting.Server.IsNullOrEmpty()) Aliyun.Server = setting.Server;
+        if (!setting.AccessKey.IsNullOrEmpty()) Aliyun.AccessKey = setting.AccessKey;
+        if (!setting.SecretKey.IsNullOrEmpty()) Aliyun.SecretKey = setting.SecretKey;
+    }
+
+    /// <summary>开始</summary>
+    /// <returns></returns>
+    public Boolean Start()
+    {
+        if (Active) return true;
 
-        if (!setting.Server.IsNullOrEmpty() &&
-            !setting.AccessKey.IsNullOrEmpty())
+        _group = Group;
+        _topic = Topic;
+        if (Name.IsNullOrEmpty()) Name = Topic;
+
+        // 解析阿里云实例
+        var aliyun = Aliyun;
+        if (aliyun != null && !aliyun.AccessKey.IsNullOrEmpty())
+        {
+            var ns = NameServerAddress;
+            if (aliyun.InstanceId.IsNullOrEmpty() && !ns.IsNullOrEmpty() && ns.Contains("MQ_INST_"))
+            {
+                aliyun.InstanceId = ns.Substring("://", ".");
+            }
+        }
+
+        using var span = Tracer?.NewSpan($"mq:{Name}:Start");
+        try
         {
-            Aliyun = new AliyunOptions
+            // 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]
+            var ins = Aliyun?.InstanceId;
+            if (!ins.IsNullOrEmpty())
             {
-                Server = setting.Server,
-                AccessKey = setting.AccessKey,
-                SecretKey = setting.SecretKey,
-            };
+                if (!Topic.StartsWith(ins)) Topic = $"{ins}%{Topic}";
+                if (!Group.StartsWith(ins)) Group = $"{ins}%{Group}";
+            }
+
+            OnStart();
+        }
+        catch (Exception ex)
+        {
+            span?.SetError(ex, null);
+
+            throw;
         }
+
+        return Active = true;
     }
 
     /// <summary>开始</summary>
-    /// <returns></returns>
-    public virtual Boolean Start()
+    protected virtual void OnStart()
     {
-        if (Active) return true;
-
         if (NameServerAddress.IsNullOrEmpty())
         {
             // 获取阿里云ONS的名称服务器地址
@@ -190,7 +205,7 @@ public abstract class MqBase : DisposeBase
 
         var client = new NameClient(ClientId, this)
         {
-            Name = Topic,
+            Name = Name,
             Tracer = Tracer,
             Log = Log
         };
@@ -204,8 +219,6 @@ public abstract class MqBase : DisposeBase
         }
 
         _NameServer = client;
-
-        return Active = true;
     }
 
     /// <summary>停止</summary>
@@ -313,7 +326,7 @@ public abstract class MqBase : DisposeBase
             order = false,
         };
 
-        using var span = Tracer?.NewSpan($"mq:{Topic}:CreateTopic", header);
+        using var span = Tracer?.NewSpan($"mq:{Name}:CreateTopic", header);
         try
         {
             // 在所有Broker上创建Topic
修改 +3 -0
diff --git a/NewLife.RocketMQ/NameClient.cs b/NewLife.RocketMQ/NameClient.cs
index 36efce5..1ff889c 100644
--- a/NewLife.RocketMQ/NameClient.cs
+++ b/NewLife.RocketMQ/NameClient.cs
@@ -46,6 +46,9 @@ public class NameClient : ClusterClient
     protected override void OnStart()
     {
         var cfg = Config;
+        if (cfg.NameServerAddress.IsNullOrEmpty())
+            throw new ArgumentNullException(nameof(cfg.NameServerAddress), "未指定NameServer地址");
+
         var ss = cfg.NameServerAddress.Split(";");
 
         var list = new List<NetUri>();
修改 +6 -8
diff --git a/NewLife.RocketMQ/Producer.cs b/NewLife.RocketMQ/Producer.cs
index 7c5d611..bb1a144 100644
--- a/NewLife.RocketMQ/Producer.cs
+++ b/NewLife.RocketMQ/Producer.cs
@@ -33,9 +33,9 @@ public class Producer : MqBase
     #region 基础方法
     /// <summary>启动</summary>
     /// <returns></returns>
-    public override Boolean Start()
+    protected override void OnStart()
     {
-        if (!base.Start()) return false;
+        base.OnStart();
 
         LoadBalance ??= new WeightRoundRobin();
 
@@ -48,8 +48,6 @@ public class Producer : MqBase
                 LoadBalance.Ready = false;
             };
         }
-
-        return true;
     }
     #endregion
 
@@ -72,7 +70,7 @@ public class Producer : MqBase
             header.QueueId = mq.QueueId;
 
             // 性能埋点
-            using var span = Tracer?.NewSpan($"mq:{Topic}:Publish", message.BodyString);
+            using var span = Tracer?.NewSpan($"mq:{Name}:Publish", message.BodyString);
             span?.AppendTag($"queue={mq}");
             try
             {
@@ -179,7 +177,7 @@ public class Producer : MqBase
             header.QueueId = mq.QueueId;
 
             // 性能埋点
-            using var span = Tracer?.NewSpan($"mq:{Topic}:PublishAsync", message.BodyString);
+            using var span = Tracer?.NewSpan($"mq:{Name}:PublishAsync", message.BodyString);
             try
             {
                 // 根据队列获取Broker客户端
@@ -260,7 +258,7 @@ public class Producer : MqBase
             header.QueueId = mq.QueueId;
 
             // 性能埋点
-            using var span = Tracer?.NewSpan($"mq:{Topic}:PublishOneway", message.BodyString);
+            using var span = Tracer?.NewSpan($"mq:{Name}:PublishOneway", message.BodyString);
             try
             {
                 // 根据队列获取Broker客户端
@@ -333,7 +331,7 @@ public class Producer : MqBase
             header.QueueId = mq.QueueId;
 
             // 性能埋点
-            using var span = Tracer?.NewSpan($"mq:{Topic}:PublishDelay", new { level, message.BodyString });
+            using var span = Tracer?.NewSpan($"mq:{Name}:PublishDelay", new { level, message.BodyString });
             try
             {
                 // 根据队列获取Broker客户端
修改 +28 -25
diff --git a/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs b/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs
index 23ba90d..67cf1b3 100644
--- a/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs
+++ b/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs
@@ -1,33 +1,36 @@
-namespace NewLife.RocketMQ.Protocol.ConsumerStates
+namespace NewLife.RocketMQ.Protocol.ConsumerStates;
+
+/// <summary>
+/// 消息队列信息模型
+/// </summary>
+public class MessageQueueModel
 {
     /// <summary>
-    /// 消息队列信息模型
+    /// Broker服务器名称
     /// </summary>
-    public class MessageQueueModel
-    {
-        /// <summary>
-        /// Broker服务器名称
-        /// </summary>
-        public String BrokerName { get; set; }
+    public String BrokerName { get; set; }
 
-        /// <summary>
-        /// 队列编码
-        /// </summary>
-        public Int32 QueueId { get; set; }
+    /// <summary>
+    /// 队列编码
+    /// </summary>
+    public Int32 QueueId { get; set; }
+
+    /// <summary>
+    /// 主题
+    /// </summary>
+    public String Topic { get; set; }
 
-        /// <summary>
-        /// 主题
-        /// </summary>
-        public String Topic { get; set; }
+    /// <summary>
+    /// 阿里版本返回字段
+    /// </summary>
+    public Boolean MainQueue { get; set; }
 
-        /// <summary>
-        /// 阿里版本返回字段
-        /// </summary>
-        public Boolean MainQueue { get; set; }
+    /// <summary>
+    /// 阿里版本返回字段
+    /// </summary>
+    public Int32 QueueGroupId { get; set; }
 
-        /// <summary>
-        /// 阿里版本返回字段
-        /// </summary>
-        public Int32 QueueGroupId { get; set; }
-    }
+    /// <summary>已重载。</summary>
+    /// <returns></returns>
+    public override String ToString() => $"{BrokerName}[{QueueId}]";   
 }
修改 +1 -1
diff --git a/NewLife.RocketMQ/Protocol/MessageQueue.cs b/NewLife.RocketMQ/Protocol/MessageQueue.cs
index b0e58a5..918881d 100644
--- a/NewLife.RocketMQ/Protocol/MessageQueue.cs
+++ b/NewLife.RocketMQ/Protocol/MessageQueue.cs
@@ -38,6 +38,6 @@ public class MessageQueue
     #region 辅助
     /// <summary>友好字符串</summary>
     /// <returns></returns>
-    public override String ToString() => $"MessageQueue[Topic={Topic}, BrokerName={BrokerName}, QueueId={QueueId}]";
+    public override String ToString() => $"{BrokerName}[{QueueId}]";
     #endregion
 }
\ No newline at end of file
修改 +6 -4
diff --git a/Test/Program.cs b/Test/Program.cs
index c10c09a..af44b54 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -17,8 +17,8 @@ class Program
     {
         XTrace.UseConsole();
 
-        Test5();
-        //TestAliyun();
+        //Test5();
+        TestAliyun();
 
         Console.WriteLine("OK!");
         Console.ReadKey();
@@ -84,6 +84,8 @@ class Program
 
     static void TestAliyun()
     {
+        // 2024-04-10 对接阿里云RocketMQ v4测试通过。
+        // 创建RocketMQ实例后,需要手工创建Topic和Group,并创建正确的AccessKey
         var consumer = new Consumer
         {
             Topic = "newlife_test_02",
@@ -94,7 +96,7 @@ class Program
             {
                 AccessKey = "LTAI5tKTGShu31C61xRARVC4",
                 SecretKey = "a9oPwph1IcMGanWckzUOwOf3Ork8LO",
-                InstanceId = "MQ_INST_1827694722767531_BXxCwUhm",
+                //InstanceId = "MQ_INST_1827694722767531_BXxCwUhm",
             },
 
             FromLastOffset = true,
@@ -107,7 +109,7 @@ class Program
 
         consumer.OnConsume = OnConsume;
 
-        //consumer.Configure(MqSetting.Current);
+        consumer.Configure(MqSetting.Current);
         consumer.Start();
 
         _consumer = consumer;
修改 +56 -57
diff --git a/XUnitTestRocketMQ/AliyunIssuesTests.cs b/XUnitTestRocketMQ/AliyunIssuesTests.cs
index 0ed91a8..f0da090 100644
--- a/XUnitTestRocketMQ/AliyunIssuesTests.cs
+++ b/XUnitTestRocketMQ/AliyunIssuesTests.cs
@@ -7,76 +7,75 @@ using NewLife.RocketMQ;
 using NewLife.RocketMQ.Protocol;
 using Xunit;
 
-namespace XUnitTestRocketMQ
+namespace XUnitTestRocketMQ;
+
+/// <summary>
+/// 修复Issues调用阿里云版RocketMQ相关问题
+/// #35、#24
+/// </summary>
+public class AliyunIssuesTests
 {
-    /// <summary>
-    /// 修复Issues调用阿里云版RocketMQ相关问题
-    /// #35、#24
-    /// </summary>
-    public class AliyunIssuesTests
+    private readonly String _testTopic = "newlife_test_01";
+    private readonly String _testGroup = "GID_newlife_Group01";
+    private static readonly AliyunOptions _aliyunOptions = new AliyunOptions()
+    {
+        AccessKey = "LTAIxxxxxxxxxxxxRARVC4",
+        SecretKey = "a9oPwxxxxxxxxxxx3OrxxLO",
+        Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet",
+        InstanceId = "MQ_INST_xxxxxxxxxxxx_AXxCwUhm"
+    };
+
+    [Fact]
+    public void ProducerForAliyun_Test()
     {
-        private readonly String _testTopic = "newlife_test_01";
-        private readonly String _testGroup = "GID_newlife_Group01";
-        private static readonly AliyunOptions _aliyunOptions = new AliyunOptions()
+        var producer = new Producer()
         {
-            AccessKey = "LTAIxxxxxxxxxxxxRARVC4",
-            SecretKey = "a9oPwxxxxxxxxxxx3OrxxLO",
-            Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet",
-            InstanceId = "MQ_INST_xxxxxxxxxxxx_AXxCwUhm"
+            Topic = _testTopic,
+            Aliyun = _aliyunOptions,
+            //NameServerAddress = "http://MQ_INST_xxxxxxxxxx_AXxCwUhm.mq-internet-access.mq-internet.aliyuncs.com:80",
+            //如果不用上面的默认Server地址,直接将NameServerAddress设为你自己的TCP公网接收点地址也是可以的
         };
 
-        [Fact]
-        public void ProducerForAliyun_Test()
-        {
-            var producer = new Producer()
-            {
-                Topic = _testTopic,
-                Aliyun = _aliyunOptions,
-                //NameServerAddress = "http://MQ_INST_xxxxxxxxxx_AXxCwUhm.mq-internet-access.mq-internet.aliyuncs.com:80",
-                //如果不用上面的默认Server地址,直接将NameServerAddress设为你自己的TCP公网接收点地址也是可以的
-            };
+        producer.Start();
 
-            producer.Start();
+        var pubResultList = new List<Boolean>();
+        for (var i = 0; i < 2; i++)
+        {
+            var message = "大家好才是真的好!";
+            var pubResult = producer.Publish(message, "newlife_test_tag");
+            pubResultList.Add(pubResult.Status == SendStatus.SendOK);
+        }
+        Assert.True(pubResultList.All(t => true));
 
-            var pubResultList = new List<Boolean>();
-            for (var i = 0; i < 2; i++)
-            {
-                var message = "大家好才是真的好!";
-                var pubResult = producer.Publish(message, "newlife_test_tag");
-                pubResultList.Add(pubResult.Status == SendStatus.SendOK);
-            }
-            Assert.True(pubResultList.All(t => true));
+        producer.Dispose();
+    }
 
-            producer.Dispose();
-        }
+    [Fact]
+    public void ConsumerForAliyun_Test()
+    {
+        var consumer = new Consumer()
+        {
+            Topic = _testTopic,
+            Aliyun = _aliyunOptions,
+            Group = _testGroup,
+            FromLastOffset = true,
+            BatchSize = 1,
+        };
 
-        [Fact]
-        public void ConsumerForAliyun_Test()
+        consumer.OnConsume = OnConsume;
+        consumer.Start();
+        Thread.Sleep(3000);
+       
+        static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
         {
-            var consumer = new Consumer()
-            {
-                Topic = _testTopic,
-                Aliyun = _aliyunOptions,
-                Group = _testGroup,
-                FromLastOffset = true,
-                BatchSize = 1,
-            };
+            Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
 
-            consumer.OnConsume = OnConsume;
-            consumer.Start();
-            Thread.Sleep(3000);
-           
-            static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
+            foreach (var item in ms.ToList())
             {
-                Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
-
-                foreach (var item in ms.ToList())
-                {
-                    Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
-                }
-
-                return true;
+                Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
             }
+
+            return true;
         }
     }
 }
修改 +72 -73
diff --git a/XUnitTestRocketMQ/AliyunTests.cs b/XUnitTestRocketMQ/AliyunTests.cs
index 45e48c3..a871a80 100644
--- a/XUnitTestRocketMQ/AliyunTests.cs
+++ b/XUnitTestRocketMQ/AliyunTests.cs
@@ -9,107 +9,106 @@ using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 
-namespace XUnitTestRocketMQ
+namespace XUnitTestRocketMQ;
+
+public class AliyunTests
 {
-    public class AliyunTests
+    private static void SetConfig(MqBase mq)
     {
-        private static void SetConfig(MqBase mq)
-        {
-            //mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
-            mq.Configure(MqSetting.Current);
+        //mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
+        mq.Configure(MqSetting.Current);
 
-            mq.Log = XTrace.Log;
-        }
+        mq.Log = XTrace.Log;
+    }
 
-        [Fact]
-        public void CreateTopic()
+    [Fact]
+    public void CreateTopic()
+    {
+        var mq = new Producer
         {
-            var mq = new Producer
-            {
-                //Topic = "nx_test",
-            };
-            SetConfig(mq);
+            //Topic = "nx_test",
+        };
+        SetConfig(mq);
 
-            mq.Start();
+        mq.Start();
 
-            // 创建topic时,start前不能指定topic,让其使用默认TBW102
-            Assert.Equal("TBW102", mq.Topic);
+        // 创建topic时,start前不能指定topic,让其使用默认TBW102
+        Assert.Equal("TBW102", mq.Topic);
 
-            mq.CreateTopic("nx_test", 2);
-        }
+        mq.CreateTopic("nx_test", 2);
+    }
 
-        [Fact]
-        static void ProduceTest()
+    [Fact]
+    static void ProduceTest()
+    {
+        using var mq = new Producer
         {
-            using var mq = new Producer
-            {
-                Topic = "test1",
-            };
-            SetConfig(mq);
+            Topic = "test1",
+        };
+        SetConfig(mq);
 
-            mq.Start();
+        mq.Start();
 
-            for (var i = 0; i < 10; i++)
-            {
-                var str = "学无先后达者为师" + i;
-                //var str = Rand.NextString(1337);
+        for (var i = 0; i < 10; i++)
+        {
+            var str = "学无先后达者为师" + i;
+            //var str = Rand.NextString(1337);
 
-                var sr = mq.Publish(str, "TagA", null);
-            }
+            var sr = mq.Publish(str, "TagA", null);
         }
+    }
 
-        [Fact]
-        static async Task ProduceAsyncTest()
+    [Fact]
+    static async Task ProduceAsyncTest()
+    {
+        using var mq = new Producer
         {
-            using var mq = new Producer
-            {
-                Topic = "test1",
-            };
-            SetConfig(mq);
+            Topic = "test1",
+        };
+        SetConfig(mq);
 
-            mq.Start();
+        mq.Start();
 
-            for (var i = 0; i < 10; i++)
-            {
-                var str = "学无先后达者为师" + i;
-                //var str = Rand.NextString(1337);
+        for (var i = 0; i < 10; i++)
+        {
+            var str = "学无先后达者为师" + i;
+            //var str = Rand.NextString(1337);
 
-                var sr = await mq.PublishAsync(str, "TagA", null);
-            }
+            var sr = await mq.PublishAsync(str, "TagA", null);
         }
+    }
 
-        private static Consumer _consumer;
-        [Fact]
-        static void ConsumeTest()
+    private static Consumer _consumer;
+    [Fact]
+    static void ConsumeTest()
+    {
+        var consumer = new Consumer
         {
-            var consumer = new Consumer
-            {
-                Topic = "test1",
-                Group = "test",
+            Topic = "test1",
+            Group = "test",
 
-                FromLastOffset = true,
-                BatchSize = 20,
-            };
-            SetConfig(consumer);
+            FromLastOffset = true,
+            BatchSize = 20,
+        };
+        SetConfig(consumer);
 
-            consumer.OnConsume = OnConsume;
-            consumer.Start();
+        consumer.OnConsume = OnConsume;
+        consumer.Start();
 
-            _consumer = consumer;
-
-            Thread.Sleep(3000);
-        }
+        _consumer = consumer;
 
-        private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
-        {
-            Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
+        Thread.Sleep(3000);
+    }
 
-            foreach (var item in ms.ToList())
-            {
-                Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
-            }
+    private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
+    {
+        Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
 
-            return true;
+        foreach (var item in ms.ToList())
+        {
+            Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
         }
+
+        return true;
     }
 }
\ No newline at end of file