v10.10.2024.0701 使用IJsonHost改进Json序列化
大石头 编写于 2024-07-01 08:36:34 大石头 提交于 2024-07-01 08:48:33
X
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;

namespace NewLife.IO
{
    /// <summary>读写流,继承自内存流,读写指针分开</summary>
    /// <remarks>
    /// 注意资源锁,读写不可同时进行,会出现抢锁的情况。
    /// </remarks>
    public class ReadWriteMemoryStream : MemoryStream
    {
        #region 属性
        private Int32 _ReadTimeout = Timeout.Infinite;
        /// <summary>获取或设置一个值(以毫秒为单位),该值确定流在超时前尝试读取多长时间。</summary>
        public override Int32 ReadTimeout { get { return _ReadTimeout; } set { _ReadTimeout = value; } }

        /// <summary>写位置</summary>
        public Int64 PositionForWrite { get; set; }

        /// <summary>最大长度,超过次长度时清空缓冲区</summary>
        public Int64 MaxLength { get; set; } = 1024 * 1024;

        private AutoResetEvent dataArrived = new AutoResetEvent(false);
        #endregion

        #region 扩展属性
        /// <summary>可用数据</summary>
        public Int64 AvailableData { get { return PositionForWrite - Position; } }
        #endregion

        #region 方法
        /// <summary>已重载。</summary>
        /// <param name="offset">偏移</param>
        /// <param name="loc"></param>
        /// <returns></returns>
        public long SeekForWrite(long offset, SeekOrigin loc)
        {
            Int64 r = 0;
            lock (rwLock)
            {
                Int64 p = Position;
                Position = PositionForWrite;

                r = base.Seek(offset, loc);

                PositionForWrite = Position;
                Position = p;
            }
            return r;
        }

        /// <summary>重设长度,</summary>
        void ResetLength()
        {
            // 写入指针必须超过最大长度
            if (PositionForWrite < MaxLength) return;
            //Int64 pos = Math.Min(Position, PositionForWrite);
            Int64 pos = Position;
            // 必须有剩余数据空间,并且剩余空间不能太小
            if (pos <= MaxLength / 2) return;

            Console.WriteLine("前移 {0}", pos);

            // 移动数据
            Byte[] buffer = GetBuffer();
            for (int i = 0; i < Length - pos; i++)
            {
                buffer[i] = buffer[pos + i];
            }

            SetLength(Length - pos);

            Position = 0;
            PositionForWrite -= pos;
        }
        #endregion

        #region 重载
        private Object rwLock = new Object();
        /// <summary>已重载。</summary>
        /// <param name="buffer">缓冲区</param>
        /// <param name="offset">偏移</param>
        /// <param name="count">数量</param>
        /// <returns></returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            Int32 rs = 0;
            var sw = new Stopwatch();
            sw.Start();
            while (rs <= 0) rs = ReadEx(buffer, offset, count, sw);
            return rs;
        }

        Int32 ReadEx(byte[] buffer, int offset, int count, Stopwatch sw)
        {
            // 如果没有数据
            if (PositionForWrite <= Position) CheckReadTimeout(sw);
            // 即使得到事件量,也未必能读到值,因为可能在多线程里面,数据被别的线程读走了
            // 这种情况下,本线程就需要继续等

            lock (rwLock)
            {
                return base.Read(buffer, offset, count);
            }
        }

        /// <summary>已重载。</summary>
        /// <returns></returns>
        public override int ReadByte()
        {
            Int32 rs = -1;
            var sw = new Stopwatch();
            sw.Start();
            while (rs <= -1) rs = ReadByteEx(sw);
            return rs;
        }

        Int32 ReadByteEx(Stopwatch sw)
        {
            // 如果没有数据
            if (PositionForWrite <= Position) CheckReadTimeout(sw);
            // 即使得到事件量,也未必能读到值,因为可能在多线程里面,数据被别的线程读走了
            // 这种情况下,本线程就需要继续等

            lock (rwLock)
            {
                return base.ReadByte();
            }
        }

        void CheckReadTimeout(Stopwatch sw)
        {
            if (PositionForWrite <= Position)
            {
                while (PositionForWrite <= Position)
                {
                    if (!dataArrived.WaitOne(ReadTimeout - (Int32)sw.ElapsedMilliseconds)) throw new TimeoutException();

                    if (ReadTimeout > 0 && sw.ElapsedMilliseconds >= ReadTimeout) throw new TimeoutException();
                }
            }
        }

        /// <summary>已重载。</summary>
        /// <param name="buffer">缓冲区</param>
        /// <param name="offset">偏移</param>
        /// <param name="count">数量</param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            lock (rwLock)
            {
                Int64 p = Position;
                Position = PositionForWrite;

                base.Write(buffer, offset, count);

                PositionForWrite = Position;
                Position = p;

                if (PositionForWrite >= MaxLength) ResetLength();
            }
            dataArrived.Set();
        }

        /// <summary>已重载。</summary>
        /// <param name="value">数值</param>
        public override void WriteByte(byte value)
        {
            lock (rwLock)
            {
                Int64 p = Position;
                Position = PositionForWrite;

                base.WriteByte(value);

                PositionForWrite = Position;
                Position = p;

                if (PositionForWrite >= MaxLength) ResetLength();
            }
            dataArrived.Set();
        }

        /// <summary>资源释放,关闭事件量</summary>
        /// <param name="disposing"></param>
        protected override void Dispose(bool disposing)
        {
            dataArrived.Close();

            base.Dispose(disposing);
        }
        #endregion
    }
}