NewLife/NewLife.RocketMQ

Merge branch 'master' of https://github.com/NewLifeX/NewLife.RocketMQ
大石头 authored at 2025-12-07 14:04:37
7e5154d
Tree
2 Parent(s) 8d5e195 + 2166416
Summary: 4 changed files with 72 additions and 19 deletions.
Modified +4 -0
Modified +1 -1
Modified +66 -13
Modified +1 -5
Modified +4 -0
diff --git a/NewLife.RocketMQ/Consumer.cs b/NewLife.RocketMQ/Consumer.cs
index 8aeab58..c7bd8dd 100644
--- a/NewLife.RocketMQ/Consumer.cs
+++ b/NewLife.RocketMQ/Consumer.cs
@@ -48,6 +48,9 @@ public class Consumer : MqBase
     /// <summary>消息模型。广播/集群</summary>
     public MessageModels MessageModel { get; set; } = MessageModels.Clustering;
 
+    /// <summary>消费类型。CONSUME_PASSIVELY/CONSUME_ACTIVELY</summary>
+    public String ConsumeType { get; set; } = "CONSUME_PASSIVELY";
+
     /// <summary>消费委托</summary>
     public Func<MessageQueue, MessageExt[], Boolean> OnConsume;
 
@@ -111,6 +114,7 @@ public class Consumer : MqBase
                 ConsumeFromWhere = FromLastOffset ? "CONSUME_FROM_LAST_OFFSET" : "CONSUME_FROM_FIRST_OFFSET",
                 MessageModel = MessageModel.ToString().ToUpper(),
                 SubscriptionDataSet = new[] { sd },
+                ConsumeType = ConsumeType,
             };
 
             list = new[] { cd };
Modified +1 -1
diff --git a/NewLife.RocketMQ/MessageTrace/MessageTraceHook.cs b/NewLife.RocketMQ/MessageTrace/MessageTraceHook.cs
index 324cf58..3e4acad 100644
--- a/NewLife.RocketMQ/MessageTrace/MessageTraceHook.cs
+++ b/NewLife.RocketMQ/MessageTrace/MessageTraceHook.cs
@@ -36,7 +36,7 @@ namespace NewLife.RocketMQ.MessageTrace
         public void ExecuteHookAfter(SendMessageContext context)
         {
 
-            if (context.Message.Topic.Equals("RMQ_SYS_TRACE_TOPIC"))
+            if (context.Message?.Topic?.Equals("RMQ_SYS_TRACE_TOPIC") == true)
             {
                 return;
             }
Modified +66 -13
diff --git a/NewLife.RocketMQ/Protocol/Message.cs b/NewLife.RocketMQ/Protocol/Message.cs
index 36acf97..e2dbb09 100644
--- a/NewLife.RocketMQ/Protocol/Message.cs
+++ b/NewLife.RocketMQ/Protocol/Message.cs
@@ -14,10 +14,18 @@ public class Message
     public String Topic { get; set; }
 
     /// <summary>标签</summary>
-    public String Tags { get; set; }
+    public String Tags
+    {
+        get => Properties.TryGetValue("TAGS", out var str) ? str : null;
+        set => Properties["TAGS"] = value;
+    }
 
     /// <summary>键</summary>
-    public String Keys { get; set; }
+    public String Keys
+    {
+        get => Properties.TryGetValue("KEYS", out var str) ? str : null;
+        set => Properties["KEYS"] = value;
+    }
 
     /// <summary>标记</summary>
     public Int32 Flag { get; set; }
@@ -31,16 +39,37 @@ public class Message
     public String BodyString { get => _BodyString ??= Body?.ToStr(); set => Body = (_BodyString = value)?.GetBytes(); }
 
     /// <summary>等待存储消息</summary>
-    public Boolean WaitStoreMsgOK { get; set; } = true;
+    public Boolean WaitStoreMsgOK
+    {
+        get => Properties.TryGetValue("WAIT", out var str) ? str.ToBoolean() : true;
+        set => Properties["WAIT"] = value.ToString();
+    }
 
     /// <summary>延迟时间等级</summary>
-    public Int32 DelayTimeLevel { get; set; }
+    public Int32 DelayTimeLevel
+    {
+        get => Properties.TryGetValue("DELAY", out var str) ? str.ToInt() : 0;
+        set => Properties["DELAY"] = value.ToString();
+    }
 
     /// <summary>事务标识</summary>
-    public String TransactionId { get; set; }
+    public String TransactionId
+    {
+        get => Properties.TryGetValue("UNIQ_KEY", out var str) ? str : null;
+        set => Properties["UNIQ_KEY"] = value;
+    }
+
+    /// <summary>附加属性</summary>
+    public IDictionary<String, String> Properties { get; set; }
     #endregion
 
     #region 构造
+    /// <summary>实例化</summary>
+    public Message()
+    {
+        Properties = new NullableDictionary<String, String>(StringComparer.OrdinalIgnoreCase);
+    }
+
     /// <summary>友好字符串</summary>
     /// <returns></returns>
     public override String ToString() => Body != null && Body.Length > 0 ? BodyString : base.ToString();
@@ -77,29 +106,53 @@ public class Message
     {
         var sb = Pool.StringBuilder.Get();
 
-        if (!TransactionId.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "UNIQ_KEY", TransactionId);
-        sb.AppendFormat("{0}\u0001{1}\u0002", "WAIT", WaitStoreMsgOK);
-        if (!Tags.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "TAGS", Tags);
-        if (!Keys.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "KEYS", Keys);
-        if (DelayTimeLevel > 0) sb.AppendFormat("{0}\u0001{1}\u0002", "DELAY", DelayTimeLevel);
+        if (Properties != null && Properties.Count > 0)
+        {
+            foreach (var item in Properties)
+            {
+                sb.AppendFormat("{0}\u0001{1}\u0002", item.Key, item.Value);
+            }
+        }
 
         return sb.Return(true);
     }
 
+    /// <summary>设置属性</summary>
+    /// <param name="key"></param>
+    /// <param name="value"></param>
+    public void PutUserProperty(String key, String value)
+    {
+        if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key));
+        if (String.IsNullOrEmpty(value)) throw new ArgumentNullException(nameof(value));
+
+        Properties[key] = value;
+    }
+
+    /// <summary>获取属性</summary>
+    /// <param name="key"></param>
+    /// <returns></returns>
+    public String GetUserProperty(String key)
+    {
+        Properties.TryGetValue(key, out var value);
+        return value;
+    }
+
     /// <summary>分析字典属性</summary>
     /// <param name="properties"></param>
     public IDictionary<String, String> ParseProperties(String properties)
     {
-        if (properties.IsNullOrEmpty()) return null;
+        if (properties.IsNullOrEmpty()) return Properties;
 
         var dic = SplitAsDictionary(properties, "\u0001", "\u0002");
 
+        Properties = dic;
+
         if (TryGetAndRemove(dic, nameof(Tags), out var str)) Tags = str;
         if (TryGetAndRemove(dic, nameof(Keys), out str)) Keys = str;
         if (TryGetAndRemove(dic, "DELAY", out str)) DelayTimeLevel = str.ToInt();
         if (TryGetAndRemove(dic, "WAIT", out str)) WaitStoreMsgOK = str.ToBoolean();
-
-        return dic;
+        
+        return Properties;
     }
 
     private static IDictionary<String, String> SplitAsDictionary(String value, String nameValueSeparator, String separator)
Modified +1 -5
diff --git a/NewLife.RocketMQ/Protocol/MessageExt.cs b/NewLife.RocketMQ/Protocol/MessageExt.cs
index dba3528..977e108 100644
--- a/NewLife.RocketMQ/Protocol/MessageExt.cs
+++ b/NewLife.RocketMQ/Protocol/MessageExt.cs
@@ -45,9 +45,6 @@ public class MessageExt : Message, IAccessor
     /// <summary>准备事务偏移</summary>
     public Int64 PreparedTransactionOffset { get; set; }
 
-    /// <summary>属性</summary>
-    public IDictionary<String, String> Properties { get; set; }
-
     /// <summary>消息编号</summary>
     public String MsgId { get; set; }
     #endregion
@@ -112,8 +109,7 @@ public class MessageExt : Message, IAccessor
 
         var len2 = bn.Read<Int16>();
         var str = bn.ReadBytes(len2).ToStr();
-        var dic = ParseProperties(str);
-        if (dic != null && dic.Count > 0) Properties = dic;
+        ParseProperties(str);
 
         // MsgId
         var ms = Pool.MemoryStream.Get();