粘包初步测试通过。 返回的数据包出现乱序,可能是服务端并行处理的问题
nnhy authored at 2015-03-27 00:32:06
5.60 KiB
X
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

namespace NewLife.Net
{
    /// <summary>消息包数据流。以包的形式读写数据流,解决粘包问题</summary>
    public class PacketStream : Stream
    {
        #region 属性
        private Int32 _Size;
        /// <summary>包大小</summary>
        public Int32 Size { get { return _Size; } set { _Size = value; } }

        private Int32 _ReadPosition;
        /// <summary>_ms中当前数据包开始的位置</summary>
        private Int32 ReadPosition { get { return _ReadPosition; } set { _ReadPosition = value; } }

        Stream _s;
        MemoryStream _ms = new MemoryStream();
        #endregion

        #region 构造
        /// <summary>构造一个消息包数据流</summary>
        /// <param name="stream"></param>
        public PacketStream(Stream stream) { _s = stream; }
        #endregion

        #region 写入
        /// <summary>写入</summary>
        /// <param name="buffer"></param>
        /// <param name="offset"></param>
        /// <param name="count"></param>
        public override void Write(byte[] buffer, int offset, int count)
        {
            if (count < 0) count = buffer.Length - offset;

            var ms = new MemoryStream(4 + count);
            ms.WriteEncodedInt(count);
            ms.Write(buffer, offset, count);
            ms.Position = 0;

            _s.Write(ms);
        }

        /// <summary>忽略缓存立马向下一层写入所有数据</summary>
        public override void Flush() { _s.Flush(); }
        #endregion

        #region 读取
        /// <summary>读取</summary>
        /// <param name="buffer"></param>
        /// <param name="offset"></param>
        /// <param name="count"></param>
        /// <returns></returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            var p = ReadPosition + Size;
            // 一个全新的包,或者还没完全的包,都要从底层读取更多数据
            if (Size == 0 || p > _ms.Length)
            {
                // 检查内存流空间大小,至少能放下当前数据包
                if (_ms.Capacity < p) _ms.Capacity = p;
                // 从内部流读取数据,写到最后。这里必须移动指针,因为以前可能移动到别的地方去了
                _ms.Position = _ms.Length;
                _ms.Write(_s);

                // 尝试读取长度
                if (Size == 0)
                {
                    _ms.Position = ReadPosition;
                    while (Size == 0)
                    {
                        var n = 0;
                        if (!_ms.TryReadEncodedInt(out n)) return 0;
                        Size = n;
                    }
                    // 读取长度后,指针移到新的位置
                    ReadPosition = (Int32)_ms.Position;
                    p = ReadPosition + Size;
                }
            }
            // 数据还是不够一个包,走吧
            if (Size == 0 || p > _ms.Length) return 0;

            // 这里开始,说明数据足够一个包,开始读出来
            _ms.Position = ReadPosition;
            // 如果缓冲区大小不够,则需要特殊处理
            var rs = _ms.Read(buffer, offset, Size <= count ? Size : count);
            // 扔掉多余部分
            if (Size > count) _ms.ReadBytes(Size - count);
            ReadPosition += Size;

            // 清空包大小,准备下一次包
            Size = 0;
            // 如果后面没有数据,直接把长度设为0重用缓冲区,否则缓冲区过大时需要拷贝
            if (_ms.Position == _ms.Length)
            {
                _ms.SetLength(0);
                ReadPosition = 0;
            }
            else
            {
                // 读取下一次大小
                Size = _ms.ReadEncodedInt();
                ReadPosition = (Int32)_ms.Position;

                if (_ms.Position > 8 * 1024)
                {
                    // 构造新的缓冲区
                    var ns = new MemoryStream();
                    ns.Write(_ms);
                    _ms.SetLength(0);
                    ReadPosition = 0;

                    // 剩余数据重新写入缓冲区
                    ns.Position = 0;
                    _ms.Write(ns);
                }
            }
            return rs;
        }
        #endregion

        #region 重载
        /// <summary>是否可以读取</summary>
        public override bool CanRead { get { return true; } }

        /// <summary>是否可以查找</summary>
        public override bool CanSeek { get { return false; } }

        /// <summary>是否可以写入</summary>
        public override bool CanWrite { get { return true; } }

        /// <summary>长度</summary>
        public override long Length { get { return _ms.Length; } }

        /// <summary>位置</summary>
        public override long Position { get { return _s.Position; } set { _s.Position = value; } }

        /// <summary>设置当前流中的位置</summary>
        /// <param name="offset"></param>
        /// <param name="origin"></param>
        /// <returns></returns>
        public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }

        /// <summary>设置长度</summary>
        /// <param name="value"></param>
        public override void SetLength(long value) { throw new NotSupportedException(); }

        #endregion
    }
}