优化ETL过滤模块
大石头 authored at 2017-08-29 17:11:46
2.15 KiB
X
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using NewLife.Queue.Center.Controllers;
using NewLife.Queue.Storage;
using NewLife.Queue.Utilities;

namespace NewLife.Queue.Broker.DeleteMessageStrategies
{
    public class DeleteMessageByTimeStrategy : IDeleteMessageStrategy
    {
        /// <summary>表示消息可以保存的最大小时数;
        /// <remarks>
        /// 比如设置为24 * 7,则表示如果某个chunk里的所有消息都消费过了,且该chunk里的所有消息都是24 * 7小时之前存储的,则该chunk就可以被删除了。
        /// 默认值为24 * 30,即保存一个月;用户可以根据自己服务器磁盘的大小决定消息可以保留多久。
        /// </remarks>
        /// </summary>
        public int MaxStorageHours { get; private set; }

        public DeleteMessageByTimeStrategy(int maxStorageHours = 24 * 30)
        {
            Ensure.Positive(maxStorageHours, "maxStorageHours");
            MaxStorageHours = maxStorageHours;
        }

        public IEnumerable<Chunk> GetAllowDeleteChunks(ChunkManager chunkManager, long maxMessagePosition)
        {
            var chunks = new List<Chunk>();
            var allCompletedChunks = chunkManager
                .GetAllChunks()
                .Where(x => x.IsCompleted && CheckMessageConsumeOffset(x, maxMessagePosition))
                .OrderBy(x => x.ChunkHeader.ChunkNumber);

            foreach (var chunk in allCompletedChunks)
            {
                var lastWriteTime = new FileInfo(chunk.FileName).LastWriteTime;
                var storageHours = (DateTime.Now - lastWriteTime).TotalHours;
                if (storageHours >= MaxStorageHours)
                {
                    chunks.Add(chunk);
                }
            }

            return chunks;
        }

        private bool CheckMessageConsumeOffset(Chunk currentChunk, long maxMessagePosition)
        {
            if (BrokerService.Instance.Setting.DeleteMessageIgnoreUnConsumed)
            {
                return true;
            }
            return currentChunk.ChunkHeader.ChunkDataEndPosition <= maxMessagePosition;
        }
    }
}