v7.3.2018.0614   重构高性能资源池,减少GC压力,增加线程池,让异步任务得到平等竞争CPU的机会
大石头 编写于 2018-06-14 17:56:44
X
using System;
using System.Collections;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using NewLife.Exceptions;
using NewLife.Log;
using NewLife.Net.Common;
using NewLife.Reflection;
using NewLife.Threading;

namespace NewLife.Net.Sockets
{
    /// <summary>Socket基类</summary>
    /// <remarks>
    /// 主要是对Socket封装一层,把所有异步操作结果转移到<see cref="RaiseComplete"/>去。
    /// 
    /// 网络模型处理流程
    /// 1,实例化对象,Socket属性为空,可以由外部赋值。此时可以设置协议和监听地址端口等。
    /// 2,如果外部没有给Socket赋值,EnsureCreate构造Socket。此时可以给Socket设置各种参数。
    /// 3,使用Bind绑定本地监听地址和端口。到此完成了初始化的所有工作。
    /// 
    /// 异步处理流程:
    /// AcceptAsync/ReceiveAsync    异步接受/异步接收
    ///     StartAsync             统一的开始异步方法,如果异步方法同步返回,则采用线程池调用回调方法,转为异步处理
    ///         EnsureCreate       确保已创建Socket
    ///         Bind               确保Socket已绑定到本地地址和端口
    ///         Pop                借出网络参数
    ///         =>RaiseComplete    异步回调方法,处理所有异步事件的起始点
    ///             ->Completed    完成事件,可以取消处理
    ///                 Push       取消处理时,归还网络参数
    ///                 return
    ///             OnComplete     子类通过重载来处理各种异步事件
    ///                 OnAccept/OnReceive
    ///                     Push    如果处理过程中,没有外部注册事件,则马上归还事件,然后开始新的异步操作
    ///                     AcceptAsync/ReceiveAsync
    ///                     return
    ///                 Process
    ///                     ->OnProcess   统一的事件处理核心
    ///                         AcceptAsync/ReceiveAsync
    ///                         OnError 如果异步处理失败,不是<see cref="SocketError.Success"/>,则触发错误事件,然后退出
    ///                             return
    ///                         ProcessAccept/ProcessReceive
    ///                         Push    每次用完都还,保证不出错丢失
    ///                         OnError 处理异常时触发错误事件
    ///                         AcceptAsync/ReceiveAsync
    ///                         OnError 重新开始异步处理出错,触发错误事件,但不干涉当前处理中的网络参数
    ///             OnError
    ///         Push   开始异步处理异常时,归还网络参数再抛出异常
    /// </remarks>
    public class SocketBase : Netbase, ISocket
    {
        #region 属性
        private Socket _Socket;
        /// <summary>套接字</summary>
        internal protected Socket Socket
        {
            get { return _Socket; }
            set
            {
                // 外部设置套接字时,除非是已绑定的,否则不清除本地Uri
                //if (value != null && value.IsBound) _LocalUri = _RemoteUri = null;
                if (value != null && value.IsBound) _LocalUri = null;

                _Socket = value;
            }
        }

        /// <summary>基础Socket对象</summary>
        Socket ISocket.Socket { get { return Socket; } set { Socket = value; } }

        #region 本地
        private NetUri _LocalUri;
        /// <summary>本地地址</summary>
        public NetUri LocalUri
        {
            get
            {
                if (_LocalUri != null) return _LocalUri;

                var uri = new NetUri(ProtocolType, new IPEndPoint(IPAddress.Any, 0));
                uri.ProtocolType = ProtocolType;
                var socket = Socket;
                try
                {
                    if (socket != null)
                    {
                        uri.EndPoint = socket.LocalEndPoint as IPEndPoint;
                        uri.ProtocolType = socket.ProtocolType;
                    }
                }
                catch (ObjectDisposedException) { }

                return _LocalUri = uri;
            }
        }

        /// <summary>协议类型</summary>
        public virtual ProtocolType ProtocolType { get { return _LocalUri == null ? 0 : _LocalUri.ProtocolType; } }

        /// <summary>监听本地地址</summary>
        public IPAddress Address { get { return LocalUri.Address; } set { LocalUri.Address = value; } }

        /// <summary>监听本地端口</summary>
        public Int32 Port { get { return LocalUri.Port; } set { LocalUri.Port = value; } }

        /// <summary>本地地址族</summary>
        public AddressFamily AddressFamily
        {
            get { return Address.AddressFamily; }
            set
            {
                // 根据地址族选择合适的本地地址
                LocalUri.Address = LocalUri.Address.GetRightAny(value);
            }
        }

        /// <summary>本地终结点</summary>
        public IPEndPoint LocalEndPoint { get { return LocalUri.EndPoint; } }
        #endregion

        //private Int32 _BufferSize = 10240;
        //! 注意:大于85K会进入LOH(大对象堆)
        //private Int32 _BufferSize = 80 * 1024;
        private Int32 _BufferSize = 8 * 1024;
        /// <summary>缓冲区大小</summary>
        public Int32 BufferSize { get { return _BufferSize; } set { _BufferSize = value; } }

        //private Boolean _NoDelay = true;
        ///// <summary>禁用接收延迟,收到数据后马上建立异步读取再处理本次数据</summary>
        //public Boolean NoDelay { get { return _NoDelay; } set { _NoDelay = value; } }

        //private Boolean _UseThreadPool;
        ///// <summary>是否使用线程池处理事件。建议仅在事件处理非常耗时时使用线程池来处理。</summary>
        //public Boolean UseThreadPool { get { return _UseThreadPool; } set { _UseThreadPool = value; } }

        /// <summary>不管客户端还是服务端,只允许用一个网络事件</summary>
        NetEventArgs _arg;
        #endregion

        #region 扩展属性
        private Boolean _ReuseAddress;
        /// <summary>允许将套接字绑定到已在使用中的地址。</summary>
        public Boolean ReuseAddress
        {
            get
            {
                if (Socket == null) return _ReuseAddress;

                Object value = Socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress);
                return value != null && Convert.ToBoolean(value);
            }
            set
            {
                if (Socket != null) Socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, value);
                _ReuseAddress = value;
            }
        }

        private IDictionary _Items;
        /// <summary>数据字典</summary>
        public IDictionary Items { get { return _Items ?? (_Items = new Hashtable(StringComparer.OrdinalIgnoreCase)); } }

        private Int32 _AsyncCount;
        /// <summary>异步操作计数</summary>
        public virtual Int32 AsyncCount { get { return _AsyncCount; } /*set { _AsyncCount = value; }*/ }
        #endregion

        #region 构造
        static SocketBase()
        {
            // 注册接口实现
            NetService.Install();

            // 自动收缩内存,调试状态1分钟一次,非调试状态10分钟一次
            Int32 time = NetHelper.Debug ? 60000 : 600000;
            new TimerX(s => Runtime.ReleaseMemory(), null, time, time, false);

#if !NET4
            try
            {
                // 写在另外一个方法里面,保证不会在当前方法编译的时候就报错
                CheckNet21();

                // 回收一次,引发编译NetEventArgs的析构函数
                GC.Collect();
            }
            catch (TypeLoadException ex)
            {
                if (ex.TypeName.Contains("SocketAsyncEventArgs"))
                    throw new XException("NewLife.Net网络库需要.Net2.0 Sp1支持!", ex);
                else
                    throw ex;
            }
#endif
        }

#if !NET4
        static void CheckNet21()
        {
            var e = new NetEventArgs();
            //(e as IDisposable).Dispose();
            e.AcceptSocket = null;
        }
#endif

        /// <summary>实例化</summary>
        public SocketBase()
        {
            // 子类可能重载协议类型
            LocalUri.ProtocolType = ProtocolType;
            //RemoteUri.ProtocolType = ProtocolType;

            _arg = NetEventArgs.Pop();
            _arg.Completed += (s, e) => RaiseComplete(e as NetEventArgs);

            SetShowEventLog();
        }
        #endregion

        #region 释放资源
        /// <summary>关闭网络操作</summary>
        public void Close() { Dispose(); }

        /// <summary>子类重载实现资源释放逻辑</summary>
        /// <param name="disposing">从Dispose调用(释放所有资源)还是析构函数调用(释放非托管资源)</param>
        protected override void OnDispose(bool disposing)
        {
            base.OnDispose(disposing);

            //  if (_arg != null) NetEventArgs.Push(_arg); //此处不能做此操作,会导致异常,并且连接无法正常断开,移动把网络清理干净后,再将参数还回池中

            var socket = Socket;
            if (socket != null)
            {
                //NetHelper.Close(socket, ReuseAddress);
                // 此时Socket可能已经销毁,不能进行Socket操作
                NetHelper.Close(socket, _ReuseAddress);
                Socket = null;
            }

            if (_Statistics != null)
            {
                var dp = _Statistics as IDisposable;
                if (dp != null) dp.Dispose();
                _Statistics = null;
            }
            if (_arg != null)
                NetEventArgs.Push(_arg);
        }
        #endregion

        #region 方法
        /// <summary>确保创建基础Socket对象</summary>
        protected virtual void EnsureCreate()
        {
            if (Socket != null || Disposed) return;

            Socket socket = null;
            var addrf = AddressFamily;
            var protocol = ProtocolType;
            switch (ProtocolType)
            {
                case ProtocolType.Tcp:
                    socket = new Socket(addrf, SocketType.Stream, protocol);
                    socket.SetTcpKeepAlive(true);
                    break;
                case ProtocolType.Udp:
                    socket = new Socket(addrf, SocketType.Dgram, protocol);
                    break;
                default:
                    socket = new Socket(addrf, SocketType.Unknown, protocol);
                    break;
            }

            // 设置超时时间
            socket.SendTimeout = 10000;
            socket.ReceiveTimeout = 10000;

            // 加大接收缓冲区,为BufferSize的10倍,但不超过80k
            var bufsize = BufferSize * 10;
            if (bufsize > 80 * 1024) bufsize = 80 * 1024;
            socket.ReceiveBufferSize = bufsize;

            Socket = socket;

            if (_ReuseAddress) ReuseAddress = true;
        }

        /// <summary>绑定本地终结点</summary>
        public virtual void Bind()
        {
            if (Disposed) return;

            EnsureCreate();
            var socket = Socket;
            if (socket != null && !socket.IsBound)
            {
                var ep = new IPEndPoint(Address, Port);
                socket.Bind(ep);

                //_LocalUri = _RemoteUri = null;
                _LocalUri = null;
            }
        }

        /// <summary>开始异步操作。</summary>
        /// <param name="callback"></param>
        internal protected void StartAsync(Func<NetEventArgs, Boolean> callback)
        {
            if (Disposed) return;

            EnsureCreate();
            var socket = Socket;
            if (socket == null) return;

            // Accept得到的socket不需要绑定
            if (!socket.IsBound) Bind();

            var e = _arg;
            // 如果立即返回,则异步处理完成事件
            if (!callback(e))
            {
                // 如果已销毁或取消,则不处理
                if (!e.Cancel) ThreadPool.QueueUserWorkItem(state => RaiseComplete(state as NetEventArgs), e);
            }
            else
                // 异步开始,增加一个计数
                Interlocked.Increment(ref _AsyncCount);
        }
        #endregion

        #region 完成事件
        /// <summary>触发完成事件。如果是异步返回,则在IO线程池中执行;如果是同步返回,则在用户线程池中执行。
        /// 可能由工作线程(事件触发)调用,也可能由用户线程通过线程池线程调用。
        /// 作为顶级,将会处理所有异常并调用OnError,其中OnError有能力回收参数e。
        /// </summary>
        /// <param name="e"></param>
        void RaiseComplete(NetEventArgs e)
        {
            // 异步完成,减少一个计数
            Interlocked.Decrement(ref _AsyncCount);

            if (ShowEventLog && Log.Level >= LogLevel.Debug) ShowEvent(e);

            try
            {
                // 这里直接处理操作取消
                if (e.SocketError != SocketError.OperationAborted)
                    OnComplete(e);
                else
                    OnError(e, null);
            }
            catch (Exception ex)
            {
                // 都是在线程池线程里面了,不要往外抛出异常
                OnError(e, ex);
            }
        }

        /// <summary>完成事件分发中心。
        /// 正常执行时OnComplete必须保证回收参数e,异常时RaiseComplete将能够代为回收
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnComplete(NetEventArgs e) { }
        #endregion

        #region 异步结果处理
        /// <summary>处理异步结果。</summary>
        /// <param name="e">事件参数</param>
        /// <param name="start">开始新异步操作的委托</param>
        /// <param name="process">处理结果的委托</param>
        protected virtual void Process(NetEventArgs e, Func start, Action<NetEventArgs> process)
        {
            // Socket错误由各个处理器来处理
            if (e.SocketError != SocketError.Success)
            {
                OnError(e, null);
                return;
            }

            try
            {
                // 业务处理的任何异常,都将引发Error事件,但不会影响重新建立新的异步操作
                process(e);
            }
            catch (Exception ex)
            {
                try
                {
                    OnError(e, ex);
                }
                catch { }

                return;
            }

            // 如果不是操作取消,在处理业务完成后再开始异步操作
            if (!Disposed) start();
        }
        #endregion

        #region 错误处理
        /// <summary>错误发生/断开连接时</summary>
        public event EventHandler<ExceptionEventArgs> Error;

        /// <summary>错误发生/断开连接时。拦截Error事件中的所有异常,不外抛,防止因为Error异常导致多次调用OnError</summary>
        /// <param name="e"></param>
        /// <param name="ex"></param>
        internal protected void ProcessError(NetEventArgs e, Exception ex)
        {
            if (Error != null)
            {
                try
                {
                    Error(this, new ExceptionEventArgs { Exception = ex });
                }
                catch (Exception ex2)
                {
                    WriteLog(ex2.ToString());
                }
            }
        }

        /// <summary>错误发生时。负责调用Error事件以及回收网络事件参数</summary>
        /// <remarks>OnError除了会调用ProcessError外,还会关闭Socket</remarks>
        /// <param name="e"></param>
        /// <param name="ex"></param>
        protected virtual void OnError(NetEventArgs e, Exception ex)
        {
            try
            {
                ProcessError(e, ex);
            }
            finally
            {
                Close();
            }
        }
        #endregion

        #region 辅助
        /// <summary>检查缓冲区大小</summary>
        /// <param name="e"></param>
        [Conditional("DEBUG")]
        internal protected void CheckBufferSize(NetEventArgs e)
        {
            //Int32 n = e.BytesTransferred;
            //if (n >= e.Buffer.Length || ProtocolType == ProtocolType.Tcp && n >= 1460 || ProtocolType == ProtocolType.Udp && n >= 1464)
            //{
            //    WriteLog("接收的实际数据大小{0}超过了缓冲区大小,需要根据真实MTU调整缓冲区大小以提高效率!", n);
            //}
        }

        private Boolean _ShowEventLog;
        /// <summary>是否显示事件日志</summary>
        public Boolean ShowEventLog { get { return _ShowEventLog; } set { _ShowEventLog = value; } }

        [Conditional("DEBUG")]
        void SetShowEventLog() { ShowEventLog = true; }

        void ShowEvent(NetEventArgs e)
        {
            //WriteLog("Completed[{4}] {0} {1} {2} [{3}]", this, e.LastOperation, e.SocketError, e.BytesTransferred, e.ID);
            var sb = new StringBuilder();
            sb.AppendFormat("[{0}] {1} {2}://{3}", e.ID, e.LastOperation, ProtocolType, LocalEndPoint);
            var ep = e.RemoteIPEndPoint;
            //if (ep == null || ep.Address.IsAny()) ep = RemoteEndPoint;
            if ((ep == null || ep.Address.IsAny()) && e.LastOperation == SocketAsyncOperation.Accept && e.AcceptSocket != null) ep = e.AcceptSocket.RemoteEndPoint as IPEndPoint;
            if (ep != null && !ep.Address.IsAny()) sb.AppendFormat("=>{0}", ep);
            //sb.AppendFormat(" {0}", e.LastOperation);
            if (e.SocketError != SocketError.Success) sb.AppendFormat(" {0}", e.SocketError);
            sb.AppendFormat(" [{0}]", e.BytesTransferred);
            WriteLog(sb.ToString());
        }

        /// <summary>已重载。</summary>
        /// <returns></returns>
        public override string ToString()
        {
            ////if (RemoteEndPoint == null)
            //return String.Format("{0}://{1}", ProtocolType, LocalEndPoint);
            ////else
            ////    return String.Format("{0}://{1}=>{2}", ProtocolType, LocalEndPoint, RemoteEndPoint);
            return LocalUri.ToString();
        }
        #endregion

        #region 统计
        private IStatistics _Statistics;
        /// <summary>统计信息,默认关闭,通过<see cref="IStatistics.Enable"/>打开。</summary>
        public IStatistics Statistics { get { return _Statistics ?? (_Statistics = NetService.Container.Resolve<IStatistics>()); } }

        /// <summary>增加计数</summary>
        protected void IncCounter() { Statistics.Increment(); }
        #endregion
    }
}