所有分支的提交所有分支的提交都要跑test都要跑test
大石头 authored at 2022-03-29 23:35:41
9.04 KiB
NewLife.RocketMQ
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Text;
using NewLife;
using NewLife.Data;
using NewLife.RocketMQ;
using NewLife.RocketMQ.Protocol;
using Xunit;

namespace XUnitTestRocketMQ;

/// <summary>批量消息发送测试</summary>
[Collection("Basic")]
public class BatchMessageTests
{
    [Fact]
    [DisplayName("批量消息发送_消息列表为空时抛出异常")]
    public void PublishBatch_EmptyList_ThrowsException()
    {
        using var mq = new Producer { Topic = "nx_test" };

        Assert.Throws<ArgumentException>(() => mq.PublishBatch(new List<Message>()));
    }

    [Fact]
    [DisplayName("批量消息发送_消息列表为null时抛出异常")]
    public void PublishBatch_NullList_ThrowsException()
    {
        using var mq = new Producer { Topic = "nx_test" };

        Assert.Throws<ArgumentException>(() => mq.PublishBatch((IList<Message>)null));
    }

    [Fact]
    [DisplayName("批量消息字符串重载_空列表抛出异常")]
    public void PublishBatch_StringOverload_EmptyList_ThrowsException()
    {
        using var mq = new Producer { Topic = "nx_test" };

        Assert.Throws<ArgumentException>(() => mq.PublishBatch(new List<String>()));
    }

    [Fact(Skip = "需要RocketMQ服务器")]
    [DisplayName("批量消息发送_发送多条消息")]
    public void PublishBatch_SendMultipleMessages()
    {
        var set = BasicTest.GetConfig();
        using var mq = new Producer
        {
            Topic = "nx_test",
            NameServerAddress = set.NameServer,
        };
        mq.Start();

        var messages = new List<Message>();
        for (var i = 0; i < 5; i++)
        {
            var msg = new Message();
            msg.SetBody($"批量消息{i}");
            msg.Tags = "TagBatch";
            messages.Add(msg);
        }

        var result = mq.PublishBatch(messages);
        Assert.NotNull(result);
        Assert.Equal(SendStatus.SendOK, result.Status);
    }

    #region 批量消息解码辅助方法
    /// <summary>写入大端序Int32</summary>
    private static void WriteBE32(MemoryStream ms, Int32 value)
    {
        ms.WriteByte((Byte)(value >> 24));
        ms.WriteByte((Byte)(value >> 16));
        ms.WriteByte((Byte)(value >> 8));
        ms.WriteByte((Byte)value);
    }

    /// <summary>写入大端序Int64</summary>
    private static void WriteBE64(MemoryStream ms, Int64 value)
    {
        WriteBE32(ms, (Int32)(value >> 32));
        WriteBE32(ms, (Int32)value);
    }

    /// <summary>写入大端序Int16</summary>
    private static void WriteBE16(MemoryStream ms, Int16 value)
    {
        ms.WriteByte((Byte)(value >> 8));
        ms.WriteByte((Byte)value);
    }

    /// <summary>构建批量消息Body(多条子消息的二进制数据)</summary>
    private static Byte[] BuildBatchBody(params (String body, String topic, String props)[] items)
    {
        var ms = new MemoryStream();

        foreach (var (body, topic, props) in items)
        {
            var bodyBytes = body.GetBytes();
            var topicBytes = topic.GetBytes();
            var propsBytes = props.GetBytes();

            // TotalSize = 4+4+4+4 + 4+bodyLen + 1+topicLen + 2+propsLen
            var totalSize = 4 + 4 + 4 + 4 + bodyBytes.Length + 1 + topicBytes.Length + 2 + propsBytes.Length;

            WriteBE32(ms, totalSize);       // TotalSize
            WriteBE32(ms, 0);               // MagicCode
            WriteBE32(ms, 0);               // BodyCRC
            WriteBE32(ms, 0);               // Flag

            WriteBE32(ms, bodyBytes.Length); // BodyLength
            ms.Write(bodyBytes, 0, bodyBytes.Length);

            ms.WriteByte((Byte)topicBytes.Length); // TopicLength
            ms.Write(topicBytes, 0, topicBytes.Length);

            WriteBE16(ms, (Int16)propsBytes.Length); // PropertiesLength
            if (propsBytes.Length > 0) ms.Write(propsBytes, 0, propsBytes.Length);
        }

        return ms.ToArray();
    }

    /// <summary>构建一条完整的外层消息二进制(带SysFlag=0x10标识批量)</summary>
    private static Byte[] BuildOuterMessage(Byte[] batchBody)
    {
        var ms = new MemoryStream();
        var ipBytes = new Byte[] { 127, 0, 0, 1 };
        var topic = "batch_topic"u8.ToArray();
        var props = ""u8.ToArray();
        var sysFlag = 0x10; // 批量消息标志

        var storeSize = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 +
                        8 + 4 + 4 +
                        8 + 4 + 4 +
                        4 + 8 +
                        4 + batchBody.Length + 1 + topic.Length + 2 + props.Length;

        WriteBE32(ms, storeSize);
        WriteBE32(ms, 0);            // MagicCode
        WriteBE32(ms, 0);            // BodyCRC
        WriteBE32(ms, 0);            // QueueId
        WriteBE32(ms, 0);            // Flag
        WriteBE64(ms, 0L);           // QueueOffset
        WriteBE64(ms, 100L);         // CommitLogOffset
        WriteBE32(ms, sysFlag);      // SysFlag (batch)

        WriteBE64(ms, 1000L);        // BornTimestamp
        ms.Write(ipBytes, 0, 4);     // BornHost IP
        WriteBE32(ms, 9876);         // BornHost Port

        WriteBE64(ms, 2000L);        // StoreTimestamp
        ms.Write(ipBytes, 0, 4);     // StoreHost IP
        WriteBE32(ms, 10911);        // StoreHost Port

        WriteBE32(ms, 0);            // ReconsumeTimes
        WriteBE64(ms, 0L);           // PreparedTransactionOffset

        WriteBE32(ms, batchBody.Length); // BodyLength
        ms.Write(batchBody, 0, batchBody.Length);

        ms.WriteByte((Byte)topic.Length);
        ms.Write(topic, 0, topic.Length);

        WriteBE16(ms, (Int16)props.Length);

        return ms.ToArray();
    }
    #endregion

    #region 批量消息解码测试
    [Fact]
    [DisplayName("批量消息解码_解码2条子消息")]
    public void DecodeBatch_TwoMessages()
    {
        var batchBody = BuildBatchBody(
            ("hello", "topic1", ""),
            ("world", "topic2", "")
        );

        var parent = new MessageExt
        {
            QueueId = 1,
            CommitLogOffset = 100,
            SysFlag = 0x10,
            BornTimestamp = 1000,
            BornHost = "127.0.0.1:9876",
            StoreTimestamp = 2000,
            StoreHost = "127.0.0.1:10911",
            Body = batchBody,
            Topic = "batch_topic",
            MsgId = "PARENT_ID",
        };

        var msgs = MessageExt.DecodeBatch(parent);

        Assert.Equal(2, msgs.Count);

        Assert.Equal("hello", msgs[0].BodyString);
        Assert.Equal("topic1", msgs[0].Topic);
        Assert.Equal(1, msgs[0].QueueId);
        Assert.Equal(100, msgs[0].CommitLogOffset);
        Assert.Equal("127.0.0.1:9876", msgs[0].BornHost);

        Assert.Equal("world", msgs[1].BodyString);
        Assert.Equal("topic2", msgs[1].Topic);
    }

    [Fact]
    [DisplayName("批量消息解码_解码带Properties的子消息")]
    public void DecodeBatch_WithProperties()
    {
        var props = "TAGS\u0001TagA\u0002KEYS\u0001Key1\u0002";
        var batchBody = BuildBatchBody(
            ("data", "my_topic", props)
        );

        var parent = new MessageExt
        {
            SysFlag = 0x10,
            Body = batchBody,
            MsgId = "P1",
        };

        var msgs = MessageExt.DecodeBatch(parent);

        Assert.Single(msgs);
        Assert.Equal("data", msgs[0].BodyString);
        Assert.Equal("my_topic", msgs[0].Topic);
        Assert.Equal("TagA", msgs[0].Tags);
        Assert.Equal("Key1", msgs[0].Keys);
    }

    [Fact]
    [DisplayName("批量消息解码_空Body返回空列表")]
    public void DecodeBatch_EmptyBody()
    {
        var parent = new MessageExt
        {
            SysFlag = 0x10,
            Body = [],
            MsgId = "P1",
        };

        var msgs = MessageExt.DecodeBatch(parent);
        Assert.Empty(msgs);
    }

    [Fact]
    [DisplayName("批量消息解码_Null参数抛出异常")]
    public void DecodeBatch_NullParent_Throws()
    {
        Assert.Throws<ArgumentNullException>(() => MessageExt.DecodeBatch(null));
    }

    [Fact]
    [DisplayName("批量消息ReadAll自动展开")]
    public void ReadAll_BatchMessage_AutoExpand()
    {
        var batchBody = BuildBatchBody(
            ("msg1", "t1", ""),
            ("msg2", "t2", ""),
            ("msg3", "t3", "")
        );

        var data = BuildOuterMessage(batchBody);
        var pk = new ArrayPacket(data);
        var msgs = MessageExt.ReadAll(pk);

        Assert.Equal(3, msgs.Count);
        Assert.Equal("msg1", msgs[0].BodyString);
        Assert.Equal("msg2", msgs[1].BodyString);
        Assert.Equal("msg3", msgs[2].BodyString);
        Assert.Equal("t1", msgs[0].Topic);
        Assert.Equal("t2", msgs[1].Topic);
        Assert.Equal("t3", msgs[2].Topic);
    }

    [Fact]
    [DisplayName("SysFlag批量位判断")]
    public void SysFlag_Batch_Bit()
    {
        Assert.NotEqual(0, 0x10 & 0x10);  // 批量
        Assert.Equal(0, 0 & 0x10);        // 普通
        Assert.NotEqual(0, 0x11 & 0x10);  // 压缩+批量
        Assert.Equal(0, 0x01 & 0x10);     // 仅压缩
    }
    #endregion
}