using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Net.NetworkInformation;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Runtime.Versioning;
using NewLife;
using NewLife.Caching;
using NewLife.Log;
using NewLife.Reflection;
using NewLife.Remoting;
using NewLife.Serialization;
using NewLife.Threading;
using Stardust.Managers;
using Stardust.Models;
using Stardust.Services;
using NewLife.Data;
#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD
using System.Net.WebSockets;
using WebSocket = System.Net.WebSockets.WebSocket;
#endif
namespace Stardust;
/// <summary>星星客户端。每个设备节点有一个客户端连接服务端</summary>
public class StarClient : ApiHttpClient, ICommandClient, IEventProvider
{
#region 属性
/// <summary>证书</summary>
public String? Code { get; set; }
/// <summary>密钥</summary>
public String? Secret { get; set; }
/// <summary>产品编码</summary>
public String? ProductCode { get; set; }
/// <summary>是否已登录</summary>
public Boolean Logined { get; set; }
/// <summary>登录完成后触发</summary>
public event EventHandler? OnLogined;
/// <summary>服务迁移</summary>
public event EventHandler<MigrationEventArgs>? OnMigration;
/// <summary>最后一次登录成功后的消息</summary>
public LoginResponse? Info { get; private set; }
/// <summary>请求到服务端并返回的延迟时间。单位ms</summary>
public Int32 Delay { get; set; }
///// <summary>本地应用服务管理</summary>
//public ServiceManager Manager { get; set; }
/// <summary>插件列表</summary>
public String[] Plugins { get; set; }
/// <summary>最大失败数。超过该数时,新的数据将被抛弃,默认10 * 24 * 60</summary>
public Int32 MaxFails { get; set; } = 10 * 24 * 60;
private readonly ConcurrentDictionary<String, Delegate> _commands = new(StringComparer.OrdinalIgnoreCase);
/// <summary>命令集合</summary>
public IDictionary<String, Delegate> Commands => _commands;
/// <summary>收到命令时触发</summary>
public event EventHandler<CommandEventArgs>? Received;
private FrameworkManager _frameworkManager = new();
private readonly ConcurrentQueue<PingInfo> _fails = new();
private readonly ICache _cache = new MemoryCache();
#endregion
#region 构造
/// <summary>实例化</summary>
public StarClient()
{
Log = XTrace.Log;
}
/// <summary>实例化</summary>
/// <param name="urls"></param>
public StarClient(String urls) : this()
{
if (!urls.IsNullOrEmpty())
{
var ss = urls.Split(",");
for (var i = 0; i < ss.Length; i++)
{
Add("service" + (i + 1), new Uri(ss[i]));
}
}
}
/// <summary>销毁</summary>
/// <param name="disposing"></param>
protected override void Dispose(Boolean disposing)
{
StopTimer();
Logout(disposing ? "Dispose" : "GC").Wait(1_000);
base.Dispose(disposing);
}
#endregion
#region 方法
/// <summary>远程调用拦截,支持重新登录</summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="method"></param>
/// <param name="action"></param>
/// <param name="args"></param>
/// <param name="onRequest"></param>
/// <param name="cancellationToken">取消通知</param>
/// <returns></returns>
[return: MaybeNull]
public override async Task<TResult> InvokeAsync<TResult>(HttpMethod method, String action, Object? args = null, Action<HttpRequestMessage>? onRequest = null, CancellationToken cancellationToken = default)
{
var needLogin = !Logined && !action.EqualIgnoreCase("Node/Login", "Node/Logout");
if (needLogin)
{
await Login();
}
try
{
return await base.InvokeAsync<TResult>(method, action, args, onRequest, cancellationToken);
}
catch (Exception ex)
{
var ex2 = ex.GetTrue();
if (Logined && ex2 is ApiException aex && (aex.Code == 401 || aex.Code == 403) && !action.EqualIgnoreCase("Node/Login", "Node/Logout"))
{
Log?.Debug("{0}", ex);
//XTrace.WriteException(ex);
WriteLog("重新登录!");
await Login();
return await base.InvokeAsync<TResult>(method, action, args, onRequest, cancellationToken);
}
throw;
}
}
#endregion
#region 登录
/// <summary>登录</summary>
/// <returns></returns>
public async Task<Object?> Login()
{
XTrace.WriteLine("登录:{0}", Code);
var info = GetLoginInfo();
// 登录前清空令牌,避免服务端使用上一次信息
Token = null;
Logined = false;
Info = null;
var rs = Info = await LoginAsync(info);
if (rs != null && !rs.Code.IsNullOrEmpty())
{
XTrace.WriteLine("下发证书:{0}/{1}", rs.Code, rs.Secret);
Code = rs.Code;
Secret = rs.Secret;
}
// 登录后设置用于用户认证的token
Token = rs?.Token;
Logined = true;
OnLogined?.Invoke(this, EventArgs.Empty);
StartTimer();
_frameworkManager.Attach(this);
return rs;
}
/// <summary>获取登录信息</summary>
/// <returns></returns>
public LoginInfo GetLoginInfo()
{
var di = GetNodeInfo();
var ext = new LoginInfo
{
Code = Code,
Secret = Secret.IsNullOrEmpty() ? null : Secret.MD5(),
ProductCode = ProductCode,
Node = di,
};
return ext;
}
/// <summary>获取设备信息</summary>
/// <returns></returns>
public NodeInfo GetNodeInfo()
{
var mi = MachineInfo.GetCurrent();
var asm = AssemblyX.Entry ?? AssemblyX.Create(Assembly.GetExecutingAssembly());
var mcs = NetHelper.GetMacs().Select(e => e.ToHex("-")).Where(e => e != "00-00-00-00-00-00").OrderBy(e => e).Join(",");
var path = ".".GetFullPath();
var drives = GetDrives();
var driveInfo = DriveInfo.GetDrives().FirstOrDefault(e => path.StartsWithIgnoreCase(e.Name));
var di = new NodeInfo
{
Version = asm?.FileVersion,
Compile = asm?.Compile ?? DateTime.MinValue,
OSName = mi.OSName,
OSVersion = mi.OSVersion,
MachineName = Environment.MachineName,
UserName = Environment.UserName,
IP = AgentInfo.GetIps(),
ProcessorCount = Environment.ProcessorCount,
Memory = mi.Memory,
AvailableMemory = mi.AvailableMemory,
TotalSize = (UInt64)(driveInfo?.TotalSize ?? 0),
AvailableFreeSpace = (UInt64)(driveInfo?.AvailableFreeSpace ?? 0),
DriveSize = (UInt64)drives.Sum(e => e.TotalSize),
DriveInfo = drives.Join(",", e => $"{e.Name}[{e.DriveFormat}]={e.AvailableFreeSpace.ToGMK()}/{e.TotalSize.ToGMK()}"),
Product = mi.Product,
Vendor = mi.Vendor,
Processor = mi.Processor,
//CpuRate = mi.CpuRate,
UUID = mi.UUID,
MachineGuid = mi.Guid,
SerialNumber = mi.Serial,
Board = mi.Board,
DiskID = mi.DiskID,
Macs = mcs,
InstallPath = ".".GetFullPath(),
Runtime = Environment.Version + "",
Time = DateTime.UtcNow,
Plugins = Plugins,
};
// 获取最新机器名
if (Runtime.Linux)
{
var file = @"/etc/hostname";
if (File.Exists(file)) di.MachineName = File.ReadAllText(file).Trim();
}
// 目标框架
di.Framework = _frameworkManager.GetAllVersions().Join(",", e => e.TrimStart('v'));
#if NETCOREAPP || NETSTANDARD
di.Framework ??= RuntimeInformation.FrameworkDescription?.TrimStart(".NET Framework", ".NET Core", ".NET Native", ".NET").Trim();
di.Architecture = RuntimeInformation.ProcessArchitecture + "";
if (Runtime.Linux)
{
// 识别Alpine
var nr = new NetRuntime();
if (nr.IsAlpine() && !di.OSName.StartsWithIgnoreCase("Alpine")) di.OSName = $"{di.OSName}(Alpine)";
}
#else
var ver = "";
var tar = asm?.Asm.GetCustomAttribute<TargetFrameworkAttribute>();
if (tar != null) ver = !tar.FrameworkDisplayName.IsNullOrEmpty() ? tar.FrameworkDisplayName : tar.FrameworkName;
di.Framework ??= ver?.TrimStart(".NET Framework", ".NET Core", ".NET Native", ".NET").Trim();
di.Architecture = IntPtr.Size == 8 ? "X64" : "X86";
#endif
#if NETFRAMEWORK || WINDOWS
try
{
// 收集屏幕相关信息。Mono+Linux无法获取
var g = System.Drawing.Graphics.FromHwnd(IntPtr.Zero);
di.Dpi = $"{g.DpiX}*{g.DpiY}";
var screen = System.Windows.Forms.Screen.PrimaryScreen;
if (screen != null)
di.Resolution = $"{screen.Bounds.Width}*{screen.Bounds.Height}";
}
catch { }
#else
if (Runtime.Windows) FixGdi(di);
#endif
if (Runtime.Linux) FixOnLinux(di);
return di;
}
#if NETCOREAPP || NETSTANDARD
/// <summary>更新分辨率信息</summary>
/// <param name="di"></param>
public static void FixGdi(NodeInfo di)
{
try
{
var graphics = IntPtr.Zero;
var num = NativeMethods.GdipCreateFromHWND(new HandleRef(null, IntPtr.Zero), out graphics);
if (num == 0)
{
var xx = new Single[1];
var numx = NativeMethods.GdipGetDpiX(new HandleRef(di, graphics), xx);
var yy = new Single[1];
var numy = NativeMethods.GdipGetDpiY(new HandleRef(di, graphics), yy);
if (numx == 0 && numy == 0) di.Dpi = $"{xx[0]}*{yy[0]}";
}
var w = NativeMethods.GetSystemMetrics(0);
var h = NativeMethods.GetSystemMetrics(1);
if (w > 0 && h > 0) di.Resolution = $"{w}*{h}";
}
catch { }
}
#endif
private static void FixOnLinux(NodeInfo di)
{
di.MaxOpenFiles = Execute("bash", "-c \"ulimit -n\"")?.Trim().ToInt() ?? 0;
var xrandr = Execute("xrandr", "-q");
if (!xrandr.IsNullOrEmpty())
{
var current = xrandr.Substring("current", ",").Trim();
if (!current.IsNullOrEmpty())
{
var ss = current.SplitAsInt("x");
if (ss.Length >= 2) di.Resolution = $"{ss[0]}*{ss[1]}";
}
}
}
/// <summary>获取驱动器信息</summary>
/// <returns></returns>
public static IList<DriveInfo> GetDrives()
{
var list = new List<DriveInfo>();
foreach (var di in DriveInfo.GetDrives())
{
if (!di.IsReady) continue;
if (di.DriveType is not DriveType.Fixed and not DriveType.Removable) continue;
if (di.Name != "/" && di.DriveFormat.EqualIgnoreCase("overlay", "squashfs")) continue;
if (di.Name.Contains("container") && di.Name.EndsWithIgnoreCase("/overlay")) continue;
if (di.TotalSize <= 0) continue;
if (!list.Any(e => e.Name == di.Name)) list.Add(di);
}
return list;
}
private static String? Execute(String cmd, String? arguments = null)
{
try
{
var psi = new ProcessStartInfo(cmd, arguments ?? "")
{
// UseShellExecute 必须 false,以便于后续重定向输出流
UseShellExecute = false,
RedirectStandardOutput = true
};
var process = Process.Start(psi);
if (process == null) return null;
if (!process.WaitForExit(3_000))
{
process.Kill();
return null;
}
return process.StandardOutput.ReadToEnd();
}
catch (Exception ex)
{
XTrace.WriteLine(ex.Message);
return null;
}
}
/// <summary>注销</summary>
/// <param name="reason"></param>
/// <returns></returns>
public async Task<Object?> Logout(String reason)
{
if (!Logined) return null;
Logined = false;
XTrace.WriteLine("注销:{0} {1}", Code, reason);
try
{
var rs = await LogoutAsync(reason);
// 更新令牌
Token = rs?.Token;
StopTimer();
Logined = false;
return rs;
}
catch (Exception ex)
{
Log?.Debug("{0}", ex);
//XTrace.WriteException(ex);
return null;
}
}
/// <summary>登录</summary>
/// <param name="inf">登录信息</param>
/// <returns></returns>
private async Task<LoginResponse?> LoginAsync(LoginInfo inf) => await PostAsync<LoginResponse>("Node/Login", inf);
/// <summary>注销</summary>
/// <returns></returns>
private async Task<LoginResponse?> LogoutAsync(String reason) => await GetAsync<LoginResponse>("Node/Logout", new { reason });
#endregion
#region 心跳
private readonly String[] _excludes = ["Idle", "System", "Registry", "smss", "csrss", "lsass", "wininit", "services", "winlogon", "LogonUI", "SearchUI", "fontdrvhost", "dwm", "svchost", "dllhost", "conhost", "taskhostw", "explorer", "ctfmon", "ChsIME", "WmiPrvSE", "WUDFHost", "TabTip*", "igfxCUIServiceN", "igfxEMN", "smartscreen", "sihost", "RuntimeBroker", "StartMenuExperienceHost", "SecurityHealthSystray", "SecurityHealthService", "ShellExperienceHost", "PerfWatson2", "audiodg", "spoolsv", "*ServiceHub*", "systemd*", "cron", "rsyslogd", "sudo", "dbus*", "bash", "login", "networkd*", "kworker*", "ksoftirqd*", "migration*", "auditd", "polkitd", "atd"];
/// <summary>获取心跳信息</summary>
public PingInfo GetHeartInfo()
{
var exs = _excludes.Where(e => e.Contains('*')).ToArray();
var ps = Process.GetProcesses();
var pcs = new List<String>();
foreach (var item in ps)
{
// 有些进程可能已退出,无法获取详细信息
try
{
if (Runtime.Linux && item.SessionId == 0) continue;
var name = item.GetProcessName2();
if (name.EqualIgnoreCase(_excludes) || exs.Any(e => e.IsMatch(name))) continue;
if (!pcs.Contains(name)) pcs.Add(name);
}
catch { }
}
var mi = MachineInfo.GetCurrent();
mi.Refresh();
var mcs = NetHelper.GetMacs().Select(e => e.ToHex("-")).OrderBy(e => e).Join(",");
var path = ".".GetFullPath();
var drives = GetDrives();
var driveInfo = DriveInfo.GetDrives().FirstOrDefault(e => path.StartsWithIgnoreCase(e.Name));
var ip = AgentInfo.GetIps();
var info = new PingInfo
{
AvailableMemory = mi.AvailableMemory,
AvailableFreeSpace = (UInt64)(driveInfo?.AvailableFreeSpace ?? 0),
DriveInfo = drives.Join(",", e => $"{e.Name}[{e.DriveFormat}]={e.AvailableFreeSpace.ToGMK()}/{e.TotalSize.ToGMK()}"),
CpuRate = Math.Round(mi.CpuRate, 3),
Temperature = Math.Round(mi.Temperature, 1),
Battery = Math.Round(mi.Battery, 3),
UplinkSpeed = mi.UplinkSpeed,
DownlinkSpeed = mi.DownlinkSpeed,
ProcessCount = ps.Length,
Uptime = Environment.TickCount / 1000,
Macs = mcs,
IP = ip,
Processes = pcs.Join(),
Time = DateTime.UtcNow.ToLong(),
Delay = Delay,
};
//ext.Uptime = Environment.TickCount64 / 1000;
// 开始时间 Environment.TickCount 很容易溢出,导致开机24天后变成负数。
// 后来在 netcore3.0 增加了Environment.TickCount64
// 现在借助 Stopwatch 来解决
if (Stopwatch.IsHighResolution) info.Uptime = (Int32)(Stopwatch.GetTimestamp() / Stopwatch.Frequency);
// 目标框架
info.Framework = _frameworkManager.GetAllVersions().Join(",", e => e.TrimStart('v'));
// 获取Tcp连接信息,某些Linux平台不支持
try
{
var properties = IPGlobalProperties.GetIPGlobalProperties();
var connections = properties.GetActiveTcpConnections();
info.TcpConnections = connections.Count(e => e.State == TcpState.Established);
info.TcpTimeWait = connections.Count(e => e.State == TcpState.TimeWait);
info.TcpCloseWait = connections.Count(e => e.State == TcpState.CloseWait);
}
catch { }
if (mi is IExtend ext)
{
// 读取无线信号强度
if (ext.Items.TryGetValue("Signal", out var value)) info.Signal = value.ToInt();
}
return info;
}
/// <summary>心跳</summary>
/// <returns></returns>
public async Task<Object?> Ping()
{
try
{
var inf = GetHeartInfo();
// 如果网络不可用,直接保存到队列
if (!NetworkInterface.GetIsNetworkAvailable())
{
if (_fails.Count < MaxFails) _fails.Enqueue(inf);
return null;
}
PingResponse? rs = null;
try
{
rs = await PingAsync(inf);
if (rs != null)
{
// 由服务器改变采样频率
if (rs.Period > 0 && _timer != null) _timer.Period = rs.Period * 1000;
var dt = rs.Time.ToDateTime();
if (dt.Year > 2000)
{
// 计算延迟
var ts = DateTime.UtcNow - dt;
var ms = (Int32)Math.Round(ts.TotalMilliseconds);
Delay = Delay > 0 ? (Delay + ms) / 2 : ms;
}
// 时间偏移,用于修正本地时间
dt = rs.ServerTime.ToDateTime();
if (dt.Year > 2000) _span = dt.AddMilliseconds(Delay / 2) - DateTime.UtcNow;
// 令牌
if (!rs.Token.IsNullOrEmpty()) Token = rs.Token;
// 推队列
if (rs.Commands != null && rs.Commands.Length > 0)
{
foreach (var model in rs.Commands)
{
await ReceiveCommand(model);
}
}
// 迁移到新服务器
if (!rs.NewServer.IsNullOrEmpty())
{
var arg = new MigrationEventArgs { NewServer = rs.NewServer + "" };
OnMigration?.Invoke(this, arg);
if (!arg.Cancel)
{
await Logout("切换新服务器");
// 清空原有链接,添加新链接
Services.Clear();
var ss = rs.NewServer.Split(",");
for (var i = 0; i < ss.Length; i++)
{
Add("service" + (i + 1), new Uri(ss[i]));
}
await Login();
}
}
}
}
catch
{
if (_fails.Count < MaxFails) _fails.Enqueue(inf);
throw;
}
// 上报正常,处理历史,失败则丢弃
while (_fails.TryDequeue(out var info))
{
await PingAsync(info);
}
return rs;
}
catch (Exception ex)
{
var ex2 = ex.GetTrue();
if (ex2 is ApiException aex && (aex.Code == 401 || aex.Code == 403))
{
XTrace.WriteLine("重新登录");
return Login();
}
XTrace.WriteLine("心跳异常 {0}", ex.GetTrue().Message);
throw;
}
}
/// <summary>心跳</summary>
/// <param name="inf"></param>
/// <returns></returns>
private async Task<PingResponse?> PingAsync(PingInfo inf) => await PostAsync<PingResponse>("Node/Ping", inf);
private TimeSpan _span;
/// <summary>获取相对于服务器的当前时间,避免两端时间差</summary>
/// <returns></returns>
public DateTime GetNow() => DateTime.Now.Add(_span);
private TraceService? _trace;
/// <summary>使用追踪服务</summary>
public void UseTrace()
{
_trace = new TraceService();
_trace.Attach(this);
}
#endregion
#region 上报
private readonly ConcurrentQueue<EventModel> _events = new();
private readonly ConcurrentQueue<EventModel> _failEvents = new();
private TimerX? _eventTimer;
private String? _eventTraceId;
/// <summary>批量上报事件</summary>
/// <param name="events"></param>
/// <returns></returns>
public async Task<Int32> PostEvents(params EventModel[] events) => await PostAsync<Int32>("Node/PostEvents", events);
async Task DoPostEvent(Object state)
{
if (!NetworkInterface.GetIsNetworkAvailable()) return;
DefaultSpan.Current = null;
var tid = _eventTraceId;
_eventTraceId = null;
// 正常队列为空,异常队列有数据,给它一次机会
if (_events.IsEmpty && !_failEvents.IsEmpty)
{
while (_failEvents.TryDequeue(out var ev))
{
_events.Enqueue(ev);
}
}
while (!_events.IsEmpty)
{
var max = 100;
var list = new List<EventModel>();
while (_events.TryDequeue(out var model) && max-- > 0) list.Add(model);
using var span = Tracer?.NewSpan("PostEvent", list.Count);
if (tid != null) span?.Detach(tid);
try
{
if (list.Count > 0) await PostEvents(list.ToArray());
// 成功后读取本地缓存
while (_failEvents.TryDequeue(out var ev))
{
_events.Enqueue(ev);
}
}
catch (Exception ex)
{
span?.SetError(ex, null);
// 失败后进入本地缓存
foreach (var item in list)
{
_failEvents.Enqueue(item);
}
}
}
}
/// <summary>写事件</summary>
/// <param name="type"></param>
/// <param name="name"></param>
/// <param name="remark"></param>
public virtual Boolean WriteEvent(String type, String name, String? remark)
{
// 记录追踪标识,上报的时候带上,尽可能让源头和下游串联起来
_eventTraceId = DefaultSpan.Current?.ToString();
var now = GetNow().ToUniversalTime();
var ev = new EventModel { Time = now.ToLong(), Type = type, Name = name, Remark = remark };
_events.Enqueue(ev);
_eventTimer?.SetNext(1000);
return true;
}
/// <summary>上报命令结果,如截屏、抓日志</summary>
/// <param name="id"></param>
/// <param name="data"></param>
/// <returns></returns>
private async Task<Object?> ReportAsync(Int32 id, Byte[] data) => await PostAsync<Object>("Node/Report?Id=" + id, data);
/// <summary>上报服务调用结果</summary>
/// <param name="model"></param>
/// <returns></returns>
public virtual async Task<Object?> CommandReply(CommandReplyModel model) => await PostAsync<Object>("Node/CommandReply", model);
#endregion
#region 长连接
private TimerX? _timer;
private void StartTimer()
{
if (_timer == null)
{
lock (this)
{
_timer ??= new TimerX(DoPing, null, 1_000, 60_000, "Device") { Async = true };
_eventTimer = new TimerX(DoPostEvent, null, 3_000, 60_000, "Device") { Async = true };
}
}
}
private void StopTimer()
{
_timer.TryDispose();
_timer = null;
#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD
_source?.Cancel();
try
{
if (_websocket != null && _websocket.State == WebSocketState.Open)
_websocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default);
}
catch { }
//_websocket.TryDispose();
_websocket = null;
#endif
}
private async Task DoPing(Object state)
{
DefaultSpan.Current = null;
using var span = Tracer?.NewSpan("NodePing");
try
{
await Ping();
if (!NetworkInterface.GetIsNetworkAvailable()) return;
#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD
var svc = _currentService;
if (svc == null) return;
// 使用过滤器内部token,因为它有过期刷新机制
var token = Token;
if (Filter is NewLife.Http.TokenHttpFilter thf) token = thf.Token?.AccessToken;
span?.AppendTag($"svc={svc.Address} Token=[{token?.Length}] websocket={_websocket?.State}");
if (token.IsNullOrEmpty()) return;
// 定时ws心跳
if (_websocket != null && _websocket.State == WebSocketState.Open)
{
try
{
// 在websocket链路上定时发送心跳,避免长连接被断开
var str = "Ping";
await _websocket.SendAsync(new ArraySegment<Byte>(str.GetBytes()), WebSocketMessageType.Text, true, default);
}
catch (Exception ex)
{
span?.SetError(ex, null);
WriteLog("{0}", ex);
}
}
if (_websocket == null || _websocket.State != WebSocketState.Open)
{
var url = svc.Address.ToString().Replace("http://", "ws://").Replace("https://", "wss://");
var uri = new Uri(new Uri(url), "/node/notify");
using var span2 = Tracer?.NewSpan("WebSocketConnect", uri + "");
var client = new ClientWebSocket();
client.Options.SetRequestHeader("Authorization", "Bearer " + token);
span?.AppendTag($"WebSocket.Connect {uri}");
await client.ConnectAsync(uri, default);
_websocket = client;
_source = new CancellationTokenSource();
_ = Task.Run(() => DoPull(client, _source.Token));
}
#endif
}
catch (Exception ex)
{
span?.SetError(ex, null);
Log?.Debug("{0}", ex);
}
}
#if NET45_OR_GREATER || NETCOREAPP || NETSTANDARD
private WebSocket? _websocket;
private CancellationTokenSource? _source;
private async Task DoPull(WebSocket socket, CancellationToken cancellationToken)
{
DefaultSpan.Current = null;
try
{
var buf = new Byte[4 * 1024];
while (!cancellationToken.IsCancellationRequested && socket.State == WebSocketState.Open)
{
var data = await socket.ReceiveAsync(new ArraySegment<Byte>(buf), cancellationToken);
var txt = buf.ToStr(null, 0, data.Count);
if (txt.StartsWithIgnoreCase("Pong"))
{
}
else
{
var model = txt.ToJsonEntity<CommandModel>();
if (model != null) await ReceiveCommand(model);
}
}
}
catch (WebSocketException) { }
catch (Exception ex)
{
Log?.Debug("{0}", ex);
}
using var span = Tracer?.NewSpan("NodePull", socket.State + "");
if (socket.State == WebSocketState.Open)
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default);
}
#endif
async Task ReceiveCommand(CommandModel model)
{
if (model == null) return;
// 去重,避免命令被重复执行
if (!_cache.Add($"nodecmd:{model.Id}", model, 3600)) return;
// 埋点,建立调用链
using var span = Tracer?.NewSpan("cmd:" + model.Command, model);
if (!model.TraceId.IsNullOrEmpty()) span?.Detach(model.TraceId);
try
{
//todo 有效期判断可能有隐患,现在只是假设服务器和客户端在同一个时区,如果不同,可能会出现问题
var now = GetNow();
XTrace.WriteLine("Got Command: {0}", model.ToJson());
if (model.Expire.Year < 2000 || model.Expire > now)
{
// 延迟执行
var ts = model.StartTime - now;
if (ts.TotalMilliseconds > 0)
{
TimerX.Delay(s =>
{
_ = OnReceiveCommand(model);
}, (Int32)ts.TotalMilliseconds);
var reply = new CommandReplyModel
{
Id = model.Id,
Status = CommandStatus.处理中,
Data = $"已安排计划执行 {model.StartTime.ToFullString()}"
};
await CommandReply(reply);
}
else
await OnReceiveCommand(model);
}
else
{
var reply = new CommandReplyModel { Id = model.Id, Status = CommandStatus.取消 };
await CommandReply(reply);
}
}
catch (Exception ex)
{
span?.SetError(ex, null);
}
}
/// <summary>
/// 触发收到命令的动作
/// </summary>
/// <param name="model"></param>
protected virtual async Task OnReceiveCommand(CommandModel model)
{
var e = new CommandEventArgs { Model = model };
Received?.Invoke(this, e);
var rs = await this.ExecuteCommand(model);
e.Reply ??= rs;
if (e.Reply != null && e.Reply.Id > 0) await CommandReply(e.Reply);
}
/// <summary>向命令引擎发送命令,触发指定已注册动作</summary>
/// <param name="command"></param>
/// <param name="argument"></param>
/// <returns></returns>
public async Task SendCommand(String command, String argument) => await OnReceiveCommand(new CommandModel { Command = command, Argument = argument });
#endregion
#region 更新
/// <summary>获取更新信息</summary>
/// <param name="channel"></param>
/// <param name="lastVersion">最后一次升级的本地版本</param>
/// <returns></returns>
public async Task<UpgradeInfo?> Upgrade(String channel, String lastVersion)
{
XTrace.WriteLine("检查更新:{0}", channel);
// 清理
var ug = new Stardust.Web.Upgrade { Log = XTrace.Log };
ug.DeleteBackup(".");
var rs = await UpgradeAsync(channel, lastVersion);
if (rs != null)
{
XTrace.WriteLine("发现更新:{0}", rs.ToJson(true));
}
return rs;
}
/// <summary>更新</summary>
/// <param name="channel"></param>
/// <param name="lastVersion">最后一次升级的本地版本</param>
/// <returns></returns>
public async Task<UpgradeInfo?> UpgradeAsync(String channel, String lastVersion) => await GetAsync<UpgradeInfo>("Node/Upgrade", new { channel, lastVersion });
#endregion
#region 部署
/// <summary>获取分配到本节点的应用服务信息</summary>
/// <returns></returns>
public async Task<DeployInfo[]?> GetDeploy() => await GetAsync<DeployInfo[]>("Deploy/GetAll");
/// <summary>上传本节点的所有应用服务信息</summary>
/// <param name="services"></param>
/// <returns></returns>
public async Task<Int32> UploadDeploy(ServiceInfo[] services) => await PostAsync<Int32>("Deploy/Upload", services);
/// <summary>应用心跳。上报应用信息</summary>
/// <param name="inf"></param>
/// <returns></returns>
public async Task<Int32> AppPing(AppInfo inf) => await PostAsync<Int32>("Deploy/Ping", inf);
#endregion
#region 辅助
/// <summary>
/// 把Url相对路径格式化为绝对路径。常用于文件下载
/// </summary>
/// <param name="url"></param>
/// <returns></returns>
public String BuildUrl(String url)
{
if (!url.StartsWithIgnoreCase("http://", "https://"))
{
var svr = Services.FirstOrDefault(e => e.Name == Source) ?? Services.FirstOrDefault();
if (svr != null && svr.Address != null)
url = new Uri(svr.Address, url) + "";
}
return url;
}
#endregion
}
|