必须填写至少10个字的日志
nnhy authored at 2012-07-27 18:48:21
8.23 KiB
X
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;
using NewLife.Serialization;
#if NET4
using System.Linq;
#else
using NewLife.Linq;
#endif

namespace NewLife.Messaging
{
    /// <summary>组消息</summary>
    /// <remarks>
    /// 对于超长消息,可拆分为多个组消息进行传输,然后在目的地重组。
    /// </remarks>
    public class GroupMessage : Message
    {
        #region 属性
        /// <summary>消息类型</summary>
        [XmlIgnore]
        public override MessageKind Kind { get { return MessageKind.Group; } }

        private Int32 _Identity;
        /// <summary>唯一标识</summary>
        public Int32 Identity { get { return _Identity; } set { _Identity = value; } }

        private Int32 _Index;
        /// <summary>在组中的索引位置</summary>
        public Int32 Index { get { return _Index; } set { _Index = value; } }

        private Int32 _Count;
        /// <summary>分组数</summary>
        public Int32 Count { get { return _Count; } set { _Count = value; } }

        private Byte[] _Data;
        /// <summary>数据</summary>
        public Byte[] Data { get { return _Data; } set { _Data = value; } }
        #endregion

        #region 方法
        #endregion

        #region 辅助
        /// <summary>已重载。</summary>
        /// <returns></returns>
        public override string ToString()
        {
            var len = Data == null ? 0 : Data.Length;
            return String.Format("{0} {1} {2}/{3} Length={4}", base.ToString(), Identity, Index, Count, len);
        }
        #endregion
    }

    /// <summary>消息组。</summary>
    /// <remarks>
    /// 20120421,连续两天半,完成消息分片组装算法,尽可能的让分片占用最大大小。
    /// </remarks>
    public class MessageGroup : IEnumerable<GroupMessage>
    {
        #region 属性
        private static Int32 _gid = 1;
        private Int32 _Identity = _gid++;
        /// <summary>唯一标识</summary>
        public Int32 Identity { get { return _Identity; } set { _Identity = value; } }

        private List<GroupMessage> _Items;
        /// <summary>消息集合</summary>
        List<GroupMessage> Items { get { return _Items ?? (_Items = new List<GroupMessage>()); } /*set { _Items = value; }*/ }

        /// <summary>第一个组消息,上面有总记录数</summary>
        public GroupMessage First { get { return Items.Count > 0 ? Items[0] : null; } }

        private Int32 _Total;
        /// <summary>总的组消息数</summary>
        public Int32 Total { get { return _Total; } private set { _Total = value; } }

        /// <summary>组消息个数</summary>
        public Int32 Count { get { return Items.Count; } }
        #endregion

        #region 拆分
        /// <summary>拆分数据流为多个消息</summary>
        /// <param name="stream"></param>
        /// <param name="size"></param>
        /// <param name="header"></param>
        public void Split(Stream stream, Int32 size, MessageHeader header = null)
        {
            // 组包消息全部采用消息头长度,这里估算,方便内部预留大小,如果包大小刚好在边界上,可能会浪费一个字节空间,不过这个可能性很小
            if (header == null) header = new MessageHeader();
            header.Length = size;

            // 消息头大小
            var headerLength = 0;
            if (header != null) headerLength = header.ToArray().Length;

            // 先估算数据包个数
            var count = (Int32)Math.Ceiling((Double)(stream.Length - stream.Position) / (size - (1 + 4 * 3 + 1) - headerLength));

            // 估计组消息头部长度,最大化构造包。
            // Kind + 消息头
            var len = 1 + headerLength;
            // Identity
            len += GetBytesCount(Identity);
            // Count
            len += 1;

            // 加上数据包大小。因为压缩整数的存在,这里不是绝对准确,但是大多数时候不会有问题
            len += GetBytesCount(size);
            // !!!不要忘了数据部分的对象引用
            len += 1;

            // 计算数据部分大小
            var index = Items.Count;
            while (stream.Position < stream.Length)
            {
                var msg = new GroupMessage();
                msg.Header = header.Clone();
                msg.Identity = Identity;
                msg.Index = ++index;
                //msg.Count = count;

                // 加上索引长度,计算真正的头部长度
                var trueLen = len + GetBytesCount(msg.Index);
                // 第一个元素采用精确Count
                if (msg.Index == 1) trueLen += GetBytesCount(count) - 1;

                var len2 = stream.Length - stream.Position;
                if (len2 > size - trueLen) len2 = size - trueLen;
                //var buffer = new Byte[len2];
                //stream.Read(buffer, 0, buffer.Length);
                //msg.Data = buffer;
                msg.Data = stream.ReadBytes(len2);

                // 减去Header以外的全部长度
                msg.Header.Length = (Int32)(trueLen + len2) - headerLength;

                // 最后一个修正长度,因为数据量可能很少,前面的GetBytesCount(size)可能不对。
                if (len2 < size - trueLen) msg.Header.Length += GetBytesCount(len2) - GetBytesCount(size);

                lock (Items)
                {
                    Items.Add(msg);
                }
            }

            //if (Items.Count > 0) Items[0].Count = Items.Count;
            // 第一个组消息的Count是准确的,总数Count小于128的全部使用实际总数Count,其它都是用0
            count = Items.Count;
            var isLittle = count < 128;
            foreach (var item in Items)
            {
                item.Count = isLittle ? count : 0;
            }
            if (count > 0) Items[0].Count = count;

            Total = count;
        }
        #endregion

        #region 重组
        /// <summary>添加组消息,返回是否已收到所有组消息。</summary>
        /// <param name="message"></param>
        /// <returns></returns>
        public Boolean Add(GroupMessage message)
        {
            if (Items.Count <= 0)
                Identity = message.Identity;
            else if (message.Identity != Identity)
                throw new ArgumentException("组消息的标识不匹配!", "message");

            lock (Items)
            {
                if (!Items.Any(e => e.Index == message.Index)) Items.Add(message);
            }

            if (message.Index == 1) Total = message.Count;

            return Items.Count > 0 && Items.Count == Total;
        }
        #endregion

        #region 输出
        /// <summary>获取整个数据流</summary>
        /// <returns></returns>
        public Stream GetStream()
        {
            var ms = new MemoryStream();
            foreach (var item in Items.OrderBy(e => e.Index))
            {
                ms.Write(item.Data, 0, item.Data.Length);
            }
            ms.Position = 0;
            return ms;
        }

        /// <summary>获取整个消息</summary>
        /// <returns></returns>
        public Message GetMessage() { return Message.Read(GetStream()); }
        #endregion

        #region 辅助
        /// <summary>已重载。</summary>
        /// <returns></returns>
        public override string ToString()
        {
            var ts = Items;
            var count = ts.Count > 0 ? ts[0].Count : 0;
            return String.Format("{0} {1}/{2}", Identity, ts.Count, count);
        }

        static Int32 GetBytesCount(Int64 n) { return BinaryWriterX.GetEncodedIntSize(n); }
        #endregion

        #region IEnumerable<GroupMessage> 成员
        IEnumerator<GroupMessage> IEnumerable<GroupMessage>.GetEnumerator() { return Items.GetEnumerator(); }

        IEnumerator IEnumerable.GetEnumerator() { return (this as IEnumerable<GroupMessage>).GetEnumerator(); }
        #endregion
    }
}