NewLife/NewLife.RocketMQ

客户端到名称服务器的第一个请求响应命令成功
大石头 authored at 2018-10-14 17:35:06
80fe383
Tree
1 Parent(s) d279739
Summary: 16 changed files with 378 additions and 12 deletions.
Added +0 -0
Added +0 -0
Added +0 -0
Added +0 -0
Added +0 -0
Added +0 -0
Modified +57 -0
Modified +1 -1
Modified +2 -0
Added +82 -0
Modified +11 -7
Added +151 -0
Added +69 -0
Modified +1 -1
Modified +2 -1
Modified +2 -2
Added +0 -0
diff --git a/Doc/req103.png b/Doc/req103.png
new file mode 100644
index 0000000..c298272
Binary files /dev/null and b/Doc/req103.png differ
Added +0 -0
diff --git a/Doc/req105_2.png b/Doc/req105_2.png
new file mode 100644
index 0000000..2291617
Binary files /dev/null and b/Doc/req105_2.png differ
Added +0 -0
diff --git a/Doc/req105.png b/Doc/req105.png
new file mode 100644
index 0000000..bce2da2
Binary files /dev/null and b/Doc/req105.png differ
Added +0 -0
diff --git a/Doc/res103.png b/Doc/res103.png
new file mode 100644
index 0000000..2677bed
Binary files /dev/null and b/Doc/res103.png differ
Added +0 -0
diff --git a/Doc/res105_2.png b/Doc/res105_2.png
new file mode 100644
index 0000000..7186e8c
Binary files /dev/null and b/Doc/res105_2.png differ
Added +0 -0
diff --git a/Doc/res105.png b/Doc/res105.png
new file mode 100644
index 0000000..3b0c447
Binary files /dev/null and b/Doc/res105.png differ
Modified +57 -0
diff --git a/NewLife.RocketMQ/MQClient.cs b/NewLife.RocketMQ/MQClient.cs
index bc05106..2a9f111 100644
--- a/NewLife.RocketMQ/MQClient.cs
+++ b/NewLife.RocketMQ/MQClient.cs
@@ -1,7 +1,11 @@
 using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
 using System.Net.Sockets;
 using NewLife.Net;
 using NewLife.RocketMQ.Client;
+using NewLife.RocketMQ.Protocol;
 
 namespace NewLife.RocketMQ
 {
@@ -13,6 +17,7 @@ namespace NewLife.RocketMQ
         public MQAdmin Config { get; }
 
         private TcpClient _Client;
+        private Stream _Stream;
         #endregion
 
         #region 构造
@@ -43,6 +48,58 @@ namespace NewLife.RocketMQ
             }
 
             _Client = client;
+
+            _Stream = new BufferedStream(client.GetStream());
+        }
+
+        private Int32 g_id;
+        public Command Send(Command cmd)
+        {
+            if (cmd.Header.Opaque == 0) cmd.Header.Opaque = g_id++;
+
+            cmd.Write(_Stream);
+            //var ms = new MemoryStream();
+            //cmd.Write(ms);
+            //XTrace.WriteLine(ms.ToArray().ToHex());
+
+            var rs = new Command();
+            rs.Read(_Stream);
+
+            return rs;
+        }
+
+        public Command Send(RequestCode request, Object extFields = null)
+        {
+            var header = new Header
+            {
+                Code = (Int32)request,
+            };
+
+            var cmd = new Command
+            {
+                Header = header,
+            };
+
+            if (extFields != null) header.ExtFields = extFields.ToDictionary().ToDictionary(e => e.Key, e => e.Value + "");
+
+            return Send(cmd);
+        }
+        #endregion
+
+        #region 命令
+        public Command GetRouteInfo(String topic)
+        {
+            //var header = new Header
+            //{
+            //    Code = (Int32)RequestCode.GET_ROUTEINTO_BY_TOPIC,
+            //};
+
+            //var cmd = new Command
+            //{
+            //    Header = header,
+            //};
+
+            return Send(RequestCode.GET_ROUTEINTO_BY_TOPIC, new { topic });
         }
         #endregion
     }
Modified +1 -1
diff --git a/NewLife.RocketMQ/NewLife.RocketMQ.csproj b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
index 96c8336..7ad03a2 100644
--- a/NewLife.RocketMQ/NewLife.RocketMQ.csproj
+++ b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
@@ -5,7 +5,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Core" Version="7.3.6763.36550" />
+    <PackageReference Include="NewLife.Core" Version="7.3.6839.35284" />
   </ItemGroup>
 
   <ItemGroup>
Modified +2 -0
diff --git a/NewLife.RocketMQ/Producer/MQProducer.cs b/NewLife.RocketMQ/Producer/MQProducer.cs
index 0f7ad70..561966e 100644
--- a/NewLife.RocketMQ/Producer/MQProducer.cs
+++ b/NewLife.RocketMQ/Producer/MQProducer.cs
@@ -43,6 +43,8 @@ namespace NewLife.RocketMQ.Producer
                     var client = new MQClient(ClientId, this);
                     client.Start();
 
+                    client.GetRouteInfo(CreateTopicKey);
+
                     _Client = client;
 
                     State = ServiceState.Running;
Added +82 -0
diff --git a/NewLife.RocketMQ/Protocol/Command.cs b/NewLife.RocketMQ/Protocol/Command.cs
new file mode 100644
index 0000000..71932df
--- /dev/null
+++ b/NewLife.RocketMQ/Protocol/Command.cs
@@ -0,0 +1,82 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using NewLife.Serialization;
+
+namespace NewLife.RocketMQ.Protocol
+{
+    /// <summary>命令</summary>
+    public class Command : IAccessor
+    {
+        #region 属性
+        /// <summary>头部</summary>
+        public Header Header { get; set; }
+
+        /// <summary>主体</summary>
+        public Byte[] Body { get; set; }
+
+        /// <summary>响应数据。自动解码Json</summary>
+        public IDictionary<String, Object> Data { get; private set; }
+        #endregion
+
+        #region 读写
+        public Boolean Read(Stream stream, Object context = null)
+        {
+            var bn = new Binary
+            {
+                Stream = stream,
+                IsLittleEndian = false,
+            };
+            var len = bn.Read<Int32>();
+
+            // 读取头部
+            var hlen = bn.Read<Int32>();
+            var json = bn.ReadBytes(hlen).ToStr();
+            Header = json.ToJsonEntity<Header>();
+
+            //  读取主体
+            if (len > 4 + hlen)
+            {
+                Body = bn.ReadBytes(len - 4 - hlen);
+
+                // 自动解码Json
+                if (Header.SerializeTypeCurrentRPC.EqualIgnoreCase("json"))
+                    Data = new JsonParser(Body.ToStr()).Decode() as IDictionary<String, Object>;
+            }
+
+            return true;
+        }
+
+        public Boolean Write(Stream stream, Object context = null)
+        {
+            // 计算头部
+            var json = Header.ToJson();
+            //var js = new JsonWriter { LowerCaseName = true };
+            //var json = js.ToJson(Header, false);
+            var hs = json.GetBytes();
+            var buf = Body;
+
+            // 计算长度
+            var len = 4 + hs.Length;
+            if (buf != null) len += buf.Length;
+
+            // 写入总长度
+            var bn = new Binary
+            {
+                Stream = stream,
+                IsLittleEndian = false,
+            };
+            bn.Write(len);
+
+            // 写入头部
+            bn.Write(hs.Length);
+            stream.Write(hs);
+
+            // 写入主体
+            if (buf != null && buf.Length > 0) stream.Write(buf);
+
+            return true;
+        }
+        #endregion
+    }
+}
\ No newline at end of file
Modified +11 -7
diff --git a/NewLife.RocketMQ/Protocol/Header.cs b/NewLife.RocketMQ/Protocol/Header.cs
index a298b22..ba847e4 100644
--- a/NewLife.RocketMQ/Protocol/Header.cs
+++ b/NewLife.RocketMQ/Protocol/Header.cs
@@ -10,15 +10,13 @@ namespace NewLife.RocketMQ.Protocol
         /// <summary>请求/响应码</summary>
         public Int32 Code { get; set; }
 
-        /// <summary>由于要支持多语言,所以这一字段可以给通信双方知道对方通信层锁使用的开发语言</summary>
-        public String Language { get; set; } = "CSharp";
-
-        /// <summary>给通信层知道对方的版本号,响应方可以以此做兼容老版本等的特殊操作</summary>
-        public Int32 Version { get; set; }
-
         /// <summary>请求标识码。在Java版的通信层中,这个只是一个不断自增的整形,为了收到应答方响应的的时候找到对应的请求。</summary>
         public Int32 Opaque { get; set; }
 
+        /// <summary>由于要支持多语言,所以这一字段可以给通信双方知道对方通信层锁使用的开发语言</summary>
+        /// <remarks>这里必须是JAVA,不能是CSharp,甚至Java都不行</remarks>
+        public String Language { get; set; } = "JAVA";
+
         /// <summary>标识</summary>
         /// <remarks>
         /// 第0位标识是这次通信是request还是response,0标识request, 1 标识response。
@@ -26,6 +24,12 @@ namespace NewLife.RocketMQ.Protocol
         /// </remarks>
         public Int32 Flag { get; set; }
 
+        /// <summary>给通信层知道对方的版本号,响应方可以以此做兼容老版本等的特殊操作</summary>
+        public Int32 Version { get; set; } = 252;
+
+        /// <summary>序列化类型</summary>
+        public String SerializeTypeCurrentRPC { get; set; } = "JSON";
+
         /// <summary>附带的文本信息。常见的如存放一些broker/nameserver返回的一些异常信息,方便开发人员定位问题。</summary>
         public String Remark { get; set; }
 
@@ -35,7 +39,7 @@ namespace NewLife.RocketMQ.Protocol
         /// 在Java的每个RemotingCammand中,其实都带有一个CommandCustomHeader的属性成员,可以认为他是一个强类型的extFields,
         /// 再最后传输的时候,这个CommandCustomHeader会被忽略,而传输前会把其中的所有字段全部都原封不动塞到extFields中,以作传输。
         /// </remarks>
-        public Dictionary<String, String> ExtFields { get; set; } = new Dictionary<String, String>();
+        public IDictionary<String, String> ExtFields { get; set; } = new Dictionary<String, String>();
         #endregion
     }
 }
\ No newline at end of file
Added +151 -0
diff --git a/NewLife.RocketMQ/Protocol/RequestCode.cs b/NewLife.RocketMQ/Protocol/RequestCode.cs
new file mode 100644
index 0000000..b6af7ad
--- /dev/null
+++ b/NewLife.RocketMQ/Protocol/RequestCode.cs
@@ -0,0 +1,151 @@
+namespace NewLife.RocketMQ.Protocol
+{
+    /// <summary>请求代码</summary>
+    public enum RequestCode
+    {
+        SEND_MESSAGE = 10,
+
+        PULL_MESSAGE = 11,
+
+        QUERY_MESSAGE = 12,
+        QUERY_BROKER_OFFSET = 13,
+        QUERY_CONSUMER_OFFSET = 14,
+        UPDATE_CONSUMER_OFFSET = 15,
+        UPDATE_AND_CREATE_TOPIC = 17,
+        GET_ALL_TOPIC_CONFIG = 21,
+        GET_TOPIC_CONFIG_LIST = 22,
+
+        GET_TOPIC_NAME_LIST = 23,
+
+        UPDATE_BROKER_CONFIG = 25,
+
+        GET_BROKER_CONFIG = 26,
+
+        TRIGGER_DELETE_FILES = 27,
+
+        GET_BROKER_RUNTIME_INFO = 28,
+        SEARCH_OFFSET_BY_TIMESTAMP = 29,
+        GET_MAX_OFFSET = 30,
+        GET_MIN_OFFSET = 31,
+
+        GET_EARLIEST_MSG_STORETIME = 32,
+
+        VIEW_MESSAGE_BY_ID = 33,
+
+        HEART_BEAT = 34,
+
+        UNREGISTER_CLIENT = 35,
+
+        CONSUMER_SEND_MSG_BACK = 36,
+
+        END_TRANSACTION = 37,
+        GET_CONSUMER_LIST_BY_GROUP = 38,
+
+        CHECK_TRANSACTION_STATE = 39,
+
+        NOTIFY_CONSUMER_IDS_CHANGED = 40,
+
+        LOCK_BATCH_MQ = 41,
+
+        UNLOCK_BATCH_MQ = 42,
+        GET_ALL_CONSUMER_OFFSET = 43,
+
+        GET_ALL_DELAY_OFFSET = 45,
+
+        CHECK_CLIENT_CONFIG = 46,
+
+        PUT_KV_CONFIG = 100,
+
+        GET_KV_CONFIG = 101,
+
+        DELETE_KV_CONFIG = 102,
+
+        REGISTER_BROKER = 103,
+
+        UNREGISTER_BROKER = 104,
+        GET_ROUTEINTO_BY_TOPIC = 105,
+
+        GET_BROKER_CLUSTER_INFO = 106,
+        UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200,
+        GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201,
+        GET_TOPIC_STATS_INFO = 202,
+        GET_CONSUMER_CONNECTION_LIST = 203,
+        GET_PRODUCER_CONNECTION_LIST = 204,
+        WIPE_WRITE_PERM_OF_BROKER = 205,
+
+        GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206,
+
+        DELETE_SUBSCRIPTIONGROUP = 207,
+        GET_CONSUME_STATS = 208,
+
+        SUSPEND_CONSUMER = 209,
+
+        RESUME_CONSUMER = 210,
+        RESET_CONSUMER_OFFSET_IN_CONSUMER = 211,
+        RESET_CONSUMER_OFFSET_IN_BROKER = 212,
+
+        ADJUST_CONSUMER_THREAD_POOL = 213,
+
+        WHO_CONSUME_THE_MESSAGE = 214,
+
+        DELETE_TOPIC_IN_BROKER = 215,
+
+        DELETE_TOPIC_IN_NAMESRV = 216,
+        GET_KVLIST_BY_NAMESPACE = 219,
+
+        RESET_CONSUMER_CLIENT_OFFSET = 220,
+
+        GET_CONSUMER_STATUS_FROM_CLIENT = 221,
+
+        INVOKE_BROKER_TO_RESET_OFFSET = 222,
+
+        INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223,
+
+        QUERY_TOPIC_CONSUME_BY_WHO = 300,
+
+        GET_TOPICS_BY_CLUSTER = 224,
+
+        REGISTER_FILTER_SERVER = 301,
+        REGISTER_MESSAGE_FILTER_CLASS = 302,
+
+        QUERY_CONSUME_TIME_SPAN = 303,
+
+        GET_SYSTEM_TOPIC_LIST_FROM_NS = 304,
+        GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305,
+
+        CLEAN_EXPIRED_CONSUMEQUEUE = 306,
+
+        GET_CONSUMER_RUNNING_INFO = 307,
+
+        QUERY_CORRECTION_OFFSET = 308,
+        CONSUME_MESSAGE_DIRECTLY = 309,
+
+        SEND_MESSAGE_V2 = 310,
+
+        GET_UNIT_TOPIC_LIST = 311,
+
+        GET_HAS_UNIT_SUB_TOPIC_LIST = 312,
+
+        GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313,
+
+        CLONE_GROUP_OFFSET = 314,
+
+        VIEW_BROKER_STATS_DATA = 315,
+
+        CLEAN_UNUSED_TOPIC = 316,
+
+        GET_BROKER_CONSUME_STATS = 317,
+
+        /// <summary>update the config of name server</summary>
+        UPDATE_NAMESRV_CONFIG = 318,
+
+        /// <summary>get config from name server</summary>
+        GET_NAMESRV_CONFIG = 319,
+
+        SEND_BATCH_MESSAGE = 320,
+
+        QUERY_CONSUME_QUEUE = 321,
+
+        QUERY_DATA_VERSION = 322,
+    }
+}
\ No newline at end of file
Added +69 -0
diff --git a/NewLife.RocketMQ/Protocol/ResponseCode.cs b/NewLife.RocketMQ/Protocol/ResponseCode.cs
new file mode 100644
index 0000000..1475c7c
--- /dev/null
+++ b/NewLife.RocketMQ/Protocol/ResponseCode.cs
@@ -0,0 +1,69 @@
+namespace NewLife.RocketMQ.Protocol
+{
+    /// <summary>响应码</summary>
+    public enum ResponseCode
+    {
+        SUCCESS = 0,
+
+        SYSTEM_ERROR = 1,
+
+        SYSTEM_BUSY = 2,
+
+        REQUEST_CODE_NOT_SUPPORTED = 3,
+
+        TRANSACTION_FAILED = 4,
+
+        FLUSH_DISK_TIMEOUT = 10,
+
+        SLAVE_NOT_AVAILABLE = 11,
+
+        FLUSH_SLAVE_TIMEOUT = 12,
+
+        MESSAGE_ILLEGAL = 13,
+
+        SERVICE_NOT_AVAILABLE = 14,
+
+        VERSION_NOT_SUPPORTED = 15,
+
+        NO_PERMISSION = 16,
+
+        TOPIC_NOT_EXIST = 17,
+        TOPIC_EXIST_ALREADY = 18,
+        PULL_NOT_FOUND = 19,
+
+        PULL_RETRY_IMMEDIATELY = 20,
+
+        PULL_OFFSET_MOVED = 21,
+
+        QUERY_NOT_FOUND = 22,
+
+        SUBSCRIPTION_PARSE_FAILED = 23,
+
+        SUBSCRIPTION_NOT_EXIST = 24,
+
+        SUBSCRIPTION_NOT_LATEST = 25,
+
+        SUBSCRIPTION_GROUP_NOT_EXIST = 26,
+
+        FILTER_DATA_NOT_EXIST = 27,
+
+        FILTER_DATA_NOT_LATEST = 28,
+
+        TRANSACTION_SHOULD_COMMIT = 200,
+
+        TRANSACTION_SHOULD_ROLLBACK = 201,
+
+        TRANSACTION_STATE_UNKNOW = 202,
+
+        TRANSACTION_STATE_GROUP_WRONG = 203,
+        NO_BUYER_ID = 204,
+
+        NOT_IN_CURRENT_UNIT = 205,
+
+        CONSUMER_NOT_ONLINE = 206,
+
+        CONSUME_MSG_TIMEOUT = 207,
+
+        NO_MESSAGE = 208,
+    }
+}
\ No newline at end of file
Modified +1 -1
diff --git a/Test/packages.config b/Test/packages.config
index 93af704..551ec84 100644
--- a/Test/packages.config
+++ b/Test/packages.config
@@ -1,4 +1,4 @@
 <?xml version="1.0" encoding="utf-8"?>
 <packages>
-  <package id="NewLife.Core" version="7.3.6763.36550" targetFramework="net461" />
+  <package id="NewLife.Core" version="7.3.6839.35284" targetFramework="net461" />
 </packages>
\ No newline at end of file
Modified +2 -1
diff --git a/Test/Program.cs b/Test/Program.cs
index 42555d9..83cd3ea 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -26,7 +26,8 @@ namespace Test
                 //SecretKey = "BvX6DpQffUz8xKIQ0u13EMxBW6YJmp",
 
                 ProducerGroup = "PID_Stone_001",
-                NameServerAddress = "127.0.0.1:9876",
+                //NameServerAddress = "127.0.0.1:9876",
+                NameServerAddress = "192.168.0.4:9876",
                 InstanceName = "Producer",
             };
 
Modified +2 -2
diff --git a/Test/Test.csproj b/Test/Test.csproj
index 3d65b13..f307339 100644
--- a/Test/Test.csproj
+++ b/Test/Test.csproj
@@ -32,8 +32,8 @@
     <WarningLevel>4</WarningLevel>
   </PropertyGroup>
   <ItemGroup>
-    <Reference Include="NewLife.Core, Version=7.3.6763.36550, Culture=neutral, processorArchitecture=MSIL">
-      <HintPath>..\packages\NewLife.Core.7.3.6763.36550\lib\net45\NewLife.Core.dll</HintPath>
+    <Reference Include="NewLife.Core, Version=7.3.6839.35284, Culture=neutral, processorArchitecture=MSIL">
+      <HintPath>..\packages\NewLife.Core.7.3.6839.35284\lib\net45\NewLife.Core.dll</HintPath>
     </Reference>
     <Reference Include="System" />
     <Reference Include="System.Core" />