NewLife/Stardust

星尘客户端建立到服务端的websocket长连接
大石头 authored at 2021-04-15 09:22:06
db4c904
Tree
1 Parent(s) b6ccce9
Summary: 3 changed files with 85 additions and 13 deletions.
Modified +2 -0
Modified +1 -0
Modified +82 -13
Modified +2 -0
diff --git a/Stardust.Server/Controllers/NodeController.cs b/Stardust.Server/Controllers/NodeController.cs
index 3a69e27..729ed94 100644
--- a/Stardust.Server/Controllers/NodeController.cs
+++ b/Stardust.Server/Controllers/NodeController.cs
@@ -535,6 +535,7 @@ namespace Stardust.Server.Controllers
             if (node == null) throw new InvalidOperationException("未登录!");
 
             XTrace.WriteLine("websocket连接/node_ws {0}", node);
+            WriteHistory(node, "WebSocket连接", true, websocket + "");
 
             var queue = _cache.GetQueue<String>($"cmd:{node.Code}");
             while (!cancellationToken.IsCancellationRequested && websocket.State == WebSocketState.Open)
@@ -552,6 +553,7 @@ namespace Stardust.Server.Controllers
             }
 
             XTrace.WriteLine("websocket关闭/node_ws {0}", node);
+            WriteHistory(node, "WebSocket断开", true, websocket + "");
 
             await websocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", cancellationToken);
         }
Modified +1 -0
diff --git a/Stardust.ServerTests/Controllers/WebSocketTests.cs b/Stardust.ServerTests/Controllers/WebSocketTests.cs
index 93fd2cc..2f0449e 100644
--- a/Stardust.ServerTests/Controllers/WebSocketTests.cs
+++ b/Stardust.ServerTests/Controllers/WebSocketTests.cs
@@ -93,6 +93,7 @@ namespace Stardust.Server.Controllers.Tests
                 }
             });
 
+            // 客户端接收服务端推送的指令
             for (var i = 0; i < 3; i++)
             {
                 var buf = new Byte[1024];
Modified +82 -13
diff --git a/Stardust/StarClient.cs b/Stardust/StarClient.cs
index 569f426..a1c915f 100644
--- a/Stardust/StarClient.cs
+++ b/Stardust/StarClient.cs
@@ -6,7 +6,9 @@ using System.Linq;
 using System.Net;
 using System.Net.Http;
 using System.Net.NetworkInformation;
+using System.Net.WebSockets;
 using System.Reflection;
+using System.Threading;
 using System.Threading.Tasks;
 using NewLife;
 using NewLife.Http;
@@ -77,8 +79,9 @@ namespace Stardust
         {
             Logout(disposing ? "Dispose" : "GC").Wait(1_000);
 
-            _timer.TryDispose();
-            _timer = null;
+            //_timer.TryDispose();
+            //_timer = null;
+            StopTimer();
 
             base.Dispose(disposing);
         }
@@ -143,16 +146,7 @@ namespace Stardust
 
             OnLogined?.Invoke(this, EventArgs.Empty);
 
-            if (Logined && _timer == null)
-            {
-                lock (this)
-                {
-                    if (_timer == null)
-                    {
-                        _timer = new TimerX(s => Ping().Wait(), null, 5_000, 60_000, "Device") { Async = true };
-                    }
-                }
-            }
+            StartTimer();
 
             return rs;
         }
@@ -343,7 +337,6 @@ namespace Stardust
             return ext;
         }
 
-        private TimerX _timer;
         /// <summary>心跳</summary>
         /// <returns></returns>
         public async Task<Object> Ping()
@@ -422,6 +415,82 @@ namespace Stardust
         private async Task<Object> ReportAsync(Int32 id, Byte[] data) => await PostAsync<Object>("Device/Report?Id=" + id, data);
         #endregion
 
+        #region 长连接
+        private TimerX _timer;
+        private void StartTimer()
+        {
+            if (_timer == null)
+            {
+                lock (this)
+                {
+                    if (_timer == null)
+                    {
+                        _timer = new TimerX(DoPing, null, 1_000, 60_000, "Device") { Async = true };
+                    }
+                }
+            }
+        }
+
+        private void StopTimer()
+        {
+            _timer.TryDispose();
+            _timer = null;
+
+            _source.Cancel();
+
+            //_websocket.TryDispose();
+            _websocket = null;
+        }
+
+        private WebSocket _websocket;
+        private CancellationTokenSource _source;
+        private async Task DoPing(Object state)
+        {
+            await Ping();
+
+            var svc = _currentService;
+            if (svc == null || Token == null) return;
+
+            if (_websocket == null || _websocket.State != WebSocketState.Open)
+            {
+                var url = svc.Address.ToString().Replace("http://", "ws://").Replace("https://", "wss://");
+                var uri = new Uri(new Uri(url), "/node/notify");
+                var client = new ClientWebSocket();
+                client.Options.SetRequestHeader("Authorization", "Bearer " + Token);
+                await client.ConnectAsync(uri, default);
+
+                _websocket = client;
+
+                _source = new CancellationTokenSource();
+                _ = Task.Run(() => DoPull(client, _source.Token));
+            }
+        }
+
+        private async Task DoPull(WebSocket socket, CancellationToken cancellationToken)
+        {
+            try
+            {
+                var buf = new Byte[1024];
+                while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open)
+                {
+                    var data = await socket.ReceiveAsync(new ArraySegment<Byte>(buf), cancellationToken);
+                    var cmd = buf.ToStr(null, 0, data.Count).ToJsonEntity<CommandModel>();
+                    if (cmd != null)
+                    {
+                        XTrace.WriteLine("Got Command: {0}", cmd);
+                        CommandQueue.Publish(cmd.Command, cmd);
+                    }
+                }
+            }
+            catch (Exception ex)
+            {
+                XTrace.WriteException(ex);
+            }
+
+            if (socket.State == WebSocketState.Open) await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default);
+        }
+        #endregion
+
         #region 更新
         /// <summary>获取更新信息</summary>
         /// <param name="channel"></param>