优化消息队列并发性能,完善编码器注释 - DefaultMatchQueue 优化游标并发访问,环形扫描减少争用,提升匹配效率 - 编码器方法补充参数和返回值注释,提升文档质量 - Read/Match 逻辑更清晰,避免多线程数据区冲突 - SessionBase 发送消息资源管理更健壮,支持取消回调 - 统一变量命名,结构调整,提升代码一致性和可维护性智能大石头 authored at 2026-01-13 11:16:04
diff --git a/NewLife.Core/Net/Handlers/IMatchQueue.cs b/NewLife.Core/Net/Handlers/IMatchQueue.cs
index 6afdd08..b8ef78b 100644
--- a/NewLife.Core/Net/Handlers/IMatchQueue.cs
+++ b/NewLife.Core/Net/Handlers/IMatchQueue.cs
@@ -52,7 +52,7 @@ public class DefaultMatchQueue : IMatchQueue
private Int32 _cursor;
/// <summary>按指定大小来初始化队列</summary>
- /// <param name="size"></param>
+ /// <param name="size">队列大小</param>
public DefaultMatchQueue(Int32 size = 256) => Items = new ItemWrap[size];
/// <summary>加入请求队列</summary>
@@ -77,68 +77,43 @@ public class DefaultMatchQueue : IMatchQueue
Span = ext?["Span"] as ISpan,
};
- // 若计数已接近容量,先做一次快速清理以回收过期项,避免“看似满”的误判
var items = Items;
- if (Volatile.Read(ref _Count) >= items.Length)
- {
- Check(null);
- }
+ var len = items.Length;
+
+ // 若计数已接近容量,先做一次快速清理以回收过期项,避免"看似满"的误判
+ if (Volatile.Read(ref _Count) >= len) Check(null);
// 加入队列(从游标位置开始扫描,避免总是从0导致争用)
- var len = items.Length;
- var start = _cursor;
+ var start = Volatile.Read(ref _cursor);
for (var offset = 0; offset < len; ++offset)
{
- var i = start + offset;
- if (i >= len) i -= len;
-
+ var i = (start + offset) % len;
if (Interlocked.CompareExchange(ref items[i].Value, qi, null) == null)
{
Interlocked.Increment(ref _Count);
- // 推进游标
- if (++i >= len) i = 0;
- _cursor = i;
-
- if (_Timer == null)
- {
- lock (this)
- {
- _Timer ??= new TimerX(Check, null, 1000, 1000, "Match") { Async = true };
- }
- }
+ // 推进游标到下一个位置
+ Volatile.Write(ref _cursor, (i + 1) % len);
+ StartTimer();
return source.Task;
}
}
- // 第一次扫描失败后,再进行一次同步清理并重试,最后才认为真的满
+ // 第一次扫描失败后,再进行一次同步清理并重试
Check(null);
// 重试一次
- items = Items; // 允许未来可能的扩容,这里重新读取引用
- len = items.Length;
- start = _cursor;
+ start = Volatile.Read(ref _cursor);
for (var offset = 0; offset < len; ++offset)
{
- var i = start + offset;
- if (i >= len) i -= len;
-
+ var i = (start + offset) % len;
if (Interlocked.CompareExchange(ref items[i].Value, qi, null) == null)
{
Interlocked.Increment(ref _Count);
+ Volatile.Write(ref _cursor, (i + 1) % len);
- if (++i >= len) i = 0;
- _cursor = i;
-
- if (_Timer == null)
- {
- lock (this)
- {
- _Timer ??= new TimerX(Check, null, 1000, 1000, "Match") { Async = true };
- }
- }
-
+ StartTimer();
return source.Task;
}
}
@@ -147,6 +122,15 @@ public class DefaultMatchQueue : IMatchQueue
throw new XException("The matching queue is full [{0}]", items.Length);
}
+ private void StartTimer()
+ {
+ if (_Timer != null) return;
+ lock (this)
+ {
+ _Timer ??= new TimerX(Check, null, 1000, 1000, "Match") { Async = true };
+ }
+ }
+
/// <summary>检查请求队列是否有匹配该响应的请求</summary>
/// <param name="owner">拥有者</param>
/// <param name="response">响应消息</param>
@@ -157,21 +141,26 @@ public class DefaultMatchQueue : IMatchQueue
{
if (Volatile.Read(ref _Count) <= 0) return false;
- // 直接遍历,队列不会很长
- var qs = Items;
- for (var i = 0; i < qs.Length; i++)
+ // 从游标位置向前搜索,响应通常与最近的请求匹配
+ var items = Items;
+ var len = items.Length;
+ var start = Volatile.Read(ref _cursor);
+
+ // 先从游标往前搜索(最近添加的请求更可能匹配)
+ for (var offset = 1; offset <= len; ++offset)
{
- var qi = Volatile.Read(ref qs[i].Value);
+ var i = (start - offset + len) % len;
+ var qi = Volatile.Read(ref items[i].Value);
if (qi == null) continue;
if (qi.Owner == owner && callback(qi.Request, response))
{
- // CAS 置空,确保仅一次成功清理,避免并发重复清理造成 _Count 错乱
- if (Interlocked.CompareExchange(ref qs[i].Value, null, qi) != qi) continue;
+ // CAS 置空,确保仅一次成功清理
+ if (Interlocked.CompareExchange(ref items[i].Value, null, qi) != qi) continue;
Interlocked.Decrement(ref _Count);
- // 异步设置完成结果,否则可能会在当前线程恢复上层await,导致堵塞当前任务
+ // 设置完成结果,TaskCreationOptions.RunContinuationsAsynchronously确保不会阻塞当前线程
var src = qi.Source;
if (src != null && !src.Task.IsCompleted)
{
@@ -188,33 +177,34 @@ public class DefaultMatchQueue : IMatchQueue
}
if (SocketSetting.Current.Debug)
- XTrace.WriteLine("MatchQueue.Check 失败 [{0}] result={1} Items={2}", response, result, _Count);
+ XTrace.WriteLine("MatchQueue.Match 失败 [{0}] result={1} Items={2}", response, result, _Count);
return false;
}
- /// <summary>定时检查发送队列,超时未收到响应则重发</summary>
- /// <param name="state"></param>
+ /// <summary>定时检查发送队列,超时未收到响应则取消</summary>
+ /// <param name="state">状态参数</param>
void Check(Object? state)
{
if (Volatile.Read(ref _Count) <= 0) return;
- // 直接遍历,队列不会很长
var now = Runtime.TickCount64;
- var qs = Items;
- for (var i = 0; i < qs.Length; i++)
+ var items = Items;
+
+ // 遍历清理过期项
+ for (var i = 0; i < items.Length; i++)
{
- var qi = Volatile.Read(ref qs[i].Value);
+ var qi = Volatile.Read(ref items[i].Value);
if (qi == null) continue;
// 过期取消
if (qi.EndTime <= now)
{
- if (Interlocked.CompareExchange(ref qs[i].Value, null, qi) != qi) continue;
+ if (Interlocked.CompareExchange(ref items[i].Value, null, qi) != qi) continue;
Interlocked.Decrement(ref _Count);
- // 异步取消任务,避免在当前线程执行上层await的延续任务
+ // 异步取消任务
var src = qi.Source;
if (src != null && !src.Task.IsCompleted)
{
@@ -233,15 +223,15 @@ public class DefaultMatchQueue : IMatchQueue
/// <summary>清空队列</summary>
public virtual void Clear()
{
- var qs = Items;
- for (var i = 0; i < qs.Length; ++i)
+ var items = Items;
+ for (var i = 0; i < items.Length; ++i)
{
- var qi = Interlocked.Exchange(ref qs[i].Value, null);
+ var qi = Interlocked.Exchange(ref items[i].Value, null);
if (qi == null) continue;
Interlocked.Decrement(ref _Count);
- // 异步取消任务,避免在当前线程执行上层await的延续任务
+ // 异步取消任务
var src = qi.Source;
if (src != null && !src.Task.IsCompleted)
{
diff --git a/NewLife.Core/Net/Handlers/MessageCodec.cs b/NewLife.Core/Net/Handlers/MessageCodec.cs
index a141fbe..9b1ee6f 100644
--- a/NewLife.Core/Net/Handlers/MessageCodec.cs
+++ b/NewLife.Core/Net/Handlers/MessageCodec.cs
@@ -38,8 +38,8 @@ public class MessageCodec<T> : Handler
public Boolean UserPacket { get; set; } = true;
/// <summary>打开链接</summary>
- /// <param name="context"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <returns>是否成功打开</returns>
public override Boolean Open(IHandlerContext context)
{
if (context.Owner is ISocketClient client) Timeout = client.Timeout;
@@ -52,9 +52,9 @@ public class MessageCodec<T> : Handler
/// 遇到消息T时,调用Encode编码并加入队列。
/// Encode返回空时,跳出调用链。
/// </remarks>
- /// <param name="context"></param>
- /// <param name="message"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="message">消息</param>
+ /// <returns>处理后的消息</returns>
public override Object? Write(IHandlerContext context, Object message)
{
// 谁申请,谁归还
@@ -88,9 +88,9 @@ public class MessageCodec<T> : Handler
}
/// <summary>编码消息,一般是编码为Packet后传给下一个处理器</summary>
- /// <param name="context"></param>
- /// <param name="msg"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="msg">消息</param>
+ /// <returns>编码后的数据包</returns>
protected virtual Object? Encode(IHandlerContext context, T msg)
{
if (msg is IMessage msg2) return msg2.ToPacket();
@@ -99,9 +99,8 @@ public class MessageCodec<T> : Handler
}
/// <summary>把请求加入队列,等待响应到来时建立请求响应匹配</summary>
- /// <param name="context"></param>
- /// <param name="msg"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="msg">消息</param>
protected virtual void AddToQueue(IHandlerContext context, T msg)
{
if (msg != null && context is IExtend ext && ext["TaskSource"] is TaskCompletionSource<Object> source)
@@ -113,9 +112,9 @@ public class MessageCodec<T> : Handler
}
/// <summary>连接关闭时,清空粘包编码器</summary>
- /// <param name="context"></param>
- /// <param name="reason"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="reason">关闭原因</param>
+ /// <returns>是否成功关闭</returns>
public override Boolean Close(IHandlerContext context, String reason)
{
Queue?.Clear();
@@ -128,9 +127,9 @@ public class MessageCodec<T> : Handler
/// Decode可以返回多个消息,每个消息调用一次下一级处理器。
/// Decode返回空时,跳出调用链。
/// </remarks>
- /// <param name="context"></param>
- /// <param name="message"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="message">消息</param>
+ /// <returns>处理后的消息</returns>
public override Object? Read(IHandlerContext context, Object message)
{
if (message is not IPacket pk) return base.Read(context, message);
@@ -139,40 +138,45 @@ public class MessageCodec<T> : Handler
var list = Decode(context, pk);
if (list == null) return null;
+ var queue = Queue;
+ var userPacket = UserPacket;
+
foreach (var msg in list)
{
if (msg == null) continue;
Object? rs = null;
- if (UserPacket && msg is IMessage msg2)
+ IMessage? rawMsg = null;
+
+ // 提取消息负载
+ if (userPacket && msg is IMessage msg2)
{
+ rawMsg = msg2;
if (context is IExtend ext) ext["_raw_message"] = msg2;
rs = msg2.Payload;
}
else
+ {
rs = msg;
+ }
- //var rs = message;
- if (msg is IMessage msg3)
+ // 匹配请求队列(仅响应消息)
+ if (rawMsg != null && rawMsg.Reply && queue != null)
{
- // 匹配请求队列
- if (msg3.Reply)
- {
- //!!! 处理结果的Packet需要拷贝一份,否则交给另一个线程使用会有冲突
- // Match里面TrySetResult时,必然唤醒原来阻塞的Task,如果不是当前io线程执行后续代码,必然导致两个线程共用了数据区,因此需要拷贝
- if (rs is IMessage msg4 && msg4.Payload != null && msg4.Payload == msg3.Payload)
- msg4.Payload = msg4.Payload.Clone();
-
- Queue?.Match(context.Owner, msg, rs ?? msg, IsMatch);
- }
+ // 处理结果的Packet需要拷贝一份,否则交给另一个线程使用会有冲突
+ // Match里面TrySetResult时,必然唤醒原来阻塞的Task,可能导致两个线程共用了数据区
+ if (rs is IMessage msg4 && msg4.Payload != null && msg4.Payload == rawMsg.Payload)
+ msg4.Payload = msg4.Payload.Clone();
+
+ queue.Match(context.Owner, msg, rs ?? msg, IsMatch);
}
- else if (rs != null)
+ else if (rs != null && queue != null && msg is not IMessage)
{
// 其它消息不考虑响应
- Queue?.Match(context.Owner, msg, rs, IsMatch);
+ queue.Match(context.Owner, msg, rs, IsMatch);
}
- // 匹配输入回调,让上层事件收到分包信息。
+ // 匹配输入回调,让上层事件收到分包信息
// 这里很可能处于网络IO线程,阻塞了下一个Tcp包的接收
base.Read(context, rs ?? msg);
}
@@ -181,8 +185,8 @@ public class MessageCodec<T> : Handler
}
/// <summary>从上下文中获取原始请求</summary>
- /// <param name="context"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <returns>原始消息</returns>
protected IMessage? GetRequest(IHandlerContext context)
{
if (context is IExtend ext) return ext["_raw_message"] as IMessage;
@@ -191,15 +195,15 @@ public class MessageCodec<T> : Handler
}
/// <summary>解码</summary>
- /// <param name="context"></param>
- /// <param name="pk"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="pk">数据包</param>
+ /// <returns>解码后的消息列表</returns>
protected virtual IList<T>? Decode(IHandlerContext context, IPacket pk) => null;
/// <summary>是否匹配响应</summary>
- /// <param name="request"></param>
- /// <param name="response"></param>
- /// <returns></returns>
+ /// <param name="request">请求消息</param>
+ /// <param name="response">响应消息</param>
+ /// <returns>是否匹配</returns>
protected virtual Boolean IsMatch(Object? request, Object? response) => true;
#region 粘包处理
diff --git a/NewLife.Core/Net/Handlers/StandardCodec.cs b/NewLife.Core/Net/Handlers/StandardCodec.cs
index 675e159..9aef773 100644
--- a/NewLife.Core/Net/Handlers/StandardCodec.cs
+++ b/NewLife.Core/Net/Handlers/StandardCodec.cs
@@ -12,15 +12,12 @@ namespace NewLife.Net.Handlers;
/// </remarks>
public class StandardCodec : MessageCodec<IMessage>
{
- ///// <summary>编码器。用于编码非消息对象</summary>
- //public IPacketEncoder? Encoder { get; set; }
-
private Int32 _gid;
/// <summary>写入数据</summary>
- /// <param name="context"></param>
- /// <param name="message"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="message">消息</param>
+ /// <returns>处理后的消息</returns>
public override Object? Write(IHandlerContext context, Object message)
{
DataKinds? kind = null;
@@ -42,13 +39,12 @@ public class StandardCodec : MessageCodec<IMessage>
if (message is IPacket pk)
{
- DefaultMessage? response = null;
+ // 优先复用请求消息创建响应
var request = GetRequest(context);
- if (request != null && !request.Reply)
- {
- response = request.CreateReply() as DefaultMessage;
- }
- response ??= new DefaultMessage();
+ var response = (request != null && !request.Reply)
+ ? request.CreateReply() as DefaultMessage ?? new DefaultMessage()
+ : new DefaultMessage();
+
response.Flag = (Byte)(kind ?? DataKinds.Packet);
response.Payload = pk;
message = response;
@@ -58,6 +54,7 @@ public class StandardCodec : MessageCodec<IMessage>
response.Flag = (Byte)dk;
}
+ // 为请求消息分配序列号
if (message is DefaultMessage msg && !msg.Reply && msg.Sequence == 0)
msg.Sequence = (Byte)Interlocked.Increment(ref _gid);
@@ -65,18 +62,18 @@ public class StandardCodec : MessageCodec<IMessage>
}
/// <summary>加入队列</summary>
- /// <param name="context"></param>
- /// <param name="msg"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="msg">消息</param>
protected override void AddToQueue(IHandlerContext context, IMessage msg)
{
+ // 只有请求消息才加入队列等待响应
if (!msg.Reply) base.AddToQueue(context, msg);
}
/// <summary>解码</summary>
- /// <param name="context"></param>
- /// <param name="pk"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="pk">数据包</param>
+ /// <returns>解码后的消息列表</returns>
protected override IList<IMessage>? Decode(IHandlerContext context, IPacket pk)
{
if (context.Owner is not IExtend ss) return null;
@@ -95,7 +92,7 @@ public class StandardCodec : MessageCodec<IMessage>
}
var pks = pc.Parse(pk);
- var list = new List<IMessage>();
+ var list = new List<IMessage>(pks.Count);
foreach (var item in pks)
{
var msg = new DefaultMessage();
@@ -106,20 +103,18 @@ public class StandardCodec : MessageCodec<IMessage>
}
/// <summary>是否匹配响应</summary>
- /// <param name="request"></param>
- /// <param name="response"></param>
- /// <returns></returns>
- protected override Boolean IsMatch(Object? request, Object? response)
- {
- return request is DefaultMessage req &&
- response is DefaultMessage res &&
- req.Sequence == res.Sequence;
- }
+ /// <param name="request">请求消息</param>
+ /// <param name="response">响应消息</param>
+ /// <returns>是否匹配</returns>
+ protected override Boolean IsMatch(Object? request, Object? response) =>
+ request is DefaultMessage req &&
+ response is DefaultMessage res &&
+ req.Sequence == res.Sequence;
/// <summary>连接关闭时,清空粘包编码器</summary>
- /// <param name="context"></param>
- /// <param name="reason"></param>
- /// <returns></returns>
+ /// <param name="context">处理器上下文</param>
+ /// <param name="reason">关闭原因</param>
+ /// <returns>是否成功关闭</returns>
public override Boolean Close(IHandlerContext context, String reason)
{
if (context.Owner is IExtend ss) ss["Codec"] = null;
diff --git a/NewLife.Core/Net/SessionBase.cs b/NewLife.Core/Net/SessionBase.cs
index 4e16be2..2d6c8ae 100644
--- a/NewLife.Core/Net/SessionBase.cs
+++ b/NewLife.Core/Net/SessionBase.cs
@@ -811,82 +811,51 @@ public abstract class SessionBase : DisposeBase, ISocketClient, ITransport, ILog
}
/// <summary>通过管道发送消息,不等待响应</summary>
- /// <param name="message"></param>
- /// <returns></returns>
+ /// <param name="message">消息</param>
+ /// <returns>发送字节数</returns>
public virtual Int32 SendMessage(Object message)
{
if (Pipeline == null) throw new ArgumentNullException(nameof(Pipeline), "No pipes are set");
using var span = Tracer?.NewSpan($"net:{Name}:SendMessage", message);
+ var ctx = CreateContext(this);
try
{
if (span != null && message is ITraceMessage tm && tm.TraceId.IsNullOrEmpty()) tm.TraceId = span.ToString();
- var ctx = CreateContext(this);
- var rs = (Int32)(Pipeline.Write(ctx, message) ?? 0);
-
- // 写入完成后归还上下文
- ReturnContext(ctx);
-
- return rs;
+ return (Int32)(Pipeline.Write(ctx, message) ?? 0);
}
catch (Exception ex)
{
span?.SetError(ex, message);
throw;
}
- }
-
- /// <summary>通过管道发送消息并等待响应</summary>
- /// <param name="message"></param>
- /// <returns></returns>
- public virtual async Task<Object> SendMessageAsync(Object message)
- {
- if (Pipeline == null) throw new ArgumentNullException(nameof(Pipeline), "No pipes are set");
-
- using var span = Tracer?.NewSpan($"net:{Name}:SendMessageAsync", message);
- try
- {
- if (span != null && message is ITraceMessage tm && tm.TraceId.IsNullOrEmpty()) tm.TraceId = span.ToString();
-
- var ctx = CreateContext(this);
-#if NET45
- var source = new TaskCompletionSource<Object>();
-#else
- var source = new TaskCompletionSource<Object>(TaskCreationOptions.RunContinuationsAsynchronously);
-#endif
- ctx["TaskSource"] = source;
- ctx["Span"] = span;
-
- var rs = (Int32)(Pipeline.Write(ctx, message) ?? 0);
- if (rs < 0) return TaskEx.CompletedTask;
-
- return await source.Task.ConfigureAwait(false);
- }
- catch (Exception ex)
+ finally
{
- if (ex is TaskCanceledException)
- span?.AppendTag(ex.Message);
- else
- span?.SetError(ex, message);
- throw;
+ // 写入完成后归还上下文
+ ReturnContext(ctx);
}
}
/// <summary>通过管道发送消息并等待响应</summary>
/// <param name="message">消息</param>
+ /// <returns>响应消息</returns>
+ public virtual Task<Object> SendMessageAsync(Object message) => SendMessageAsync(message, default);
+
+ /// <summary>通过管道发送消息并等待响应</summary>
+ /// <param name="message">消息</param>
/// <param name="cancellationToken">取消通知</param>
- /// <returns></returns>
+ /// <returns>响应消息</returns>
public virtual async Task<Object> SendMessageAsync(Object message, CancellationToken cancellationToken)
{
if (Pipeline == null) throw new ArgumentNullException(nameof(Pipeline), "No pipes are set");
using var span = Tracer?.NewSpan($"net:{Name}:SendMessageAsync", message);
+ var ctx = CreateContext(this);
try
{
if (span != null && message is ITraceMessage tm && tm.TraceId.IsNullOrEmpty()) tm.TraceId = span.ToString();
- var ctx = CreateContext(this);
#if NET45
var source = new TaskCompletionSource<Object>();
#else
@@ -898,13 +867,22 @@ public abstract class SessionBase : DisposeBase, ISocketClient, ITransport, ILog
var rs = (Int32)(Pipeline.Write(ctx, message) ?? 0);
if (rs < 0) return TaskEx.CompletedTask;
+ // 写入完成后立即归还上下文,source已加入匹配队列,不再需要上下文
+ ReturnContext(ctx);
+ ctx = null;
+
// 注册取消时的处理,如果没有收到响应,取消发送等待
// Register返回值需要Dispose,否则会导致内存泄漏
// https://stackoverflow.com/questions/14627226/why-is-my-async-await-with-cancellationtokensource-leaking-memory
- using (cancellationToken.Register(TrySetCanceled, source))
+ if (cancellationToken.CanBeCanceled)
{
- return await source.Task.ConfigureAwait(false);
+ using (cancellationToken.Register(TrySetCanceled, source))
+ {
+ return await source.Task.ConfigureAwait(false);
+ }
}
+
+ return await source.Task.ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -914,6 +892,11 @@ public abstract class SessionBase : DisposeBase, ISocketClient, ITransport, ILog
span?.SetError(ex, null);
throw;
}
+ finally
+ {
+ // 异常时归还上下文(正常路径已在上面归还并置null)
+ if (ctx != null) ReturnContext(ctx);
+ }
}
private void TrySetCanceled(Object? state)