9.8.2018.0630
大石头 编写于 2018-06-30 11:15:32
X
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using NewLife.Log;

namespace NewLife.MessageQueue
{
    /// <summary>Consumer. Multiple subscribers sharing the same identity form a consumer cluster.</summary>
    /// <remarks>
    /// Each message dispatched to the consumer is delivered to one matching subscriber,
    /// chosen in round-robin order. If delivery to a subscriber fails, the remaining
    /// subscribers are tried in turn.
    /// </remarks>
    public class Consumer
    {
        /// <summary>Host topic</summary>
        public Topic Host { get; internal set; }

        /// <summary>User</summary>
        public String User { get; }

        /// <summary>Subscribers, keyed by the subscriber object passed to <see cref="Add"/></summary>
        public ConcurrentDictionary<Object, Subscriber> Subscribers { get; } = new ConcurrentDictionary<Object, Subscriber>();

        /// <summary>Creates a consumer</summary>
        /// <param name="user">User of this consumer</param>
        public Consumer(String user)
        {
            User = user;
        }

        #region Subscription management
        /// <summary>Adds a subscriber</summary>
        /// <param name="user">Subscriber. A null value is stored under the empty-string key</param>
        /// <param name="tag">Tag. Used by the consumer to filter messages inside the topic queue</param>
        /// <param name="onMessage">Callback invoked to consume a message</param>
        /// <returns>true if the subscriber was added; false if the same subscriber is already registered</returns>
        public Boolean Add(Object user, String tag, Func<Subscriber, Message, Task> onMessage)
        {
            var key = user ?? "";

            // Cheap early exit so that no Subscriber is constructed for an obvious duplicate
            if (Subscribers.ContainsKey(key)) return false;

            var scb = new Subscriber(key, tag, onMessage)
            {
                Host = this
            };

            // TryAdd is atomic: if another thread registered the same key in the meantime,
            // report failure instead of silently overwriting its subscriber
            if (!Subscribers.TryAdd(key, scb)) return false;

            // Remove the subscription automatically when the subscriber object is disposed
            if (key is IDisposable2 dp) dp.OnDisposed += (s, e) => Remove(key);

            return true;
        }

        /// <summary>Removes a subscriber</summary>
        /// <param name="user">Subscriber. A null value refers to the empty-string key</param>
        /// <returns>true if the subscriber was found and removed; otherwise false</returns>
        public Boolean Remove(Object user)
        {
            return Subscribers.TryRemove(user ?? "", out _);
        }
        #endregion

        // Index of the subscriber at which the next dispatch starts probing (round-robin cursor)
        private Int32 _next = 0;

        /// <summary>Pushes a message to one of the matching subscribers</summary>
        /// <param name="msg">Message to deliver</param>
        /// <returns>true if a subscriber accepted the message; false if no subscriber matched or every delivery attempt failed</returns>
        internal async Task<Boolean> Dispatch(Message msg)
        {
            // Work on a snapshot so concurrent Add/Remove cannot disturb the iteration
            var ss = Subscribers.ToValueArray();
            var count = ss.Count;
            if (count == 0) return false;

            // Read the cursor exactly once. It is updated inside the loop, so re-reading it
            // on every iteration would skip subscribers after a failed delivery.
            var start = _next;
            if (start < 0 || start >= count) start = 0;

            for (var i = 0; i < count; i++)
            {
                // Wrap around so that every subscriber is probed exactly once
                var idx = (start + i) % count;

                var item = ss[idx];
                if (!item.IsMatch(msg)) continue;

                // Advance the cursor before delivering, so the next dispatch starts
                // after this subscriber whether or not delivery succeeds
                _next = idx + 1;
                try
                {
                    await item.NoitfyAsync(msg);
                    return true;
                }
                catch (Exception ex)
                {
                    // Best effort: record the failure and fall through to the next subscriber
                    WriteLog("Consumer [{0}] failed to notify a subscriber, trying the next one: {1}", User, ex.Message);
                }
            }

            return false;
        }

        #region Logging
        /// <summary>Log</summary>
        public ILog Log { get; set; } = Logger.Null;

        /// <summary>Writes a log entry at Info level</summary>
        /// <param name="format">Format string</param>
        /// <param name="args">Format arguments</param>
        public void WriteLog(String format, params Object[] args)
        {
            Log?.Info(format, args);
        }
        #endregion
    }
}