借助队列事件总线,统一会话管理,在总线上消费到事件后,通过会话集合驱动业务行为。如果是本机,直接使用内存事件总线;如果是Redis,将使用专用的消息队列。智能大石头 authored at 2025-02-21 18:00:17
diff --git a/NewLife.Remoting/NewLife.Remoting.csproj b/NewLife.Remoting/NewLife.Remoting.csproj
index aef498a..d6f1b6e 100644
--- a/NewLife.Remoting/NewLife.Remoting.csproj
+++ b/NewLife.Remoting/NewLife.Remoting.csproj
@@ -54,7 +54,7 @@
</ItemGroup>
<ItemGroup>
- <PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
+ <PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
</ItemGroup>
<ItemGroup>
diff --git a/NewLife.Remoting/Services/DeviceSession.cs b/NewLife.Remoting/Services/DeviceSession.cs
index 0e6cd95..1c39ead 100644
--- a/NewLife.Remoting/Services/DeviceSession.cs
+++ b/NewLife.Remoting/Services/DeviceSession.cs
@@ -1,28 +1,36 @@
using NewLife.Log;
using NewLife.Messaging;
+using NewLife.Remoting.Models;
#if !NET45
using TaskEx=System.Threading.Tasks.Task;
#endif
namespace NewLife.Remoting.Services;
-public class DeviceSession : DisposeBase, IDeviceSession, IEventHandler<String>
+/// <summary>设备会话</summary>
+public class DeviceSession : DisposeBase, IDeviceSession
{
#region 属性
- public String Code { get; set; }
+ /// <summary>编码</summary>
+ public String Code { get; set; } = null!;
- public ITracer Tracer { get; set; }
+ /// <summary>最后一次通信时间,主要表示会话活跃时间,包括收发</summary>
+ public DateTime LastTime { get; }
+
+ /// <summary>链路追踪</summary>
+ public ITracer? Tracer { get; set; }
#endregion
+ /// <summary>开始数据交互</summary>
public void Start(CancellationTokenSource source)
{
-
}
- public Task HandleAsync(String @event, IEventContext<String> context)
+ /// <summary>处理事件</summary>
+ public virtual Task HandleAsync(CommandModel command)
{
- if (context is not DeviceEventContext ctx || ctx.Code.IsNullOrEmpty() || ctx.Code != Code)
- return TaskEx.CompletedTask;
+ //if (context is not DeviceEventContext ctx || ctx.Code.IsNullOrEmpty() || ctx.Code != Code)
+ // return TaskEx.CompletedTask;
return TaskEx.CompletedTask;
}
diff --git a/NewLife.Remoting/Services/SessionManager.cs b/NewLife.Remoting/Services/SessionManager.cs
index 4b01841..9d71d1c 100644
--- a/NewLife.Remoting/Services/SessionManager.cs
+++ b/NewLife.Remoting/Services/SessionManager.cs
@@ -1,9 +1,12 @@
-using NewLife.Caching;
+using System.Collections.Concurrent;
+using NewLife.Caching;
using NewLife.Log;
using NewLife.Messaging;
using NewLife.Model;
+using NewLife.Net;
using NewLife.Remoting.Models;
using NewLife.Serialization;
+using NewLife.Threading;
#if !NET45
using TaskEx=System.Threading.Tasks.Task;
#endif
@@ -13,15 +16,23 @@ namespace NewLife.Remoting.Services;
/// <summary>命令会话</summary>
public interface IDeviceSession : IDisposable
{
+ /// <summary>最后一次通信时间,主要表示会话活跃时间,包括收发</summary>
+ DateTime LastTime { get; }
+
+ /// <summary>开始数据交互</summary>
+ /// <param name="source"></param>
void Start(CancellationTokenSource source);
-}
-class DeviceEventContext(IEventBus<String> bus, String code) : EventContext<String>(bus)
-{
- /// <summary>设备编码</summary>
- public String Code { get; set; } = code;
+ /// <summary>处理事件</summary>
+ Task HandleAsync(CommandModel command);
}
+//class DeviceEventContext(IEventBus<String> bus, String code) : EventContext<String>(bus)
+//{
+// /// <summary>设备编码</summary>
+// public String Code { get; set; } = code;
+//}
+
/// <summary>会话管理器</summary>
public class SessionManager : DisposeBase
{
@@ -29,14 +40,21 @@ public class SessionManager : DisposeBase
public String Topic { get; set; } = "Commands";
/// <summary>事件总线</summary>
- public EventBus<String> Bus { get; set; } = new EventBus<String>();
+ public IEventBus<String> Bus { get; set; } = null!;
+
+ /// <summary>清理周期。单位毫秒,默认10秒。</summary>
+ public Int32 ClearPeriod { get; set; } = 10;
+
+ /// <summary>会话超时时间。默认30秒</summary>
+ public Int32 Timeout { get; set; } = 30;
+ /// <summary>清理会话计时器</summary>
+ private TimerX? _clearTimer;
+ private readonly ConcurrentDictionary<String, IDeviceSession> _dic = new();
+
+ private readonly ICache? _cache;
private readonly ITracer _tracer;
private readonly ILog _log;
- ICache? _cache;
- IProducerConsumer<String>? _queue;
- IProducerConsumer<String>? _queue2;
- CancellationTokenSource? _source;
/// <summary>实例化</summary>
public SessionManager(ITracer tracer, ILog log, IServiceProvider serviceProvider)
@@ -54,7 +72,29 @@ public class SessionManager : DisposeBase
{
base.Dispose(disposing);
- _source?.TryDispose();
+ CloseAll(disposing ? "Dispose" : "GC");
+ }
+
+ private void Init()
+ {
+ if (Bus != null) return;
+ lock (this)
+ {
+ if (Bus != null) return;
+
+ // 事件总线
+ if (_cache is not MemoryCache && _cache is Cache cache)
+ Bus = cache.GetEventBus<String>(Topic);
+ else
+ Bus = new EventBus<String>();
+
+ // 订阅总线事件到OnMessage
+ Bus.Subscribe(OnMessage);
+
+ //_source = new CancellationTokenSource();
+
+ //_ = Task.Run(() => ConsumeMessage(_queue2, _source));
+ }
}
/// <summary>创建会话</summary>
@@ -62,25 +102,24 @@ public class SessionManager : DisposeBase
/// <returns></returns>
public IDeviceSession Create(String code)
{
+ Init();
+
var session = new DeviceSession
{
Code = code
};
- Bus.Subscribe(session, code);
- session.OnDisposed += (s, e) => Bus.Unsubscribe(code);
+ _dic.AddOrUpdate(code, session, (k, s) => s);
- // 如果有缓存提供者,则使用缓存提供者的队列,否则直接进入事件总线
- if (_queue2 == null)
+ session.OnDisposed += (s, e) =>
{
- _queue2 ??= _cache?.GetQueue<String>(Topic);
- if (_queue2 != null)
- {
- _source = new CancellationTokenSource();
+ _dic.Remove((s as DeviceSession)?.Code + "");
+ //Bus.Unsubscribe(code);
+ };
- _ = Task.Run(() => ConsumeMessage(_queue2, _source));
- }
- }
+ var p = ClearPeriod * 1000;
+ if (p > 0)
+ _clearTimer ??= new TimerX(RemoveNotAlive, null, p, p) { Async = true, };
return session;
}
@@ -91,81 +130,115 @@ public class SessionManager : DisposeBase
/// <returns></returns>
public Task PublishAsync(String code, String message)
{
- // 如果有缓存提供者,则使用缓存提供者的队列,否则直接进入事件总线
- _queue ??= _cache?.GetQueue<String>(Topic);
+ //// 如果有缓存提供者,则使用缓存提供者的队列,否则直接进入事件总线
+ //_queue ??= _cache?.GetQueue<String>(Topic);
+
+ //if (_queue != null)
+ //{
+ // _queue.Add($"{code}#{message}");
+
+ // return TaskEx.CompletedTask;
+ //}
+
+ message = $"{code}#{message}";
+
+ return Bus.PublishAsync(message);
+ }
+
+ private async Task OnMessage(String message)
+ {
+ using var span = _tracer?.NewSpan($"mq:Command", message);
- if (_queue != null)
+ var code = "";
+ var p = message.IndexOf('#');
+ if (p > 0 && p < 32)
{
- _queue.Add($"{code}#{message}");
+ code = message[..p];
+ message = message[(p + 1)..];
+ }
+
+ // 解码
+ var dic = JsonParser.Decode(message)!;
+ var msg = JsonHelper.Convert<CommandModel>(dic);
+ span?.Detach(dic);
- return TaskEx.CompletedTask;
+ // 修正时间
+ if (msg != null)
+ {
+ if (msg.StartTime.Year < 2000) msg.StartTime = DateTime.MinValue;
+ if (msg.Expire.Year < 2000) msg.Expire = DateTime.MinValue;
}
- return Bus.PublishAsync(message, new DeviceEventContext(Bus, code));
+ // 交由设备会话处理
+ if (msg != null && (msg.Expire.Year <= 2000 || msg.Expire >= Runtime.UtcNow))
+ {
+ var session = Get(code);
+ if (session != null) await session.HandleAsync(msg).ConfigureAwait(false);
+ }
}
- /// <summary>从队列中消费消息,经事件总线送给设备会话</summary>
- /// <param name="queue"></param>
- /// <param name="source"></param>
+ /// <summary>获取会话,加锁</summary>
+ /// <param name="key"></param>
/// <returns></returns>
- private async Task ConsumeMessage(IProducerConsumer<String> queue, CancellationTokenSource source)
+ public IDeviceSession? Get(String key)
{
- DefaultSpan.Current = null;
- var cancellationToken = source.Token;
- try
+ if (!_dic.TryGetValue(key, out var session)) return null;
+
+ return session;
+ }
+
+ /// <summary>关闭所有</summary>
+ public void CloseAll(String reason)
+ {
+ if (!_dic.Any()) return;
+
+ foreach (var item in _dic.ToValueArray())
{
- while (!cancellationToken.IsCancellationRequested)
+ if (item is IDisposable2 ds && !ds.Disposed)
{
- ISpan? span = null;
- var mqMsg = await queue.TakeOneAsync(15, cancellationToken).ConfigureAwait(false);
- if (mqMsg != null)
- {
- // 埋点
- span = _tracer?.NewSpan($"mq:Command", mqMsg);
-
- var code = "";
- var p = mqMsg.IndexOf('#');
- if (p > 0 && p < 32)
- {
- code = mqMsg[..p];
- mqMsg = mqMsg[(p + 1)..];
- }
-
- // 解码
- var dic = JsonParser.Decode(mqMsg)!;
- var msg = JsonHelper.Convert<CommandModel>(dic);
- span?.Detach(dic);
-
- // 修正时间
- if (msg != null)
- {
- if (msg.StartTime.Year < 2000) msg.StartTime = DateTime.MinValue;
- if (msg.Expire.Year < 2000) msg.Expire = DateTime.MinValue;
- }
-
- // 发布到事件总线,交由设备会话处理
- if (msg != null && (msg.Expire.Year <= 2000 || msg.Expire >= DateTime.Now))
- {
- await Bus.PublishAsync(mqMsg, new DeviceEventContext(Bus, code)).ConfigureAwait(false);
- }
-
- span?.Dispose();
- }
- else
- {
- await Task.Delay(1_000, cancellationToken).ConfigureAwait(false);
- }
+ if (item is INetSession ss) ss.Close(reason);
+
+ item.TryDispose();
}
}
- catch (TaskCanceledException) { }
- catch (OperationCanceledException) { }
- catch (Exception ex)
+ }
+
+ /// <summary>移除不活动的会话</summary>
+ private void RemoveNotAlive(Object? state)
+ {
+ if (!_dic.Any()) return;
+
+ var timeout = Timeout;
+ var keys = new List<String>();
+ var values = new List<IDeviceSession>();
+
+ foreach (var elm in _dic)
{
- _log?.Error(ex.ToString());
+ var item = elm.Value;
+ // 判断是否已超过最大不活跃时间
+ if (item == null || item is IDisposable2 ds && ds.Disposed || timeout > 0 && IsNotAlive(item, timeout))
+ {
+ keys.Add(elm.Key);
+ values.Add(elm.Value);
+ }
+ }
+ // 从会话集合里删除这些键值,并行字典操作安全
+ foreach (var item in keys)
+ {
+ _dic.Remove(item);
}
- finally
+
+ // 已经离开了锁,慢慢释放各个会话
+ foreach (var item in values)
{
- source.Cancel();
+ if (item is ILogFeature lf)
+ lf.Log?.Info("超过{0}秒不活跃销毁 {1}", timeout, item);
+
+ if (item is INetSession ss) ss.Close(nameof(RemoveNotAlive));
+ //item.Dispose();
+ item.TryDispose();
}
}
+
+ private static Boolean IsNotAlive(IDeviceSession session, Int32 timeout) => session.LastTime > DateTime.MinValue && session.LastTime.AddSeconds(timeout) < DateTime.Now;
}
diff --git a/Samples/IoTZero/IoTZero.csproj b/Samples/IoTZero/IoTZero.csproj
index 5138ff2..baa578b 100644
--- a/Samples/IoTZero/IoTZero.csproj
+++ b/Samples/IoTZero/IoTZero.csproj
@@ -19,11 +19,11 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="NewLife.Cube.Core" Version="6.3.2025.115-beta0618" />
- <PackageReference Include="NewLife.IoT" Version="2.4.2025.109-beta1741" />
+ <PackageReference Include="NewLife.Cube.Core" Version="6.4.2025.203" />
+ <PackageReference Include="NewLife.IoT" Version="2.4.2025.203" />
<PackageReference Include="NewLife.MQTT" Version="2.0.2025.109-beta1738" />
- <PackageReference Include="NewLife.Redis" Version="6.1.2025.114-beta0537" />
- <PackageReference Include="NewLife.Stardust.Extensions" Version="3.3.2025.119-beta1552" />
+ <PackageReference Include="NewLife.Redis" Version="6.1.2025.209" />
+ <PackageReference Include="NewLife.Stardust.Extensions" Version="3.3.2025.202" />
<PackageReference Include="NewLife.XCode" Version="11.18.2025.201" />
</ItemGroup>
diff --git a/Samples/Zero.Desktop/Zero.Desktop.csproj b/Samples/Zero.Desktop/Zero.Desktop.csproj
index 1f7573e..71a10d8 100644
--- a/Samples/Zero.Desktop/Zero.Desktop.csproj
+++ b/Samples/Zero.Desktop/Zero.Desktop.csproj
@@ -26,7 +26,7 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="NewLife.Stardust" Version="3.3.2025.119-beta1552" />
+ <PackageReference Include="NewLife.Stardust" Version="3.3.2025.202" />
<PackageReference Include="System.Speech" Version="9.0.0" />
</ItemGroup>
diff --git a/Samples/Zero.RpcServer/Zero.RpcServer.csproj b/Samples/Zero.RpcServer/Zero.RpcServer.csproj
index 64bb2e1..5eca937 100644
--- a/Samples/Zero.RpcServer/Zero.RpcServer.csproj
+++ b/Samples/Zero.RpcServer/Zero.RpcServer.csproj
@@ -20,8 +20,8 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="NewLife.Redis" Version="6.1.2025.114-beta0537" />
- <PackageReference Include="NewLife.Stardust" Version="3.3.2025.119-beta1552" />
+ <PackageReference Include="NewLife.Redis" Version="6.1.2025.209" />
+ <PackageReference Include="NewLife.Stardust" Version="3.3.2025.202" />
<PackageReference Include="NewLife.XCode" Version="11.18.2025.201" />
</ItemGroup>
diff --git a/Samples/ZeroServer/ZeroServer.csproj b/Samples/ZeroServer/ZeroServer.csproj
index 7d78d1c..6ae770b 100644
--- a/Samples/ZeroServer/ZeroServer.csproj
+++ b/Samples/ZeroServer/ZeroServer.csproj
@@ -19,9 +19,9 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="NewLife.Cube.Core" Version="6.3.2025.115-beta0618" />
- <PackageReference Include="NewLife.Redis" Version="6.1.2025.114-beta0537" />
- <PackageReference Include="NewLife.Stardust.Extensions" Version="3.3.2025.119-beta1552" />
+ <PackageReference Include="NewLife.Cube.Core" Version="6.4.2025.203" />
+ <PackageReference Include="NewLife.Redis" Version="6.1.2025.209" />
+ <PackageReference Include="NewLife.Stardust.Extensions" Version="3.3.2025.202" />
<PackageReference Include="NewLife.XCode" Version="11.18.2025.201" />
</ItemGroup>
diff --git a/Test/Test.csproj b/Test/Test.csproj
index b07633c..ca331fb 100644
--- a/Test/Test.csproj
+++ b/Test/Test.csproj
@@ -10,7 +10,7 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
+ <PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
</ItemGroup>
<ItemGroup>
diff --git a/XUnitTest/XUnitTest.csproj b/XUnitTest/XUnitTest.csproj
index 00cb597..7d9de76 100644
--- a/XUnitTest/XUnitTest.csproj
+++ b/XUnitTest/XUnitTest.csproj
@@ -9,13 +9,13 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
+ <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.2" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
<PackageReference Include="Moq" Version="4.20.72" />
- <PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
+ <PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
<PackageReference Include="NewLife.UnitTest" Version="1.0.2025.101" />
<PackageReference Include="xunit" Version="2.9.3" />
- <PackageReference Include="xunit.runner.visualstudio" Version="3.0.1">
+ <PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>