v9.7.2018.0421   支持运行时修改DAL连接字符串
大石头 authored at 2018-04-21 14:00:47
15.05 KiB
X
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Storage;
using NewLife.Queue.Utilities;
using NewLife.Log;
using NewLife.Model;
using NewLife.Queue.Scheduling;

namespace NewLife.Queue.Storage
{
    public class ChunkManager : IDisposable
    {
        private readonly object _lockObj = new object();
        private readonly ChunkManagerConfig _config;
        private readonly IDictionary<int, Chunk> _chunks;
        private readonly string _chunkPath;
        private readonly bool _isMemoryMode;
        private int _nextChunkNumber;
        private int _uncachingChunks;
        private int _isCachingNextChunk;
        private ConcurrentDictionary<int, BytesInfo> _bytesWriteDict;
        private ConcurrentDictionary<int, CountInfo> _fileReadDict;
        private ConcurrentDictionary<int, CountInfo> _unmanagedReadDict;
        private ConcurrentDictionary<int, CountInfo> _cachedReadDict;

        class BytesInfo
        {
            public long PreviousBytes;
            public long CurrentBytes;

            public long UpgradeBytes()
            {
                var incrementBytes = CurrentBytes - PreviousBytes;
                PreviousBytes = CurrentBytes;
                return incrementBytes;
            }
        }
        class CountInfo
        {
            public long PreviousCount;
            public long CurrentCount;

            public long UpgradeCount()
            {
                var incrementCount = CurrentCount - PreviousCount;
                PreviousCount = CurrentCount;
                return incrementCount;
            }
        }

        public string Name { get; private set; }
        public ChunkManagerConfig Config { get { return _config; } }
        public string ChunkPath { get { return _chunkPath; } }
        public bool IsMemoryMode { get { return _isMemoryMode; } }

        public ChunkManager(string name, ChunkManagerConfig config, bool isMemoryMode, string relativePath = null)
        {
            Ensure.NotNull(name, "name");
            Ensure.NotNull(config, "config");

            Name = name;
            _config = config;
            _isMemoryMode = isMemoryMode;
            if (string.IsNullOrEmpty(relativePath))
            {
                _chunkPath = _config.BasePath;
            }
            else
            {
                _chunkPath = Path.Combine(_config.BasePath, relativePath);
            }
            if (!Directory.Exists(_chunkPath))
            {
                Directory.CreateDirectory(_chunkPath);
            }
            _chunks = new ConcurrentDictionary<int, Chunk>();
            _bytesWriteDict = new ConcurrentDictionary<int, BytesInfo>();
            _fileReadDict = new ConcurrentDictionary<int, CountInfo>();
            _unmanagedReadDict = new ConcurrentDictionary<int, CountInfo>();
            _cachedReadDict = new ConcurrentDictionary<int, CountInfo>();
        }

        public void Load<T>(Func<byte[], T> readRecordFunc) where T : ILogRecord
        {
            if (_isMemoryMode) return;

            lock (_lockObj)
            {
                if (!Directory.Exists(_chunkPath))
                {
                    Directory.CreateDirectory(_chunkPath);
                }

                var tempFiles = _config.FileNamingStrategy.GetTempFiles(_chunkPath);
                foreach (var file in tempFiles)
                {
                    File.SetAttributes(file, FileAttributes.Normal);
                    File.Delete(file);
                }

                var files = _config.FileNamingStrategy.GetChunkFiles(_chunkPath);
                if (files.Length > 0)
                {
                    var cachedChunkCount = 0;
                    for (var i = files.Length - 2; i >= 0; i--)
                    {
                        var file = files[i];
                        var chunk = Chunk.FromCompletedFile(file, this, _config, _isMemoryMode);
                        if (_config.EnableCache && cachedChunkCount < _config.PreCacheChunkCount)
                        {
                            if (chunk.TryCacheInMemory(false))
                            {
                                cachedChunkCount++;
                            }
                        }
                        AddChunk(chunk);
                    }
                    var lastFile = files[files.Length - 1];
                    AddChunk(Chunk.FromOngoingFile(lastFile, this, _config, readRecordFunc, _isMemoryMode));
                }

                if (_config.EnableCache)
                {
                    QueueService.ScheduleService.StartTask("UncacheChunks", () => UncacheChunks(), 1000, 1000);
                }
            }
        }
        public int GetChunkCount()
        {
            return _chunks.Count;
        }
        public IList<Chunk> GetAllChunks()
        {
            return _chunks.Values.ToList();
        }
        public Chunk AddNewChunk()
        {
            lock (_lockObj)
            {
                var chunkNumber = _nextChunkNumber;
                var chunkFileName = _config.FileNamingStrategy.GetFileNameFor(_chunkPath, chunkNumber);
                var chunk = Chunk.CreateNew(chunkFileName, chunkNumber, this, _config, _isMemoryMode);

                AddChunk(chunk);

                return chunk;
            }
        }
        public Chunk GetFirstChunk()
        {
            lock (_lockObj)
            {
                if (_chunks.Count == 0)
                {
                    AddNewChunk();
                }
                var minChunkNum = _chunks.Keys.Min();
                return _chunks[minChunkNum];
            }
        }
        public Chunk GetLastChunk()
        {
            lock (_lockObj)
            {
                if (_chunks.Count == 0)
                {
                    AddNewChunk();
                }
                return _chunks[_nextChunkNumber - 1];
            }
        }
        public int GetChunkNum(long dataPosition)
        {
            return (int)(dataPosition / _config.GetChunkDataSize());
        }
        public Chunk GetChunkFor(long dataPosition)
        {
            var chunkNum = (int)(dataPosition / _config.GetChunkDataSize());
            return GetChunk(chunkNum);
        }
        public Chunk GetChunk(int chunkNum)
        {
            if (_chunks.ContainsKey(chunkNum))
            {
                return _chunks[chunkNum];
            }
            return null;
        }
        public bool RemoveChunk(Chunk chunk)
        {
            lock (_lockObj)
            {
                if (_chunks.Remove(chunk.ChunkHeader.ChunkNumber))
                {
                    try
                    {
                        chunk.Destroy();
                    }
                    catch (Exception ex)
                    {
                        Log.Error(string.Format("Destroy chunk {0} has exception.", chunk), ex);
                    }
                    return true;
                }
                return false;
            }
        }
        public void TryCacheNextChunk(Chunk currentChunk)
        {
            if (!_config.EnableCache) return;

            if (Interlocked.CompareExchange(ref _isCachingNextChunk, 1, 0) == 0)
            {
                try
                {
                    var nextChunkNumber = currentChunk.ChunkHeader.ChunkNumber + 1;
                    var nextChunk = GetChunk(nextChunkNumber);
                    if (nextChunk != null && !nextChunk.IsMemoryChunk && nextChunk.IsCompleted && !nextChunk.HasCachedChunk)
                    {
                        Task.Factory.StartNew(() => nextChunk.TryCacheInMemory(false));
                    }
                }
                finally
                {
                    Interlocked.Exchange(ref _isCachingNextChunk, 0);
                }
            }
        }

        public void Start()
        {
            if (_config.EnableChunkStatistic)
            {
                QueueService.ScheduleService.StartTask("LogChunkStatisticStatus", LogChunkStatisticStatus, 1000, 1000);
            }
        }
        public void Shutdown()
        {
            if (_config.EnableChunkStatistic)
            {
                QueueService.ScheduleService.StopTask("LogChunkStatisticStatus");
            }
        }
        public void AddWriteBytes(int chunkNum, int byteCount)
        {
            _bytesWriteDict.AddOrUpdate(chunkNum, GetDefaultBytesInfo, (chunkNumber, current) => UpdateBytesInfo(chunkNumber, current, byteCount));
        }
        public void AddFileReadCount(int chunkNum)
        {
            _fileReadDict.AddOrUpdate(chunkNum, GetDefaultCountInfo, UpdateCountInfo);
        }
        public void AddUnmanagedReadCount(int chunkNum)
        {
            _unmanagedReadDict.AddOrUpdate(chunkNum, GetDefaultCountInfo, UpdateCountInfo);
        }
        public void AddCachedReadCount(int chunkNum)
        {
            _cachedReadDict.AddOrUpdate(chunkNum, GetDefaultCountInfo, UpdateCountInfo);
        }

        public void Dispose()
        {
            Close();
        }
        public void Close()
        {
            lock (_lockObj)
            {
                QueueService.ScheduleService.StopTask("UncacheChunks");

                foreach (var chunk in _chunks.Values)
                {
                    try
                    {
                        chunk.Close();
                    }
                    catch (Exception ex)
                    {
                        Log.Error(string.Format("Chunk {0} close failed.", chunk), ex);
                    }
                }
            }
        }

        private void AddChunk(Chunk chunk)
        {
            _chunks.Add(chunk.ChunkHeader.ChunkNumber, chunk);
            _nextChunkNumber = chunk.ChunkHeader.ChunkNumber + 1;
        }
        private int UncacheChunks(int maxUncacheCount = 10)
        {
            var uncachedCount = 0;

            if (Interlocked.CompareExchange(ref _uncachingChunks, 1, 0) == 0)
            {
                try
                {
                    var usedMemoryPercent = ChunkUtil.GetUsedMemoryPercent();
                    if (usedMemoryPercent <= (ulong)_config.ChunkCacheMinPercent)
                    {
                        return 0;
                    }

                    if (Log.Level == LogLevel.Info)
                    {
                        Log.Debug("Current memory usage {0}% exceed the chunkCacheMinPercent {1}%, try to uncache chunks.", usedMemoryPercent, _config.ChunkCacheMinPercent);
                    }

                    var chunks = _chunks.Values.Where(x => x.IsCompleted && !x.IsMemoryChunk && x.HasCachedChunk).OrderBy(x => x.LastActiveTime).ToList();

                    foreach (var chunk in chunks)
                    {
                        if ((DateTime.Now - chunk.LastActiveTime).TotalSeconds >= _config.ChunkInactiveTimeMaxSeconds)
                        {
                            if (chunk.UnCacheFromMemory())
                            {
                                Thread.Sleep(100); //即便有内存释放了,由于通过API读取到的内存使用数可能不会立即更新,所以等待一定时间后检查内存是否足够
                                uncachedCount++;
                                if (uncachedCount >= maxUncacheCount || ChunkUtil.GetUsedMemoryPercent() <= (ulong)_config.ChunkCacheMinPercent)
                                {
                                    break;
                                }
                            }
                        }
                    }

                    if (Log.Level== LogLevel.Info)
                    {
                        if (uncachedCount > 0)
                        {
                            Log.Debug("Uncached {0} chunks, current memory usage: {1}%", uncachedCount, ChunkUtil.GetUsedMemoryPercent());
                        }
                        else
                        {
                            Log.Debug("No chunks uncached.");
                        }
                    }
                }
                catch (Exception ex)
                {
                    Log.Error("Uncaching chunks has exception.", ex);
                }
                finally
                {
                    Interlocked.Exchange(ref _uncachingChunks, 0);
                }
            }

            return uncachedCount;
        }
        private CountInfo GetDefaultCountInfo(int chunkNum)
        {
            return new CountInfo { CurrentCount = 1 };
        }
        private CountInfo UpdateCountInfo(int chunkNum, CountInfo countInfo)
        {
            Interlocked.Increment(ref countInfo.CurrentCount);
            return countInfo;
        }
        private BytesInfo GetDefaultBytesInfo(int chunkNum)
        {
            return new BytesInfo();
        }
        private BytesInfo UpdateBytesInfo(int chunkNum, BytesInfo bytesInfo, int bytesAdd)
        {
            Interlocked.Add(ref bytesInfo.CurrentBytes, bytesAdd);
            return bytesInfo;
        }
        private void LogChunkStatisticStatus()
        {
            if (Log.Level != LogLevel.Info) return;
            var bytesWriteStatus = UpdateWriteStatus(_bytesWriteDict);
            var unmanagedReadStatus = UpdateReadStatus(_unmanagedReadDict);
            var fileReadStatus = UpdateReadStatus(_fileReadDict);
            var cachedReadStatus = UpdateReadStatus(_cachedReadDict);
            Log.Debug("{0}, maxChunk:#{1}, write:{2}, unmanagedCacheRead:{3}, localCacheRead:{4}, fileRead:{5}", Name, GetLastChunk().ChunkHeader.ChunkNumber, bytesWriteStatus, unmanagedReadStatus, cachedReadStatus, fileReadStatus);
        }
        private string UpdateWriteStatus(ConcurrentDictionary<int, BytesInfo> dict)
        {
            var list = new List<string>();
            var toRemoveKeys = new List<int>();

            foreach (var entry in dict)
            {
                var chunkNum = entry.Key;
                var throughput = entry.Value.UpgradeBytes() / 1024;
                if (throughput > 0)
                {
                    list.Add(string.Format("[chunk:#{0},bytes:{1}KB]", chunkNum, throughput));
                }
                else
                {
                    toRemoveKeys.Add(chunkNum);
                }
            }

            foreach (var key in toRemoveKeys)
            {
                _bytesWriteDict.Remove(key);
            }

            return list.Count == 0 ? "[]" : string.Join(",", list);
        }
        private string UpdateReadStatus(ConcurrentDictionary<int, CountInfo> dict)
        {
            var list = new List<string>();

            foreach (var entry in dict)
            {
                var chunkNum = entry.Key;
                var throughput = entry.Value.UpgradeCount();
                if (throughput > 0)
                {
                    list.Add(string.Format("[chunk:#{0},count:{1}]", chunkNum, throughput));
                }
            }

            return list.Count == 0 ? "[]" : string.Join(",", list);
        }


        #region 日志
        /// <summary>日志</summary>
        public ILog Log { get; set; } = QueueService.Log;
        #endregion
    }
}