NewLife/NewLife.Kafka

v1.1 增加埋点
大石头 编写于 2023-10-31 14:15:11
共计: 修改8个文件,增加314行、删除214行。
增加 +112 -0
修改 +1 -1
修改 +1 -1
修改 +1 -1
修改 +148 -159
修改 +42 -43
修改 +8 -8
修改 +1 -1
增加 +112 -0
diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..0f826be
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,112 @@
+# EditorConfig is awesome:http://EditorConfig.org
+# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-code-style-settings-reference
+
+# top-most EditorConfig file
+root = true
+
+# Don't use tabs for indentation.
+[*]
+indent_style = space
+# (Please don't specify an indent_size here; that has too many unintended consequences.)
+
+# Code files
+[*.{cs,csx,vb,vbx}]
+indent_size = 4
+insert_final_newline = false
+charset = utf-8-bom
+end_of_line = crlf
+
+# Xml project files
+[*.{csproj,vbproj,vcxproj,vcxproj.filters,proj,projitems,shproj}]
+indent_size = 2
+
+# Xml config files
+[*.{props,targets,ruleset,config,nuspec,resx,vsixmanifest,vsct}]
+indent_size = 2
+
+# JSON files
+[*.json]
+indent_size = 2
+
+# Dotnet code style settings:
+[*.{cs,vb}]
+# Sort using and Import directives with System.* appearing first
+dotnet_sort_system_directives_first = true
+
+csharp_indent_case_contents = true
+csharp_indent_switch_labels = true
+csharp_indent_labels = flush_left
+
+#csharp_space_after_cast = true
+#csharp_space_after_keywords_in_control_flow_statements = true
+#csharp_space_between_method_declaration_parameter_list_parentheses = true
+#csharp_space_between_method_call_parameter_list_parentheses = true
+#csharp_space_between_parentheses = control_flow_statements, type_casts
+
+# 单行放置代码
+csharp_preserve_single_line_statements = true
+csharp_preserve_single_line_blocks = true
+
+# Avoid "this." and "Me." if not necessary
+dotnet_style_qualification_for_field = false:warning
+dotnet_style_qualification_for_property = false:warning
+dotnet_style_qualification_for_method = false:warning
+dotnet_style_qualification_for_event = false:warning
+
+# Use language keywords instead of framework type names for type references
+dotnet_style_predefined_type_for_locals_parameters_members = false:suggestion
+dotnet_style_predefined_type_for_member_access = false:suggestion
+#dotnet_style_require_accessibility_modifiers = for_non_interface_members:none/always:suggestion
+
+# Suggest more modern language features when available
+dotnet_style_object_initializer = true:suggestion
+dotnet_style_collection_initializer = true:suggestion
+dotnet_style_coalesce_expression = true:suggestion
+dotnet_style_null_propagation = true:suggestion
+dotnet_style_explicit_tuple_names = true:suggestion
+dotnet_style_prefer_inferred_tuple_names = true:suggestion
+dotnet_style_prefer_inferred_anonymous_type_member_names = true:suggestion
+
+# CSharp code style settings:
+[*.cs]
+# Prefer "var" everywhere
+csharp_style_var_for_built_in_types = true:warning
+csharp_style_var_when_type_is_apparent = true:warning
+csharp_style_var_elsewhere = true:warning
+
+# Prefer method-like constructs to have a block body
+csharp_style_expression_bodied_methods = when_on_single_line:suggestion
+csharp_style_expression_bodied_constructors = when_on_single_line:suggestion
+csharp_style_expression_bodied_operators = when_on_single_line:suggestion
+
+# Prefer property-like constructs to have an expression-body
+csharp_style_expression_bodied_properties = true:suggestion
+csharp_style_expression_bodied_indexers = true:suggestion
+#csharp_style_expression_bodied_accessors = true:suggestion
+
+# Suggest more modern language features when available
+csharp_style_pattern_matching_over_is_with_cast_check = true:suggestion
+csharp_style_pattern_matching_over_as_with_null_check = true:suggestion
+csharp_style_inlined_variable_declaration = true:suggestion
+
+csharp_prefer_simple_default_expression = true:suggestion
+csharp_style_deconstructed_variable_declaration = true:suggestion
+csharp_style_pattern_local_over_anonymous_function = true:suggestion
+
+csharp_style_throw_expression = true:suggestion
+csharp_style_conditional_delegate_call = true:suggestion
+
+# 单行不需要大括号
+csharp_prefer_braces = false:suggestion
+
+# Newline settings
+csharp_new_line_before_open_brace = all
+csharp_new_line_before_else = true
+csharp_new_line_before_catch = true
+csharp_new_line_before_finally = true
+csharp_new_line_before_members_in_object_initializers = true
+csharp_new_line_before_members_in_anonymous_types = true
+csharp_new_line_between_query_expression_clauses = true
+
+[*.md]
+trim_trailing_whitespace = false
\ No newline at end of file
修改 +1 -1
diff --git a/.github/workflows/publish-beta.yml b/.github/workflows/publish-beta.yml
index 2ed9755..d22ea13 100644
--- a/.github/workflows/publish-beta.yml
+++ b/.github/workflows/publish-beta.yml
@@ -16,7 +16,7 @@ jobs:
     - name: Setup .NET
       uses: actions/setup-dotnet@v2
       with:
-        dotnet-version: 6.0.x
+        dotnet-version: 7.0.x
     - name: Restore
       run: |
         dotnet restore NewLife.Kafka/NewLife.Kafka.csproj
修改 +1 -1
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index c6ff0da..1f617d9 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -15,7 +15,7 @@ jobs:
     - name: Setup .NET
       uses: actions/setup-dotnet@v2
       with:
-        dotnet-version: 6.0.x
+        dotnet-version: 7.0.x
     - name: Restore
       run: |
         dotnet restore NewLife.Kafka/NewLife.Kafka.csproj
修改 +1 -1
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 35c87fb..eaf7d07 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -17,7 +17,7 @@ jobs:
     - name: Setup .NET
       uses: actions/setup-dotnet@v2
       with:
-        dotnet-version: 6.0.x
+        dotnet-version: 7.0.x
     - name: Build
       run: |
         dotnet build -c Release
修改 +148 -159
diff --git a/NewLife.Kafka/KfkClient.cs b/NewLife.Kafka/KfkClient.cs
index 372f1d5..7235c70 100644
--- a/NewLife.Kafka/KfkClient.cs
+++ b/NewLife.Kafka/KfkClient.cs
@@ -1,80 +1,85 @@
 using Confluent.Kafka;
-using NewLife.Collections;
 using NewLife.Log;
+using NewLife.Serialization;
 using NewLife.Threading;
-using System;
-using System.Collections.Generic;
-using System.Linq;
 
-namespace NewLife.Kafka
+namespace NewLife.Kafka;
+
+/// <summary>消费客户端</summary>
+public class KfkClient : DisposeBase
 {
-    /// <summary>消费客户端</summary>
-    public class KfkClient : DisposeBase
-    {
-        #region 属性
-        /// <summary>是否使用中</summary>
-        public Boolean Active { get; private set; }
+    #region 属性
+    /// <summary>是否使用中</summary>
+    public Boolean Active { get; private set; }
 
-        /// <summary>消费者</summary>
-        public IConsumer<String, String> Consumer { get; set; }
+    /// <summary>服务器集群地址</summary>
+    public String Servers { get; set; }
 
-        /// <summary>主题</summary>
-        public String Topic { get; set; }
+    /// <summary>主题</summary>
+    public String Topic { get; set; }
 
-        /// <summary>消费者</summary>
-        public String GroupID { get; set; }
+    /// <summary>消费者</summary>
+    public String GroupID { get; set; }
 
-        /// <summary>服务器集群地址</summary>
-        public String Servers { get; set; }
+    /// <summary>批大小。消费后整批处理,默认1000</summary>
+    public Int32 BatchSize { get; set; } = 1000;
 
-        /// <summary>批大小。消费后整批处理,默认1000</summary>
-        public Int32 BatchSize { get; set; } = 1000;
+    /// <summary>消费者</summary>
+    public IConsumer<String, String> Consumer { get; set; }
 
-        /// <summary>每次消费完成后自动提交偏移量</summary>
-        public Boolean AutoCommited { get; set; } = true;
+    ///// <summary>每次消费完成后自动提交偏移量</summary>
+    //public Boolean AutoCommited { get; set; } = true;
 
-        /// <summary>分区</summary>
-        protected IList<Int32> Partitions { get; } = new List<Int32>();
-        #endregion
+    /// <summary>分区</summary>
+    protected IList<Int32> Partitions { get; } = new List<Int32>();
 
-        #region 构造
-        /// <summary>销毁</summary>
-        /// <param name="disposing"></param>
-        protected override void Dispose(bool disposing)
-        {
-            base.Dispose(disposing);
+    /// <summary>性能追踪</summary>
+    public ITracer Tracer { get; set; }
+    #endregion
 
-            Stop();
-        }
-        #endregion
+    #region 构造
+    /// <summary>销毁</summary>
+    /// <param name="disposing"></param>
+    protected override void Dispose(bool disposing)
+    {
+        base.Dispose(disposing);
 
-        #region 开始停止
-        /// <summary>确保已创建</summary>
-        public virtual void EnsureCreate()
-        {
-            var csm = Consumer;
-            if (csm != null) return;
+        Stop();
+    }
+    #endregion
 
-            if (Topic.IsNullOrEmpty()) throw new Exception($"消费主题不能为空!");
+    #region 开始停止
+    /// <summary>确保已创建</summary>
+    public virtual void EnsureCreate()
+    {
+        var csm = Consumer;
+        if (csm != null) return;
 
-            var setting = LoadSetting();
-            //csm = new Consumer(setting);
-            var builder = new ConsumerBuilder<String, String>(setting.ToDictionary(e => e.Key, e => e.Value + ""));
-            csm = builder.Build();
+        if (Topic.IsNullOrEmpty()) throw new Exception($"消费主题不能为空!");
 
-            //// 加载错误事件和消费错误事件处理函数
-            //csm.OnConsumeError += WriteLog;
-            //csm.OnError += WriteLog;
-            //csm.OnLog += WriteLog;
+        var setting = LoadSetting();
+        DefaultSpan.Current?.AppendTag(setting.ToJson());
 
-            Consumer = csm;
-        }
+        //csm = new Consumer(setting);
+        var builder = new ConsumerBuilder<String, String>(setting.ToDictionary(e => e.Key, e => e.Value + ""));
+        csm = builder.Build();
 
-        /// <summary>开始</summary>
-        public void Start()
-        {
-            if (Active) return;
+        //// 加载错误事件和消费错误事件处理函数
+        //csm.OnConsumeError += WriteLog;
+        //csm.OnError += WriteLog;
+        //csm.OnLog += WriteLog;
 
+        Consumer = csm;
+    }
+
+    /// <summary>开始</summary>
+    public void Start()
+    {
+        if (Active) return;
+
+        using var span = Tracer?.NewSpan($"kafka:{Topic}:Start");
+        try
+        {
             EnsureCreate();
 
             // 挂载消费主题
@@ -90,23 +95,28 @@ namespace NewLife.Kafka
                 Consumer.Subscribe(Topic);
             }
 
-            InitStat();
             _timer = new TimerX(DoConsume, null, 0, 1000) { Async = true };
 
             Active = true;
         }
-
-        /// <summary>停止</summary>
-        public void Stop()
+        catch (Exception ex)
         {
-            if (!Active) return;
+            span?.SetError(ex, null);
+            throw;
+        }
+    }
+
+    /// <summary>停止</summary>
+    public void Stop()
+    {
+        if (!Active) return;
 
+        using var span = Tracer?.NewSpan($"kafka:{Topic}:Stop");
+        try
+        {
             _timer.TryDispose();
             _timer = null;
 
-            _stTimer.TryDispose();
-            _stTimer = null;
-
             Consumer.Unassign();
             Consumer.Unsubscribe();
 
@@ -114,28 +124,35 @@ namespace NewLife.Kafka
 
             Active = false;
         }
-        #endregion
+        catch (Exception ex)
+        {
+            span?.SetError(ex, null);
+            throw;
+        }
+    }
+    #endregion
 
-        #region 消费
-        private TimerX _timer;
+    #region 消费
+    private TimerX _timer;
 
-        private void DoConsume(Object state)
-        {
-            var list = new List<ConsumeResult<String, String>>();
+    private void DoConsume(Object state)
+    {
+        var list = new List<ConsumeResult<String, String>>();
 
-            // 多次拉取,批量处理
-            for (var i = 0; i < BatchSize; i++)
+        // 多次拉取,批量处理
+        for (var i = 0; i < BatchSize; i++)
+        {
+            var result = Consumer.Consume(10);
+            if (result != null)
             {
-                var result = Consumer.Consume(10);
-                if (result != null)
-                {
-                    list.Add(result);
-
-                    Stat.Increment(1, 0);
-                }
+                list.Add(result);
             }
+        }
 
-            if (list.Count > 0)
+        if (list.Count > 0)
+        {
+            using var span = Tracer?.NewSpan($"mq:{Topic}:Consume");
+            try
             {
                 // 批量处理
                 OnProcess(list.Select(e => e.Message).ToList());
@@ -146,93 +163,65 @@ namespace NewLife.Kafka
                 // 马上开始下一次
                 TimerX.Current.SetNext(-1);
             }
-        }
-
-        /// <summary>收到消息事件</summary>
-        public event EventHandler<IList<Message<String, String>>> OnMessage;
-
-        /// <summary>处理一批消息</summary>
-        /// <param name="messages"></param>
-        protected virtual void OnProcess(IList<Message<String, String>> messages) => OnMessage?.Invoke(this, messages);
-        #endregion
-
-        #region 统计
-        /// <summary>消费统计</summary>
-        public ICounter Stat { get; set; } = new PerfCounter();
-
-        /// <summary>显示统计信息的周期。默认60秒,0表示不显示统计信息</summary>
-        public Int32 StatPeriod { get; set; } = 60;
+            catch (Exception ex)
+            {
+                span?.SetError(ex, null);
 
-        private TimerX _stTimer;
-        private void InitStat()
-        {
-            var p = StatPeriod * 1000;
-            _stTimer = new TimerX(ShowStat, null, p, p) { Async = true };
+                throw;
+            }
         }
+    }
 
-        private String _Last;
-        private void ShowStat(Object stat)
-        {
-            var sb = Pool.StringBuilder.Get();
-            var pf = Stat;
-            if (pf != null && pf.Value > 0) sb.AppendFormat("消费:{0} ", pf);
-
-            var msg = sb.Put(true);
-            if (msg.IsNullOrEmpty() || msg == _Last) return;
-            _Last = msg;
+    /// <summary>收到消息事件</summary>
+    public event EventHandler<IList<Message<String, String>>> OnMessage;
 
-            XTrace.WriteLine(msg);
-        }
-        #endregion
-
-        #region 辅助
-        /// <summary>
-        /// 加载设置
-        /// </summary>
-        /// <returns></returns>
-        protected virtual Dictionary<String, Object> LoadSetting()
-        {
-            var cfg = KfkSetting.Current;
+    /// <summary>处理一批消息</summary>
+    /// <param name="messages"></param>
+    protected virtual void OnProcess(IList<Message<String, String>> messages) => OnMessage?.Invoke(this, messages);
+    #endregion
 
-            //// 从配置中心读取集群地址
-            //if (Servers.IsNullOrEmpty()) Servers = ConfigClient.Instance.Get("Kafka.Server");
+    #region 辅助
+    /// <summary>
+    /// 加载设置
+    /// </summary>
+    /// <returns></returns>
+    protected virtual Dictionary<String, Object> LoadSetting()
+    {
+        var cfg = KfkSetting.Current;
 
-            var dic = new Dictionary<String, Object>
-            {
-                { "group.id", GroupID },
-                { "bootstrap.servers", Servers },
-                { "enable.auto.commit", false },
-                { "auto.offset.reset",cfg.AutoReset}
-            };
-
-            // 设置属性
-            if (cfg.MaxMessages >= 0) dic.Add("consume.callback.max.messages", cfg.MaxMessages);
-            if (cfg.FetchMaxBytes >= 0) dic.Add("fetch.max.bytes", cfg.FetchMaxBytes);
-            if (cfg.MaxMessageBytes >= 0) dic.Add("message.max.bytes", cfg.MaxMessageBytes);
-            if (cfg.QMaxMessagesKbytes >= 0) dic.Add("queued.max.messages.kbytes", cfg.QMaxMessagesKbytes);
-            if (cfg.StoreSyncInterval >= 0) dic.Add("offset.store.sync.interval.ms", cfg.StoreSyncInterval);
-            //if (cfg.AutoCommitInterval >= 0) dic.Add("auto.commit.interval.ms", cfg.AutoCommitInterval);
-            if (cfg.FetchWaitTime >= 0) dic.Add("fetch.wait.max.ms", cfg.FetchWaitTime);
-            if (cfg.RecMessageMaxBytes >= 0) dic.Add("receive.message.max.bytes", cfg.RecMessageMaxBytes);
-            if (cfg.FetchMessageMaxBytes >= 0) dic.Add("fetch.message.max.bytes", cfg.FetchMessageMaxBytes);
-
-            return dic;
-        }
+        //// 从配置中心读取集群地址
+        //if (Servers.IsNullOrEmpty()) Servers = ConfigClient.Instance.Get("Kafka.Server");
 
-        /// <summary>
-        /// 写日志
-        /// </summary>
-        /// <param name="sender"></param>
-        /// <param name="e"></param>
-        protected virtual void WriteLog(Object sender, Object e)
+        var dic = new Dictionary<String, Object>
         {
-            //if (e is Message<String, String> msg)
-            //    XTrace.Log.Error($"消费错误:{msg.Error} TopicPartitionOffset={msg.TopicPartitionOffset}");
+            { "group.id", GroupID },
+            { "bootstrap.servers", Servers },
+            { "enable.auto.commit", false },
+            { "auto.offset.reset",cfg.AutoReset}
+        };
+
+        // 设置属性
+        if (cfg.MaxMessages >= 0) dic.Add("consume.callback.max.messages", cfg.MaxMessages);
+        if (cfg.FetchMaxBytes >= 0) dic.Add("fetch.max.bytes", cfg.FetchMaxBytes);
+        if (cfg.MaxMessageBytes >= 0) dic.Add("message.max.bytes", cfg.MaxMessageBytes);
+        if (cfg.QMaxMessagesKbytes >= 0) dic.Add("queued.max.messages.kbytes", cfg.QMaxMessagesKbytes);
+        if (cfg.StoreSyncInterval >= 0) dic.Add("offset.store.sync.interval.ms", cfg.StoreSyncInterval);
+        //if (cfg.AutoCommitInterval >= 0) dic.Add("auto.commit.interval.ms", cfg.AutoCommitInterval);
+        if (cfg.FetchWaitTime >= 0) dic.Add("fetch.wait.max.ms", cfg.FetchWaitTime);
+        if (cfg.RecMessageMaxBytes >= 0) dic.Add("receive.message.max.bytes", cfg.RecMessageMaxBytes);
+        if (cfg.FetchMessageMaxBytes >= 0) dic.Add("fetch.message.max.bytes", cfg.FetchMessageMaxBytes);
+
+        return dic;
+    }
 
-            if (e is Error msg2) XTrace.Log.Error(msg2.Reason);
+    /// <summary>日志</summary>
+    public ILog Log { get; set; } = Logger.Null;
 
-            if (e is LogMessage msg3) XTrace.WriteLine(msg3.Message);
-        }
-        #endregion
-    }
+    /// <summary>
+    /// 写日志
+    /// </summary>
+    /// <param name="format"></param>
+    /// <param name="args"></param>
+    public virtual void WriteLog(String format, params Object[] args) => Log?.Info(format, args);
+    #endregion
 }
\ No newline at end of file
修改 +42 -43
diff --git a/NewLife.Kafka/KfkSetting.cs b/NewLife.Kafka/KfkSetting.cs
index b2db815..d98c749 100644
--- a/NewLife.Kafka/KfkSetting.cs
+++ b/NewLife.Kafka/KfkSetting.cs
@@ -1,47 +1,46 @@
-using NewLife.Xml;
-using System;
+using NewLife.Configuration;
+using NewLife.Xml;
 using System.ComponentModel;
 
-namespace NewLife.Kafka
+namespace NewLife.Kafka;
+
+/// <summary>设置</summary>
+[Config("Kafka")]
+public class KfkSetting : XmlConfig<KfkSetting>
 {
-    /// <summary>设置</summary>
-    [XmlConfigFile("Config/Kafka.config", 15000)]
-    public class KfkSetting : XmlConfig<KfkSetting>
-    {
-        /// <summary>重新开始偏移量</summary>
-        [Description("重新开始偏移量 auto.offset.reset(smallest, earliest, beginning, largest, latest, end, error)")]
-        public String AutoReset { get; set; } = "earliest";
-
-        /// <summary>一次调度的最大消息数量"consume.callback.max.messages"</summary>
-        [Description("consume.callback.max.messages 一次调度的最大消息数量,(0 .. 1000000	)默认(0表示不限制)")]
-        public Int32 MaxMessages { get; set; }
-
-        /// <summary></summary>
-        [Description("最大消息字节 message.max.bytes")]
-        public Int32 MaxMessageBytes { get; set; } = 1_000_000;
-
-        /// <summary></summary>
-        [Description("receive.message.max.bytes")]
-        public Int32 RecMessageMaxBytes { get; set; } = 100_002_976;
-
-        /// <summary>1 .. 1000000000</summary>
-        [Description("fetch.max.bytes 0 .. 2147483135)")]
-        public Int32 FetchMaxBytes { get; set; } = 100_000_000;
-
-        /// <summary>1 .. 2097151</summary>
-        [Description("queued.max.messages.kbytes 队列最大消息大小(1 .. 2097151)")]
-        public Int32 QMaxMessagesKbytes { get; set; } = 1_048_576;
-
-        /// <summary>offset.store.sync.interval.ms 默认-1</summary>
-        [Description("偏移异步存储同步间隔 offset.store.sync.interval.ms")]
-        public Int32 StoreSyncInterval { get; set; } = -1;
-
-        /// <summary>抓取等待时间</summary>
-        [Description("fetch.wait.max.ms 抽取等待最大时间 0 .. 300000(默认100)")]
-        public Int32 FetchWaitTime { get; set; }
-
-        /// <summary>经过测试最大长度如果过长会导致抽取速度降低</summary>
-        [Description("fetch.message.max.bytes 抽取消息最大字节默认1mb,经过测试最大长度如果过长会导致抽取速度降低,建议降低最大消息长度")]
-        public Int32 FetchMessageMaxBytes { get; set; } = 524288;
-    }
+    /// <summary>重新开始偏移量</summary>
+    [Description("重新开始偏移量 auto.offset.reset(smallest, earliest, beginning, largest, latest, end, error)")]
+    public String AutoReset { get; set; } = "earliest";
+
+    /// <summary>一次调度的最大消息数量"consume.callback.max.messages"</summary>
+    [Description("consume.callback.max.messages 一次调度的最大消息数量,(0 .. 1000000	)默认(0表示不限制)")]
+    public Int32 MaxMessages { get; set; }
+
+    /// <summary></summary>
+    [Description("最大消息字节 message.max.bytes")]
+    public Int32 MaxMessageBytes { get; set; } = 1_000_000;
+
+    /// <summary></summary>
+    [Description("receive.message.max.bytes")]
+    public Int32 RecMessageMaxBytes { get; set; } = 100_002_976;
+
+    /// <summary>1 .. 1000000000</summary>
+    [Description("fetch.max.bytes 0 .. 2147483135)")]
+    public Int32 FetchMaxBytes { get; set; } = 100_000_000;
+
+    /// <summary>1 .. 2097151</summary>
+    [Description("queued.max.messages.kbytes 队列最大消息大小(1 .. 2097151)")]
+    public Int32 QMaxMessagesKbytes { get; set; } = 1_048_576;
+
+    /// <summary>offset.store.sync.interval.ms 默认-1</summary>
+    [Description("偏移异步存储同步间隔 offset.store.sync.interval.ms")]
+    public Int32 StoreSyncInterval { get; set; } = -1;
+
+    /// <summary>抓取等待时间</summary>
+    [Description("fetch.wait.max.ms 抽取等待最大时间 0 .. 300000(默认100)")]
+    public Int32 FetchWaitTime { get; set; }
+
+    /// <summary>经过测试最大长度如果过长会导致抽取速度降低</summary>
+    [Description("fetch.message.max.bytes 抽取消息最大字节默认1mb,经过测试最大长度如果过长会导致抽取速度降低,建议降低最大消息长度")]
+    public Int32 FetchMessageMaxBytes { get; set; } = 524288;
 }
修改 +8 -8
diff --git a/NewLife.Kafka/NewLife.Kafka.csproj b/NewLife.Kafka/NewLife.Kafka.csproj
index c8af5b4..800cc80 100644
--- a/NewLife.Kafka/NewLife.Kafka.csproj
+++ b/NewLife.Kafka/NewLife.Kafka.csproj
@@ -1,19 +1,19 @@
 <Project Sdk="Microsoft.NET.Sdk">
   <PropertyGroup>
-    <TargetFrameworks>netstandard2.1</TargetFrameworks>
+    <TargetFrameworks>net462;netstandard2.0</TargetFrameworks>
     <AssemblyName>NewLife.Kafka</AssemblyName>
     <RootNamespace>NewLife.Kafka</RootNamespace>
     <AssemblyTitle>Kafka客户端</AssemblyTitle>
     <Description></Description>
     <Company>新生命开发团队</Company>
-    <Copyright>版权所有(C) 新生命开发团队 2002~2022</Copyright>
-    <VersionPrefix>1.0</VersionPrefix>
+    <Copyright>版权所有(C) 新生命开发团队 2002~2023</Copyright>
+    <VersionPrefix>1.1</VersionPrefix>
     <VersionSuffix>$([System.DateTime]::Now.ToString(`yyyy.MMdd`))</VersionSuffix>
     <Version>$(VersionPrefix).$(VersionSuffix)</Version>
     <FileVersion>$(Version)</FileVersion>
     <AssemblyVersion>$(VersionPrefix).*</AssemblyVersion>
     <Deterministic>false</Deterministic>
-    <OutputPath>..\..\Bin</OutputPath>
+    <OutputPath>..\Bin</OutputPath>
     <GenerateDocumentationFile>True</GenerateDocumentationFile>
     <ImplicitUsings>enable</ImplicitUsings>
     <LangVersion>latest</LangVersion>
@@ -22,11 +22,11 @@
   <PropertyGroup>
     <PackageId>$(AssemblyName)</PackageId>
     <Authors>$(Company)</Authors>
-    <PackageProjectUrl>https://www.yuque.com/smartstone/nx/redis</PackageProjectUrl>
+    <PackageProjectUrl>https://newlifex.com</PackageProjectUrl>
     <PackageIcon>leaf.png</PackageIcon>
     <RepositoryUrl>https://github.com/NewLifeX/NewLife.Kafka</RepositoryUrl>
     <RepositoryType>git</RepositoryType>
-    <PackageTags>新生命团队;X组件;NewLife;$(AssemblyName)</PackageTags>
+    <PackageTags>新生命团队;X组件;NewLife;Kafka;$(AssemblyName)</PackageTags>
     <PackageReleaseNotes></PackageReleaseNotes>
     <PackageLicenseExpression>MIT</PackageLicenseExpression>
     <AllowedOutputExtensionsInPackageBuildOutputFolder>$(AllowedOutputExtensionsInPackageBuildOutputFolder);.pdb</AllowedOutputExtensionsInPackageBuildOutputFolder>
@@ -44,9 +44,9 @@
   </ItemGroup>
 
   <ItemGroup>
-    <PackageReference Include="NewLife.Core" Version="9.0.2022.401" />
+    <PackageReference Include="NewLife.Core" Version="10.6.2023.1001" />
     <PackageReference Include="Confluent.Kafka">
-      <Version>1.8.2</Version>
+      <Version>2.3.0</Version>
     </PackageReference>
   </ItemGroup>
 
修改 +1 -1
diff --git a/Test/Test.csproj b/Test/Test.csproj
index 96a9520..414f340 100644
--- a/Test/Test.csproj
+++ b/Test/Test.csproj
@@ -7,7 +7,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="Confluent.Kafka" Version="1.8.2" />
+    <PackageReference Include="Confluent.Kafka" Version="2.3.0" />
   </ItemGroup>
 
 </Project>