所有分支的提交所有分支的提交都要跑test都要跑test
大石头 authored at 2022-03-29 23:35:41
2.08 KiB
NewLife.RocketMQ
using System;
using System.Threading;
using NewLife.Log;
using NewLife.RocketMQ;
using NewLife.RocketMQ.Protocol;
using Xunit;
using XUnitTest.Integration;

namespace XUnitTest.Producers;

public class ProducerTracerTests
{
    private const String Topic = "TopicDemo";
    private const String Group = "TraceTestGroup";

    [Fact]
    public void Producer_And_Consumer_With_Trace_Enabled_Should_Work()
    {
        XTrace.UseConsole();

        var set = BasicTest.GetConfig();
        var mre = new ManualResetEvent(false);

        // 1. 先启动消费者,确保 Rebalance 完成后再发送消息
        using var consumer = new Consumer
        {
            Topic             = Topic,
            Group             = Group,
            NameServerAddress = set.NameServer,
            FromLastOffset    = true,
            Log               = XTrace.Log,
        };

        consumer.OnConsume = (q, ms) =>
        {
            foreach (var item in ms)
            {
                XTrace.WriteLine("消费到消息: {0}", item.BodyString);
                mre.Set();
            }
            return true;
        };

        consumer.Start();

        // 等待消费者完成 Rebalance
        Thread.Sleep(3000);

        // 2. 创建并启动生产者
        using var producer = new Producer
        {
            Topic             = Topic,
            Group             = Group,
            NameServerAddress = set.NameServer,
            EnableMessageTrace = true,
            Log               = XTrace.Log,
        };

        producer.Start();

        // 3. 发送消息
        var messageBody = "Hello, RocketMQ with Message Trace!";
        var sendResult = producer.Publish(messageBody);

        Assert.NotNull(sendResult);
        Assert.Equal(SendStatus.SendOK, sendResult.Status);
        XTrace.WriteLine("消息发送成功: MsgId={0}", sendResult.MsgId);

        // 4. 等待消费者收到消息,30 秒超时
        var consumed = mre.WaitOne(TimeSpan.FromSeconds(30));

        // 5. 断言
        Assert.True(consumed, "消费者在超时时间内没有收到消息。");
    }
}