[improv]改进阿里云rocketmq对接。在过去两年时间里,阿里云rocketmq做了升级,导致某些指令兼容性没那么好,这里统一做兼容处理。阿里云rmq的网络架构非常特殊,在vpc内网时,就当作普通rmq使用,没有特别之处。在公网时,获取得到的broker实际上是网关,然后获取消费组状态时,得到的却是内网broker状态,这里修改代码强行通过,但是消费时仍然得到不支持lite pull的错误。智能大石头 编写于 2024-04-11 00:49:12
diff --git a/NewLife.RocketMQ/BrokerClient.cs b/NewLife.RocketMQ/BrokerClient.cs
index 2353d5d..c3189bb 100644
--- a/NewLife.RocketMQ/BrokerClient.cs
+++ b/NewLife.RocketMQ/BrokerClient.cs
@@ -99,15 +99,15 @@ public class BrokerClient : ClusterClient
// 生产者 和 消费者 略有不同
if (cfg is Producer pd)
{
- body.ProducerDataSet = new[] {
+ body.ProducerDataSet = [
new ProducerData { GroupName = pd.Group },
new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" },
- };
- body.ConsumerDataSet = new ConsumerData[] { };
+ ];
+ body.ConsumerDataSet = [];
}
else if (cfg is Consumer cm)
{
- body.ProducerDataSet = new[] { new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" } };
+ body.ProducerDataSet = [new ProducerData { GroupName = "CLIENT_INNER_PRODUCER" }];
body.ConsumerDataSet = cm.Data.ToArray();
}
diff --git a/NewLife.RocketMQ/Consumer.cs b/NewLife.RocketMQ/Consumer.cs
index 29086db..c34eda3 100644
--- a/NewLife.RocketMQ/Consumer.cs
+++ b/NewLife.RocketMQ/Consumer.cs
@@ -80,50 +80,36 @@ public class Consumer : MqBase
/// <summary>启动</summary>
/// <returns></returns>
- public override Boolean Start()
+ protected override void OnStart()
{
- if (Active) return true;
-
WriteLog("正在准备消费 {0}", Topic);
- using var span = Tracer?.NewSpan($"mq:{Topic}:Start");
- try
+ var list = Data;
+ if (list == null)
{
- var list = Data;
- if (list == null)
+ // 建立消费者数据,用于心跳
+ var sd = new SubscriptionData
{
- // 建立消费者数据,用于心跳
- var sd = new SubscriptionData
- {
- Topic = Topic,
- TagsSet = Tags
- };
- var cd = new ConsumerData
- {
- GroupName = Group,
- ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET",
- MessageModel = MessageModel.ToString().ToUpper(),
- SubscriptionDataSet = new[] { sd },
- };
-
- list = new List<ConsumerData> { cd };
-
- Data = list;
- }
+ Topic = Topic,
+ TagsSet = Tags
+ };
+ var cd = new ConsumerData
+ {
+ GroupName = Group,
+ ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET",
+ MessageModel = MessageModel.ToString().ToUpper(),
+ SubscriptionDataSet = [sd],
+ };
- if (!base.Start()) return false;
+ list = [cd];
- // 默认自动开始调度
- if (AutoSchedule) StartSchedule();
+ Data = list;
}
- catch (Exception ex)
- {
- span?.SetError(ex, null);
- throw;
- }
+ base.OnStart();
- return true;
+ // 默认自动开始调度
+ if (AutoSchedule) StartSchedule();
}
/// <summary>
@@ -133,7 +119,7 @@ public class Consumer : MqBase
{
if (!Active) return;
- using var span = Tracer?.NewSpan($"mq:{Topic}:Stop");
+ using var span = Tracer?.NewSpan($"mq:{Name}:Stop");
try
{
// 停止并保存偏移
@@ -207,7 +193,7 @@ public class Consumer : MqBase
else
{
pr.Status = PullStatus.Unknown;
- Log.Warn("响应编号:{0} 响应备注:{1} 序列编号:{2} 序列偏移量:{3}", rs.Header.Code, rs.Header.Remark, mq.QueueId, offset);
+ Log.Warn("[{0}]{1} 序列编号:{2} 序列偏移量:{3}", (ResponseCode)rs.Header.Code, rs.Header.Remark, mq.QueueId, offset);
}
pr.Read(rs.Header?.ExtFields);
@@ -331,7 +317,7 @@ public class Consumer : MqBase
// 在所有Broker上查询
foreach (var item in Brokers)
{
- using var span = Tracer?.NewSpan($"mq:{Topic}:GetConsumers", item.Name);
+ using var span = Tracer?.NewSpan($"mq:{Name}:GetConsumers", item.Name);
try
{
var bk = GetBroker(item.Name);
@@ -476,7 +462,7 @@ public class Consumer : MqBase
DefaultSpan.Current = null;
// 性能埋点
- using var span = Tracer?.NewSpan($"mq:{Topic}:Consume", pr.Messages);
+ using var span = Tracer?.NewSpan($"mq:{Name}:Consume", pr.Messages);
try
{
// 触发消费
@@ -512,7 +498,7 @@ public class Consumer : MqBase
break;
case PullStatus.Unknown:
- Log.Error("未知响应类型消息序列[{1}]偏移量{0}", st.Offset, st.Queue.QueueId);
+ Log.Error("未知响应类型消息,序列[{1}]偏移量{0}", st.Offset, st.Queue.QueueId);
break;
default:
break;
@@ -587,7 +573,8 @@ public class Consumer : MqBase
class QueueStore
{
- [XmlIgnore] public MessageQueue Queue { get; set; }
+ [XmlIgnore]
+ public MessageQueue Queue { get; set; }
public Int64 Offset { get; set; } = -1;
public Int64 CommitOffset { get; set; } = -1;
@@ -602,6 +589,7 @@ public class Consumer : MqBase
/// <returns></returns>
public override Int32 GetHashCode() => (Queue == null ? 0 : Queue.GetHashCode()) ^ Offset.GetHashCode();
+ public override String ToString() => Queue?.ToString();
#endregion
}
@@ -682,7 +670,7 @@ public class Consumer : MqBase
var str = dic.Join(";", e => $"{e.Key}[{e.Value}]");
WriteLog("消费重新平衡,当前消费者负责queue分片:{0}", str);
- using var span = Tracer?.NewSpan($"mq:{Topic}:Rebalance", str);
+ using var span = Tracer?.NewSpan($"mq:{Name}:Rebalance", str);
_Queues = rs.ToArray();
await InitOffsetAsync();
@@ -710,7 +698,7 @@ public class Consumer : MqBase
if (_checking) return;
_checking = true;
- using var span = Tracer?.NewSpan($"mq:{Topic}:CheckGroup");
+ using var span = Tracer?.NewSpan($"mq:{Name}:CheckGroup");
try
{
var rs = await Rebalance();
@@ -766,7 +754,8 @@ public class Consumer : MqBase
if (store.Offset >= 0) continue;
var item = offsetTables.FirstOrDefault(t => t.Key.BrokerName == store.Queue.BrokerName && t.Key.QueueId == store.Queue.QueueId);
- var offsetTable = item.Value;
+ //!! 阿里云公网版RocketMQ,消费者状态返回的是真正brokerName,而前面Broker得到的是网关名,导致这里无法匹配
+ var offsetTable = item.Value ?? new OffsetWrapperModel();
if (neverConsumed)
{
var offset = 0L;
@@ -937,10 +926,11 @@ public class Consumer : MqBase
ci.Properties = dic;
var sd = new SubscriptionData { Topic = Topic, };
- ci.SubscriptionSet = new[] { sd };
+ ci.SubscriptionSet = [sd];
var sb = new StringBuilder();
sb.Append('{');
+ if (_Queues != null)
{
sb.Append("\"mqTable\":{");
for (var i = 0; i < _Queues.Length; i++)
diff --git a/NewLife.RocketMQ/MqBase.cs b/NewLife.RocketMQ/MqBase.cs
index f165d5f..ea15684 100644
--- a/NewLife.RocketMQ/MqBase.cs
+++ b/NewLife.RocketMQ/MqBase.cs
@@ -10,42 +10,19 @@ namespace NewLife.RocketMQ.Client;
public abstract class MqBase : DisposeBase
{
#region 属性
+ /// <summary>名称</summary>
+ public String Name { get; set; }
+
/// <summary>名称服务器地址</summary>
public String NameServerAddress { get; set; }
- private String _group = "DEFAULT_PRODUCER";
/// <summary>消费组</summary>
/// <remarks>阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]</remarks>
- public String Group
- {
- get
- {
- // 阿里云目前需要在Group前面带上实例ID并用【%】连接,组成路由Group[用来路由到实例Group]
- var ins = Aliyun?.InstanceId;
- return ins.IsNullOrEmpty() ? _group : $"{ins}%{_group}";
- }
- set
- {
- _group = value;
- }
- }
+ public String Group { get; set; } = "DEFAULT_PRODUCER";
- private String _topic = "TBW102";
/// <summary>主题</summary>
/// <remarks>阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]</remarks>
- public String Topic
- {
- get
- {
- // 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]
- var ins = Aliyun?.InstanceId;
- return ins.IsNullOrEmpty() ? _topic : $"{ins}%{_topic}";
- }
- set
- {
- _topic = value;
- }
- }
+ public String Topic { get; set; } = "TBW102";
/// <summary>本地IP地址</summary>
public String ClientIP { get; set; } = NetHelper.MyIP() + "";
@@ -91,6 +68,9 @@ public abstract class MqBase : DisposeBase
/// <summary>性能跟踪</summary>
public ITracer Tracer { get; set; } = DefaultTracer.Instance;
+ private String _group;
+ private String _topic;
+
/// <summary>名称服务器</summary>
protected NameClient _NameServer;
#endregion
@@ -143,7 +123,7 @@ public abstract class MqBase : DisposeBase
/// <summary>友好字符串</summary>
/// <returns></returns>
- public override String ToString() => Group;
+ public override String ToString() => _group;
#endregion
#region 基础方法
@@ -151,28 +131,63 @@ public abstract class MqBase : DisposeBase
/// <param name="setting"></param>
public virtual void Configure(MqSetting setting)
{
- NameServerAddress = setting.NameServer;
- Topic = setting.Topic;
- Group = setting.Group;
+ if (!setting.NameServer.IsNullOrEmpty()) NameServerAddress = setting.NameServer;
+ if (!setting.Topic.IsNullOrEmpty()) Topic = setting.Topic;
+ if (!setting.Group.IsNullOrEmpty()) Group = setting.Group;
+
+ Aliyun ??= new AliyunOptions();
+ if (!setting.Server.IsNullOrEmpty()) Aliyun.Server = setting.Server;
+ if (!setting.AccessKey.IsNullOrEmpty()) Aliyun.AccessKey = setting.AccessKey;
+ if (!setting.SecretKey.IsNullOrEmpty()) Aliyun.SecretKey = setting.SecretKey;
+ }
+
+ /// <summary>开始</summary>
+ /// <returns></returns>
+ public Boolean Start()
+ {
+ if (Active) return true;
- if (!setting.Server.IsNullOrEmpty() &&
- !setting.AccessKey.IsNullOrEmpty())
+ _group = Group;
+ _topic = Topic;
+ if (Name.IsNullOrEmpty()) Name = Topic;
+
+ // 解析阿里云实例
+ var aliyun = Aliyun;
+ if (aliyun != null && !aliyun.AccessKey.IsNullOrEmpty())
+ {
+ var ns = NameServerAddress;
+ if (aliyun.InstanceId.IsNullOrEmpty() && !ns.IsNullOrEmpty() && ns.Contains("MQ_INST_"))
+ {
+ aliyun.InstanceId = ns.Substring("://", ".");
+ }
+ }
+
+ using var span = Tracer?.NewSpan($"mq:{Name}:Start");
+ try
{
- Aliyun = new AliyunOptions
+ // 阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]
+ var ins = Aliyun?.InstanceId;
+ if (!ins.IsNullOrEmpty())
{
- Server = setting.Server,
- AccessKey = setting.AccessKey,
- SecretKey = setting.SecretKey,
- };
+ if (!Topic.StartsWith(ins)) Topic = $"{ins}%{Topic}";
+ if (!Group.StartsWith(ins)) Group = $"{ins}%{Group}";
+ }
+
+ OnStart();
+ }
+ catch (Exception ex)
+ {
+ span?.SetError(ex, null);
+
+ throw;
}
+
+ return Active = true;
}
/// <summary>开始</summary>
- /// <returns></returns>
- public virtual Boolean Start()
+ protected virtual void OnStart()
{
- if (Active) return true;
-
if (NameServerAddress.IsNullOrEmpty())
{
// 获取阿里云ONS的名称服务器地址
@@ -190,7 +205,7 @@ public abstract class MqBase : DisposeBase
var client = new NameClient(ClientId, this)
{
- Name = Topic,
+ Name = Name,
Tracer = Tracer,
Log = Log
};
@@ -204,8 +219,6 @@ public abstract class MqBase : DisposeBase
}
_NameServer = client;
-
- return Active = true;
}
/// <summary>停止</summary>
@@ -313,7 +326,7 @@ public abstract class MqBase : DisposeBase
order = false,
};
- using var span = Tracer?.NewSpan($"mq:{Topic}:CreateTopic", header);
+ using var span = Tracer?.NewSpan($"mq:{Name}:CreateTopic", header);
try
{
// 在所有Broker上创建Topic
diff --git a/NewLife.RocketMQ/NameClient.cs b/NewLife.RocketMQ/NameClient.cs
index 36efce5..1ff889c 100644
--- a/NewLife.RocketMQ/NameClient.cs
+++ b/NewLife.RocketMQ/NameClient.cs
@@ -46,6 +46,9 @@ public class NameClient : ClusterClient
protected override void OnStart()
{
var cfg = Config;
+ if (cfg.NameServerAddress.IsNullOrEmpty())
+ throw new ArgumentNullException(nameof(cfg.NameServerAddress), "未指定NameServer地址");
+
var ss = cfg.NameServerAddress.Split(";");
var list = new List<NetUri>();
diff --git a/NewLife.RocketMQ/Producer.cs b/NewLife.RocketMQ/Producer.cs
index 7c5d611..bb1a144 100644
--- a/NewLife.RocketMQ/Producer.cs
+++ b/NewLife.RocketMQ/Producer.cs
@@ -33,9 +33,9 @@ public class Producer : MqBase
#region 基础方法
/// <summary>启动</summary>
/// <returns></returns>
- public override Boolean Start()
+ protected override void OnStart()
{
- if (!base.Start()) return false;
+ base.OnStart();
LoadBalance ??= new WeightRoundRobin();
@@ -48,8 +48,6 @@ public class Producer : MqBase
LoadBalance.Ready = false;
};
}
-
- return true;
}
#endregion
@@ -72,7 +70,7 @@ public class Producer : MqBase
header.QueueId = mq.QueueId;
// 性能埋点
- using var span = Tracer?.NewSpan($"mq:{Topic}:Publish", message.BodyString);
+ using var span = Tracer?.NewSpan($"mq:{Name}:Publish", message.BodyString);
span?.AppendTag($"queue={mq}");
try
{
@@ -179,7 +177,7 @@ public class Producer : MqBase
header.QueueId = mq.QueueId;
// 性能埋点
- using var span = Tracer?.NewSpan($"mq:{Topic}:PublishAsync", message.BodyString);
+ using var span = Tracer?.NewSpan($"mq:{Name}:PublishAsync", message.BodyString);
try
{
// 根据队列获取Broker客户端
@@ -260,7 +258,7 @@ public class Producer : MqBase
header.QueueId = mq.QueueId;
// 性能埋点
- using var span = Tracer?.NewSpan($"mq:{Topic}:PublishOneway", message.BodyString);
+ using var span = Tracer?.NewSpan($"mq:{Name}:PublishOneway", message.BodyString);
try
{
// 根据队列获取Broker客户端
@@ -333,7 +331,7 @@ public class Producer : MqBase
header.QueueId = mq.QueueId;
// 性能埋点
- using var span = Tracer?.NewSpan($"mq:{Topic}:PublishDelay", new { level, message.BodyString });
+ using var span = Tracer?.NewSpan($"mq:{Name}:PublishDelay", new { level, message.BodyString });
try
{
// 根据队列获取Broker客户端
diff --git a/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs b/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs
index 23ba90d..67cf1b3 100644
--- a/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs
+++ b/NewLife.RocketMQ/Protocol/ConsumerStates/MessageQueueModel.cs
@@ -1,33 +1,36 @@
-namespace NewLife.RocketMQ.Protocol.ConsumerStates
+namespace NewLife.RocketMQ.Protocol.ConsumerStates;
+
+/// <summary>
+/// 消息队列信息模型
+/// </summary>
+public class MessageQueueModel
{
/// <summary>
- /// 消息队列信息模型
+ /// Broker服务器名称
/// </summary>
- public class MessageQueueModel
- {
- /// <summary>
- /// Broker服务器名称
- /// </summary>
- public String BrokerName { get; set; }
+ public String BrokerName { get; set; }
- /// <summary>
- /// 队列编码
- /// </summary>
- public Int32 QueueId { get; set; }
+ /// <summary>
+ /// 队列编码
+ /// </summary>
+ public Int32 QueueId { get; set; }
+
+ /// <summary>
+ /// 主题
+ /// </summary>
+ public String Topic { get; set; }
- /// <summary>
- /// 主题
- /// </summary>
- public String Topic { get; set; }
+ /// <summary>
+ /// 阿里版本返回字段
+ /// </summary>
+ public Boolean MainQueue { get; set; }
- /// <summary>
- /// 阿里版本返回字段
- /// </summary>
- public Boolean MainQueue { get; set; }
+ /// <summary>
+ /// 阿里版本返回字段
+ /// </summary>
+ public Int32 QueueGroupId { get; set; }
- /// <summary>
- /// 阿里版本返回字段
- /// </summary>
- public Int32 QueueGroupId { get; set; }
- }
+ /// <summary>已重载。</summary>
+ /// <returns></returns>
+ public override String ToString() => $"{BrokerName}[{QueueId}]";
}
diff --git a/NewLife.RocketMQ/Protocol/MessageQueue.cs b/NewLife.RocketMQ/Protocol/MessageQueue.cs
index b0e58a5..918881d 100644
--- a/NewLife.RocketMQ/Protocol/MessageQueue.cs
+++ b/NewLife.RocketMQ/Protocol/MessageQueue.cs
@@ -38,6 +38,6 @@ public class MessageQueue
#region 辅助
/// <summary>友好字符串</summary>
/// <returns></returns>
- public override String ToString() => $"MessageQueue[Topic={Topic}, BrokerName={BrokerName}, QueueId={QueueId}]";
+ public override String ToString() => $"{BrokerName}[{QueueId}]";
#endregion
}
\ No newline at end of file
diff --git a/Test/Program.cs b/Test/Program.cs
index c10c09a..af44b54 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -17,8 +17,8 @@ class Program
{
XTrace.UseConsole();
- Test5();
- //TestAliyun();
+ //Test5();
+ TestAliyun();
Console.WriteLine("OK!");
Console.ReadKey();
@@ -84,6 +84,8 @@ class Program
static void TestAliyun()
{
+ // 2024-04-10 对接阿里云RocketMQ v4测试通过。
+ // 创建RocketMQ实例后,需要手工创建Topic和Group,并创建正确的AccessKey
var consumer = new Consumer
{
Topic = "newlife_test_02",
@@ -94,7 +96,7 @@ class Program
{
AccessKey = "LTAI5tKTGShu31C61xRARVC4",
SecretKey = "a9oPwph1IcMGanWckzUOwOf3Ork8LO",
- InstanceId = "MQ_INST_1827694722767531_BXxCwUhm",
+ //InstanceId = "MQ_INST_1827694722767531_BXxCwUhm",
},
FromLastOffset = true,
@@ -107,7 +109,7 @@ class Program
consumer.OnConsume = OnConsume;
- //consumer.Configure(MqSetting.Current);
+ consumer.Configure(MqSetting.Current);
consumer.Start();
_consumer = consumer;
diff --git a/XUnitTestRocketMQ/AliyunIssuesTests.cs b/XUnitTestRocketMQ/AliyunIssuesTests.cs
index 0ed91a8..f0da090 100644
--- a/XUnitTestRocketMQ/AliyunIssuesTests.cs
+++ b/XUnitTestRocketMQ/AliyunIssuesTests.cs
@@ -7,76 +7,75 @@ using NewLife.RocketMQ;
using NewLife.RocketMQ.Protocol;
using Xunit;
-namespace XUnitTestRocketMQ
+namespace XUnitTestRocketMQ;
+
+/// <summary>
+/// 修复Issues调用阿里云版RocketMQ相关问题
+/// #35、#24
+/// </summary>
+public class AliyunIssuesTests
{
- /// <summary>
- /// 修复Issues调用阿里云版RocketMQ相关问题
- /// #35、#24
- /// </summary>
- public class AliyunIssuesTests
+ private readonly String _testTopic = "newlife_test_01";
+ private readonly String _testGroup = "GID_newlife_Group01";
+ private static readonly AliyunOptions _aliyunOptions = new AliyunOptions()
+ {
+ AccessKey = "LTAIxxxxxxxxxxxxRARVC4",
+ SecretKey = "a9oPwxxxxxxxxxxx3OrxxLO",
+ Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet",
+ InstanceId = "MQ_INST_xxxxxxxxxxxx_AXxCwUhm"
+ };
+
+ [Fact]
+ public void ProducerForAliyun_Test()
{
- private readonly String _testTopic = "newlife_test_01";
- private readonly String _testGroup = "GID_newlife_Group01";
- private static readonly AliyunOptions _aliyunOptions = new AliyunOptions()
+ var producer = new Producer()
{
- AccessKey = "LTAIxxxxxxxxxxxxRARVC4",
- SecretKey = "a9oPwxxxxxxxxxxx3OrxxLO",
- Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet",
- InstanceId = "MQ_INST_xxxxxxxxxxxx_AXxCwUhm"
+ Topic = _testTopic,
+ Aliyun = _aliyunOptions,
+ //NameServerAddress = "http://MQ_INST_xxxxxxxxxx_AXxCwUhm.mq-internet-access.mq-internet.aliyuncs.com:80",
+ //如果不用上面的默认Server地址,直接将NameServerAddress设为你自己的TCP公网接收点地址也是可以的
};
- [Fact]
- public void ProducerForAliyun_Test()
- {
- var producer = new Producer()
- {
- Topic = _testTopic,
- Aliyun = _aliyunOptions,
- //NameServerAddress = "http://MQ_INST_xxxxxxxxxx_AXxCwUhm.mq-internet-access.mq-internet.aliyuncs.com:80",
- //如果不用上面的默认Server地址,直接将NameServerAddress设为你自己的TCP公网接收点地址也是可以的
- };
+ producer.Start();
- producer.Start();
+ var pubResultList = new List<Boolean>();
+ for (var i = 0; i < 2; i++)
+ {
+ var message = "大家好才是真的好!";
+ var pubResult = producer.Publish(message, "newlife_test_tag");
+ pubResultList.Add(pubResult.Status == SendStatus.SendOK);
+ }
+ Assert.True(pubResultList.All(t => true));
- var pubResultList = new List<Boolean>();
- for (var i = 0; i < 2; i++)
- {
- var message = "大家好才是真的好!";
- var pubResult = producer.Publish(message, "newlife_test_tag");
- pubResultList.Add(pubResult.Status == SendStatus.SendOK);
- }
- Assert.True(pubResultList.All(t => true));
+ producer.Dispose();
+ }
- producer.Dispose();
- }
+ [Fact]
+ public void ConsumerForAliyun_Test()
+ {
+ var consumer = new Consumer()
+ {
+ Topic = _testTopic,
+ Aliyun = _aliyunOptions,
+ Group = _testGroup,
+ FromLastOffset = true,
+ BatchSize = 1,
+ };
- [Fact]
- public void ConsumerForAliyun_Test()
+ consumer.OnConsume = OnConsume;
+ consumer.Start();
+ Thread.Sleep(3000);
+
+ static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
{
- var consumer = new Consumer()
- {
- Topic = _testTopic,
- Aliyun = _aliyunOptions,
- Group = _testGroup,
- FromLastOffset = true,
- BatchSize = 1,
- };
+ Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
- consumer.OnConsume = OnConsume;
- consumer.Start();
- Thread.Sleep(3000);
-
- static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
+ foreach (var item in ms.ToList())
{
- Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
-
- foreach (var item in ms.ToList())
- {
- Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
- }
-
- return true;
+ Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
}
+
+ return true;
}
}
}
diff --git a/XUnitTestRocketMQ/AliyunTests.cs b/XUnitTestRocketMQ/AliyunTests.cs
index 45e48c3..a871a80 100644
--- a/XUnitTestRocketMQ/AliyunTests.cs
+++ b/XUnitTestRocketMQ/AliyunTests.cs
@@ -9,107 +9,106 @@ using System.Threading;
using System.Threading.Tasks;
using Xunit;
-namespace XUnitTestRocketMQ
+namespace XUnitTestRocketMQ;
+
+public class AliyunTests
{
- public class AliyunTests
+ private static void SetConfig(MqBase mq)
{
- private static void SetConfig(MqBase mq)
- {
- //mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
- mq.Configure(MqSetting.Current);
+ //mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
+ mq.Configure(MqSetting.Current);
- mq.Log = XTrace.Log;
- }
+ mq.Log = XTrace.Log;
+ }
- [Fact]
- public void CreateTopic()
+ [Fact]
+ public void CreateTopic()
+ {
+ var mq = new Producer
{
- var mq = new Producer
- {
- //Topic = "nx_test",
- };
- SetConfig(mq);
+ //Topic = "nx_test",
+ };
+ SetConfig(mq);
- mq.Start();
+ mq.Start();
- // 创建topic时,start前不能指定topic,让其使用默认TBW102
- Assert.Equal("TBW102", mq.Topic);
+ // 创建topic时,start前不能指定topic,让其使用默认TBW102
+ Assert.Equal("TBW102", mq.Topic);
- mq.CreateTopic("nx_test", 2);
- }
+ mq.CreateTopic("nx_test", 2);
+ }
- [Fact]
- static void ProduceTest()
+ [Fact]
+ static void ProduceTest()
+ {
+ using var mq = new Producer
{
- using var mq = new Producer
- {
- Topic = "test1",
- };
- SetConfig(mq);
+ Topic = "test1",
+ };
+ SetConfig(mq);
- mq.Start();
+ mq.Start();
- for (var i = 0; i < 10; i++)
- {
- var str = "学无先后达者为师" + i;
- //var str = Rand.NextString(1337);
+ for (var i = 0; i < 10; i++)
+ {
+ var str = "学无先后达者为师" + i;
+ //var str = Rand.NextString(1337);
- var sr = mq.Publish(str, "TagA", null);
- }
+ var sr = mq.Publish(str, "TagA", null);
}
+ }
- [Fact]
- static async Task ProduceAsyncTest()
+ [Fact]
+ static async Task ProduceAsyncTest()
+ {
+ using var mq = new Producer
{
- using var mq = new Producer
- {
- Topic = "test1",
- };
- SetConfig(mq);
+ Topic = "test1",
+ };
+ SetConfig(mq);
- mq.Start();
+ mq.Start();
- for (var i = 0; i < 10; i++)
- {
- var str = "学无先后达者为师" + i;
- //var str = Rand.NextString(1337);
+ for (var i = 0; i < 10; i++)
+ {
+ var str = "学无先后达者为师" + i;
+ //var str = Rand.NextString(1337);
- var sr = await mq.PublishAsync(str, "TagA", null);
- }
+ var sr = await mq.PublishAsync(str, "TagA", null);
}
+ }
- private static Consumer _consumer;
- [Fact]
- static void ConsumeTest()
+ private static Consumer _consumer;
+ [Fact]
+ static void ConsumeTest()
+ {
+ var consumer = new Consumer
{
- var consumer = new Consumer
- {
- Topic = "test1",
- Group = "test",
+ Topic = "test1",
+ Group = "test",
- FromLastOffset = true,
- BatchSize = 20,
- };
- SetConfig(consumer);
+ FromLastOffset = true,
+ BatchSize = 20,
+ };
+ SetConfig(consumer);
- consumer.OnConsume = OnConsume;
- consumer.Start();
+ consumer.OnConsume = OnConsume;
+ consumer.Start();
- _consumer = consumer;
-
- Thread.Sleep(3000);
- }
+ _consumer = consumer;
- private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
- {
- Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
+ Thread.Sleep(3000);
+ }
- foreach (var item in ms.ToList())
- {
- Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
- }
+ private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
+ {
+ Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);
- return true;
+ foreach (var item in ms.ToList())
+ {
+ Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
}
+
+ return true;
}
}
\ No newline at end of file