v2.6.2024.1004 支持RocketMQ v5.3,在公网测试通过。默认内网broker地址替换为公网地址。大石头 authored at 2024-10-05 01:14:36
diff --git a/NewLife.RocketMQ/MqBase.cs b/NewLife.RocketMQ/MqBase.cs
index e3b0eb2..b00c2eb 100644
--- a/NewLife.RocketMQ/MqBase.cs
+++ b/NewLife.RocketMQ/MqBase.cs
@@ -2,6 +2,7 @@
using System.Diagnostics;
using System.Reflection;
using NewLife.Log;
+using NewLife.Net;
using NewLife.RocketMQ.Protocol;
using NewLife.Serialization;
@@ -267,8 +268,21 @@ public abstract class MqBase : DisposeBase
{
if (_Brokers.TryGetValue(name, out client)) return client;
+ // broker可能在内网,转为公网地址
+ var uri = new NetUri(NameServerAddress.Split(";").FirstOrDefault());
+ var addrs = bk.Addresses.ToArray();
+ for (var i = 0; i < addrs.Length; i++)
+ {
+ var addr = addrs[i];
+ if (addr.StartsWithIgnoreCase("10.", "192.", "172."))
+ {
+ var p = addr.IndexOf(':');
+ addrs[i] = p > 0 ? uri.Host + addr[p..] : uri.Host;
+ }
+ }
+
// 实例化客户端
- client = CreateBroker(bk.Name, bk.Addresses);
+ client = CreateBroker(bk.Name, addrs);
client.Start();
@@ -316,7 +330,7 @@ public abstract class MqBase : DisposeBase
/// <param name="topic">主题</param>
/// <param name="queueNum">队列数</param>
/// <param name="topicSysFlag"></param>
- public virtual void CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag = 0)
+ public virtual Int32 CreateTopic(String topic, Int32 queueNum, Int32 topicSysFlag = 0)
{
var header = new
{
@@ -330,6 +344,7 @@ public abstract class MqBase : DisposeBase
order = false,
};
+ var count = 0;
using var span = Tracer?.NewSpan($"mq:{Name}:CreateTopic", header);
try
{
@@ -341,6 +356,7 @@ public abstract class MqBase : DisposeBase
{
var bk = GetBroker(item.Name);
var rs = bk.Invoke(RequestCode.UPDATE_AND_CREATE_TOPIC, null, header);
+ if (rs != null && rs.Header.Code == (Int32)ResponseCode.SUCCESS) count++;
}
catch (Exception ex)
{
@@ -354,6 +370,8 @@ public abstract class MqBase : DisposeBase
throw;
}
+
+ return count;
}
#endregion
diff --git a/NewLife.RocketMQ/NewLife.RocketMQ.csproj b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
index 374e69e..f140267 100644
--- a/NewLife.RocketMQ/NewLife.RocketMQ.csproj
+++ b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
@@ -59,7 +59,7 @@
</ItemGroup>
<ItemGroup>
- <PackageReference Include="NewLife.Core" Version="11.0.2024.917-beta0004" />
+ <PackageReference Include="NewLife.Core" Version="11.0.2024.1001" />
</ItemGroup>
</Project>
diff --git a/NewLife.RocketMQ/Protocol/Command.cs b/NewLife.RocketMQ/Protocol/Command.cs
index 7b59e13..51a36c2 100644
--- a/NewLife.RocketMQ/Protocol/Command.cs
+++ b/NewLife.RocketMQ/Protocol/Command.cs
@@ -258,9 +258,9 @@ public class Command : IAccessor, IMessage
{
var ms = new MemoryStream();
Write(ms, null);
- ms.Position = 0;
- return new Packet(ms);
+ ms.Position = 0;
+ return new ArrayPacket(ms);
}
/// <summary>创建响应</summary>
diff --git a/NewLife.RocketMQ/Protocol/MessageExt.cs b/NewLife.RocketMQ/Protocol/MessageExt.cs
index 8dd6c82..dba3528 100644
--- a/NewLife.RocketMQ/Protocol/MessageExt.cs
+++ b/NewLife.RocketMQ/Protocol/MessageExt.cs
@@ -121,7 +121,7 @@ public class MessageExt : Message, IAccessor
ms.Write(port.GetBytes(false));
ms.Write(CommitLogOffset.GetBytes(false));
- MsgId = ms.Put(true).ToHex(0, 16);
+ MsgId = ms.Return(true).ToHex(0, 16);
return true;
}
diff --git a/Test/Program.cs b/Test/Program.cs
index af44b54..55fe83d 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -17,8 +17,8 @@ class Program
{
XTrace.UseConsole();
- //Test5();
- TestAliyun();
+ Test1();
+ //TestAliyun();
Console.WriteLine("OK!");
Console.ReadKey();
@@ -26,35 +26,61 @@ class Program
static void Test1()
{
- var mq = new Producer
+ XTrace.WriteLine("");
+ XTrace.WriteLine("创建生产者……");
+ var producer = new Producer
{
Topic = "nx_test",
- NameServerAddress = "127.0.0.1:9876",
+ NameServerAddress = "rocketmq.newlifex.com:9876",
Log = XTrace.Log,
};
- mq.Configure(MqSetting.Current);
- mq.Start();
+ producer.Configure(MqSetting.Current);
+ producer.Start();
//mq.CreateTopic("nx_test", 2);
- for (var i = 0; i < 1000_000; i++)
+ XTrace.WriteLine("");
+ XTrace.WriteLine("创建消费者……");
+ var consumer = new Consumer
+ {
+ Topic = producer.Topic,
+ Group = "test",
+ NameServerAddress = producer.NameServerAddress,
+
+ FromLastOffset = false,
+ //SkipOverStoredMsgCount = 0,
+ //BatchSize = 20,
+
+ Log = XTrace.Log,
+ ClientLog = XTrace.Log,
+ };
+
+ consumer.OnConsume = OnConsume;
+
+ consumer.Configure(MqSetting.Current);
+ consumer.Start();
+ Thread.Sleep(1000);
+
+ XTrace.WriteLine("");
+ XTrace.WriteLine("发布测试消息……");
+ for (var i = 0; i < 10; i++)
{
var str = "学无先后达者为师" + i;
//var str = Rand.NextString(1337);
- var sr = mq.Publish(str, "TagA", null);
+ var sr = producer.Publish(str, "TagA", null);
//Console.WriteLine("[{0}] {1} {2} {3}", sr.Queue.BrokerName, sr.Queue.QueueId, sr.MsgId, sr.QueueOffset);
// 阿里云发送消息不能过快,否则报错“服务不可用”
- Thread.Sleep(100);
+ Thread.Sleep(500);
}
Console.WriteLine("完成");
- mq.Dispose();
+ producer.Dispose();
}
private static Consumer _consumer;
@@ -121,7 +147,7 @@ class Program
foreach (var item in ms.ToList())
{
- Console.WriteLine($"消息:主键【{item.Keys}】 Topic 【{item.Topic}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr(null, 0, 64)}】");
+ Console.WriteLine($"消息:主键【{item.Keys}】 Topic 【{item.Topic}】,产生时间【{item.BornTimestamp.ToDateTime().ToFullString()}】,内容【{item.Body.ToStr(null, 0, 64)}】");
}
return true;
diff --git a/XUnitTestRocketMQ/BasicTest.cs b/XUnitTestRocketMQ/BasicTest.cs
new file mode 100644
index 0000000..b290acb
--- /dev/null
+++ b/XUnitTestRocketMQ/BasicTest.cs
@@ -0,0 +1,36 @@
+using System;
+using System.Linq;
+using NewLife;
+using NewLife.Log;
+using NewLife.RocketMQ;
+using Xunit;
+
+// 所有测试用例放入一个汇编级集合,除非单独指定Collection特性
+[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly)]
+
+namespace XUnitTestRocketMQ;
+
+[Collection("Basic")]
+public class BasicTest
+{
+ private static MqSetting _config;
+ public static MqSetting GetConfig()
+ {
+ if (_config != null) return _config;
+ lock (typeof(BasicTest))
+ {
+ if (_config != null) return _config;
+
+ var set = MqSetting.Current;
+ if (set.IsNew)
+ {
+ set.NameServer = "rocketmq.newlifex.com:9876";
+ set.Save();
+ }
+
+ XTrace.WriteLine("RocketMQ配置:{0}", set.NameServer);
+
+ return _config = set;
+ }
+ }
+}
diff --git a/XUnitTestRocketMQ/CommandTests.cs b/XUnitTestRocketMQ/CommandTests.cs
index de6c7c1..e5b6724 100644
--- a/XUnitTestRocketMQ/CommandTests.cs
+++ b/XUnitTestRocketMQ/CommandTests.cs
@@ -1,9 +1,5 @@
using System;
-using System.Collections.Generic;
using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
using NewLife;
using NewLife.Data;
using NewLife.RocketMQ.Protocol;
diff --git a/XUnitTestRocketMQ/ConsumerTests.cs b/XUnitTestRocketMQ/ConsumerTests.cs
index 030109e..91f9987 100644
--- a/XUnitTestRocketMQ/ConsumerTests.cs
+++ b/XUnitTestRocketMQ/ConsumerTests.cs
@@ -7,49 +7,49 @@ using System.Linq;
using System.Threading;
using Xunit;
-namespace XUnitTestRocketMQ
+namespace XUnitTestRocketMQ;
+
+public class ConsumerTests
{
- public class ConsumerTests
+ private static Consumer _consumer;
+ [Fact]
+ public static void ConsumeTest()
{
- private static Consumer _consumer;
- [Fact]
- static void ConsumeTest()
+ var set = BasicTest.GetConfig();
+ var consumer = new Consumer
{
- var consumer = new Consumer
- {
- Topic = "nx_test",
- Group = "test",
- NameServerAddress = "127.0.0.1:9876",
-
- FromLastOffset = true,
- BatchSize = 20,
+ Topic = "nx_test",
+ Group = "test",
+ NameServerAddress = set.NameServer,
- Log = XTrace.Log,
- };
+ FromLastOffset = true,
+ BatchSize = 20,
- consumer.OnConsume = OnConsume;
- consumer.Start();
+ Log = XTrace.Log,
+ };
- _consumer = consumer;
+ consumer.OnConsume = OnConsume;
+ consumer.Start();
- Thread.Sleep(3000);
- //foreach (var item in consumer.Clients)
- //{
- // var rs = item.GetRuntimeInfo();
- // Console.WriteLine("{0}\t{1}", item.Name, rs["brokerVersionDesc"]);
- //}
- }
+ _consumer = consumer;
- private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
- {
- Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
+ Thread.Sleep(3000);
+ //foreach (var item in consumer.Clients)
+ //{
+ // var rs = item.GetRuntimeInfo();
+ // Console.WriteLine("{0}\t{1}", item.Name, rs["brokerVersionDesc"]);
+ //}
+ }
- foreach (var item in ms.ToList())
- {
- Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
- }
+ private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
+ {
+ Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
- return true;
+ foreach (var item in ms.ToList())
+ {
+ Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
}
+
+ return true;
}
}
\ No newline at end of file
diff --git a/XUnitTestRocketMQ/ProducerTests.cs b/XUnitTestRocketMQ/ProducerTests.cs
index 2e5d813..cae2e22 100644
--- a/XUnitTestRocketMQ/ProducerTests.cs
+++ b/XUnitTestRocketMQ/ProducerTests.cs
@@ -1,52 +1,52 @@
using NewLife.Log;
using NewLife.RocketMQ;
-using System;
-using System.Linq;
using Xunit;
-namespace XUnitTestRocketMQ
+namespace XUnitTestRocketMQ;
+
+public class ProducerTests
{
- public class ProducerTests
+ [Fact]
+ public void CreateTopic()
{
- [Fact]
- public void CreateTopic()
+ var set = BasicTest.GetConfig();
+ var mq = new Producer
{
- var mq = new Producer
- {
- //Topic = "nx_test",
- NameServerAddress = "127.0.0.1:9876",
+ //Topic = "nx_test",
+ NameServerAddress = set.NameServer,
- Log = XTrace.Log,
- };
+ Log = XTrace.Log,
+ };
- mq.Start();
+ mq.Start();
- // 创建topic时,start前不能指定topic,让其使用默认TBW102
- Assert.Equal("TBW102", mq.Topic);
+ // 创建topic时,start前不能指定topic,让其使用默认TBW102
+ Assert.Equal("TBW102", mq.Topic);
- mq.CreateTopic("nx_test", 2);
- }
+ var rs = mq.CreateTopic("nx_test", 2);
+ Assert.True(rs > 0);
+ }
- [Fact]
- static void ProduceTest()
+ [Fact]
+ public static void ProduceTest()
+ {
+ var set = BasicTest.GetConfig();
+ using var mq = new Producer
{
- using var mq = new Producer
- {
- Topic = "nx_test",
- NameServerAddress = "127.0.0.1:9876",
+ Topic = "nx_test",
+ NameServerAddress = set.NameServer,
- Log = XTrace.Log,
- };
+ Log = XTrace.Log,
+ };
- mq.Start();
+ mq.Start();
- for (var i = 0; i < 10; i++)
- {
- var str = "学无先后达者为师" + i;
- //var str = Rand.NextString(1337);
+ for (var i = 0; i < 10; i++)
+ {
+ var str = "学无先后达者为师" + i;
+ //var str = Rand.NextString(1337);
- var sr = mq.Publish(str, "TagA", null);
- }
+ var sr = mq.Publish(str, "TagA", null);
}
}
}
\ No newline at end of file