diff --git a/Doc/req103.png b/Doc/req103.png
new file mode 100644
index 0000000..c298272
Binary files /dev/null and b/Doc/req103.png differ
diff --git a/Doc/req105_2.png b/Doc/req105_2.png
new file mode 100644
index 0000000..2291617
Binary files /dev/null and b/Doc/req105_2.png differ
diff --git a/Doc/req105.png b/Doc/req105.png
new file mode 100644
index 0000000..bce2da2
Binary files /dev/null and b/Doc/req105.png differ
diff --git a/Doc/res103.png b/Doc/res103.png
new file mode 100644
index 0000000..2677bed
Binary files /dev/null and b/Doc/res103.png differ
diff --git a/Doc/res105_2.png b/Doc/res105_2.png
new file mode 100644
index 0000000..7186e8c
Binary files /dev/null and b/Doc/res105_2.png differ
diff --git a/Doc/res105.png b/Doc/res105.png
new file mode 100644
index 0000000..3b0c447
Binary files /dev/null and b/Doc/res105.png differ
diff --git a/NewLife.RocketMQ/MQClient.cs b/NewLife.RocketMQ/MQClient.cs
index bc05106..2a9f111 100644
--- a/NewLife.RocketMQ/MQClient.cs
+++ b/NewLife.RocketMQ/MQClient.cs
@@ -1,7 +1,11 @@
using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
using System.Net.Sockets;
using NewLife.Net;
using NewLife.RocketMQ.Client;
+using NewLife.RocketMQ.Protocol;
namespace NewLife.RocketMQ
{
@@ -13,6 +17,7 @@ namespace NewLife.RocketMQ
public MQAdmin Config { get; }
private TcpClient _Client;
+ private Stream _Stream;
#endregion
#region 构造
@@ -43,6 +48,58 @@ namespace NewLife.RocketMQ
}
_Client = client;
+
+ _Stream = new BufferedStream(client.GetStream());
+ }
+
+ private Int32 g_id;
+ public Command Send(Command cmd)
+ {
+ if (cmd.Header.Opaque == 0) cmd.Header.Opaque = g_id++;
+
+ cmd.Write(_Stream);
+ //var ms = new MemoryStream();
+ //cmd.Write(ms);
+ //XTrace.WriteLine(ms.ToArray().ToHex());
+
+ var rs = new Command();
+ rs.Read(_Stream);
+
+ return rs;
+ }
+
+ public Command Send(RequestCode request, Object extFields = null)
+ {
+ var header = new Header
+ {
+ Code = (Int32)request,
+ };
+
+ var cmd = new Command
+ {
+ Header = header,
+ };
+
+ if (extFields != null) header.ExtFields = extFields.ToDictionary().ToDictionary(e => e.Key, e => e.Value + "");
+
+ return Send(cmd);
+ }
+ #endregion
+
+ #region 命令
+ public Command GetRouteInfo(String topic)
+ {
+ //var header = new Header
+ //{
+ // Code = (Int32)RequestCode.GET_ROUTEINTO_BY_TOPIC,
+ //};
+
+ //var cmd = new Command
+ //{
+ // Header = header,
+ //};
+
+ return Send(RequestCode.GET_ROUTEINTO_BY_TOPIC, new { topic });
}
#endregion
}
diff --git a/NewLife.RocketMQ/NewLife.RocketMQ.csproj b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
index 96c8336..7ad03a2 100644
--- a/NewLife.RocketMQ/NewLife.RocketMQ.csproj
+++ b/NewLife.RocketMQ/NewLife.RocketMQ.csproj
@@ -5,7 +5,7 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="NewLife.Core" Version="7.3.6763.36550" />
+ <PackageReference Include="NewLife.Core" Version="7.3.6839.35284" />
</ItemGroup>
<ItemGroup>
diff --git a/NewLife.RocketMQ/Producer/MQProducer.cs b/NewLife.RocketMQ/Producer/MQProducer.cs
index 0f7ad70..561966e 100644
--- a/NewLife.RocketMQ/Producer/MQProducer.cs
+++ b/NewLife.RocketMQ/Producer/MQProducer.cs
@@ -43,6 +43,8 @@ namespace NewLife.RocketMQ.Producer
var client = new MQClient(ClientId, this);
client.Start();
+ client.GetRouteInfo(CreateTopicKey);
+
_Client = client;
State = ServiceState.Running;
diff --git a/NewLife.RocketMQ/Protocol/Command.cs b/NewLife.RocketMQ/Protocol/Command.cs
new file mode 100644
index 0000000..71932df
--- /dev/null
+++ b/NewLife.RocketMQ/Protocol/Command.cs
@@ -0,0 +1,82 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using NewLife.Serialization;
+
+namespace NewLife.RocketMQ.Protocol
+{
+ /// <summary>命令</summary>
+ public class Command : IAccessor
+ {
+ #region 属性
+ /// <summary>头部</summary>
+ public Header Header { get; set; }
+
+ /// <summary>主体</summary>
+ public Byte[] Body { get; set; }
+
+ /// <summary>响应数据。自动解码Json</summary>
+ public IDictionary<String, Object> Data { get; private set; }
+ #endregion
+
+ #region 读写
+ public Boolean Read(Stream stream, Object context = null)
+ {
+ var bn = new Binary
+ {
+ Stream = stream,
+ IsLittleEndian = false,
+ };
+ var len = bn.Read<Int32>();
+
+ // 读取头部
+ var hlen = bn.Read<Int32>();
+ var json = bn.ReadBytes(hlen).ToStr();
+ Header = json.ToJsonEntity<Header>();
+
+ // 读取主体
+ if (len > 4 + hlen)
+ {
+ Body = bn.ReadBytes(len - 4 - hlen);
+
+ // 自动解码Json
+ if (Header.SerializeTypeCurrentRPC.EqualIgnoreCase("json"))
+ Data = new JsonParser(Body.ToStr()).Decode() as IDictionary<String, Object>;
+ }
+
+ return true;
+ }
+
+ public Boolean Write(Stream stream, Object context = null)
+ {
+ // 计算头部
+ var json = Header.ToJson();
+ //var js = new JsonWriter { LowerCaseName = true };
+ //var json = js.ToJson(Header, false);
+ var hs = json.GetBytes();
+ var buf = Body;
+
+ // 计算长度
+ var len = 4 + hs.Length;
+ if (buf != null) len += buf.Length;
+
+ // 写入总长度
+ var bn = new Binary
+ {
+ Stream = stream,
+ IsLittleEndian = false,
+ };
+ bn.Write(len);
+
+ // 写入头部
+ bn.Write(hs.Length);
+ stream.Write(hs);
+
+ // 写入主体
+ if (buf != null && buf.Length > 0) stream.Write(buf);
+
+ return true;
+ }
+ #endregion
+ }
+}
\ No newline at end of file
diff --git a/NewLife.RocketMQ/Protocol/Header.cs b/NewLife.RocketMQ/Protocol/Header.cs
index a298b22..ba847e4 100644
--- a/NewLife.RocketMQ/Protocol/Header.cs
+++ b/NewLife.RocketMQ/Protocol/Header.cs
@@ -10,15 +10,13 @@ namespace NewLife.RocketMQ.Protocol
/// <summary>请求/响应码</summary>
public Int32 Code { get; set; }
- /// <summary>由于要支持多语言,所以这一字段可以给通信双方知道对方通信层锁使用的开发语言</summary>
- public String Language { get; set; } = "CSharp";
-
- /// <summary>给通信层知道对方的版本号,响应方可以以此做兼容老版本等的特殊操作</summary>
- public Int32 Version { get; set; }
-
/// <summary>请求标识码。在Java版的通信层中,这个只是一个不断自增的整形,为了收到应答方响应的的时候找到对应的请求。</summary>
public Int32 Opaque { get; set; }
+ /// <summary>由于要支持多语言,所以这一字段可以给通信双方知道对方通信层锁使用的开发语言</summary>
+ /// <remarks>这里必须是JAVA,不能是CSharp,甚至Java都不行</remarks>
+ public String Language { get; set; } = "JAVA";
+
/// <summary>标识</summary>
/// <remarks>
/// 第0位标识是这次通信是request还是response,0标识request, 1 标识response。
@@ -26,6 +24,12 @@ namespace NewLife.RocketMQ.Protocol
/// </remarks>
public Int32 Flag { get; set; }
+ /// <summary>给通信层知道对方的版本号,响应方可以以此做兼容老版本等的特殊操作</summary>
+ public Int32 Version { get; set; } = 252;
+
+ /// <summary>序列化类型</summary>
+ public String SerializeTypeCurrentRPC { get; set; } = "JSON";
+
/// <summary>附带的文本信息。常见的如存放一些broker/nameserver返回的一些异常信息,方便开发人员定位问题。</summary>
public String Remark { get; set; }
@@ -35,7 +39,7 @@ namespace NewLife.RocketMQ.Protocol
/// 在Java的每个RemotingCammand中,其实都带有一个CommandCustomHeader的属性成员,可以认为他是一个强类型的extFields,
/// 再最后传输的时候,这个CommandCustomHeader会被忽略,而传输前会把其中的所有字段全部都原封不动塞到extFields中,以作传输。
/// </remarks>
- public Dictionary<String, String> ExtFields { get; set; } = new Dictionary<String, String>();
+ public IDictionary<String, String> ExtFields { get; set; } = new Dictionary<String, String>();
#endregion
}
}
\ No newline at end of file
diff --git a/NewLife.RocketMQ/Protocol/RequestCode.cs b/NewLife.RocketMQ/Protocol/RequestCode.cs
new file mode 100644
index 0000000..b6af7ad
--- /dev/null
+++ b/NewLife.RocketMQ/Protocol/RequestCode.cs
@@ -0,0 +1,151 @@
+namespace NewLife.RocketMQ.Protocol
+{
+ /// <summary>请求代码</summary>
+ public enum RequestCode
+ {
+ SEND_MESSAGE = 10,
+
+ PULL_MESSAGE = 11,
+
+ QUERY_MESSAGE = 12,
+ QUERY_BROKER_OFFSET = 13,
+ QUERY_CONSUMER_OFFSET = 14,
+ UPDATE_CONSUMER_OFFSET = 15,
+ UPDATE_AND_CREATE_TOPIC = 17,
+ GET_ALL_TOPIC_CONFIG = 21,
+ GET_TOPIC_CONFIG_LIST = 22,
+
+ GET_TOPIC_NAME_LIST = 23,
+
+ UPDATE_BROKER_CONFIG = 25,
+
+ GET_BROKER_CONFIG = 26,
+
+ TRIGGER_DELETE_FILES = 27,
+
+ GET_BROKER_RUNTIME_INFO = 28,
+ SEARCH_OFFSET_BY_TIMESTAMP = 29,
+ GET_MAX_OFFSET = 30,
+ GET_MIN_OFFSET = 31,
+
+ GET_EARLIEST_MSG_STORETIME = 32,
+
+ VIEW_MESSAGE_BY_ID = 33,
+
+ HEART_BEAT = 34,
+
+ UNREGISTER_CLIENT = 35,
+
+ CONSUMER_SEND_MSG_BACK = 36,
+
+ END_TRANSACTION = 37,
+ GET_CONSUMER_LIST_BY_GROUP = 38,
+
+ CHECK_TRANSACTION_STATE = 39,
+
+ NOTIFY_CONSUMER_IDS_CHANGED = 40,
+
+ LOCK_BATCH_MQ = 41,
+
+ UNLOCK_BATCH_MQ = 42,
+ GET_ALL_CONSUMER_OFFSET = 43,
+
+ GET_ALL_DELAY_OFFSET = 45,
+
+ CHECK_CLIENT_CONFIG = 46,
+
+ PUT_KV_CONFIG = 100,
+
+ GET_KV_CONFIG = 101,
+
+ DELETE_KV_CONFIG = 102,
+
+ REGISTER_BROKER = 103,
+
+ UNREGISTER_BROKER = 104,
+ GET_ROUTEINTO_BY_TOPIC = 105,
+
+ GET_BROKER_CLUSTER_INFO = 106,
+ UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200,
+ GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201,
+ GET_TOPIC_STATS_INFO = 202,
+ GET_CONSUMER_CONNECTION_LIST = 203,
+ GET_PRODUCER_CONNECTION_LIST = 204,
+ WIPE_WRITE_PERM_OF_BROKER = 205,
+
+ GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206,
+
+ DELETE_SUBSCRIPTIONGROUP = 207,
+ GET_CONSUME_STATS = 208,
+
+ SUSPEND_CONSUMER = 209,
+
+ RESUME_CONSUMER = 210,
+ RESET_CONSUMER_OFFSET_IN_CONSUMER = 211,
+ RESET_CONSUMER_OFFSET_IN_BROKER = 212,
+
+ ADJUST_CONSUMER_THREAD_POOL = 213,
+
+ WHO_CONSUME_THE_MESSAGE = 214,
+
+ DELETE_TOPIC_IN_BROKER = 215,
+
+ DELETE_TOPIC_IN_NAMESRV = 216,
+ GET_KVLIST_BY_NAMESPACE = 219,
+
+ RESET_CONSUMER_CLIENT_OFFSET = 220,
+
+ GET_CONSUMER_STATUS_FROM_CLIENT = 221,
+
+ INVOKE_BROKER_TO_RESET_OFFSET = 222,
+
+ INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223,
+
+ QUERY_TOPIC_CONSUME_BY_WHO = 300,
+
+ GET_TOPICS_BY_CLUSTER = 224,
+
+ REGISTER_FILTER_SERVER = 301,
+ REGISTER_MESSAGE_FILTER_CLASS = 302,
+
+ QUERY_CONSUME_TIME_SPAN = 303,
+
+ GET_SYSTEM_TOPIC_LIST_FROM_NS = 304,
+ GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305,
+
+ CLEAN_EXPIRED_CONSUMEQUEUE = 306,
+
+ GET_CONSUMER_RUNNING_INFO = 307,
+
+ QUERY_CORRECTION_OFFSET = 308,
+ CONSUME_MESSAGE_DIRECTLY = 309,
+
+ SEND_MESSAGE_V2 = 310,
+
+ GET_UNIT_TOPIC_LIST = 311,
+
+ GET_HAS_UNIT_SUB_TOPIC_LIST = 312,
+
+ GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313,
+
+ CLONE_GROUP_OFFSET = 314,
+
+ VIEW_BROKER_STATS_DATA = 315,
+
+ CLEAN_UNUSED_TOPIC = 316,
+
+ GET_BROKER_CONSUME_STATS = 317,
+
+ /// <summary>update the config of name server</summary>
+ UPDATE_NAMESRV_CONFIG = 318,
+
+ /// <summary>get config from name server</summary>
+ GET_NAMESRV_CONFIG = 319,
+
+ SEND_BATCH_MESSAGE = 320,
+
+ QUERY_CONSUME_QUEUE = 321,
+
+ QUERY_DATA_VERSION = 322,
+ }
+}
\ No newline at end of file
diff --git a/NewLife.RocketMQ/Protocol/ResponseCode.cs b/NewLife.RocketMQ/Protocol/ResponseCode.cs
new file mode 100644
index 0000000..1475c7c
--- /dev/null
+++ b/NewLife.RocketMQ/Protocol/ResponseCode.cs
@@ -0,0 +1,69 @@
+namespace NewLife.RocketMQ.Protocol
+{
+ /// <summary>响应码</summary>
+ public enum ResponseCode
+ {
+ SUCCESS = 0,
+
+ SYSTEM_ERROR = 1,
+
+ SYSTEM_BUSY = 2,
+
+ REQUEST_CODE_NOT_SUPPORTED = 3,
+
+ TRANSACTION_FAILED = 4,
+
+ FLUSH_DISK_TIMEOUT = 10,
+
+ SLAVE_NOT_AVAILABLE = 11,
+
+ FLUSH_SLAVE_TIMEOUT = 12,
+
+ MESSAGE_ILLEGAL = 13,
+
+ SERVICE_NOT_AVAILABLE = 14,
+
+ VERSION_NOT_SUPPORTED = 15,
+
+ NO_PERMISSION = 16,
+
+ TOPIC_NOT_EXIST = 17,
+ TOPIC_EXIST_ALREADY = 18,
+ PULL_NOT_FOUND = 19,
+
+ PULL_RETRY_IMMEDIATELY = 20,
+
+ PULL_OFFSET_MOVED = 21,
+
+ QUERY_NOT_FOUND = 22,
+
+ SUBSCRIPTION_PARSE_FAILED = 23,
+
+ SUBSCRIPTION_NOT_EXIST = 24,
+
+ SUBSCRIPTION_NOT_LATEST = 25,
+
+ SUBSCRIPTION_GROUP_NOT_EXIST = 26,
+
+ FILTER_DATA_NOT_EXIST = 27,
+
+ FILTER_DATA_NOT_LATEST = 28,
+
+ TRANSACTION_SHOULD_COMMIT = 200,
+
+ TRANSACTION_SHOULD_ROLLBACK = 201,
+
+ TRANSACTION_STATE_UNKNOW = 202,
+
+ TRANSACTION_STATE_GROUP_WRONG = 203,
+ NO_BUYER_ID = 204,
+
+ NOT_IN_CURRENT_UNIT = 205,
+
+ CONSUMER_NOT_ONLINE = 206,
+
+ CONSUME_MSG_TIMEOUT = 207,
+
+ NO_MESSAGE = 208,
+ }
+}
\ No newline at end of file
diff --git a/Test/packages.config b/Test/packages.config
index 93af704..551ec84 100644
--- a/Test/packages.config
+++ b/Test/packages.config
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
- <package id="NewLife.Core" version="7.3.6763.36550" targetFramework="net461" />
+ <package id="NewLife.Core" version="7.3.6839.35284" targetFramework="net461" />
</packages>
\ No newline at end of file
diff --git a/Test/Program.cs b/Test/Program.cs
index 42555d9..83cd3ea 100644
--- a/Test/Program.cs
+++ b/Test/Program.cs
@@ -26,7 +26,8 @@ namespace Test
//SecretKey = "BvX6DpQffUz8xKIQ0u13EMxBW6YJmp",
ProducerGroup = "PID_Stone_001",
- NameServerAddress = "127.0.0.1:9876",
+ //NameServerAddress = "127.0.0.1:9876",
+ NameServerAddress = "192.168.0.4:9876",
InstanceName = "Producer",
};
diff --git a/Test/Test.csproj b/Test/Test.csproj
index 3d65b13..f307339 100644
--- a/Test/Test.csproj
+++ b/Test/Test.csproj
@@ -32,8 +32,8 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
- <Reference Include="NewLife.Core, Version=7.3.6763.36550, Culture=neutral, processorArchitecture=MSIL">
- <HintPath>..\packages\NewLife.Core.7.3.6763.36550\lib\net45\NewLife.Core.dll</HintPath>
+ <Reference Include="NewLife.Core, Version=7.3.6839.35284, Culture=neutral, processorArchitecture=MSIL">
+ <HintPath>..\packages\NewLife.Core.7.3.6839.35284\lib\net45\NewLife.Core.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />