// 所有分支的提交都要跑test (commits on every branch must run the tests)
// 大石头 authored at 2022-03-29 23:35:41
// 5.53 KiB
// NewLife.RocketMQ
using System;
using System.ComponentModel;
using System.IO;
using System.Net;
using NewLife.Data;
using NewLife.RocketMQ.Protocol;
using Xunit;

namespace XUnitTestRocketMQ;

/// <summary>Decoding tests for messages whose host addresses are IPv4 or IPv6.</summary>
/// <remarks>
/// Each decoding test hand-builds a single big-endian message frame in memory,
/// passes it to <c>MessageExt.ReadAll</c> and checks the decoded fields.
/// </remarks>
public class IPv6Tests
{
    /// <summary>SysFlag bit (0x04) these tests set to mark the host addresses as IPv6.</summary>
    private const Int32 IPv6Flag = 4;

    /// <summary>Writes a 32-bit integer to the stream in big-endian byte order.</summary>
    /// <param name="ms">Destination stream.</param>
    /// <param name="value">Value to write.</param>
    private static void WriteBigEndianInt32(MemoryStream ms, Int32 value)
    {
        // Keeping only the low 8 bits of each shifted value is intentional; the unchecked
        // block keeps the casts valid even if the project enables overflow checking.
        unchecked
        {
            ms.WriteByte((Byte)(value >> 24));
            ms.WriteByte((Byte)(value >> 16));
            ms.WriteByte((Byte)(value >> 8));
            ms.WriteByte((Byte)value);
        }
    }

    /// <summary>Writes a 64-bit integer to the stream in big-endian byte order.</summary>
    /// <param name="ms">Destination stream.</param>
    /// <param name="value">Value to write.</param>
    private static void WriteBigEndianInt64(MemoryStream ms, Int64 value)
    {
        // High 32 bits first, then the low 32 bits; the narrowing casts deliberately truncate.
        unchecked
        {
            WriteBigEndianInt32(ms, (Int32)(value >> 32));
            WriteBigEndianInt32(ms, (Int32)value);
        }
    }

    /// <summary>Writes a 16-bit integer to the stream in big-endian byte order.</summary>
    /// <param name="ms">Destination stream.</param>
    /// <param name="value">Value to write.</param>
    private static void WriteBigEndianInt16(MemoryStream ms, Int16 value)
    {
        unchecked
        {
            ms.WriteByte((Byte)(value >> 8));
            ms.WriteByte((Byte)value);
        }
    }

    /// <summary>Builds the binary form of a single test message.</summary>
    /// <param name="ipv6">
    /// When true, both host addresses are the 16-byte IPv6 loopback and the IPv6 SysFlag bit
    /// is set; otherwise both are the 4-byte address 127.0.0.1 and SysFlag is 0.
    /// </param>
    /// <returns>The serialized message bytes.</returns>
    private static Byte[] BuildMessageBinary(Boolean ipv6)
    {
        using var ms = new MemoryStream();

        var ipBytes = ipv6 ? IPAddress.IPv6Loopback.GetAddressBytes() : new Byte[] { 127, 0, 0, 1 };
        // SysFlag: bit 2 (0x04) marks the addresses as IPv6
        var sysFlag = ipv6 ? IPv6Flag : 0;

        var body = "hello"u8.ToArray();
        var topic = "test_topic"u8.ToArray();
        var props = ""u8.ToArray();

        // Derive the address width from the bytes actually written so the two cannot drift apart.
        var ipSize = ipBytes.Length;

        // StoreSize layout:
        // StoreSize(4) + MagicCode(4) + BodyCRC(4) + QueueId(4) + Flag(4) +
        // QueueOffset(8) + CommitLogOffset(8) + SysFlag(4) +
        // BornTimestamp(8) + BornIP(ipSize) + BornPort(4) +
        // StoreTimestamp(8) + StoreIP(ipSize) + StorePort(4) +
        // ReconsumeTimes(4) + PreparedTransactionOffset(8) +
        // BodyLen(4) + body + TopicLen(1) + topic + PropsLen(2) + props
        var storeSize = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 +
                        8 + ipSize + 4 +
                        8 + ipSize + 4 +
                        4 + 8 +
                        4 + body.Length + 1 + topic.Length + 2 + props.Length;

        WriteBigEndianInt32(ms, storeSize);  // StoreSize
        WriteBigEndianInt32(ms, 0);          // MagicCode
        WriteBigEndianInt32(ms, 0);          // BodyCRC
        WriteBigEndianInt32(ms, 1);          // QueueId
        WriteBigEndianInt32(ms, 0);          // Flag
        WriteBigEndianInt64(ms, 100L);       // QueueOffset
        WriteBigEndianInt64(ms, 200L);       // CommitLogOffset
        WriteBigEndianInt32(ms, sysFlag);    // SysFlag

        WriteBigEndianInt64(ms, 1000L);      // BornTimestamp
        ms.Write(ipBytes, 0, ipBytes.Length);// BornHost IP
        WriteBigEndianInt32(ms, 9876);       // BornHost Port

        WriteBigEndianInt64(ms, 2000L);      // StoreTimestamp
        ms.Write(ipBytes, 0, ipBytes.Length);// StoreHost IP
        WriteBigEndianInt32(ms, 10911);      // StoreHost Port

        WriteBigEndianInt32(ms, 0);          // ReconsumeTimes
        WriteBigEndianInt64(ms, 0L);         // PreparedTransactionOffset

        WriteBigEndianInt32(ms, body.Length); // BodyLength
        ms.Write(body, 0, body.Length);

        ms.WriteByte(unchecked((Byte)topic.Length));    // Topic length (1 byte)
        ms.Write(topic, 0, topic.Length);

        WriteBigEndianInt16(ms, unchecked((Int16)props.Length)); // Properties length (2 bytes)
        if (props.Length > 0)
        {
            ms.Write(props, 0, props.Length);
        }

        // ToArray copies the written bytes, so the result stays valid after the stream is disposed.
        return ms.ToArray();
    }

    /// <summary>A frame carrying IPv4 host addresses decodes into the expected fields.</summary>
    [Fact]
    [DisplayName("IPv4消息正确解码")]
    public void ReadMessage_IPv4()
    {
        var data = BuildMessageBinary(false);
        var pk = new ArrayPacket(data);
        var msgs = MessageExt.ReadAll(pk);

        Assert.Single(msgs);
        var msg = msgs[0];
        Assert.Equal(1, msg.QueueId);
        Assert.Equal(100, msg.QueueOffset);
        Assert.Equal(200, msg.CommitLogOffset);
        Assert.Equal(0, msg.SysFlag & IPv6Flag); // not IPv6
        Assert.StartsWith("127.0.0.1:", msg.BornHost);
        Assert.StartsWith("127.0.0.1:", msg.StoreHost);
        Assert.Contains("9876", msg.BornHost);
        Assert.Contains("10911", msg.StoreHost);
        Assert.Equal("hello", msg.BodyString);
        Assert.Equal("test_topic", msg.Topic);
        // An IPv4 MsgId is expected to be 32 hex characters (16 bytes)
        Assert.Equal(32, msg.MsgId.Length);
    }

    /// <summary>A frame carrying IPv6 host addresses decodes into the expected fields.</summary>
    [Fact]
    [DisplayName("IPv6消息正确解码")]
    public void ReadMessage_IPv6()
    {
        var data = BuildMessageBinary(true);
        var pk = new ArrayPacket(data);
        var msgs = MessageExt.ReadAll(pk);

        Assert.Single(msgs);
        var msg = msgs[0];
        Assert.Equal(1, msg.QueueId);
        Assert.Equal(100, msg.QueueOffset);
        Assert.Equal(200, msg.CommitLogOffset);
        Assert.NotEqual(0, msg.SysFlag & IPv6Flag); // is IPv6
        Assert.Contains("::1", msg.BornHost);
        Assert.Contains("::1", msg.StoreHost);
        Assert.Contains("9876", msg.BornHost);
        Assert.Contains("10911", msg.StoreHost);
        Assert.Equal("hello", msg.BodyString);
        Assert.Equal("test_topic", msg.Topic);
        // An IPv6 MsgId is expected to be 56 hex characters (28 bytes)
        Assert.Equal(56, msg.MsgId.Length);
    }

    /// <summary>Pins the bit-mask convention: only bit 2 (0x04) of SysFlag signals IPv6.</summary>
    /// <remarks>
    /// NOTE(review): these assertions operate on constants only and do not call any
    /// production code; they document the mask rather than verify the decoder.
    /// </remarks>
    [Fact]
    [DisplayName("SysFlag第2位判断IPv6")]
    public void SysFlag_IPv6_Bit()
    {
        // SysFlag=0 -> IPv4
        Assert.Equal(0, 0 & IPv6Flag);

        // SysFlag=4 -> IPv6
        Assert.NotEqual(0, 4 & IPv6Flag);

        // SysFlag=5 (compressed + IPv6) -> IPv6
        Assert.NotEqual(0, 5 & IPv6Flag);

        // SysFlag=1 (compressed only) -> IPv4
        Assert.Equal(0, 1 & IPv6Flag);
    }
}