trim HttpHeaders
石头 编写于 2024-06-22 08:55:23
X
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NewLife.Net;
using NewLife.Threading;

namespace NewLife.MessageQueue
{
    /// <summary>
    /// A message topic: holds a bounded queue of pending messages and the list of
    /// subscriber sessions, and pushes each queued message to every subscriber.
    /// </summary>
    /// <remarks>
    /// Internal access to <see cref="Queue"/> is serialized on a private lock, and at most
    /// one background worker drains the queue at a time so messages are sent in the order
    /// they were enqueued. Code outside this class that touches <see cref="Queue"/> or
    /// <see cref="Subscribers"/> directly does not take that lock.
    /// </remarks>
    public class Topic
    {
        #region Properties
        /// <summary>Topic name.</summary>
        public String Name { get; set; }

        //public List<MQSession> Publicers { get; private set; }

        /// <summary>Sessions that receive every message pushed from this topic.</summary>
        public List<MQSession> Subscribers { get; private set; }

        /// <summary>Messages waiting to be pushed to the subscribers.</summary>
        public Queue<String> Queue { get; private set; }
        #endregion

        #region Constructor
        /// <summary>Creates a topic with no subscribers and an empty queue.</summary>
        public Topic()
        {
            //Publicers = new List<MQSession>();
            Subscribers = new List<MQSession>();
            Queue = new Queue<String>();
        }
        #endregion

        #region Enqueue
        /// <summary>Backlog size above which <see cref="Enqueue"/> rejects new messages.</summary>
        private const Int32 MaxBacklog = 10000;

        /// <summary>Guards <see cref="Queue"/>, <see cref="_Timer"/> and <see cref="_pushing"/>.</summary>
        private readonly Object _sync = new Object();

        /// <summary>Appends a message to the queue and schedules a push.</summary>
        /// <param name="msg">Message to enqueue.</param>
        /// <returns>
        /// <c>false</c> when the queue already holds more than 10000 messages and the
        /// message was rejected; otherwise <c>true</c>.
        /// </returns>
        public Boolean Enqueue(String msg)
        {
            // Enqueue runs on caller threads while the drain worker dequeues on a pool
            // thread; Queue<T> is not thread-safe, so the check and the write share a lock.
            lock (_sync)
            {
                if (Queue.Count > MaxBacklog) return false;

                Queue.Enqueue(msg);
            }

            Notify();

            return true;
        }
        #endregion

        #region Push messages
        /// <summary>Creates the push timer on first use, otherwise asks it to fire again.</summary>
        private void Notify()
        {
            // The lock prevents two concurrent Enqueue calls from each creating a timer.
            lock (_sync)
            {
                if (_Timer == null)
                    // NOTE(review): 0 / 5000 are presumably the due time and period in
                    // milliseconds - confirm against TimerX.
                    _Timer = new TimerX(Push, null, 0, 5000);
                else
                    // NOTE(review): presumably makes the timer fire as soon as possible - confirm against TimerX.
                    _Timer.NextTime = DateTime.MinValue;
            }
        }

        private TimerX _Timer;

        /// <summary>True while a drain worker is scheduled or running.</summary>
        private Boolean _pushing;

        /// <summary>
        /// Timer callback: when there are queued messages and at least one subscriber,
        /// hands the actual sending over to a thread-pool worker.
        /// </summary>
        /// <param name="state">Timer state; not used.</param>
        private void Push(Object state)
        {
            lock (_sync)
            {
                // A worker is already draining; starting a second one would race on the
                // queue and could deliver messages out of order.
                if (_pushing) return;
                if (Queue.Count == 0) return;
                if (Subscribers.Count == 0) return;

                _pushing = true;
            }

            try
            {
                ThreadPoolX.QueueUserWorkItem(() => Drain());
            }
            catch
            {
                // The worker never started, so release the flag for later pushes.
                lock (_sync) _pushing = false;
                throw;
            }
        }

        /// <summary>
        /// Dequeues messages one at a time and sends each to every current subscriber,
        /// until the queue is empty or no subscriber is left.
        /// </summary>
        private void Drain()
        {
            try
            {
                while (true)
                {
                    String item;
                    MQSession[] targets;

                    lock (_sync)
                    {
                        // Clearing the flag inside the same lock as the emptiness check
                        // means a message enqueued right after this point will see
                        // _pushing == false and can start a new worker.
                        if (Queue.Count == 0 || Subscribers.Count == 0)
                        {
                            _pushing = false;
                            return;
                        }

                        // Send to a snapshot so a session subscribing or unsubscribing
                        // during the send does not invalidate the enumeration.
                        targets = Subscribers.ToArray();
                        item = Queue.Dequeue();
                    }

                    // Send outside the lock so Enqueue is not blocked by network I/O.
                    foreach (var ss in targets)
                    {
                        ss.SendMessage(item);
                    }
                }
            }
            catch
            {
                // Do not leave the topic permanently "busy" if a send fails.
                lock (_sync) _pushing = false;
                throw;
            }
        }
        #endregion
    }
}