增加ACL配置,基于Apache 4.9.3版本测试
顾小顾 authored at 2022-03-19 20:50:42 顾小顾 committed at 2022-03-19 20:51:18
3.02 KiB
NewLife.RocketMQ
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using NewLife;
using NewLife.RocketMQ;
using NewLife.RocketMQ.Protocol;
using Xunit;

namespace XUnitTestRocketMQ
{
    public class SupportApacheAclTest
    {
        private const String NameServerAddress = "127.0.0.1:9876";
        private const String TestTopic = "newlife_acl_test_topic";

        /// <summary>
        /// 创建Topic时默认为系统 Topic => TBW102,可省略
        /// </summary>
        private const String DefaultSysTopic = "TBW102";

        private readonly AclOptions _aclOptions = new AclOptions() {AccessKey = "rocketmq2AcKey", SecretKey = "rocketmq2SeKey", OnsChannel = "LOCAL"};
        
        [Fact]
        public void CreateTopicTest()
        {
            using var producer = CreateProducerInstance(DefaultSysTopic);
            producer.Start();
            producer.CreateTopic(TestTopic, 2);
            producer.Dispose();
        }

        [Fact]
        public void PublishMessageTest()
        {
            using var producer = CreateProducerInstance(TestTopic);
            producer.Start();

            var pubResultList = new List<Boolean>();
            for (var i = 0; i < 10; i++)
            {
                const String message = "大家好才是真的好!";
                var pubResult = producer.Publish(message, "new_life_test_tag");
                pubResultList.Add(pubResult.Status == SendStatus.SendOK);
            }

            Assert.True(pubResultList.All(_ => true));
            producer.Dispose();
        }

        [Fact]
        public void ConsumeMessageTest()
        {
            using var consumer = CreateConsumerInstance(TestTopic);
            consumer.OnConsume = OnConsume;
            consumer.Start();
            Thread.Sleep(3000);
           
            static Boolean OnConsume(MessageQueue q, MessageExt[] ms)
            {
                Console.WriteLine("[{0}@{1}]收到消息[{2}]", q.BrokerName, q.QueueId, ms.Length);

                foreach (var item in ms.ToList())
                {
                    Console.WriteLine($"消息:主键【{item.Keys}】,产生时间【{item.BornTimestamp.ToDateTime()}】,内容【{item.Body.ToStr()}】");
                }

                return true;
            }
        }

        private Producer CreateProducerInstance(String topic)
        {
            var producer = new Producer();

            producer.NameServerAddress = NameServerAddress;
            producer.Topic = topic;
            producer.AclOptions = _aclOptions;

            return producer;
        }
        
        private Consumer CreateConsumerInstance(String topic)
        {
            var consumer = new Consumer();

            consumer.NameServerAddress = NameServerAddress;
            consumer.Topic = topic;
            consumer.AclOptions = _aclOptions;
            consumer.Group = "new_life_test_group";
            consumer.FromLastOffset = true;
            consumer.BatchSize = 5;
            
            return consumer;
        }
    }
}