using NewLife.Collections;
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;
using NewLife.Net;
using NewLife.Threading;
#if !NET40
using TaskEx = System.Threading.Tasks.Task;
#endif
namespace NewLife.Remoting;
/// <summary>应用接口客户端</summary>
/// <remarks>
/// 保持到服务端直接的长连接RPC通信。
/// 常用于向服务端发送请求并接收应答,也可以接收服务端主动发送的单向消息。
///
/// 文档 https://newlifex.com/core/srmp
/// </remarks>
public class ApiClient : ApiHost, IApiClient
{
#region 属性
/// <summary>Whether the client is currently open. Set to true by <see cref="Open"/> and to false by <see cref="Close"/>.</summary>
public Boolean Active { get; protected set; }
/// <summary>Server addresses used for load balancing. <see cref="Open"/> throws when this is null or empty.</summary>
public String[]? Servers { get; set; }
/// <summary>Cluster of client connections. When left null, <see cref="InitCluster"/> creates one during <see cref="Open"/>.</summary>
public ICluster<String, ISocketClient>? Cluster { get; set; }
/// <summary>Whether to use a connection pool. True creates a pooled cluster (multiple connections, higher throughput); the default false creates a single-connection cluster (lower latency).</summary>
public Boolean UsePool { get; set; }
/// <summary>Token. When non-empty, <see cref="InvokeWithClientAsync{TResult}"/> adds it to non-null request arguments under the key "Token" unless that key is already present.</summary>
public String? Token { get; set; }
/// <summary>Last active time. Updated to the current local time whenever a request is sent through <see cref="InvokeWithClientAsync{TResult}"/> or a message is received.</summary>
public DateTime LastActive { get; set; }
/// <summary>Raised when a message is received from the server.</summary>
public event EventHandler<ApiReceivedEventArgs>? Received;
/// <summary>Invocation statistics counter. May be null; it is passed to the StartCount/StopCount helpers around each call and reported by the statistics timer.</summary>
public ICounter? StatInvoke { get; set; }
#endregion
#region 构造
/// <summary>Creates an API client and derives its name from the concrete type.</summary>
/// <remarks>The display name of the type is preferred; otherwise the type name with a trailing "Client" removed is used.</remarks>
public ApiClient()
{
    var clientType = GetType();
    var displayName = clientType.GetDisplayName();

    Name = displayName ?? clientType.Name.TrimEnd("Client");
}
/// <summary>Creates an API client for the given server addresses.</summary>
/// <param name="uris">Server addresses separated by commas or semicolons. Ignored when null or empty.</param>
public ApiClient(String uris) : this()
{
    // Nothing to parse: leave Servers unset so Open reports the missing configuration
    if (uris.IsNullOrEmpty()) return;

    Servers = uris.Split(",", ";");
}
/// <summary>Releases the client: disposes the statistics timer and closes the cluster.</summary>
/// <param name="disposing">True when called from Dispose, false when called from the finalizer; only affects the close reason text.</param>
protected override void Dispose(Boolean disposing)
{
    base.Dispose(disposing);

    _Timer.TryDispose();

    // The close reason records whether disposal was explicit or came from the GC
    var suffix = disposing ? "Dispose" : "GC";
    Close(Name + suffix);
}
#endregion
#region 打开关闭
/// <summary>Opens the client: prepares the encoder, initializes the cluster and starts the statistics timer.</summary>
/// <remarks>
/// Returns immediately when the client is already active. A statistics timer left over from an
/// earlier Open/Close cycle is disposed before a new one is created, so reopening does not leave
/// several timers running.
/// </remarks>
/// <returns>True once the client is active.</returns>
/// <exception cref="ArgumentNullException">No server address has been configured in <see cref="Servers"/>.</exception>
public virtual Boolean Open()
{
    if (Active) return true;
    lock (this)
    {
        // Re-check inside the lock: another thread may have finished opening meanwhile
        if (Active) return true;

        var ss = Servers;
        if (ss == null || ss.Length == 0) throw new ArgumentNullException(nameof(Servers), "未指定服务端地址");

        Encoder ??= new JsonEncoder();

        // Cluster
        Cluster = InitCluster();
        WriteLog("集群:{0}", Cluster);

        Encoder.Log = EncoderLog;

        // Close does not stop the timer, so dispose any previous instance before replacing it
        _Timer.TryDispose();
        _Timer = null;

        // Periodic statistics output; a non-positive period disables it
        var ms = StatPeriod * 1000;
        if (ms > 0)
        {
            _Timer = new TimerX(DoWork, null, ms, ms) { Async = true };
        }

        return Active = true;
    }
}
/// <summary>Closes the client and its cluster. Does nothing when the client is not active.</summary>
/// <param name="reason">Reason for closing, useful for log analysis. Falls back to the type name plus "Close" when null.</param>
public virtual void Close(String reason)
{
    if (Active)
    {
        var why = reason ?? (GetType().Name + "Close");
        Cluster?.Close(why);

        Active = false;
    }
}
/// <summary>Initializes and opens the connection cluster.</summary>
/// <remarks>
/// Reuses <see cref="Cluster"/> when it is already set; otherwise creates a pooled or single-connection
/// cluster depending on <see cref="UsePool"/>. Missing creation and address callbacks are filled in
/// with this client's defaults before the cluster is opened.
/// </remarks>
/// <returns>The opened cluster.</returns>
protected virtual ICluster<String, ISocketClient> InitCluster()
{
    var cluster = Cluster;
    if (cluster == null)
    {
        if (UsePool)
            cluster = new ClientPoolCluster { Log = Log };
        else
            cluster = new ClientSingleCluster { Log = Log };
    }

    // Only supply the connection factory when the caller has not provided one
    if (cluster is ClientSingleCluster { OnCreate: null } single) single.OnCreate = OnCreate;
    if (cluster is ClientPoolCluster { OnCreate: null } pooled) pooled.OnCreate = OnCreate;

    // Default address source: the configured servers, or an empty list
    if (cluster.GetItems == null) cluster.GetItems = () => Servers ?? new String[0];

    cluster.Open();

    return cluster;
}
#endregion
#region 远程调用
/// <summary>Invokes a remote action asynchronously and waits for the result.</summary>
/// <remarks>
/// Opens the client if needed, borrows a connection from the cluster and returns it afterwards.
/// When the server answers with code 401, the connection is logged in again and the call is retried once.
/// </remarks>
/// <typeparam name="TResult">Result type.</typeparam>
/// <param name="action">Service action.</param>
/// <param name="args">Arguments.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The decoded result, or default when there is none.</returns>
/// <exception cref="ArgumentNullException">The cluster is still null after opening.</exception>
/// <exception cref="TaskCanceledException">The call was cancelled; rethrown with a shortened message.</exception>
public virtual async Task<TResult?> InvokeAsync<TResult>(String action, Object? args = null, CancellationToken cancellationToken = default)
{
    // Task.Yield is deliberately not used here: it forces context capture and, although the rest
    // would run on another thread, it may fail to get the context back on a UI thread and deadlock.
    Open();

    if (Cluster == null) throw new ArgumentNullException(nameof(Cluster));

    var socket = Cluster.Get();
    try
    {
        return await InvokeWithClientAsync<TResult>(socket, action, args, 0, cancellationToken).ConfigureAwait(false);
    }
    catch (ApiException ex) when (ex.Code == 401)
    {
        // This connection is not authenticated: log in again, then repeat the call
        await OnLoginAsync(socket, true).ConfigureAwait(false);

        return await InvokeWithClientAsync<TResult>(socket, action, args, 0, cancellationToken).ConfigureAwait(false);
    }
    catch (TaskCanceledException)
    {
        // Replace the cancellation exception with a short one to avoid overly long output
        throw new TaskCanceledException($"[{action}]超时[{Timeout:n0}ms]取消");
    }
    finally
    {
        Cluster.Put(socket);
    }
}
/// <summary>Invokes a remote action synchronously, blocking until the result is available.</summary>
/// <typeparam name="TResult">Result type.</typeparam>
/// <param name="action">Service action.</param>
/// <param name="args">Arguments.</param>
/// <returns>The decoded result, or default when there is none.</returns>
public virtual TResult? Invoke<TResult>(String action, Object? args = null)
{
    // Run the asynchronous call as a separate task and block on its result
    var pending = TaskEx.Run(() => InvokeAsync<TResult>(action, args));

    return pending.Result;
}
/// <summary>Sends a one-way request synchronously without waiting for a reply.</summary>
/// <param name="action">Service action.</param>
/// <param name="args">Arguments.</param>
/// <param name="flag">Message flag.</param>
/// <returns>The value returned by the underlying send, or -1 when the client could not be opened.</returns>
/// <exception cref="ArgumentNullException">The cluster is still null after opening.</exception>
public virtual Int32 InvokeOneWay(String action, Object? args = null, Byte flag = 0)
{
    var opened = Open();
    if (!opened) return -1;

    if (Cluster == null) throw new ArgumentNullException(nameof(Cluster));

    return Cluster.Invoke(socket => InvokeWithClient(socket, action, args, flag));
}
/// <summary>Invokes a remote action asynchronously on the specified client and waits for the result.</summary>
/// <remarks>Commonly used inside OnLoginAsync to implement login after connecting.</remarks>
/// <typeparam name="TResult">Result type. IMessage returns the raw reply; Packet returns the undecoded reply data.</typeparam>
/// <param name="client">Client connection to send on.</param>
/// <param name="action">Service action.</param>
/// <param name="args">Arguments.</param>
/// <param name="flag">Message flag; applied only when greater than 0 and the request is a DefaultMessage.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The decoded result, or default when the reply, its payload, its data or the decoded result is null.</returns>
/// <exception cref="TimeoutException">The send was cancelled (a TaskCanceledException was raised).</exception>
/// <exception cref="ApiException">The reply code is neither 0 nor 200.</exception>
/// <exception cref="InvalidOperationException">The encoder could not decode the reply.</exception>
public virtual async Task<TResult?> InvokeWithClientAsync<TResult>(ISocketClient client, String action, Object? args = null, Byte flag = 0, CancellationToken cancellationToken = default)
{
// Performance counter: call count, TPS, average latency
var st = StatInvoke;
var sw = st.StartCount();
LastActive = DateTime.Now;
// Token: add it to the arguments unless the caller already supplied one
if (!Token.IsNullOrEmpty() && args != null)
{
var dic = args.ToDictionary();
if (!dic.ContainsKey("Token")) dic["Token"] = Token;
args = dic;
}
// Tracing: attach the span to the arguments (the original note says this injects traceParent)
var span = Tracer?.NewSpan("rpc:" + action, args);
if (args != null && span != null) args = span.Attach(args);
// Encode the request and build the message
var enc = Encoder;
var msg = enc.CreateRequest(action, args);
if (flag > 0 && msg is DefaultMessage dm) dm.Flag = flag;
// Remote address as text, used later as part of the ApiException source
var invoker = client.Remote + "";
IMessage? rs = null;
try
{
// Send the request asynchronously and wait for the reply
rs = (await client.SendMessageAsync(msg, cancellationToken).ConfigureAwait(false)) as IMessage;
if (rs == null) return default;
}
catch (AggregateException aggex)
{
var ex = aggex.GetTrue();
// Record the unwrapped exception on the span
span?.SetError(ex, args);
if (ex is TaskCanceledException)
{
throw new TimeoutException($"请求[{action}]超时({msg})!", ex);
}
throw;
}
catch (TaskCanceledException ex)
{
span?.SetError(ex, args);
throw new TimeoutException($"请求[{action}]超时({msg})!", ex);
}
catch (Exception ex)
{
span?.SetError(ex, args);
throw;
}
finally
{
// Runs on success and failure alike: stop the counter, log slow calls, end the span.
// NOTE(review): the division by 1000 suggests StopCount returns microseconds — confirm.
var msCost = st.StopCount(sw) / 1000;
if (SlowTrace > 0 && msCost >= SlowTrace) WriteLog($"慢调用[{action}]({msg}),耗时{msCost:n0}ms");
span?.Dispose();
}
// Special return type: hand back the raw reply message
var resultType = typeof(TResult);
if (resultType == typeof(IMessage)) return (TResult)rs;
//if (resultType == typeof(Packet)) return rs.Payload;
if (rs.Payload == null) return default;
// Decode the reply into an API message (the original note calls it an SRMP message)
var message = enc.Decode(rs) ?? throw new InvalidOperationException();
// Success check: any code other than 0 or 200 is reported as an ApiException
if (message.Code is not 0 and not 200)
throw new ApiException(message.Code, message.Data?.ToStr().Trim('\"') ?? "") { Source = invoker + "/" + action };
if (message.Data == null) return default;
if (resultType == typeof(Packet)) return (TResult)(Object)message.Data;
// Decode the result
var result = enc.DecodeResult(action, message.Data, rs, resultType);
if (result == null) return default;
if (resultType == typeof(Object)) return (TResult)result;
// Convert to the requested type and return
return (TResult?)enc.Convert(result, resultType);
}
/// <summary>Sends a one-way request on the specified client without waiting for a reply.</summary>
/// <param name="client">Client connection to send on.</param>
/// <param name="action">Service action.</param>
/// <param name="args">Arguments.</param>
/// <param name="flag">Message flag; applied only when greater than 0.</param>
/// <returns>The value returned by the client's send, or -1 when <paramref name="client"/> is null.</returns>
public Int32 InvokeWithClient(ISocketClient client, String action, Object? args, Byte flag = 0)
{
    if (client == null) return -1;

    // Performance counter: call count, TPS, average latency
    var counter = StatInvoke;

    using var span = Tracer?.NewSpan("rpc:" + action, args);
    if (span != null && args != null) args = span.Attach(args);

    // Encode the request and mark it as one-way
    var request = Encoder.CreateRequest(action, args);
    if (request is DefaultMessage oneWay)
    {
        oneWay.OneWay = true;
        if (flag > 0) oneWay.Flag = flag;
    }

    var started = counter.StartCount();
    try
    {
        return client.SendMessage(request);
    }
    catch (Exception ex)
    {
        // Record the failure on the span before propagating it
        span?.SetError(ex, args);
        throw;
    }
    finally
    {
        var elapsedMs = counter.StopCount(started) / 1000;
        if (SlowTrace > 0 && elapsedMs >= SlowTrace) WriteLog($"慢调用[{action}],耗时{elapsedMs:n0}ms");
    }
}
#endregion
#region 异步接收
/// <summary>Called when the client receives a message sent by the server; raises <see cref="Received"/>.</summary>
/// <param name="message">The received message.</param>
/// <param name="e">Event data passed to subscribers.</param>
protected virtual void OnReceive(IMessage message, ApiReceivedEventArgs e)
{
    var handler = Received;
    if (handler != null) handler(this, e);
}
/// <summary>Handles data received on a client connection and forwards it to <see cref="OnReceive"/>.</summary>
/// <param name="sender">The connection that raised the event.</param>
/// <param name="e">The received-data event arguments.</param>
private void Client_Received(Object sender, ReceivedEventArgs e)
{
    LastActive = DateTime.Now;

    // Only messages are handled; anything else is ignored
    var received = e.Message as IMessage;
    if (received == null) return;

    // Decode the message into an API message (action and arguments)
    var decoded = Encoder.Decode(received);

    var args = new ApiReceivedEventArgs
    {
        Remote = sender as ISocketRemote,
        Message = received,
        ApiMessage = decoded,
        UserState = e,
    };

    OnReceive(received, args);
}
#endregion
#region 登录
/// <summary>Called for a new session, i.e. each time a client connects or reconnects; performs a forced login and blocks until it completes.</summary>
/// <remarks>Override OnLoginAsync and use InvokeWithClientAsync there to implement the login.</remarks>
/// <param name="client">The session's client connection.</param>
public virtual void OnNewSession(ISocketClient client)
{
    var login = OnLoginAsync(client, true);
    login?.Wait();
}
/// <summary>Logs in automatically after connecting. The default implementation does nothing and completes with 0.</summary>
/// <param name="client">Client connection to log in on.</param>
/// <param name="force">Whether the login is forced.</param>
/// <returns>A completed task carrying the login result.</returns>
protected virtual Task<Object> OnLoginAsync(ISocketClient client, Boolean force)
{
    return TaskEx.FromResult<Object>(0);
}
/// <summary>Logs in through the cluster using a non-forced login.</summary>
/// <returns>The login result produced by OnLoginAsync.</returns>
/// <exception cref="ArgumentNullException">The cluster has not been initialized.</exception>
public virtual async Task<Object> LoginAsync()
{
    var cluster = Cluster ?? throw new ArgumentNullException(nameof(Cluster));

    var result = await cluster.InvokeAsync(client => OnLoginAsync(client, false)).ConfigureAwait(false);

    return result;
}
#endregion
#region 连接池
/// <summary>Creates and configures a client connection for a server address; runs after creation and before the connection is opened.</summary>
/// <param name="svr">Server address.</param>
/// <returns>The configured, not yet opened client.</returns>
protected virtual ISocketClient OnCreate(String svr)
{
    var uri = new NetUri(svr);
    var client = uri.CreateRemote();

    // The network layer uses the message-layer timeout
    client.Timeout = Timeout;
    client.Tracer = Tracer;

    client.Add(GetMessageCodec());

    // Every (re)connection starts a new session
    client.Opened += (s, e) =>
    {
        var session = s as ISocketClient;
        OnNewSession(session!);
    };
    client.Received += Client_Received;

    return client;
}
#endregion
#region 统计
// Timer that periodically runs DoWork to output statistics; created in Open when StatPeriod is positive
private TimerX? _Timer;
// Last statistics line written by DoWork, used to skip unchanged output
private String? _Last;
/// <summary>Period, in seconds, for showing statistics. Defaults to 600; 0 disables statistics output.</summary>
public Int32 StatPeriod { get; set; } = 600;
/// <summary>Timer callback that writes the invocation statistics to the log when they have changed.</summary>
/// <param name="state">Timer state; not used.</param>
private void DoWork(Object? state)
{
    var builder = Pool.StringBuilder.Get();

    var invokeStat = StatInvoke;
    if (invokeStat != null)
    {
        if (invokeStat.Value > 0) builder.AppendFormat("请求:{0} ", invokeStat);
    }

    var line = builder.Put(true);

    // Skip empty output and output identical to the previous one
    if (line.IsNullOrEmpty()) return;
    if (line == _Last) return;

    _Last = line;
    WriteLog(line);
}
#endregion
}
|