diff --git a/NewLife.RocketMQ/Protocol/Message.cs b/NewLife.RocketMQ/Protocol/Message.cs
index 36acf97..e2dbb09 100644
--- a/NewLife.RocketMQ/Protocol/Message.cs
+++ b/NewLife.RocketMQ/Protocol/Message.cs
@@ -14,10 +14,18 @@ public class Message
public String Topic { get; set; }
/// <summary>标签</summary>
- public String Tags { get; set; }
+ public String Tags
+ {
+ get => Properties.TryGetValue("TAGS", out var str) ? str : null;
+ set => Properties["TAGS"] = value;
+ }
/// <summary>键</summary>
- public String Keys { get; set; }
+ public String Keys
+ {
+ get => Properties.TryGetValue("KEYS", out var str) ? str : null;
+ set => Properties["KEYS"] = value;
+ }
/// <summary>标记</summary>
public Int32 Flag { get; set; }
@@ -31,16 +39,37 @@ public class Message
public String BodyString { get => _BodyString ??= Body?.ToStr(); set => Body = (_BodyString = value)?.GetBytes(); }
/// <summary>等待存储消息</summary>
- public Boolean WaitStoreMsgOK { get; set; } = true;
+ public Boolean WaitStoreMsgOK
+ {
+ get => Properties.TryGetValue("WAIT", out var str) ? str.ToBoolean() : true;
+ set => Properties["WAIT"] = value.ToString();
+ }
/// <summary>延迟时间等级</summary>
- public Int32 DelayTimeLevel { get; set; }
+ public Int32 DelayTimeLevel
+ {
+ get => Properties.TryGetValue("DELAY", out var str) ? str.ToInt() : 0;
+ set => Properties["DELAY"] = value.ToString();
+ }
/// <summary>事务标识</summary>
- public String TransactionId { get; set; }
+ public String TransactionId
+ {
+ get => Properties.TryGetValue("UNIQ_KEY", out var str) ? str : null;
+ set => Properties["UNIQ_KEY"] = value;
+ }
+
+ /// <summary>附加属性</summary>
+ public IDictionary<String, String> Properties { get; set; }
#endregion
#region 构造
+ /// <summary>实例化</summary>
+ public Message()
+ {
+ Properties = new NullableDictionary<String, String>(StringComparer.OrdinalIgnoreCase);
+ }
+
/// <summary>友好字符串</summary>
/// <returns></returns>
public override String ToString() => Body != null && Body.Length > 0 ? BodyString : base.ToString();
@@ -77,29 +106,53 @@ public class Message
{
var sb = Pool.StringBuilder.Get();
- if (!TransactionId.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "UNIQ_KEY", TransactionId);
- sb.AppendFormat("{0}\u0001{1}\u0002", "WAIT", WaitStoreMsgOK);
- if (!Tags.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "TAGS", Tags);
- if (!Keys.IsNullOrEmpty()) sb.AppendFormat("{0}\u0001{1}\u0002", "KEYS", Keys);
- if (DelayTimeLevel > 0) sb.AppendFormat("{0}\u0001{1}\u0002", "DELAY", DelayTimeLevel);
+ if (Properties != null && Properties.Count > 0)
+ {
+ foreach (var item in Properties)
+ {
+ sb.AppendFormat("{0}\u0001{1}\u0002", item.Key, item.Value);
+ }
+ }
return sb.Return(true);
}
+ /// <summary>设置属性</summary>
+ /// <param name="key"></param>
+ /// <param name="value"></param>
+ public void PutUserProperty(String key, String value)
+ {
+ if (String.IsNullOrEmpty(key)) throw new ArgumentNullException(nameof(key));
+ if (String.IsNullOrEmpty(value)) throw new ArgumentNullException(nameof(value));
+
+ Properties[key] = value;
+ }
+
+ /// <summary>获取属性</summary>
+ /// <param name="key"></param>
+ /// <returns></returns>
+ public String GetUserProperty(String key)
+ {
+ Properties.TryGetValue(key, out var value);
+ return value;
+ }
+
/// <summary>分析字典属性</summary>
/// <param name="properties"></param>
public IDictionary<String, String> ParseProperties(String properties)
{
- if (properties.IsNullOrEmpty()) return null;
+ if (properties.IsNullOrEmpty()) return Properties;
var dic = SplitAsDictionary(properties, "\u0001", "\u0002");
+ Properties = dic;
+
if (TryGetAndRemove(dic, nameof(Tags), out var str)) Tags = str;
if (TryGetAndRemove(dic, nameof(Keys), out str)) Keys = str;
if (TryGetAndRemove(dic, "DELAY", out str)) DelayTimeLevel = str.ToInt();
if (TryGetAndRemove(dic, "WAIT", out str)) WaitStoreMsgOK = str.ToBoolean();
-
- return dic;
+
+ return Properties;
}
private static IDictionary<String, String> SplitAsDictionary(String value, String nameValueSeparator, String separator)