优化ETL过滤模块
大石头 authored at 2017-08-29 17:11:46
7.35 KiB
X
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NewLife.Queue.Protocols.Brokers;
using NewLife.Queue.Protocols.CenterServers.Requests;
using NewLife.Remoting;
using NewLife.Serialization;
using NewLife.Threading;
using NewLife.Net;

namespace NewLife.Queue.Center
{
    public class ClusterManager: IClusterManager
    {
        private readonly ConcurrentDictionary<string /*clusterName*/, Cluster> _clusterDict;
        private readonly CenterServer _centerServer;
        private readonly object _lockObj = new object();
        private TimerX _time;
        public ClusterManager(CenterServer centerServer)
        {
            _centerServer = centerServer;
            _clusterDict = new ConcurrentDictionary<string, Cluster>();
        }
        /// <summary>
        /// 
        /// </summary>
        /// <param name="apiSession"></param>
        /// <param name="request"></param>
        public void RegisterBroker(IApiSession apiSession, BrokerRegistrationRequest request)
        {
            lock (_lockObj)
            {
                var brokerInfo = request.BrokerInfo;
                var cluster = _clusterDict.GetOrAdd(brokerInfo.ClusterName, x => new Cluster { ClusterName = x });
                var brokerGroup = cluster.BrokerGroups.GetOrAdd(brokerInfo.GroupName, x => new BrokerGroup { GroupName = x });
                Broker broker;
                if (!brokerGroup.Brokers.TryGetValue(brokerInfo.BrokerName, out broker))
                {
                    var netSession = apiSession as INetSession;
                    if (netSession != null)
                    {
                        var connectionId = netSession.Remote.EndPoint.ToAddress();
                        broker = new Broker
                        {
                            BrokerInfo = request.BrokerInfo,
                            TotalSendThroughput = request.TotalSendThroughput,
                            TotalConsumeThroughput = request.TotalConsumeThroughput,
                            TotalUnConsumedMessageCount = request.TotalUnConsumedMessageCount,
                            TopicQueueInfoList = request.TopicQueueInfoList,
                            TopicConsumeInfoList = request.TopicConsumeInfoList,
                            ProducerList = request.ProducerList,
                            ConsumerList = request.ConsumerList,
                            ApiSession = apiSession,
                            ConnectionId = connectionId,
                            LastActiveTime = DateTime.Now,
                            FirstRegisteredTime = DateTime.Now,
                            Group = brokerGroup
                        };
                        if (brokerGroup.Brokers.TryAdd(brokerInfo.BrokerName, broker))
                        {
                            _centerServer.Log.Info("Registered new broker, brokerInfo: {0}", brokerInfo.ToJson());
                        }
                    }
                    else
                    {
                        _centerServer.Log.Error("Registered broker Erro, brokerInfo: {0}", brokerInfo.ToJson());
                    }
                   
                }
                else
                {
                    broker.LastActiveTime = DateTime.Now;
                    broker.TotalSendThroughput = request.TotalSendThroughput;
                    broker.TotalConsumeThroughput = request.TotalConsumeThroughput;
                    broker.TotalUnConsumedMessageCount = request.TotalUnConsumedMessageCount;

                    if (!broker.BrokerInfo.IsEqualsWith(request.BrokerInfo))
                    {
                        _centerServer.Log.Info("Broker basicInfo changed, old: {0}, new: {1}", broker.BrokerInfo, request.BrokerInfo);
                        broker.BrokerInfo = request.BrokerInfo;
 
                    }

                    broker.TopicQueueInfoList = request.TopicQueueInfoList;
                    broker.TopicConsumeInfoList = request.TopicConsumeInfoList;
                    broker.ProducerList = request.ProducerList;
                    broker.ConsumerList = request.ConsumerList;
                }
            }
        }

        public void Start()
        {
            lock (_lockObj)
            {
                _clusterDict.Clear();
            }
            _time = new TimerX(s => { ScanNotActiveBroker(); }, null, 1000, 1000);
        }
        public void Shutdown()
        {
            lock (_lockObj)
            {
                _clusterDict.Clear();
            }
            _time?.Dispose();
        }

        private void ScanNotActiveBroker()
        {
            lock (_lockObj)
            {
                foreach (var cluster in _clusterDict.Values)
                {
                    foreach (var brokerGroup in cluster.BrokerGroups.Values)
                    {
                        var notActiveBrokers = new List<Broker>();
                        foreach (var broker in brokerGroup.Brokers.Values)
                        {
                            if (broker.IsTimeout(_centerServer.CurrentCfg.BrokerInactiveMaxMilliseconds))
                            {
                                notActiveBrokers.Add(broker);
                            }
                        }
                        if (notActiveBrokers.Count > 0)
                        {
                            foreach (var broker in notActiveBrokers)
                            {
                                Broker removed;
                                if (brokerGroup.Brokers.TryRemove(broker.BrokerInfo.BrokerName, out removed))
                                {
                                    //_logger.InfoFormat("Removed timeout broker, brokerInfo: {0}, lastActiveTime: {1}", _jsonSerializer.Serialize(removed.BrokerInfo), removed.LastActiveTime);
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    class Broker
    {
        public BrokerInfo BrokerInfo { get; set; }
        public long TotalSendThroughput { get; set; }
        public long TotalConsumeThroughput { get; set; }
        public long TotalUnConsumedMessageCount { get; set; }
        public IList<TopicQueueInfo> TopicQueueInfoList = new List<TopicQueueInfo>();
        public IList<TopicConsumeInfo> TopicConsumeInfoList = new List<TopicConsumeInfo>();
        public IList<string> ProducerList = new List<string>();
        public IList<ConsumerInfo> ConsumerList = new List<ConsumerInfo>();
        public string ConnectionId { get; set; }
        public IApiSession ApiSession { get; set; }
        public DateTime LastActiveTime { get; set; }
        public BrokerGroup Group { get; set; }
        public DateTime FirstRegisteredTime { get; set; }

        public bool IsTimeout(double timeoutMilliseconds)
        {
            return (DateTime.Now - LastActiveTime).TotalMilliseconds >= timeoutMilliseconds;
        }
    }

    class BrokerGroup
    {
        public string GroupName { get; set; }
        public ConcurrentDictionary<string /*brokerName*/, Broker> Brokers = new ConcurrentDictionary<string, Broker>();
    }
    class Cluster
    {
        public string ClusterName { get; set; }
        public ConcurrentDictionary<string /*groupName*/, BrokerGroup> BrokerGroups = new ConcurrentDictionary<string, BrokerGroup>();
    }

}