v7.3.2018.0614   重构高性能资源池,减少GC压力,增加线程池,让异步任务得到平等竞争CPU的机会
大石头 authored at 2018-06-14 17:56:44
48.78 KiB
X
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Storage;
using NewLife.Log;
using NewLife.Queue.Storage.Exceptions;
using NewLife.Queue.Utilities;


namespace NewLife.Queue.Storage
{
    public unsafe class Chunk : IDisposable
    {
        #region Private Variables

        private ChunkHeader _chunkHeader;
        private ChunkFooter _chunkFooter;

        private readonly string _filename;
        private readonly ChunkManager _chunkManager;
        private readonly ChunkManagerConfig _chunkConfig;
        private readonly bool _isMemoryChunk;
        private readonly ConcurrentQueue<ReaderWorkItem> _readerWorkItemQueue = new ConcurrentQueue<ReaderWorkItem>();

        private readonly object _writeSyncObj = new object();
        private readonly object _cacheSyncObj = new object();
        private readonly object _freeMemoryObj = new object();

        private int _dataPosition;
        private bool _isCompleted;
        private bool _isDestroying;
        private bool _isMemoryFreed;
        private int _cachingChunk;
        private DateTime _lastActiveTime;
        private bool _isReadersInitialized;
        private int _flushedDataPosition;

        private Chunk _memoryChunk;
        private CacheItem[] _cacheItems;
        private IntPtr _cachedData;
        private int _cachedLength;

        private WriterWorkItem _writerWorkItem;

        #endregion

        #region Public Properties

        public string FileName { get { return _filename; } }
        public ChunkHeader ChunkHeader { get { return _chunkHeader; } }
        public ChunkFooter ChunkFooter { get { return _chunkFooter; } }
        public ChunkManagerConfig Config { get { return _chunkConfig; } }
        public bool IsCompleted { get { return _isCompleted; } }
        public DateTime LastActiveTime
        {
            get
            {
                var lastActiveTimeOfMemoryChunk = DateTime.MinValue;
                if (_memoryChunk != null)
                {
                    lastActiveTimeOfMemoryChunk = _memoryChunk.LastActiveTime;
                }
                return lastActiveTimeOfMemoryChunk >= _lastActiveTime ? lastActiveTimeOfMemoryChunk : _lastActiveTime;
            }
        }
        public bool IsMemoryChunk { get { return _isMemoryChunk; } }
        public bool HasCachedChunk { get { return _memoryChunk != null; } }
        public int DataPosition { get { return _dataPosition; } }
        public long GlobalDataPosition
        {
            get
            {
                return ChunkHeader.ChunkDataStartPosition + DataPosition;
            }
        }
        public bool IsFixedDataSize()
        {
            return _chunkConfig.ChunkDataUnitSize > 0 && _chunkConfig.ChunkDataCount > 0;
        }

        #endregion

        #region Constructors

        private Chunk(string filename, ChunkManager chunkManager, ChunkManagerConfig chunkConfig, bool isMemoryChunk)
        {
            Ensure.NotNullOrEmpty(filename, "filename");
            Ensure.NotNull(chunkManager, "chunkManager");
            Ensure.NotNull(chunkConfig, "chunkConfig");

            _filename = filename;
            _chunkManager = chunkManager;
            _chunkConfig = chunkConfig;
            _isMemoryChunk = isMemoryChunk;
            _lastActiveTime = DateTime.Now;
        }
        ~Chunk()
        {
            UnCacheFromMemory();
        }

        #endregion

        #region Factory Methods

        public static Chunk CreateNew(string filename, int chunkNumber, ChunkManager chunkManager, ChunkManagerConfig config, bool isMemoryChunk)
        {
            var chunk = new Chunk(filename, chunkManager, config, isMemoryChunk);

            try
            {
                chunk.InitNew(chunkNumber);
            }
            catch (OutOfMemoryException)
            {
                chunk.Dispose();
                throw;
            }
            catch (Exception ex)
            {
                QueueService.Log.Info(string.Format("Chunk {0} create failed.", chunk));
                XTrace.WriteException(ex);
                chunk.Dispose();
                throw;
            }

            return chunk;
        }
        public static Chunk FromCompletedFile(string filename, ChunkManager chunkManager, ChunkManagerConfig config, bool isMemoryChunk)
        {
            var chunk = new Chunk(filename, chunkManager, config, isMemoryChunk);

            try
            {
                chunk.InitCompleted();
            }
            catch (OutOfMemoryException)
            {
                chunk.Dispose();
                throw;
            }
            catch (Exception ex)
            {
                QueueService.Log.Info(string.Format("Chunk {0} init from completed file failed.", chunk));
                XTrace.WriteException(ex);
                chunk.Dispose();
                throw;
            }

            return chunk;
        }
        public static Chunk FromOngoingFile<T>(string filename, ChunkManager chunkManager, ChunkManagerConfig config, Func<byte[], T> readRecordFunc, bool isMemoryChunk) where T : ILogRecord
        {
            var chunk = new Chunk(filename, chunkManager, config, isMemoryChunk);

            try
            {
                chunk.InitOngoing(readRecordFunc);
            }
            catch (OutOfMemoryException)
            {
                chunk.Dispose();
                throw;
            }
            catch (Exception ex)
            {
                QueueService.Log.Info(string.Format("Chunk {0} init from ongoing file failed.", chunk));
                XTrace.WriteException(ex);
                chunk.Dispose();
                throw;
            }

            return chunk;
        }

        #endregion

        #region Init Methods

        private void InitCompleted()
        {
            var fileInfo = new FileInfo(_filename);
            if (!fileInfo.Exists)
            {
                throw new ChunkFileNotExistException(_filename);
            }

            _isCompleted = true;

            using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None))
            {
                using (var reader = new BinaryReader(fileStream))
                {
                    _chunkHeader = ReadHeader(fileStream, reader);
                    _chunkFooter = ReadFooter(fileStream, reader);

                    CheckCompletedFileChunk();
                }
            }

            _dataPosition = _chunkFooter.ChunkDataTotalSize;
            _flushedDataPosition = _chunkFooter.ChunkDataTotalSize;

            if (_isMemoryChunk)
            {
                LoadFileChunkToMemory();
            }
            else
            {
                SetFileAttributes();
            }

            InitializeReaderWorkItems();

            _lastActiveTime = DateTime.Now;
        }
        private void InitNew(int chunkNumber)
        {
            var chunkDataSize = 0;
            if (_chunkConfig.ChunkDataSize > 0)
            {
                chunkDataSize = _chunkConfig.ChunkDataSize;
            }
            else
            {
                chunkDataSize = _chunkConfig.ChunkDataUnitSize * _chunkConfig.ChunkDataCount;
            }

            _chunkHeader = new ChunkHeader(chunkNumber, chunkDataSize);

            _isCompleted = false;

            var fileSize = ChunkHeader.Size + _chunkHeader.ChunkDataTotalSize + ChunkFooter.Size;

            var writeStream = default(Stream);
            var tempFilename = string.Format("{0}.{1}.tmp", _filename, Guid.NewGuid());
            var tempFileStream = default(FileStream);

            try
            {
                if (_isMemoryChunk)
                {
                    _cachedLength = fileSize;
                    _cachedData = Marshal.AllocHGlobal(_cachedLength);
                    writeStream = new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength, _cachedLength, FileAccess.ReadWrite);
                    writeStream.Write(_chunkHeader.AsByteArray(), 0, ChunkHeader.Size);
                }
                else
                {
                    var fileInfo = new FileInfo(_filename);
                    if (fileInfo.Exists)
                    {
                        File.SetAttributes(_filename, FileAttributes.Normal);
                        File.Delete(_filename);
                    }

                    tempFileStream = new FileStream(tempFilename, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.Read, _chunkConfig.ChunkWriteBuffer, FileOptions.None);
                    tempFileStream.SetLength(fileSize);
                    tempFileStream.Write(_chunkHeader.AsByteArray(), 0, ChunkHeader.Size);
                    tempFileStream.Flush(true);
                    tempFileStream.Close();

                    File.Move(tempFilename, _filename);

                    writeStream = new FileStream(_filename, FileMode.Open, FileAccess.ReadWrite, FileShare.Read, _chunkConfig.ChunkWriteBuffer, FileOptions.SequentialScan);
                    SetFileAttributes();
                }

                writeStream.Position = ChunkHeader.Size;

                _dataPosition = 0;
                _flushedDataPosition = 0;
                _writerWorkItem = new WriterWorkItem(new ChunkFileStream(writeStream, _chunkConfig.FlushOption));

                InitializeReaderWorkItems();

                if (!_isMemoryChunk)
                {
                    if (_chunkConfig.EnableCache)
                    {
                        var chunkSize = (ulong)GetChunkSize(_chunkHeader);
                        if (ChunkUtil.IsMemoryEnoughToCacheChunk(chunkSize, (uint)_chunkConfig.ChunkCacheMaxPercent))
                        {
                            try
                            {
                                _memoryChunk = CreateNew(_filename, chunkNumber, _chunkManager, _chunkConfig, true);
                            }
                            catch (OutOfMemoryException)
                            {
                                _cacheItems = new CacheItem[_chunkConfig.ChunkLocalCacheSize];
                            }
                            catch (Exception ex)
                            {
                                Log.Error(string.Format("Failed to cache new chunk {0}", this), ex);
                                _cacheItems = new CacheItem[_chunkConfig.ChunkLocalCacheSize];
                            }
                        }
                        else
                        {
                            _cacheItems = new CacheItem[_chunkConfig.ChunkLocalCacheSize];
                        }
                    }
                    else
                    {
                        _cacheItems = new CacheItem[_chunkConfig.ChunkLocalCacheSize];
                    }
                }
            }
            catch
            {
                if (!_isMemoryChunk)
                {
                    if (tempFileStream != null)
                    {
                        Helper.EatException(() => tempFileStream.Close());
                    }
                    if (File.Exists(tempFilename))
                    {
                        Helper.EatException(() =>
                        {
                            File.SetAttributes(tempFilename, FileAttributes.Normal);
                            File.Delete(tempFilename);
                        });
                    }
                }
                throw;
            }

            _lastActiveTime = DateTime.Now;
        }
        private void InitOngoing<T>(Func<byte[], T> readRecordFunc) where T : ILogRecord
        {
            var fileInfo = new FileInfo(_filename);
            if (!fileInfo.Exists)
            {
                throw new ChunkFileNotExistException(_filename);
            }

            _isCompleted = false;

            if (!TryParsingDataPosition(readRecordFunc, out _chunkHeader, out _dataPosition))
            {
                throw new ChunkBadDataException(string.Format("Failed to parse chunk data, chunk file: {0}", _filename));
            }

            _flushedDataPosition = _dataPosition;

            var writeStream = default(Stream);

            if (_isMemoryChunk)
            {
                var fileSize = ChunkHeader.Size + _chunkHeader.ChunkDataTotalSize + ChunkFooter.Size;
                _cachedLength = fileSize;
                _cachedData = Marshal.AllocHGlobal(_cachedLength);
                writeStream = new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength, _cachedLength, FileAccess.ReadWrite);

                writeStream.Write(_chunkHeader.AsByteArray(), 0, ChunkHeader.Size);

                if (_dataPosition > 0)
                {
                    using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 8192, FileOptions.SequentialScan))
                    {
                        fileStream.Seek(ChunkHeader.Size, SeekOrigin.Begin);
                        var buffer = new byte[65536];
                        int toReadBytes = _dataPosition;

                        while (toReadBytes > 0)
                        {
                            int read = fileStream.Read(buffer, 0, Math.Min(toReadBytes, buffer.Length));
                            if (read == 0)
                            {
                                break;
                            }
                            toReadBytes -= read;
                            writeStream.Write(buffer, 0, read);
                        }
                    }
                }

                if (writeStream.Position != GetStreamPosition(_dataPosition))
                {
                    throw new InvalidOperationException(string.Format("UnmanagedMemoryStream position incorrect, expect: {0}, but: {1}", _dataPosition + ChunkHeader.Size, writeStream.Position));
                }
            }
            else
            {
                writeStream = new FileStream(_filename, FileMode.Open, FileAccess.ReadWrite, FileShare.Read, _chunkConfig.ChunkWriteBuffer, FileOptions.SequentialScan);
                writeStream.Position = GetStreamPosition(_dataPosition);
                SetFileAttributes();
            }

            _writerWorkItem = new WriterWorkItem(new ChunkFileStream(writeStream, _chunkConfig.FlushOption));

            InitializeReaderWorkItems();

            if (!_isMemoryChunk)
            {
                if (_chunkConfig.EnableCache)
                {
                    var chunkSize = (ulong)GetChunkSize(_chunkHeader);
                    if (ChunkUtil.IsMemoryEnoughToCacheChunk(chunkSize, (uint)_chunkConfig.ChunkCacheMaxPercent))
                    {
                        try
                        {
                            _memoryChunk = FromOngoingFile(_filename, _chunkManager, _chunkConfig, readRecordFunc, true);
                        }
                        catch (OutOfMemoryException)
                        {
                            _cacheItems = new CacheItem[_chunkConfig.ChunkLocalCacheSize];
                        }
                        catch (Exception ex)
                        {
                            Log.Error(string.Format("Failed to cache ongoing chunk {0}", this), ex);
                            _cacheItems = new CacheItem[_chunkConfig.ChunkLocalCacheSize];
                        }
                    }
                    else
                    {
                        _cacheItems = new CacheItem[_chunkConfig.ChunkLocalCacheSize];
                    }
                }
                else
                {
                    _cacheItems = new CacheItem[_chunkConfig.ChunkLocalCacheSize];
                }
            }

            _lastActiveTime = DateTime.Now;

            if (!_isMemoryChunk)
            {
                Log.Info("Ongoing chunk {0} initialized, _dataPosition: {1}", this, _dataPosition);
            }
        }

        #endregion

        #region Public Methods

        public bool TryCacheInMemory(bool shouldCacheNextChunk)
        {
            lock (_cacheSyncObj)
            {
                if (!_chunkConfig.EnableCache || _isMemoryChunk || !_isCompleted || _memoryChunk != null)
                {
                    _cachingChunk = 0;
                    return false;
                }

                try
                {
                    var chunkSize = (ulong)GetChunkSize(_chunkHeader);
                    if (!ChunkUtil.IsMemoryEnoughToCacheChunk(chunkSize, (uint)_chunkConfig.ChunkCacheMaxPercent))
                    {
                        return false;
                    }
                    _memoryChunk = FromCompletedFile(_filename, _chunkManager, _chunkConfig, true);
                    if (shouldCacheNextChunk)
                    {
                        Task.Factory.StartNew(() => _chunkManager.TryCacheNextChunk(this));
                    }
                    return true;
                }
                catch (OutOfMemoryException) { return false; }
                catch (Exception ex)
                {
                    Log.Error(string.Format("Failed to cache completed chunk {0}", this), ex);
                    return false;
                }
                finally
                {
                    _cachingChunk = 0;
                }
            }
        }
        public bool UnCacheFromMemory()
        {
            lock (_cacheSyncObj)
            {
                if (!_chunkConfig.EnableCache || _isMemoryChunk || !_isCompleted || _memoryChunk == null)
                {
                    return false;
                }

                try
                {
                    var memoryChunk = _memoryChunk;
                    _memoryChunk = null;
                    memoryChunk.Dispose();
                    return true;
                }
                catch (Exception ex)
                {
                    Log.Error(string.Format("Failed to uncache completed chunk {0}", this), ex);
                    return false;
                }
            }
        }
        public T TryReadAt<T>(long dataPosition, Func<byte[], T> readRecordFunc, bool autoCache = true) where T : class, ILogRecord
        {
            if (_isDestroying)
            {
                throw new ChunkReadException(string.Format("Chunk {0} is being deleting.", this));
            }

            _lastActiveTime = DateTime.Now;

            if (!_isMemoryChunk)
            {
                if (_cacheItems != null)
                {
                    var index = dataPosition % _chunkConfig.ChunkLocalCacheSize;
                    var cacheItem = _cacheItems[index];
                    if (cacheItem != null && cacheItem.RecordPosition == dataPosition)
                    {
                        var record = readRecordFunc(cacheItem.RecordBuffer);
                        if (record == null)
                        {
                            throw new ChunkReadException(
                                string.Format("Cannot read a record from data position {0}. Something is seriously wrong in chunk {1}.",
                                              dataPosition, this));
                        }
                        if (_chunkConfig.EnableChunkStatistic)
                        {
                            _chunkManager.AddCachedReadCount(ChunkHeader.ChunkNumber);
                        }
                        return record;
                    }
                }
                else if (_memoryChunk != null)
                {
                    var record = _memoryChunk.TryReadAt(dataPosition, readRecordFunc);
                    if (record != null && _chunkConfig.EnableChunkStatistic)
                    {
                        _chunkManager.AddUnmanagedReadCount(ChunkHeader.ChunkNumber);
                    }
                    return record;
                }
            }

            if (_chunkConfig.EnableCache && autoCache && !_isMemoryChunk && _isCompleted && Interlocked.CompareExchange(ref _cachingChunk, 1, 0) == 0)
            {
                Task.Factory.StartNew(() => TryCacheInMemory(true));
            }

            var readerWorkItem = GetReaderWorkItem();
            try
            {
                var currentDataPosition = DataPosition;
                if (dataPosition >= currentDataPosition)
                {
                    return null;
                }

                try
                {
                    var record = IsFixedDataSize() ?
                        TryReadFixedSizeForwardInternal(readerWorkItem, dataPosition, readRecordFunc) :
                        TryReadForwardInternal(readerWorkItem, dataPosition, readRecordFunc);
                    if (!_isMemoryChunk && _chunkConfig.EnableChunkStatistic)
                    {
                        _chunkManager.AddFileReadCount(ChunkHeader.ChunkNumber);
                    }
                    return record;
                }
                catch
                {
                    if (!_isMemoryChunk && _writerWorkItem != null && _writerWorkItem.LastFlushedPosition < GetStreamPosition(_dataPosition))
                    {
                        return null;
                    }
                    else
                    {
                        throw;
                    }
                }
            }
            finally
            {
                ReturnReaderWorkItem(readerWorkItem);
            }
        }
        public RecordWriteResult TryAppend(ILogRecord record)
        {
            if (_isCompleted)
            {
                throw new ChunkWriteException(this.ToString(), string.Format("Cannot write to a read-only chunk, isMemoryChunk: {0}, _dataPosition: {1}", _isMemoryChunk, _dataPosition));
            }

            _lastActiveTime = DateTime.Now;

            var writerWorkItem = _writerWorkItem;
            var bufferStream = writerWorkItem.BufferStream;
            var bufferWriter = writerWorkItem.BufferWriter;
            var recordBuffer = default(byte[]);

            if (IsFixedDataSize())
            {
                if (writerWorkItem.WorkingStream.Position + _chunkConfig.ChunkDataUnitSize > ChunkHeader.Size + _chunkHeader.ChunkDataTotalSize)
                {
                    return RecordWriteResult.NotEnoughSpace();
                }
                bufferStream.Position = 0;
                record.WriteTo(GlobalDataPosition, bufferWriter);
                var recordLength = (int)bufferStream.Length;
                if (recordLength != _chunkConfig.ChunkDataUnitSize)
                {
                    throw new ChunkWriteException(this.ToString(), string.Format("Invalid fixed data length, expected length {0}, but was {1}", _chunkConfig.ChunkDataUnitSize, recordLength));
                }

                if (_cacheItems != null)
                {
                    recordBuffer = new byte[recordLength];
                    Buffer.BlockCopy(bufferStream.GetBuffer(), 0, recordBuffer, 0, recordLength);
                }
            }
            else
            {
                bufferStream.SetLength(4);
                bufferStream.Position = 4;
                record.WriteTo(GlobalDataPosition, bufferWriter);
                var recordLength = (int)bufferStream.Length - 4;
                bufferWriter.Write(recordLength); // write record length suffix
                bufferStream.Position = 0;
                bufferWriter.Write(recordLength); // write record length prefix

                if (recordLength > _chunkConfig.MaxLogRecordSize)
                {
                    throw new ChunkWriteException(this.ToString(),
                        string.Format("Log record at data position {0} has too large length: {1} bytes, while limit is {2} bytes",
                                      _dataPosition, recordLength, _chunkConfig.MaxLogRecordSize));
                }

                if (writerWorkItem.WorkingStream.Position + recordLength + 2 * sizeof(int) > ChunkHeader.Size + _chunkHeader.ChunkDataTotalSize)
                {
                    return RecordWriteResult.NotEnoughSpace();
                }

                if (_cacheItems != null)
                {
                    recordBuffer = new byte[recordLength];
                    Buffer.BlockCopy(bufferStream.GetBuffer(), 4, recordBuffer, 0, recordLength);
                }
            }

            var writtenPosition = _dataPosition;
            var buffer = bufferStream.GetBuffer();

            lock (_writeSyncObj)
            {
                writerWorkItem.AppendData(buffer, 0, (int)bufferStream.Length);
            }

            _dataPosition = (int)writerWorkItem.WorkingStream.Position - ChunkHeader.Size;

            var position = ChunkHeader.ChunkDataStartPosition + writtenPosition;

            if (_chunkConfig.EnableCache)
            {
                if (_memoryChunk != null)
                {
                    var result = _memoryChunk.TryAppend(record);
                    if (!result.Success)
                    {
                        throw new ChunkWriteException(this.ToString(), "Append record to file chunk success, but append to memory chunk failed as memory space not enough, this should not be happened.");
                    }
                    else if (result.Position != position)
                    {
                        throw new ChunkWriteException(this.ToString(), string.Format("Append record to file chunk success, and append to memory chunk success, but the position is not equal, memory chunk write position: {0}, file chunk write position: {1}.", result.Position, position));
                    }
                }
                else if (_cacheItems != null && recordBuffer != null)
                {
                    var index = writtenPosition % _chunkConfig.ChunkLocalCacheSize;
                    _cacheItems[index] = new CacheItem { RecordPosition = writtenPosition, RecordBuffer = recordBuffer };
                }
            }
            else if (_cacheItems != null && recordBuffer != null)
            {
                var index = writtenPosition % _chunkConfig.ChunkLocalCacheSize;
                _cacheItems[index] = new CacheItem { RecordPosition = writtenPosition, RecordBuffer = recordBuffer };
            }

            if (!_isMemoryChunk && _chunkConfig.EnableChunkStatistic)
            {
                _chunkManager.AddWriteBytes(ChunkHeader.ChunkNumber, (int)bufferStream.Length);
            }

            return RecordWriteResult.Successful(position);
        }
        public void Flush()
        {
            if (_isMemoryChunk || _isCompleted) return;
            if (_writerWorkItem != null)
            {
                Helper.EatException(() => _writerWorkItem.FlushToDisk());
            }
        }
        public void Complete()
        {
            lock (_writeSyncObj)
            {
                if (_isCompleted) return;

                _chunkFooter = WriteFooter();
                if (!_isMemoryChunk)
                {
                    Flush();
                }

                _isCompleted = true;

                if (_writerWorkItem != null)
                {
                    Helper.EatException(() => _writerWorkItem.Dispose());
                    _writerWorkItem = null;
                }

                if (!_isMemoryChunk)
                {
                    if (_cacheItems != null)
                    {
                        _cacheItems = null;
                    }

                    SetFileAttributes();
                    if (_memoryChunk != null)
                    {
                        _memoryChunk.Complete();
                    }
                }
            }
        }
        public void Dispose()
        {
            Close();
        }
        public void Close()
        {
            lock (_writeSyncObj)
            {
                if (!_isCompleted)
                {
                    Flush();
                }

                if (_writerWorkItem != null)
                {
                    Helper.EatException(() => _writerWorkItem.Dispose());
                    _writerWorkItem = null;
                }

                if (!_isMemoryChunk)
                {
                    if (_cacheItems != null)
                    {
                        _cacheItems = null;
                    }
                }
                CloseAllReaderWorkItems();
                FreeMemory();
            }
        }
        public void Destroy()
        {
            if (_isMemoryChunk)
            {
                FreeMemory();
                return;
            }

            //检查当前chunk是否已完成
            if (!_isCompleted)
            {
                throw new InvalidOperationException(string.Format("Not allowed to delete a incompleted chunk {0}", this));
            }

            //首先设置删除标记
            _isDestroying = true;

            if (_cacheItems != null)
            {
                _cacheItems = null;
            }

            //释放缓存的内存
            UnCacheFromMemory();

            //关闭所有的ReaderWorkItem
            CloseAllReaderWorkItems();

            //删除Chunk文件
            File.SetAttributes(_filename, FileAttributes.Normal);
            File.Delete(_filename);
        }

        #endregion

        #region Helper Methods

        private void CheckCompletedFileChunk()
        {
            using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None))
            {
                //检查Chunk文件的实际大小是否正确
                var chunkFileSize = ChunkHeader.Size + _chunkFooter.ChunkDataTotalSize + ChunkFooter.Size;
                if (chunkFileSize != fileStream.Length)
                {
                    throw new ChunkBadDataException(
                        string.Format("The size of chunk {0} should be equals with fileStream's length {1}, but instead it was {2}.",
                                        this,
                                        fileStream.Length,
                                        chunkFileSize));
                }

                //如果Chunk中的数据是固定大小的,则还需要检查数据总数是否正确
                if (IsFixedDataSize())
                {
                    if (_chunkFooter.ChunkDataTotalSize != _chunkHeader.ChunkDataTotalSize)
                    {
                        throw new ChunkBadDataException(
                            string.Format("For fixed-size chunk, the total data size of chunk {0} should be {1}, but instead it was {2}.",
                                            this,
                                            _chunkHeader.ChunkDataTotalSize,
                                            _chunkFooter.ChunkDataTotalSize));
                    }
                }
            }
        }
        private void LoadFileChunkToMemory()
        {
            using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, 8192, FileOptions.None))
            {
                var cachedLength = (int)fileStream.Length;
                var cachedData = Marshal.AllocHGlobal(cachedLength);

                try
                {
                    using (var unmanagedStream = new UnmanagedMemoryStream((byte*)cachedData, cachedLength, cachedLength, FileAccess.ReadWrite))
                    {
                        fileStream.Seek(0, SeekOrigin.Begin);
                        var buffer = new byte[65536];
                        int toRead = cachedLength;
                        while (toRead > 0)
                        {
                            int read = fileStream.Read(buffer, 0, Math.Min(toRead, buffer.Length));
                            if (read == 0)
                            {
                                break;
                            }
                            toRead -= read;
                            unmanagedStream.Write(buffer, 0, read);
                        }
                    }
                }
                catch
                {
                    Marshal.FreeHGlobal(cachedData);
                    throw;
                }

                _cachedData = cachedData;
                _cachedLength = cachedLength;
            }
        }
        private void FreeMemory()
        {
            if (_isMemoryChunk && !_isMemoryFreed)
            {
                lock (_freeMemoryObj)
                {
                    var cachedData = Interlocked.Exchange(ref _cachedData, IntPtr.Zero);
                    if (cachedData != IntPtr.Zero)
                    {
                        try
                        {
                            Marshal.FreeHGlobal(cachedData);
                        }
                        catch (Exception ex)
                        {
                            Log.Error(string.Format("Failed to free memory of chunk {0}", this), ex);
                        }
                    }
                    _isMemoryFreed = true;
                }
            }
        }

        private void InitializeReaderWorkItems()
        {
            for (var i = 0; i < _chunkConfig.ChunkReaderCount; i++)
            {
                _readerWorkItemQueue.Enqueue(CreateReaderWorkItem());
            }
            _isReadersInitialized = true;
        }
        private void CloseAllReaderWorkItems()
        {
            if (!_isReadersInitialized) return;

            var watch = Stopwatch.StartNew();
            var closedCount = 0;

            while (closedCount < _chunkConfig.ChunkReaderCount)
            {
                ReaderWorkItem readerWorkItem;
                while (_readerWorkItemQueue.TryDequeue(out readerWorkItem))
                {
                    readerWorkItem.Reader.Close();
                    closedCount++;
                }

                if (closedCount >= _chunkConfig.ChunkReaderCount)
                {
                    break;
                }

                Thread.Sleep(1000);

                if (watch.ElapsedMilliseconds > 30 * 1000)
                {
                    Log.Error("Close chunk reader work items timeout, expect close count: {0}, real close count: {1}", _chunkConfig.ChunkReaderCount, closedCount);
                    break;
                }
            }
        }
        private ReaderWorkItem CreateReaderWorkItem()
        {
            var stream = default(Stream);
            if (_isMemoryChunk)
            {
                stream = new UnmanagedMemoryStream((byte*)_cachedData, _cachedLength);
            }
            else
            {
                stream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None);
            }
            return new ReaderWorkItem(stream, new BinaryReader(stream));
        }
        private ReaderWorkItem GetReaderWorkItem()
        {
            ReaderWorkItem readerWorkItem;
            while (!_readerWorkItemQueue.TryDequeue(out readerWorkItem))
            {
                Thread.Sleep(1);
            }
            return readerWorkItem;
        }
        private void ReturnReaderWorkItem(ReaderWorkItem readerWorkItem)
        {
            _readerWorkItemQueue.Enqueue(readerWorkItem);
        }

        private ChunkFooter WriteFooter()
        {
            var currentTotalDataSize = DataPosition;

            //如果是固定大小的数据,则检查总数据大小是否正确
            if (IsFixedDataSize())
            {
                if (currentTotalDataSize != _chunkHeader.ChunkDataTotalSize)
                {
                    throw new ChunkCompleteException(string.Format("Cannot write the chunk footer as the current total data size is incorrect. chunk: {0}, expectTotalDataSize: {1}, currentTotalDataSize: {2}",
                        this,
                        _chunkHeader.ChunkDataTotalSize,
                        currentTotalDataSize));
                }
            }

            var workItem = _writerWorkItem;
            var footer = new ChunkFooter(currentTotalDataSize);

            workItem.AppendData(footer.AsByteArray(), 0, ChunkFooter.Size);

            Flush(); // trying to prevent bug with resized file, but no data in it

            var oldStreamLength = workItem.WorkingStream.Length;
            var newStreamLength = ChunkHeader.Size + currentTotalDataSize + ChunkFooter.Size;

            if (newStreamLength != oldStreamLength)
            {
                workItem.ResizeStream(newStreamLength);
            }

            return footer;
        }
        private ChunkHeader ReadHeader(FileStream stream, BinaryReader reader)
        {
            if (stream.Length < ChunkHeader.Size)
            {
                throw new Exception(string.Format("Chunk file '{0}' is too short to even read ChunkHeader, its size is {1} bytes.", _filename, stream.Length));
            }
            stream.Seek(0, SeekOrigin.Begin);
            return ChunkHeader.FromStream(reader, stream);
        }
        private ChunkFooter ReadFooter(FileStream stream, BinaryReader reader)
        {
            if (stream.Length < ChunkFooter.Size)
            {
                throw new Exception(string.Format("Chunk file '{0}' is too short to even read ChunkFooter, its size is {1} bytes.", _filename, stream.Length));
            }
            stream.Seek(-ChunkFooter.Size, SeekOrigin.End);
            return ChunkFooter.FromStream(reader, stream);
        }
        private int GetChunkSize(ChunkHeader chunkHeader)
        {
            return ChunkHeader.Size + chunkHeader.ChunkDataTotalSize + ChunkFooter.Size;
        }

        private T TryReadForwardInternal<T>(ReaderWorkItem readerWorkItem, long dataPosition, Func<byte[], T> readRecordFunc) where T : ILogRecord
        {
            lock (_freeMemoryObj)
            {
                if (_isMemoryFreed)
                {
                    return default(T);
                }
                var currentDataPosition = DataPosition;

                if (dataPosition + 2 * sizeof(int) > currentDataPosition)
                {
                    throw new ChunkReadException(
                        string.Format("No enough space even for length prefix and suffix, data position: {0}, max data position: {1}, chunk: {2}",
                                      dataPosition, currentDataPosition, this));
                }

                readerWorkItem.Stream.Position = GetStreamPosition(dataPosition);

                var length = readerWorkItem.Reader.ReadInt32();
                if (length <= 0)
                {
                    throw new ChunkReadException(
                        string.Format("Log record at data position {0} has non-positive length: {1} in chunk {2}",
                                      dataPosition, length, this));
                }
                if (length > _chunkConfig.MaxLogRecordSize)
                {
                    throw new ChunkReadException(
                        string.Format("Log record at data position {0} has too large length: {1} bytes, while limit is {2} bytes, in chunk {3}",
                                      dataPosition, length, _chunkConfig.MaxLogRecordSize, this));
                }
                if (dataPosition + length + 2 * sizeof(int) > currentDataPosition)
                {
                    throw new ChunkReadException(
                        string.Format("There is not enough space to read full record (length prefix: {0}), data position: {1}, max data position: {2}, chunk: {3}",
                                      length, dataPosition, currentDataPosition, this));
                }

                var recordBuffer = readerWorkItem.Reader.ReadBytes(length);
                var record = readRecordFunc(recordBuffer);
                if (record == null)
                {
                    throw new ChunkReadException(
                        string.Format("Cannot read a record from data position {0}. Something is seriously wrong in chunk {1}.",
                                      dataPosition, this));
                }

                int suffixLength = readerWorkItem.Reader.ReadInt32();
                if (suffixLength != length)
                {
                    throw new ChunkReadException(
                        string.Format("Prefix/suffix length inconsistency: prefix length({0}) != suffix length ({1}), data position: {2}. Something is seriously wrong in chunk {3}.",
                                      length, suffixLength, dataPosition, this));
                }

                return record;
            }
        }
        private T TryReadFixedSizeForwardInternal<T>(ReaderWorkItem readerWorkItem, long dataPosition, Func<byte[], T> readRecordFunc) where T : ILogRecord
        {
            lock (_freeMemoryObj)
            {
                if (_isMemoryFreed)
                {
                    return default(T);
                }
                var currentDataPosition = DataPosition;

                if (dataPosition + _chunkConfig.ChunkDataUnitSize > currentDataPosition)
                {
                    throw new ChunkReadException(
                        string.Format("No enough space for fixed data record, data position: {0}, max data position: {1}, chunk: {2}",
                                      dataPosition, currentDataPosition, this));
                }

                var startStreamPosition = GetStreamPosition(dataPosition);
                readerWorkItem.Stream.Position = startStreamPosition;

                var recordBuffer = readerWorkItem.Reader.ReadBytes(_chunkConfig.ChunkDataUnitSize);
                var record = readRecordFunc(recordBuffer);
                if (record == null)
                {
                    throw new ChunkReadException(
                            string.Format("Read fixed record from data position: {0} failed, max data position: {1}. Something is seriously wrong in chunk {2}",
                                          dataPosition, currentDataPosition, this));
                }

                var recordLength = readerWorkItem.Stream.Position - startStreamPosition;
                if (recordLength != _chunkConfig.ChunkDataUnitSize)
                {
                    throw new ChunkReadException(
                            string.Format("Invalid fixed record length, expected length {0}, but was {1}, dataPosition: {2}. Something is seriously wrong in chunk {3}",
                                          _chunkConfig.ChunkDataUnitSize, recordLength, dataPosition, this));
                }

                return record;
            }
        }

        private bool TryParsingDataPosition<T>(Func<byte[], T> readRecordFunc, out ChunkHeader chunkHeader, out int dataPosition) where T : ILogRecord
        {
            using (var fileStream = new FileStream(_filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, _chunkConfig.ChunkReadBuffer, FileOptions.None))
            {
                using (var reader = new BinaryReader(fileStream))
                {
                    chunkHeader = ReadHeader(fileStream, reader);

                    fileStream.Position = ChunkHeader.Size;

                    var startStreamPosition = fileStream.Position;
                    var maxStreamPosition = fileStream.Length - ChunkFooter.Size;
                    var isFixedDataSize = IsFixedDataSize();

                    while (fileStream.Position < maxStreamPosition)
                    {
                        var success = false;
                        if (isFixedDataSize)
                        {
                            success = TryReadFixedSizeRecord(fileStream, reader, maxStreamPosition, readRecordFunc);
                        }
                        else
                        {
                            success = TryReadRecord(fileStream, reader, maxStreamPosition, readRecordFunc);
                        }

                        if (success)
                        {
                            startStreamPosition = fileStream.Position;
                        }
                        else
                        {
                            break;
                        }
                    }

                    if (startStreamPosition != fileStream.Position)
                    {
                        fileStream.Position = startStreamPosition;
                    }

                    dataPosition = (int)fileStream.Position - ChunkHeader.Size;

                    return true;
                }
            }
        }
        private bool TryReadRecord<T>(FileStream stream, BinaryReader reader, long maxStreamPosition, Func<byte[], T> readRecordFunc) where T : ILogRecord
        {
            try
            {
                var startStreamPosition = stream.Position;
                if (startStreamPosition + 2 * sizeof(int) > maxStreamPosition)
                {
                    return false;
                }

                var length = reader.ReadInt32();
                if (length <= 0 || length > _chunkConfig.MaxLogRecordSize)
                {
                    return false;
                }
                if (startStreamPosition + length + 2 * sizeof(int) > maxStreamPosition)
                {
                    return false;
                }

                var recordBuffer = reader.ReadBytes(length);
                var record = readRecordFunc(recordBuffer);
                if (record == null)
                {
                    return false;
                }

                int suffixLength = reader.ReadInt32();
                if (suffixLength != length)
                {
                    return false;
                }

                return true;
            }
            catch
            {
                return false;
            }
        }
        private bool TryReadFixedSizeRecord<T>(FileStream stream, BinaryReader reader, long maxStreamPosition, Func<byte[], T> readRecordFunc) where T : ILogRecord
        {
            try
            {
                var startStreamPosition = stream.Position;
                if (startStreamPosition + _chunkConfig.ChunkDataUnitSize > maxStreamPosition)
                {
                    return false;
                }

                var recordBuffer = reader.ReadBytes(_chunkConfig.ChunkDataUnitSize);
                var record = readRecordFunc(recordBuffer);
                if (record == null)
                {
                    return false;
                }

                var recordLength = stream.Position - startStreamPosition;
                if (recordLength != _chunkConfig.ChunkDataUnitSize)
                {
                    Log.Error("Invalid fixed data length, expected length {0}, but was {1}", _chunkConfig.ChunkDataUnitSize, recordLength);
                    return false;
                }

                return true;
            }
            catch
            {
                return false;
            }
        }
        private static long GetStreamPosition(long dataPosition)
        {
            return ChunkHeader.Size + dataPosition;
        }

        private void SetFileAttributes()
        {
            Helper.EatException(() => File.SetAttributes(_filename, FileAttributes.NotContentIndexed));
        }

        #endregion

        #region 日志
        /// <summary>日志</summary>
        public ILog Log { get; set; } = Logger.Null;
        #endregion

        class CacheItem
        {
            public long RecordPosition;
            public byte[] RecordBuffer;
        }
        class ChunkFileStream : IStream
        {
            public Stream Stream;
            public FlushOption FlushOption;

            public ChunkFileStream(Stream stream, FlushOption flushOption)
            {
                Stream = stream;
                FlushOption = flushOption;
            }

            public long Length
            {
                get
                {
                    return Stream.Length;
                }
            }

            public long Position
            {
                get
                {
                    return Stream.Position;
                }

                set
                {
                    Stream.Position = value;
                }
            }

            public void Dispose()
            {
                Stream.Dispose();
            }

            public void Flush()
            {
                var fileStream = Stream as FileStream;
                if (fileStream != null)
                {
                    if (FlushOption == FlushOption.FlushToDisk)
                    {
                        fileStream.Flush(true);
                    }
                    else
                    {
                        fileStream.Flush();
                    }
                }
                else
                {
                    Stream.Flush();
                }
            }

            public void SetLength(long value)
            {
                Stream.SetLength(value);
            }

            public void Write(byte[] buffer, int offset, int count)
            {
                Stream.Write(buffer, offset, count);
            }
        }
        public override string ToString()
        {
            return string.Format("({0}-#{1})", _chunkManager.Name, _chunkHeader.ChunkNumber);
        }
    }
}