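# NewLife.RocketMQ Architecture Design Document
## 1. Overall Architecture
### 1.1 Design Principles
NewLife.RocketMQ follows an architecture with **clear layering, single responsibility, and extensibility**:
- **Pure managed implementation**: written entirely in C#, with no dependency on Java or on third-party gRPC or Protobuf libraries
- **Dual-protocol support**: supports both the Remoting protocol (4.x) and the gRPC protocol (5.x)
- **Cloud vendor adaptation**: a unified `ICloudProvider` interface makes it easy to connect to each cloud provider
- **Performance optimizations**: connection reuse, object pooling, the VIP channel, message compression, and similar techniques
- **Testability**: 30+ test classes cover core functionality and edge cases
### 1.2 Layered Structure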
```
┌───────────────────────────────────────────────────────────┐
│                  Business Layer (MqBase)                  │
│  ┌──────────────────────┐      ┌──────────────────────┐   │
│  │ Producer             │      │ Consumer             │   │
│  │ producer logic       │      │ consumer logic       │   │
│  └──────────────────────┘      └──────────────────────┘   │
└───────────────────────────────────────────────────────────┘
                              │
┌───────────────────────────────────────────────────────────┐
│               Communication Layer (Client)                │
│  ┌──────────────────────┐      ┌──────────────────────┐   │
│  │ ClusterClient        │      │ GrpcClient           │   │
│  │ persistent TCP conns │      │ HTTP/2 gRPC          │   │
│  └──────────────────────┘      └──────────────────────┘   │
│  ┌──────────────────────┐      ┌──────────────────────┐   │
│  │ NameClient           │      │ GrpcMessagingService │   │
│  │ route discovery      │      │ message service      │   │
│  └──────────────────────┘      └──────────────────────┘   │
│  ┌──────────────────────┐                                 │
│  │ BrokerClient         │                                 │
│  │ heartbeat/unregister │                                 │
│  └──────────────────────┘                                 │
└───────────────────────────────────────────────────────────┘
                              │
┌───────────────────────────────────────────────────────────┐
│                 Protocol Layer (Protocol)                 │
│  ┌──────────────────────┐      ┌──────────────────────┐   │
│  │ Command              │      │ GrpcModels           │   │
│  │ Remoting frame       │      │ gRPC message models  │   │
│  └──────────────────────┘      └──────────────────────┘   │
│  ┌──────────────────────┐      ┌──────────────────────┐   │
│  │ MessageExt           │      │ ProtoWriter          │   │
│  │ message model        │      │ Protobuf encoding    │   │
│  └──────────────────────┘      └──────────────────────┘   │
│  ┌──────────────────────┐      ┌──────────────────────┐   │
│  │ RequestCode          │      │ ProtoReader          │   │
│  │ request code enum    │      │ Protobuf decoding    │   │
│  └──────────────────────┘      └──────────────────────┘   │
└───────────────────────────────────────────────────────────┘
                              │
┌───────────────────────────────────────────────────────────┐
│                Transport Layer (Transport)                │
│  ┌──────────────────────┐      ┌──────────────────────┐   │
│  │ NewLife.Net          │      │ HttpClient           │   │
│  │ TCP Socket           │      │ HTTP/2               │   │
│  └──────────────────────┘      └──────────────────────┘   │
└───────────────────────────────────────────────────────────┘
                              │
┌───────────────────────────────────────────────────────────┐
│      Cloud Provider Adaptation Layer (CloudProvider)      │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │ Aliyun   │  │ Huawei   │  │ Tencent  │  │ ACL      │   │
│  │ Provider │  │ Provider │  │ Provider │  │ Provider │   │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘   │
└───────────────────────────────────────────────────────────┘
```
---
## 2. Remoting Protocol Architecture (RocketMQ 4.x/5.x Broker)
### 2.1 Protocol Overview
The Remoting protocol is RocketMQ's classic proprietary TCP protocol. It uses a **binary frame format with JSON or binary serialization** and is efficient and stable.
### 2.2 Frame Format
```
┌────────────────────────────────────────────────────┐
│              Remoting protocol frame               │
├─────────┬──────────────┬───────────┬───────────────┤
│ Length  │ HeaderLength │ Header    │ Body          │
│ 4 bytes │ 4 bytes      │ N bytes   │ M bytes       │
└─────────┴──────────────┴───────────┴───────────────┘
Length = 4 + N + M (total length of everything after the Length field)
Upper 8 bits of HeaderLength: SerializeType (0=JSON, 1=ROCKETMQ)
Lower 24 bits of HeaderLength: actual Header length
```
**Frame encoding and decoding flow**:
```
Sender:
  Message → Command.Create() → SetBody() → Build()
    → [Length][HeaderLength][Header][Body] → TCP Socket
Receiver:
  TCP Socket → ReadLength(4 bytes) → ReadHeaderLength(4 bytes)
    → ReadHeader(N bytes) → ReadBody(M bytes)
    → Command.Parse() → Message
```
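The frame layout above can be sketched in a few lines of C#. This is a minimal illustration, not the library's `Command` implementation: the `RemotingFrame` class and its method names are hypothetical, and big-endian byte order is assumed because RocketMQ's Java side writes integers through big-endian buffers.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper illustrating [Length][HeaderLength][Header][Body].
public static class RemotingFrame
{
    public static byte[] Encode(byte serializeType, byte[] header, byte[] body)
    {
        var frame = new byte[8 + header.Length + body.Length];
        // Length counts everything after itself: 4 (HeaderLength) + N + M.
        BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(0, 4), 4 + header.Length + body.Length);
        // Upper 8 bits carry the serialize type, lower 24 bits the header length.
        var headerField = (serializeType << 24) | (header.Length & 0xFFFFFF);
        BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(4, 4), headerField);
        header.CopyTo(frame, 8);
        body.CopyTo(frame, 8 + header.Length);
        return frame;
    }

    public static (byte SerializeType, byte[] Header, byte[] Body) Decode(byte[] frame)
    {
        var length = BinaryPrimitives.ReadInt32BigEndian(frame.AsSpan(0, 4));
        var headerField = BinaryPrimitives.ReadInt32BigEndian(frame.AsSpan(4, 4));
        var serializeType = (byte)((headerField >> 24) & 0xFF);
        var headerLength = headerField & 0xFFFFFF;
        var bodyLength = length - 4 - headerLength;
        var header = frame.AsSpan(8, headerLength).ToArray();
        var body = frame.AsSpan(8 + headerLength, bodyLength).ToArray();
        return (serializeType, header, body);
    }
}
```

Calling `RemotingFrame.Encode(1, header, body)` and passing the result to `Decode` returns the same serialize type, header, and body, which is a quick way to check the bit packing of `HeaderLength`.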
### 2.3 Communication Layer Architecture
#### 2.3.1 ClusterClient (Cluster Client)
**Responsibilities**:
- TCP connection management and reuse
- Sending commands and receiving responses
- Unified HMAC-SHA1 signing
- Opaque-based request/response matching
**Core flow**:
```
┌──────────────────────────────────────────────────────────────┐
│                    ClusterClient workflow                    │
├──────────────────────────────────────────────────────────────┤
│ 1. GetClient(address)       - get or create a TCP connection │
│ 2. SetSignature()           - set the cloud vendor signature │
│ 3. Invoke() / InvokeAsync() - send the command               │
│ 4. Match the response by Opaque                              │
│ 5. Handle response timeouts                                  │
└──────────────────────────────────────────────────────────────┘
```
**Connection reuse**:
```csharp
// One connection per address is shared; Opaque distinguishes in-flight requests
private ConcurrentDictionary<String, TcpSession> _clients = new();
public TcpSession GetClient(String address)
{
    // Note: under contention GetOrAdd may invoke this factory more than once
    return _clients.GetOrAdd(address, addr =>
    {
        var client = new TcpSession();
        client.Remote = new NetUri(addr);
        client.Open();
        return client;
    });
}
```
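Opaque matching on a shared connection can be illustrated as follows. This is a sketch under assumptions rather than the actual `ClusterClient` code: the `OpaqueMatcher` class and its members are hypothetical, and response payloads are modeled as plain byte arrays.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: pairs each request with its response by opaque id.
public sealed class OpaqueMatcher
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<byte[]>> _pending = new();
    private int _nextOpaque;

    // Allocates a new opaque and a task that completes when the response arrives.
    public int Register(out Task<byte[]> response)
    {
        var opaque = Interlocked.Increment(ref _nextOpaque);
        var tcs = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[opaque] = tcs;
        response = tcs.Task;
        return opaque;
    }

    // Called by the receive loop when a response carrying this opaque arrives.
    public bool Complete(int opaque, byte[] payload) =>
        _pending.TryRemove(opaque, out var tcs) && tcs.TrySetResult(payload);

    // Called when the caller gives up waiting, so the entry does not leak.
    public bool Abandon(int opaque) =>
        _pending.TryRemove(opaque, out var tcs) && tcs.TrySetException(new TimeoutException());
}
```

A caller would write the returned opaque into the outgoing command, await the task with a timeout, and call `Abandon` if the timeout elapses first.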
#### 2.3.2 NameClient (Name Server Client)
**Responsibilities**:
- Topic route discovery
- Periodic polling for route updates (every 30s)
- Route management for multiple topics
- Resolving Broker master and slave addresses
**Route discovery flow**:
```
Start() → UpdateRouteAsync()
  ↓
GetRouteInfo(Topic) → NameServer
  ↓
Parse Response:
  - BrokerDatas (Broker cluster information)
  - QueueDatas (queue distribution information)
  ↓
Build BrokerInfo:
  - MasterAddress (master node address)
  - SlaveAddresses (list of slave node addresses)
  ↓
Cache & Notify → OnBrokerChange Event
  ↓
Timer repeats after 30s
```
#### 2.3.3 BrokerClient (Broker Client)
**Responsibilities**:
- Broker heartbeat (every 30s)
- Client unregistration
- Sending and receiving commands (inherited from ClusterClient)
**Heartbeat mechanism**:
```
Start() → SendHeartbeat()
  ↓
Build HeartbeatData:
  - ClientID
  - ProducerDataSet / ConsumerDataSet
  ↓
Send HEART_BEAT (34) → Broker
  ↓
Timer repeats after 30s
```
### 2.4 Core RequestCode Values
| Category | RequestCode | Value | Description |
|------|------------|:--:|------|
| **Message sending** | SEND_MESSAGE_V2 | 310 | Send message (V2) |
| | SEND_BATCH_MESSAGE | 320 | Batch send |
| | SEND_REPLY_MESSAGE_V2 | 325 | Reply message |
| **Message pulling** | PULL_MESSAGE | 11 | Pull messages |
| | POP_MESSAGE | 200050 | Pop consumption |
| | ACK_MESSAGE | 200051 | Pop acknowledgement |
| **Transactional messages** | END_TRANSACTION | 37 | End transaction |
| | CHECK_TRANSACTION_STATE | 39 | Transaction check-back |
| **Offset management** | QUERY_CONSUMER_OFFSET | 14 | Query offset |
| | UPDATE_CONSUMER_OFFSET | 15 | Update offset |
| | SEARCH_OFFSET_BY_TIMESTAMP | 29 | Search by timestamp |
| **Ordered consumption** | LOCK_BATCH_MQ | 41 | Lock queues |
| | UNLOCK_BATCH_MQ | 42 | Unlock queues |
| **Route management** | GET_ROUTEINTO_BY_TOPIC | 105 | Topic route |
| | GET_BROKER_CLUSTER_INFO | 106 | Cluster information |
| **Heartbeat** | HEART_BEAT | 34 | Heartbeat |
| | UNREGISTER_CLIENT | 35 | Unregister client |
| | NOTIFY_CONSUMER_IDS_CHANGED | 40 | Consumer change notification |
### 2.5 Message Encoding and Decoding
#### 2.5.1 Encoding Messages for Sending
```
Message → SendMessageRequestHeader
  ↓
Build Command:
  - RequestCode = 310 (SEND_MESSAGE_V2)
  - Header (JSON or binary)
  - Body (message body, optionally compressed)
  ↓
Check Compress:
  if (Body.Length > CompressOverBytes)
    Body = ZLIB.Compress(Body)
    SysFlag |= 0x1 // mark as compressed
  ↓
Send to Broker
```
#### 2.5.2 Decoding Received Messages
```
Receive from Broker
  ↓
MessageExt.Read(buffer):
  1. Parse SysFlag
  2. Check Compress (SysFlag & 0x1)
     if compressed: Body = ZLIB.Decompress(Body)
  3. Check IPv6 (SysFlag & 0x10 for BornHost, SysFlag & 0x20 for StoreHost;
     0x4 is the transaction-prepared bit in RocketMQ's MessageSysFlag)
     if IPv6: IP = 16 bytes
     else: IP = 4 bytes
  4. Parse Properties (Key-Value)
  ↓
Return MessageExt
```
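The compression rule used on both the send and receive paths can be sketched with the framework's own zlib stream. This is an illustration only: the `BodyCodec` class is hypothetical, `ZLibStream` requires .NET 6 or later, and the library's real `ZLIB` helper may be implemented differently.

```csharp
using System.IO;
using System.IO.Compression;

// Hypothetical helper showing the SysFlag 0x1 compression convention.
public static class BodyCodec
{
    public const int CompressedFlag = 0x1;

    public static byte[] Compress(byte[] body)
    {
        using var output = new MemoryStream();
        using (var zlib = new ZLibStream(output, CompressionLevel.Optimal))
            zlib.Write(body, 0, body.Length);
        // ToArray still works after the zlib stream has closed the MemoryStream.
        return output.ToArray();
    }

    public static byte[] Decompress(byte[] body)
    {
        using var input = new MemoryStream(body);
        using var zlib = new ZLibStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        zlib.CopyTo(output);
        return output.ToArray();
    }

    // Send side: compress only above the threshold and set the flag.
    public static (byte[] Body, int SysFlag) Encode(byte[] body, int sysFlag, int compressOverBytes)
    {
        if (body.Length <= compressOverBytes) return (body, sysFlag);
        return (Compress(body), sysFlag | CompressedFlag);
    }

    // Receive side: decompress only when the flag is set.
    public static byte[] Decode(byte[] body, int sysFlag) =>
        (sysFlag & CompressedFlag) != 0 ? Decompress(body) : body;
}
```

Keeping the flag check and the codec in one place ensures a body is never decompressed unless the sender actually marked it as compressed.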
#### 2.5.3 Batch Message Decoding
```
MessageExt.DecodeBatch(body):
  while (offset < body.Length)
  {
    Read TotalSize (4 bytes)
    Read MagicCode (4 bytes)
    Read BodyCRC (4 bytes)
    Read SysFlag (4 bytes)
    Read BornTimestamp (8 bytes)
    Read BornHost (8 bytes for IPv4, 20 bytes for IPv6; IP plus 4-byte port)
    Read StoreTimestamp (8 bytes)
    Read StoreHost (8 bytes for IPv4, 20 bytes for IPv6; IP plus 4-byte port)
    Read ReconsumeTimes (4 bytes)
    Read PreparedTransactionOffset (8 bytes)
    Read BodyLength (4 bytes)
    Read Body (BodyLength bytes)
    Read TopicLength (1 byte)
    Read Topic (TopicLength bytes)
    Read PropertiesLength (2 bytes)
    Read Properties (PropertiesLength bytes)
    Build MessageExt
    offset += TotalSize
  }
  Return List<MessageExt>
```
### 2.6 Signature Authentication
#### 2.6.1 HMAC-SHA1 Signing Flow
```
ClusterClient.SetSignature(request, provider):
  1. Build SignatureData:
     - AccessKey
     - OnsChannel
     - Timestamp
     - RequestData (sorted properties)
  2. HMAC-SHA1(SignatureData, SecretKey)
  3. Add to ExtFields:
     - AccessKey
     - Signature
     - OnsChannel
```
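The signing step itself can be sketched with the framework's `HMACSHA1` class. The `Signer` class below is hypothetical, and two details are assumptions made for illustration: the signature data is built by concatenating field values in ordinal key order, and the signature is Base64-encoded. The exact field set and ordering used by `SetSignature` may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper illustrating HMAC-SHA1 request signing.
public static class Signer
{
    // Assumption: values are concatenated in ordinal order of their keys.
    public static string BuildData(IDictionary<string, string> fields)
    {
        var sb = new StringBuilder();
        foreach (var pair in fields.OrderBy(p => p.Key, StringComparer.Ordinal))
            sb.Append(pair.Value);
        return sb.ToString();
    }

    // Returns Base64(HMAC-SHA1(data)) keyed with the secret key.
    public static string Sign(string data, string secretKey)
    {
        using var hmac = new HMACSHA1(Encoding.UTF8.GetBytes(secretKey));
        return Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(data)));
    }
}
```

The resulting string would be placed in the `Signature` entry of `ExtFields` alongside `AccessKey` and `OnsChannel`.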
#### 2.6.2 Cloud Vendor Adaptation
```
┌────────────────────────────────────────────────────────────────┐
│                    ICloudProvider interface                    │
├────────────────────────────────────────────────────────────────┤
│ - TransformTopic(topic)    - transform the topic name          │
│ - TransformGroup(group)    - transform the consumer group name │
│ - GetNameServerAddress()   - get the NameServer address        │
└────────────────────────────────────────────────────────────────┘
                                │
             ┌──────────────────┼──────────────────┐
             │                  │                  │
       ┌─────▼─────┐      ┌─────▼─────┐      ┌─────▼─────┐
       │ Aliyun    │      │ Huawei    │      │ Tencent   │
       │ instance  │      │ instance  │      │ Namespace │
       │ ID prefix │      │ ID+SSL/TLS│      │ prefix    │
       └───────────┘      └───────────┘      └───────────┘
```
---
## 3. gRPC Protocol Architecture (RocketMQ 5.x Proxy)
### 3.1 Protocol Overview
The gRPC protocol is the new architecture introduced in RocketMQ 5.x. It is built on the standard **HTTP/2 + Protobuf** stack and has the following characteristics:
- Based on HTTP/2 multiplexing
- Protobuf binary serialization
- Supports Unary (request-response) and Server Streaming calls
- Built-in lightweight Protobuf codec (**no external dependencies**)
**Available only on netstandard2.1+ / net5+ target frameworks**
### 3.2 gRPC Frame Format
```
┌────────────────────────────────────────────────────┐
│                 gRPC message frame                 │
├─────────┬─────────┬────────────────────────────────┤
│ Comp    │ Length  │ Body                           │
│ 1 byte  │ 4 bytes │ N bytes                        │
└─────────┴─────────┴────────────────────────────────┘
Comp: 0 = uncompressed, 1 = gzip-compressed
Length: big-endian, length of the Protobuf message body
Body: the Protobuf-encoded message
```
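The five-byte prefix described above is simple to build and parse. The following is a minimal sketch; `GrpcFrame` and its members are hypothetical names and not necessarily how the library's `BuildFrame` is written.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper for the gRPC length-prefixed message frame.
public static class GrpcFrame
{
    public static byte[] Build(byte[] message, bool compressed = false)
    {
        var frame = new byte[5 + message.Length];
        frame[0] = (byte)(compressed ? 1 : 0);
        BinaryPrimitives.WriteUInt32BigEndian(frame.AsSpan(1, 4), (uint)message.Length);
        message.CopyTo(frame, 5);
        return frame;
    }

    // Returns false when the buffer does not yet hold one complete frame.
    public static bool TryParse(ReadOnlySpan<byte> buffer, out bool compressed, out byte[] message)
    {
        compressed = false;
        message = Array.Empty<byte>();
        if (buffer.Length < 5) return false;
        var length = (int)BinaryPrimitives.ReadUInt32BigEndian(buffer.Slice(1, 4));
        if (buffer.Length < 5 + length) return false;
        compressed = buffer[0] == 1;
        message = buffer.Slice(5, length).ToArray();
        return true;
    }
}
```

Returning `false` on a short buffer lets a streaming reader keep accumulating bytes until a whole frame is available.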
**HTTP/2 request format**:
```
POST /apache.rocketmq.v2.MessagingService/{MethodName} HTTP/2
Host: proxy.example.com:8081
Content-Type: application/grpc
X-Mq-Language: DOTNET
X-Mq-Client-Version: 3.0.2026.0701
Authorization: ...
[gRPC message frame]
```
### 3.3 Communication Layer Architecture
#### 3.3.1 GrpcClient (gRPC Client)
**Responsibilities**:
- HTTP/2 connection management
- gRPC frame encoding and decoding
- Unary and Server Streaming calls
- Request timeout control
**Core flow**:
```
┌──────────────────────────────────────────────────┐
│               GrpcClient workflow                │
├──────────────────────────────────────────────────┤
│ 1. InvokeAsync(service, method, request)         │
│ 2. Build HTTP/2 Request                          │
│ 3. Encode Request (ProtoWriter)                  │
│ 4. Send via HttpClient                           │
│ 5. Receive Response Stream                       │
│ 6. Decode Response (ProtoReader)                 │
│ 7. Return TResponse                              │
└──────────────────────────────────────────────────┘
```
**Unary call**:
```csharp
public async Task<TResponse> InvokeAsync<TRequest, TResponse>(
    String service, String method, TRequest request)
{
    var url = $"{BaseUri}/{service}/{method}";
    var body = ProtoWriter.Encode(request);
    var content = new ByteArrayContent(BuildFrame(body));
    var response = await _httpClient.PostAsync(url, content);
    var stream = await response.Content.ReadAsStreamAsync();
    return ProtoReader.Decode<TResponse>(stream);
}
```
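**Server Streaming call**:
```csharp
public async IAsyncEnumerable<TResponse> InvokeStreamAsync<TRequest, TResponse>(
    String service, String method, TRequest request)
{
    var url = $"{BaseUri}/{service}/{method}";
    var body = ProtoWriter.Encode(request);
    // PostAsync has no HttpCompletionOption overload, so the request goes through SendAsync
    using var message = new HttpRequestMessage(HttpMethod.Post, url)
    {
        Content = new ByteArrayContent(BuildFrame(body))
    };
    using var response = await _httpClient.SendAsync(message,
        HttpCompletionOption.ResponseHeadersRead);
    var stream = await response.Content.ReadAsStreamAsync();
    // Stream has no EndOfStream property. TryDecode is a hypothetical helper that
    // returns false once the response stream is exhausted.
    while (ProtoReader.TryDecode<TResponse>(stream, out var item))
    {
        yield return item;
    }
}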
```
#### 3.3.2 GrpcMessagingService (Message Service)
**11 core RPC methods**:
| Method | Type | Description |
|------|------|------|
| QueryRoute | Unary | Query topic routes |
| SendMessage | Unary | Send messages (normal/delayed/FIFO/transactional) |
| QueryAssignment | Unary | Query queue assignment |
| ReceiveMessage | Server Streaming | Receive messages (long polling) |
| AckMessage | Unary | Acknowledge message consumption |
| Heartbeat | Unary | Heartbeat |
| EndTransaction | Unary | End transaction |
| ForwardToDeadLetterQueue | Unary | Forward to the dead-letter queue |
| ChangeInvisibleDuration | Unary | Change the invisible duration |
| NotifyClientTermination | Unary | Notify client termination |
| Telemetry | Bidirectional Streaming | Client resource reporting |
**Message sending flow**:
```
SendMessageAsync(topic, body, tag, keys, ...)
  ↓
Build GrpcMessage:
  - Topic (GrpcResource)
  - SystemProperties (Tag, MessageType, Keys, ...)
  - UserProperties (custom properties)
  - Body (message body)
  ↓
Check MessageType:
  - NORMAL: normal message
  - DELAY: DeliveryTimestamp sets the delivery time
  - FIFO: MessageGroup sets the group
  - TRANSACTION: transactional message
  ↓
Build SendMessageRequest
  ↓
GrpcClient.InvokeAsync("SendMessage")
  ↓
Return SendMessageResponse (MessageId, Status, ...)
```
**Message receiving flow (Server Streaming)**:
```
ReceiveMessageAsync(topic, group, timeout)
  ↓
Build ReceiveMessageRequest:
  - Group (GrpcResource)
  - MessageQueue (queue information)
  - LongPollingTimeout (long-polling timeout)
  ↓
GrpcClient.InvokeStreamAsync("ReceiveMessage")
  ↓
await foreach (var response in stream)
{
  Process Messages
  Yield return message
}
```
### 3.4 Protobuf Codec
NewLife.RocketMQ ships a **lightweight Protobuf codec** (ProtoWriter/ProtoReader) developed entirely in-house, with no dependency on Google.Protobuf or protobuf-net.
#### 3.4.1 ProtoWriter (Encoder)
**Supported data types**:
| Wire Type | Type | Description |
|-----------|------|------|
| 0 | Varint | int32, int64, uint32, uint64, bool, enum |
| 1 | Fixed64 | fixed64, sfixed64, double |
| 2 | Length-delimited | string, bytes, embedded messages, packed repeated |
| 5 | Fixed32 | fixed32, sfixed32, float |
**Encoding flow**:
```csharp
public class ProtoWriter
{
    public void WriteTag(Int32 fieldNumber, WireType wireType)
    {
        var tag = (fieldNumber << 3) | (Int32)wireType;
        WriteVarint((UInt32)tag);
    }
    public void WriteString(Int32 fieldNumber, String value)
    {
        WriteTag(fieldNumber, WireType.LengthDelimited);
        var bytes = Encoding.UTF8.GetBytes(value);
        WriteVarint((UInt32)bytes.Length);
        Write(bytes);
    }
    public void WriteMessage<T>(Int32 fieldNumber, T value)
        where T : IProtoMessage
    {
        var buffer = new MemoryStream();
        var writer = new ProtoWriter(buffer);
        value.WriteTo(writer);
        WriteTag(fieldNumber, WireType.LengthDelimited);
        WriteVarint((UInt32)buffer.Length);
        Write(buffer.ToArray());
    }
}
```
#### 3.4.2 ProtoReader (Decoder)
**Decoding flow**:
```csharp
public class ProtoReader
{
    public (Int32 fieldNumber, WireType wireType) ReadTag()
    {
        var tag = ReadVarint();
        var fieldNumber = (Int32)(tag >> 3);
        var wireType = (WireType)(tag & 0x7);
        return (fieldNumber, wireType);
    }
    public String ReadString()
    {
        var length = ReadVarint();
        var bytes = Read((Int32)length);
        return Encoding.UTF8.GetString(bytes);
    }
    public T ReadMessage<T>() where T : IProtoMessage, new()
    {
        var length = ReadVarint();
        var bytes = Read((Int32)length);
        var stream = new MemoryStream(bytes);
        var reader = new ProtoReader(stream);
        var message = new T();
        message.ReadFrom(reader);
        return message;
    }
}
```
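Both classes above rely on `WriteVarint` and `ReadVarint`, which are not shown. The standard Protobuf base-128 varint encoding can be sketched as follows; the `Varint` class is a hypothetical stand-in, not the library's own implementation.

```csharp
using System;
using System.IO;

// Hypothetical helper implementing Protobuf base-128 varints.
public static class Varint
{
    // Emits 7 bits per byte, least significant group first, with the
    // high bit set on every byte except the last.
    public static void Write(Stream stream, ulong value)
    {
        while (value >= 0x80)
        {
            stream.WriteByte((byte)(value | 0x80));
            value >>= 7;
        }
        stream.WriteByte((byte)value);
    }

    public static ulong Read(Stream stream)
    {
        ulong result = 0;
        for (var shift = 0; shift < 64; shift += 7)
        {
            var b = stream.ReadByte();
            if (b < 0) throw new EndOfStreamException();
            result |= (ulong)(b & 0x7F) << shift;
            if ((b & 0x80) == 0) return result;
        }
        throw new InvalidDataException("Varint is longer than 10 bytes.");
    }
}
```

For example, the value 300 is written as the two bytes `0xAC 0x02`, and a tag for field 2 with wire type 2 is the single byte `(2 << 3) | 2 = 0x12`.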
#### 3.4.3 gRPC Message Models
**Core message types**:
```csharp
// Resource (Topic/Group)
public class GrpcResource : IProtoMessage
{
    public String ResourceNamespace { get; set; }  // Field 1
    public String Name { get; set; }               // Field 2
}
// Message
public class GrpcMessage : IProtoMessage
{
    public GrpcResource Topic { get; set; }                          // Field 1
    public IDictionary<String, String> UserProperties { get; set; }  // Field 2
    public GrpcSystemProperties SystemProperties { get; set; }       // Field 3
    public Byte[] Body { get; set; }                                 // Field 4
}
// System properties
public class GrpcSystemProperties : IProtoMessage
{
    public String Tag { get; set; }                   // Field 1
    public IList<String> Keys { get; set; }           // Field 2
    public String MessageId { get; set; }             // Field 3
    public String BornHost { get; set; }              // Field 4
    public DateTime BornTimestamp { get; set; }       // Field 5
    public String MessageGroup { get; set; }          // Field 11 (FIFO)
    public DateTime? DeliveryTimestamp { get; set; }  // Field 12 (delayed delivery)
    public GrpcMessageType MessageType { get; set; }  // Field 14
}
```
### 3.5 gRPC-Specific Features
#### 3.5.1 Arbitrary-Time Delayed Messages
The Remoting protocol supports only 18 preset delay levels, whereas the gRPC protocol supports delaying delivery to an arbitrary timestamp:
```csharp
// Delay delivery until a specific time
var deliveryTime = DateTime.Now.AddMinutes(30);
await producer.PublishDelayViaGrpcAsync("delayed message", deliveryTime);
```
**How it works**:
```
GrpcSystemProperties.MessageType = DELAY
GrpcSystemProperties.DeliveryTimestamp = deliveryTime (DateTime)
  ↓
Protobuf Field 12 (timestamp):
  seconds = floor((deliveryTime.ToUniversalTime() - UnixEpoch).TotalSeconds)
  nanos = deliveryTime.Millisecond * 1_000_000
  ↓
The Proxy receives the message and delays its delivery to the Topic
```
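The conversion from `DateTime` to the seconds/nanos pair of a Protobuf timestamp can be sketched as below. The `ProtoTimestamp` class is hypothetical; it works on ticks instead of `TotalSeconds` so that the seconds part is always a whole number, and it assumes the delivery time is not earlier than the Unix epoch.

```csharp
using System;

// Hypothetical helper converting a DateTime to Protobuf timestamp parts.
public static class ProtoTimestamp
{
    public static (long Seconds, int Nanos) FromDateTime(DateTime value)
    {
        // Unspecified-kind values are treated as local time by ToUniversalTime.
        var ticks = value.ToUniversalTime().Ticks - DateTime.UnixEpoch.Ticks;
        var seconds = Math.DivRem(ticks, TimeSpan.TicksPerSecond, out var remainder);
        // One tick is 100 nanoseconds.
        return (seconds, (int)remainder * 100);
    }
}
```

Normalizing to UTC before subtracting the epoch matters here because the usage example builds the delivery time from `DateTime.Now`, which is local time.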
#### 3.5.2 Client Resource Reporting (Telemetry)
```
TelemetryViaGrpcAsync()
  ↓
Build TelemetryCommand:
  - Settings (client configuration)
  - Endpoints (connection endpoints)
  - Threads (thread count)
  - Language (DOTNET)
  ↓
Build TelemetryRequest
  ↓
GrpcClient.InvokeStreamAsync("Telemetry")
  ↓
Bidirectional Streaming:
  Client → Server: TelemetryCommand
  Server → Client: TelemetryCommand (verification/instructions)
```
---
## 4. Load Balancing and Routing
### 4.1 Producer Load Balancing
**ILoadBalance interface**:
```csharp
public interface ILoadBalance
{
    String Name { get; }
    Boolean Ready { get; set; }
    MessageQueue Select(IList<MessageQueue> queues);
}
```
**WeightRoundRobin (weighted round robin)**:
```
Build Queue List:
  For each Broker in Brokers:
    For i in 0..WriteQueueNums:
      Queue { BrokerName, QueueId, Weight = Broker.Weight }
  ↓
Sort by (BrokerName, QueueId)
  ↓
Round Robin with Weight:
  currentWeight[queue] += queue.Weight
  maxWeightQueue = Max(currentWeight)
  currentWeight[maxWeightQueue] -= TotalWeight
  Return maxWeightQueue
```
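The weight update in the last step is the smooth weighted round-robin scheme, which can be sketched as follows. The `SmoothWeightedRoundRobin` class is hypothetical, works on plain weights and indexes instead of `MessageQueue` objects, and is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of smooth weighted round robin over queue indexes.
public sealed class SmoothWeightedRoundRobin
{
    private readonly int[] _weights;
    private readonly int[] _current;
    private readonly int _total;

    public SmoothWeightedRoundRobin(IReadOnlyList<int> weights)
    {
        _weights = new int[weights.Count];
        _current = new int[weights.Count];
        for (var i = 0; i < weights.Count; i++)
        {
            _weights[i] = weights[i];
            _total += weights[i];
        }
    }

    // Returns the index of the next queue to use.
    public int Next()
    {
        var best = 0;
        for (var i = 0; i < _weights.Length; i++)
        {
            _current[i] += _weights[i];
            if (_current[i] > _current[best]) best = i;
        }
        _current[best] -= _total;
        return best;
    }
}
```

With weights 5, 1, 1 the picks over one cycle are 0, 0, 1, 0, 2, 0, 0, so the heavier queue is chosen five times out of seven without long runs on a single queue.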
### 4.2 Consumer Load Balancing (Rebalance)
**Average allocation algorithm**:
```
Get All Consumers in Group (sorted)
Get All Queues in Topic (sorted)
  ↓
Find My Index in Consumers
  ↓
Allocate Queues:
  averageSize = queues.Count / consumers.Count
  remainder = queues.Count % consumers.Count
  if (myIndex < remainder)
    myQueues = queues[(myIndex * (averageSize + 1))..((myIndex + 1) * (averageSize + 1))]
  else
    startIndex = myIndex * averageSize + remainder
    myQueues = queues[startIndex..(startIndex + averageSize)]
  ↓
Lock Queues (if OrderConsume)
  ↓
Start Consume Threads
```
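The allocation arithmetic above translates directly into code. The `AverageAllocator` class is a hypothetical sketch that takes the already sorted queue list, the number of consumers in the group, and this consumer's index.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the average allocation step of Rebalance.
public static class AverageAllocator
{
    public static List<T> Allocate<T>(IReadOnlyList<T> queues, int consumerCount, int myIndex)
    {
        var result = new List<T>();
        if (consumerCount <= 0 || myIndex < 0 || myIndex >= consumerCount) return result;

        var average = queues.Count / consumerCount;
        var remainder = queues.Count % consumerCount;
        // The first `remainder` consumers each take one extra queue.
        var start = myIndex < remainder ? myIndex * (average + 1) : myIndex * average + remainder;
        var count = myIndex < remainder ? average + 1 : average;

        for (var i = start; i < start + count; i++) result.Add(queues[i]);
        return result;
    }
}
```

With 8 queues and 3 consumers, the consumers receive queues 0-2, 3-5, and 6-7 respectively, so every queue is assigned exactly once.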
**Multi-topic subscription**:
```
Topics = "topic1;topic2;topic3"
  ↓
For each Topic:
  Get Brokers from NameServer
  Build Queue List
  Rebalance(Topic, Queues)
  ↓
Merge All Queues
Start Consume
```
---
## 5. Message Reliability Guarantees
### 5.1 Consumption Retry
```
Consumer.OnConsume() return false
  ↓
SendMessageBackAsync(msg):
  - Topic = "%RETRY%{ConsumerGroup}"
  - DelayLevel = msg.ReconsumeTimes + 3
  - ReconsumeTimes++
  ↓
if (ReconsumeTimes >= MaxReconsumeTimes)
  Topic = "%DLQ%{ConsumerGroup}" // goes to the dead-letter queue
  ↓
Send CONSUMER_SEND_MSG_BACK (36) to Broker
```
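The retry routing decision can be written as a small pure function. The `RetryPolicy` class is a hypothetical sketch that follows the steps above in the same order: compute the delay level from the current retry count, increment the count, then switch to the dead-letter topic once the limit is reached.

```csharp
using System;

// Hypothetical sketch of the retry/dead-letter routing decision.
public static class RetryPolicy
{
    public static (string Topic, int DelayLevel, int ReconsumeTimes) Next(
        string consumerGroup, int reconsumeTimes, int maxReconsumeTimes)
    {
        var delayLevel = reconsumeTimes + 3;
        var times = reconsumeTimes + 1;
        var topic = times >= maxReconsumeTimes
            ? "%DLQ%" + consumerGroup
            : "%RETRY%" + consumerGroup;
        return (topic, delayLevel, times);
    }
}
```

For a group named `g` with a limit of 16, the first failure yields `%RETRY%g` at delay level 3, and a message that has already been retried 15 times is routed to `%DLQ%g`.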
### 5.2 Transactional Message Flow
```
1. Producer.PublishTransaction("order created")
  ↓
2. Send Half Message (PreparedTransaction = true)
  ↓
3. Execute Local Transaction
  ↓
4. EndTransaction(sendResult, Commit/Rollback)
   - Send END_TRANSACTION (37) to Broker
  ↓
5. Broker periodically checks back (CHECK_TRANSACTION_STATE)
   - Producer.OnCheckTransaction(msg, transactionId)
   - Return TransactionState (Commit/Rollback/Unknown)
```
### 5.3 Ordered Consumption Guarantees
```
Producer:
  SelectQueue(key) - messages with the same key go to the same queue
  Publish(message, queue)
  ↓
Consumer:
  OrderConsume = true
  LockBatchMQAsync(queues) - ask the Broker to lock the queues
  ↓
  Single Thread Consume Each Queue
  Offset Commit After Success
  ↓
  UnlockBatchMQAsync(queues) - release the locks
```
---
## 6. Performance Optimization Strategies
### 6.1 Connection Reuse
**Remoting protocol**:
- Single-connection Opaque multiplexing (responses matched by request ID)
- Connection pool management (ConcurrentDictionary)
- Heartbeat keep-alive (30s)
**gRPC protocol**:
- HTTP/2 multiplexing (supported natively)
- HttpClient connection pool
### 6.2 Message Compression
```
if (message.Body.Length > CompressOverBytes)
{
    message.Body = ZLIB.Compress(message.Body);
    message.SysFlag |= 0x1; // mark as compressed
}
```
**Automatic decompression on the receiving side**:
```
if ((msg.SysFlag & 0x1) != 0)
{
    msg.Body = ZLIB.Decompress(msg.Body);
}
```
### 6.3 VIP Channel
```
if (VipChannelEnabled)
{
    var port = brokerAddress.Port;
    vipAddress = $"{brokerAddress.Host}:{port - 2}";
    client = GetClient(vipAddress); // higher-priority connection
}
```
### 6.4 Concurrency Control
```
private SemaphoreSlim _consumeSemaphore;
public void Start()
{
    _consumeSemaphore = new SemaphoreSlim(MaxConcurrentConsume);
}
await _consumeSemaphore.WaitAsync(cancellationToken);
try
{
    await ProcessMessageAsync(message);
}
finally
{
    _consumeSemaphore.Release();
}
```
---
## 7. Extensibility Design
### 7.1 Cloud Vendor Adapters
```csharp
public interface ICloudProvider
{
    String Name { get; }
    String AccessKey { get; }
    String SecretKey { get; }
    String OnsChannel { get; }
    String TransformTopic(String topic);
    String TransformGroup(String group);
    String GetNameServerAddress();
}
```
**Implementation example (Alibaba Cloud)**:
```csharp
public class AliyunProvider : ICloudProvider
{
    public String InstanceId { get; set; }
    public String TransformTopic(String topic)
    {
        return $"{InstanceId}%{topic}";
    }
    public String TransformGroup(String group)
    {
        return $"{InstanceId}%{group}";
    }
    public String GetNameServerAddress()
    {
        // Fetch the NameServer address through the HTTP endpoint
        var url = $"http://MQInstXXX.aliyuncs.com:80/rocketmq/nsaddr4client-internal";
        return HttpGet(url);
    }
}
```
### 7.2 Message Hooks
```csharp
public interface ISendMessageHook
{
    void SendMessageBefore(SendMessageContext context);
    void SendMessageAfter(SendMessageContext context);
}
public interface IConsumeMessageHook
{
    void ConsumeBefore(ConsumeMessageContext context);
    void ConsumeAfter(ConsumeMessageContext context);
}
```
**Use cases**:
- Message trace tracking (MessageTraceHook)
- Custom logging
- Performance monitoring
- Business interception
---
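## 8. Summary
Through an architecture with **clear layering, single responsibility, and extensibility**, NewLife.RocketMQ delivers:
1. **Dual-protocol support**: Remoting (stable and efficient) plus gRPC (future-oriented)
2. **Zero external dependencies**: pure C# implementation with a built-in Protobuf codec
3. **Multi-cloud adaptation**: a unified ICloudProvider interface for connecting to each cloud provider
4. **High performance**: connection reuse, message compression, the VIP channel, and concurrency control
5. **High reliability**: consumption retry, dead-letter queues, transaction check-back, and queue locking for ordered consumption
6. **Easy extension**: the hook mechanism, cloud vendor adapters, and the load-balancing interface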
---
**End of document**