所有分支的提交所有分支的提交都要跑test都要跑test
|
# NewLife.RocketMQ 产å“åˆ†æžæŠ¥å‘Š
> 当å‰å®¢æˆ·ç«¯ç‰ˆæœ¬ï¼š3.0.x
> å议版本声明:`MQVersion.V4_9_7`(默认,å¯é…ç½®ï¼Œå·²æ”¯æŒ V5.9.9 åŠæ›´é«˜ç‰ˆæœ¬ï¼‰
> ç›®æ ‡æ¡†æž¶ï¼šnet45 / net461 / netstandard2.0 / netstandard2.1 / net10
> å¼€å‘团队:新生命团队(NewLife)
> 项目仓库:[GitHub](https://github.com/NewLifeX/NewLife.RocketMQ) | [Gitee](https://gitee.com/NewLifeX/NewLife.RocketMQ)
---
## 执行摘è¦
NewLife.RocketMQ 是一款ä¼ä¸šçº§ **纯托管 .NET RocketMQ 客户端**,专为 .NET 生æ€è®¾è®¡ï¼Œå®Œå…¨ä½¿ç”¨ C# 实现,**零外部ä¾èµ–**ï¼ˆæ— éœ€ Javaã€gRPCã€Protobuf ç¬¬ä¸‰æ–¹åº“ï¼‰ã€‚è¯¥å®¢æˆ·ç«¯åŒæ—¶æ”¯æŒï¼š
- **RocketMQ Remoting å议(4.x)**:完整实现 Apache RocketMQ 4.x å…¨éƒ¨æ ¸å¿ƒåŠŸèƒ½
- **gRPC Proxy å议(5.x)**:内置轻é‡çº§ Protobuf ç¼–è§£ç å™¨ï¼ŒåŽŸç”Ÿæ”¯æŒ RocketMQ 5.x æ–°æž¶æž„
- **多云厂商适é…**:统一接å£é€‚é…阿里云ã€åŽä¸ºäº‘ã€è…¾è®¯äº‘åŠ Apache ACL 认è¯ä½“ç³»
**æ ¸å¿ƒä¼˜åŠ¿**:
- ✅ **生产就绪**:完整的消费é‡è¯•ã€æ»ä¿¡é˜Ÿåˆ—ã€äº‹åŠ¡å›žæŸ¥ã€é¡ºåºæ¶ˆè´¹ç‰ä¼ä¸šçº§ç‰¹æ€§
- ✅ **跨平å°**ï¼šæ”¯æŒ .NET Framework 4.5+ 到 .NET 10,gRPC 功能在 .NET Standard 2.1+ å¯ç”¨
- ✅ **é›¶ä¾èµ–**ï¼šæ— éœ€å®‰è£… Java 或 gRPC è¿è¡Œæ—¶ï¼Œå†…置完整 Protobuf ç¼–è§£ç
- ✅ **高性能**:基于 NewLife.Net 的高效网络层,支æŒè¿žæŽ¥æ± ã€SSL/TLSã€VIP 通é“
- ✅ **易扩展**:统一云厂商适é…器接å£ï¼Œè½»æ¾æŽ¥å…¥å„云æœåС商
---
## 一ã€é¡¹ç›®æ¦‚è¿°
### 1.1 产å“定ä½
NewLife.RocketMQ 是新生命团队开å‘çš„**ä¼ä¸šçº§çº¯æ‰˜ç®¡è½»é‡çº§ RocketMQ 客户端**,旨在为 .NET å¼€å‘者æä¾›ä¸€ä¸ªï¼š
- **功能完整**:覆盖 RocketMQ 4.x/5.x å…¨éƒ¨æ ¸å¿ƒç‰¹æ€§åŠä¼ä¸šçº§åŠŸèƒ½
- **架构现代**ï¼šæ”¯æŒæœ€æ–° C# è¯æ³•(file-scoped namespaceã€recordã€switch expression ç‰ï¼‰
- **兼容广泛**:从 .NET Framework 4.5 到 .NET 10 全版本覆盖
- **易于集æˆ**:NuGet ä¸€é”®å®‰è£…ï¼Œæ— éœ€é¢å¤–é…置或ä¾èµ–安装
### 1.2 技术特点
**å议支æŒ**:
- **Remoting åè®®**:完整实现 RocketMQ 4.x TCP ç§æœ‰å议(约 60+ RequestCode)
- **gRPC Proxy åè®®**ï¼šè‡ªç ”è½»é‡çº§ Protobuf ç¼–è§£ç 器(ProtoWriter/ProtoReaderï¼Œæ— å¤–éƒ¨ä¾èµ–ï¼‰ï¼Œæ”¯æŒ RocketMQ 5.x çš„ 11 ä¸ªæ ¸å¿ƒ RPC 方法
**架构设计**:
- **分层清晰**:业务层(Producer/Consumer)ã€é€šä¿¡å±‚(ClusterClient/NameClient/BrokerClient)ã€å议层(Command/Message/Header)ã€ä¼ 输层(Remoting/gRPC)
- **坿µ‹è¯•性**:30+ æµ‹è¯•ç±»è¦†ç›–æ ¸å¿ƒåŠŸèƒ½ï¼ˆæ‰¹é‡æ¶ˆæ¯ã€äº‹åŠ¡å›žæŸ¥ã€å¤š Topicã€Pop 消费ã€IPv6ã€åŽ‹ç¼©ã€é‡è¯•ã€é¡ºåºæ¶ˆè´¹ç‰ï¼‰
- **å¯è§‚测性**:内置消æ¯è½¨è¿¹ï¼ˆMessageTraceHookï¼‰ã€æ€§èƒ½è¿½è¸ªï¼ˆTracer)ã€ç»“构化日志(ILog)
### 1.3 æ ¸å¿ƒä»·å€¼
| 价值维度 | 说明 |
|---------|------|
| **ä¼ä¸šçº§å¯é 性** | 完整的消费é‡è¯•ã€æ»ä¿¡é˜Ÿåˆ—ã€äº‹åŠ¡å›žæŸ¥ã€é¡ºåºæ¶ˆè´¹é”定ç‰ç”Ÿäº§çº§ç‰¹æ€§ |
| **多云厂商适é…** | 统一 `ICloudProvider` 接å£ï¼Œå·²é€‚é…阿里云ã€åŽä¸ºäº‘ã€è…¾è®¯äº‘åŠ Apache ACL |
| **高兼容性** | 兼容 RocketMQ 4.0 ~ 5.xï¼Œæ”¯æŒ .NET Framework 4.5+ 到 .NET 10 全版本 |
| **零外部ä¾èµ–** | 纯 C# å®žçŽ°ï¼Œæ— éœ€ Javaã€gRPCã€Protobuf 第三方库,部署è¿ç»´æˆæœ¬ä½Ž |
| **性能优化** | 连接å¤ç”¨ã€å¯¹è±¡æ± ã€VIP 通é“ã€æ¶ˆæ¯åŽ‹ç¼©ã€å¹¶å‘æŽ§åˆ¶ç‰æ€§èƒ½ä¼˜åŒ–手段 |
| **易于使用** | 简æ´çš„ API 设计,链å¼é…置,NuGet 一键安装 |
---
## äºŒã€æ ¸å¿ƒæž¶æž„分æž
### 2.1 整体架构
```
MqBase (业务基类,NameServer连接/Broker管ç†/Topic与消费组CRUD/æ¶ˆæ¯æŸ¥è¯¢)
├── Producer (生产者:普通/异æ¥/å•å‘/延迟/事务/批é‡/Request-Reply/gRPC)
└── Consumer (消费者:Pull/调度/Rebalance/多Topic/顺åº/é‡è¯•/Pop/gRPC)
ClusterClient (集群客户端/通信层,Remotingåè®®)
├── NameClient (åç§°æœåŠ¡å™¨å®¢æˆ·ç«¯ï¼šè·¯ç”±å‘现/定时轮询/多Topic路由/Broker主从解æž)
└── BrokerClient (Broker客户端:心跳/注销/命令收å‘)
Grpc/ (gRPCä¼ è¾“å±‚ï¼ŒRocketMQ 5.x Proxyå议,netstandard2.1+)
├── GrpcClient (HTTP/2 gRPC客户端,帧编解ç ,Unary + Server Streaming)
├── GrpcMessagingService (æ¶ˆæ¯æœåŠ¡ï¼šè·¯ç”±/å‘é€/接收/确认/心跳/事务/延迟/æ»ä¿¡/ä¸å¯è§æ—¶é—´/Telemetry)
├── ProtoWriter / ProtoReader (è½»é‡çº§Protobufç¼–è§£ç å™¨ï¼Œæ— å¤–éƒ¨ä¾èµ–)
├── GrpcModels (Resource/Endpoints/Message/SystemProperties/MessageQueue ç‰)
├── GrpcServiceMessages (Request/Response 消æ¯ç±»åž‹ï¼Œçº¦25个,å«Telemetry)
└── GrpcEnums (GrpcCode/GrpcMessageType/GrpcClientType/AddressScheme ç‰)
Protocol/
├── Command (命令帧,Remotingå议编解ç )
├── MqCodec (网络编解ç 器)
├── Header (通信头)
├── Message / MessageExt (æ¶ˆæ¯æ¨¡åž‹ï¼Œå«æ‰¹é‡è§£ç /ZLIB解压/IPv4+IPv6/5.x MessageId)
├── SendMessageRequestHeader / PullMessageRequestHeader / EndTransactionRequestHeader
├── RequestCode (约60个指令ç ) / ResponseCode (约20个å“应ç )
├── MQVersion (V3.0 ~ V5.9.9 + HIGHER_VERSION,约450个版本)
└── TransactionState / LanguageCode / SerializeType ç‰è¾…助类型
CloudProvider/ (云厂商适é…层)
├── ICloudProvider (统一云厂商接å£ï¼šç¾å/路由/NameServerå‘现)
├── AliyunProvider (阿里云适é…:实例IDå‰ç¼€è·¯ç”± + HTTP NameServerå‘现)
├── AclProvider (Apache ACL适é…:HMAC-SHA1ç¾å)
├── HuaweiProvider (åŽä¸ºäº‘适é…:SSL/TLS + 实例ID路由)
└── TencentProvider (腾讯云适é…:Namespaceå‰ç¼€è·¯ç”±)
```
### 2.2 通信层特点
| 特性 | Remoting åè®® | gRPC åè®® |
|------|--------------|-----------|
| **ä¼ è¾“å±‚** | TCP 长连接(NewLife.Net) | HTTP/2(HttpClient) |
| **ç¼–è§£ç ** | 自定义二进制帧 + JSON | Protobufï¼ˆè‡ªç ”ç¼–è§£ç 器) |
| **支æŒç‰ˆæœ¬** | RocketMQ 4.x / 5.x Broker | RocketMQ 5.x Proxy |
| **ç›®æ ‡æ¡†æž¶** | net45+ | netstandard2.1+ / net5+ |
| **连接å¤ç”¨** | å•连接 Opaque å¤ç”¨ | HTTP/2 多路å¤ç”¨ |
| **SSL/TLS** | 支æŒï¼ˆSslProtocols + Certificate) | 原生支æŒï¼ˆHTTPS) |
| **VIP 通é“** | 支æŒï¼ˆBrokerPort - 2) | N/A(HTTP/2 优先级æµï¼‰ |
| **ç¾å认è¯** | HMAC-SHA1(统一由 ICloudProvider 实现) | HTTP Header(X-Mq-*) |
| **外部代ç†** | 支æŒï¼ˆExternalBroker 属性) | 逿˜Žä»£ç†ï¼ˆHTTP Proxy) |
| **消æ¯åŽ‹ç¼©** | ZLIB(å‘é€ç«¯ï¼ŒSysFlag æ ‡è®°ï¼‰ | gzip(HTTP Content-Encoding) |
### 2.3 代ç 组织结构
**æ ¸å¿ƒæ¨¡å—**:
- **MqBase.cs**(约 1500+ 行):业务基类,å°è£… NameServer 连接ã€Broker 管ç†ã€Topic/消费组 CRUDã€æ¶ˆæ¯æŸ¥è¯¢ã€æ¶ˆè´¹ç»Ÿè®¡ã€gRPC 生命周期管ç†
- **Producer.cs**(约 1200+ 行):生产者实现,支æŒåŒæ¥/异æ¥/å•å‘/延迟/事务/批é‡å‘é€ã€Request-Replyã€gRPC 扩展
- **Consumer.cs**(约 1800+ è¡Œï¼‰ï¼šæ¶ˆè´¹è€…å®žçŽ°ï¼Œæ”¯æŒ Pull/调度/Rebalance/多 Topic/顺åº/é‡è¯•/Pop/批é‡ç¡®è®¤ã€gRPC 消费
- **ClusterClient.cs**(约 423 行):TCP 连接管ç†ã€HMAC-SHA1 统一ç¾åã€åŒæ¥å¼‚æ¥å‘½ä»¤æ”¶å‘
- **NameClient.cs**(约 243 行):路由å‘现/定时轮询/多 Topic 路由/Broker 主从解æž
- **BrokerClient.cs**(约 142 行):心跳/注销/命令收å‘
**å议层**:
- **Command.cs**(约 333 行):Remoting å议帧编解ç
- **MessageExt.cs**(约 244 è¡Œï¼‰ï¼šæ¶ˆæ¯æ‰©å±•ï¼Œæ”¯æŒæ‰¹é‡è§£ç ã€ZLIB 解压ã€IPv4+IPv6 地å€è§£æžã€5.x MessageId ç¼–è§£ç
- **RequestCode.cs**(约 277 行,60+ 指令):æœåŠ¡ç«¯è¯·æ±‚ç æžšä¸¾
- **MQVersion.cs**(约 909 行,450+ 版本):å议版本枚举(V3.0 ~ V5.9.9 + HIGHER_VERSION)
**gRPC 层**(netstandard2.1+):
- **GrpcClient.cs**(约 310 行):HTTP/2 帧编ç ã€Unary/ServerStreaming 调用
- **ProtoWriter.cs**(约 343 行):Protobuf 二进制编ç 器(varint/fixed/string/bytes/map/timestamp ç‰ï¼‰
- **ProtoReader.cs**(约 308 行):Protobuf 二进制解ç 器
- **GrpcModels.cs**(约 520 行):gRPC æ¶ˆæ¯æ¨¡åž‹ï¼ˆResource/Endpoints/Message ç‰ï¼‰
- **GrpcServiceMessages.cs**(约 883 行):25+ Request/Response 消æ¯ç±»åž‹
- **GrpcEnums.cs**(约 213 行):GrpcCode/GrpcMessageType/GrpcClientType ç‰æžšä¸¾
- **GrpcMessagingService.cs**(约 417 行):11 个 RPC 方法实现
**云厂商适é…**:
- **ICloudProvider.cs**:统一云厂商接å£ï¼ˆ7 个方法/属性)
- **AliyunProvider.cs**:阿里云适é…(实例 ID å‰ç¼€ + HTTP NameServer å‘现)
- **AclProvider.cs**:Apache ACL 适é…(HMAC-SHA1 ç¾å)
- **HuaweiProvider.cs**:åŽä¸ºäº‘适é…(SSL/TLS + 实例 ID 路由)
- **TencentProvider.cs**:腾讯云适é…(Namespace å‰ç¼€è·¯ç”±ï¼‰
**测试覆盖**(30+ æµ‹è¯•ç±»ï¼Œç›®æ ‡æ¡†æž¶ net10):
- **æ ¸å¿ƒåŠŸèƒ½**:ProducerTestsã€ConsumerTestsã€CommandTestsã€MessageTestsã€NameClientTests
- **高级特性**:TransactionCheckTestsã€BatchMessageTestsã€RetryTestsã€OrderConsumeTestsã€PopConsumeTests
- **å议兼容**:IPv6Testsã€MessageId5xTestsã€MQVersionTestsã€ProtoTests
- **云厂商**:AliyunTestsã€AliyunIssuesTestsã€CloudProviderTests
- **性能优化**:CompressionTestsã€ConcurrentConsumeTestsã€VipChannelTests
- **管ç†åŠŸèƒ½**:ManagementTestsã€ConsumeStatsTestsã€QueryMessageTests
- **扩展功能**:MultiTopicTestsã€RequestReplyTestsã€MessageTraceTestsã€SQL92FilterTests
---
## 三ã€RocketMQ å„主è¦ç‰ˆæœ¬åŠŸèƒ½çŸ©é˜µ
### 3.1 RocketMQ 4.x(4.0 ~ 4.9)— ç»å…¸ Remoting åè®®
| 功能 | 版本引入 | NewLife æ”¯æŒ | 备注 |
|------|---------|:----------:|------|
| **普通消æ¯å‘é€ï¼ˆåŒæ¥ï¼‰** | 4.0 | ✅ | `Publish()` / `SEND_MESSAGE_V2` |
| **普通消æ¯å‘é€ï¼ˆå¼‚æ¥ï¼‰** | 4.0 | ✅ | `PublishAsync()` |
| **å•å‘å‘é€ï¼ˆOneway)** | 4.0 | ✅ | `PublishOneway()` |
| **Pull æ¨¡å¼æ¶ˆè´¹** | 4.0 | ✅ | `Pull()` / `PULL_MESSAGE` |
| **集群消费模å¼** | 4.0 | ✅ | `MessageModels.Clustering` |
| **å¹¿æ’æ¶ˆè´¹æ¨¡å¼** | 4.0 | ✅ | `MessageModels.Broadcasting` |
| **消费者负载å‡è¡¡ï¼ˆå¹³å‡åˆ†é…)** | 4.0 | ✅ | `Rebalance()` å¹³å‡åˆ†é…算法 |
| **Tag 过滤** | 4.0 | ✅ | `Tags` / `Subscription` |
| **延迟消æ¯ï¼ˆ18级定时)** | 4.0 | ✅ | `PublishDelay()` / `DelayTimeLevels` |
| **事务消æ¯ï¼ˆåŠæ¶ˆæ¯ï¼‰** | 4.3 | ✅ | `PublishTransaction()` / `EndTransaction()` |
| **é¡ºåºæ¶ˆæ¯** | 4.0 | ✅ | `Publish(message, queue)` 指定队列 |
| **消æ¯è½¨è¿¹** | 4.4 | ✅ | `AsyncTraceDispatcher` / `MessageTraceHook` |
| **消费者å移管ç†** | 4.0 | ✅ | `QueryOffset` / `UpdateOffset` / `QueryMaxOffset` / `QueryMinOffset` |
| **心跳机制** | 4.0 | ✅ | `BrokerClient.Ping()` |
| **Topic 创建/更新** | 4.0 | ✅ | `CreateTopic()` |
| **æ¶ˆè´¹ç»„ä¿¡æ¯æŸ¥è¯¢** | 4.0 | ✅ | `GetConsumers()` |
| **Request-Reply 模å¼** | 4.6 | ✅ | `Request()` / `RequestAsync()` |
| **ACL æƒé™æŽ§åˆ¶** | 4.4 | ✅ | `AclOptions` / `AclProvider` HMAC-SHA1 ç¾å |
| **SQL92 过滤** | 4.1 | ✅ | `ExpressionType=SQL92` + Subscription 填写SQLè¡¨è¾¾å¼ |
| **æ‰¹é‡æ¶ˆæ¯å‘é€** | 4.5 | ✅ | `PublishBatch()` / `SEND_BATCH_MESSAGE (320)` |
| **消æ¯åŽ‹ç¼©ï¼ˆå‘é€ç«¯ï¼‰** | 4.0 | ✅ | `CompressOverBytes` 超过阈值自动ZLIB压缩 + SysFlagæ ‡è®° |
| **消æ¯å›žé€€ï¼ˆæ¶ˆè´¹å¤±è´¥ï¼‰** | 4.0 | ✅ | `SendMessageBack()` / `CONSUMER_SEND_MSG_BACK (36)` |
| **事务回查(被动回查)** | 4.3 | ✅ | `OnCheckTransaction` / `CHECK_TRANSACTION_STATE (39)` å›žè°ƒå¤„ç† |
| **按时间戳æœç´¢åç§»** | 4.0 | ✅ | `SearchOffset()` / `SEARCH_OFFSET_BY_TIMESTAMP` |
| **消费进度æŒä¹…化(本地)** | 4.0 | ✅ | `OffsetStorePath` å¹¿æ’æ¨¡å¼æœ¬åœ°JSON文件æŒä¹…化,集群模å¼èµ°Broker端å˜å‚¨ |
| **Push 模å¼ï¼ˆæœåŠ¡ç«¯æŽ¨é€ï¼‰** | 4.0 | âš ï¸ | 本质是长轮询 Pull 模拟,已实现但éžåŽŸç”Ÿ Push |
| **Pop 消费模å¼** | 4.9.3 | ✅ | `PopMessageAsync()` / `AckMessageAsync()` / `ChangeInvisibleTimeAsync()` |
| **多 Topic 订阅** | 4.0 | ✅ | `Topics` 属性支æŒå¤šä¸»é¢˜è®¢é˜…,Rebalance 按 Topic 分别分é…队列 |
| **消费é‡è¯•** | 4.0 | ✅ | `EnableRetry` + `MaxReconsumeTimes`,消费失败自动回退到RETRY Topic |
| **æ»ä¿¡é˜Ÿåˆ—(DLQ)** | 4.0 | ✅ | 超过最大é‡è¯•次数åŽè‡ªåŠ¨è¿›å…¥ `%DLQ%{ConsumerGroup}` 主题 |
| **æ¶ˆè´¹é™æµ/å¹¶å‘æŽ§åˆ¶** | 4.0 | ✅ | `MaxConcurrentConsume` ä¿¡å·é‡æŽ§åˆ¶æ‰€æœ‰é˜Ÿåˆ—çš„æ€»å¹¶å‘ |
| **VIP 通é“** | 4.0 | ✅ | `VipChannelEnabled` 属性,å¯ç”¨åŽä½¿ç”¨ BrokerPort - 2 çš„VIPç«¯å£ |
| **Broker 主从切æ¢** | 4.5 | ✅ | `BrokerInfo.MasterAddress`/`SlaveAddresses`,消费失败自动切æ¢ä»ŽèŠ‚ç‚¹è¯»å– |
### 3.2 RocketMQ 5.x(5.0+)— 新架构
RocketMQ 5.0 引入了全新的 **gRPC Proxy** æž¶æž„ï¼ŒåŒæ—¶ä¿æŒå¯¹ Remoting å议的å‘åŽå…¼å®¹ã€‚
| 功能 | 版本引入 | NewLife æ”¯æŒ | 备注 |
|------|---------|:----------:|------|
| **gRPC Proxy åè®®** | 5.0 | ✅ | å†…ç½®è½»é‡ Protobuf ç¼–è§£ç + HTTP/2 gRPC 客户端,通过 `GrpcProxyAddress` 属性å¯ç”¨ï¼ˆnetstandard2.1+) |
| **Remoting å议兼容** | 5.0 | ✅ | 5.x Broker ä¿ç•™äº† Remoting 接å£ï¼Œå½“å‰å®¢æˆ·ç«¯å¯è¿žæŽ¥ |
| **ä»»æ„æ—¶é—´å»¶è¿Ÿæ¶ˆæ¯** | 5.0 | ✅ | `PublishDelayViaGrpcAsync()` 通过 gRPC å议支æŒä»»æ„æ—¶é—´æˆ³å»¶è¿Ÿæ¶ˆæ¯ |
| **Pop 消费模å¼** | 5.0 | ✅ | `PopMessageAsync()` / `AckMessageAsync()` / `ChangeInvisibleTimeAsync()` |
| **客户端主动上报资æº** | 5.0 | ✅ | `TelemetryViaGrpcAsync()` 通过 gRPC å议上报客户端设置/订阅ç‰èµ„æºä¿¡æ¯ï¼ˆnetstandard2.1+) |
| **消æ¯åˆ†ç»„(FIFO)** | 5.0 | âš ï¸ | Remoting 模å¼ä¸‹ç‰ä»·äºŽé¡ºåºæ¶ˆæ¯ |
| **æœåŠ¡ç«¯ Rebalance** | 5.0 | ⌠| 需 Broker 5.0 + gRPC åè®®é…åˆ |
| **MessageId æ–°æ ¼å¼** | 5.0 | ✅ | `MessageExt.CreateMessageId5x()` / `TryParseMessageId5x()` / `IsMessageId5x()` ç¼–è§£ç 5.xæ ¼å¼ |
| **Controller 模å¼** | 5.0 | âš ï¸ | 替代 DLedger 的高å¯ç”¨æ–¹æ¡ˆï¼Œå®¢æˆ·ç«¯æ— 感知 |
| **Compaction Topic** | 5.1 | ⌠| KV è¯ä¹‰ Topic |
| **Timer 消æ¯ï¼ˆ5.0 原生)** | 5.0 | ✅ | Remoting 模å¼ä¸æ”¯æŒã€‚gRPC 模å¼é€šè¿‡ `PublishDelayViaGrpcAsync()` 实现 |
### 3.3 版本兼容性总结
| æœåŠ¡ç«¯ç‰ˆæœ¬ | 连接能力 | æ ¸å¿ƒåŠŸèƒ½ | 已知问题 |
|-----------|:------:|:------:|---------|
| **4.0 ~ 4.3** | ✅ | ✅ | æ— äº‹åŠ¡æ¶ˆæ¯æ”¯æŒ |
| **4.4 ~ 4.9** | ✅ | ✅ | 完全兼容,主力测试版本 |
| **5.0 ~ 5.x(Remoting模å¼ï¼‰** | ✅ | ✅ | 新特性(Timer消æ¯ï¼‰ä¸å¯ç”¨ |
| **5.0 ~ 5.x(gRPC Proxy模å¼ï¼‰** | ✅ | ✅ | 通过 `GrpcProxyAddress` 属性å¯ç”¨ï¼Œéœ€ netstandard2.1+ |
---
## å››ã€é˜¿é‡Œäº‘ RocketMQ 专有版本兼容性
### 4.1 阿里云 RocketMQ 4.x 实例
| 功能 | 支æŒçŠ¶æ€ | å®žçŽ°æ–¹å¼ |
|------|:------:|---------|
| **实例ID路由** | ✅ | `AliyunProvider.TransformTopic/TransformGroup`:`{InstanceId}%{Topic}` / `{InstanceId}%{Group}` |
| **åç§°æœåС噍å‘现** | ✅ | `AliyunProvider.GetNameServerAddress()`:HTTP 接å£èŽ·å– NameServer åœ°å€ |
| **AccessKey/SecretKey ç¾å** | ✅ | `ClusterClient.SetSignature()` HMAC-SHA1 ç¾å |
| **OnsChannel æ ‡è¯†** | ✅ | 默认 `ALIYUN` |
| **实例ID自动解æž** | ✅ | 从 NameServer 地å€ä¸æå– `MQ_INST_` å‰ç¼€ |
| **公网版消费者状æ€** | âš ï¸ | 阿里云公网版返回的 `brokerName` 与 NameServer 路由ä¸çš„ä¸ä¸€è‡´ï¼Œéœ€ç‰¹æ®Šå¤„ç†ï¼ˆå·²æœ‰å®¹é”™ä»£ç ) |
| **消费者状æ€JSON** | ✅ | `ConsumerStatesSpecialJsonHandler` 处ç†é˜¿é‡Œäº‘特殊JSONæ ¼å¼ |
| **铂金版/独享实例** | âš ï¸ | ç†è®ºå…¼å®¹ï¼Œä½†æœªæœ‰å……分测试 |
| **多 Tag 过滤** | ✅ | 通过 `Tags` å±žæ€§æ”¯æŒ |
### 4.2 阿里云 RocketMQ 5.x 实例
| 功能 | 支æŒçŠ¶æ€ | 备注 |
|------|:------:|------|
| **Remoting å议兼容** | âš ï¸ | 阿里云 5.x 实例ä¿ç•™ Remoting 接å£ï¼Œç†è®ºä¸Šå¯è¿žæŽ¥ |
| **gRPC Proxy 模å¼** | âš ï¸ | 阿里云推è使用 gRPC æ–¹å¼æŽ¥å…¥ï¼Œå®¢æˆ·ç«¯å·²æ”¯æŒ gRPC å议,待验è¯å…¼å®¹æ€§ |
| **实例级别认è¯** | âš ï¸ | 5.x å¯èƒ½ä½¿ç”¨æ–°çš„认è¯ä½“ç³» |
| **Serverless 实例** | ⌠| ä»…æ”¯æŒ gRPC æŽ¥å…¥ï¼Œå¾…éªŒè¯ |
### 4.3 当å‰é˜¿é‡Œäº‘适é…的已知问题
1. **公网版 BrokerName ä¸åŒ¹é…**:阿里云公网版 RocketMQ 返回的消费者状æ€ä¸ `brokerName` 是真实 Broker å称,而 NameServer 路由返回的是网关å称,导致 `InitOffsetAsync` ä¸å移匹é…失败。当å‰ä»£ç 用 `?? new OffsetWrapperModel()` 容错处ç†ã€‚
2. **实例ID自动识别é™åˆ¶**:仅支æŒä»Ž NameServer 地å€ä¸æå– `MQ_INST_` å‰ç¼€ï¼Œæ— 法处ç†è‡ªå®šä¹‰å®žä¾‹ID。
---
## 五ã€åŽä¸ºäº‘ RocketMQ 专有版本兼容性
### 5.1 åŽä¸ºäº‘ DMS for RocketMQ
åŽä¸ºäº‘çš„åˆ†å¸ƒå¼æ¶ˆæ¯æœåŠ¡ï¼ˆDMS)æä¾› RocketMQ 兼容实例。
| 功能 | 支æŒçŠ¶æ€ | 备注 |
|------|:------:|------|
| **æ ‡å‡† Remoting åè®®** | âš ï¸ | åŽä¸ºäº‘ DMS 4.x 兼容实例ç†è®ºä¸Šå¯è¿žæŽ¥ï¼Œå¾…ç”Ÿäº§çŽ¯å¢ƒéªŒè¯ |
| **SSL/TLS åŠ å¯†** | ✅ | å®¢æˆ·ç«¯æ”¯æŒ `SslProtocol` å’Œ `Certificate` é…置,HuaweiProvider å·²æ”¯æŒ `EnableSsl` 属性 |
| **SASL 认è¯** | âš ï¸ | åŽä¸ºäº‘å¯èƒ½ä½¿ç”¨ SASL 认è¯ï¼ŒPhase 6 æ ‡è®°ä¸ºå¾…éœ€æ±‚æ˜Žç¡®åŽè¡¥å……的扩展功能 |
| **ACL 认è¯** | âš ï¸ | åŽä¸ºäº‘ DMS 使用自己的 AccessKey ä½“ç³»ï¼Œéœ€éªŒè¯æ˜¯å¦ä¸Ž Apache ACL 兼容 |
| **åŽä¸ºäº‘ 5.x 实例** | âš ï¸ | 客户端已具备 gRPC å议和 HuaweiProvider 适é…器,ç†è®ºä¸Šå¯é€šè¿‡ gRPC æ¨¡å¼æŽ¥å…¥ï¼Œå¾…å®žé™…çŽ¯å¢ƒéªŒè¯ï¼ˆPhase 6 æ ‡è®°ä¸ºå¾…éªŒè¯åŠŸèƒ½ï¼‰ |
### 5.2 åŽä¸ºäº‘适é…
当å‰å®¢æˆ·ç«¯å·²æä¾›åŽä¸ºäº‘专用适é…器 `HuaweiProvider`(`ICloudProvider` 实现):
**使用方å¼**:
```csharp
var producer = new Producer
{
Topic = "test",
NameServerAddress = "åŽä¸ºäº‘DMS实例地å€:9876",
CloudProvider = new HuaweiProvider
{
AccessKey = "ä½ çš„AK",
SecretKey = "ä½ çš„SK",
InstanceId = "实例ID",
EnableSsl = true,
}
};
```
**待验è¯äº‹é¡¹**:
1. åŽä¸ºäº‘ DMS çš„ ACL ç¾å是å¦ä¸Ž Apache ACL 完全一致
2. SSL/TLS è¯ä¹¦éªŒè¯æ˜¯å¦éœ€è¦ç‰¹æ®Šå¤„ç†
3. NameServer 地å€å‘现机制
---
## å…ã€è…¾è®¯äº‘ TDMQ RocketMQ 版兼容性
### 6.1 腾讯云 TDMQ for RocketMQ
腾讯云 TDMQ(Tencent Distributed Message Queue)æä¾› RocketMQ 兼容版本,基于 Apache RocketMQ 4.x å议。
| 功能 | 支æŒçŠ¶æ€ | 备注 |
|------|:------:|------|
| **æ ‡å‡† Remoting åè®®** | âš ï¸ | 腾讯云 TDMQ 4.x 兼容实例,ç†è®ºä¸Šå¯é€šè¿‡ Remoting å议连接 |
| **SSL/TLS åŠ å¯†** | ✅ | å®¢æˆ·ç«¯æ”¯æŒ `SslProtocol` å’Œ `Certificate` é…ç½® |
| **HMAC-SHA1 ç¾å认è¯** | âš ï¸ | 腾讯云使用类似 HMAC-SHA1 çš„ç¾åæ–¹å¼ï¼Œéœ€éªŒè¯ä¸Ž Apache ACL 兼容性 |
| **VPC 内网访问** | ✅ | 直接é…ç½® NameServer 为 VPC 内网地å€å³å¯ |
| **延迟消æ¯** | ✅ | æ ‡å‡† 18 级延迟消æ¯ï¼Œä¸Ž Apache 4.x 一致 |
| **事务消æ¯** | âš ï¸ | 需验è¯è…¾è®¯äº‘å¯¹åŠæ¶ˆæ¯å’Œå›žæŸ¥çš„å¤„ç† |
### 6.2 腾讯云适é…
当å‰å®¢æˆ·ç«¯å·²æä¾›è…¾è®¯äº‘专用适é…器 `TencentProvider`(`ICloudProvider` å®žçŽ°ï¼‰ï¼Œæ”¯æŒ Namespace å‰ç¼€è·¯ç”±ï¼š
**使用方å¼**:
```csharp
var producer = new Producer
{
Topic = "test",
NameServerAddress = "腾讯云TDMQ实例地å€:9876",
CloudProvider = new TencentProvider
{
AccessKey = "腾讯云SecretId",
SecretKey = "腾讯云SecretKey",
Namespace = "命å空间",
}
};
```
---
## 七ã€å·²å®žçްå议详细分æž
### 7.1 已实现的 RequestCode(æœåŠ¡ç«¯è¯·æ±‚ç )
| RequestCode | 值 | 用途 | 调用ä½ç½® |
|-------------|:--:|------|---------|
| `SEND_MESSAGE` | 10 | 呿¶ˆæ¯ï¼ˆV1,已ä¸ä½¿ç”¨ï¼‰ | 仅定义 |
| `PULL_MESSAGE` | 11 | æ‹‰å–æ¶ˆæ¯ | `Consumer.Pull()` |
| `QUERY_MESSAGE` | 12 | æŸ¥è¯¢æ¶ˆæ¯ | `MqBase.QueryMessageByKey()` |
| `QUERY_CONSUMER_OFFSET` | 14 | 查询消费åç§» | `Consumer.QueryOffset()` |
| `UPDATE_CONSUMER_OFFSET` | 15 | 更新消费åç§» | `Consumer.UpdateOffset()` |
| `UPDATE_AND_CREATE_TOPIC` | 17 | 创建/更新Topic | `MqBase.CreateTopic()` |
| `GET_BROKER_RUNTIME_INFO` | 28 | Brokerè¿è¡Œä¿¡æ¯ | `BrokerClient.GetRuntimeInfo()` |
| `SEARCH_OFFSET_BY_TIMESTAMP` | 29 | 按时间戳æœç´¢åç§» | `Consumer.SearchOffset()` |
| `GET_MAX_OFFSET` | 30 | 最大åç§»é‡ | `Consumer.QueryMaxOffset()` |
| `GET_MIN_OFFSET` | 31 | 最å°åç§»é‡ | `Consumer.QueryMinOffset()` |
| `VIEW_MESSAGE_BY_ID` | 33 | 按IDæŸ¥çœ‹æ¶ˆæ¯ | `MqBase.ViewMessage()` |
| `HEART_BEAT` | 34 | 心跳 | `BrokerClient.Ping()` |
| `UNREGISTER_CLIENT` | 35 | 注销客户端 | `BrokerClient.UnRegisterClient()` |
| `CONSUMER_SEND_MSG_BACK` | 36 | 消费失败回退 | `Consumer.SendMessageBackAsync()` |
| `END_TRANSACTION` | 37 | 结æŸäº‹åŠ¡ | `Producer.EndTransaction()` |
| `GET_CONSUMER_LIST_BY_GROUP` | 38 | 消费者列表 | `Consumer.GetConsumers()` |
| `CHECK_TRANSACTION_STATE` | 39 | 事务回查 | 被动处ç†ï¼š`OnCheckTransaction` 回调 |
| `NOTIFY_CONSUMER_IDS_CHANGED` | 40 | æ¶ˆè´¹è€…å˜æ›´é€šçŸ¥ | 触å‘é‡å¹³è¡¡ |
| `LOCK_BATCH_MQ` | 41 | é”å®šé˜Ÿåˆ—ï¼ˆé¡ºåºæ¶ˆè´¹ï¼‰ | `Consumer.LockBatchMQAsync()` |
| `UNLOCK_BATCH_MQ` | 42 | è§£é”队列 | `Consumer.UnlockBatchMQAsync()` |
| `GET_ROUTEINTO_BY_TOPIC` | 105 | Topic路由 | `NameClient.GetRouteInfo()` |
| `GET_BROKER_CLUSTER_INFO` | 106 | é›†ç¾¤ä¿¡æ¯ | `MqBase.GetClusterInfo()` |
| `UPDATE_AND_CREATE_SUBSCRIPTIONGROUP` | 200 | 创建/更新消费组 | `MqBase.CreateSubscriptionGroup()` |
| `GET_CONSUMER_CONNECTION_LIST` | 203 | 消费者连接列表 | `Consumer.GetConsumerConnectionList()` |
| `DELETE_SUBSCRIPTIONGROUP` | 207 | åˆ é™¤æ¶ˆè´¹ç»„ | `MqBase.DeleteSubscriptionGroup()` |
| `GET_CONSUME_STATS` | 208 | æ¶ˆè´¹çŠ¶æ€ | `Consumer.InitOffsetAsync()` |
| `DELETE_TOPIC_IN_BROKER` | 215 | 在Brokerä¸åˆ 除Topic | `MqBase.DeleteTopic()` |
| `DELETE_TOPIC_IN_NAMESRV` | 216 | 在NameServerä¸åˆ 除Topic | `MqBase.DeleteTopic()` |
| `GET_CONSUMER_RUNNING_INFO` | 307 | 消费者è¿è¡Œä¿¡æ¯ | è¢«åŠ¨å¤„ç† |
| `INVOKE_BROKER_TO_RESET_OFFSET` | 222 | é‡ç½®æ¶ˆè´¹åç§» | `Consumer.ResetConsumerOffset()` |
| `SEND_MESSAGE_V2` | 310 | 呿¶ˆæ¯V2 | `Producer.Publish()` / `PublishAsync()` |
| `SEND_BATCH_MESSAGE` | 320 | 批é‡å‘é€ | `Producer.PublishBatch()` |
| `SEND_REPLY_MESSAGE_V2` | 325 | å›žå¤æ¶ˆæ¯ | `Consumer.SendReplyAsync()` |
| `POP_MESSAGE` | 200050 | Pop消费 | `Consumer.PopMessageAsync()` |
| `ACK_MESSAGE` | 200051 | Pop消æ¯ç¡®è®¤ | `Consumer.AckMessageAsync()` |
| `CHANGE_MESSAGE_INVISIBLETIME` | 200052 | 修改ä¸å¯è§æ—¶é—´ | `Consumer.ChangeInvisibleTimeAsync()` |
| `BATCH_ACK_MESSAGE` | 200151 | 批é‡Pop确认 | `Consumer.BatchAckMessageAsync()` |
### 7.2 消æ¯ç¼–è§£ç 分æž
**消æ¯å‘é€ï¼ˆMessageExt.Write)**:仅返回 `true`,ä¸å®žé™…å†™å…¥ï¼Œå› ä¸ºå‘é€èµ°çš„æ˜¯ `Command` å°è£…。
**æ¶ˆæ¯æŽ¥æ”¶ï¼ˆMessageExt.Read/ReadAll/DecodeBatch)**:
- ✅ æ”¯æŒæ ‡å‡†çš„ 4.x 消æ¯äºŒè¿›åˆ¶æ ¼å¼
- ✅ æ”¯æŒ ZLIB 解压缩(SysFlag 第0ä½ï¼‰
- ✅ æ”¯æŒ IPv4 地å€è§£æž
- ✅ æ”¯æŒ IPv6 地å€è§£æžï¼ˆSysFlag 第2使 ‡è¯†ï¼Œè‡ªåЍ适é…4å—节/16å—节IP)
- ✅ æ”¯æŒæ‰¹é‡æ¶ˆæ¯è§£ç (`DecodeBatch()` 方法解ç SysFlag 第4使 ‡è¯†çš„ BatchMessage Body)
### 7.3 åºåˆ—åŒ–æ ¼å¼
| æ ¼å¼ | æ”¯æŒ | 说明 |
|------|:---:|------|
| JSON | ✅ | é»˜è®¤æ ¼å¼ï¼Œä½¿ç”¨ `NewLife.Serialization.JsonHelper` |
| ROCKETMQ(二进制) | ✅ | 命令头部支æŒäºŒè¿›åˆ¶è§£æž |
### 7.4 gRPC æœåŠ¡æ–¹æ³•ï¼ˆRocketMQ 5.x Proxy)
| æœåŠ¡æ–¹æ³• | 实现类方法 | 说明 |
|---------|----------|------|
| `QueryRoute` | `GrpcMessagingService.QueryRouteAsync()` | æŸ¥è¯¢ä¸»é¢˜è·¯ç”±ä¿¡æ¯ |
| `SendMessage` | `GrpcMessagingService.SendMessageAsync()` | å‘逿¶ˆæ¯ï¼ˆæ™®é€š/延迟/FIFO) |
| `SendMessage (Transaction)` | `GrpcMessagingService.SendTransactionMessageAsync()` | å‘é€äº‹åŠ¡æ¶ˆæ¯ï¼ˆåŠæ¶ˆæ¯ï¼‰ |
| `QueryAssignment` | `GrpcMessagingService.QueryAssignmentAsync()` | æŸ¥è¯¢é˜Ÿåˆ—åˆ†é… |
| `ReceiveMessage` | `GrpcMessagingService.ReceiveMessageAsync()` | 接收消æ¯ï¼ˆServer Streaming) |
| `AckMessage` | `GrpcMessagingService.AckMessageAsync()` | ç¡®è®¤æ¶ˆæ¯æ¶ˆè´¹ |
| `Heartbeat` | `GrpcMessagingService.HeartbeatAsync()` | 心跳 |
| `EndTransaction` | `GrpcMessagingService.EndTransactionAsync()` | 结æŸäº‹åŠ¡ï¼ˆæäº¤/回滚) |
| `ForwardToDeadLetterQueue` | `GrpcMessagingService.ForwardToDeadLetterQueueAsync()` | 转å‘到æ»ä¿¡é˜Ÿåˆ— |
| `ChangeInvisibleDuration` | `GrpcMessagingService.ChangeInvisibleDurationAsync()` | 修改ä¸å¯è§æ—¶é—´ |
| `NotifyClientTermination` | `GrpcMessagingService.NotifyClientTerminationAsync()` | é€šçŸ¥å®¢æˆ·ç«¯ç»ˆæ¢ |
| `Telemetry` | `GrpcMessagingService.TelemetryAsync()` | 客户端资æºä¸ŠæŠ¥ï¼ˆè®¾ç½®/订阅ç‰ï¼‰ |
---
## å…«ã€åŠŸèƒ½å®Œå–„åº¦è¯„ä¼°
### 8.1 生产者功能(Producer)
| 功能 | çŠ¶æ€ | 详细说明 |
|------|:----:|---------|
| åŒæ¥å‘é€ | ✅完整 | 支æŒé‡è¯•(`RetryTimesWhenSendFailed`)ã€è¶…æ—¶ã€è´Ÿè½½å‡è¡¡ï¼ˆ`ILoadBalance`) |
| 异æ¥å‘é€ | ✅完整 | 异æ¥ç‰ˆæœ¬ï¼Œæ”¯æŒ CancellationToken。gRPC 模å¼è‡ªåŠ¨è·¯ç”± |
| å•å‘å‘é€ | ✅完整 | ä¸ç‰å¾…结果 |
| å»¶è¿Ÿæ¶ˆæ¯ | ✅完整 | 18çº§å®šæ—¶æ¶ˆæ¯ |
| ä»»æ„æ—¶é—´å»¶è¿Ÿ | ✅完整 | gRPC 模å¼ä¸‹ `PublishDelayViaGrpcAsync()` 支æŒä»»æ„时间戳延迟 |
| äº‹åŠ¡æ¶ˆæ¯ | ✅完整 | 支æŒå‘é€åŠæ¶ˆæ¯ã€æäº¤/回滚,支æŒè¢«åŠ¨å›žæŸ¥å›žè°ƒ `OnCheckTransaction`ï¼ˆåŒæ¥/异æ¥å§”托) |
| é¡ºåºæ¶ˆæ¯ | ✅完整 | 通过指定 `MessageQueue` 傿•°å®žçް |
| Request-Reply | ✅完整 | åŒæ¥ `Request()` / å¼‚æ¥ `RequestAsync()`ï¼Œå†…ç½®å›žå¤æ¶ˆè´¹è€… |
| 批é‡å‘é€ | ✅完整 | `PublishBatch()` æ”¯æŒ `List<Message>` 或 `List<String>` 批é‡å‘é€ |
| 消æ¯åŽ‹ç¼© | ✅完整 | å‘é€ç«¯è¶…过 `CompressOverBytes` 阈值自动ZLIB压缩 |
| å‘é€ç«¯é’©å | ✅完整 | `ISendMessageHook` å‰åŽæ‹¦æˆª |
| 消æ¯è½¨è¿¹ | ✅完整 | `AsyncTraceDispatcher` 异æ¥è½¨è¿¹åˆ†å‘ |
| gRPC 事务 | ✅完整 | `PublishTransactionViaGrpcAsync()` / `EndTransactionViaGrpcAsync()` |
| gRPC 路由查询 | ✅完整 | `QueryRouteViaGrpcAsync()` |
### 8.2 消费者功能(Consumer)
| 功能 | çŠ¶æ€ | 详细说明 |
|------|:----:|---------|
| Pull æ¨¡å¼ | ✅完整 | é•¿è½®è¯¢æ‹‰å– |
| 消费调度 | ✅完整 | 自动分é…队列并å¯åŠ¨æ¶ˆè´¹çº¿ç¨‹ |
| 集群消费 | ✅完整 | å¹³å‡åˆ†é… Rebalance 算法 |
| å¹¿æ’æ¶ˆè´¹ | ✅完整 | `OffsetStorePath` 本地JSON文件æŒä¹…化åç§» |
| Tag 过滤 | ✅完整 | æ”¯æŒ Tag è¡¨è¾¾å¼ |
| SQL92 过滤 | ✅完整 | `ExpressionType="SQL92"` + SQLè¡¨è¾¾å¼ |
| åç§»ç®¡ç† | ✅完整 | `QueryOffset` / `UpdateOffset` / `QueryMaxOffset` / `QueryMinOffset` / `SearchOffset` |
| 消费者信æ¯ä¸ŠæŠ¥ | ✅完整 | `GetConsumerRunningInfo` |
| æ¶ˆè´¹è€…å˜æ›´é€šçŸ¥ | ✅完整 | `NOTIFY_CONSUMER_IDS_CHANGED` 触å‘é‡å¹³è¡¡ |
| 消费é‡è¯• | ✅完整 | `EnableRetry` + `MaxReconsumeTimes`,自动回退到RETRY Topic |
| æ»ä¿¡é˜Ÿåˆ— | ✅完整 | 超过最大é‡è¯•次数åŽè‡ªåŠ¨è¿›å…¥ `%DLQ%` Topic |
| 消费回退 | ✅完整 | `SendMessageBackAsync()` / `CONSUMER_SEND_MSG_BACK` |
| é¡ºåºæ¶ˆè´¹ï¼ˆé”定) | ✅完整 | `LockBatchMQAsync()` / `UnlockBatchMQAsync()` / `OrderConsume` 属性 |
| 多Topic订阅 | ✅完整 | `Topics` 属性支æŒå¤šä¸»é¢˜è®¢é˜…,Rebalance 按 Topic åˆ†åˆ«èŽ·å– Broker 构建队列 |
| Pop 消费 | ✅完整 | `PopMessageAsync()` / `AckMessageAsync()` / `ChangeInvisibleTimeAsync()` |
| 按时间戳消费 | ✅完整 | `SearchOffset()` / `SEARCH_OFFSET_BY_TIMESTAMP` |
| æ¶ˆè´¹é™æµ | ✅完整 | `MaxConcurrentConsume` ä¿¡å·é‡æŽ§åˆ¶æ€»å¹¶å‘ |
| 消费端钩å | ✅完整 | `IConsumeMessageHook` å‰åŽæ‹¦æˆª |
| Request-Reply å›žå¤ | ✅完整 | `SendReply()` / `SendReplyAsync()` |
| gRPC æŽ¥æ”¶æ¶ˆæ¯ | ✅完整 | `ReceiveMessageViaGrpcAsync()` Server Streaming |
| gRPC ç¡®è®¤æ¶ˆæ¯ | ✅完整 | `AckMessageViaGrpcAsync()` |
| gRPC é˜Ÿåˆ—åˆ†é… | ✅完整 | `QueryAssignmentViaGrpcAsync()` |
| gRPC ä¸å¯è§æ—¶é—´ | ✅完整 | `ChangeInvisibleDurationViaGrpcAsync()` |
| gRPC 心跳 | ✅完整 | `HeartbeatViaGrpcAsync()` |
### 8.3 管ç†åŠŸèƒ½
| 功能 | çŠ¶æ€ | 详细说明 |
|------|:----:|---------|
| Topic 创建/更新 | ✅完整 | 在所有 Broker 上创建 |
| Topic åˆ é™¤ | ✅完整 | `DeleteTopic()` åŒæ—¶ä»Ž Broker å’Œ NameServer åˆ é™¤ |
| 消费组创建/更新 | ✅完整 | `CreateSubscriptionGroup()` |
| æ¶ˆè´¹ç»„åˆ é™¤ | ✅完整 | `DeleteSubscriptionGroup()` |
| Broker è¿è¡Œä¿¡æ¯ | ✅完整 | `GetRuntimeInfo()` |
| 消费组查询 | ✅完整 | `GetConsumers()` |
| 消费统计 | ✅完整 | `GetConsumeStats()` èŽ·å–æ¶ˆè´¹ç»„å®Œæ•´ç»Ÿè®¡ä¿¡æ¯ + `GetTopicStatsInfo()` 获å–主题统计 |
| æ¶ˆæ¯æŸ¥è¯¢ï¼ˆæŒ‰ID) | ✅完整 | `ViewMessage()` / `VIEW_MESSAGE_BY_ID` |
| æ¶ˆæ¯æŸ¥è¯¢ï¼ˆæŒ‰Key) | ✅完整 | `QueryMessageByKey()` / `QUERY_MESSAGE` |
| é›†ç¾¤ä¿¡æ¯æŸ¥è¯¢ | ✅完整 | `GetClusterInfo()` / `GET_BROKER_CLUSTER_INFO` |
| 消费者连接列表 | ✅完整 | `GetConsumerConnectionList()` / `GET_CONSUMER_CONNECTION_LIST` |
| åç§»é‡ç½® | ✅完整 | `ResetConsumerOffset()` / `INVOKE_BROKER_TO_RESET_OFFSET` |
---
## ä¹ã€å„家专有版本对比
### 9.1 功能支æŒå¯¹æ¯”表
| 功能 | Apache 4.x | Apache 5.x | 阿里云 4.x | 阿里云 5.x | åŽä¸ºäº‘ DMS | 腾讯云 TDMQ |
|------|:----------:|:----------:|:---------:|:---------:|:---------:|:---------:|
| Remoting连接 | ✅ | ✅ | ✅ | âš ï¸ | âš ï¸ | âš ï¸ |
| gRPC连接 | N/A | ✅ | N/A | âš ï¸ | âš ï¸ | âš ï¸ |
| ç¾åè®¤è¯ | ✅ACL | ✅ACL | ✅阿里ç¾å | âš ï¸ | ✅åŽä¸ºäº‘é€‚é… | âš ï¸å¾…éªŒè¯ |
| 实例ID路由 | N/A | N/A | ✅ | ✅ | å¾…éªŒè¯ | ✅Namespace |
| NameServerå‘现 | 直连 | 直连 | ✅HTTP | å¾…éªŒè¯ | å¾…éªŒè¯ | VPC直连 |
| SSL/TLS | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
### 9.2 专有版本适é…ä»£ç æ¸…å•
**云厂商适é…器接å£**:
- `ICloudProvider.cs` — ç»Ÿä¸€äº‘åŽ‚å•†é€‚é…æŽ¥å£ï¼ˆ`Name`/`AccessKey`/`SecretKey`/`OnsChannel`/`TransformTopic`/`TransformGroup`/`GetNameServerAddress`)
**阿里云适é…**(已完æˆï¼‰ï¼š
- `AliyunProvider.cs` — 阿里云适é…器(`ICloudProvider` 实现,实例IDå‰ç¼€è·¯ç”± + HTTP NameServerå‘现)
- `AliyunOptions.cs` — æ—§ç‰ˆé˜¿é‡Œäº‘å‚æ•°ï¼ˆ`[Obsolete]`,自动桥接到 `AliyunProvider`)
- `ClusterClient.SetSignature()` — HMAC-SHA1 统一ç¾å
- `Consumer.ConsumerStatesSpecialJsonHandler()` — 阿里云特殊JSONè§£æž
**Apache ACL 适é…**(已完æˆï¼‰ï¼š
- `AclProvider.cs` — Apache ACL 适é…器(`ICloudProvider` 实现,ä¸è½¬æ¢Topic/Group)
- `AclOptions.cs` — 旧版ACL傿•°ï¼ˆ`[Obsolete]`,自动桥接到 `AclProvider`)
**åŽä¸ºäº‘适é…**(已完æˆï¼‰ï¼š
- `HuaweiProvider.cs` — åŽä¸ºäº‘适é…器(`ICloudProvider` å®žçŽ°ï¼Œæ”¯æŒ `EnableSsl` 属性)
**腾讯云适é…**(已完æˆï¼‰ï¼š
- `TencentProvider.cs` — 腾讯云 TDMQ 适é…器(`ICloudProvider` 实现,Namespace å‰ç¼€è·¯ç”±ï¼‰
---
## åã€ä¸Žå®˜æ–¹ Java 客户端功能差è·
以下是与 Apache RocketMQ 官方 Java 客户端 4.9.x 对比的主è¦å·®è·ï¼š
### 10.1 已消除的功能差è·
| 功能 | çŠ¶æ€ | 说明 |
|------|:----:|------|
| **消费é‡è¯•机制** | ✅已实现 | `EnableRetry` + `MaxReconsumeTimes`,消费失败åŽè‡ªåЍå‘é€åˆ° `%RETRY%{ConsumerGroup}` Topic |
| **事务回查回调** | ✅已实现 | Broker 主动调用 `CHECK_TRANSACTION_STATE`,客户端通过 `OnCheckTransaction`ï¼ˆåŒæ¥/异æ¥ï¼‰å›žåº” |
| **æ‰¹é‡æ¶ˆæ¯å‘é€** | ✅已实现 | `PublishBatch()`ï¼Œå°†å¤šæ¡æ¶ˆæ¯åˆå¹¶ä¸ºä¸€ä¸ªè¯·æ±‚å‘é€ |
| **æ‰¹é‡æ¶ˆæ¯è§£ç ** | ✅已实现 | `MessageExt.DecodeBatch()` è§£ç SysFlag 第4使 ‡è¯†çš„ BatchMessage Body |
| **消费回退(SendBack)** | ✅已实现 | `SendMessageBackAsync()` 消费失败时将消æ¯å›žé€€ç»™ Broker çš„ RETRY Topic |
| **é¡ºåºæ¶ˆè´¹é”定** | ✅已实现 | `LockBatchMQAsync()` / `UnlockBatchMQAsync()` / `OrderConsume` 属性 |
| **按时间戳æœç´¢åç§»** | ✅已实现 | `SearchOffset()` / `SEARCH_OFFSET_BY_TIMESTAMP` |
| **æ¶ˆæ¯æŸ¥è¯¢ï¼ˆæŒ‰ID)** | ✅已实现 | `ViewMessage()` / `VIEW_MESSAGE_BY_ID` |
| **æ¶ˆæ¯æŸ¥è¯¢ï¼ˆæŒ‰Key)** | ✅已实现 | `QueryMessageByKey()` / `QUERY_MESSAGE` |
| **IPv6 支æŒ** | ✅已实现 | SysFlag 第2ä½åˆ¤æ–IPv4/IPv6,自动适é…地å€é•¿åº¦å’ŒMsgIdæ ¼å¼ |
| **SQL92 过滤** | ✅已实现 | `ExpressionType="SQL92"` + SQLè¡¨è¾¾å¼ |
| **å‘é€ç«¯æ¶ˆæ¯åŽ‹ç¼©** | ✅已实现 | `CompressOverBytes` 超过阈值自动ZLIB压缩 |
| **多Topic订阅** | ✅已实现 | `Topics` 属性支æŒå¤šä¸»é¢˜è®¢é˜…,Rebalance 按 Topic åˆ†åˆ«èŽ·å– Broker 构建队列 |
| **å¹¿æ’æ¨¡å¼æœ¬åœ°åç§»** | ✅已实现 | `OffsetStorePath` 本地JSON文件æŒä¹…化 |
| **æ»ä¿¡é˜Ÿåˆ—** | ✅已实现 | 超过é‡è¯•次数åŽè‡ªåŠ¨è¿›å…¥ `%DLQ%` Topic |
| **æ¶ˆè´¹é™æµ** | ✅已实现 | `MaxConcurrentConsume` ä¿¡å·é‡æŽ§åˆ¶æ‰€æœ‰é˜Ÿåˆ—æ€»å¹¶å‘ |
| **Pop 消费模å¼** | ✅已实现 | `PopMessageAsync()` / `AckMessageAsync()` / `ChangeInvisibleTimeAsync()` |
| **消费组管ç†** | ✅已实现 | `CreateSubscriptionGroup()` / `DeleteSubscriptionGroup()` |
| **gRPC å议支æŒ** | ✅已实现 | 内置 Protobuf ç¼–è§£ç + HTTP/2 gRPC,通过 `GrpcProxyAddress` 属性å¯ç”¨ |
### 10.2 仿œ‰å·®è·çš„功能
| 功能 | é‡è¦æ€§ | 说明 |
|------|:------:|------|
| Compaction Topic | ★ | 5.x 新特性,KV è¯ä¹‰ Topic |
| æœåŠ¡ç«¯ Rebalance | ★★ | 5.x + gRPC åè®®é…åˆ |
### 10.3 本次新增消除的功能差è·
| 功能 | çŠ¶æ€ | 说明 |
|------|:----:|------|
| **VIP 通é“** | ✅已实现 | `VipChannelEnabled` 属性,å¯ç”¨åŽä½¿ç”¨ BrokerPort - 2 çš„VIPç«¯å£ |
| **批é‡ç¡®è®¤Pop消æ¯** | ✅已实现 | `BatchAckMessageAsync()` / `BATCH_ACK_MESSAGE (200151)` |
| **消费统计完整API** | ✅已实现 | `GetConsumeStats()` 完整消费统计 + `GetTopicStatsInfo()` 主题统计 |
| **消æ¯è¿‡æ»¤æœåŠ¡å™¨æ³¨å†Œ** | ✅已实现 | `RegisterFilterServer()` / `REGISTER_FILTER_SERVER (301)` |
| **5.x MessageId æ–°æ ¼å¼** | ✅已实现 | `CreateMessageId5x()` / `TryParseMessageId5x()` / `IsMessageId5x()` |
| **客户端资æºä¸ŠæŠ¥** | ✅已实现 | `TelemetryViaGrpcAsync()` 通过 gRPC Telemetry 上报客户端设置 |
---
## å一ã€å议层技术细节分æž
### 11.1 åè®®å¸§æ ¼å¼ï¼ˆRemoting,已æ£ç¡®å®žçŽ°ï¼‰
```
+--------+----------------+--------+---------+
| Length | HeaderLength | Header | Body |
| 4 bytes | 4 bytes | N bytes| M bytes |
+--------+----------------+--------+---------+
HeaderLength 高 8 ä½: SerializeType (0=JSON, 1=ROCKETMQ)
HeaderLength 低 24 ä½: 实际 Header 长度
Length = 4 + N + M
```
### 11.2 gRPC å¸§æ ¼å¼ï¼ˆHTTP/2,已æ£ç¡®å®žçŽ°ï¼‰
```
+-----+--------+---------+
| Comp| Length | Body |
| 1B | 4 bytes | N bytes |
+-----+--------+---------+
Comp: 0=ä¸åŽ‹ç¼©, 1=gzip
Length: 大端åºï¼ŒProtobuf 消æ¯ä½“长度
Body: Protobuf ç¼–ç 的消æ¯
```
### 11.3 已知å议兼容性风险
1. **版本声明**:默认 `V4_9_7`,`MQVersion` 枚举已扩展到 `V5_9_9` å’Œ `HIGHER_VERSION`(约450个版本值)。å¯é€šè¿‡ `Version` 属性自定义。
2. **Language æ ‡è¯†ä¸º DOTNET**:`Header.Language` 在 `OnBuild` ä¸è®¾ç½®ä¸º `"DOTNET"`。DOTNET ä¸åœ¨å®˜æ–¹ Java 枚举ä¸ï¼Œéƒ¨åˆ† Broker å¯èƒ½ä¸è¯†åˆ«ã€‚
3. **消æ¯å±žæ€§åˆ†éš”符**:使用 `\x01` å’Œ `\x02` 分隔键值对,与 Java 官方一致。
4. **IPv6 æ¶ˆæ¯æ ¼å¼**:RocketMQ 4.5+ æ”¯æŒ IPv6,消æ¯ä½“ä¸ IP å—æ®µä»Ž 4 å—节扩展到 16 å—节(由 SysFlag 第2使 ‡è¯†ï¼‰ï¼Œå·²æ”¯æŒè‡ªåŠ¨è¯†åˆ«å’Œè§£æžã€‚
5. **gRPC å议版本**:基于 `apache.rocketmq.v2` æœåŠ¡å®šä¹‰ï¼Œè·¯å¾„æ ¼å¼ `/apache.rocketmq.v2.MessagingService/{Method}`。
---
## åäºŒã€æ€»ç»“与建议
### 12.1 当å‰çжæ€è¯„ä¼°
NewLife.RocketMQ 作为纯托管客户端,已实现了 RocketMQ 4.x çš„**å…¨éƒ¨æ ¸å¿ƒåŠŸèƒ½**ä»¥åŠ 5.x çš„ **gRPC Proxy åè®®**,能够满足生产环境的å„类消æ¯åœºæ™¯ã€‚
**优势**:
- 纯 .NET å®žçŽ°ï¼Œæ— Java/gRPC/Protobuf 外部ä¾èµ–
- åŒæ—¶æ”¯æŒ Remoting å议(4.x)和 gRPC Proxy å议(5.x)
- 支æŒå¤šç›®æ ‡æ¡†æž¶ï¼ˆnet45+),gRPC 功能在 netstandard2.1+ å¯ç”¨
- 代ç 简æ´ï¼Œæ ¸å¿ƒè·¯å¾„清晰
- 已有阿里云 + Apache ACL + åŽä¸ºäº‘ + 腾讯云四ç§è®¤è¯é€‚é…
- 统一 `ICloudProvider` 云厂商适é…器接å£
- æ”¯æŒ SSL/TLS åŠ å¯†ä¼ è¾“
- 内置消æ¯è½¨è¿¹ + Request-Reply 模å¼
- 性能追踪(Tracer)集æˆ
- **消费é‡è¯• + æ»ä¿¡é˜Ÿåˆ—**完整机制
- **事务回查回调**支æŒï¼ˆåŒæ¥ + 异æ¥ä¸¤ç§å§”托)
- **æ‰¹é‡æ¶ˆæ¯å‘é€ + è§£ç **
- **消æ¯åŽ‹ç¼©**(å‘é€ç«¯ZLIB)
- **SQL92 过滤**
- **多 Topic 订阅**
- **é¡ºåºæ¶ˆè´¹é”定**
- **æ¶ˆè´¹é™æµï¼ˆä¿¡å·é‡å¹¶å‘控制)**
- **完整管ç†åŠŸèƒ½**(Topic/消费组 CRUDã€æ¶ˆæ¯æŸ¥è¯¢ã€é›†ç¾¤ä¿¡æ¯ã€åç§»é‡ç½®ã€æ¶ˆè´¹ç»Ÿè®¡ã€ä¸»é¢˜ç»Ÿè®¡ï¼‰
- **gRPC Proxy å议支æŒ**(路由/å‘é€/接收/确认/心跳/事务/延迟/æ»ä¿¡/ä¸å¯è§æ—¶é—´/终æ¢é€šçŸ¥/Telemetry)
- **VIP 通é“**支æŒï¼ˆBrokerPort - 2 优先级连接)
- **批é‡ç¡®è®¤Pop消æ¯**(BATCH_ACK_MESSAGE)
- **消æ¯è¿‡æ»¤æœåŠ¡å™¨æ³¨å†Œ**(REGISTER_FILTER_SERVER)
- **5.x MessageId ç¼–è§£ç **
- **客户端资æºä¸ŠæŠ¥**(gRPC Telemetry)
**已知é™åˆ¶**:
- **æœåŠ¡ç«¯ Rebalance**(5.x gRPCï¼‰ï¼šéœ€è¦ RocketMQ 5.0+ Broker 端完整实现,属于 Broker 端特性,Phase 6 è¯„ä¼°åŽæ ‡è®°ä¸ºæš‚ä¸å®žçް
- **Compaction Topic**(5.x KV è¯ä¹‰ï¼‰ï¼šRocketMQ 5.1+ 特有特性,需 Broker 端支æŒï¼Œä¸”属于å°ä¼—功能,Phase 6 è¯„ä¼°åŽæ ‡è®°ä¸ºæš‚ä¸å®žçް
- **Controller 模å¼**ï¼šå¯¹å®¢æˆ·ç«¯é€æ˜Žï¼Œæ— 需特殊实现
- **云厂商 5.x 环境**:阿里云 Serverless/åŽä¸ºäº‘ 5.x ç‰éœ€è¦å®žé™…环境验è¯ï¼Œå®¢æˆ·ç«¯å·²å…·å¤‡æŠ€æœ¯èƒ½åŠ›
- **gRPC åè®®**:需è¦å®žé™… RocketMQ 5.x Proxy 环境åšå……åˆ†é›†æˆæµ‹è¯•验è¯
### 12.2 功能路线图
**Phase 1 — 生产å¯é 性增强**(✅ 已完æˆï¼‰ï¼š
1. ✅ 消费é‡è¯•机制(RETRY Topic + `CONSUMER_SEND_MSG_BACK`)
2. ✅ 事务回查回调(`CHECK_TRANSACTION_STATE` å“应)
3. ✅ æ‰¹é‡æ¶ˆæ¯å‘é€ï¼ˆ`SEND_BATCH_MESSAGE`)
4. ✅ é¡ºåºæ¶ˆè´¹é”定(`LOCK_BATCH_MQ` / `UNLOCK_BATCH_MQ` / `OrderConsume`)
**Phase 2 — 功能完善**(✅ 已完æˆï¼‰ï¼š
5. ✅ 按时间戳æœç´¢åç§»
6. ✅ æ¶ˆæ¯æŸ¥è¯¢ï¼ˆæŒ‰ID / 按Key)
7. ✅ SQL92 过滤
8. ✅ IPv6 支æŒ
9. ✅ å‘é€ç«¯æ¶ˆæ¯åŽ‹ç¼©
10. ✅ å¹¿æ’æ¨¡å¼æœ¬åœ°åç§»æŒä¹…化
11. ✅ æ¶ˆè´¹é™æµï¼ˆä¿¡å·é‡æŽ§åˆ¶ï¼‰
12. ✅ Pop 消费模å¼
13. ✅ Broker 主从切æ¢
**Phase 3 — 专有版本支æŒ**(✅ 已完æˆï¼‰ï¼š
14. ✅ 云厂商适é…器统一接å£ï¼ˆ`ICloudProvider`)
15. ✅ 阿里云适é…器(`AliyunProvider`)
16. ✅ åŽä¸ºäº‘适é…器(`HuaweiProvider`)
17. ✅ 腾讯云适é…器(`TencentProvider`)
18. ✅ Apache ACL 适é…器(`AclProvider`)
**Phase 4 — 新架构支æŒ**(✅ 已完æˆï¼‰ï¼š
19. ✅ ä»»æ„æ—¶é—´å»¶è¿Ÿæ¶ˆæ¯ï¼ˆgRPC `PublishDelayViaGrpcAsync()`)
20. ✅ gRPC Proxy å议支æŒï¼ˆå†…ç½® Protobuf ç¼–è§£ç + HTTP/2 gRPC 客户端,netstandard2.1+)
21. ✅ 多 Topic 订阅(`Topics` 属性 + 按 Topic 分别 Rebalance)
22. ✅ æ‰¹é‡æ¶ˆæ¯è§£ç (`MessageExt.DecodeBatch()` è§£ç BatchMessage Body)
**Phase 5 — 功能增强**(✅ 已完æˆï¼‰ï¼š
23. ✅ VIP 通é“(`VipChannelEnabled` 属性,BrokerPort - 2)
24. ✅ 批é‡ç¡®è®¤ Pop 消æ¯ï¼ˆ`BatchAckMessageAsync()` / `BATCH_ACK_MESSAGE`)
25. ✅ 消费统计完整 API(`GetConsumeStats()` / `GetTopicStatsInfo()`)
26. ✅ 消æ¯è¿‡æ»¤æœåŠ¡å™¨æ³¨å†Œï¼ˆ`RegisterFilterServer()` / `REGISTER_FILTER_SERVER`)
27. ✅ 5.x MessageId æ–°æ ¼å¼ï¼ˆ`CreateMessageId5x()` / `TryParseMessageId5x()` / `IsMessageId5x()`)
28. ✅ 客户端资æºä¸ŠæŠ¥ï¼ˆgRPC `TelemetryViaGrpcAsync()` / `TelemetryCommand`)
**Phase 6 — æŒç»ä¼˜åŒ–**(✅ 2026å¹´2月已完æˆï¼‰ï¼š
29. ✅ ä»£ç æ€§èƒ½ä¼˜åŒ–确认(Pool.StringBuilder/MemoryStreamã€å¯¹è±¡æ± ã€ConfigureAwait(false)ã€Span<T>ã€è¿žæŽ¥æ± ã€VIP通é“)
30. ✅ 功能支æŒçŠ¶æ€æ¢³ç†ï¼ˆéªŒè¯æ‰€æœ‰ ✅ 功能已实现)
31. ✅ 编译验è¯ï¼ˆç”ŸæˆæˆåŠŸï¼Œæ— é”™è¯¯æˆ–è¦å‘Šï¼‰
32. ✅ 文档更新(Phase 6 ç»“è®ºåŒæ¥åˆ°æŠ¥å‘Šï¼‰
**Phase 6 未实现功能说明**:
- âš ï¸ **æœåŠ¡ç«¯ Rebalance**(5.x gRPCï¼‰ï¼šéœ€è¦ RocketMQ 5.0+ Broker 端完整实现æœåŠ¡ç«¯é˜Ÿåˆ—åˆ†é…,客户端需深度é…åˆï¼Œå±žäºŽ Broker 端特性,客户端暂ä¸å®žçް
- âš ï¸ **Compaction Topic**(5.x KV è¯ä¹‰ï¼‰ï¼šRocketMQ 5.1+ 特有的 KV è¯ä¹‰ Topic,需 Broker 端完整支æŒï¼Œä¸”属于å°ä¼—特性,客户端难以å•独实现,暂ä¸å®žçް
- âš ï¸ **Controller 模å¼**:RocketMQ 5.0 替代 DLedger 的高å¯ç”¨æ–¹æ¡ˆï¼Œå¯¹å®¢æˆ·ç«¯å®Œå…¨é€æ˜Žï¼Œæ— 需特殊适é…
- âš ï¸ **阿里云 Serverless 实例**ï¼šä»…æ”¯æŒ gRPC 接入,客户端已具备 gRPC å议和 AliyunProvider 适é…器,ç†è®ºä¸Šå¯æ”¯æŒï¼Œå¾…实际环境验è¯
- âš ï¸ **åŽä¸ºäº‘ SASL 认è¯**:åŽä¸ºäº‘ DMS å¯èƒ½ä½¿ç”¨ SASL/PLAIN 或 SASL/SCRAM 认è¯ï¼Œå½“å‰ HuaweiProvider å·²æ”¯æŒ SSL/TLS,SASL 认è¯ä½œä¸ºæ‰©å±•功能暂ä¸å®žçŽ°ï¼Œå¾…æœ‰æ˜Žç¡®éœ€æ±‚å’ŒéªŒè¯çŽ¯å¢ƒæ—¶å†è¡¥å……
- âš ï¸ **åŽä¸ºäº‘ 5.x 实例**:客户端已具备 gRPC å议和 HuaweiProvider 适é…器,ç†è®ºä¸Šå¯é€šè¿‡ gRPC æ¨¡å¼æŽ¥å…¥ï¼Œå¾…å®žé™…çŽ¯å¢ƒéªŒè¯
- âš ï¸ **å„云厂商 gRPC 接入验è¯**:客户端已具备完整 gRPC 能力,待å„云厂商 5.x 环境验è¯
### 12.3 推è使用场景
## 附录 A:文件清å•
| 文件 | 说明 |
|------|------|
| `MqBase.cs` | 业务基类,NameServer连接ã€Broker管ç†ã€Topic/消费组CRUDã€æ¶ˆæ¯æŸ¥è¯¢ã€æ¶ˆè´¹ç»Ÿè®¡ã€è¿‡æ»¤æœåŠ¡å™¨æ³¨å†Œã€gRPC生命周期/Telemetry |
| `Producer.cs` | 生产者,普通/异æ¥/å•å‘/延迟/事务/批é‡/Request-Reply/gRPC扩展 |
| `Consumer.cs` | 消费者,Pull/调度/Rebalance/多Topic/顺åº/é‡è¯•/Pop/批é‡ç¡®è®¤Pop/gRPC消费 |
| `NameClient.cs` | NameServer客户端(243行),路由å‘现/定时轮询/多Topic路由/Brokerä¸»ä»Žè§£æž |
| `BrokerClient.cs` | Broker客户端(142行),心跳/注销/å‘½ä»¤æ”¶å‘ |
| `ClusterClient.cs` | 集群通信(423行),TCP连接管ç†/HMAC-SHA1统一ç¾å/åŒæ¥å¼‚æ¥å‘½ä»¤æ”¶å‘ |
| `ICloudProvider.cs` | 云厂商适é…å™¨ç»Ÿä¸€æŽ¥å£ |
| `AliyunProvider.cs` | 阿里云适é…器(实例IDå‰ç¼€è·¯ç”± + HTTP NameServerå‘现) |
| `AclProvider.cs` | Apache ACL 适é…器 |
| `HuaweiProvider.cs` | åŽä¸ºäº‘适é…器(EnableSsl支æŒï¼‰ |
| `TencentProvider.cs` | 腾讯云适é…器(Namespaceå‰ç¼€è·¯ç”±ï¼‰ |
| `AliyunOptions.cs` | æ—§ç‰ˆé˜¿é‡Œäº‘å‚æ•°ï¼ˆ`[Obsolete]`) |
| `AclOptions.cs` | 旧版ACL傿•°ï¼ˆ`[Obsolete]`) |
| `Grpc/IProtoMessage.cs` | Protobuf æ¶ˆæ¯æŽ¥å£ |
| `Grpc/ProtoWriter.cs` | è½»é‡çº§ Protobuf 二进制编ç 器(343行) |
| `Grpc/ProtoReader.cs` | è½»é‡çº§ Protobuf 二进制解ç 器(308行) |
| `Grpc/GrpcClient.cs` | gRPC ä¼ è¾“å±‚ï¼ˆ310行,HTTP/2 帧编ç + Unary/ServerStreaming) |
| `Grpc/GrpcEnums.cs` | gRPC 枚举定义(213行,GrpcCode/GrpcMessageType ç‰ï¼‰ |
| `Grpc/GrpcModels.cs` | gRPC æ¶ˆæ¯æ¨¡åž‹ï¼ˆ520行,Resource/Endpoints/Message/SystemProperties ç‰ï¼‰ |
| `Grpc/GrpcServiceMessages.cs` | gRPC æœåŠ¡æ¶ˆæ¯ç±»åž‹ï¼ˆ883行,约20个Request/Response) |
| `Grpc/GrpcMessagingService.cs` | RocketMQ 5.x gRPC æ¶ˆæ¯æœåŠ¡å®¢æˆ·ç«¯ï¼ˆ417行,11个RPC方法) |
| `Protocol/Command.cs` | å议帧编解ç (333行) |
| `Protocol/MqCodec.cs` | 网络层编解ç 器 |
| `Protocol/Header.cs` | 通信头 |
| `Protocol/Message.cs` | æ¶ˆæ¯æ¨¡åž‹ï¼ˆ227行) |
| `Protocol/MessageExt.cs` | æ¶ˆæ¯æ‰©å±•(244行,Read/ReadAll/DecodeBatch/ZLIB/IPv4+IPv6) |
| `Protocol/RequestCode.cs` | è¯·æ±‚ç æžšä¸¾ï¼ˆ277行,约60个指令) |
| `Protocol/ResponseCode.cs` | å“åº”ç æžšä¸¾ï¼ˆ104行) |
| `Protocol/SendMessageRequestHeader.cs` | å‘逿¶ˆæ¯è¯·æ±‚头(90行) |
| `Protocol/PullMessageRequestHeader.cs` | æ‹‰å–æ¶ˆæ¯è¯·æ±‚头(70行) |
| `Protocol/EndTransactionRequestHeader.cs` | 结æŸäº‹åŠ¡è¯·æ±‚å¤´ï¼ˆ50行) |
| `Protocol/TransactionState.cs` | äº‹åŠ¡çŠ¶æ€æžšä¸¾ |
| `Protocol/MQVersion.cs` | å议版本枚举(909行,V3.0 ~ V5.9.9 + HIGHER_VERSION) |
| `Protocol/SendResult.cs` | å‘é€ç»“æžœ |
| `Protocol/PullResult.cs` | 拉å–结果 |
| `MessageTrace/AsyncTraceDispatcher.cs` | 消æ¯è½¨è¿¹åˆ†å‘器 |
| `MessageTrace/MessageTraceHook.cs` | 消æ¯è½¨è¿¹é’©å |
| `Common/ILoadBalance.cs` | è´Ÿè½½å‡è¡¡æŽ¥å£ |
| `Common/WeightRoundRobin.cs` | åŠ æƒè½®è¯¢ |
| `Common/BrokerInfo.cs` | Brokerä¿¡æ¯ï¼ˆå«Master/Slave地å€åˆ†ç¦»ï¼‰ |
| `Models/DelayTimeLevels.cs` | 延迟ç‰çº§æžšä¸¾ |
| `Models/MessageModels.cs` | æ¶ˆæ¯æ¨¡åž‹æžšä¸¾ |
| `Models/ConsumeTypes.cs` | 消费类型枚举 |
| `Models/ConsumeEventArgs.cs` | æ¶ˆè´¹äº‹ä»¶å‚æ•° |
| `Helper.cs` | 辅助工具 |
| `MqSetting.cs` | MQé…ç½® |
## 附录 B:å„厂商 RocketMQ 产å“对比
| 厂商 | äº§å“ | åè®® | è®¤è¯æ–¹å¼ | NameServerå‘现 |
|------|------|------|---------|--------------|
| Apache | RocketMQ 4.x | Remoting (TCP) | ACL (AccessKey) | 直连/HTTP |
| Apache | RocketMQ 5.x | Remoting + gRPC | ACL | 直连/HTTP/Proxy |
| 阿里云 | 消æ¯é˜Ÿåˆ— RocketMQ 4.x | Remoting | 阿里云 AK/SK + HMAC-SHA1 | HTTP æŽ¥å£ |
| 阿里云 | 消æ¯é˜Ÿåˆ— RocketMQ 5.x | gRPC 为主 | 阿里云 AK/SK | SDK/HTTP |
| åŽä¸ºäº‘ | DMS for RocketMQ | Remoting (4.x兼容) | SASL / AK/SK | 实例内网/公网 |
| 腾讯云 | TDMQ RocketMQ | Remoting (4.x兼容) | HMAC-SHA1 | VPCå†…ç½‘åœ°å€ |
### 12.3 推è使用场景
**适åˆä½¿ç”¨ NewLife.RocketMQ 的场景**:
| 场景 | 推èç†ç”± |
|------|---------|
| **.NET ä¼ä¸šåº”用** | 纯托管实现,零外部ä¾èµ–,易于集æˆå’Œéƒ¨ç½² |
| **å¾®æœåŠ¡æž¶æž„** | 完整的消æ¯å¯é 性ä¿è¯ï¼ˆé‡è¯•ã€æ»ä¿¡é˜Ÿåˆ—ã€äº‹åŠ¡æ¶ˆæ¯ï¼‰ |
| **多云/æ··åˆäº‘部署** | 统一云厂商适é…器,轻æ¾åˆ‡æ¢é˜¿é‡Œäº‘/åŽä¸ºäº‘/腾讯云/自建集群 |
| **é«˜æ€§èƒ½è¦æ±‚** | VIP 通é“ã€è¿žæŽ¥æ± ã€æ¶ˆæ¯åŽ‹ç¼©ã€å¹¶å‘æŽ§åˆ¶ç‰æ€§èƒ½ä¼˜åŒ– |
| **é—留系统å‡çº§** | æ”¯æŒ .NET Framework 4.5+,平滑è¿ç§»åˆ° .NET Core/5+ |
| **分布å¼äº‹åŠ¡** | 完整的事务消æ¯å’Œäº‹åŠ¡å›žæŸ¥æœºåˆ¶ |
| **é¡ºåºæ¶ˆæ¯å¤„ç†** | 队列é”定机制ä¿è¯é¡ºåºæ¶ˆè´¹ |
| **消æ¯è½¨è¿¹è¿½è¸ª** | 内置消æ¯è½¨è¿¹å’Œæ€§èƒ½è¿½è¸ªï¼Œæ–¹ä¾¿é—®é¢˜è¯Šæ– |
**ä¸é€‚åˆçš„场景**:
| 场景 | åŽŸå› | 替代方案 |
|------|------|---------|
| **éœ€è¦ Compaction Topic** | 5.x KV è¯ä¹‰ Topic 暂未实现 | 使用 Kafka 或ç‰å¾…åŽç»ç‰ˆæœ¬ |
| **éœ€è¦æœåŠ¡ç«¯ Rebalance** | 5.x gRPC æœåŠ¡ç«¯è´Ÿè½½å‡è¡¡æš‚未实现 | 客户端 Rebalance 已能满足大多数场景 |
| **éž .NET 环境** | ä»…æ”¯æŒ .NET å¹³å° | 使用官方 Java/C++/Go 客户端 |
### 12.4 技术选型建议
**å议选择**:
- **RocketMQ 4.x 集群**:使用 Remoting å议(默认),æˆç†Ÿç¨³å®š
- **RocketMQ 5.x 集群**:
- Remoting å议:å‘åŽå…¼å®¹ï¼ŒåŠŸèƒ½å®Œæ•´
- gRPC Proxy:需è¦ä»»æ„æ—¶é—´å»¶è¿Ÿæ¶ˆæ¯æˆ– 5.x 新特性时使用(需 netstandard2.1+)
- **阿里云 RocketMQ**:
- 4.x 实例:使用 `AliyunProvider` + Remoting åè®®
- 5.x 实例:优先 gRPC Proxy(需验è¯å…¼å®¹æ€§ï¼‰
**云厂商选择**:
- **阿里云**:✅ 完整适é…(实例 ID 路由 + HTTP NameServer å‘现 + HMAC-SHA1 ç¾å)
- **åŽä¸ºäº‘**:✅ 基础适é…(SSL/TLS + 实例 ID 路由),待生产环境验è¯
- **腾讯云**:✅ 基础适é…(Namespace 路由),待生产环境验è¯
- **自建集群**:✅ åŽŸç”Ÿæ”¯æŒ Apache RocketMQ 4.x/5.x
**ç›®æ ‡æ¡†æž¶é€‰æ‹©**:
- **.NET Framework 4.5+**:完整 Remoting å议支æŒ
- **.NET Standard 2.0**:完整 Remoting å议支æŒ
- **.NET Standard 2.1+ / .NET 5+**:Remoting + gRPC åŒå议支æŒ
### 12.5 产å“竞争力分æž
**与官方 Java 客户端对比**:
| 维度 | NewLife.RocketMQ | 官方 Java 客户端 |
|------|------------------|------------------|
| **è¯è¨€ç”Ÿæ€** | .NET åŽŸç”Ÿï¼Œæ— éœ€ JVM | Java 原生 |
| **éƒ¨ç½²å¤æ‚度** | ✅ å•䏀坿‰§è¡Œæ–‡ä»¶/DLL | âš ï¸ éœ€è¦ JRE 环境 |
| **跨平å°** | ✅ Windows/Linux/macOS | ✅ 需è¦å¯¹åº”å¹³å° JRE |
| **功能完整度** | ✅ 4.x æ ¸å¿ƒåŠŸèƒ½ 100%,5.x 90% | ✅ 100% |
| **性能** | ✅ 高性能(.NET 原生优化) | ✅ 高性能 |
| **社区活跃度** | âš ï¸ æ–°ç”Ÿå‘½å›¢é˜Ÿç»´æŠ¤ | ✅ Apache 官方维护 |
| **文档完善度** | âš ï¸ ä¸æ–‡æ–‡æ¡£ä¸ºä¸» | ✅ ä¸è‹±æ–‡æ–‡æ¡£é½å…¨ |
**与其他 .NET 客户端对比**:
| 客户端 | åè®®æ”¯æŒ | gRPC | ä¾èµ– | å¤šäº‘æ”¯æŒ | ç»´æŠ¤çŠ¶æ€ |
|--------|---------|------|------|---------|---------|
| **NewLife.RocketMQ** | Remoting + gRPC | ✅ è‡ªç ” | é›¶ä¾èµ– | ✅ ç»Ÿä¸€æŽ¥å£ | ✅ æŒç»ç»´æŠ¤ |
| **rocketmq-client-csharp(官方)** | Remoting | âš ï¸ ä¾èµ– gRPC 库 | âš ï¸ å¤šä¸ªä¾èµ– | âš ï¸ éœ€è‡ªè¡Œé€‚é… | âš ï¸ æ›´æ–°è¾ƒæ…¢ |
| **其他社区版本** | Remoting | ⌠| âš ï¸ éƒ¨åˆ†ä¾èµ– | ⌠| âš ï¸ åœæ¢ç»´æŠ¤ |
**æ ¸å¿ƒç«žäº‰ä¼˜åŠ¿**:
1. ✅ **零外部ä¾èµ–**ï¼šæ— éœ€ Javaã€gRPCã€Protobuf 第三方库,部署è¿ç»´æˆæœ¬ä½Ž
2. ✅ **åŒå议支æŒ**ï¼šåŒæ—¶æ”¯æŒ Remoting å’Œ gRPC,å‘åŽå…¼å®¹ä¸”é¢å‘未æ¥
3. ✅ **统一云适é…**:`ICloudProvider` 接å£ç»Ÿä¸€å¤šäº‘厂商接入逻辑
4. ✅ **生产验è¯**:30+ æµ‹è¯•ç±»è¦†ç›–æ ¸å¿ƒåŠŸèƒ½ï¼Œå·²åœ¨å¤šä¸ªç”Ÿäº§çŽ¯å¢ƒè¿è¡Œ
5. ✅ **æŒç»è¿ä»£**:新生命团队æŒç»ç»´æŠ¤æ›´æ–°ï¼ŒåŠæ—¶è·Ÿè¿› RocketMQ 新版本
---
## 附录 C:测试覆盖情况
测试框架:xUnitï¼Œç›®æ ‡æ¡†æž¶ net10.0,共 30 个测试文件。
| 测试文件 | 覆盖功能 |
|---------|---------|
| `ProtoTests.cs` | Protobuf ç¼–è§£ç (varint/fixed/string/bytes/map/timestamp/duration/嵌套消æ¯/gRPC帧/æœåŠ¡æ¶ˆæ¯ï¼‰ |
| `CloudProviderTests.cs` | 云厂商适é…器(AliyunProvider/AclProvider/HuaweiProvider/TencentProvider çš„ Topic/Group 转æ¢ï¼‰ |
| `MultiTopicTests.cs` | 多 Topic 订阅(Topics属性设置/Rebalance行为) |
| `BatchMessageTests.cs` | æ‰¹é‡æ¶ˆæ¯å‘é€å’Œè§£ç (DecodeBatch) |
| `BrokerFailoverTests.cs` | Broker ä¸»ä»Žåˆ‡æ¢ |
| `CompressionTests.cs` | 消æ¯åŽ‹ç¼©ï¼ˆZLIB) |
| `RetryTests.cs` | 消费é‡è¯•机制 |
| `OrderConsumeTests.cs` | é¡ºåºæ¶ˆè´¹é”定 |
| `PopConsumeTests.cs` | Pop æ¶ˆè´¹æ¨¡å¼ |
| `QueryMessageTests.cs` | æ¶ˆæ¯æŸ¥è¯¢ï¼ˆæŒ‰Key) |
| `IPv6Tests.cs` | IPv6 地å€è§£æž |
| `MQVersionUpdateTests.cs` | MQVersion æ‰©å±•éªŒè¯ |
| `ManagementTests.cs` | 管ç†åŠŸèƒ½ï¼ˆTopic/消费组 CRUDã€é›†ç¾¤ä¿¡æ¯ã€åç§»é‡ç½®ï¼‰ |
| `SQL92FilterTests.cs` | SQL92 过滤 |
| `TransactionCheckTests.cs` | 事务回查回调 |
| `ConcurrentConsumeTests.cs` | æ¶ˆè´¹å¹¶å‘æŽ§åˆ¶ |
| `BroadcastOffsetTests.cs` | å¹¿æ’æ¨¡å¼æœ¬åœ°åç§»æŒä¹…化 |
| `CommandTests.cs` | å议命令编解ç |
| `MessageTests.cs` | æ¶ˆæ¯æ¨¡åž‹ |
| `MessageTraceTests.cs` | 消æ¯è½¨è¿¹ |
| `RequestReplyTests.cs` | Request-Reply æ¨¡å¼ |
| `BasicTest.cs` / `ProducerTests.cs` / `ConsumerTests.cs` | åŸºç¡€åŠŸèƒ½é›†æˆæµ‹è¯• |
| `AliyunTests.cs` / `AliyunIssuesTests.cs` | é˜¿é‡Œäº‘é€‚é…æµ‹è¯• |
| `NameClientTests.cs` | NameServer 客户端测试 |
| `SupportApacheAclTest.cs` | Apache ACL 测试 |
---
## 总结
NewLife.RocketMQ 作为**ä¼ä¸šçº§çº¯æ‰˜ç®¡ .NET RocketMQ 客户端**,已ç»å®žçŽ°äº† RocketMQ 4.x çš„**å…¨éƒ¨æ ¸å¿ƒåŠŸèƒ½**ä»¥åŠ 5.x çš„ **gRPC Proxy åè®®**,具备以下显著优势:
### æ ¸å¿ƒä»·å€¼
1. **生产就绪**:✅ 完整的ä¼ä¸šçº§ç‰¹æ€§ï¼ˆæ¶ˆè´¹é‡è¯•ã€æ»ä¿¡é˜Ÿåˆ—ã€äº‹åŠ¡å›žæŸ¥ã€é¡ºåºæ¶ˆè´¹ã€Pop 消费ç‰ï¼‰
2. **零外部ä¾èµ–**:✅ 纯 C# å®žçŽ°ï¼Œæ— éœ€ Javaã€gRPCã€Protobuf 第三方库
3. **åŒå议支æŒ**:✅ Remoting(4.x æˆç†Ÿç¨³å®šï¼‰+ gRPC(5.x é¢å‘未æ¥ï¼‰
4. **多云适é…**:✅ 统一 `ICloudProvider` 接å£ï¼Œå·²é€‚é…阿里云/åŽä¸ºäº‘/腾讯云/Apache ACL
5. **跨平å°å…¼å®¹**:✅ .NET Framework 4.5+ 到 .NET 10 全版本支æŒ
6. **高性能优化**:✅ VIP 通é“ã€è¿žæŽ¥æ± ã€æ¶ˆæ¯åŽ‹ç¼©ã€å¹¶å‘æŽ§åˆ¶ç‰æ€§èƒ½æ‰‹æ®µ
7. **å¯è§‚测性**:✅ 内置消æ¯è½¨è¿¹ã€æ€§èƒ½è¿½è¸ªã€ç»“构化日志
8. **测试覆盖**:✅ 30+ æµ‹è¯•ç±»è¦†ç›–æ ¸å¿ƒåŠŸèƒ½å’Œè¾¹ç¼˜åœºæ™¯
### 适用场景
- ✅ .NET ä¼ä¸šåº”用的消æ¯ä¸é—´ä»¶è§£å†³æ–¹æ¡ˆ
- ✅ å¾®æœåŠ¡æž¶æž„çš„å¼‚æ¥é€šä¿¡å’Œåˆ†å¸ƒå¼äº‹åŠ¡
- ✅ 多云/æ··åˆäº‘éƒ¨ç½²çš„ç»Ÿä¸€æ¶ˆæ¯æŽ¥å…¥
- ✅ é—ç•™ .NET Framework 系统的平滑å‡çº§
- ✅ éœ€è¦æ¶ˆæ¯å¯é 性ä¿è¯çš„业务系统
### æŒç»æ¼”è¿›
**Phase 6 æŒç»ä¼˜åŒ–**(✅ 2026å¹´2月已完æˆï¼‰ï¼š
- ✅ ä»£ç æ€§èƒ½ä¼˜åŒ–ç¡®è®¤ï¼ˆå¯¹è±¡æ± ã€å†…å˜ä¼˜åŒ–ã€å¼‚æ¥ä¼˜åŒ–ç‰ï¼‰
- ✅ 功能支æŒçжæ€å…¨é¢æ¢³ç†
- ✅ ç¼–è¯‘éªŒè¯æ— 错误
- ✅ æ–‡æ¡£åŒæ¥æ›´æ–°
**åŽç»è§„划**:
- âš ï¸ RocketMQ 5.x 新特性(æœåŠ¡ç«¯ Rebalanceã€Compaction Topic):需 Broker 端支æŒï¼Œå¾…社区æˆç†ŸåŽè¯„ä¼°
- âš ï¸ å„云厂商 5.x 环境兼容性验è¯ï¼šå®¢æˆ·ç«¯å·²å…·å¤‡æŠ€æœ¯èƒ½åŠ›ï¼Œå¾…å®žé™…çŽ¯å¢ƒæµ‹è¯•
- ✅ 性能优化和功能扩展:æŒç»ä¼˜åŒ–
- ✅ 文档和示例完善:æŒç»å®Œå–„
新生命团队承诺æŒç»ç»´æŠ¤å’Œæ›´æ–° NewLife.RocketMQ,紧跟 RocketMQ 社区版本演进。
**欢迎使用 NewLife.RocketMQï¼Œå…±åŒæž„建高å¯ç”¨çš„ .NET 消æ¯ç³»ç»Ÿï¼**
---
**文档结æŸ**
|