using System.Collections.Concurrent;
using System.Diagnostics.CodeAnalysis;
using System.Net;
using System.Net.Http;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Reflection;
using NewLife.Caching;
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;
using NewLife.Model;
using NewLife.Net;
using NewLife.Reflection;
using NewLife.Remoting.Models;
using NewLife.Security;
using NewLife.Serialization;
using NewLife.Threading;
#if !NET40
using TaskEx = System.Threading.Tasks.Task;
#endif
namespace NewLife.Remoting.Clients;
/// <summary>应用客户端基类。实现对接目标平台的登录、心跳、更新和指令下发等场景操作</summary>
/// <remarks>
/// 典型应用架构:
/// 1,RPC应用架构
/// 客户端ApiClient通过Tcp/Udp等协议连接服务端ApiServer,进行登录、心跳和更新等操作,服务端直接下发指令。
/// 例如蚂蚁调度,客户端使用应用编码和密钥登录后,获得令牌,后续无需验证令牌,直到令牌过期,重新登录。
/// 2,Http应用架构
/// 客户端ApiHttpClient通过Http/Https协议连接服务端WebApi,进行登录、心跳和更新等操作,服务端通过WebSocket下发指令。
/// 例如ZeroIot,客户端使用设备编码和密钥登录后,获得令牌,后续每次请求都需要带上令牌,在心跳时维持WebSocket长连接。
/// 3,OAuth应用架构
/// 客户端ApiHttpClient通过Http/Https协议连接服务端WebApi,进行OAuth登录,获得令牌,后续每次请求都需要带上令牌。
/// 例如星尘AppClient,AppId和AppSecret进行OAuth登录后,获得令牌,后续每次请求都需要带上令牌。
/// </remarks>
public abstract class ClientBase : DisposeBase, IApiClient, ICommandClient, IEventProvider, IEventHandler<IPacket>, ITracerFeature, ILogFeature
{
#region 属性
/// <summary>客户端名称。例如Device/Node/App</summary>
public String Name { get; set; } = null!;
/// <summary>服务端地址。支持http/tcp/udp,支持客户端负载均衡,多地址逗号分隔</summary>
public String? Server { get; set; }
/// <summary>编码。设备编码DeviceCode,或应用标识AppId</summary>
public String? Code { get; set; }
/// <summary>密钥。设备密钥DeviceSecret,或应用密钥AppSecret</summary>
public String? Secret { get; set; }
/// <summary>调用超时时间。请求发出后,等待响应的最大时间,默认15_000ms</summary>
public Int32 Timeout { get; set; } = 15_000;
/// <summary>密码提供者</summary>
/// <remarks>
/// 用于保护密码传输,默认提供者为空,密码将明文传输。
/// 推荐使用SaltPasswordProvider。
/// </remarks>
public IPasswordProvider? PasswordProvider { get; set; }
/// <summary>服务提供者</summary>
/// <remarks>借助对象容器,解析各基本接口的请求响应模型</remarks>
public IServiceProvider? ServiceProvider { get; set; }
private IApiClient? _client;
/// <summary>Api客户端。ApiClient或ApiHttpClient</summary>
public IApiClient? Client { get => _client; set => _client = value; }
String? IApiClient.Token { get => _client?.Token; set => _client?.Token = value; }
/// <summary>登录状态</summary>
public LoginStatus Status { get; set; }
/// <summary>是否已登录</summary>
public Boolean Logined => Status == LoginStatus.LoggedIn;
/// <summary>登录完成后触发</summary>
public event EventHandler<LoginEventArgs>? OnLogined;
/// <summary>请求到服务端并返回的延迟时间。单位ms</summary>
public Int32 Delay { get; set; }
private TimeSpan _span;
/// <summary>时间差。服务器时间减去客户端时间</summary>
public TimeSpan Span => _span;
/// <summary>最大失败数。心跳上报失败时进入失败队列,并稍候重试。重试超过该数时,新的数据将被抛弃,默认1440次,约24小时</summary>
public Int32 MaxFails { get; set; } = 1 * 24 * 60;
private readonly ConcurrentDictionary<String, Delegate> _commands = new(StringComparer.OrdinalIgnoreCase);
/// <summary>命令集合。注册到客户端的命令与委托</summary>
public IDictionary<String, Delegate> Commands => _commands;
/// <summary>收到下行命令时触发</summary>
public event EventHandler<CommandEventArgs>? Received;
/// <summary>客户端功能特性。默认登录注销心跳,可添加更新等</summary>
public Features Features { get; set; } = Features.Login | Features.Logout | Features.Ping;
/// <summary>各功能的动作集合。记录每一种功能所对应的动作接口路径。</summary>
public IDictionary<Features, String> Actions { get; set; } = null!;
/// <summary>Json主机。提供序列化能力</summary>
public IJsonHost JsonHost { get; set; } = null!;
/// <summary>JSON序列化选项,影响复杂对象的编码和解码行为</summary>
public JsonOptions? JsonOptions { get; set; }
/// <summary>客户端设置</summary>
public IClientSetting? Setting { get; set; }
/// <summary>协议版本</summary>
private readonly static String _version;
private readonly static String _name;
private readonly ConcurrentQueue<IPingRequest> _fails = new();
private readonly ICache _cache = new MemoryCache();
#endregion
#region 构造
static ClientBase()
{
var asm = AssemblyX.Entry ?? AssemblyX.Create(Assembly.GetExecutingAssembly());
_version = asm?.FileVersion + "";
_name = asm?.Name ?? "NewLifeRemoting";
}
/// <summary>实例化</summary>
public ClientBase() => Name = GetType().Name.TrimSuffix("Client");
/// <summary>通过客户端设置实例化</summary>
/// <param name="setting">客户端设置</param>
public ClientBase(IClientSetting setting) : this()
{
Setting = setting;
Server = setting.Server;
Code = setting.Code;
Secret = setting.Secret;
}
/// <summary>销毁资源</summary>
/// <param name="disposing">是否释放托管资源</param>
protected override void Dispose(Boolean disposing)
{
base.Dispose(disposing);
if (Features.HasFlag(Features.Logout))
Logout(disposing ? "Dispose" : "GC").Wait(1_000);
StopTimer();
Status = LoginStatus.Ready;
_timerLogin.TryDispose();
_timerLogin = null;
if (_client is ApiClient client)
client.Received -= OnRpcReceive;
_client.TryDispose();
_client = null;
}
#endregion
#region 方法
/// <summary>设置各功能接口路径</summary>
/// <param name="prefix">接口路径前缀。例如 Device/ 或 Node/</param>
protected virtual void SetActions(String prefix)
{
Actions = new Dictionary<Features, String>
{
[Features.Login] = prefix + "Login",
[Features.Logout] = prefix + "Logout",
[Features.Ping] = prefix + "Ping",
[Features.Upgrade] = prefix + "Upgrade",
[Features.Notify] = prefix + "Notify",
[Features.CommandReply] = prefix + "CommandReply",
[Features.PostEvent] = prefix + "PostEvents",
};
this.RegisterCommand(prefix + "Upgrade", ReceiveUpgrade);
}
private String? _lastServer;
/// <summary>初始化客户端</summary>
/// <remarks>
/// 检查并更新服务端地址,初始化对象容器和客户端实例。
/// 若客户端已存在且地址未变,则直接返回。
/// </remarks>
[MemberNotNull(nameof(_client))]
protected void Init()
{
if (_client != null)
{
// 如果配置中的服务端地址与当前不一致,则需要同步修改客户端的服务地址
var urls = Server ?? Setting?.Server;
if (!urls.IsNullOrEmpty() && urls != _lastServer)
{
using var span = Tracer?.NewSpan("ChangeServer", new { urls, _lastServer });
if (_client is ApiHttpClient http)
http.SetServer(urls);
else if (_client is ApiClient rpc)
rpc.SetServer(urls);
_lastServer = urls;
}
return;
}
OnInit();
if (Actions == null || Actions.Count == 0) SetActions("Device/");
}
/// <summary>初始化对象容器以及客户端</summary>
[MemberNotNull(nameof(_client))]
protected virtual void OnInit()
{
var provider = ServiceProvider ??= ObjectContainer.Provider;
// 找到容器,注册默认的模型实现,供后续InvokeAsync返回时自动创建正确的模型对象
var container = provider?.GetService<IObjectContainer>() ?? ObjectContainer.Current;
if (container != null)
{
container.TryAddTransient<ILoginRequest, LoginRequest>();
container.TryAddTransient<ILoginResponse, LoginResponse>();
container.TryAddTransient<ILogoutResponse, LogoutResponse>();
container.TryAddTransient<IPingRequest, PingRequest>();
container.TryAddTransient<IPingResponse, PingResponse>();
container.TryAddTransient<IUpgradeInfo, UpgradeInfo>();
}
JsonHost ??= GetService<IJsonHost>() ?? JsonHelper.Default;
//PasswordProvider ??= GetService<IPasswordProvider>() ?? new SaltPasswordProvider { Algorithm = "md5", SaltTime = 60 };
PasswordProvider ??= GetService<IPasswordProvider>();
if (_client == null)
{
var urls = Server ?? Setting?.Server;
if (urls.IsNullOrEmpty()) throw new ArgumentNullException(nameof(Setting), "未指定服务端地址");
_client = urls.StartsWithIgnoreCase("http", "https") ? CreateHttp(urls) : CreateRpc(urls);
_lastServer = urls;
}
}
/// <summary>创建Http客户端</summary>
/// <param name="urls">服务端地址。支持逗号分隔的多地址</param>
/// <returns>Http客户端实例</returns>
protected virtual ApiHttpClient CreateHttp(String urls) => new(urls)
{
JsonHost = JsonHost,
DefaultUserAgent = $"{_name}/v{_version}",
Timeout = Timeout,
Log = Log,
};
/// <summary>创建RPC客户端</summary>
/// <param name="urls">服务端地址。支持逗号分隔的多地址</param>
/// <returns>RPC客户端实例</returns>
protected virtual ApiClient CreateRpc(String urls)
{
var client = new MyApiClient
{
Name = Name,
Client = this,
Servers = urls.Split(","),
JsonHost = JsonHost,
ServiceProvider = ServiceProvider,
Timeout = Timeout,
Log = Log
};
client.Received += OnRpcReceive;
return client;
}
private void OnRpcReceive(Object? sender, ApiReceivedEventArgs e)
{
//var client = (sender as ApiClient)!;
var msg = e.Message;
var api = e.ApiMessage;
if (msg != null && !msg.Reply && api != null && api.Action == "Notify")
{
if (api.Data != null)
_ = HandleAsync(api.Data, null, default);
}
}
class MyApiClient : ApiClient
{
public ClientBase Client { get; set; } = null!;
protected override Task<Object?> OnLoginAsync(ISocketClient client, Boolean force, CancellationToken cancellationToken) => InvokeWithClientAsync<Object>(client, Client.Actions[Features.Login], Client.BuildLoginRequest(), 0, cancellationToken);
}
/// <summary>异步调用。HTTP默认POST,自动识别GET</summary>
/// <param name="action">动作</param>
/// <param name="args">参数</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>调用结果</returns>
protected virtual async Task<TResult> OnInvokeAsync<TResult>(String action, Object? args, CancellationToken cancellationToken)
{
if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("[{0}]=>{1}", action, args is IPacket or Byte[]? "" : args?.ToJson());
Init();
// HTTP请求需要区分GET/POST
TResult? rs = default;
if (_client is ApiHttpClient http)
{
var method = HttpMethod.Post;
if (args == null || args.GetType().IsBaseType() || action.StartsWithIgnoreCase("Get") || action.IndexOf("/get", StringComparison.OrdinalIgnoreCase) >= 0)
method = HttpMethod.Get;
rs = await http.InvokeAsync<TResult>(method, action, args, null, cancellationToken).ConfigureAwait(false);
}
else
rs = await _client.InvokeAsync<TResult>(action, args, cancellationToken).ConfigureAwait(false);
if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("[{0}]<={1}", action, rs is IPacket or Byte[]? "" : rs?.ToJson());
return rs!;
}
/// <summary>异步Get调用(仅用于HTTP)</summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="action">动作</param>
/// <param name="args">参数</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>调用结果</returns>
/// <exception cref="NotSupportedException"></exception>
protected virtual async Task<TResult> GetAsync<TResult>(String action, Object? args, CancellationToken cancellationToken = default)
{
if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("[{0}]=>{1}", action, args is IPacket or Byte[]? "" : args?.ToJson());
Init();
if (_client is not ApiHttpClient http) throw new NotSupportedException();
// 验证登录
var needLogin = !Actions[Features.Login].EqualIgnoreCase(action);
if (needLogin && !Logined && Features.HasFlag(Features.Login))
{
if (Disposed) throw new ObjectDisposedException(GetType().Name);
await Login(action, cancellationToken).ConfigureAwait(false);
}
try
{
// GET请求
var rs = await http.InvokeAsync<TResult>(HttpMethod.Get, action, args, null, cancellationToken).ConfigureAwait(false);
if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("[{0}]<={1}", action, rs is IPacket or Byte[]? "" : rs?.ToJson());
return rs!;
}
catch (Exception ex)
{
var ex2 = ex.GetTrue();
if (ex2 is ApiException aex)
{
// 在客户端已登录状态下,服务端返回未授权,可能是令牌过期,尝试重新登录
if (Logined && aex.Code == ApiCode.Unauthorized)
{
Status = LoginStatus.Ready;
if (needLogin && Features.HasFlag(Features.Login))
{
Log?.Debug("{0}", ex);
WriteLog("重新登录,因调用[{0}]失败:{1}", action, ex.Message);
await Login(action, cancellationToken).ConfigureAwait(false);
// 再次执行当前请求
var rs = await http.InvokeAsync<TResult>(HttpMethod.Get, action, args, null, cancellationToken).ConfigureAwait(false);
if (Log != null && Log.Level <= LogLevel.Debug) WriteLog("[{0}]<={1}", action, rs is IPacket or Byte[]? "" : rs?.ToJson());
return rs!;
}
}
throw new ApiException(aex.Code, $"[{action}]{aex.Message}");
}
if (Log != null && Log.Enable && Log.Level <= LogLevel.Debug)
throw new XException($"[{action}]{ex.Message}", ex);
else
throw new XException($"[{action}]{ex.Message}");
}
}
/// <summary>[核心接口]远程调用服务端接口,支持重新登录</summary>
/// <remarks>
/// 所有对服务端接口的调用,都应该走这个方法,以便统一处理登录、心跳、令牌过期等问题。
/// </remarks>
/// <typeparam name="TResult">返回结果类型</typeparam>
/// <param name="action">动作接口路径</param>
/// <param name="args">请求参数</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>调用结果</returns>
public virtual async Task<TResult?> InvokeAsync<TResult>(String action, Object? args = null, CancellationToken cancellationToken = default)
{
// 验证登录。如果该接口需要登录,且未登录,则先登录
var needLogin = !Actions[Features.Login].EqualIgnoreCase(action);
if (needLogin && !Logined && Features.HasFlag(Features.Login))
{
if (Disposed) throw new ObjectDisposedException(GetType().Name);
await Login(action, cancellationToken).ConfigureAwait(false);
}
try
{
return await OnInvokeAsync<TResult>(action, args, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
var ex2 = ex.GetTrue();
if (ex2 is ApiException aex)
{
// 在客户端已登录状态下,服务端返回未授权,可能是令牌过期,尝试重新登录
if (Logined && aex.Code == ApiCode.Unauthorized)
{
Status = LoginStatus.Ready;
if (needLogin && Features.HasFlag(Features.Login))
{
Log?.Debug("{0}", ex);
WriteLog("重新登录,因调用[{0}]失败:{1}", action, ex.Message);
await Login(action, cancellationToken).ConfigureAwait(false);
// 再次执行当前请求
return await OnInvokeAsync<TResult>(action, args, cancellationToken).ConfigureAwait(false);
}
}
throw new ApiException(aex.Code, $"[{action}]{aex.Message}");
}
if (Log != null && Log.Enable && Log.Level <= LogLevel.Debug)
throw new XException($"[{action}]{ex.Message}", ex);
else
throw new XException($"[{action}]{ex.Message}");
}
}
/// <summary>同步调用</summary>
/// <typeparam name="TResult">返回结果类型</typeparam>
/// <param name="action">动作接口路径</param>
/// <param name="args">请求参数</param>
/// <returns>调用结果</returns>
[return: MaybeNull]
public virtual TResult Invoke<TResult>(String action, Object? args = null)
{
using var source = new CancellationTokenSource(Timeout);
return InvokeAsync<TResult>(action, args, source.Token).ConfigureAwait(false).GetAwaiter().GetResult();
}
/// <summary>设置令牌。派生类可重定义逻辑</summary>
/// <param name="token">认证令牌</param>
protected virtual void SetToken(String? token) => _client?.Token = token;
/// <summary>获取相对于服务器的当前时间,本地时区,避免两端时间差</summary>
/// <returns></returns>
public DateTime GetNow() => DateTime.Now.Add(_span);
#endregion
#region 登录注销
private TimerX? _timerLogin;
private Int32 _times;
/// <summary>打开连接,尝试登录服务端。在网络未就绪之前反复尝试</summary>
public virtual void Open()
{
_timerLogin = new TimerX(TryConnectServer, null, 0, 5_000) { Async = true };
}
private async Task TryConnectServer(Object state)
{
if (!NetworkInterface.GetIsNetworkAvailable())
{
WriteLog("网络不可达,延迟连接服务器");
return;
}
var timer = _timerLogin;
try
{
using var source = new CancellationTokenSource(Timeout);
if (!Logined) await Login(nameof(TryConnectServer), source.Token).ConfigureAwait(false);
}
catch (Exception ex)
{
// 登录报错后,加大定时间隔,输出简单日志
if (timer != null && timer.Period < 30_000) timer.Period += 5_000;
Log?.Error("[{0}]TryConnectServer: {1}", Name, ex.Message);
if (!Logined) return;
}
timer.TryDispose();
_timerLogin = null;
}
/// <summary>登录。使用编码和密钥登录服务端,获取令牌用于后续接口调用</summary>
/// <remarks>
/// 支持编码和密钥下发(自动注册)、时间校准。
/// 用户可重载Login实现自定义登录逻辑,通过Logined判断是否登录成功。
/// 也可以在OnLogined事件中处理登录成功后的逻辑。
/// </remarks>
/// <param name="source">来源。标记从哪里发起登录请求</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>登录响应</returns>
public virtual async Task<ILoginResponse?> Login(String? source = null, CancellationToken cancellationToken = default)
{
// 如果已登录,直接返回
if (Status == LoginStatus.LoggedIn) return null;
//!!! 这里的多线程登录设计不采取共用Task的架构,因为首次登录可能会失败,后续其它线程需要重新登录,而不是共用失败结果。
var times = Interlocked.Increment(ref _times);
var level = times < 10 ? LogLevel.Info : LogLevel.Debug;
// 如果正在登录,则稍等一会,避免重复登录。
if (Status == LoginStatus.LoggingIn)
{
Log?.Write(level, "[{0}]正在登录,请稍等{1}ms!序号:{2},来源:{3}", Name, 50 * 100, times, source);
for (var i = 0; Status == LoginStatus.LoggingIn && i < 50; i++)
{
await TaskEx.Delay(100, cancellationToken).ConfigureAwait(false);
if (Status == LoginStatus.LoggedIn) return null;
}
}
if (Status != LoginStatus.LoggedIn) Status = LoginStatus.LoggingIn;
Init();
ILoginRequest? request = null;
ILoginResponse? response = null;
using var span = Tracer?.NewSpan(nameof(Login), new { Code, source, Server });
Log?.Write(level, "[{0}]登录:{1},序号:{2},来源:{3}", Name, Code, times, source);
try
{
// 创建登录请求,用户可重载BuildLoginRequest实现自定义登录请求,填充更多参数
request = BuildLoginRequest();
// 登录前清空令牌,避免服务端使用上一次信息
SetToken(null);
// 滚动的登录超时时间,实际上只对StarServer有效
var timeout = times * 1000;
if (timeout > Timeout) timeout = Timeout;
using var timeoutSource = new CancellationTokenSource(timeout);
using var ts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutSource.Token);
response = await LoginAsync(request, ts.Token).ConfigureAwait(false);
if (response == null) return null;
Log?.Write(level, "[{0}]登录成功:{1},序号:{2},来源:{3}", Name, response, times, source);
// 登录后设置用于用户认证的token
SetToken(response.Token);
Status = LoginStatus.LoggedIn;
}
catch (Exception ex)
{
Log?.Write(level, "[{0}]登录失败:{1},序号:{2},来源:{3}", Name, ex.Message, times, source);
Status = LoginStatus.Ready;
span?.SetError(ex, null);
throw;
}
// 登录成果。服务端执行自动注册时,可能有下发编码和密钥
if (!response.Code.IsNullOrEmpty() && !response.Secret.IsNullOrEmpty())
{
WriteLog("下发密钥:{0}/{1}", response.Code, response.Secret);
Code = response.Code;
Secret = response.Secret;
var set = Setting;
if (set != null)
{
set.Code = response.Code;
set.Secret = response.Secret;
set.Save();
}
}
FixTime(response.Time, response.ServerTime);
OnLogined?.Invoke(this, new(request, response));
StartTimer();
return response;
}
/// <summary>计算客户端到服务端的网络延迟,以及相对时间差。支持GetNow()返回基于服务器的当前时间</summary>
/// <param name="startTime">请求开始时间戳(UTC毫秒)</param>
/// <param name="serverTime">服务器时间戳(UTC毫秒)</param>
protected void FixTime(Int64 startTime, Int64 serverTime)
{
var dt = startTime.ToDateTime();
if (dt.Year > 2000)
{
// 计算延迟
var ts = DateTime.UtcNow - dt;
var ms = (Int32)ts.TotalMilliseconds;
if (Delay > 0)
Delay = (Delay + ms) / 2;
else
Delay = ms;
}
// 时间偏移
dt = serverTime.ToDateTime();
if (dt.Year > 2000) _span = dt.AddMilliseconds(Delay / 2) - DateTime.UtcNow;
}
/// <summary>创建登录请求。支持重载后使用自定义的登录请求对象</summary>
/// <remarks>
/// 用户可以重载此方法,返回自定义的登录请求对象,用于支持更多的登录参数。
/// 也可以调用基类BuildLoginRequest后得到ClientId和Secret等基本参数,然后填充自己的登录请求对象。
/// 还可以直接使用自己的登录请求对象,调用FillLoginRequest填充基本参数。
/// </remarks>
/// <returns>登录请求</returns>
public virtual ILoginRequest BuildLoginRequest()
{
Init();
var request = GetService<ILoginRequest>() ?? new LoginRequest();
FillLoginRequest(request);
return request;
}
/// <summary>填充登录请求。用户自定义登录时可选调用</summary>
/// <param name="request">登录请求</param>
protected virtual void FillLoginRequest(ILoginRequest request)
{
request.Code = Code;
request.ClientId = Runtime.ClientId;
if (!Secret.IsNullOrEmpty())
request.Secret = PasswordProvider?.Hash(Secret) ?? Secret;
if (request is ILoginRequest2 info)
{
var asm = AssemblyX.Entry ?? AssemblyX.Create(Assembly.GetExecutingAssembly());
if (asm != null)
{
info.Version = asm.FileVersion;
info.Compile = asm.Compile.ToUniversalTime().ToLong();
}
info.IP = NetHelper.GetIPsWithCache().Where(e => e.IsIPv4() && e.GetAddressBytes()[0] != 169).Join();
info.Macs = NetHelper.GetMacs().Select(e => e.ToHex("-")).Where(e => e != "00-00-00-00-00-00").OrderBy(e => e).Join(",");
info.UUID = MachineInfo.GetCurrent().BuildCode();
info.Time = DateTime.UtcNow.ToLong();
}
}
/// <summary>注销。调用服务端注销接口,销毁令牌</summary>
/// <param name="reason">原因</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>注销响应</returns>
public virtual async Task<ILogoutResponse?> Logout(String? reason, CancellationToken cancellationToken = default)
{
if (!Logined) return null;
using var span = Tracer?.NewSpan(nameof(Logout), reason);
WriteLog("注销:{0} {1}", Code, reason);
try
{
var rs = await LogoutAsync(reason, cancellationToken).ConfigureAwait(false);
// 更新令牌
SetToken(rs?.Token);
StopTimer();
Status = LoginStatus.Ready;
return rs;
}
catch (Exception ex)
{
span?.SetError(ex, null);
Log?.Error("[{0}]Logout: {1}", Name, ex.ToString());
return null;
}
}
/// <summary>发起登录异步请求。由Login内部调用</summary>
/// <param name="request">登录请求</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>登录响应</returns>
protected virtual Task<ILoginResponse?> LoginAsync(ILoginRequest request, CancellationToken cancellationToken) => InvokeAsync<ILoginResponse>(Actions[Features.Login], request, cancellationToken);
/// <summary>发起注销异步请求。由Logout内部调用</summary>
/// <param name="reason">注销原因</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>注销响应</returns>
protected virtual async Task<ILogoutResponse?> LogoutAsync(String? reason, CancellationToken cancellationToken)
{
if (_client is ApiHttpClient)
return await GetAsync<ILogoutResponse>(Actions[Features.Logout], new { reason }, cancellationToken).ConfigureAwait(false);
return await InvokeAsync<ILogoutResponse>(Actions[Features.Logout], new { reason }, cancellationToken).ConfigureAwait(false);
}
#endregion
#region 心跳保活
/// <summary>心跳。请求服务端心跳接口,上报客户端性能数据的同时,更新其在服务端的最后活跃时间</summary>
/// <remarks>
/// 心跳逻辑内部带有失败重试机制,最大失败数MaxFails默认120,超过该数时,新的数据将被抛弃。
/// 在网络不可用或者接口请求异常时,会将数据保存到队列,等待网络恢复或者下次心跳时重试。
/// </remarks>
/// <returns>心跳响应</returns>
public virtual async Task<IPingResponse?> Ping(CancellationToken cancellationToken = default)
{
Init();
//using var span = Tracer?.NewSpan(nameof(Ping));
try
{
// 创建心跳请求。支持重载后使用自定义的心跳请求对象,填充更多参数
var request = BuildPingRequest();
// 如果网络不可用,直接保存到队列
if (!NetworkInterface.GetIsNetworkAvailable())
{
// 如果心跳请求实现了ICloneable接口,可以克隆一份,避免后续修改
if (_fails.Count < MaxFails) _fails.Enqueue((request as ICloneable)?.Clone() as IPingRequest ?? request);
return null;
}
IPingResponse? response = null;
try
{
response = await PingAsync(request, cancellationToken).ConfigureAwait(false);
if (response != null)
{
// 由服务器改变采样频率
if (response.Period > 0 && _timerPing != null) _timerPing.Period = response.Period * 1000;
FixTime(response.Time, response.ServerTime);
// 更新令牌。即将过期时,服务端会返回新令牌
if (!response.Token.IsNullOrEmpty()) SetToken(response.Token);
// 心跳响应携带的命令,推送到队列
var commands = (response as PingResponse)?.Commands;
if (commands != null && commands.Length > 0)
{
foreach (var model in commands)
{
await ReceiveCommand(model, null, "Pong", cancellationToken).ConfigureAwait(false);
}
}
}
}
catch
{
// 失败时保存到队列
//if (_fails.Count < MaxFails) _fails.Enqueue(request);
if (_fails.Count < MaxFails) _fails.Enqueue((request as ICloneable)?.Clone() as IPingRequest ?? request);
throw;
}
// 上报正常,处理历史,失败则丢弃
while (_fails.TryDequeue(out var info))
{
await PingAsync(info, cancellationToken).ConfigureAwait(false);
}
return response;
}
catch (Exception ex)
{
//span?.SetError(ex, null);
var ex2 = ex.GetTrue();
if (ex2 is ApiException aex && (aex.Code == ApiCode.Unauthorized || aex.Code == ApiCode.Forbidden))
{
Status = LoginStatus.Ready;
if (Features.HasFlag(Features.Login))
{
Log?.Debug("[{0}]重新登录,因心跳失败:{1}", Name, ex.Message);
await Login(nameof(Ping), cancellationToken).ConfigureAwait(false);
}
return null;
}
Log?.Debug("[{0}]心跳异常:{1}", Name, ex.GetTrue().Message);
// 常见网络断开错误不要抛出异常
if (ex2 is IOException || ex2 is SocketException sex && sex.SocketErrorCode == SocketError.ConnectionReset)
return null;
throw;
}
}
/// <summary>创建心跳请求。支持重载后使用自定义的心跳请求对象</summary>
/// <remarks>
/// 用户可以重载此方法,返回自定义的心跳请求对象,用于支持更多的心跳参数。
/// 也可以调用基类BuildPingRequest后得到基本参数,然后填充自己的心跳请求对象。
/// 还可以直接使用自己的心跳请求对象,调用FillPingRequest填充基本参数。
/// </remarks>
public virtual IPingRequest BuildPingRequest()
{
Init();
var request = GetService<IPingRequest>() ?? new PingRequest();
FillPingRequest(request);
return request;
}
/// <summary>填充心跳请求</summary>
/// <param name="request"></param>
protected virtual void FillPingRequest(IPingRequest request)
{
if (request is IPingRequest2 req)
{
var path = ".".GetFullPath();
var driveInfo = DriveInfo.GetDrives().FirstOrDefault(e => path.StartsWithIgnoreCase(e.Name));
var mi = MachineInfo.GetCurrent();
mi.Refresh();
req.Memory = mi.Memory;
req.AvailableMemory = mi.AvailableMemory;
req.FreeMemory = mi.FreeMemory;
req.TotalSize = (UInt64)(driveInfo?.TotalSize ?? 0);
req.AvailableFreeSpace = (UInt64)(driveInfo?.AvailableFreeSpace ?? 0);
req.CpuRate = Math.Round(mi.CpuRate, 3);
req.Temperature = Math.Round(mi.Temperature, 1);
req.Battery = Math.Round(mi.Battery, 3);
if (request is PingRequest preq)
{
preq.UplinkSpeed = mi.UplinkSpeed;
preq.DownlinkSpeed = mi.DownlinkSpeed;
}
var ip = NetHelper.GetIPs().Where(ip => ip.IsIPv4() && !IPAddress.IsLoopback(ip) && ip.GetAddressBytes()[0] != 169).Join();
req.IP = ip;
req.Delay = Delay;
req.Uptime = (Int32)(Runtime.TickCount64 / 1000);
//// 开始时间 Environment.TickCount 很容易溢出,导致开机24天后变成负数。
//// 后来在 netcore3.0 增加了Environment.TickCount64
//// 现在借助 Stopwatch 来解决
//if (Stopwatch.IsHighResolution) req.Uptime = (Int32)(Stopwatch.GetTimestamp() / Stopwatch.Frequency);
if (mi is IExtend ext)
{
// 读取无线信号强度
if (ext.Items.TryGetValue("Signal", out var value)) req.Signal = value.ToInt();
}
}
// 最后设置时间,避免因为代码执行原因导致误差过大
request.Time = DateTime.UtcNow.ToLong();
}
/// <summary>发起心跳异步请求。由Ping内部调用</summary>
/// <param name="request">心跳请求</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>心跳响应</returns>
protected virtual Task<IPingResponse?> PingAsync(IPingRequest request, CancellationToken cancellationToken) => InvokeAsync<IPingResponse>(Actions[Features.Ping], request, cancellationToken);
#endregion
#region 升级更新
private async Task CheckUpgrade(Object? data)
{
if (!NetworkInterface.GetIsNetworkAvailable()) return;
await Upgrade(null).ConfigureAwait(false);
}
private async Task<String?> ReceiveUpgrade(String? arguments)
{
// 参数作为通道
var channel = arguments;
var rs = await Upgrade(channel).ConfigureAwait(false);
if (rs == null) return "没有可用更新!";
return $"成功更新到[{rs.Version}]";
}
private String? _lastVersion;
/// <summary>获取更新信息。如有更新,则下载解压覆盖并重启应用</summary>
/// <param name="channel">更新通道</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>更新信息</returns>
public virtual async Task<IUpgradeInfo?> Upgrade(String? channel, CancellationToken cancellationToken = default)
{
using var span = Tracer?.NewSpan(nameof(Upgrade));
WriteLog("检查更新");
// 清理旧版备份文件
var ug = new Upgrade { Log = Log };
ug.DeleteBackup(".");
// 调用接口查询思否存在更新信息
var info = await UpgradeAsync(channel, cancellationToken).ConfigureAwait(false);
if (info == null || info.Version == _lastVersion) return info;
// _lastVersion避免频繁更新同一个版本
WriteLog("发现更新:{0}", info.ToJson(true));
this.WriteInfoEvent("Upgrade", $"准备从[{_lastVersion}]更新到[{info.Version}],开始下载 {info.Source}");
try
{
// 下载文件包
ug.Url = BuildUrl(info.Source!);
await ug.Download(cancellationToken).ConfigureAwait(false);
// 检查文件完整性
if (!info.FileHash.IsNullOrEmpty() && !ug.CheckFileHash(info.FileHash))
{
this.WriteInfoEvent("Upgrade", "下载完成,哈希校验失败");
}
else
{
this.WriteInfoEvent("Upgrade", "下载完成,准备解压文件");
if (!ug.Extract())
{
this.WriteInfoEvent("Upgrade", "解压失败");
}
else
{
var info2 = info as IUpgradeInfo2;
if (info2 != null && !info2.Preinstall.IsNullOrEmpty())
{
this.WriteInfoEvent("Upgrade", "执行预安装脚本");
ug.Run(info2.Preinstall);
}
this.WriteInfoEvent("Upgrade", "解压完成,准备覆盖文件");
// 执行更新,解压缩覆盖文件
var rs = ug.Update();
// 执行更新后命令
if (rs && info2 != null && !info2.Executor.IsNullOrEmpty()) ug.Run(info2.Executor);
_lastVersion = info.Version;
// 强制更新时,马上重启
if (rs && info2 != null && info2.Force) Restart(ug);
}
}
}
catch (Exception ex)
{
span?.SetError(ex, null);
//Log?.Error(ex.ToString());
this.WriteErrorEvent("Upgrade", $"更新失败!{ex.Message}");
throw;
}
return info;
}
/// <summary>更新完成,重启自己</summary>
/// <remarks>
/// 自更新策略(冒烟测试 + 自动回滚):
/// 1. 新版文件已覆盖到位,旧版 .exe/.dll/.json 备份为 .del
/// 2. 旧进程尝试拉起新进程作为冒烟测试——若新版本在 3 秒内异常退出,说明存在致命缺陷
/// 3. 首次失败不立即判定为缺陷——等待 5 秒(给慢速存储落盘 + 运行时 JIT 预热等留出时间),重试一次
/// 4. 重试仍失败 → 回滚所有 .del 恢复旧版文件 → 旧进程继续提供服务
/// (同一启动周期内 exec 从页缓存读取,落盘延迟不是 root cause;重试主要应对 eMMC GC、JIT 冷启动等)
/// 5. 两次均通过 → 旧进程退出,新进程接管
/// </remarks>
/// <param name="upgrade"></param>
protected virtual void Restart(Upgrade upgrade)
{
var asm = Assembly.GetEntryAssembly();
if (asm == null) return;
var name = asm.GetName().Name;
if (name.IsNullOrEmpty()) return;
// 重新拉起进程。对于大多数应用,都是拉起新进程,然后退出当前进程;对于星尘代理,通过新进程来重启服务。
var gs = BuildRestartArguments();
// 冒烟测试:尝试拉起新版本进程
var rs = upgrade.Run(name, gs, 3_000);
if (!rs)
{
// 首次失败可能是慢速存储落盘延迟或 JIT 冷启动,等待后重试一次
var delay = 5_000;
this.WriteInfoEvent("Upgrade", $"新版首次启动失败,等待{delay}ms后重试。{upgrade.LastErrorMessage}");
Thread.Sleep(delay);
rs = upgrade.Run(name, gs, 3_000);
}
if (rs)
{
// 新版启动成功(冒烟测试通过),旧版退出
var pid = System.Diagnostics.Process.GetCurrentProcess().Id;
this.WriteInfoEvent("Upgrade", "强制更新完成,新进程已拉起,准备退出当前进程!PID=" + pid);
upgrade.KillSelf();
}
else
{
// 两次均失败,确认新版有致命缺陷,回滚 .del 恢复旧版文件,旧进程继续运行
upgrade.Rollback();
this.WriteInfoEvent("Upgrade", "新版启动失败(冒烟测试不通过),已回滚旧版文件,当前服务继续运行。" + upgrade.LastErrorMessage);
}
}
/// <summary>构建重启命令行参数。派生类可重载以支持服务模式等自定义逻辑</summary>
/// <remarks>
/// 默认行为:从当前进程命令行去掉 args[0](可执行文件路径),注入 -upgrade 标记。
/// MyStarClient 可重载此方法生成 -restart -upgrade(服务模式)或 -run -upgrade(进程模式)。
/// </remarks>
/// <returns>重启参数</returns>
protected virtual String BuildRestartArguments()
{
var args = Environment.GetCommandLineArgs();
if (args == null || args.Length == 0) args = new String[1];
args[0] = "-upgrade";
return args.Join(" ");
}
/// <summary>放弃更新异步请求。由Upgrade内部调用</summary>
/// <param name="channel">更新通道</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>更新信息</returns>
protected virtual async Task<IUpgradeInfo?> UpgradeAsync(String? channel, CancellationToken cancellationToken)
{
if (_client is ApiHttpClient)
return await GetAsync<IUpgradeInfo>(Actions[Features.Upgrade], new { channel }, cancellationToken).ConfigureAwait(false);
return await InvokeAsync<IUpgradeInfo>(Actions[Features.Upgrade], new { channel }, cancellationToken).ConfigureAwait(false);
}
#endregion
#region 下行通知
private TimerX? _timerPing;
private TimerX? _timerUpgrade;
/// <summary>开始心跳定时器</summary>
protected virtual void StartTimer()
{
if (_timerPing == null && (Features.HasFlag(Features.Ping) || Features.HasFlag(Features.Notify)))
{
lock (this)
{
// 稍微延迟首次心跳时间,确保鉴权已完成,便于快速建立WebSocket连接
_timerPing ??= new TimerX(DoPing, null, DateTime.Now.AddSeconds(3), 60_000) { Async = true };
}
}
if (_timerUpgrade == null && Features.HasFlag(Features.Upgrade))
{
lock (this)
{
_timerUpgrade ??= new TimerX(CheckUpgrade, null, DateTime.Now.AddSeconds(15), 600_000) { Async = true };
}
}
// 如果事件队列不为空,启动事件定时器
if (!_events.IsEmpty) InitEvent();
}
/// <summary>停止心跳定时器</summary>
protected virtual void StopTimer()
{
_timerPing.TryDispose();
_timerPing = null;
_timerUpgrade.TryDispose();
_timerUpgrade = null;
_eventTimer.TryDispose();
_eventTimer = null;
_ws.TryDispose();
_ws = null;
}
private async Task DoPing(Object state)
{
using var span = Tracer?.NewSpan(Name + "Ping");
try
{
await OnPing(state).ConfigureAwait(false);
}
catch (Exception ex)
{
span?.SetError(ex, null);
Log?.Debug("[{0}]{1}", Name, ex);
}
}
private WsChannel? _ws;
private Boolean _firstPingDone;
/// <summary>定时心跳。由心跳定时器调用,主要用于维护WebSocket长连接</summary>
/// <param name="state">定时器状态参数</param>
/// <returns></returns>
protected virtual async Task OnPing(Object state)
{
if (Features.HasFlag(Features.Ping)) await Ping().ConfigureAwait(false);
if (_client is ApiHttpClient http && Features.HasFlag(Features.Notify))
{
await ValidWebSocket(http).ConfigureAwait(false);
if (!_firstPingDone)
{
_firstPingDone = true;
_timerUpgrade?.SetNext(-1);
}
}
}
private async Task ValidWebSocket(ApiHttpClient http)
{
// 非NetCore平台,使用自研轻量级WebSocket
#if NETCOREAPP
_ws ??= new WsChannelCore(this);
#else
_ws ??= new WsChannel(this);
#endif
if (_ws != null) await _ws.ValidWebSocket(http).ConfigureAwait(false);
}
/// <summary>发送上行消息。借助WebSocket等上行通道</summary>
/// <param name="packet">数据包</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public virtual async Task SendAsync(IPacket packet, CancellationToken cancellationToken = default)
{
if (_client is ApiHttpClient http)
{
await ValidWebSocket(http).ConfigureAwait(false);
await _ws!.SendTextAsync(packet, cancellationToken).ConfigureAwait(false);
}
else if (_client is ApiClient client)
{
client.InvokeOneWay("data", packet);
}
}
/// <summary>分发消息</summary>
/// <param name="data">数据包</param>
/// <param name="context">事件上下文。用于在发布者、订阅者及中间处理器之间传递协调数据,如 Handler、ClientId 等</param>
/// <param name="cancellationToken">取消通知</param>
/// <returns></returns>
public virtual async Task HandleAsync(IPacket data, IEventContext? context, CancellationToken cancellationToken)
{
if (data == null) throw new ArgumentNullException(nameof(data));
if (data[0] == '{')
{
var str = data.ToStr();
if (!str.IsNullOrEmpty())
{
var model = JsonHost.Read<CommandModel>(str, JsonOptions)!;
if (model != null && !model.Command.IsNullOrEmpty())
{
await ReceiveCommand(model, str, null, cancellationToken).ConfigureAwait(false);
}
else
throw new InvalidDataException("无效命令!");
}
}
}
/// <summary>接收命令,分发调用指定委托</summary>
/// <remarks>
/// 命令处理流程中,会对命令进行去重,避免重复执行。
/// 其次判断命令是否已经过期,如果已经过期则取消执行。
/// 还支持定时执行,延迟执行。
/// </remarks>
/// <param name="model">命令模型</param>
/// <param name="message">原始命令消息</param>
/// <param name="source">来源</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>命令回复模型</returns>
public virtual async Task<CommandReplyModel?> ReceiveCommand(CommandModel model, String? message, String? source, CancellationToken cancellationToken = default)
{
if (model == null) return null;
if (model.Command.IsNullOrEmpty()) return null;
// 去重,避免命令被重复执行
if (model.Id > 0 && !_cache.Add($"cmd:{model.Id}", model, 3600)) return null;
// 埋点,建立调用链
message ??= JsonHost.Write(model, JsonOptions);
using var span = Tracer?.NewSpan("cmd:" + model.Command, message);
if (!model.TraceId.IsNullOrEmpty()) span?.Detach(model.TraceId);
try
{
// 有效期判断前把UTC转为本地时间
var now = GetNow();
var expire = model.Expire.ToLocalTime();
WriteLog("[{0}] 收到命令: {1}", source, message);
if (expire.Year < 2000 || expire > now)
{
// 延迟执行
var startTime = model.StartTime.ToLocalTime();
var ts = startTime - now;
if (ts.TotalMilliseconds > 0)
{
//TimerX.Delay(s =>
//{
// _ = OnReceiveCommand(model, CancellationToken.None);
//}, (Int32)ts.TotalMilliseconds);
_ = Task.Run(async () =>
{
await TaskEx.Delay((Int32)ts.TotalMilliseconds).ConfigureAwait(false);
WriteLog("[{0}] 延迟执行: {1}", source, message);
await OnReceiveCommand(model, message, CancellationToken.None).ConfigureAwait(false);
}, cancellationToken);
var reply = new CommandReplyModel
{
Id = model.Id,
Status = CommandStatus.处理中,
Data = $"已安排计划执行 {startTime.ToFullString()}"
};
if (Features.HasFlag(Features.CommandReply))
await CommandReply(reply, cancellationToken).ConfigureAwait(false);
return reply;
}
else
return await OnReceiveCommand(model, message, cancellationToken).ConfigureAwait(false);
}
else
{
var reply = new CommandReplyModel { Id = model.Id, Status = CommandStatus.取消 };
if (Features.HasFlag(Features.CommandReply))
await CommandReply(reply, cancellationToken).ConfigureAwait(false);
return reply;
}
}
catch (Exception ex)
{
span?.SetError(ex, null);
}
return null;
}
/// <summary>触发收到命令的动作</summary>
/// <param name="model">命令</param>
/// <param name="message">原始命令消息</param>
/// <param name="cancellationToken">取消令牌</param>
protected virtual async Task<CommandReplyModel?> OnReceiveCommand(CommandModel model, String? message, CancellationToken cancellationToken)
{
var e = new CommandEventArgs { Model = model, Message = message };
Received?.Invoke(this, e);
model = e.Model;
var rs = await this.ExecuteCommand(model, cancellationToken).ConfigureAwait(false);
e.Reply ??= rs;
if (e.Reply != null && /*e.Reply.Id > 0 &&*/ Features.HasFlag(Features.CommandReply))
await CommandReply(e.Reply, cancellationToken).ConfigureAwait(false);
return e.Reply;
}
/// <summary>向命令引擎发送命令,触发指定已注册动作</summary>
/// <param name="command">命令</param>
/// <param name="argument">参数</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns>命令回复模型</returns>
public virtual Task<CommandReplyModel?> SendCommand(String command, String argument, CancellationToken cancellationToken = default) => OnReceiveCommand(new CommandModel { Command = command, Argument = argument }, null, cancellationToken);
/// <summary>上报命令调用结果</summary>
/// <param name="model">命令</param>
/// <param name="cancellationToken">取消令牌</param>
/// <returns></returns>
public virtual Task<Object?> CommandReply(CommandReplyModel model, CancellationToken cancellationToken = default) => InvokeAsync<Object>(Actions[Features.CommandReply], model, cancellationToken);
#endregion
#region 事件上报
private readonly ConcurrentQueue<EventModel> _events = new();
private readonly ConcurrentQueue<EventModel> _failEvents = new();
private TimerX? _eventTimer;
private String? _eventTraceId;
void InitEvent() => _eventTimer ??= new TimerX(DoPostEvent, null, 3_000, 60_000) { Async = true };
/// <summary>批量上报事件</summary>
/// <param name="events">事件模型数组</param>
/// <returns>上报成功数量</returns>
public virtual Task<Int32> PostEvents(params EventModel[] events) => InvokeAsync<Int32>(Actions[Features.PostEvent], events);
async Task DoPostEvent(Object state)
{
if (!NetworkInterface.GetIsNetworkAvailable()) return;
DefaultSpan.Current = null;
var tid = _eventTraceId;
_eventTraceId = null;
// 正常队列为空,异常队列有数据,给它一次机会
if (_events.IsEmpty && !_failEvents.IsEmpty)
{
while (_failEvents.TryDequeue(out var ev))
{
_events.Enqueue(ev);
}
}
while (!_events.IsEmpty)
{
var max = 100;
var list = new List<EventModel>();
while (_events.TryDequeue(out var model) && max-- > 0) list.Add(model);
using var span = Tracer?.NewSpan("PostEvent", list.Count);
if (tid != null) span?.Detach(tid);
try
{
if (list.Count > 0) await PostEvents(list.ToArray()).ConfigureAwait(false);
// 成功后读取本地缓存
while (_failEvents.TryDequeue(out var ev))
{
_events.Enqueue(ev);
}
}
catch (Exception ex)
{
span?.SetError(ex, null);
// 失败后进入本地缓存
foreach (var item in list)
{
_failEvents.Enqueue(item);
}
}
}
}
/// <summary>写事件</summary>
/// <param name="type">事件类型。info/alert/error等</param>
/// <param name="name">事件名称</param>
/// <param name="remark">事件内容</param>
/// <returns>是否成功加入事件队列</returns>
public virtual Boolean WriteEvent(String type, String name, String? remark)
{
// 如果没有事件上报功能,直接返回
if (!Features.HasFlag(Features.PostEvent)) return false;
// 使用时才创建定时器
if (Logined) InitEvent();
// 记录追踪标识,上报的时候带上,尽可能让源头和下游串联起来
_eventTraceId = DefaultSpan.Current?.TraceId;
// 获取相对于服务器的当前时间,避免两端时间差。转为UTC毫秒,作为事件时间。
var now = GetNow().ToUniversalTime();
var ev = new EventModel { Time = now.ToLong(), Type = type, Name = name, Remark = remark };
_events.Enqueue(ev);
_eventTimer?.SetNext(1000);
return true;
}
#endregion
#region 辅助
/// <summary>
/// 把Url相对路径格式化为绝对路径。常用于文件下载
/// </summary>
/// <param name="url">相对或绝对路径</param>
/// <returns>绝对路径</returns>
public virtual String BuildUrl(String url)
{
if (Client is ApiHttpClient client && !url.StartsWithIgnoreCase("http://", "https://"))
{
var svr = client.Services.FirstOrDefault(e => e.Name == client.Source) ?? client.Services.FirstOrDefault();
if (svr != null && svr.Address != null)
url = new Uri(svr.Address, url) + "";
}
return url;
}
/// <summary>从服务提供者(对象容器)创建模型对象</summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public virtual T? GetService<T>() where T : class => ServiceProvider?.GetService<T>() ?? ObjectContainer.Current.GetService<T>();
#endregion
#region 日志
/// <summary>链路追踪</summary>
public ITracer? Tracer { get; set; }
/// <summary>日志</summary>
public ILog Log { get; set; } = Logger.Null;
/// <summary>写日志</summary>
/// <param name="format">格式化字符串</param>
/// <param name="args">参数</param>
public void WriteLog(String format, params Object?[] args) => Log?.Info($"[{Name}]{format}", args);
#endregion
}
|