RPC远程过程调用,二进制封装,提供高吞吐低延迟的高性能RPC框架
大石头 authored at 2022-08-10 13:26:19
7.66 KiB
NewLife.Remoting
using System.Text;
using NewLife.Buffers;
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;

namespace NewLife.Remoting;

/// <summary>编码器</summary>
public interface IEncoder
{
    ///// <summary>编码 请求/响应</summary>
    ///// <param name="action"></param>
    ///// <param name="code"></param>
    ///// <param name="value"></param>
    ///// <returns></returns>
    //Packet Encode(String action, Int32 code, Packet value);

    /// <summary>创建请求</summary>
    /// <param name="action"></param>
    /// <param name="args"></param>
    /// <returns></returns>
    IMessage CreateRequest(String action, Object? args);

    /// <summary>创建响应</summary>
    /// <param name="msg"></param>
    /// <param name="action"></param>
    /// <param name="code"></param>
    /// <param name="value"></param>
    /// <returns></returns>
    IMessage CreateResponse(IMessage msg, String action, Int32 code, Object? value);

    /// <summary>解码 请求/响应</summary>
    /// <param name="msg">消息</param>
    /// <returns>请求响应报文</returns>
    ApiMessage? Decode(IMessage msg);

    ///// <summary>编码 请求/响应</summary>
    ///// <param name="action">服务动作</param>
    ///// <param name="code">错误码</param>
    ///// <param name="value">参数或结果</param>
    ///// <returns></returns>
    //Packet Encode(String action, Int32 code, Object value);

    /// <summary>解码参数</summary>
    /// <param name="action">动作</param>
    /// <param name="data">数据</param>
    /// <param name="msg">消息</param>
    /// <returns></returns>
    Object? DecodeParameters(String action, IPacket? data, IMessage msg);

    /// <summary>解码结果</summary>
    /// <param name="action"></param>
    /// <param name="data"></param>
    /// <param name="msg">消息</param>
    /// <param name="returnType">返回类型</param>
    /// <returns></returns>
    Object? DecodeResult(String action, IPacket data, IMessage msg, Type returnType);

    /// <summary>转换为目标类型</summary>
    /// <param name="obj"></param>
    /// <param name="targetType"></param>
    /// <returns></returns>
    Object? Convert(Object obj, Type targetType);

    /// <summary>日志提供者</summary>
    ILog Log { get; set; }

    /// <summary>创建流式响应帧</summary>
    /// <param name="msg">原始请求消息</param>
    /// <param name="action">动作名称</param>
    /// <param name="code">错误码</param>
    /// <param name="value">流数据块</param>
    /// <param name="isLast">是否最后一帧</param>
    /// <returns></returns>
    IMessage CreateStreamResponse(IMessage msg, String action, Int32 code, Object? value, Boolean isLast);
}

/// <summary>编码器基类</summary>
public abstract class EncoderBase
{
    #region 编码/解码
    /// <summary>编码请求/响应为数据包</summary>
    /// <param name="action">动作名称</param>
    /// <param name="code">错误码。null 或 0/200 表示成功</param>
    /// <param name="value">负载数据。若为 IOwnerPacket,将挂载到返回包的 Next 链,所有权随之转移</param>
    /// <returns>编码后的数据包,头部为 OwnerPacket,value 作为 Next 链节点</returns>
    /// <remarks>
    /// 返回的 IPacket 链结构:[OwnerPacket(header: action+code+len)] → [value(IOwnerPacket)]。
    /// 调用方无需单独释放 value,它的生命周期由链头的 OwnerPacket.Dispose 级联管理。
    /// </remarks>
    public virtual IPacket Encode(String action, Int32? code, IPacket? value)
    {
        // 内存流,前面留空8字节用于协议头4字节(超长8字节)
        //var ms = new MemoryStream();
        //ms.Seek(8, SeekOrigin.Begin);

        var len = 8 + 1 + Encoding.UTF8.GetByteCount(action) + 4;
        if (code != null && code.Value is not ApiCode.Ok and not 200) len += 4;
        var pk = new OwnerPacket(len);

        // 请求:action + args
        // 响应:action + code + result
        var writer = new SpanWriter(pk.GetSpan());
        writer.Advance(8);
        writer.Write(action);

        // 异常响应才有code。定长4字节
        if (code != null && code.Value is not ApiCode.Ok and not 200) writer.Write(code.Value);

        // 参数或结果。长度部分定长4字节
        if (value != null) writer.Write(value.Total);

        // Slice(transferOwner:true) 将缓冲区所有权从 pk 转移到 pk2
        var pk2 = pk.Slice(8, writer.Position - 8, true);
        // value(可能是 IOwnerPacket)挂载到 Next 链,所有权随链传递
        if (value != null) pk2.Next = value;

        return pk2;
    }

    /// <summary>解码 请求/响应</summary>
    /// <param name="msg">消息</param>
    /// <returns>请求响应报文</returns>
    public virtual ApiMessage? Decode(IMessage msg)
    {
        var message = new ApiMessage();

        // 请求:action + args
        // 响应:action + code + result
        //var ms = msg.Payload!.GetStream();
        var reader = new SpanReader(msg.Payload!.GetSpan());

        message.Action = reader.ReadString();
        // 空 action 视为无效帧(如流式结束标记),返回 null 让上层检测
        if (message.Action.IsNullOrEmpty()) return null;

        // 异常响应才有code
        if (msg.Reply && msg.Error) message.Code = reader.ReadInt32();

        // 参数或结果
        if (reader.Available > 0)
        {
            var len = reader.ReadInt32();
            if (len > 0) message.Data = msg.Payload.Slice(reader.Position, len);
        }

        return message;
    }
    #endregion

    #region 流式编码
    /// <summary>编码流式数据块为数据包</summary>
    /// <param name="action">动作名称</param>
    /// <param name="code">错误码</param>
    /// <param name="value">流数据块。若为 IOwnerPacket,将挂载到返回包的 Next 链</param>
    /// <param name="isLast">是否最后一帧</param>
    /// <returns>编码后的数据包,isLast=true且value=null时返回空包标记结束</returns>
    public virtual IPacket EncodeStreamChunk(String action, Int32? code, IPacket? value, Boolean isLast)
    {
        if (isLast && value == null)
        {
            // 末帧空包:保留 action 名 + 无数据体,客户端通过 Data==null 检测结束
            var endPk = new OwnerPacket(8 + 1 + Encoding.UTF8.GetByteCount(action));
            var endWriter = new SpanWriter(endPk.GetSpan());
            endWriter.Advance(8);
            endWriter.Write(action);
            return endPk.Slice(8, endWriter.Position - 8, true);
        }

        var len = 8 + 1 + Encoding.UTF8.GetByteCount(action);
        if (code != null && code.Value is not ApiCode.Ok and not 200) len += 4;
        if (value != null) len += 4; // 数据长度字段固定4字节
        var pk = new OwnerPacket(len);

        var writer = new SpanWriter(pk.GetSpan());
        writer.Advance(8);
        writer.Write(action);

        if (code != null && code.Value is not ApiCode.Ok and not 200) writer.Write(code.Value);
        if (value != null) writer.Write(value.Total);

        var pk2 = pk.Slice(8, writer.Position - 8, true);
        if (value != null) pk2.Next = value;

        return pk2;
    }
    #endregion

    #region 日志
    /// <summary>日志提供者</summary>
    public ILog Log { get; set; } = Logger.Null;

    /// <summary>是否启用编码器日志</summary>
    internal Boolean LogEnable => Log != null && Log != Logger.Null;

    /// <summary>写日志</summary>
    /// <param name="format"></param>
    /// <param name="args"></param>
    public virtual void WriteLog(String format, params Object?[] args)
    {
        if (Log != null && Log != Logger.Null) Log.Info(format, args);
    }
    #endregion
}