v2.6 使用v11核心库的IPacket
大石头 authored at 2024-09-17 10:15:30
4.42 KiB
NewLife.RocketMQ
using System.Net;
using NewLife.Collections;
using NewLife.Data;
using NewLife.Serialization;

namespace NewLife.RocketMQ.Protocol;

/// <summary>Extended message: a <see cref="Message"/> plus the storage metadata decoded from the binary wire format.</summary>
public class MessageExt : Message, IAccessor
{
    #region Constants
    /// <summary>SysFlag bit indicating that the body is compressed.</summary>
    private const Int32 CompressedFlag = 0x1;

    /// <summary>SysFlag bit indicating that the born host address is IPv6 (16 bytes instead of 4).</summary>
    private const Int32 BornHostV6Flag = 0x1 << 4;

    /// <summary>SysFlag bit indicating that the store host address is IPv6 (16 bytes instead of 4).</summary>
    private const Int32 StoreHostV6Flag = 0x1 << 5;
    #endregion

    #region Properties
    /// <summary>Queue id.</summary>
    public Int32 QueueId { get; set; }

    /// <summary>Store size as read from the head of the record; a non-positive value stops decoding.</summary>
    public Int32 StoreSize { get; set; }

    /// <summary>CRC value of the body as carried in the record.</summary>
    public Int32 BodyCRC { get; set; }

    /// <summary>Queue offset.</summary>
    public Int64 QueueOffset { get; set; }

    /// <summary>Commit log offset.</summary>
    public Int64 CommitLogOffset { get; set; }

    /// <summary>System flag bits (compression, IPv6 host markers, ...).</summary>
    public Int32 SysFlag { get; set; }

    /// <summary>Born (produce) timestamp.</summary>
    public Int64 BornTimestamp { get; set; }

    /// <summary>Born (producer) host, formatted as <c>ip:port</c> (<c>[ip]:port</c> for IPv6).</summary>
    public String BornHost { get; set; }

    /// <summary>Store timestamp.</summary>
    public Int64 StoreTimestamp { get; set; }

    /// <summary>Store host, formatted as <c>ip:port</c> (<c>[ip]:port</c> for IPv6).</summary>
    public String StoreHost { get; set; }

    /// <summary>Number of times the message has been re-consumed.</summary>
    public Int32 ReconsumeTimes { get; set; }

    /// <summary>Prepared transaction offset.</summary>
    public Int64 PreparedTransactionOffset { get; set; }

    /// <summary>Message properties; only assigned when the record carries at least one property.</summary>
    public IDictionary<String, String> Properties { get; set; }

    /// <summary>Message id: hex of store host address + store host port + commit log offset.</summary>
    public String MsgId { get; set; }
    #endregion

    #region Construction
    /// <summary>Friendly string: the commit log offset in brackets followed by the base representation.</summary>
    /// <returns>Display string.</returns>
    public override String ToString() => $"[{CommitLogOffset}]{base.ToString()}";
    #endregion

    #region Read/Write
    /// <summary>Decodes one message record from the binary reader.</summary>
    /// <param name="stream">Source stream; only used to build a big-endian reader when <paramref name="context"/> is not a <see cref="Binary"/>.</param>
    /// <param name="context">The <see cref="Binary"/> reader to decode from. When it is null or of another type, a reader is created over <paramref name="stream"/>.</param>
    /// <returns><c>true</c> when a message was decoded; <c>false</c> when the store size is not positive.</returns>
    /// <exception cref="ArgumentNullException">Neither a <see cref="Binary"/> context nor a stream was supplied.</exception>
    public Boolean Read(Stream stream, Object context = null)
    {
        // Previously a missing/foreign context caused a NullReferenceException on first use.
        var bn = context as Binary;
        if (bn == null)
        {
            if (stream == null) throw new ArgumentNullException(nameof(stream));

            bn = new Binary { Stream = stream, IsLittleEndian = false };
        }

        StoreSize = bn.Read<Int32>();
        if (StoreSize <= 0) return false;

        // Magic code: must be consumed to keep the cursor aligned, the value itself is not used.
        _ = bn.Read<Int32>();
        BodyCRC = bn.Read<Int32>();
        QueueId = bn.Read<Int32>();
        Flag = bn.Read<Int32>();
        QueueOffset = bn.Read<Int64>();
        CommitLogOffset = bn.Read<Int64>();
        SysFlag = bn.Read<Int32>();

        // Host addresses are 4 bytes (IPv4) unless the matching SysFlag bit marks them as 16-byte IPv6.
        BornTimestamp = bn.Read<Int64>();
        BornHost = ReadHost(bn, (SysFlag & BornHostV6Flag) != 0, out _, out _);

        StoreTimestamp = bn.Read<Int64>();
        StoreHost = ReadHost(bn, (SysFlag & StoreHostV6Flag) != 0, out var storeAddress, out var storePort);

        ReconsumeTimes = bn.Read<Int32>();
        PreparedTransactionOffset = bn.Read<Int64>();

        // Body: 4-byte length followed by the payload.
        var bodyLength = bn.Read<Int32>();
        Body = bn.ReadBytes(bodyLength);
        if ((SysFlag & CompressedFlag) != 0)
        {
            // ZLIB format (RFC 1950): strip the two header bytes before inflating.
            Body = Body.ReadBytes(2, -1).Decompress();
        }

        // Topic: 1-byte length followed by the text.
        var topicLength = bn.Read<Byte>();
        Topic = bn.ReadBytes(topicLength).ToStr();

        // Properties: 2-byte length followed by the encoded property string.
        var propertiesLength = bn.Read<Int16>();
        var propertiesText = bn.ReadBytes(propertiesLength).ToStr();
        var dic = ParseProperties(propertiesText);
        if (dic != null && dic.Count > 0) Properties = dic;

        // MsgId (offset message id) = store host address + store host port + commit log offset.
        // The id must identify the broker that stores the message, so the store host is used
        // rather than the born (producer) host.
        var ms = Pool.MemoryStream.Get();
        ms.Write(storeAddress);
        ms.Write(storePort.GetBytes(false));
        ms.Write(CommitLogOffset.GetBytes(false));

        // 4 bytes of port + 8 bytes of offset follow the address (16 bytes total for IPv4).
        MsgId = ms.Put(true).ToHex(0, storeAddress.Length + 12);

        return true;
    }

    /// <summary>Reads a host entry (address bytes followed by a 4-byte port) and formats it.</summary>
    /// <param name="bn">Reader positioned at the address bytes.</param>
    /// <param name="isV6">Whether the address is 16-byte IPv6 rather than 4-byte IPv4.</param>
    /// <param name="address">Raw address bytes as read.</param>
    /// <param name="port">Port as read.</param>
    /// <returns><c>ip:port</c> for IPv4, <c>[ip]:port</c> for IPv6.</returns>
    private static String ReadHost(Binary bn, Boolean isV6, out Byte[] address, out Int32 port)
    {
        address = bn.ReadBytes(isV6 ? 16 : 4);
        port = bn.Read<Int32>();

        var ip = new IPAddress(address);
        return isV6 ? $"[{ip}]:{port}" : $"{ip}:{port}";
    }

    /// <summary>Decodes every message contained in a packet.</summary>
    /// <param name="body">Packet holding zero or more consecutive message records; null yields an empty list.</param>
    /// <returns>The decoded messages, in stream order. Decoding stops at the end of the stream or at the first record whose store size is not positive.</returns>
    public static IList<MessageExt> ReadAll(IPacket body)
    {
        var list = new List<MessageExt>();
        if (body == null) return list;

        var ms = body.GetStream();
        var bn = new Binary
        {
            Stream = ms,
            IsLittleEndian = false,
        };

        while (ms.Position < ms.Length)
        {
            var msg = new MessageExt();
            if (!msg.Read(ms, bn)) break;

            list.Add(msg);
        }

        return list;
    }

    /// <summary>Write counterpart required by <see cref="IAccessor"/>; currently writes nothing.</summary>
    /// <param name="stream">Target stream (unused).</param>
    /// <param name="context">Context (unused).</param>
    /// <returns>Always <c>true</c>.</returns>
    public Boolean Write(Stream stream, Object context = null) => true;
    #endregion
}