NewLife/NewLife.RocketMQ

优化消费者Stop方法。https://github.com/NewLifeX/NewLife.RocketMQ/issues/95
智能大石头 authored at 2025-04-13 22:59:07
a4287b3
Tree
1 Parent(s) cdc8264
Summary: 3 changed files with 28 additions and 23 deletions.
Modified +5 -17
Modified +22 -5
Modified +1 -1
Modified +5 -17
diff --git a/NewLife.RocketMQ/Consumer.cs b/NewLife.RocketMQ/Consumer.cs
index d00867f..907ab45 100644
--- a/NewLife.RocketMQ/Consumer.cs
+++ b/NewLife.RocketMQ/Consumer.cs
@@ -115,25 +115,13 @@ public class Consumer : MqBase
     /// <summary>
     /// 停止
     /// </summary>
-    public override void Stop()
+    protected override void OnStop()
     {
-        if (!Active) return;
-
-        using var span = Tracer?.NewSpan($"mq:{Name}:Stop");
-        try
-        {
-            // 停止并保存偏移
-            StopSchedule();
-            PersistAll(_Queues).Wait();
-
-            base.Stop();
-        }
-        catch (Exception ex)
-        {
-            span?.SetError(ex, null);
+        // 停止并保存偏移
+        StopSchedule();
+        PersistAll(_Queues).Wait();
 
-            throw;
-        }
+        base.OnStop();
     }
 
     /// <summary>创建Broker客户端,已重载,设置更大的超时时间</summary>
Modified +22 -5
diff --git a/NewLife.RocketMQ/MqBase.cs b/NewLife.RocketMQ/MqBase.cs
index b6df4ea..50539a7 100644
--- a/NewLife.RocketMQ/MqBase.cs
+++ b/NewLife.RocketMQ/MqBase.cs
@@ -1,4 +1,5 @@
-using System.Collections.Concurrent;
+using System.Collections;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Reflection;
@@ -31,7 +32,7 @@ public abstract class MqBase : DisposeBase
     /// <summary>主题</summary>
     /// <remarks>阿里云目前需要在Topic前面带上实例ID并用【%】连接,组成路由Topic[用来路由到实例Topic]</remarks>
     public String Topic { get; set; } = DefaultTopic;
-    
+
     /// <summary>默认的主题队列数量</summary>
     public Int32 DefaultTopicQueueNums { get; set; } = 4;
 
@@ -255,10 +256,28 @@ public abstract class MqBase : DisposeBase
 
     /// <summary>停止</summary>
     /// <returns></returns>
-    public virtual void Stop()
+    public void Stop()
     {
         if (!Active) return;
 
+        using var span = Tracer?.NewSpan($"mq:{Name}:Stop");
+        try
+        {
+            OnStop();
+        }
+        catch (Exception ex)
+        {
+            span?.SetError(ex, null);
+
+            throw;
+        }
+
+        Active = false;
+    }
+
+    /// <summary>停止</summary>
+    protected virtual void OnStop()
+    {
         foreach (var item in _Brokers)
         {
             try
@@ -272,8 +291,6 @@ public abstract class MqBase : DisposeBase
             }
         }
         _Brokers.Clear();
-
-        Active = false;
     }
     #endregion
 
Modified +1 -1
diff --git a/NewLife.RocketMQ/NewLife.RocketMQ.csproj b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
index 5b66f7c..1e3af41 100644
--- a/NewLife.RocketMQ/NewLife.RocketMQ.csproj
+++ b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
@@ -61,7 +61,7 @@
   </ItemGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Core" Version="11.4.2025.301" />
+    <PackageReference Include="NewLife.Core" Version="11.4.2025.401" />
   </ItemGroup>
 
 </Project>