所有分支的提交都要跑test
大石头 authored at 2022-03-29 23:35:41
2.64 KiB
NewLife.RocketMQ
using NewLife;
using NewLife.Log;
using NewLife.RocketMQ;
using NewLife.RocketMQ.Client;
using NewLife.RocketMQ.Protocol;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace XUnitTest.Cloud;

/// <summary>
/// Integration tests that drive the RocketMQ <see cref="Producer"/> and <see cref="Consumer"/>
/// against an Aliyun RocketMQ service. Every test is skipped by default because it needs a
/// reachable server.
/// </summary>
public class AliyunTests
{
    /// <summary>
    /// Applies the shared <see cref="MqSetting.Current"/> configuration to a client and routes
    /// its logging to <see cref="XTrace.Log"/>.
    /// </summary>
    /// <param name="mq">Producer or consumer to configure.</param>
    private static void SetConfig(MqBase mq)
    {
        //mq.Server = "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet";
        mq.Configure(MqSetting.Current);

        mq.Log = XTrace.Log;
    }

    /// <summary>
    /// Starts a producer without an explicit topic, checks that it reports the default topic
    /// "TBW102", and then creates the topic "nx_test".
    /// </summary>
    [Fact(Skip = "需要阿里云RocketMQ服务器支持")]
    public void CreateTopic()
    {
        // Topic is deliberately left unset: when creating a topic it must not be specified
        // before Start, so that the producer uses the default TBW102.
        // The producer is disposed when the test ends, like in the other producer tests.
        using var mq = new Producer();
        SetConfig(mq);

        mq.Start();

        Assert.Equal("TBW102", mq.Topic);

        mq.CreateTopic("nx_test", 2);
    }

    /// <summary>Publishes ten tagged messages to topic "test1" synchronously.</summary>
    [Fact(Skip = "需要阿里云RocketMQ服务器支持")]
    public static void ProduceTest()
    {
        using var mq = new Producer
        {
            Topic = "test1",
        };
        SetConfig(mq);

        mq.Start();

        for (var i = 0; i < 10; i++)
        {
            var str = "学无先后达者为师" + i;
            //var str = Rand.NextString(1337);

            // The send result is not inspected; the test only verifies that publishing does not throw.
            mq.Publish(str, "TagA", null);
        }
    }

    /// <summary>Publishes ten tagged messages to topic "test1" through the asynchronous API.</summary>
    /// <returns>A task that completes once all ten messages have been published.</returns>
    [Fact(Skip = "需要阿里云RocketMQ服务器支持")]
    public static async Task ProduceAsyncTest()
    {
        using var mq = new Producer
        {
            Topic = "test1",
        };
        SetConfig(mq);

        mq.Start();

        for (var i = 0; i < 10; i++)
        {
            var str = "学无先后达者为师" + i;
            //var str = Rand.NextString(1337);

            // The send result is not inspected; the test only verifies that publishing does not throw.
            await mq.PublishAsync(str, "TagA", null);
        }
    }

    /// <summary>
    /// Starts a consumer on topic "test1" in group "test", lets it run for three seconds with
    /// <see cref="OnConsume"/> as the message handler, and then disposes it.
    /// </summary>
    [Fact(Skip = "需要阿里云RocketMQ服务器支持")]
    public static void ConsumeTest()
    {
        // Disposed at the end of the test so the consumer does not keep running
        // in the background after the test has finished.
        using var consumer = new Consumer
        {
            Topic = "test1",
            Group = "test",

            FromLastOffset = true,
            BatchSize = 20,
        };
        SetConfig(consumer);

        // The handler is attached before Start, as in the original ordering.
        consumer.OnConsume = OnConsume;
        consumer.Start();

        // Give the consumer time to receive messages before it is disposed.
        Thread.Sleep(3000);
    }

    /// <summary>
    /// Message handler used by <see cref="ConsumeTest"/>: writes the source queue, the batch
    /// size, and each message's key, born time and body to the console.
    /// </summary>
    /// <param name="q">Queue the batch was received from.</param>
    /// <param name="ms">Batch of received messages.</param>
    /// <returns>
    /// Always <c>true</c> (presumably signalling that the batch was consumed successfully;
    /// confirm against the Consumer.OnConsume contract).
    /// </returns>
    private static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
    {
        Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);

        // Iterate the array directly; no copy is needed just to enumerate it.
        foreach (var item in ms)
        {
            Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
        }

        return true;
    }
}