[feat] 增加阿里云OpenAPI通用客户端
智能大石头 authored at 2025-12-26 15:01:16
5.82 KiB
NewLife.Remoting
using System.Net;
using System.Net.WebSockets;
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;
using NewLife.Remoting.Models;
using NewLife.Remoting.Services;
using NewLife.Security;
using NewLife.Serialization;

namespace NewLife.Remoting.Extensions.Services;

/// <summary>WebSocket设备会话</summary>
public class WsCommandSession(WebSocket socket) : CommandSession
{
    /// <summary>是否活动中</summary>
    public override Boolean Active => socket != null && socket.State == WebSocketState.Open;

    /// <summary>数据包分发器。用于EventHub</summary>
    public IEventDispatcher<IPacket>? Dispatcher { get; set; }

    /// <summary>服务提供者</summary>
    public IServiceProvider? ServiceProvider { get; set; }

    private CancellationTokenSource? _source;

    /// <summary>销毁</summary>
    protected override void Dispose(Boolean disposing)
    {
        base.Dispose(disposing);

        try
        {
            _source?.Cancel();

            if (socket != null && socket.State == WebSocketState.Open)
                socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Dispose", default);
        }
        catch { }
        finally
        {
            socket?.Dispose();
        }
    }

    /// <summary>处理事件消息,通过WebSocket向下发送</summary>
    /// <param name="command">命令模型</param>
    /// <param name="message">原始命令消息</param>
    /// <param name="cancellationToken">取消令牌</param>
    /// <returns></returns>
    public override Task HandleAsync(CommandModel command, String? message, CancellationToken cancellationToken)
    {
        //message ??= command.ToJson();
        if (message == null && command != null)
        {
            var jsonHost = ServiceProvider?.GetService<IJsonHost>();
            if (jsonHost != null)
                message = jsonHost.Write(command);
            else
                message = command.ToJson();
        }

        return socket.SendAsync(message.GetBytes(), WebSocketMessageType.Text, true, cancellationToken);
    }

    /// <summary>阻塞WebSocket,等待连接结束</summary>  
    /// <param name="context">上下文</param>
    /// <param name="span">埋点</param>  
    /// <param name="cancellationToken">取消令牌</param>  
    /// <returns></returns>  
    public virtual async Task WaitAsync(HttpContext context, ISpan? span, CancellationToken cancellationToken)
    {
        var ip = context.GetUserHost();
        var sid = Rand.Next();
        var connection = context.Connection;
        var address = connection.RemoteIpAddress ?? IPAddress.Loopback;
        if (address.IsIPv4MappedToIPv6) address = address.MapToIPv4();
        var remote = new IPEndPoint(address, connection.RemotePort);
        Log?.WriteLog("WebSocket连接", true, $"State={socket.State} sid={sid} Remote={remote}");

        // 长连接上线  
        SetOnline?.Invoke(true);

        // 即将进入阻塞等待,结束埋点
        span?.TryDispose();

        // 链接取消令牌。当客户端断开时,触发取消,结束长连接  
        using var source = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        _source = source;

        var buf = new Byte[64 * 1024];
        while (!source.IsCancellationRequested && socket.State == WebSocketState.Open)
        {
            // try-catch 放在循环内,避免单次异常退出循环
            try
            {
                var data = await socket.ReceiveAsync(new ArraySegment<Byte>(buf), source.Token).ConfigureAwait(false);
                if (data.MessageType == WebSocketMessageType.Close) break;
                if (data.MessageType is WebSocketMessageType.Text or WebSocketMessageType.Binary)
                {
                    using var span2 = Tracer?.NewSpan("cmd:Ws.Receive", $"[{data.MessageType}]{remote}", data.Count);

                    var pk = new ArrayPacket(buf, 0, data.Count);
                    await OnReceive(pk, source.Token);
                }
            }
            catch (ThreadAbortException) { break; }
            catch (ThreadInterruptedException) { break; }
            catch (TaskCanceledException) { }
            catch (OperationCanceledException) { }
            catch (WebSocketException ex)
            {
                if (source.IsCancellationRequested) break;

                Log?.WriteLog("WebSocket异常", false, ex.Message);
                break;
            }
            catch (Exception ex)
            {
                if (source.IsCancellationRequested) break;

                XTrace.WriteException(ex);
            }
        }

        // 通知取消
        try
        {
            if (!source.IsCancellationRequested) source.Cancel();
        }
        catch (ObjectDisposedException) { }
        _source = null;

        if (socket.State == WebSocketState.Open)
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "finish", default).ConfigureAwait(false);

        Log?.WriteLog("WebSocket断开", true, $"State={socket.State} CloseStatus={socket.CloseStatus} sid={sid} Remote={remote}");

        // 长连接下线  
        SetOnline?.Invoke(false);
    }

    /// <summary>收到客户端主动上发数据</summary>
    /// <param name="data">数据包</param>
    /// <param name="cancellationToken">取消通知</param>
    /// <returns></returns>
    protected async Task OnReceive(IPacket data, CancellationToken cancellationToken)
    {
        if (data.Total == 4 && data.ToStr() == "Ping")
        {
            // 长连接上线。可能客户端心跳已经停了,WS还在,这里重新上线  
            SetOnline?.Invoke(true);

            await socket.SendAsync("Pong".GetBytes(), WebSocketMessageType.Text, true, cancellationToken);
        }
        else if (Dispatcher != null)
        {
            await Dispatcher.DispatchAsync(data, cancellationToken);
        }
    }
}