修复Pop/Ack/ChangeInvisibleTime操作缺少queueId参数及添加MessageExt便利方法 Co-authored-by: nnhy <506367+nnhy@users.noreply.github.com>copilot-swe-agent[bot] authored at 2026-03-19 11:29:45 Stone committed at 2026-03-19 11:55:15
diff --git a/NewLife.RocketMQ/Consumer.cs b/NewLife.RocketMQ/Consumer.cs
index 5f0252a..b41f21e 100644
--- a/NewLife.RocketMQ/Consumer.cs
+++ b/NewLife.RocketMQ/Consumer.cs
@@ -1474,12 +1474,13 @@ public class Consumer : MqBase
#region Pop消费模式
/// <summary>Pop方式拉取消息。5.0新增的轻量消费模式,无需客户端Rebalance</summary>
/// <param name="brokerName">Broker名称</param>
+ /// <param name="queueId">队列编号。-1表示由Broker自动分配</param>
/// <param name="maxNums">最大拉取数</param>
/// <param name="invisibleTime">不可见时间(毫秒),消息被拉取后在此时间内不会被其他消费者看到</param>
/// <param name="pollTime">长轮询等待时间(毫秒)</param>
/// <param name="cancellationToken">取消通知</param>
/// <returns>拉取结果</returns>
- public async Task<PullResult> PopMessageAsync(String brokerName, Int32 maxNums = 32, Int64 invisibleTime = 60_000, Int32 pollTime = 15_000, CancellationToken cancellationToken = default)
+ public async Task<PullResult> PopMessageAsync(String brokerName, Int32 queueId = -1, Int32 maxNums = 32, Int64 invisibleTime = 60_000, Int32 pollTime = 15_000, CancellationToken cancellationToken = default)
{
if (String.IsNullOrEmpty(brokerName)) throw new ArgumentNullException(nameof(brokerName));
@@ -1493,6 +1494,7 @@ public class Consumer : MqBase
{
consumerGroup = Group,
topic = Topic,
+ queueId,
maxMsgNums = maxNums,
invisibleTime,
pollTime,
@@ -1528,11 +1530,12 @@ public class Consumer : MqBase
/// <summary>确认Pop消息消费完成</summary>
/// <param name="brokerName">Broker名称</param>
- /// <param name="extraInfo">消息额外信息,Pop拉取时返回</param>
- /// <param name="offset">消息偏移</param>
+ /// <param name="extraInfo">Pop检查点信息,即消息属性中的POP_CK字段值</param>
+ /// <param name="offset">消息在Queue中的偏移量</param>
+ /// <param name="queueId">队列编号</param>
/// <param name="cancellationToken">取消通知</param>
/// <returns></returns>
- public async Task<Boolean> AckMessageAsync(String brokerName, String extraInfo, Int64 offset, CancellationToken cancellationToken = default)
+ public async Task<Boolean> AckMessageAsync(String brokerName, String extraInfo, Int64 offset, Int32 queueId = -1, CancellationToken cancellationToken = default)
{
using var span = Tracer?.NewSpan($"mq:{Name}:AckMessage", offset);
try
@@ -1546,6 +1549,7 @@ public class Consumer : MqBase
topic = Topic,
extraInfo,
offset,
+ queueId,
};
await bk.InvokeAsync(RequestCode.ACK_MESSAGE, null, header, true, cancellationToken).ConfigureAwait(false);
@@ -1559,14 +1563,28 @@ public class Consumer : MqBase
}
}
+ /// <summary>确认Pop消息消费完成。自动从消息属性中提取Pop检查点信息(POP_CK)</summary>
+ /// <param name="brokerName">Broker名称</param>
+ /// <param name="msg">通过Pop方式拉取的消息</param>
+ /// <param name="cancellationToken">取消通知</param>
+ /// <returns></returns>
+ public Task<Boolean> AckMessageAsync(String brokerName, MessageExt msg, CancellationToken cancellationToken = default)
+ {
+ if (msg == null) throw new ArgumentNullException(nameof(msg));
+ if (String.IsNullOrEmpty(msg.PopCheckPoint)) throw new ArgumentException("消息不含Pop检查点信息(POP_CK属性缺失),请确认该消息是通过Pop方式拉取的。", nameof(msg));
+
+ return AckMessageAsync(brokerName, msg.PopCheckPoint, msg.QueueOffset, msg.QueueId, cancellationToken);
+ }
+
/// <summary>修改Pop消息不可见时间,延长消费处理窗口</summary>
/// <param name="brokerName">Broker名称</param>
- /// <param name="extraInfo">消息额外信息</param>
- /// <param name="offset">消息偏移</param>
+ /// <param name="extraInfo">Pop检查点信息,即消息属性中的POP_CK字段值</param>
+ /// <param name="offset">消息在Queue中的偏移量</param>
/// <param name="invisibleTime">新的不可见时间(毫秒)</param>
+ /// <param name="queueId">队列编号</param>
/// <param name="cancellationToken">取消通知</param>
/// <returns></returns>
- public async Task<Boolean> ChangeInvisibleTimeAsync(String brokerName, String extraInfo, Int64 offset, Int64 invisibleTime, CancellationToken cancellationToken = default)
+ public async Task<Boolean> ChangeInvisibleTimeAsync(String brokerName, String extraInfo, Int64 offset, Int64 invisibleTime, Int32 queueId = -1, CancellationToken cancellationToken = default)
{
using var span = Tracer?.NewSpan($"mq:{Name}:ChangeInvisibleTime", offset);
try
@@ -1581,6 +1599,7 @@ public class Consumer : MqBase
extraInfo,
offset,
invisibleTime,
+ queueId,
};
await bk.InvokeAsync(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, null, header, true, cancellationToken).ConfigureAwait(false);
@@ -1594,6 +1613,20 @@ public class Consumer : MqBase
}
}
+ /// <summary>修改Pop消息不可见时间,延长消费处理窗口。自动从消息属性中提取Pop检查点信息(POP_CK)</summary>
+ /// <param name="brokerName">Broker名称</param>
+ /// <param name="msg">通过Pop方式拉取的消息</param>
+ /// <param name="invisibleTime">新的不可见时间(毫秒)</param>
+ /// <param name="cancellationToken">取消通知</param>
+ /// <returns></returns>
+ public Task<Boolean> ChangeInvisibleTimeAsync(String brokerName, MessageExt msg, Int64 invisibleTime, CancellationToken cancellationToken = default)
+ {
+ if (msg == null) throw new ArgumentNullException(nameof(msg));
+ if (String.IsNullOrEmpty(msg.PopCheckPoint)) throw new ArgumentException("消息不含Pop检查点信息(POP_CK属性缺失),请确认该消息是通过Pop方式拉取的。", nameof(msg));
+
+ return ChangeInvisibleTimeAsync(brokerName, msg.PopCheckPoint, msg.QueueOffset, invisibleTime, msg.QueueId, cancellationToken);
+ }
+
/// <summary>批量确认Pop消息消费完成</summary>
/// <param name="brokerName">Broker名称</param>
/// <param name="ackEntries">批量确认条目列表,每个条目包含extraInfo和offset</param>
diff --git a/NewLife.RocketMQ/Protocol/MessageExt.cs b/NewLife.RocketMQ/Protocol/MessageExt.cs
index 4df9eae..62221c1 100644
--- a/NewLife.RocketMQ/Protocol/MessageExt.cs
+++ b/NewLife.RocketMQ/Protocol/MessageExt.cs
@@ -58,6 +58,13 @@ public class MessageExt : Message, IAccessor
/// <summary>消息编号</summary>
public String MsgId { get; set; }
+
+ /// <summary>Pop检查点信息。Pop消费模式下由Broker在消息属性中返回,Ack/ChangeInvisibleTime操作时需传入此值</summary>
+ public String PopCheckPoint
+ {
+ get => Properties.TryGetValue("POP_CK", out var str) ? str : null;
+ set => Properties["POP_CK"] = value;
+ }
#endregion
#region 构造
diff --git a/XUnitTestRocketMQ/PopConsumeTests.cs b/XUnitTestRocketMQ/PopConsumeTests.cs
index 828db5b..62cfb2a 100644
--- a/XUnitTestRocketMQ/PopConsumeTests.cs
+++ b/XUnitTestRocketMQ/PopConsumeTests.cs
@@ -1,6 +1,7 @@
using System;
using System.ComponentModel;
using NewLife.RocketMQ;
+using NewLife.RocketMQ.Protocol;
using Xunit;
namespace XUnitTestRocketMQ;
@@ -27,6 +28,16 @@ public class PopConsumeTests
}
[Fact]
+ [DisplayName("PopMessageAsync_可指定queueId参数")]
+ public async void PopMessageAsync_WithQueueId_ThrowsWhenBrokerNameNull()
+ {
+ using var consumer = new Consumer();
+ // 验证带queueId的重载依然会在brokerName为null时抛出异常
+ await Assert.ThrowsAsync<ArgumentNullException>(() =>
+ consumer.PopMessageAsync(null, queueId: 0));
+ }
+
+ [Fact]
[DisplayName("AckMessageAsync_无Broker连接时返回false")]
public async void AckMessageAsync_NoBroker_ReturnsFalse()
{
@@ -37,6 +48,46 @@ public class PopConsumeTests
}
[Fact]
+ [DisplayName("AckMessageAsync_指定queueId_无Broker连接时返回false")]
+ public async void AckMessageAsync_WithQueueId_NoBroker_ReturnsFalse()
+ {
+ using var consumer = new Consumer();
+ var result = await consumer.AckMessageAsync("nonexistent", "extra", 0, queueId: 2);
+ Assert.False(result);
+ }
+
+ [Fact]
+ [DisplayName("AckMessageAsync_传入MessageExt_Null消息抛出异常")]
+ public async void AckMessageAsync_NullMsg_ThrowsException()
+ {
+ using var consumer = new Consumer();
+ await Assert.ThrowsAsync<ArgumentNullException>(() =>
+ consumer.AckMessageAsync("broker", (MessageExt)null));
+ }
+
+ [Fact]
+ [DisplayName("AckMessageAsync_传入MessageExt_缺少POP_CK属性抛出异常")]
+ public async void AckMessageAsync_MsgWithoutPopCk_ThrowsArgumentException()
+ {
+ using var consumer = new Consumer();
+ var msg = new MessageExt { QueueId = 1, QueueOffset = 100 };
+ // 没有设置 PopCheckPoint(POP_CK)
+ await Assert.ThrowsAsync<ArgumentException>(() =>
+ consumer.AckMessageAsync("broker", msg));
+ }
+
+ [Fact]
+ [DisplayName("AckMessageAsync_传入MessageExt_无Broker连接时返回false")]
+ public async void AckMessageAsync_WithMsgExt_NoBroker_ReturnsFalse()
+ {
+ using var consumer = new Consumer();
+ var msg = new MessageExt { QueueId = 1, QueueOffset = 100 };
+ msg.PopCheckPoint = "100 1700000000000 60000 1 broker-a 1";
+ var result = await consumer.AckMessageAsync("nonexistent", msg);
+ Assert.False(result);
+ }
+
+ [Fact]
[DisplayName("ChangeInvisibleTimeAsync_无Broker连接时返回false")]
public async void ChangeInvisibleTimeAsync_NoBroker_ReturnsFalse()
{
@@ -46,12 +97,64 @@ public class PopConsumeTests
}
[Fact]
+ [DisplayName("ChangeInvisibleTimeAsync_指定queueId_无Broker连接时返回false")]
+ public async void ChangeInvisibleTimeAsync_WithQueueId_NoBroker_ReturnsFalse()
+ {
+ using var consumer = new Consumer();
+ var result = await consumer.ChangeInvisibleTimeAsync("nonexistent", "extra", 0, 30000, queueId: 3);
+ Assert.False(result);
+ }
+
+ [Fact]
+ [DisplayName("ChangeInvisibleTimeAsync_传入MessageExt_Null消息抛出异常")]
+ public async void ChangeInvisibleTimeAsync_NullMsg_ThrowsException()
+ {
+ using var consumer = new Consumer();
+ await Assert.ThrowsAsync<ArgumentNullException>(() =>
+ consumer.ChangeInvisibleTimeAsync("broker", (MessageExt)null, 30000));
+ }
+
+ [Fact]
+ [DisplayName("ChangeInvisibleTimeAsync_传入MessageExt_缺少POP_CK属性抛出异常")]
+ public async void ChangeInvisibleTimeAsync_MsgWithoutPopCk_ThrowsArgumentException()
+ {
+ using var consumer = new Consumer();
+ var msg = new MessageExt { QueueId = 2, QueueOffset = 200 };
+ // 没有设置 PopCheckPoint(POP_CK)
+ await Assert.ThrowsAsync<ArgumentException>(() =>
+ consumer.ChangeInvisibleTimeAsync("broker", msg, 30000));
+ }
+
+ [Fact]
+ [DisplayName("ChangeInvisibleTimeAsync_传入MessageExt_无Broker连接时返回false")]
+ public async void ChangeInvisibleTimeAsync_WithMsgExt_NoBroker_ReturnsFalse()
+ {
+ using var consumer = new Consumer();
+ var msg = new MessageExt { QueueId = 2, QueueOffset = 200 };
+ msg.PopCheckPoint = "200 1700000000000 60000 1 broker-a 2";
+ var result = await consumer.ChangeInvisibleTimeAsync("nonexistent", msg, 30000);
+ Assert.False(result);
+ }
+
+ [Fact]
+ [DisplayName("MessageExt_PopCheckPoint属性读写正常")]
+ public void MessageExt_PopCheckPoint_GetSet()
+ {
+ var msg = new MessageExt();
+ Assert.Null(msg.PopCheckPoint);
+
+ msg.PopCheckPoint = "100 1700000000000 60000 1 broker-a 1";
+ Assert.Equal("100 1700000000000 60000 1 broker-a 1", msg.PopCheckPoint);
+ Assert.Equal("100 1700000000000 60000 1 broker-a 1", msg.Properties["POP_CK"]);
+ }
+
+ [Fact]
[DisplayName("RequestCode包含Pop消费相关码")]
public void RequestCode_ContainsPopCodes()
{
- Assert.Equal(200050, (Int32)NewLife.RocketMQ.Protocol.RequestCode.POP_MESSAGE);
- Assert.Equal(200051, (Int32)NewLife.RocketMQ.Protocol.RequestCode.ACK_MESSAGE);
- Assert.Equal(200052, (Int32)NewLife.RocketMQ.Protocol.RequestCode.CHANGE_MESSAGE_INVISIBLETIME);
- Assert.Equal(200151, (Int32)NewLife.RocketMQ.Protocol.RequestCode.BATCH_ACK_MESSAGE);
+ Assert.Equal(200050, (Int32)RequestCode.POP_MESSAGE);
+ Assert.Equal(200051, (Int32)RequestCode.ACK_MESSAGE);
+ Assert.Equal(200052, (Int32)RequestCode.CHANGE_MESSAGE_INVISIBLETIME);
+ Assert.Equal(200151, (Int32)RequestCode.BATCH_ACK_MESSAGE);
}
}