NewLife/NewLife.RocketMQ

修复Pop/Ack/ChangeInvisibleTime操作缺少queueId参数及添加MessageExt便利方法

Co-authored-by: nnhy <506367+nnhy@users.noreply.github.com>
copilot-swe-agent[bot] authored at 2026-03-19 11:29:45 Stone committed at 2026-03-19 11:55:15
425b328
Tree
1 Parent(s) 65175b4
Summary: 3 changed files with 154 additions and 11 deletions.
Modified +40 -7
Modified +7 -0
Modified +107 -4
Modified +40 -7
diff --git a/NewLife.RocketMQ/Consumer.cs b/NewLife.RocketMQ/Consumer.cs
index 5f0252a..b41f21e 100644
--- a/NewLife.RocketMQ/Consumer.cs
+++ b/NewLife.RocketMQ/Consumer.cs
@@ -1474,12 +1474,13 @@ public class Consumer : MqBase
     #region Pop消费模式
     /// <summary>Pop方式拉取消息。5.0新增的轻量消费模式,无需客户端Rebalance</summary>
     /// <param name="brokerName">Broker名称</param>
+    /// <param name="queueId">队列编号。-1表示由Broker自动分配</param>
     /// <param name="maxNums">最大拉取数</param>
     /// <param name="invisibleTime">不可见时间(毫秒),消息被拉取后在此时间内不会被其他消费者看到</param>
     /// <param name="pollTime">长轮询等待时间(毫秒)</param>
     /// <param name="cancellationToken">取消通知</param>
     /// <returns>拉取结果</returns>
-    public async Task<PullResult> PopMessageAsync(String brokerName, Int32 maxNums = 32, Int64 invisibleTime = 60_000, Int32 pollTime = 15_000, CancellationToken cancellationToken = default)
+    public async Task<PullResult> PopMessageAsync(String brokerName, Int32 queueId = -1, Int32 maxNums = 32, Int64 invisibleTime = 60_000, Int32 pollTime = 15_000, CancellationToken cancellationToken = default)
     {
         if (String.IsNullOrEmpty(brokerName)) throw new ArgumentNullException(nameof(brokerName));
 
@@ -1493,6 +1494,7 @@ public class Consumer : MqBase
             {
                 consumerGroup = Group,
                 topic = Topic,
+                queueId,
                 maxMsgNums = maxNums,
                 invisibleTime,
                 pollTime,
@@ -1528,11 +1530,12 @@ public class Consumer : MqBase
 
     /// <summary>确认Pop消息消费完成</summary>
     /// <param name="brokerName">Broker名称</param>
-    /// <param name="extraInfo">消息额外信息,Pop拉取时返回</param>
-    /// <param name="offset">消息偏移</param>
+    /// <param name="extraInfo">Pop检查点信息,即消息属性中的POP_CK字段值</param>
+    /// <param name="offset">消息在Queue中的偏移量</param>
+    /// <param name="queueId">队列编号</param>
     /// <param name="cancellationToken">取消通知</param>
     /// <returns></returns>
-    public async Task<Boolean> AckMessageAsync(String brokerName, String extraInfo, Int64 offset, CancellationToken cancellationToken = default)
+    public async Task<Boolean> AckMessageAsync(String brokerName, String extraInfo, Int64 offset, Int32 queueId = -1, CancellationToken cancellationToken = default)
     {
         using var span = Tracer?.NewSpan($"mq:{Name}:AckMessage", offset);
         try
@@ -1546,6 +1549,7 @@ public class Consumer : MqBase
                 topic = Topic,
                 extraInfo,
                 offset,
+                queueId,
             };
 
             await bk.InvokeAsync(RequestCode.ACK_MESSAGE, null, header, true, cancellationToken).ConfigureAwait(false);
@@ -1559,14 +1563,28 @@ public class Consumer : MqBase
         }
     }
 
+    /// <summary>确认Pop消息消费完成。自动从消息属性中提取Pop检查点信息(POP_CK)</summary>
+    /// <param name="brokerName">Broker名称</param>
+    /// <param name="msg">通过Pop方式拉取的消息</param>
+    /// <param name="cancellationToken">取消通知</param>
+    /// <returns></returns>
+    public Task<Boolean> AckMessageAsync(String brokerName, MessageExt msg, CancellationToken cancellationToken = default)
+    {
+        if (msg == null) throw new ArgumentNullException(nameof(msg));
+        if (String.IsNullOrEmpty(msg.PopCheckPoint)) throw new ArgumentException("消息不含Pop检查点信息(POP_CK属性缺失),请确认该消息是通过Pop方式拉取的。", nameof(msg));
+
+        return AckMessageAsync(brokerName, msg.PopCheckPoint, msg.QueueOffset, msg.QueueId, cancellationToken);
+    }
+
     /// <summary>修改Pop消息不可见时间,延长消费处理窗口</summary>
     /// <param name="brokerName">Broker名称</param>
-    /// <param name="extraInfo">消息额外信息</param>
-    /// <param name="offset">消息偏移</param>
+    /// <param name="extraInfo">Pop检查点信息,即消息属性中的POP_CK字段值</param>
+    /// <param name="offset">消息在Queue中的偏移量</param>
     /// <param name="invisibleTime">新的不可见时间(毫秒)</param>
+    /// <param name="queueId">队列编号</param>
     /// <param name="cancellationToken">取消通知</param>
     /// <returns></returns>
-    public async Task<Boolean> ChangeInvisibleTimeAsync(String brokerName, String extraInfo, Int64 offset, Int64 invisibleTime, CancellationToken cancellationToken = default)
+    public async Task<Boolean> ChangeInvisibleTimeAsync(String brokerName, String extraInfo, Int64 offset, Int64 invisibleTime, Int32 queueId = -1, CancellationToken cancellationToken = default)
     {
         using var span = Tracer?.NewSpan($"mq:{Name}:ChangeInvisibleTime", offset);
         try
@@ -1581,6 +1599,7 @@ public class Consumer : MqBase
                 extraInfo,
                 offset,
                 invisibleTime,
+                queueId,
             };
 
             await bk.InvokeAsync(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, null, header, true, cancellationToken).ConfigureAwait(false);
@@ -1594,6 +1613,20 @@ public class Consumer : MqBase
         }
     }
 
+    /// <summary>修改Pop消息不可见时间,延长消费处理窗口。自动从消息属性中提取Pop检查点信息(POP_CK)</summary>
+    /// <param name="brokerName">Broker名称</param>
+    /// <param name="msg">通过Pop方式拉取的消息</param>
+    /// <param name="invisibleTime">新的不可见时间(毫秒)</param>
+    /// <param name="cancellationToken">取消通知</param>
+    /// <returns></returns>
+    public Task<Boolean> ChangeInvisibleTimeAsync(String brokerName, MessageExt msg, Int64 invisibleTime, CancellationToken cancellationToken = default)
+    {
+        if (msg == null) throw new ArgumentNullException(nameof(msg));
+        if (String.IsNullOrEmpty(msg.PopCheckPoint)) throw new ArgumentException("消息不含Pop检查点信息(POP_CK属性缺失),请确认该消息是通过Pop方式拉取的。", nameof(msg));
+
+        return ChangeInvisibleTimeAsync(brokerName, msg.PopCheckPoint, msg.QueueOffset, invisibleTime, msg.QueueId, cancellationToken);
+    }
+
     /// <summary>批量确认Pop消息消费完成</summary>
     /// <param name="brokerName">Broker名称</param>
     /// <param name="ackEntries">批量确认条目列表,每个条目包含extraInfo和offset</param>
Modified +7 -0
diff --git a/NewLife.RocketMQ/Protocol/MessageExt.cs b/NewLife.RocketMQ/Protocol/MessageExt.cs
index 4df9eae..62221c1 100644
--- a/NewLife.RocketMQ/Protocol/MessageExt.cs
+++ b/NewLife.RocketMQ/Protocol/MessageExt.cs
@@ -58,6 +58,13 @@ public class MessageExt : Message, IAccessor
 
     /// <summary>消息编号</summary>
     public String MsgId { get; set; }
+
+    /// <summary>Pop检查点信息。Pop消费模式下由Broker在消息属性中返回,Ack/ChangeInvisibleTime操作时需传入此值</summary>
+    public String PopCheckPoint
+    {
+        get => Properties.TryGetValue("POP_CK", out var str) ? str : null;
+        set => Properties["POP_CK"] = value;
+    }
     #endregion
 
     #region 构造
Modified +107 -4
diff --git a/XUnitTestRocketMQ/PopConsumeTests.cs b/XUnitTestRocketMQ/PopConsumeTests.cs
index 828db5b..62cfb2a 100644
--- a/XUnitTestRocketMQ/PopConsumeTests.cs
+++ b/XUnitTestRocketMQ/PopConsumeTests.cs
@@ -1,6 +1,7 @@
 using System;
 using System.ComponentModel;
 using NewLife.RocketMQ;
+using NewLife.RocketMQ.Protocol;
 using Xunit;
 
 namespace XUnitTestRocketMQ;
@@ -27,6 +28,16 @@ public class PopConsumeTests
     }
 
     [Fact]
+    [DisplayName("PopMessageAsync_可指定queueId参数")]
+    public async void PopMessageAsync_WithQueueId_ThrowsWhenBrokerNameNull()
+    {
+        using var consumer = new Consumer();
+        // 验证带queueId的重载依然会在brokerName为null时抛出异常
+        await Assert.ThrowsAsync<ArgumentNullException>(() =>
+            consumer.PopMessageAsync(null, queueId: 0));
+    }
+
+    [Fact]
     [DisplayName("AckMessageAsync_无Broker连接时返回false")]
     public async void AckMessageAsync_NoBroker_ReturnsFalse()
     {
@@ -37,6 +48,46 @@ public class PopConsumeTests
     }
 
     [Fact]
+    [DisplayName("AckMessageAsync_指定queueId_无Broker连接时返回false")]
+    public async void AckMessageAsync_WithQueueId_NoBroker_ReturnsFalse()
+    {
+        using var consumer = new Consumer();
+        var result = await consumer.AckMessageAsync("nonexistent", "extra", 0, queueId: 2);
+        Assert.False(result);
+    }
+
+    [Fact]
+    [DisplayName("AckMessageAsync_传入MessageExt_Null消息抛出异常")]
+    public async void AckMessageAsync_NullMsg_ThrowsException()
+    {
+        using var consumer = new Consumer();
+        await Assert.ThrowsAsync<ArgumentNullException>(() =>
+            consumer.AckMessageAsync("broker", (MessageExt)null));
+    }
+
+    [Fact]
+    [DisplayName("AckMessageAsync_传入MessageExt_缺少POP_CK属性抛出异常")]
+    public async void AckMessageAsync_MsgWithoutPopCk_ThrowsArgumentException()
+    {
+        using var consumer = new Consumer();
+        var msg = new MessageExt { QueueId = 1, QueueOffset = 100 };
+        // 没有设置 PopCheckPoint(POP_CK)
+        await Assert.ThrowsAsync<ArgumentException>(() =>
+            consumer.AckMessageAsync("broker", msg));
+    }
+
+    [Fact]
+    [DisplayName("AckMessageAsync_传入MessageExt_无Broker连接时返回false")]
+    public async void AckMessageAsync_WithMsgExt_NoBroker_ReturnsFalse()
+    {
+        using var consumer = new Consumer();
+        var msg = new MessageExt { QueueId = 1, QueueOffset = 100 };
+        msg.PopCheckPoint = "100 1700000000000 60000 1 broker-a 1";
+        var result = await consumer.AckMessageAsync("nonexistent", msg);
+        Assert.False(result);
+    }
+
+    [Fact]
     [DisplayName("ChangeInvisibleTimeAsync_无Broker连接时返回false")]
     public async void ChangeInvisibleTimeAsync_NoBroker_ReturnsFalse()
     {
@@ -46,12 +97,64 @@ public class PopConsumeTests
     }
 
     [Fact]
+    [DisplayName("ChangeInvisibleTimeAsync_指定queueId_无Broker连接时返回false")]
+    public async void ChangeInvisibleTimeAsync_WithQueueId_NoBroker_ReturnsFalse()
+    {
+        using var consumer = new Consumer();
+        var result = await consumer.ChangeInvisibleTimeAsync("nonexistent", "extra", 0, 30000, queueId: 3);
+        Assert.False(result);
+    }
+
+    [Fact]
+    [DisplayName("ChangeInvisibleTimeAsync_传入MessageExt_Null消息抛出异常")]
+    public async void ChangeInvisibleTimeAsync_NullMsg_ThrowsException()
+    {
+        using var consumer = new Consumer();
+        await Assert.ThrowsAsync<ArgumentNullException>(() =>
+            consumer.ChangeInvisibleTimeAsync("broker", (MessageExt)null, 30000));
+    }
+
+    [Fact]
+    [DisplayName("ChangeInvisibleTimeAsync_传入MessageExt_缺少POP_CK属性抛出异常")]
+    public async void ChangeInvisibleTimeAsync_MsgWithoutPopCk_ThrowsArgumentException()
+    {
+        using var consumer = new Consumer();
+        var msg = new MessageExt { QueueId = 2, QueueOffset = 200 };
+        // 没有设置 PopCheckPoint(POP_CK)
+        await Assert.ThrowsAsync<ArgumentException>(() =>
+            consumer.ChangeInvisibleTimeAsync("broker", msg, 30000));
+    }
+
+    [Fact]
+    [DisplayName("ChangeInvisibleTimeAsync_传入MessageExt_无Broker连接时返回false")]
+    public async void ChangeInvisibleTimeAsync_WithMsgExt_NoBroker_ReturnsFalse()
+    {
+        using var consumer = new Consumer();
+        var msg = new MessageExt { QueueId = 2, QueueOffset = 200 };
+        msg.PopCheckPoint = "200 1700000000000 60000 1 broker-a 2";
+        var result = await consumer.ChangeInvisibleTimeAsync("nonexistent", msg, 30000);
+        Assert.False(result);
+    }
+
+    [Fact]
+    [DisplayName("MessageExt_PopCheckPoint属性读写正常")]
+    public void MessageExt_PopCheckPoint_GetSet()
+    {
+        var msg = new MessageExt();
+        Assert.Null(msg.PopCheckPoint);
+
+        msg.PopCheckPoint = "100 1700000000000 60000 1 broker-a 1";
+        Assert.Equal("100 1700000000000 60000 1 broker-a 1", msg.PopCheckPoint);
+        Assert.Equal("100 1700000000000 60000 1 broker-a 1", msg.Properties["POP_CK"]);
+    }
+
+    [Fact]
     [DisplayName("RequestCode包含Pop消费相关码")]
     public void RequestCode_ContainsPopCodes()
     {
-        Assert.Equal(200050, (Int32)NewLife.RocketMQ.Protocol.RequestCode.POP_MESSAGE);
-        Assert.Equal(200051, (Int32)NewLife.RocketMQ.Protocol.RequestCode.ACK_MESSAGE);
-        Assert.Equal(200052, (Int32)NewLife.RocketMQ.Protocol.RequestCode.CHANGE_MESSAGE_INVISIBLETIME);
-        Assert.Equal(200151, (Int32)NewLife.RocketMQ.Protocol.RequestCode.BATCH_ACK_MESSAGE);
+        Assert.Equal(200050, (Int32)RequestCode.POP_MESSAGE);
+        Assert.Equal(200051, (Int32)RequestCode.ACK_MESSAGE);
+        Assert.Equal(200052, (Int32)RequestCode.CHANGE_MESSAGE_INVISIBLETIME);
+        Assert.Equal(200151, (Int32)RequestCode.BATCH_ACK_MESSAGE);
     }
 }