优化ETL过滤模块
大石头 authored at 2017-08-29 17:11:46
5.95 KiB
X
using System;
using System.Collections.Concurrent;
using System.ComponentModel;
using System.Threading.Tasks;
using NewLife.Log;

namespace NewLife.MessageQueue
{
    /// <summary>消息队列主机</summary>
    public class MQHost : DisposeBase
    {
        #region 静态单一实例
        /// <summary>默认实例</summary>
        public static MQHost Instance { get; } = new MQHost();
        #endregion

        #region 属性
        /// <summary>名称</summary>
        public String Name { get; set; }

        /// <summary>上线下线提示</summary>
        public Boolean Tip { get; set; }

        ///// <summary>统计</summary>
        //public IStatistics Stat { get; } = new Statistics();
        #endregion

        #region 构造函数
        /// <summary>实例化一个消息队列主机</summary>
        public MQHost()
        {
            Name = GetType().Name.TrimEnd("Host");
        }
        #endregion

        #region 主题
        /// <summary>主题集合</summary>
        private ConcurrentDictionary<String, Topic> Topics { get; } = new ConcurrentDictionary<String, Topic>(StringComparer.OrdinalIgnoreCase);

        /// <summary>获取或添加主题</summary>
        /// <param name="topic">主题</param>
        /// <param name="create">是否创建</param>
        /// <returns></returns>
        private Topic Get(String topic, Boolean create)
        {
            if (create)
            {
                return Topics.GetOrAdd(topic, s =>
                {
                    WriteLog("创建主题 {0}", topic);
                    return new Topic(this, topic);
                });
            }

            Topic tp = null;
            Topics.TryGetValue(topic, out tp);

            return tp;
        }
        #endregion

        #region 订阅管理
        /// <summary>订阅主题</summary>
        /// <param name="user">消费者</param>
        /// <param name="topic">主题。沟通生产者消费者之间的桥梁</param>
        /// <param name="tag">标签。消费者用于在主题队列内部过滤消息</param>
        /// <param name="onMessage">消费消息的回调函数</param>
        /// <param name="userState">订阅者</param>
        /// <returns></returns>
        [DisplayName("订阅主题")]
        public Topic Subscribe(String user, String topic, String tag, Func<Subscriber, Message, Task> onMessage, Object userState = null)
        {
            if (user.IsNullOrEmpty()) throw new ArgumentNullException(nameof(user));
            if (topic.IsNullOrEmpty()) throw new ArgumentNullException(nameof(topic));
            if (onMessage == null) throw new ArgumentNullException(nameof(onMessage));

            var tp = Get(topic, true);
            if (tp != null)
            {
                WriteLog("{0}订阅({1}, {2})", user, topic, tag);

                var rs = tp.Add(user, tag, onMessage, userState);
                // 提示其它订阅者
                if (rs && Tip)
                {
                    var msg = new Message
                    {
                        Sender = user,
                        Topic = topic,
                        Tag = "Subscribe",
                        Content = "订阅主题",
                    };
                    tp.Send(msg);
                }
            }
            return tp;
        }

        /// <summary>取消订阅</summary>
        /// <param name="user">订阅者</param>
        /// <param name="topic">主题。沟通生产者消费者之间的桥梁</param>
        /// <param name="userState">订阅者</param>
        [DisplayName("取消订阅")]
        public void Unsubscribe(String user, String topic, Object userState)
        {
            if (user.IsNullOrEmpty()) throw new ArgumentNullException(nameof(user));

            if (!topic.IsNullOrEmpty())
            {
                WriteLog("{0}取消订阅({1})", user, topic);

                var tp = Get(topic, false);
                if (tp != null)
                {
                    var rs = tp.Remove(user, userState);
                    // 提示其它订阅者
                    if (rs && Tip) Send(user, topic, "Unsubscribe", "取消订阅");
                }
            }
            // 取消当前用户的所有订阅
            else
            {
                WriteLog("{0}取消所有{1}个订阅", user, Topics.Count);

                foreach (var item in Topics.Values)
                {
                    var rs = item.Remove(user, userState);
                    // 提示其它订阅者
                    if (rs && Tip) Send(user, topic, "Unsubscribe", "取消订阅");
                }
            }
        }
        #endregion

        #region 发布管理
        /// <summary>单向发送。不需要反馈</summary>
        /// <param name="msg">消息</param>
        public Int32 Send(Message msg)
        {
            var tp = Get(msg.Topic, true);
            if (tp == null) throw new ArgumentNullException(nameof(msg.Topic), "找不到主题");

            return tp.Send(msg);
        }

        /// <summary>单向发送。不需要反馈</summary>
        /// <param name="user">生产者</param>
        /// <param name="topic">主题</param>
        /// <param name="tag">标签</param>
        /// <param name="content">内容</param>
        public Int32 Send(String user, String topic, String tag, Object content)
        {
            var msg = new Message
            {
                Topic = topic,
                Sender = user,
                Tag = tag,
                Content = content
            };

            return Send(msg);
        }
        #endregion

        #region 日志
        /// <summary>日志</summary>
        public ILog Log { get; set; } = Logger.Null;

        /// <summary>写日志</summary>
        /// <param name="format"></param>
        /// <param name="args"></param>
        public void WriteLog(String format, params Object[] args)
        {
            Log?.Info(Name + " " + format, args);
        }
        #endregion
    }
}