所有分支的提交所有分支的提交都要跑test都要跑test
大石头 authored at 2022-03-29 23:35:41
4.01 KiB
NewLife.RocketMQ
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Threading;
using System.Threading.Tasks;
using NewLife.RocketMQ;
using NewLife.RocketMQ.Protocol;
using Xunit;

namespace XUnitTest.Integration;

/// <summary>Producer 集成测试</summary>
/// <remarks>
/// 需要本机启动 RocketMQ 并设置环境变量 ROCKETMQ_NAMESERVER=127.0.0.1:9876。
/// 启动命令:dotnet run --file scripts/RocketMqSetup.cs
/// </remarks>
/// <remarks>初始化</remarks>
/// <param name="fixture">RocketMQ Fixture</param>
[Collection("RocketMQ")]
public class ProducerIntegrationTests(RocketMqFixture fixture) : IClassFixture<RocketMqFixture>
{
    [Fact]
    [DisplayName("发送普通消息_返回SendOK")]
    public async Task PublishMessage_ReturnsSuccess()
    {
        using var producer = new Producer
        {
            Topic             = "integration-test-topic",
            NameServerAddress = fixture.NameServerAddress,
        };
        producer.Start();

        var result = await producer.PublishAsync("Hello from integration test");
        Assert.Equal(SendStatus.SendOK, result.Status);
        Assert.NotEmpty(result.MsgId);
    }

    [Fact]
    [DisplayName("发送带属性的消息_属性正常保存")]
    public async Task PublishMessageWithProperties_PropertiesPreserved()
    {
        using var producer = new Producer
        {
            Topic             = "integration-test-topic",
            NameServerAddress = fixture.NameServerAddress,
        };
        producer.Start();

        var msg = new Message
        {
            Topic = "integration-test-topic",
            Body  = "test body"u8.ToArray(),
            Keys  = "TestKey001",
            Tags  = "TagA",
        };

        var result = await producer.PublishAsync(msg, null);
        Assert.Equal(SendStatus.SendOK, result.Status);
    }

    [Fact]
    [DisplayName("并发发送多条消息_全部成功")]
    public async Task PublishMessagesParallel_AllSucceed()
    {
        using var producer = new Producer
        {
            Topic             = "integration-test-topic",
            NameServerAddress = fixture.NameServerAddress,
        };
        producer.Start();

        var tasks = new List<Task<SendResult>>();
        for (var i = 0; i < 10; i++)
        {
            tasks.Add(producer.PublishAsync($"Parallel message {i}"));
        }

        var results = await Task.WhenAll(tasks);
        foreach (var r in results)
        {
            Assert.Equal(SendStatus.SendOK, r.Status);
        }
    }
}

/// <summary>Consumer 集成测试</summary>
/// <remarks>初始化</remarks>
/// <param name="fixture">RocketMQ Fixture</param>
[Collection("RocketMQ")]
public class ConsumerIntegrationTests(RocketMqFixture fixture) : IClassFixture<RocketMqFixture>
{
    [Fact]
    [DisplayName("先发再消费_能收到消息")]
    public async Task ProduceThenConsume_MessageReceived()
    {
        const String topic   = "integration-consume-topic";
        const String content = "Hello Consumer";

        using var producer = new Producer
        {
            Topic             = topic,
            NameServerAddress = fixture.NameServerAddress,
        };
        producer.Start();
        await producer.PublishAsync(content);

        var tcs = new TaskCompletionSource<MessageExt>();
        using var consumer = new Consumer
        {
            Topic             = topic,
            Group             = "integration-consumer-group",
            NameServerAddress = fixture.NameServerAddress,
        };

        consumer.OnConsume = (queue, msgs) =>
        {
            foreach (var m in msgs)
            {
                if (!tcs.Task.IsCompleted)
                    tcs.TrySetResult(m);
            }
            return true;
        };
        consumer.Start();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        cts.Token.Register(() => tcs.TrySetCanceled());

        var received = await tcs.Task;
        Assert.NotNull(received);
        Assert.Equal(content, received.BodyString);
    }
}