NewLife/NewLife.RocketMQ

v2.6.2024.1004 支持RocketMQ v5.3,在公网测试通过。默认内网broker地址替换为公网地址。
大石头 authored at 2024-10-05 01:14:36
006817e
Tree
1 Parent(s) a5dbb18
Summary: 9 changed files with 162 additions and 86 deletions.
Modified +20 -2
Modified +1 -1
Modified +2 -2
Modified +1 -1
Modified +37 -11
Added +36 -0
Modified +0 -4
Modified +33 -33
Modified +32 -32
Modified +20 -2
diff --git a/NewLife.RocketMQ/MqBase.cs b/NewLife.RocketMQ/MqBase.cs
index e3b0eb2..b00c2eb 100644
--- a/NewLife.RocketMQ/MqBase.cs
+++ b/NewLife.RocketMQ/MqBase.cs
@@ -2,6 +2,7 @@
 using System.Diagnostics;
 using System.Reflection;
 using NewLife.Log;
+using NewLife.Net;
 using NewLife.RocketMQ.Protocol;
 using NewLife.Serialization;
 
@@ -267,8 +268,21 @@ public abstract class MqBase : DisposeBase
         {
             if (_Brokers.TryGetValue(name, out client)) return client;
 
+            // broker可能在内网,转为公网地址
+            var uri = new NetUri(NameServerAddress.Split(";").FirstOrDefault());
+            var addrs = bk.Addresses.ToArray();
+            for (var i = 0; i < addrs.Length; i++)
+            {
+                var addr = addrs[i];
+                if (addr.StartsWithIgnoreCase("10.", "192.", "172."))
+                {
+                    var p = addr.IndexOf(':');
+                    addrs[i] = p > 0 ? uri.Host + addr[p..] : uri.Host;
+                }
+            }
+
             // 实例化客户端
-            client = CreateBroker(bk.Name, bk.Addresses);
+            client = CreateBroker(bk.Name, addrs);
 
             client.Start();
 
@@ -316,7 +330,7 @@ public abstract class MqBase : DisposeBase
     /// <param name="topic">主题</param>
     /// <param name="queueNum">队列数</param>
     /// <param name="topicSysFlag"></param>
-    public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag = 0)
+    public virtual Int32 CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag = 0)
     {
         var header = new
         {
@@ -330,6 +344,7 @@ public abstract class MqBase : DisposeBase
             order = false,
         };
 
+        var count = 0;
         using var span = Tracer?.NewSpan($"mq:{Name}:CreateTopic", header);
         try
         {
@@ -341,6 +356,7 @@ public abstract class MqBase : DisposeBase
                 {
                     var bk = GetBroker(item.Name);
                     var rs = bk.Invoke(RequestCode.UPDATE_AND_CREATE_TOPIC, null, header);
+                    if (rs != null && rs.Header.Code == (Int32)ResponseCode.SUCCESS) count++;
                 }
                 catch (Exception ex)
                 {
@@ -354,6 +370,8 @@ public abstract class MqBase : DisposeBase
 
             throw;
         }
+
+        return count;
     }
     #endregion
 
Modified +1 -1
diff --git a/NewLife.RocketMQ/NewLife.RocketMQ.csproj b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
index 374e69e..f140267 100644
--- a/NewLife.RocketMQ/NewLife.RocketMQ.csproj
+++ b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
@@ -59,7 +59,7 @@
   </ItemGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Core" Version="11.0.2024.917-beta0004" />
+    <PackageReference Include="NewLife.Core" Version="11.0.2024.1001" />
   </ItemGroup>
 
 </Project>
Modified +2 -2
diff --git a/NewLife.RocketMQ/Protocol/Command.cs b/NewLife.RocketMQ/Protocol/Command.cs
index 7b59e13..51a36c2 100644
--- a/NewLife.RocketMQ/Protocol/Command.cs
+++ b/NewLife.RocketMQ/Protocol/Command.cs
@@ -258,9 +258,9 @@ public class Command : IAccessor, IMessage
     {
         var ms = new MemoryStream();
         Write(ms, null);
-        ms.Position = 0;
 
-        return new Packet(ms);
+        ms.Position = 0;
+        return new ArrayPacket(ms);
     }
 
     /// <summary>创建响应</summary>
Modified +1 -1
diff --git a/NewLife.RocketMQ/Protocol/MessageExt.cs b/NewLife.RocketMQ/Protocol/MessageExt.cs
index 8dd6c82..dba3528 100644
--- a/NewLife.RocketMQ/Protocol/MessageExt.cs
+++ b/NewLife.RocketMQ/Protocol/MessageExt.cs
@@ -121,7 +121,7 @@ public class MessageExt : Message, IAccessor
         ms.Write(port.GetBytes(false));
         ms.Write(CommitLogOffset.GetBytes(false));
 
-        MsgId = ms.Put(true).ToHex(0, 16);
+        MsgId = ms.Return(true).ToHex(0, 16);
 
         return true;
     }
Modified +37 -11
diff --git a/Test/Program.cs b/Test/Program.cs
index af44b54..55fe83d 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -17,8 +17,8 @@ class Program
     {
         XTrace.UseConsole();
 
-        //Test5();
-        TestAliyun();
+        Test1();
+        //TestAliyun();
 
         Console.WriteLine("OK!");
         Console.ReadKey();
@@ -26,35 +26,61 @@ class Program
 
     static void Test1()
     {
-        var mq = new Producer
+        XTrace.WriteLine("");
+        XTrace.WriteLine("创建生产者……");
+        var producer = new Producer
         {
             Topic = "nx_test",
-            NameServerAddress = "127.0.0.1:9876",
+            NameServerAddress = "rocketmq.newlifex.com:9876",
 
             Log = XTrace.Log,
         };
 
-        mq.Configure(MqSetting.Current);
-        mq.Start();
+        producer.Configure(MqSetting.Current);
+        producer.Start();
 
         //mq.CreateTopic("nx_test", 2);
 
-        for (var i = 0; i < 1000_000; i++)
+        XTrace.WriteLine("");
+        XTrace.WriteLine("创建消费者……");
+        var consumer = new Consumer
+        {
+            Topic = producer.Topic,
+            Group = "test",
+            NameServerAddress = producer.NameServerAddress,
+
+            FromLastOffset = false,
+            //SkipOverStoredMsgCount = 0,
+            //BatchSize = 20,
+
+            Log = XTrace.Log,
+            ClientLog = XTrace.Log,
+        };
+
+        consumer.OnConsume = OnConsume;
+
+        consumer.Configure(MqSetting.Current);
+        consumer.Start();
+        Thread.Sleep(1000);
+
+        XTrace.WriteLine("");
+        XTrace.WriteLine("发布测试消息……");
+        for (var i = 0; i < 10; i++)
         {
             var str = "学无先后达者为师" + i;
             //var str = Rand.NextString(1337);
 
-            var sr = mq.Publish(str, "TagA", null);
+            var sr = producer.Publish(str, "TagA", null);
 
             //Console.WriteLine("[{0}] {1} {2} {3}", sr.Queue.BrokerName, sr.Queue.QueueId, sr.MsgId, sr.QueueOffset);
 
             // 阿里云发送消息不能过快,否则报错“服务不可用”
-            Thread.Sleep(100);
+            Thread.Sleep(500);
         }
 
         Console.WriteLine("完成");
 
-        mq.Dispose();
+        producer.Dispose();
     }
 
     private static Consumer _consumer;
@@ -121,7 +147,7 @@ class Program
 
         foreach (var item in ms.ToList())
         {
-            Console.WriteLine($"消息:主键【{item.Keys}】 Topic 【{item.Topic}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr(null, 0, 64)}】");
+            Console.WriteLine($"消息:主键【{item.Keys}】 Topic 【{item.Topic}】,产生时间【{item.BornTimestamp.ToDateTime().ToFullString()}】,内容【{item.Body.ToStr(null, 0, 64)}】");
         }
 
         return true;
Added +36 -0
diff --git a/XUnitTestRocketMQ/BasicTest.cs b/XUnitTestRocketMQ/BasicTest.cs
new file mode 100644
index 0000000..b290acb
--- /dev/null
+++ b/XUnitTestRocketMQ/BasicTest.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Linq;
+using NewLife;
+using NewLife.Log;
+using NewLife.RocketMQ;
+using Xunit;
+
+// 所有测试用例放入一个汇编级集合,除非单独指定Collection特性
+[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly)]
+
+namespace XUnitTestRocketMQ;
+
+[Collection("Basic")]
+public class BasicTest
+{
+    private static MqSetting _config;
+    public static MqSetting GetConfig()
+    {
+        if (_config != null) return _config;
+        lock (typeof(BasicTest))
+        {
+            if (_config != null) return _config;
+
+            var set = MqSetting.Current;
+            if (set.IsNew)
+            {
+                set.NameServer = "rocketmq.newlifex.com:9876";
+                set.Save();
+            }
+
+            XTrace.WriteLine("RocketMQ配置:{0}", set.NameServer);
+
+            return _config = set;
+        }
+    }
+}
Modified +0 -4
diff --git a/XUnitTestRocketMQ/CommandTests.cs b/XUnitTestRocketMQ/CommandTests.cs
index de6c7c1..e5b6724 100644
--- a/XUnitTestRocketMQ/CommandTests.cs
+++ b/XUnitTestRocketMQ/CommandTests.cs
@@ -1,9 +1,5 @@
 using System;
-using System.Collections.Generic;
 using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
 using NewLife;
 using NewLife.Data;
 using NewLife.RocketMQ.Protocol;
Modified +33 -33
diff --git a/XUnitTestRocketMQ/ConsumerTests.cs b/XUnitTestRocketMQ/ConsumerTests.cs
index 030109e..91f9987 100644
--- a/XUnitTestRocketMQ/ConsumerTests.cs
+++ b/XUnitTestRocketMQ/ConsumerTests.cs
@@ -7,49 +7,49 @@ using System.Linq;
 using System.Threading;
 using Xunit;
 
-namespace XUnitTestRocketMQ
+namespace XUnitTestRocketMQ;
+
+public class ConsumerTests
 {
-    public class ConsumerTests
+    private static Consumer _consumer;
+    [Fact]
+    public static void ConsumeTest()
     {
-        private static Consumer _consumer;
-        [Fact]
-        static void ConsumeTest()
+        var set = BasicTest.GetConfig();
+        var consumer = new Consumer
         {
-            var consumer = new Consumer
-            {
-                Topic = "nx_test",
-                Group = "test",
-                NameServerAddress = "127.0.0.1:9876",
-
-                FromLastOffset = true,
-                BatchSize = 20,
+            Topic = "nx_test",
+            Group = "test",
+            NameServerAddress = set.NameServer,
 
-                Log = XTrace.Log,
-            };
+            FromLastOffset = true,
+            BatchSize = 20,
 
-            consumer.OnConsume = OnConsume;
-            consumer.Start();
+            Log = XTrace.Log,
+        };
 
-            _consumer = consumer;
+        consumer.OnConsume = OnConsume;
+        consumer.Start();
 
-            Thread.Sleep(3000);
-            //foreach (var item in consumer.Clients)
-            //{
-            //    var rs = item.GetRuntimeInfo();
-            //    Console.WriteLine("{0}\t{1}", item.Name, rs["brokerVersionDesc"]);
-            //}
-        }
+        _consumer = consumer;
 
-        private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
-        {
-            Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
+        Thread.Sleep(3000);
+        //foreach (var item in consumer.Clients)
+        //{
+        //    var rs = item.GetRuntimeInfo();
+        //    Console.WriteLine("{0}\t{1}", item.Name, rs["brokerVersionDesc"]);
+        //}
+    }
 
-            foreach (var item in ms.ToList())
-            {
-                Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
-            }
+    private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
+    {
+        Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
 
-            return true;
+        foreach (var item in ms.ToList())
+        {
+            Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
         }
+
+        return true;
     }
 }
\ No newline at end of file
Modified +32 -32
diff --git a/XUnitTestRocketMQ/ProducerTests.cs b/XUnitTestRocketMQ/ProducerTests.cs
index 2e5d813..cae2e22 100644
--- a/XUnitTestRocketMQ/ProducerTests.cs
+++ b/XUnitTestRocketMQ/ProducerTests.cs
@@ -1,52 +1,52 @@
 using NewLife.Log;
 using NewLife.RocketMQ;
-using System;
-using System.Linq;
 using Xunit;
 
-namespace XUnitTestRocketMQ
+namespace XUnitTestRocketMQ;
+
+public class ProducerTests
 {
-    public class ProducerTests
+    [Fact]
+    public void CreateTopic()
     {
-        [Fact]
-        public void CreateTopic()
+        var set = BasicTest.GetConfig();
+        var mq = new Producer
         {
-            var mq = new Producer
-            {
-                //Topic = "nx_test",
-                NameServerAddress = "127.0.0.1:9876",
+            //Topic = "nx_test",
+            NameServerAddress = set.NameServer,
 
-                Log = XTrace.Log,
-            };
+            Log = XTrace.Log,
+        };
 
-            mq.Start();
+        mq.Start();
 
-            // 创建topic时,start前不能指定topic,让其使用默认TBW102
-            Assert.Equal("TBW102", mq.Topic);
+        // 创建topic时,start前不能指定topic,让其使用默认TBW102
+        Assert.Equal("TBW102", mq.Topic);
 
-            mq.CreateTopic("nx_test", 2);
-        }
+        var rs = mq.CreateTopic("nx_test", 2);
+        Assert.True(rs > 0);
+    }
 
-        [Fact]
-        static void ProduceTest()
+    [Fact]
+    public static void ProduceTest()
+    {
+        var set = BasicTest.GetConfig();
+        using var mq = new Producer
         {
-            using var mq = new Producer
-            {
-                Topic = "nx_test",
-                NameServerAddress = "127.0.0.1:9876",
+            Topic = "nx_test",
+            NameServerAddress = set.NameServer,
 
-                Log = XTrace.Log,
-            };
+            Log = XTrace.Log,
+        };
 
-            mq.Start();
+        mq.Start();
 
-            for (var i = 0; i < 10; i++)
-            {
-                var str = "学无先后达者为师" + i;
-                //var str = Rand.NextString(1337);
+        for (var i = 0; i < 10; i++)
+        {
+            var str = "学无先后达者为师" + i;
+            //var str = Rand.NextString(1337);
 
-                var sr = mq.Publish(str, "TagA", null);
-            }
+            var sr = mq.Publish(str, "TagA", null);
         }
     }
 }
\ No newline at end of file