Upgrade Nuget
大石头 authored at 2025-07-14 14:10:44
2.41 KiB
NewLife.RocketMQ
using System;
using System.Threading;
using NewLife;
using NewLife.Log;
using NewLife.RocketMQ;
using NewLife.RocketMQ.Protocol;
using Xunit;

namespace XUnitTestRocketMQ
{
    public class MessageTraceTests
    {
        private const String Topic = "TopicDemo";
        private const String Group = "TraceTestGroup";
        private const String NameServerAddress = "127.0.0.1:9876";

        [Fact]
        public void Producer_And_Consumer_With_Trace_Enabled_Should_Work()
        {
            // 使用 ManualResetEvent 来同步测试的完成
            var mre = new ManualResetEvent(false);

            // 1. 创建并启动消费者
            var consumer = new Consumer
            {
                Topic = Topic,
                Group = Group,
                NameServerAddress = NameServerAddress,
                EnableMessageTrace = true, // 启用消息轨迹
                Log = XTrace.Log
            };

            consumer.OnConsume = (q, msgs) =>
            {
                foreach (var msg in msgs)
                {
                    XTrace.WriteLine($"消费到消息: {msg.Body.ToStr()}");
                }
                // 设置事件,表示消费成功
                mre.Set();
                return true;
            };

            consumer.Start();

            // 2. 创建并启动生产者
            var producer = new Producer
            {
                Topic = Topic,
                Group = Group, // 生产者组可以和消费者组不同,这里为了简单使用同一个
                NameServerAddress = NameServerAddress,
                EnableMessageTrace = true, // 启用消息轨迹
                Log = XTrace.Log
            };

            producer.Start();

            // 3. 发送消息
            var messageBody = "Hello, RocketMQ with Message Trace!";
            var sendResult = producer.Publish(messageBody);

            Assert.NotNull(sendResult);
            Assert.Equal(SendStatus.SendOK, sendResult.Status);
            XTrace.WriteLine($"消息发送成功: MsgId={sendResult.MsgId}");

            // 4. 等待消费者处理消息,设置一个超时时间以防测试挂起
            bool consumed = mre.WaitOne(TimeSpan.FromSeconds(30));

            // 5. 清理资源
            producer.Stop();
            consumer.Stop();

            // 6. 断言
            Assert.True(consumed, "消费者在超时时间内没有收到消息。");
        }
    }
}