NewLife/NewLife.Remoting

借助队列事件总线,统一会话管理,在总线上消费到事件后,通过会话集合驱动业务行为。如果是本机,直接使用内存事件总线;如果是Redis,将使用专用的消息队列。
智能大石头 authored at 2025-02-21 18:00:17
e57057f
Tree
1 Parent(s) c4939d0
Summary: 9 changed files with 186 additions and 105 deletions.
Modified +1 -1
Modified +15 -7
Modified +155 -82
Modified +4 -4
Modified +1 -1
Modified +2 -2
Modified +3 -3
Modified +1 -1
Modified +4 -4
Modified +1 -1
diff --git a/NewLife.Remoting/NewLife.Remoting.csproj b/NewLife.Remoting/NewLife.Remoting.csproj
index aef498a..d6f1b6e 100644
--- a/NewLife.Remoting/NewLife.Remoting.csproj
+++ b/NewLife.Remoting/NewLife.Remoting.csproj
@@ -54,7 +54,7 @@
   </ItemGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
+    <PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
   </ItemGroup>
 
   <ItemGroup>
Modified +15 -7
diff --git a/NewLife.Remoting/Services/DeviceSession.cs b/NewLife.Remoting/Services/DeviceSession.cs
index 0e6cd95..1c39ead 100644
--- a/NewLife.Remoting/Services/DeviceSession.cs
+++ b/NewLife.Remoting/Services/DeviceSession.cs
@@ -1,28 +1,36 @@
 using NewLife.Log;
 using NewLife.Messaging;
+using NewLife.Remoting.Models;
 #if !NET45
 using TaskEx=System.Threading.Tasks.Task;
 #endif
 
 namespace NewLife.Remoting.Services;
 
-public class DeviceSession : DisposeBase, IDeviceSession, IEventHandler<String>
+/// <summary>设备会话</summary>
+public class DeviceSession : DisposeBase, IDeviceSession
 {
     #region 属性
-    public String Code { get; set; }
+    /// <summary>编码</summary>
+    public String Code { get; set; } = null!;
 
-    public ITracer Tracer { get; set; }
+    /// <summary>最后一次通信时间,主要表示会话活跃时间,包括收发</summary>
+    public DateTime LastTime { get; }
+
+    /// <summary>链路追踪</summary>
+    public ITracer? Tracer { get; set; }
     #endregion
 
+    /// <summary>开始数据交互</summary>
     public void Start(CancellationTokenSource source)
     {
-
     }
 
-    public Task HandleAsync(String @event, IEventContext<String> context)
+    /// <summary>处理事件</summary>
+    public virtual Task HandleAsync(CommandModel command)
     {
-        if (context is not DeviceEventContext ctx || ctx.Code.IsNullOrEmpty() || ctx.Code != Code)
-            return TaskEx.CompletedTask;
+        //if (context is not DeviceEventContext ctx || ctx.Code.IsNullOrEmpty() || ctx.Code != Code)
+        //    return TaskEx.CompletedTask;
 
         return TaskEx.CompletedTask;
     }
Modified +155 -82
diff --git a/NewLife.Remoting/Services/SessionManager.cs b/NewLife.Remoting/Services/SessionManager.cs
index 4b01841..9d71d1c 100644
--- a/NewLife.Remoting/Services/SessionManager.cs
+++ b/NewLife.Remoting/Services/SessionManager.cs
@@ -1,9 +1,12 @@
-using NewLife.Caching;
+using System.Collections.Concurrent;
+using NewLife.Caching;
 using NewLife.Log;
 using NewLife.Messaging;
 using NewLife.Model;
+using NewLife.Net;
 using NewLife.Remoting.Models;
 using NewLife.Serialization;
+using NewLife.Threading;
 #if !NET45
 using TaskEx=System.Threading.Tasks.Task;
 #endif
@@ -13,15 +16,23 @@ namespace NewLife.Remoting.Services;
 /// <summary>命令会话</summary>
 public interface IDeviceSession : IDisposable
 {
+    /// <summary>最后一次通信时间,主要表示会话活跃时间,包括收发</summary>
+    DateTime LastTime { get; }
+
+    /// <summary>开始数据交互</summary>
+    /// <param name="source"></param>
     void Start(CancellationTokenSource source);
-}
 
-class DeviceEventContext(IEventBus<String> bus, String code) : EventContext<String>(bus)
-{
-    /// <summary>设备编码</summary>
-    public String Code { get; set; } = code;
+    /// <summary>处理事件</summary>
+    Task HandleAsync(CommandModel command);
 }
 
+//class DeviceEventContext(IEventBus<String> bus, String code) : EventContext<String>(bus)
+//{
+//    /// <summary>设备编码</summary>
+//    public String Code { get; set; } = code;
+//}
+
 /// <summary>会话管理器</summary>
 public class SessionManager : DisposeBase
 {
@@ -29,14 +40,21 @@ public class SessionManager : DisposeBase
     public String Topic { get; set; } = "Commands";
 
     /// <summary>事件总线</summary>
-    public EventBus<String> Bus { get; set; } = new EventBus<String>();
+    public IEventBus<String> Bus { get; set; } = null!;
+
+    /// <summary>清理周期。单位毫秒,默认10秒。</summary>
+    public Int32 ClearPeriod { get; set; } = 10;
+
+    /// <summary>会话超时时间。默认30秒</summary>
+    public Int32 Timeout { get; set; } = 30;
 
+    /// <summary>清理会话计时器</summary>
+    private TimerX? _clearTimer;
+    private readonly ConcurrentDictionary<String, IDeviceSession> _dic = new();
+
+    private readonly ICache? _cache;
     private readonly ITracer _tracer;
     private readonly ILog _log;
-    ICache? _cache;
-    IProducerConsumer<String>? _queue;
-    IProducerConsumer<String>? _queue2;
-    CancellationTokenSource? _source;
 
     /// <summary>实例化</summary>
     public SessionManager(ITracer tracer, ILog log, IServiceProvider serviceProvider)
@@ -54,7 +72,29 @@ public class SessionManager : DisposeBase
     {
         base.Dispose(disposing);
 
-        _source?.TryDispose();
+        CloseAll(disposing ? "Dispose" : "GC");
+    }
+
+    private void Init()
+    {
+        if (Bus != null) return;
+        lock (this)
+        {
+            if (Bus != null) return;
+
+            // 事件总线
+            if (_cache is not MemoryCache && _cache is Cache cache)
+                Bus = cache.GetEventBus<String>(Topic);
+            else
+                Bus = new EventBus<String>();
+
+            // 订阅总线事件到OnMessage
+            Bus.Subscribe(OnMessage);
+
+            //_source = new CancellationTokenSource();
+
+            //_ = Task.Run(() => ConsumeMessage(_queue2, _source));
+        }
     }
 
     /// <summary>创建会话</summary>
@@ -62,25 +102,24 @@ public class SessionManager : DisposeBase
     /// <returns></returns>
     public IDeviceSession Create(String code)
     {
+        Init();
+
         var session = new DeviceSession
         {
             Code = code
         };
 
-        Bus.Subscribe(session, code);
-        session.OnDisposed += (s, e) => Bus.Unsubscribe(code);
+        _dic.AddOrUpdate(code, session, (k, s) => s);
 
-        // 如果有缓存提供者,则使用缓存提供者的队列,否则直接进入事件总线
-        if (_queue2 == null)
+        session.OnDisposed += (s, e) =>
         {
-            _queue2 ??= _cache?.GetQueue<String>(Topic);
-            if (_queue2 != null)
-            {
-                _source = new CancellationTokenSource();
+            _dic.Remove((s as DeviceSession)?.Code + "");
+            //Bus.Unsubscribe(code);
+        };
 
-                _ = Task.Run(() => ConsumeMessage(_queue2, _source));
-            }
-        }
+        var p = ClearPeriod * 1000;
+        if (p > 0)
+            _clearTimer ??= new TimerX(RemoveNotAlive, null, p, p) { Async = true, };
 
         return session;
     }
@@ -91,81 +130,115 @@ public class SessionManager : DisposeBase
     /// <returns></returns>
     public Task PublishAsync(String code, String message)
     {
-        // 如果有缓存提供者,则使用缓存提供者的队列,否则直接进入事件总线
-        _queue ??= _cache?.GetQueue<String>(Topic);
+        //// 如果有缓存提供者,则使用缓存提供者的队列,否则直接进入事件总线
+        //_queue ??= _cache?.GetQueue<String>(Topic);
+
+        //if (_queue != null)
+        //{
+        //    _queue.Add($"{code}#{message}");
+
+        //    return TaskEx.CompletedTask;
+        //}
+
+        message = $"{code}#{message}";
+
+        return Bus.PublishAsync(message);
+    }
+
+    private async Task OnMessage(String message)
+    {
+        using var span = _tracer?.NewSpan($"mq:Command", message);
 
-        if (_queue != null)
+        var code = "";
+        var p = message.IndexOf('#');
+        if (p > 0 && p < 32)
         {
-            _queue.Add($"{code}#{message}");
+            code = message[..p];
+            message = message[(p + 1)..];
+        }
+
+        // 解码
+        var dic = JsonParser.Decode(message)!;
+        var msg = JsonHelper.Convert<CommandModel>(dic);
+        span?.Detach(dic);
 
-            return TaskEx.CompletedTask;
+        // 修正时间
+        if (msg != null)
+        {
+            if (msg.StartTime.Year < 2000) msg.StartTime = DateTime.MinValue;
+            if (msg.Expire.Year < 2000) msg.Expire = DateTime.MinValue;
         }
 
-        return Bus.PublishAsync(message, new DeviceEventContext(Bus, code));
+        // 交由设备会话处理
+        if (msg != null && (msg.Expire.Year <= 2000 || msg.Expire >= Runtime.UtcNow))
+        {
+            var session = Get(code);
+            if (session != null) await session.HandleAsync(msg).ConfigureAwait(false);
+        }
     }
 
-    /// <summary>从队列中消费消息,经事件总线送给设备会话</summary>
-    /// <param name="queue"></param>
-    /// <param name="source"></param>
+    /// <summary>获取会话,加锁</summary>
+    /// <param name="key"></param>
     /// <returns></returns>
-    private async Task ConsumeMessage(IProducerConsumer<String> queue, CancellationTokenSource source)
+    public IDeviceSession? Get(String key)
     {
-        DefaultSpan.Current = null;
-        var cancellationToken = source.Token;
-        try
+        if (!_dic.TryGetValue(key, out var session)) return null;
+
+        return session;
+    }
+
+    /// <summary>关闭所有</summary>
+    public void CloseAll(String reason)
+    {
+        if (!_dic.Any()) return;
+
+        foreach (var item in _dic.ToValueArray())
         {
-            while (!cancellationToken.IsCancellationRequested)
+            if (item is IDisposable2 ds && !ds.Disposed)
             {
-                ISpan? span = null;
-                var mqMsg = await queue.TakeOneAsync(15, cancellationToken).ConfigureAwait(false);
-                if (mqMsg != null)
-                {
-                    // 埋点
-                    span = _tracer?.NewSpan($"mq:Command", mqMsg);
-
-                    var code = "";
-                    var p = mqMsg.IndexOf('#');
-                    if (p > 0 && p < 32)
-                    {
-                        code = mqMsg[..p];
-                        mqMsg = mqMsg[(p + 1)..];
-                    }
-
-                    // 解码
-                    var dic = JsonParser.Decode(mqMsg)!;
-                    var msg = JsonHelper.Convert<CommandModel>(dic);
-                    span?.Detach(dic);
-
-                    // 修正时间
-                    if (msg != null)
-                    {
-                        if (msg.StartTime.Year < 2000) msg.StartTime = DateTime.MinValue;
-                        if (msg.Expire.Year < 2000) msg.Expire = DateTime.MinValue;
-                    }
-
-                    // 发布到事件总线,交由设备会话处理
-                    if (msg != null && (msg.Expire.Year <= 2000 || msg.Expire >= DateTime.Now))
-                    {
-                        await Bus.PublishAsync(mqMsg, new DeviceEventContext(Bus, code)).ConfigureAwait(false);
-                    }
-
-                    span?.Dispose();
-                }
-                else
-                {
-                    await Task.Delay(1_000, cancellationToken).ConfigureAwait(false);
-                }
+                if (item is INetSession ss) ss.Close(reason);
+
+                item.TryDispose();
             }
         }
-        catch (TaskCanceledException) { }
-        catch (OperationCanceledException) { }
-        catch (Exception ex)
+    }
+
+    /// <summary>移除不活动的会话</summary>
+    private void RemoveNotAlive(Object? state)
+    {
+        if (!_dic.Any()) return;
+
+        var timeout = Timeout;
+        var keys = new List<String>();
+        var values = new List<IDeviceSession>();
+
+        foreach (var elm in _dic)
         {
-            _log?.Error(ex.ToString());
+            var item = elm.Value;
+            // 判断是否已超过最大不活跃时间
+            if (item == null || item is IDisposable2 ds && ds.Disposed || timeout > 0 && IsNotAlive(item, timeout))
+            {
+                keys.Add(elm.Key);
+                values.Add(elm.Value);
+            }
+        }
+        // 从会话集合里删除这些键值,并行字典操作安全
+        foreach (var item in keys)
+        {
+            _dic.Remove(item);
         }
-        finally
+
+        // 已经离开了锁,慢慢释放各个会话
+        foreach (var item in values)
         {
-            source.Cancel();
+            if (item is ILogFeature lf)
+                lf.Log?.Info("超过{0}秒不活跃销毁 {1}", timeout, item);
+
+            if (item is INetSession ss) ss.Close(nameof(RemoveNotAlive));
+            //item.Dispose();
+            item.TryDispose();
         }
     }
+
+    private static Boolean IsNotAlive(IDeviceSession session, Int32 timeout) => session.LastTime > DateTime.MinValue && session.LastTime.AddSeconds(timeout) < DateTime.Now;
 }
Modified +4 -4
diff --git a/Samples/IoTZero/IoTZero.csproj b/Samples/IoTZero/IoTZero.csproj
index 5138ff2..baa578b 100644
--- a/Samples/IoTZero/IoTZero.csproj
+++ b/Samples/IoTZero/IoTZero.csproj
@@ -19,11 +19,11 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Cube.Core" Version="6.3.2025.115-beta0618" />
-    <PackageReference Include="NewLife.IoT" Version="2.4.2025.109-beta1741" />
+    <PackageReference Include="NewLife.Cube.Core" Version="6.4.2025.203" />
+    <PackageReference Include="NewLife.IoT" Version="2.4.2025.203" />
     <PackageReference Include="NewLife.MQTT" Version="2.0.2025.109-beta1738" />
-    <PackageReference Include="NewLife.Redis" Version="6.1.2025.114-beta0537" />
-    <PackageReference Include="NewLife.Stardust.Extensions" Version="3.3.2025.119-beta1552" />
+    <PackageReference Include="NewLife.Redis" Version="6.1.2025.209" />
+    <PackageReference Include="NewLife.Stardust.Extensions" Version="3.3.2025.202" />
     <PackageReference Include="NewLife.XCode" Version="11.18.2025.201" />
   </ItemGroup>
 
Modified +1 -1
diff --git a/Samples/Zero.Desktop/Zero.Desktop.csproj b/Samples/Zero.Desktop/Zero.Desktop.csproj
index 1f7573e..71a10d8 100644
--- a/Samples/Zero.Desktop/Zero.Desktop.csproj
+++ b/Samples/Zero.Desktop/Zero.Desktop.csproj
@@ -26,7 +26,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Stardust" Version="3.3.2025.119-beta1552" />
+    <PackageReference Include="NewLife.Stardust" Version="3.3.2025.202" />
     <PackageReference Include="System.Speech" Version="9.0.0" />
   </ItemGroup>
 
Modified +2 -2
diff --git a/Samples/Zero.RpcServer/Zero.RpcServer.csproj b/Samples/Zero.RpcServer/Zero.RpcServer.csproj
index 64bb2e1..5eca937 100644
--- a/Samples/Zero.RpcServer/Zero.RpcServer.csproj
+++ b/Samples/Zero.RpcServer/Zero.RpcServer.csproj
@@ -20,8 +20,8 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Redis" Version="6.1.2025.114-beta0537" />
-    <PackageReference Include="NewLife.Stardust" Version="3.3.2025.119-beta1552" />
+    <PackageReference Include="NewLife.Redis" Version="6.1.2025.209" />
+    <PackageReference Include="NewLife.Stardust" Version="3.3.2025.202" />
     <PackageReference Include="NewLife.XCode" Version="11.18.2025.201" />
   </ItemGroup>
 
Modified +3 -3
diff --git a/Samples/ZeroServer/ZeroServer.csproj b/Samples/ZeroServer/ZeroServer.csproj
index 7d78d1c..6ae770b 100644
--- a/Samples/ZeroServer/ZeroServer.csproj
+++ b/Samples/ZeroServer/ZeroServer.csproj
@@ -19,9 +19,9 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Cube.Core" Version="6.3.2025.115-beta0618" />
-    <PackageReference Include="NewLife.Redis" Version="6.1.2025.114-beta0537" />
-    <PackageReference Include="NewLife.Stardust.Extensions" Version="3.3.2025.119-beta1552" />
+    <PackageReference Include="NewLife.Cube.Core" Version="6.4.2025.203" />
+    <PackageReference Include="NewLife.Redis" Version="6.1.2025.209" />
+    <PackageReference Include="NewLife.Stardust.Extensions" Version="3.3.2025.202" />
     <PackageReference Include="NewLife.XCode" Version="11.18.2025.201" />
   </ItemGroup>
 
Modified +1 -1
diff --git a/Test/Test.csproj b/Test/Test.csproj
index b07633c..ca331fb 100644
--- a/Test/Test.csproj
+++ b/Test/Test.csproj
@@ -10,7 +10,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
+    <PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
   </ItemGroup>
 
   <ItemGroup>
Modified +4 -4
diff --git a/XUnitTest/XUnitTest.csproj b/XUnitTest/XUnitTest.csproj
index 00cb597..7d9de76 100644
--- a/XUnitTest/XUnitTest.csproj
+++ b/XUnitTest/XUnitTest.csproj
@@ -9,13 +9,13 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
+    <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.2" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
     <PackageReference Include="Moq" Version="4.20.72" />
-    <PackageReference Include="NewLife.Core" Version="11.4.2025.201" />
+    <PackageReference Include="NewLife.Core" Version="11.4.2025.221-beta0925" />
     <PackageReference Include="NewLife.UnitTest" Version="1.0.2025.101" />
     <PackageReference Include="xunit" Version="2.9.3" />
-    <PackageReference Include="xunit.runner.visualstudio" Version="3.0.1">
+    <PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>