// v9.7.2018.0421   支持运行时修改DAL连接字符串 (support modifying the DAL connection string at runtime)
// 大石头 authored at 2018-04-21 14:00:47
// 7.33 KiB
using System;
using System.IO;
using System.Linq;
using NewLife.Log;
using NewLife.Model;
using NewLife.Queue.Center.Controllers;
using NewLife.Queue.Protocols.Brokers;
using NewLife.Queue.Storage;
using NewLife.Remoting;
using NewLife.Serialization;

namespace NewLife.Queue.Broker
{
    /// <summary>
    /// Read-only identity of a queue plus the counter that hands out its offsets.
    /// </summary>
    public interface IQueue
    {
        /// <summary>Name of the topic this queue is part of.</summary>
        string Topic { get; }

        /// <summary>Identifier of the queue within its topic.</summary>
        int QueueId { get; }

        /// <summary>Offset that the next call to <see cref="IncrementNextOffset"/> will work from.</summary>
        long NextOffset { get; }

        /// <summary>
        /// Advances <see cref="NextOffset"/> by one.
        /// </summary>
        /// <returns>
        /// An offset value; the <c>Queue</c> implementation in this file returns the
        /// value held before the increment.
        /// </returns>
        long IncrementNextOffset();
    }
    /// <summary>
    /// One queue of a topic. Index records of size <c>ChunkDataUnitSize</c> are kept in a
    /// chunk store; the record at queue offset <c>n</c> sits at position
    /// <c>n * ChunkDataUnitSize</c> and holds the message-log position of that message
    /// together with a tag code.
    /// </summary>
    public class Queue : IQueue
    {
        private const string QueueSettingFileName = "queue.setting";
        private readonly ChunkWriter _chunkWriter;
        private readonly ChunkReader _chunkReader;
        private readonly ChunkManager _chunkManager;
        private readonly string _queueSettingFile;
        private readonly ILog _logger = QueueService.Log;
        private QueueSetting _setting;
        private long _nextOffset;

        /// <summary>Name of the topic this queue is part of.</summary>
        public string Topic { get; private set; }

        /// <summary>Identifier of the queue within its topic.</summary>
        public int QueueId { get; private set; }

        /// <summary>Offset the next call to <see cref="IncrementNextOffset"/> will return.</summary>
        public long NextOffset { get { return _nextOffset; } }

        /// <summary>Current settings; <c>null</c> until <see cref="Load"/> has run.</summary>
        public QueueSetting Setting { get { return _setting; } }

        /// <summary>Key built from <see cref="Topic"/> and <see cref="QueueId"/>.</summary>
        public QueueKey Key { get; private set; }

        /// <summary>
        /// Creates the queue and its chunk manager, writer and reader. Nothing is read
        /// from storage until <see cref="Load"/> is called.
        /// </summary>
        /// <param name="topic">Topic name.</param>
        /// <param name="queueId">Queue identifier within the topic.</param>
        public Queue(string topic, int queueId)
        {
            Topic = topic;
            QueueId = queueId;
            Key = new QueueKey(topic, queueId);

            var brokerSetting = BrokerService.Instance.Setting;

            // NOTE(review): the last argument looks like a relative storage path; the
            // hard-coded '\' is only a directory separator on Windows. Left unchanged
            // because switching to Path.Combine would alter the layout of existing data
            // on other platforms - confirm before changing.
            _chunkManager = new ChunkManager(
                "QueueChunk-" + Key.ToString(),
                brokerSetting.QueueChunkConfig,
                brokerSetting.IsMessageStoreMemoryMode,
                Topic + @"\" + QueueId);
            _chunkWriter = new ChunkWriter(_chunkManager);
            _chunkReader = new ChunkReader(_chunkManager, _chunkWriter);
            _queueSettingFile = Path.Combine(_chunkManager.ChunkPath, QueueSettingFileName);
        }

        /// <summary>
        /// Loads the queue settings (creating and saving defaults when none are stored),
        /// then, unless the queue is flagged as deleted, loads the chunks, opens the
        /// writer and restores <see cref="NextOffset"/> from the end of the last chunk.
        /// </summary>
        public void Load()
        {
            _setting = LoadQueueSetting();
            if (_setting == null)
            {
                _setting = new QueueSetting();
                SaveQueueSetting();
            }

            // A queue flagged as deleted is left unopened.
            if (_setting.IsDeleted)
            {
                return;
            }

            _chunkManager.Load(ReadMessageIndex);
            _chunkWriter.Open();

            // Global end position of the written data divided by the record size gives
            // the number of records written so far, i.e. the next free offset.
            var lastChunk = _chunkManager.GetLastChunk();
            var lastOffsetGlobalPosition = lastChunk.DataPosition + lastChunk.ChunkHeader.ChunkDataStartPosition;
            if (lastOffsetGlobalPosition > 0)
            {
                _nextOffset = lastOffsetGlobalPosition / _chunkManager.Config.ChunkDataUnitSize;
            }
        }

        /// <summary>Closes the chunk writer and then the chunk manager.</summary>
        public void Close()
        {
            _chunkWriter.Close();
            _chunkManager.Close();
        }

        /// <summary>
        /// Appends an index record for a message.
        /// </summary>
        /// <param name="messagePosition">Position of the message in the message log.</param>
        /// <param name="messageTag">
        /// Message tag; its length is stored as the tag code. A <c>null</c> tag is stored
        /// as tag code 0 instead of throwing.
        /// </param>
        public void AddMessage(long messagePosition, string messageTag)
        {
            var tagCode = messageTag == null ? 0 : messageTag.Length;

            // The position is stored shifted by one: ReadMessageIndex treats a stored
            // value <= 0 as "no record", so a real position of 0 must not be stored as 0.
            _chunkWriter.Write(new QueueLogRecord(messagePosition + 1, tagCode));
        }

        /// <summary>
        /// Looks up the message-log position recorded at a queue offset.
        /// </summary>
        /// <param name="queueOffset">Offset within this queue.</param>
        /// <param name="tagCode">Tag code of the record, or 0 when no record is found.</param>
        /// <param name="autoCache">Passed through to the chunk reader.</param>
        /// <returns>The message-log position, or -1 when no record is found.</returns>
        public long GetMessagePosition(long queueOffset, out int tagCode, bool autoCache = true)
        {
            tagCode = 0;

            var position = queueOffset * _chunkManager.Config.ChunkDataUnitSize;
            var record = _chunkReader.TryReadAt(position, ReadMessageIndex, autoCache);
            if (record == null)
            {
                return -1L;
            }

            tagCode = record.TagCode;
            // Undo the +1 shift applied in AddMessage.
            return record.MessageLogPosition - 1;
        }

        /// <summary>Sets the producer-visible flag and saves the settings.</summary>
        /// <param name="visible">New flag value.</param>
        public void SetProducerVisible(bool visible)
        {
            _setting.ProducerVisible = visible;
            SaveQueueSetting();
        }

        /// <summary>Sets the consumer-visible flag and saves the settings.</summary>
        /// <param name="visible">New flag value.</param>
        public void SetConsumerVisible(bool visible)
        {
            _setting.ConsumerVisible = visible;
            SaveQueueSetting();
        }

        /// <summary>
        /// Flags the queue as deleted, saves that flag, closes the queue and, when not in
        /// memory mode, removes the chunk directory with everything in it.
        /// </summary>
        public void Delete()
        {
            _setting.IsDeleted = true;
            SaveQueueSetting();

            Close();

            if (!_chunkManager.IsMemoryMode)
            {
                Directory.Delete(_chunkManager.ChunkPath, true);
            }
        }

        /// <summary>
        /// Returns the current next offset and then advances it by one.
        /// </summary>
        /// <returns>The offset value before the increment.</returns>
        public long IncrementNextOffset()
        {
            // NOTE(review): plain post-increment, no synchronization here - presumably
            // callers serialize access; confirm.
            return _nextOffset++;
        }

        /// <summary>
        /// Computes the smallest queue offset still present in the chunk store.
        /// </summary>
        /// <returns>
        /// -1 when the next offset is still 0; otherwise the first chunk's data start
        /// position divided by the record size.
        /// </returns>
        public long GetMinQueueOffset()
        {
            if (_nextOffset == 0L)
            {
                return -1L;
            }
            return _chunkManager.GetFirstChunk().ChunkHeader.ChunkDataStartPosition / _chunkManager.Config.ChunkDataUnitSize;
        }

        /// <summary>
        /// Removes every completed chunk whose last record points to a message-log
        /// position below <paramref name="minMessagePosition"/>.
        /// </summary>
        /// <param name="minMessagePosition">Smallest message-log position that must be kept.</param>
        public void DeleteMessages(long minMessagePosition)
        {
            var chunks = _chunkManager.GetAllChunks().Where(x => x.IsCompleted).OrderBy(x => x.ChunkHeader.ChunkNumber);

            foreach (var chunk in chunks)
            {
                // Position of the last record slot in the chunk.
                var maxPosition = chunk.ChunkHeader.ChunkDataEndPosition - _chunkManager.Config.ChunkDataUnitSize;
                var record = _chunkReader.TryReadAt(maxPosition, ReadMessageIndex, false);
                if (record == null)
                {
                    continue;
                }

                var chunkLastMessagePosition = record.MessageLogPosition - 1;
                if (chunkLastMessagePosition >= minMessagePosition)
                {
                    continue;
                }

                if (_chunkManager.RemoveChunk(chunk))
                {
                    _logger.Info("Queue (topic: {0}, queueId: {1}) chunk #{2} is deleted, chunkLastMessagePosition: {3}, messageStoreMinMessagePosition: {4}", Topic, QueueId, chunk.ChunkHeader.ChunkNumber, chunkLastMessagePosition, minMessagePosition);
                }
            }
        }

        /// <summary>
        /// Decodes an index record from a buffer; returns <c>null</c> when the stored
        /// message-log position is not positive (treated as "no record").
        /// </summary>
        private QueueLogRecord ReadMessageIndex(byte[] recordBuffer)
        {
            var record = new QueueLogRecord();
            record.ReadFrom(recordBuffer);
            if (record.MessageLogPosition <= 0)
            {
                return null;
            }
            return record;
        }

        /// <summary>Makes sure the chunk directory exists.</summary>
        private void EnsureChunkDirectory()
        {
            // Directory.CreateDirectory does nothing when the directory already exists,
            // so no separate Directory.Exists check is needed.
            Directory.CreateDirectory(_chunkManager.ChunkPath);
        }

        /// <summary>
        /// Reads the settings file from the chunk directory.
        /// </summary>
        /// <returns>
        /// The decoded settings, or <c>null</c> in memory mode or when the file is empty
        /// (the file is created if it does not exist).
        /// </returns>
        private QueueSetting LoadQueueSetting()
        {
            if (_chunkManager.IsMemoryMode)
            {
                return null;
            }

            EnsureChunkDirectory();

            using (var stream = new FileStream(_queueSettingFile, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite))
            using (var reader = new StreamReader(stream))
            {
                var text = reader.ReadToEnd();
                if (!string.IsNullOrEmpty(text))
                {
                    return new JsonEncoder().Convert<QueueSetting>(text);
                }
                return null;
            }
        }

        /// <summary>
        /// Writes the current settings as JSON to the settings file, replacing any
        /// previous content. Does nothing in memory mode.
        /// </summary>
        private void SaveQueueSetting()
        {
            if (_chunkManager.IsMemoryMode)
            {
                return;
            }

            EnsureChunkDirectory();

            using (var stream = new FileStream(_queueSettingFile, FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite))
            using (var writer = new StreamWriter(stream))
            {
                writer.Write(new JsonEncoder().Encode(_setting));
            }
        }
    }
    /// <summary>
    /// Flags of a queue. <c>Queue</c> in this file serializes an instance to JSON in
    /// its settings file. A new instance is visible to producers and consumers and is
    /// not deleted.
    /// </summary>
    public class QueueSetting
    {
        /// <summary>Producer-visible flag; defaults to <c>true</c>.</summary>
        public bool ProducerVisible = true;

        /// <summary>Consumer-visible flag; defaults to <c>true</c>.</summary>
        public bool ConsumerVisible = true;

        /// <summary>Deleted flag; defaults to <c>false</c>.</summary>
        public bool IsDeleted = false;
    }
}