// Commit message: 消息转发测试通过 (message forwarding test passed)
// Stone authored at 2016-03-17 12:52:54
// 3.40 KiB
// X
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NewLife.Net;
using NewLife.Net.Sockets;

namespace NewLife.MessageQueue
{
    /// <summary>MQ server: a <see cref="NetServer{TSession}"/> whose sessions are <see cref="MQSession"/> and which owns the topic table.</summary>
    public class MQServer : NetServer<MQSession>
    {
        /// <summary>Topics registered on this server, keyed by topic name (ordinal, case-insensitive comparison).</summary>
        public IDictionary<String, Topic> Topics { get; private set; }

        /// <summary>Creates the server with an empty topic table and <c>Port</c> set to 2234.</summary>
        public MQServer()
        {
            // Topic names are matched without regard to case.
            var table = new Dictionary<String, Topic>(StringComparer.OrdinalIgnoreCase);
            Topics = table;

            Port = 2234;
        }
    }

    /// <summary>MQ session: one client connection handled by an <see cref="MQServer"/>.</summary>
    /// <remarks>
    /// Every received packet is converted to text and split around "+" into an action and an argument.
    /// The actions handled are <c>Name</c>, <c>Public</c>, <c>Subscribe</c> and <c>Message</c>;
    /// anything else is only logged.
    /// NOTE(review): nothing in this class removes the session from <c>Topic.Subscribers</c> when the
    /// connection goes away — confirm whether <c>Topic</c> or the server prunes dead subscribers.
    /// NOTE(review): <c>MQServer.Topics</c> is shared by all sessions and is accessed here without
    /// synchronisation — confirm the threading model of the receive callbacks.
    /// </remarks>
    public class MQSession : NetSession
    {
        #region Properties
        /// <summary>Name reported by the client through the <c>Name</c> action.</summary>
        public String Name { get; set; }

        /// <summary>Owning MQ server; null until <see cref="Start"/> has run, or when the hosting server is not an <see cref="MQServer"/>.</summary>
        public MQServer Host { get; private set; }

        /// <summary>The topic this session publishes to or is subscribed to. Only a single topic is supported for now; multi-topic publish/subscribe is not designed yet.</summary>
        public Topic Topic { get; set; }
        #endregion

        #region Main methods
        /// <summary>Starts the session and captures the hosting server as an <see cref="MQServer"/>.</summary>
        public override void Start()
        {
            base.Start();

            // "as" yields null if the host is some other server type; handlers below check for that.
            Host = (this as INetSession).Host as MQServer;
        }

        /// <summary>Decodes a received packet and dispatches it to the handler for its action.</summary>
        /// <param name="e">Received-data event arguments.</param>
        protected override void OnReceive(ReceivedEventArgs e)
        {
            base.OnReceive(e);

            var str = e.ToStr();

            // Packet layout is "<action>+<argument>".
            // NOTE(review): these two-/one-string Substring overloads are not System.String members;
            // presumably project extension methods returning the text before / after "+" — confirm
            // what they return when the separator is missing.
            var act = str.Substring(null, "+");
            str = str.Substring("+");

            switch (act)
            {
                case "Name":
                    OnName(str);
                    break;
                case "Public":
                    OnPublic(str);
                    break;
                case "Subscribe":
                    OnSubscribe(str);
                    break;
                case "Message":
                    OnMessage(str);
                    break;
                default:
                    WriteLog("MQ会话收到:{0} {1}", act, str);
                    break;
            }
        }
        #endregion

        #region Identification
        /// <summary>Handles the <c>Name</c> action: stores the client name and uses it as the log prefix.</summary>
        /// <param name="str">Name sent by the client.</param>
        protected virtual void OnName(String str)
        {
            Name = str;
            LogPrefix = Name;

            WriteLog("名称:{0}", Name);
        }
        #endregion

        #region Publish topic
        /// <summary>Handles the <c>Public</c> action: looks up the named topic on the server, creating it if absent, and makes it this session's <see cref="Topic"/>.</summary>
        /// <param name="str">Topic name. A null name is ignored.</param>
        protected virtual void OnPublic(String str)
        {
            WriteLog("发布:{0}", str);

            // Host is null when the hosting server is not an MQServer, and the dictionary rejects
            // null keys with ArgumentNullException; in both cases there is nothing to publish to.
            var host = Host;
            if (host == null || str == null) return;

            Topic tp = null;
            if (!host.Topics.TryGetValue(str, out tp))
            {
                tp = new Topic();
                tp.Name = str;
                host.Topics.Add(str, tp);
            }

            Topic = tp;
        }
        #endregion

        #region Subscribe topic
        /// <summary>Handles the <c>Subscribe</c> action: if the named topic exists, makes it this session's <see cref="Topic"/> and registers the session as a subscriber.</summary>
        /// <param name="str">Topic name. A null or unknown name is ignored.</param>
        protected virtual void OnSubscribe(String str)
        {
            WriteLog("订阅:{0}", str);

            // Same guards as OnPublic: no MQServer host or a null key means nothing to subscribe to.
            var host = Host;
            if (host == null || str == null) return;

            Topic tp = null;
            if (!host.Topics.TryGetValue(str, out tp)) return;

            Topic = tp;

            // A repeated Subscribe for the same topic must not register this session twice.
            // NOTE(review): assumes Subscribers exposes Contains (e.g. ICollection<MQSession>) — confirm.
            if (!tp.Subscribers.Contains(this)) tp.Subscribers.Add(this);
        }
        #endregion

        #region Send message
        /// <summary>Handles the <c>Message</c> action: enqueues the text on the current <see cref="Topic"/>, if one is set.</summary>
        /// <param name="str">Message text.</param>
        protected virtual void OnMessage(String str)
        {
            WriteLog("消息:{0}", str);

            if (Topic != null) Topic.Enqueue(str);
        }
        #endregion

        #region Push message
        /// <summary>Pushes a message to the client of this session via <c>Send</c>.</summary>
        /// <param name="str">Message text.</param>
        /// <returns>Always true; the result of <c>Send</c> is not inspected.</returns>
        public virtual Boolean SendMessage(String str)
        {
            Send(str);

            return true;
        }
        #endregion

        #region Helpers
        #endregion
    }
}