All commits on every branch must run tests
大石头 authored at 2022-03-29 23:35:41
NewLife.RocketMQ
# NewLife.RocketMQ Architecture Design Document

## 1. Overall Architecture

### 1.1 Design Philosophy

NewLife.RocketMQ follows a **clearly layered, single-responsibility, extensible** architecture:

- **Pure managed implementation**: written entirely in C#, with no third-party Java, gRPC, or Protobuf libraries required
- **Dual-protocol support**: supports both the Remoting protocol (4.x) and the gRPC protocol (5.x)
- **Cloud provider adaptation**: a unified `ICloudProvider` interface makes it easy to connect to each cloud vendor
- **Performance optimizations**: connection reuse, object pooling, VIP channel, message compression, and similar techniques
- **Testability**: 30+ test classes cover core functionality and edge cases

### 1.2 Layer Structure

```
┌──────────────────────────────────────────────────────────┐
│                 Business layer (MqBase)                  │
│  ┌──────────────────────┐    ┌──────────────────────┐    │
│  │ Producer             │    │ Consumer             │    │
│  │ Producer logic       │    │ Consumer logic       │    │
│  └──────────────────────┘    └──────────────────────┘    │
└──────────────────────────────────────────────────────────┘
                              │
┌──────────────────────────────────────────────────────────┐
│               Communication layer (Client)               │
│  ┌──────────────────────┐    ┌──────────────────────┐    │
│  │ ClusterClient        │    │ GrpcClient           │    │
│  │ TCP long connections │    │ HTTP/2 gRPC          │    │
│  └──────────────────────┘    └──────────────────────┘    │
│  ┌──────────────────────┐    ┌──────────────────────┐    │
│  │ NameClient           │    │ GrpcMessagingService │    │
│  │ Route discovery      │    │                      │    │
│  └──────────────────────┘    └──────────────────────┘    │
│  ┌──────────────────────┐                                │
│  │ BrokerClient         │                                │
│  │ Heartbeat/unregister │                                │
│  └──────────────────────┘                                │
└──────────────────────────────────────────────────────────┘
                              │
┌──────────────────────────────────────────────────────────┐
│                Protocol layer (Protocol)                 │
│  ┌──────────────────────┐    ┌──────────────────────┐    │
│  │ Command              │    │ GrpcModels           │    │
│  │ Remoting frame       │    │ gRPC message models  │    │
│  └──────────────────────┘    └──────────────────────┘    │
│  ┌──────────────────────┐    ┌──────────────────────┐    │
│  │ MessageExt           │    │ ProtoWriter          │    │
│  │ Message model        │    │ Protobuf encoding    │    │
│  └──────────────────────┘    └──────────────────────┘    │
│  ┌──────────────────────┐    ┌──────────────────────┐    │
│  │ RequestCode          │    │ ProtoReader          │    │
│  │ Request code enum    │    │ Protobuf decoding    │    │
│  └──────────────────────┘    └──────────────────────┘    │
└──────────────────────────────────────────────────────────┘
                              │
┌──────────────────────────────────────────────────────────┐
│               Transport layer (Transport)                │
│  ┌──────────────────────┐    ┌──────────────────────┐    │
│  │ NewLife.Net          │    │ HttpClient           │    │
│  │ TCP Socket           │    │ HTTP/2               │    │
│  └──────────────────────┘    └──────────────────────┘    │
└──────────────────────────────────────────────────────────┘
                              │
┌──────────────────────────────────────────────────────────┐
│       Cloud provider adapter layer (CloudProvider)       │
│  ┌────────┐    ┌────────┐    ┌────────┐    ┌────────┐    │
│  │Aliyun  │    │Huawei  │    │Tencent │    │ACL     │    │
│  │Provider│    │Provider│    │Provider│    │Provider│    │
│  └────────┘    └────────┘    └────────┘    └────────┘    │
└──────────────────────────────────────────────────────────┘
```

---

## 2. Remoting Protocol Architecture (RocketMQ 4.x/5.x Broker)

### 2.1 Protocol Overview

The Remoting protocol is RocketMQ's classic private TCP protocol. It uses a **binary frame format with JSON or binary header serialization**, and is efficient and stable.

### 2.2 Protocol Frame Format

```
┌─────────────────────────────────────────────────────────┐
│                 Remoting protocol frame                 │
├────────────┬──────────────┬──────────────┬──────────────┤
│   Length   │ HeaderLength │    Header    │     Body     │
│  4 bytes   │   4 bytes    │   N bytes    │   M bytes    │
└────────────┴──────────────┴──────────────┴──────────────┘

Length = 4 + N + M (total length)
HeaderLength high 8 bits: SerializeType (0=JSON, 1=ROCKETMQ)
HeaderLength low 24 bits: actual Header length
```

**Frame encoding and decoding flow**:

```
Sender:
  Message → Command.Create() → SetBody() → Build()
          → [Length][HeaderLength][Header][Body] → TCP Socket

Receiver:
  TCP Socket → ReadLength(4 bytes) → ReadHeaderLength(4 bytes)
             → ReadHeader(N bytes) → ReadBody(M bytes)
             → Command.Parse() → Message
```

### 2.3 Communication Layer Architecture

#### 2.3.1 ClusterClient (cluster client)

**Responsibilities**:

- TCP connection management and reuse
- Sending commands and receiving responses
- Unified HMAC-SHA1 signing
- Request-response matching by Opaque

**Core flow**:

```
┌──────────────────────────────────────────────────────────┐
│                  ClusterClient workflow                  │
├──────────────────────────────────────────────────────────┤
│ 1. GetClient(address) - get or create a TCP connection   │
│ 2. SetSignature() - set the cloud provider signature     │
│ 3. Invoke() / InvokeAsync() - send the command           │
│ 4. Match the response by Opaque                          │
│ 5. Handle response timeouts                              │
└──────────────────────────────────────────────────────────┘
```

**Connection reuse mechanism**:

```csharp
// Single connection per address, multiplexed by Opaque
// (ConcurrentDictionary lives in System.Collections.Concurrent)
private ConcurrentDictionary<String, TcpSession> _clients = new();

public TcpSession GetClient(String address)
{
    // Note: GetOrAdd may run the factory more than once under contention
    return _clients.GetOrAdd(address, addr =>
    {
        var client = new TcpSession();
        client.Remote = new NetUri(addr);
        client.Open();
        return client;
    });
}
```

#### 2.3.2 NameClient (name server client)

**Responsibilities**:

- Topic route discovery
- Periodic polling for route updates (30s)
- Route management for multiple topics
- Resolution of broker master and slave addresses

**Route discovery flow**:

```
Start() → UpdateRouteAsync()
    ↓
GetRouteInfo(Topic) → NameServer
    ↓
Parse Response:
    - BrokerDatas (broker cluster information)
    - QueueDatas (queue distribution information)
    ↓
Build BrokerInfo:
    - MasterAddress (master node address)
    - SlaveAddresses (list of slave node addresses)
    ↓
Cache & Notify → OnBrokerChange Event
    ↓
Timer repeats after 30s
```

#### 2.3.3 BrokerClient (broker client)

**Responsibilities**:

- Broker heartbeat (30s)
- Client unregistration
- Sending and receiving commands (inherited from ClusterClient)

**Heartbeat mechanism**:

```
Start() → SendHeartbeat()
    ↓
Build HeartbeatData:
    - ClientID
    - ProducerDataSet / ConsumerDataSet
    ↓
Send HEART_BEAT (34) → Broker
    ↓
Timer repeats after 30s
```

### 2.4 Core RequestCode Values

| Category | RequestCode | Value | Description |
|------|------------|:--:|------|
| **Message sending** | SEND_MESSAGE_V2 | 310 | Send message (V2) |
| | SEND_BATCH_MESSAGE | 320 | Batch send |
| | SEND_REPLY_MESSAGE_V2 | 325 | Reply message |
| **Message pulling** | PULL_MESSAGE | 11 | Pull messages |
| | POP_MESSAGE | 200050 | Pop consumption |
| | ACK_MESSAGE | 200051 | Pop acknowledgement |
| **Transactional messages** | END_TRANSACTION | 37 | End transaction |
| | CHECK_TRANSACTION_STATE | 39 | Transaction check-back |
| **Offset management** | QUERY_CONSUMER_OFFSET | 14 | Query offset |
| | UPDATE_CONSUMER_OFFSET | 15 | Update offset |
| | SEARCH_OFFSET_BY_TIMESTAMP | 29 | Search by timestamp |
| **Ordered consumption** | LOCK_BATCH_MQ | 41 | Lock queues |
| | UNLOCK_BATCH_MQ | 42 | Unlock queues |
| **Route management** | GET_ROUTEINTO_BY_TOPIC | 105 | Topic route |
| | GET_BROKER_CLUSTER_INFO | 106 | Cluster information |
| **Heartbeat** | HEART_BEAT | 34 | Heartbeat |
| | UNREGISTER_CLIENT | 35 | Unregister client |
| | NOTIFY_CONSUMER_IDS_CHANGED | 40 | Consumer change notification |

### 2.5 Message Encoding and Decoding

#### 2.5.1 Encoding for message sending

```
Message → SendMessageRequestHeader
    ↓
Build Command:
    - RequestCode = 310 (SEND_MESSAGE_V2)
    - Header (JSON or binary)
    - Body (message body, optionally compressed)
    ↓
Check Compress:
    if (Body.Length > CompressOverBytes)
        Body = ZLIB.Compress(Body)
        SysFlag |= 0x1  // mark as compressed
    ↓
Send to Broker
```

#### 2.5.2 Decoding for message receiving

```
Receive from Broker
    ↓
MessageExt.Read(buffer):
    1. Parse SysFlag
    2. Check Compress (SysFlag & 0x1)
       if compressed: Body = ZLIB.Decompress(Body)
    3. Check IPv6 (SysFlag & 0x4)
       if IPv6: IP = 16 bytes
       else:    IP = 4 bytes
    4. Parse Properties (Key-Value)
    ↓
Return MessageExt
```

#### 2.5.3 Batch message decoding

```
MessageExt.DecodeBatch(body):
    while (offset < body.Length) {
        Read TotalSize (4 bytes)
        Read MagicCode (4 bytes)
        Read BodyCRC (4 bytes)
        Read SysFlag (4 bytes)
        Read BornTimestamp (8 bytes)
        Read BornHost (8 bytes, IPv6) or (4 bytes, IPv4)
        Read StoreTimestamp (8 bytes)
        Read StoreHost (8 bytes, IPv6) or (4 bytes, IPv4)
        Read ReconsumeTimes (4 bytes)
        Read PreparedTransactionOffset (8 bytes)
        Read BodyLength (4 bytes)
        Read Body (BodyLength bytes)
        Read TopicLength (1 byte)
        Read Topic (TopicLength bytes)
        Read PropertiesLength (2 bytes)
        Read Properties (PropertiesLength bytes)

        Build MessageExt
        offset += TotalSize
    }
    Return List<MessageExt>
```

### 2.6 Signature Authentication

#### 2.6.1 HMAC-SHA1 signing flow

```
ClusterClient.SetSignature(request, provider):
    1. Build SignatureData:
       - AccessKey
       - OnsChannel
       - Timestamp
       - RequestData (sorted properties)
    2. HMAC-SHA1(SignatureData, SecretKey)
    3. Add to ExtFields:
       - AccessKey
       - Signature
       - OnsChannel
```

#### 2.6.2 Cloud provider adaptation

```
┌──────────────────────────────────────────────────────────┐
│                 ICloudProvider interface                 │
├──────────────────────────────────────────────────────────┤
│ - TransformTopic(topic)   - convert topic name           │
│ - TransformGroup(group)   - convert consumer group name  │
│ - GetNameServerAddress()  - get NameServer address       │
└──────────────────────────────────────────────────────────┘
                              │
           ┌──────────────────┼──────────────────┐
           │                  │                  │
    ┌──────▼──────┐    ┌──────▼──────┐    ┌──────▼──────┐
    │ Aliyun      │    │ Huawei      │    │ Tencent     │
    │ Instance ID │    │ Instance ID │    │ Namespace   │
    │ prefix      │    │ + SSL/TLS   │    │ prefix      │
    └─────────────┘    └─────────────┘    └─────────────┘
```

---

## 3. gRPC Protocol Architecture (RocketMQ 5.x Proxy)

### 3.1 Protocol Overview

The gRPC protocol is the new architecture introduced in RocketMQ 5.x. It is built on the standard **HTTP/2 + Protobuf** stack and has the following characteristics:

- Based on HTTP/2 multiplexing
- Protobuf binary serialization
- Supports Unary (request-response) and Server Streaming calls
- Built-in lightweight Protobuf codec (**no external dependencies**)

**Available only on the netstandard2.1+ / net5+ target frameworks.**

### 3.2 gRPC Frame Format

```
┌──────────────────────────────────────────────┐
│              gRPC message frame              │
├──────────┬──────────────┬────────────────────┤
│   Comp   │    Length    │        Body        │
│  1 byte  │   4 bytes    │      N bytes       │
└──────────┴──────────────┴────────────────────┘

Comp:   0 = uncompressed, 1 = gzip compressed
Length: big-endian, length of the Protobuf message body
Body:   Protobuf-encoded message
```

**HTTP/2 request format**:

```
POST /apache.rocketmq.v2.MessagingService/{MethodName} HTTP/2
Host: proxy.example.com:8081
Content-Type: application/grpc
X-Mq-Language: DOTNET
X-Mq-Client-Version: 3.0.2026.0701
Authorization: ...
[gRPC message frame]
```

### 3.3 Communication Layer Architecture

#### 3.3.1 GrpcClient (gRPC client)

**Responsibilities**:

- HTTP/2 connection management
- gRPC frame encoding and decoding
- Unary and Server Streaming calls
- Request timeout control

**Core flow**:

```
┌──────────────────────────────────────────────────────────┐
│                   GrpcClient workflow                    │
├──────────────────────────────────────────────────────────┤
│ 1. InvokeAsync(service, method, request)                 │
│ 2. Build HTTP/2 Request                                  │
│ 3. Encode Request (ProtoWriter)                          │
│ 4. Send via HttpClient                                   │
│ 5. Receive Response Stream                               │
│ 6. Decode Response (ProtoReader)                         │
│ 7. Return TResponse                                      │
└──────────────────────────────────────────────────────────┘
```

**Unary call**:

```csharp
public async Task<TResponse> InvokeAsync<TRequest, TResponse>(
    String service, String method, TRequest request)
{
    var url = $"{BaseUri}/{service}/{method}";
    var body = ProtoWriter.Encode(request);

    // Wrap the Protobuf payload in a gRPC frame before posting it
    var content = new ByteArrayContent(BuildFrame(body));
    var response = await _httpClient.PostAsync(url, content);

    var stream = await response.Content.ReadAsStreamAsync();
    return ProtoReader.Decode<TResponse>(stream);
}
```

**Server Streaming call**:

```csharp
public async IAsyncEnumerable<TResponse> InvokeStreamAsync<TRequest, TResponse>(
    String service, String method, TRequest request)
{
    var url = $"{BaseUri}/{service}/{method}";
    var body = ProtoWriter.Encode(request);

    // PostAsync has no HttpCompletionOption overload, so use SendAsync
    // to start reading as soon as the response headers arrive
    var message = new HttpRequestMessage(HttpMethod.Post, url)
    {
        Content = new ByteArrayContent(BuildFrame(body))
    };
    var response = await _httpClient.SendAsync(message, HttpCompletionOption.ResponseHeadersRead);
    var stream = await response.Content.ReadAsStreamAsync();

    // Pseudocode condition: System.IO.Stream has no EndOfStream member;
    // read it as "until the server closes the stream"
    while (!stream.EndOfStream)
    {
        yield return ProtoReader.Decode<TResponse>(stream);
    }
}
```

#### 3.3.2 GrpcMessagingService (messaging service)

**11 core RPC methods**:

| Method | Type | Description |
|------|------|------|
| QueryRoute | Unary | Query topic route |
| SendMessage | Unary | Send a message (normal/delay/FIFO/transaction) |
| QueryAssignment | Unary | Query queue assignment |
| ReceiveMessage | Server Streaming | Receive messages (long polling) |
| AckMessage | Unary | Acknowledge message consumption |
| Heartbeat | Unary | Heartbeat |
| EndTransaction | Unary | End transaction |
| ForwardToDeadLetterQueue | Unary | Forward to the dead-letter queue |
| ChangeInvisibleDuration | Unary | Change the invisible duration |
| NotifyClientTermination | Unary | Notify client termination |
| Telemetry | Bidirectional Streaming | Client resource reporting |

**Send message flow**:

```
SendMessageAsync(topic, body, tag, keys, ...)
    ↓
Build GrpcMessage:
    - Topic (GrpcResource)
    - SystemProperties (Tag, MessageType, Keys, ...)
    - UserProperties (custom properties)
    - Body (message body)
    ↓
Check MessageType:
    - NORMAL: normal message
    - DELAY: DeliveryTimestamp sets the delivery time
    - FIFO: MessageGroup sets the message group
    - TRANSACTION: transactional message
    ↓
Build SendMessageRequest
    ↓
GrpcClient.InvokeAsync("SendMessage")
    ↓
Return SendMessageResponse (MessageId, Status, ...)
```

**Receive message flow (Server Streaming)**:

```
ReceiveMessageAsync(topic, group, timeout)
    ↓
Build ReceiveMessageRequest:
    - Group (GrpcResource)
    - MessageQueue (queue information)
    - LongPollingTimeout (long-polling timeout)
    ↓
GrpcClient.InvokeStreamAsync("ReceiveMessage")
    ↓
await foreach (var response in stream) {
    Process Messages
    Yield return message
}
```

### 3.4 Protobuf Codec

NewLife.RocketMQ ships a built-in **lightweight Protobuf codec** (ProtoWriter/ProtoReader). It is developed entirely in-house and does not depend on Google.Protobuf or protobuf-net.

#### 3.4.1 ProtoWriter (encoder)

**Supported data types**:

| Wire Type | Type | Description |
|-----------|------|------|
| 0 | Varint | int32, int64, uint32, uint64, bool, enum |
| 1 | Fixed64 | fixed64, sfixed64, double |
| 2 | Length-delimited | string, bytes, embedded messages, packed repeated |
| 5 | Fixed32 | fixed32, sfixed32, float |

**Encoding flow**:

```csharp
public class ProtoWriter
{
    public void WriteTag(Int32 fieldNumber, WireType wireType)
    {
        // A tag is the field number shifted left by 3, OR-ed with the wire type
        var tag = (fieldNumber << 3) | (Int32)wireType;
        WriteVarint((UInt32)tag);
    }

    public void WriteString(Int32 fieldNumber, String value)
    {
        WriteTag(fieldNumber, WireType.LengthDelimited);
        var bytes = Encoding.UTF8.GetBytes(value);
        WriteVarint((UInt32)bytes.Length);
        Write(bytes);
    }

    public void WriteMessage<T>(Int32 fieldNumber, T value) where T : IProtoMessage
    {
        // Serialize the nested message first so its length is known
        var buffer = new MemoryStream();
        var writer = new ProtoWriter(buffer);
        value.WriteTo(writer);

        WriteTag(fieldNumber, WireType.LengthDelimited);
        WriteVarint((UInt32)buffer.Length);
        Write(buffer.ToArray());
    }
}
```

#### 3.4.2 ProtoReader (decoder)

**Decoding flow**:

```csharp
public class ProtoReader
{
    public (Int32 fieldNumber, WireType wireType) ReadTag()
    {
        var tag = ReadVarint();
        var fieldNumber = (Int32)(tag >> 3);
        var wireType = (WireType)(tag & 0x7);
        return (fieldNumber, wireType);
    }

    public String ReadString()
    {
        var length = ReadVarint();
        var bytes = Read((Int32)length);
        return Encoding.UTF8.GetString(bytes);
    }

    public T ReadMessage<T>() where T : IProtoMessage, new()
    {
        // Read the length-delimited payload, then decode it with a nested reader
        var length = ReadVarint();
        var bytes = Read((Int32)length);
        var stream = new MemoryStream(bytes);
        var reader = new ProtoReader(stream);

        var message = new T();
        message.ReadFrom(reader);
        return message;
    }
}
```

#### 3.4.3 gRPC message models

**Core message types**:

```csharp
// Resource (Topic/Group)
public class GrpcResource : IProtoMessage
{
    public String ResourceNamespace { get; set; }  // Field 1
    public String Name { get; set; }               // Field 2
}

// Message
public class GrpcMessage : IProtoMessage
{
    public GrpcResource Topic { get; set; }                          // Field 1
    public IDictionary<String, String> UserProperties { get; set; }  // Field 2
    public GrpcSystemProperties SystemProperties { get; set; }       // Field 3
    public Byte[] Body { get; set; }                                 // Field 4
}

// System properties
public class GrpcSystemProperties : IProtoMessage
{
    public String Tag { get; set; }                    // Field 1
    public IList<String> Keys { get; set; }            // Field 2
    public String MessageId { get; set; }              // Field 3
    public String BornHost { get; set; }               // Field 4
    public DateTime BornTimestamp { get; set; }        // Field 5
    public String MessageGroup { get; set; }           // Field 11 (FIFO)
    public DateTime? DeliveryTimestamp { get; set; }   // Field 12 (delay)
    public GrpcMessageType MessageType { get; set; }   // Field 14
}
```

### 3.5 gRPC-Specific Features

#### 3.5.1 Arbitrary-time delayed messages

The Remoting protocol supports only 18 preset delay levels, whereas the gRPC protocol supports delivery at an arbitrary timestamp:

```csharp
// Delay delivery until the specified time
var deliveryTime = DateTime.Now.AddMinutes(30);
await producer.PublishDelayViaGrpcAsync("delayed message", deliveryTime);
```

**How it works**:

```
GrpcSystemProperties.MessageType = DELAY
GrpcSystemProperties.DeliveryTimestamp = deliveryTime (DateTime)
    ↓
Protobuf Field 12 (timestamp):
    seconds = (deliveryTime - UnixEpoch).TotalSeconds
    nanos = deliveryTime.Millisecond * 1_000_000
    ↓
The Proxy delivers the message to the Topic after the delay
```

#### 3.5.2 Client resource reporting (Telemetry)

```
TelemetryViaGrpcAsync()
    ↓
Build TelemetryCommand:
    - Settings (client configuration)
    - Endpoints (connection endpoints)
    - Threads (thread count)
    - Language (DOTNET)
    ↓
Build TelemetryRequest
    ↓
GrpcClient.InvokeStreamAsync("Telemetry")
    ↓
Bidirectional Streaming:
    Client → Server: TelemetryCommand
    Server → Client: TelemetryCommand (verification/instructions)
```

---

## 4. Load Balancing and Routing

### 4.1 Producer Load Balancing

**ILoadBalance interface**:

```csharp
public interface ILoadBalance
{
    String Name { get; }
    Boolean Ready { get; set; }
    MessageQueue Select(IList<MessageQueue> queues);
}
```

**WeightRoundRobin (weighted round robin)**:

```
Build Queue List:
    For each Broker in Brokers:
        For i in 0..WriteQueueNums:
            Queue { BrokerName, QueueId, Weight = Broker.Weight }
    ↓
Sort by (BrokerName, QueueId)
    ↓
Round Robin with Weight:
    currentWeight[queue] += queue.Weight
    maxWeightQueue = Max(currentWeight)
    currentWeight[maxWeightQueue] -= TotalWeight
    Return maxWeightQueue
```

### 4.2 Consumer Load Balancing (Rebalance)

**Average allocation algorithm**:

```
Get All Consumers in Group (sorted)
Get All Queues in Topic (sorted)
    ↓
Find My Index in Consumers
    ↓
Allocate Queues:
    averageSize = queues.Count / consumers.Count
    remainder = queues.Count % consumers.Count

    if (myIndex < remainder)
        myQueues = queues[(myIndex * (averageSize + 1))..((myIndex + 1) * (averageSize + 1))]
    else
        startIndex = myIndex * averageSize + remainder
        myQueues = queues[startIndex..(startIndex + averageSize)]
    ↓
Lock Queues (if OrderConsume)
    ↓
Start Consume Threads
```

**Multi-topic subscription**:

```
Topics = "topic1;topic2;topic3"
    ↓
For each Topic:
    Get Brokers from NameServer
    Build Queue List
    Rebalance(Topic, Queues)
    ↓
Merge All Queues
Start Consume
```

---

## 5. Message Reliability Guarantees

### 5.1 Consumption Retry Mechanism

```
Consumer.OnConsume() return false
    ↓
SendMessageBackAsync(msg):
    - Topic = "%RETRY%{ConsumerGroup}"
    - DelayLevel = msg.ReconsumeTimes + 3
    - ReconsumeTimes++
    ↓
if (ReconsumeTimes >= MaxReconsumeTimes)
    Topic = "%DLQ%{ConsumerGroup}"  // move to the dead-letter queue
    ↓
Send CONSUMER_SEND_MSG_BACK (36) to Broker
```

### 5.2 Transactional Message Flow

```
1. Producer.PublishTransaction("order created")
    ↓
2. Send Half Message (PreparedTransaction = true)
    ↓
3. Execute Local Transaction
    ↓
4. EndTransaction(sendResult, Commit/Rollback)
    - Send END_TRANSACTION (37) to Broker
    ↓
5. Broker checks back periodically (CHECK_TRANSACTION_STATE)
    - Producer.OnCheckTransaction(msg, transactionId)
    - Return TransactionState (Commit/Rollback/Unknown)
```

### 5.3 Ordered Consumption Guarantee

```
Producer:
    SelectQueue(key) - messages with the same key go to the same queue
    Publish(message, queue)
    ↓
Consumer:
    OrderConsume = true
    LockBatchMQAsync(queues) - ask the Broker to lock the queues
    ↓
    Single Thread Consume Each Queue
    Offset Commit After Success
    ↓
    UnlockBatchMQAsync(queues) - release the locks
```

---

## 6. Performance Optimization Strategies

### 6.1 Connection Reuse

**Remoting protocol**:

- Single-connection Opaque multiplexing (responses matched by RequestId)
- Connection pool management (ConcurrentDictionary)
- Heartbeat keep-alive (30s)

**gRPC protocol**:

- HTTP/2 multiplexing (supported natively)
- HttpClient connection pool

### 6.2 Message Compression

```
if (message.Body.Length > CompressOverBytes)
{
    message.Body = ZLIB.Compress(message.Body);
    message.SysFlag |= 0x1;  // mark as compressed
}
```

**Automatic decompression on the receiving side**:

```
if ((msg.SysFlag & 0x1) != 0)
{
    msg.Body = ZLIB.Decompress(msg.Body);
}
```

### 6.3 VIP Channel

```
if (VipChannelEnabled)
{
    // The VIP channel listens on the broker port minus 2
    var port = brokerAddress.Port;
    vipAddress = $"{brokerAddress.Host}:{port - 2}";
    client = GetClient(vipAddress);  // higher-priority connection
}
```

### 6.4 Concurrency Control

```
private SemaphoreSlim _consumeSemaphore;

public void Start()
{
    _consumeSemaphore = new SemaphoreSlim(MaxConcurrentConsume);
}

// Inside the consume loop: at most MaxConcurrentConsume messages in flight
await _consumeSemaphore.WaitAsync(cancellationToken);
try
{
    await ProcessMessageAsync(message);
}
finally
{
    _consumeSemaphore.Release();
}
```

---

## 7. Extensibility Design

### 7.1 Cloud Provider Adapters

```csharp
public interface ICloudProvider
{
    String Name { get; }
    String AccessKey { get; }
    String SecretKey { get; }
    String OnsChannel { get; }

    String TransformTopic(String topic);
    String TransformGroup(String group);
    String GetNameServerAddress();
}
```

**Implementation example (Alibaba Cloud)**:

```csharp
public class AliyunProvider : ICloudProvider
{
    // Name, AccessKey, SecretKey, and OnsChannel are omitted here for brevity
    public String InstanceId { get; set; }

    public String TransformTopic(String topic)
    {
        return $"{InstanceId}%{topic}";
    }

    public String TransformGroup(String group)
    {
        return $"{InstanceId}%{group}";
    }

    public String GetNameServerAddress()
    {
        // Fetch the NameServer address over the HTTP interface
        var url = $"http://MQInstXXX.aliyuncs.com:80/rocketmq/nsaddr4client-internal";
        return HttpGet(url);
    }
}
```

### 7.2 Message Hooks

```csharp
public interface ISendMessageHook
{
    void SendMessageBefore(SendMessageContext context);
    void SendMessageAfter(SendMessageContext context);
}

public interface IConsumeMessageHook
{
    void ConsumeBefore(ConsumeMessageContext context);
    void ConsumeAfter(ConsumeMessageContext context);
}
```

**Use cases**:

- Message trace tracking (MessageTraceHook)
- Custom logging
- Performance monitoring
- Business interception

---

## 8. Summary

Through its **clearly layered, single-responsibility, extensible** architecture, NewLife.RocketMQ delivers:

1. **Dual-protocol support**: Remoting (stable and efficient) plus gRPC (future-oriented)
2. **Zero external dependencies**: pure C# implementation with a built-in Protobuf codec
3. **Multi-cloud adaptation**: a unified ICloudProvider interface makes it easy to connect to each cloud vendor
4. **High performance**: connection reuse, message compression, VIP channel, concurrency control
5. **High reliability**: consumption retry, dead-letter queue, transaction check-back, ordered-consumption locking
6. **Easy extension**: hook mechanism, cloud provider adapters, load-balancing interface
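
## Appendix: Remoting Frame Sketch

The Remoting frame layout from section 2.2 (`Length = 4 + N + M`, with the serialize type packed into the high 8 bits of `HeaderLength`) can be made concrete with a short sketch. This is a minimal illustration under stated assumptions, not the library's `Command` implementation: the class name `RemotingFrameSketch` and its two methods are hypothetical, and big-endian byte order is assumed because the section does not state it.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper for illustration only; not part of NewLife.RocketMQ.
public static class RemotingFrameSketch
{
    // Builds [Length][HeaderLength][Header][Body], where Length = 4 + N + M.
    public static Byte[] Build(Byte serializeType, Byte[] header, Byte[] body)
    {
        if (header.Length > 0xFFFFFF)
            throw new ArgumentException("Header must fit in 24 bits", nameof(header));

        var frame = new Byte[8 + header.Length + body.Length];

        // Assumption: integers are written big-endian (network byte order).
        BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(0, 4), 4 + header.Length + body.Length);

        // High 8 bits carry SerializeType, low 24 bits carry the header length.
        BinaryPrimitives.WriteInt32BigEndian(frame.AsSpan(4, 4), (serializeType << 24) | header.Length);

        header.CopyTo(frame, 8);
        body.CopyTo(frame, 8 + header.Length);
        return frame;
    }

    // Splits a complete frame back into its serialize type, header, and body.
    public static (Byte SerializeType, Byte[] Header, Byte[] Body) Parse(Byte[] frame)
    {
        var length = BinaryPrimitives.ReadInt32BigEndian(frame.AsSpan(0, 4));
        var mark = BinaryPrimitives.ReadInt32BigEndian(frame.AsSpan(4, 4));

        var serializeType = (Byte)((mark >> 24) & 0xFF);
        var headerLength = mark & 0xFFFFFF;
        var bodyLength = length - 4 - headerLength;

        var header = frame.AsSpan(8, headerLength).ToArray();
        var body = frame.AsSpan(8 + headerLength, bodyLength).ToArray();
        return (serializeType, header, body);
    }
}
```

Calling `RemotingFrameSketch.Build(0, headerBytes, bodyBytes)` yields a buffer that `Parse` splits back into the same header and body. A real receiver would additionally have to buffer partial TCP reads until `Length` bytes are available, which this sketch leaves out.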