NewLife/X

优化消息队列并发性能,完善编码器注释

- DefaultMatchQueue 优化游标并发访问,环形扫描减少争用,提升匹配效率
- 编码器方法补充参数和返回值注释,提升文档质量
- Read/Match 逻辑更清晰,避免多线程数据区冲突
- SessionBase 发送消息资源管理更健壮,支持取消回调
- 统一变量命名,结构调整,提升代码一致性和可维护性
智能大石头 authored at 2026-01-13 11:16:04
441478d
Tree
1 Parent(s) 44342b2
Summary: 4 changed files with 151 additions and 179 deletions.
Modified +50 -60
Modified +45 -41
Modified +26 -31
Modified +30 -47
Modified +50 -60
diff --git a/NewLife.Core/Net/Handlers/IMatchQueue.cs b/NewLife.Core/Net/Handlers/IMatchQueue.cs
index 6afdd08..b8ef78b 100644
--- a/NewLife.Core/Net/Handlers/IMatchQueue.cs
+++ b/NewLife.Core/Net/Handlers/IMatchQueue.cs
@@ -52,7 +52,7 @@ public class DefaultMatchQueue : IMatchQueue
     private Int32 _cursor;
 
     /// <summary>按指定大小来初始化队列</summary>
-    /// <param name="size"></param>
+    /// <param name="size">队列大小</param>
     public DefaultMatchQueue(Int32 size = 256) => Items = new ItemWrap[size];
 
     /// <summary>加入请求队列</summary>
@@ -77,68 +77,43 @@ public class DefaultMatchQueue : IMatchQueue
             Span = ext?["Span"] as ISpan,
         };
 
-        // 若计数已接近容量,先做一次快速清理以回收过期项,避免“看似满”的误判
         var items = Items;
-        if (Volatile.Read(ref _Count) >= items.Length)
-        {
-            Check(null);
-        }
+        var len = items.Length;
+
+        // 若计数已接近容量,先做一次快速清理以回收过期项,避免"看似满"的误判
+        if (Volatile.Read(ref _Count) >= len) Check(null);
 
         // 加入队列(从游标位置开始扫描,避免总是从0导致争用)
-        var len = items.Length;
-        var start = _cursor;
+        var start = Volatile.Read(ref _cursor);
         for (var offset = 0; offset < len; ++offset)
         {
-            var i = start + offset;
-            if (i >= len) i -= len;
-
+            var i = (start + offset) % len;
             if (Interlocked.CompareExchange(ref items[i].Value, qi, null) == null)
             {
                 Interlocked.Increment(ref _Count);
 
-                // 推进游标
-                if (++i >= len) i = 0;
-                _cursor = i;
-
-                if (_Timer == null)
-                {
-                    lock (this)
-                    {
-                        _Timer ??= new TimerX(Check, null, 1000, 1000, "Match") { Async = true };
-                    }
-                }
+                // 推进游标到下一个位置
+                Volatile.Write(ref _cursor, (i + 1) % len);
 
+                StartTimer();
                 return source.Task;
             }
         }
 
-        // 第一次扫描失败后,再进行一次同步清理并重试,最后才认为真的满
+        // 第一次扫描失败后,再进行一次同步清理并重试
         Check(null);
 
         // 重试一次
-        items = Items; // 允许未来可能的扩容,这里重新读取引用
-        len = items.Length;
-        start = _cursor;
+        start = Volatile.Read(ref _cursor);
         for (var offset = 0; offset < len; ++offset)
         {
-            var i = start + offset;
-            if (i >= len) i -= len;
-
+            var i = (start + offset) % len;
             if (Interlocked.CompareExchange(ref items[i].Value, qi, null) == null)
             {
                 Interlocked.Increment(ref _Count);
+                Volatile.Write(ref _cursor, (i + 1) % len);
 
-                if (++i >= len) i = 0;
-                _cursor = i;
-
-                if (_Timer == null)
-                {
-                    lock (this)
-                    {
-                        _Timer ??= new TimerX(Check, null, 1000, 1000, "Match") { Async = true };
-                    }
-                }
-
+                StartTimer();
                 return source.Task;
             }
         }
@@ -147,6 +122,15 @@ public class DefaultMatchQueue : IMatchQueue
         throw new XException("The matching queue is full [{0}]", items.Length);
     }
 
+    private void StartTimer()
+    {
+        if (_Timer != null) return;
+        lock (this)
+        {
+            _Timer ??= new TimerX(Check, null, 1000, 1000, "Match") { Async = true };
+        }
+    }
+
     /// <summary>检查请求队列是否有匹配该响应的请求</summary>
     /// <param name="owner">拥有者</param>
     /// <param name="response">响应消息</param>
@@ -157,21 +141,26 @@ public class DefaultMatchQueue : IMatchQueue
     {
         if (Volatile.Read(ref _Count) <= 0) return false;
 
-        // 直接遍历,队列不会很长
-        var qs = Items;
-        for (var i = 0; i < qs.Length; i++)
+        // 从游标位置向前搜索,响应通常与最近的请求匹配
+        var items = Items;
+        var len = items.Length;
+        var start = Volatile.Read(ref _cursor);
+
+        // 先从游标往前搜索(最近添加的请求更可能匹配)
+        for (var offset = 1; offset <= len; ++offset)
         {
-            var qi = Volatile.Read(ref qs[i].Value);
+            var i = (start - offset + len) % len;
+            var qi = Volatile.Read(ref items[i].Value);
             if (qi == null) continue;
 
             if (qi.Owner == owner && callback(qi.Request, response))
             {
-                // CAS 置空,确保仅一次成功清理,避免并发重复清理造成 _Count 错乱
-                if (Interlocked.CompareExchange(ref qs[i].Value, null, qi) != qi) continue;
+                // CAS 置空,确保仅一次成功清理
+                if (Interlocked.CompareExchange(ref items[i].Value, null, qi) != qi) continue;
 
                 Interlocked.Decrement(ref _Count);
 
-                // 异步设置完成结果,否则可能会在当前线程恢复上层await,导致堵塞当前任务
+                // 设置完成结果,TaskCreationOptions.RunContinuationsAsynchronously确保不会阻塞当前线程
                 var src = qi.Source;
                 if (src != null && !src.Task.IsCompleted)
                 {
@@ -188,33 +177,34 @@ public class DefaultMatchQueue : IMatchQueue
         }
 
         if (SocketSetting.Current.Debug)
-            XTrace.WriteLine("MatchQueue.Check 失败 [{0}] result={1} Items={2}", response, result, _Count);
+            XTrace.WriteLine("MatchQueue.Match 失败 [{0}] result={1} Items={2}", response, result, _Count);
 
         return false;
     }
 
-    /// <summary>定时检查发送队列,超时未收到响应则重发</summary>
-    /// <param name="state"></param>
+    /// <summary>定时检查发送队列,超时未收到响应则取消</summary>
+    /// <param name="state">状态参数</param>
     void Check(Object? state)
     {
         if (Volatile.Read(ref _Count) <= 0) return;
 
-        // 直接遍历,队列不会很长
         var now = Runtime.TickCount64;
-        var qs = Items;
-        for (var i = 0; i < qs.Length; i++)
+        var items = Items;
+
+        // 遍历清理过期项
+        for (var i = 0; i < items.Length; i++)
         {
-            var qi = Volatile.Read(ref qs[i].Value);
+            var qi = Volatile.Read(ref items[i].Value);
             if (qi == null) continue;
 
             // 过期取消
             if (qi.EndTime <= now)
             {
-                if (Interlocked.CompareExchange(ref qs[i].Value, null, qi) != qi) continue;
+                if (Interlocked.CompareExchange(ref items[i].Value, null, qi) != qi) continue;
 
                 Interlocked.Decrement(ref _Count);
 
-                // 异步取消任务,避免在当前线程执行上层await的延续任务
+                // 异步取消任务
                 var src = qi.Source;
                 if (src != null && !src.Task.IsCompleted)
                 {
@@ -233,15 +223,15 @@ public class DefaultMatchQueue : IMatchQueue
     /// <summary>清空队列</summary>
     public virtual void Clear()
     {
-        var qs = Items;
-        for (var i = 0; i < qs.Length; ++i)
+        var items = Items;
+        for (var i = 0; i < items.Length; ++i)
         {
-            var qi = Interlocked.Exchange(ref qs[i].Value, null);
+            var qi = Interlocked.Exchange(ref items[i].Value, null);
             if (qi == null) continue;
 
             Interlocked.Decrement(ref _Count);
 
-            // 异步取消任务,避免在当前线程执行上层await的延续任务
+            // 异步取消任务
             var src = qi.Source;
             if (src != null && !src.Task.IsCompleted)
             {
Modified +45 -41
diff --git a/NewLife.Core/Net/Handlers/MessageCodec.cs b/NewLife.Core/Net/Handlers/MessageCodec.cs
index a141fbe..9b1ee6f 100644
--- a/NewLife.Core/Net/Handlers/MessageCodec.cs
+++ b/NewLife.Core/Net/Handlers/MessageCodec.cs
@@ -38,8 +38,8 @@ public class MessageCodec<T> : Handler
     public Boolean UserPacket { get; set; } = true;
 
     /// <summary>打开链接</summary>
-    /// <param name="context"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <returns>是否成功打开</returns>
     public override Boolean Open(IHandlerContext context)
     {
         if (context.Owner is ISocketClient client) Timeout = client.Timeout;
@@ -52,9 +52,9 @@ public class MessageCodec<T> : Handler
     /// 遇到消息T时,调用Encode编码并加入队列。
     /// Encode返回空时,跳出调用链。
     /// </remarks>
-    /// <param name="context"></param>
-    /// <param name="message"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="message">消息</param>
+    /// <returns>处理后的消息</returns>
     public override Object? Write(IHandlerContext context, Object message)
     {
         // 谁申请,谁归还
@@ -88,9 +88,9 @@ public class MessageCodec<T> : Handler
     }
 
     /// <summary>编码消息,一般是编码为Packet后传给下一个处理器</summary>
-    /// <param name="context"></param>
-    /// <param name="msg"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="msg">消息</param>
+    /// <returns>编码后的数据包</returns>
     protected virtual Object? Encode(IHandlerContext context, T msg)
     {
         if (msg is IMessage msg2) return msg2.ToPacket();
@@ -99,9 +99,8 @@ public class MessageCodec<T> : Handler
     }
 
     /// <summary>把请求加入队列,等待响应到来时建立请求响应匹配</summary>
-    /// <param name="context"></param>
-    /// <param name="msg"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="msg">消息</param>
     protected virtual void AddToQueue(IHandlerContext context, T msg)
     {
         if (msg != null && context is IExtend ext && ext["TaskSource"] is TaskCompletionSource<Object> source)
@@ -113,9 +112,9 @@ public class MessageCodec<T> : Handler
     }
 
     /// <summary>连接关闭时,清空粘包编码器</summary>
-    /// <param name="context"></param>
-    /// <param name="reason"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="reason">关闭原因</param>
+    /// <returns>是否成功关闭</returns>
     public override Boolean Close(IHandlerContext context, String reason)
     {
         Queue?.Clear();
@@ -128,9 +127,9 @@ public class MessageCodec<T> : Handler
     /// Decode可以返回多个消息,每个消息调用一次下一级处理器。
     /// Decode返回空时,跳出调用链。
     /// </remarks>
-    /// <param name="context"></param>
-    /// <param name="message"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="message">消息</param>
+    /// <returns>处理后的消息</returns>
     public override Object? Read(IHandlerContext context, Object message)
     {
         if (message is not IPacket pk) return base.Read(context, message);
@@ -139,40 +138,45 @@ public class MessageCodec<T> : Handler
         var list = Decode(context, pk);
         if (list == null) return null;
 
+        var queue = Queue;
+        var userPacket = UserPacket;
+
         foreach (var msg in list)
         {
             if (msg == null) continue;
 
             Object? rs = null;
-            if (UserPacket && msg is IMessage msg2)
+            IMessage? rawMsg = null;
+
+            // 提取消息负载
+            if (userPacket && msg is IMessage msg2)
             {
+                rawMsg = msg2;
                 if (context is IExtend ext) ext["_raw_message"] = msg2;
                 rs = msg2.Payload;
             }
             else
+            {
                 rs = msg;
+            }
 
-            //var rs = message;
-            if (msg is IMessage msg3)
+            // 匹配请求队列(仅响应消息)
+            if (rawMsg != null && rawMsg.Reply && queue != null)
             {
-                // 匹配请求队列
-                if (msg3.Reply)
-                {
-                    //!!! 处理结果的Packet需要拷贝一份,否则交给另一个线程使用会有冲突
-                    // Match里面TrySetResult时,必然唤醒原来阻塞的Task,如果不是当前io线程执行后续代码,必然导致两个线程共用了数据区,因此需要拷贝
-                    if (rs is IMessage msg4 && msg4.Payload != null && msg4.Payload == msg3.Payload)
-                        msg4.Payload = msg4.Payload.Clone();
-
-                    Queue?.Match(context.Owner, msg, rs ?? msg, IsMatch);
-                }
+                // 处理结果的Packet需要拷贝一份,否则交给另一个线程使用会有冲突
+                // Match里面TrySetResult时,必然唤醒原来阻塞的Task,可能导致两个线程共用了数据区
+                if (rs is IMessage msg4 && msg4.Payload != null && msg4.Payload == rawMsg.Payload)
+                    msg4.Payload = msg4.Payload.Clone();
+
+                queue.Match(context.Owner, msg, rs ?? msg, IsMatch);
             }
-            else if (rs != null)
+            else if (rs != null && queue != null && msg is not IMessage)
             {
                 // 其它消息不考虑响应
-                Queue?.Match(context.Owner, msg, rs, IsMatch);
+                queue.Match(context.Owner, msg, rs, IsMatch);
             }
 
-            // 匹配输入回调,让上层事件收到分包信息。
+            // 匹配输入回调,让上层事件收到分包信息
             // 这里很可能处于网络IO线程,阻塞了下一个Tcp包的接收
             base.Read(context, rs ?? msg);
         }
@@ -181,8 +185,8 @@ public class MessageCodec<T> : Handler
     }
 
     /// <summary>从上下文中获取原始请求</summary>
-    /// <param name="context"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <returns>原始消息</returns>
     protected IMessage? GetRequest(IHandlerContext context)
     {
         if (context is IExtend ext) return ext["_raw_message"] as IMessage;
@@ -191,15 +195,15 @@ public class MessageCodec<T> : Handler
     }
 
     /// <summary>解码</summary>
-    /// <param name="context"></param>
-    /// <param name="pk"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="pk">数据包</param>
+    /// <returns>解码后的消息列表</returns>
     protected virtual IList<T>? Decode(IHandlerContext context, IPacket pk) => null;
 
     /// <summary>是否匹配响应</summary>
-    /// <param name="request"></param>
-    /// <param name="response"></param>
-    /// <returns></returns>
+    /// <param name="request">请求消息</param>
+    /// <param name="response">响应消息</param>
+    /// <returns>是否匹配</returns>
     protected virtual Boolean IsMatch(Object? request, Object? response) => true;
 
     #region 粘包处理
Modified +26 -31
diff --git a/NewLife.Core/Net/Handlers/StandardCodec.cs b/NewLife.Core/Net/Handlers/StandardCodec.cs
index 675e159..9aef773 100644
--- a/NewLife.Core/Net/Handlers/StandardCodec.cs
+++ b/NewLife.Core/Net/Handlers/StandardCodec.cs
@@ -12,15 +12,12 @@ namespace NewLife.Net.Handlers;
 /// </remarks>
 public class StandardCodec : MessageCodec<IMessage>
 {
-    ///// <summary>编码器。用于编码非消息对象</summary>
-    //public IPacketEncoder? Encoder { get; set; }
-
     private Int32 _gid;
 
     /// <summary>写入数据</summary>
-    /// <param name="context"></param>
-    /// <param name="message"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="message">消息</param>
+    /// <returns>处理后的消息</returns>
     public override Object? Write(IHandlerContext context, Object message)
     {
         DataKinds? kind = null;
@@ -42,13 +39,12 @@ public class StandardCodec : MessageCodec<IMessage>
 
         if (message is IPacket pk)
         {
-            DefaultMessage? response = null;
+            // 优先复用请求消息创建响应
             var request = GetRequest(context);
-            if (request != null && !request.Reply)
-            {
-                response = request.CreateReply() as DefaultMessage;
-            }
-            response ??= new DefaultMessage();
+            var response = (request != null && !request.Reply)
+                ? request.CreateReply() as DefaultMessage ?? new DefaultMessage()
+                : new DefaultMessage();
+
             response.Flag = (Byte)(kind ?? DataKinds.Packet);
             response.Payload = pk;
             message = response;
@@ -58,6 +54,7 @@ public class StandardCodec : MessageCodec<IMessage>
                 response.Flag = (Byte)dk;
         }
 
+        // 为请求消息分配序列号
         if (message is DefaultMessage msg && !msg.Reply && msg.Sequence == 0)
             msg.Sequence = (Byte)Interlocked.Increment(ref _gid);
 
@@ -65,18 +62,18 @@ public class StandardCodec : MessageCodec<IMessage>
     }
 
     /// <summary>加入队列</summary>
-    /// <param name="context"></param>
-    /// <param name="msg"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="msg">消息</param>
     protected override void AddToQueue(IHandlerContext context, IMessage msg)
     {
+        // 只有请求消息才加入队列等待响应
         if (!msg.Reply) base.AddToQueue(context, msg);
     }
 
     /// <summary>解码</summary>
-    /// <param name="context"></param>
-    /// <param name="pk"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="pk">数据包</param>
+    /// <returns>解码后的消息列表</returns>
     protected override IList<IMessage>? Decode(IHandlerContext context, IPacket pk)
     {
         if (context.Owner is not IExtend ss) return null;
@@ -95,7 +92,7 @@ public class StandardCodec : MessageCodec<IMessage>
         }
 
         var pks = pc.Parse(pk);
-        var list = new List<IMessage>();
+        var list = new List<IMessage>(pks.Count);
         foreach (var item in pks)
         {
             var msg = new DefaultMessage();
@@ -106,20 +103,18 @@ public class StandardCodec : MessageCodec<IMessage>
     }
 
     /// <summary>是否匹配响应</summary>
-    /// <param name="request"></param>
-    /// <param name="response"></param>
-    /// <returns></returns>
-    protected override Boolean IsMatch(Object? request, Object? response)
-    {
-        return request is DefaultMessage req &&
-            response is DefaultMessage res &&
-            req.Sequence == res.Sequence;
-    }
+    /// <param name="request">请求消息</param>
+    /// <param name="response">响应消息</param>
+    /// <returns>是否匹配</returns>
+    protected override Boolean IsMatch(Object? request, Object? response) =>
+        request is DefaultMessage req &&
+        response is DefaultMessage res &&
+        req.Sequence == res.Sequence;
 
     /// <summary>连接关闭时,清空粘包编码器</summary>
-    /// <param name="context"></param>
-    /// <param name="reason"></param>
-    /// <returns></returns>
+    /// <param name="context">处理器上下文</param>
+    /// <param name="reason">关闭原因</param>
+    /// <returns>是否成功关闭</returns>
     public override Boolean Close(IHandlerContext context, String reason)
     {
         if (context.Owner is IExtend ss) ss["Codec"] = null;
Modified +30 -47
diff --git a/NewLife.Core/Net/SessionBase.cs b/NewLife.Core/Net/SessionBase.cs
index 4e16be2..2d6c8ae 100644
--- a/NewLife.Core/Net/SessionBase.cs
+++ b/NewLife.Core/Net/SessionBase.cs
@@ -811,82 +811,51 @@ public abstract class SessionBase : DisposeBase, ISocketClient, ITransport, ILog
     }
 
     /// <summary>通过管道发送消息,不等待响应</summary>
-    /// <param name="message"></param>
-    /// <returns></returns>
+    /// <param name="message">消息</param>
+    /// <returns>发送字节数</returns>
     public virtual Int32 SendMessage(Object message)
     {
         if (Pipeline == null) throw new ArgumentNullException(nameof(Pipeline), "No pipes are set");
 
         using var span = Tracer?.NewSpan($"net:{Name}:SendMessage", message);
+        var ctx = CreateContext(this);
         try
         {
             if (span != null && message is ITraceMessage tm && tm.TraceId.IsNullOrEmpty()) tm.TraceId = span.ToString();
 
-            var ctx = CreateContext(this);
-            var rs = (Int32)(Pipeline.Write(ctx, message) ?? 0);
-
-            // 写入完成后归还上下文
-            ReturnContext(ctx);
-
-            return rs;
+            return (Int32)(Pipeline.Write(ctx, message) ?? 0);
         }
         catch (Exception ex)
         {
             span?.SetError(ex, message);
             throw;
         }
-    }
-
-    /// <summary>通过管道发送消息并等待响应</summary>
-    /// <param name="message"></param>
-    /// <returns></returns>
-    public virtual async Task<Object> SendMessageAsync(Object message)
-    {
-        if (Pipeline == null) throw new ArgumentNullException(nameof(Pipeline), "No pipes are set");
-
-        using var span = Tracer?.NewSpan($"net:{Name}:SendMessageAsync", message);
-        try
-        {
-            if (span != null && message is ITraceMessage tm && tm.TraceId.IsNullOrEmpty()) tm.TraceId = span.ToString();
-
-            var ctx = CreateContext(this);
-#if NET45
-            var source = new TaskCompletionSource<Object>();
-#else
-            var source = new TaskCompletionSource<Object>(TaskCreationOptions.RunContinuationsAsynchronously);
-#endif
-            ctx["TaskSource"] = source;
-            ctx["Span"] = span;
-
-            var rs = (Int32)(Pipeline.Write(ctx, message) ?? 0);
-            if (rs < 0) return TaskEx.CompletedTask;
-
-            return await source.Task.ConfigureAwait(false);
-        }
-        catch (Exception ex)
+        finally
         {
-            if (ex is TaskCanceledException)
-                span?.AppendTag(ex.Message);
-            else
-                span?.SetError(ex, message);
-            throw;
+            // 写入完成后归还上下文
+            ReturnContext(ctx);
         }
     }
 
     /// <summary>通过管道发送消息并等待响应</summary>
     /// <param name="message">消息</param>
+    /// <returns>响应消息</returns>
+    public virtual Task<Object> SendMessageAsync(Object message) => SendMessageAsync(message, default);
+
+    /// <summary>通过管道发送消息并等待响应</summary>
+    /// <param name="message">消息</param>
     /// <param name="cancellationToken">取消通知</param>
-    /// <returns></returns>
+    /// <returns>响应消息</returns>
     public virtual async Task<Object> SendMessageAsync(Object message, CancellationToken cancellationToken)
     {
         if (Pipeline == null) throw new ArgumentNullException(nameof(Pipeline), "No pipes are set");
 
         using var span = Tracer?.NewSpan($"net:{Name}:SendMessageAsync", message);
+        var ctx = CreateContext(this);
         try
         {
             if (span != null && message is ITraceMessage tm && tm.TraceId.IsNullOrEmpty()) tm.TraceId = span.ToString();
 
-            var ctx = CreateContext(this);
 #if NET45
             var source = new TaskCompletionSource<Object>();
 #else
@@ -898,13 +867,22 @@ public abstract class SessionBase : DisposeBase, ISocketClient, ITransport, ILog
             var rs = (Int32)(Pipeline.Write(ctx, message) ?? 0);
             if (rs < 0) return TaskEx.CompletedTask;
 
+            // 写入完成后立即归还上下文,source已加入匹配队列,不再需要上下文
+            ReturnContext(ctx);
+            ctx = null;
+
             // 注册取消时的处理,如果没有收到响应,取消发送等待
             // Register返回值需要Dispose,否则会导致内存泄漏
             // https://stackoverflow.com/questions/14627226/why-is-my-async-await-with-cancellationtokensource-leaking-memory
-            using (cancellationToken.Register(TrySetCanceled, source))
+            if (cancellationToken.CanBeCanceled)
             {
-                return await source.Task.ConfigureAwait(false);
+                using (cancellationToken.Register(TrySetCanceled, source))
+                {
+                    return await source.Task.ConfigureAwait(false);
+                }
             }
+
+            return await source.Task.ConfigureAwait(false);
         }
         catch (Exception ex)
         {
@@ -914,6 +892,11 @@ public abstract class SessionBase : DisposeBase, ISocketClient, ITransport, ILog
                 span?.SetError(ex, null);
             throw;
         }
+        finally
+        {
+            // 异常时归还上下文(正常路径已在上面归还并置null)
+            if (ctx != null) ReturnContext(ctx);
+        }
     }
 
     private void TrySetCanceled(Object? state)