所有分支的提交都要跑test
大石头 authored at 2022-03-29 23:35:41
4.07 KiB
NewLife.RocketMQ
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading.Tasks;
using NewLife.Log;
using NewLife.RocketMQ;
using NewLife.RocketMQ.Protocol;
using Xunit;

namespace XUnitTestRocketMQ;

/// <summary>Tests for the management features exposed by the RocketMQ client.</summary>
/// <remarks>
/// Every test is skipped by default; the skip reason states that a RocketMQ server is required.
/// Connection settings are read through <c>BasicTest.GetConfig()</c>.
/// </remarks>
public class ManagementTests
{
    /// <summary>Creates a topic and then deletes it, asserting that the delete call returns a non-negative count.</summary>
    [Fact(Skip = "需要RocketMQ服务器")]
    [DisplayName("DeleteTopic_删除主题")]
    public void DeleteTopic_Test()
    {
        var set = BasicTest.GetConfig();
        using var mq = new Producer
        {
            NameServerAddress = set.NameServer,
            Log = XTrace.Log,
        };
        mq.Start();

        // Create first, then delete, so there is something to remove.
        // NOTE(review): the second argument (2) is presumably the queue count — confirm against CreateTopic.
        mq.CreateTopic("nx_delete_test", 2);
        var count = mq.DeleteTopic("nx_delete_test");
        Assert.True(count >= 0);
    }

    /// <summary>Creates a subscription (consumer) group and asserts that the call returns a non-negative count.</summary>
    [Fact(Skip = "需要RocketMQ服务器")]
    [DisplayName("CreateSubscriptionGroup_创建消费组")]
    public void CreateSubscriptionGroup_Test()
    {
        var set = BasicTest.GetConfig();
        using var mq = new Producer
        {
            NameServerAddress = set.NameServer,
            Log = XTrace.Log,
        };
        mq.Start();

        var count = mq.CreateSubscriptionGroup("nx_test_group", true, 16, 1);
        Assert.True(count >= 0);
    }

    /// <summary>Creates a subscription group and then deletes it, asserting that the delete call returns a non-negative count.</summary>
    [Fact(Skip = "需要RocketMQ服务器")]
    [DisplayName("DeleteSubscriptionGroup_删除消费组")]
    public void DeleteSubscriptionGroup_Test()
    {
        var set = BasicTest.GetConfig();
        using var mq = new Producer
        {
            NameServerAddress = set.NameServer,
            Log = XTrace.Log,
        };
        mq.Start();

        // The result of the create call is intentionally ignored; only the delete result is asserted.
        mq.CreateSubscriptionGroup("nx_delete_group");
        var count = mq.DeleteSubscriptionGroup("nx_delete_group");
        Assert.True(count >= 0);
    }

    /// <summary>Publishes a message and then looks it up by its message ID.</summary>
    [Fact(Skip = "需要RocketMQ服务器")]
    [DisplayName("ViewMessage_按ID查看消息")]
    public void ViewMessage_Test()
    {
        var set = BasicTest.GetConfig();
        using var mq = new Producer
        {
            Topic = "nx_test",
            NameServerAddress = set.NameServer,
            Log = XTrace.Log,
        };
        mq.Start();

        // Publish one message first so there is an ID to look up.
        var sr = mq.Publish("ViewMessage_Test", "TagA", null);
        Assert.NotNull(sr?.MsgId);

        // Try to view it (per the original author, this may return null because of Broker index delay).
        // No strict assertion: the test only verifies that no exception is thrown,
        // so the result is explicitly discarded.
        _ = mq.ViewMessage(sr.MsgId);
    }

    /// <summary>Requests the cluster information and verifies only that the call does not throw.</summary>
    [Fact(Skip = "需要RocketMQ服务器")]
    [DisplayName("GetClusterInfo_获取集群信息")]
    public void GetClusterInfo_Test()
    {
        var set = BasicTest.GetConfig();
        using var mq = new Producer
        {
            NameServerAddress = set.NameServer,
            Log = XTrace.Log,
        };
        mq.Start();

        // No strict assertion: only verifies that no exception is thrown.
        _ = mq.GetClusterInfo();
    }

    /// <summary>Requests the consumer connection list for group "test" and verifies only that the call does not throw.</summary>
    [Fact(Skip = "需要RocketMQ服务器")]
    [DisplayName("GetConsumerConnectionList_获取消费者连接列表")]
    public async Task GetConsumerConnectionList_Test()
    {
        var set = BasicTest.GetConfig();
        using var consumer = new Consumer
        {
            Topic = "nx_test",
            Group = "test",
            NameServerAddress = set.NameServer,
            Log = XTrace.Log,
        };
        consumer.Start();

        // No strict assertion: only verifies that no exception is thrown.
        _ = await consumer.GetConsumerConnectionList("test");
    }

    /// <summary>Resets the consumer offset to a timestamp one hour in the past and verifies only that the call does not throw.</summary>
    [Fact(Skip = "需要RocketMQ服务器")]
    [DisplayName("ResetConsumerOffset_重置消费偏移")]
    public async Task ResetConsumerOffset_Test()
    {
        var set = BasicTest.GetConfig();
        using var consumer = new Consumer
        {
            Topic = "nx_test",
            Group = "test",
            NameServerAddress = set.NameServer,
            Log = XTrace.Log,
        };
        consumer.Start();

        // Reset to one hour ago, expressed as Unix time in milliseconds (UTC).
        var timestamp = DateTimeOffset.UtcNow.AddHours(-1).ToUnixTimeMilliseconds();

        // No strict assertion: only verifies that no exception is thrown.
        _ = await consumer.ResetConsumerOffset(timestamp);
    }
}