优化ETL过滤模块
大石头 编写于 2017-08-29 17:11:46
X
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Data;

namespace NewLife.Net
{
    /// <summary>发送队列。以队列发送数据包,自动拆分大包,合并小包</summary>
    class SendQueue : DisposeBase
    {
        #region 属性
        public SessionBase Session { get; private set; }

        public Int32 BufferSize { get; private set; }

        private SocketAsyncEventArgs _seSend;
        private Int32 _Sending;
        private ConcurrentQueue<QueueItem> _SendQueue = new ConcurrentQueue<QueueItem>();
        #endregion

        #region 构造
        public SendQueue(SessionBase session)
        {
            Session = session;

            BufferSize = session.BufferSize;
        }

        /// <summary>销毁</summary>
        /// <param name="disposing"></param>
        protected override void OnDispose(Boolean disposing)
        {
            base.OnDispose(disposing);

            var reason = GetType().Name + (disposing ? "Dispose" : "GC");
            Release(reason);
        }
        #endregion

        #region 主要方法
        internal Boolean Add(Packet pk, IPEndPoint remote)
        {
            var count = pk.Total;
            var ss = Session;
            ss.StatSend.Increment(count);
            if (ss.LogSend) ss.WriteLog("SendAsync [{0}]: {1}", count, pk.ToHex());

            ss.LastTime = DateTime.Now;

            // 打开UDP广播
            if (ss.Local.Type == NetType.Udp && remote != null && Object.Equals(remote.Address, IPAddress.Broadcast)) ss.Client.EnableBroadcast = true;

            // 同时只允许一个异步发送,其它发送放入队列

            // 考虑到超长数据包,拆分为多个包
            if (count <= BufferSize)
            {
                var qi = new QueueItem();
                qi.Packet = pk;
                qi.Remote = remote;

                _SendQueue.Enqueue(qi);
            }
            else
            {
                // 数据包切分,共用数据区,不需要内存拷贝
                var idx = 0;
                while (true)
                {
                    var remain = count - idx;
                    if (remain <= 0) break;

                    var len = Math.Min(remain, BufferSize);

                    var qi = new QueueItem();
                    //qi.Packet = new Packet(pk.Data, pk.Offset + idx, len);
                    qi.Packet = new Packet(pk.ReadBytes(idx, len));
                    qi.Remote = remote;

                    _SendQueue.Enqueue(qi);

                    idx += len;
                }
            }

            Check(false);

            return true;
        }

        internal void Release(String reason)
        {
            _seSend.TryDispose();
            _seSend = null;

            Session.WriteLog("释放SendSA {0} {1}", 1, reason);
        }

        void Check(Boolean io)
        {
            // 如果已销毁,则停止检查发送队列
            if (Session.Client == null || Session.Disposed) return;

            var qu = _SendQueue;
            if (qu.Count == 0) return;

            // 如果没有在发送,就开始发送
            if (Interlocked.CompareExchange(ref _Sending, 1, 0) != 0) return;

            QueueItem qi = null;
            if (!qu.TryDequeue(out qi)) return;

            var se = _seSend;
            if (se == null)
            {
                var buf = new Byte[BufferSize];
                se = _seSend = new SocketAsyncEventArgs();
                se.SetBuffer(buf, 0, buf.Length);
                se.Completed += (s, e) => Process(e);

                Session.WriteLog("创建SendSA {0}", 1);
            }

            se.RemoteEndPoint = qi.Remote;

            // 拷贝缓冲区,设置长度
            var p = 0;
            var remote = qi.Remote;

            // 为了提高吞吐量,减少数据收发次数,尽可能的把发送队列多个数据包合并成为一个大包发出
            while (true)
            {
                var pk = qi.Packet;
                var len = pk.Total;

                if (pk?.Data == null || se.Buffer == null) break;

                pk.WriteTo(se.Buffer, p);
                p += len;

                // 不足最大长度,试试下一个
                if (!qu.TryPeek(out qi)) break;
                if (qi.Remote + "" != remote + "") break;
                if (p + qi.Packet.Count > BufferSize) break;

                if (!qu.TryDequeue(out qi)) break;
            }

            se.SetBuffer(0, p);

            if (!Session.OnSendAsync(se))
            {
                if (io)
                    Process(se);
                else
                    Task.Factory.StartNew(s => Process(s as SocketAsyncEventArgs), se);
            }
        }

        void Process(SocketAsyncEventArgs se)
        {
            if (!Session.Active)
            {
                Release("!Active " + se.SocketError);

                return;
            }

            // 判断成功失败
            if (se.SocketError != SocketError.Success)
            {
                // 未被关闭Socket时,可以继续使用
                //if (!se.IsNotClosed())
                {
                    var ex = se.GetException();
                    if (ex != null) Session.OnError("SendAsync", ex);

                    Release("SocketError " + se.SocketError);

                    //if (se.SocketError == SocketError.ConnectionReset) Dispose();
                    if (se.SocketError == SocketError.ConnectionReset) Session.Close("SendAsync " + se.SocketError);

                    return;
                }
            }

            // 发送新的数据
            if (Interlocked.CompareExchange(ref _Sending, 0, 1) == 1) Check(true);
        }
        #endregion

        class QueueItem
        {
            public Packet Packet { get; set; }
            public IPEndPoint Remote { get; set; }
        }
    }
}