diff --git a/Stardust/StarClient.cs b/Stardust/StarClient.cs
index 569f426..a1c915f 100644
--- a/Stardust/StarClient.cs
+++ b/Stardust/StarClient.cs
@@ -6,7 +6,9 @@ using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.NetworkInformation;
+using System.Net.WebSockets;
using System.Reflection;
+using System.Threading;
using System.Threading.Tasks;
using NewLife;
using NewLife.Http;
@@ -77,8 +79,9 @@ namespace Stardust
{
Logout(disposing ? "Dispose" : "GC").Wait(1_000);
- _timer.TryDispose();
- _timer = null;
+ //_timer.TryDispose();
+ //_timer = null;
+ StopTimer();
base.Dispose(disposing);
}
@@ -143,16 +146,7 @@ namespace Stardust
OnLogined?.Invoke(this, EventArgs.Empty);
- if (Logined && _timer == null)
- {
- lock (this)
- {
- if (_timer == null)
- {
- _timer = new TimerX(s => Ping().Wait(), null, 5_000, 60_000, "Device") { Async = true };
- }
- }
- }
+ StartTimer();
return rs;
}
@@ -343,7 +337,6 @@ namespace Stardust
return ext;
}
- private TimerX _timer;
/// <summary>心跳</summary>
/// <returns></returns>
public async Task<Object> Ping()
@@ -422,6 +415,82 @@ namespace Stardust
private async Task<Object> ReportAsync(Int32 id, Byte[] data) => await PostAsync<Object>("Device/Report?Id=" + id, data);
#endregion
+ #region 长连接
+ private TimerX _timer;
+ private void StartTimer()
+ {
+ if (_timer == null)
+ {
+ lock (this)
+ {
+ if (_timer == null)
+ {
+ _timer = new TimerX(DoPing, null, 1_000, 60_000, "Device") { Async = true };
+ }
+ }
+ }
+ }
+
+ private void StopTimer()
+ {
+ _timer.TryDispose();
+ _timer = null;
+
+ _source.Cancel();
+
+ //_websocket.TryDispose();
+ _websocket = null;
+ }
+
+ private WebSocket _websocket;
+ private CancellationTokenSource _source;
+ private async Task DoPing(Object state)
+ {
+ await Ping();
+
+ var svc = _currentService;
+ if (svc == null || Token == null) return;
+
+ if (_websocket == null || _websocket.State != WebSocketState.Open)
+ {
+ var url = svc.Address.ToString().Replace("http://", "ws://").Replace("https://", "wss://");
+ var uri = new Uri(new Uri(url), "/node/notify");
+ var client = new ClientWebSocket();
+ client.Options.SetRequestHeader("Authorization", "Bearer " + Token);
+ await client.ConnectAsync(uri, default);
+
+ _websocket = client;
+
+ _source = new CancellationTokenSource();
+ _ = Task.Run(() => DoPull(client, _source.Token));
+ }
+ }
+
+ private async Task DoPull(WebSocket socket, CancellationToken cancellationToken)
+ {
+ try
+ {
+ var buf = new Byte[1024];
+ while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open)
+ {
+ var data = await socket.ReceiveAsync(new ArraySegment<Byte>(buf), cancellationToken);
+ var cmd = buf.ToStr(null, 0, data.Count).ToJsonEntity<CommandModel>();
+ if (cmd != null)
+ {
+ XTrace.WriteLine("Got Command: {0}", cmd);
+ CommandQueue.Publish(cmd.Command, cmd);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ XTrace.WriteException(ex);
+ }
+
+ if (socket.State == WebSocketState.Open) await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default);
+ }
+ #endregion
+
#region 更新
/// <summary>获取更新信息</summary>
/// <param name="channel"></param>