修正协议类型不正确的BUG
nnhy authored at 2012-06-27 14:53:36
25.56 KiB
X
using System;
using System.Collections;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using NewLife.Exceptions;
using NewLife.Net.Common;
using NewLife.Reflection;
using NewLife.Threading;

namespace NewLife.Net.Sockets
{
    /// <summary>Socket基类</summary>
    /// <remarks>
    /// 主要是对Socket封装一层,把所有异步操作结果转移到事件中<see cref="Completed"/>去。
    /// 参数池维护这所有事件参数,借出<see cref="Pop"/>时挂接<see cref="Completed"/>事件,
    /// 归还<see cref="Push"/>时,取消<see cref="Completed"/>事件。
    /// 
    /// 网络模型处理流程
    /// 1,实例化对象,Socket属性为空,可以由外部赋值。此时可以设置协议和监听地址端口等。
    /// 2,如果外部没有给Socket赋值,EnsureCreate构造Socket。此时可以给Socket设置各种参数。
    /// 3,使用Bind绑定本地监听地址和端口。到此完成了初始化的所有工作。
    /// 
    /// 异步处理流程:
    /// AcceptAsync/ReceiveAsync    异步接受/异步接收
    ///     StartAsync             统一的开始异步方法,如果异步方法同步返回,则采用线程池调用回调方法,转为异步处理
    ///         EnsureCreate       确保已创建Socket
    ///         Bind               确保Socket已绑定到本地地址和端口
    ///         Pop                借出网络参数
    ///         =>RaiseComplete    异步回调方法,处理所有异步事件的起始点
    ///             ->Completed    完成事件,可以取消处理
    ///                 Push       取消处理时,归还网络参数
    ///                 return
    ///             OnComplete     子类通过重载来处理各种异步事件
    ///                 OnAccept/OnReceive
    ///                     Push    如果处理过程中,没有外部注册事件,则马上归还事件,然后开始新的异步操作
    ///                     AcceptAsync/ReceiveAsync
    ///                     return
    ///                 Process    这里<see cref="UseThreadPool"/>决定是否采用线程池处理,因此OnProcess必须有独立的异常处理能力,保证回收网络参数
    ///                     ->OnProcess   统一的事件处理核心
    ///                         AcceptAsync/ReceiveAsync    这里<see cref="NoDelay"/>觉得是否马上开始异步。
    ///                         OnError 如果异步处理失败,不是<see cref="SocketError.Success"/>,则触发错误事件,然后退出
    ///                             return
    ///                         ProcessAccept/ProcessReceive
    ///                         Push    每次用完都还,保证不出错丢失
    ///                         OnError 处理异常时触发错误事件
    ///                         AcceptAsync/ReceiveAsync
    ///                         OnError 重新开始异步处理出错,触发错误事件,但不干涉当前处理中的网络参数
    ///             OnError
    ///         Push   开始异步处理异常时,归还网络参数再抛出异常
    /// </remarks>
    public class SocketBase : Netbase, ISocket
    {
        #region 属性
        private Socket _Socket;
        /// <summary>套接字</summary>
        internal protected Socket Socket
        {
            get { return _Socket; }
            set
            {
                // 外部设置套接字时,除非是已绑定的,否则不清除本地Uri
                if (value != null && value.IsBound) _LocalUri = _RemoteUri = null;

                _Socket = value;
            }
        }

        /// <summary>基础Socket对象</summary>
        Socket ISocket.Socket { get { return Socket; } set { Socket = value; } }

        /// <summary>协议类型</summary>
        public virtual ProtocolType ProtocolType { get { return LocalUri.ProtocolType; } }

        /// <summary>监听本地地址</summary>
        public IPAddress Address { get { return LocalUri.Address; } set { LocalUri.Address = value; } }

        /// <summary>监听本地端口</summary>
        public Int32 Port { get { return LocalUri.Port; } set { LocalUri.Port = value; } }

        /// <summary>本地地址族</summary>
        public AddressFamily AddressFamily
        {
            get { return Address.AddressFamily; }
            set
            {
                // 根据地址族选择合适的本地地址
                LocalUri.Address = LocalUri.Address.GetRightAny(value);
            }
        }

        /// <summary>本地终结点</summary>
        public IPEndPoint LocalEndPoint { get { return LocalUri.EndPoint; } }

        /// <summary>远程终结点</summary>
        public IPEndPoint RemoteEndPoint { get { return RemoteUri.EndPoint; } }

        private NetUri _LocalUri;
        /// <summary>本地地址</summary>
        public NetUri LocalUri
        {
            get
            {
                if (_LocalUri != null) return _LocalUri;

                var uri = new NetUri();
                var socket = Socket;
                try
                {
                    if (socket != null) uri.EndPoint = socket.LocalEndPoint as IPEndPoint;
                }
                catch (ObjectDisposedException) { }

                return _LocalUri = uri;
            }
        }

        private NetUri _RemoteUri;
        /// <summary>远程地址</summary>
        public NetUri RemoteUri
        {
            get
            {
                if (_RemoteUri != null)
                {
                    _RemoteUri.ProtocolType = ProtocolType;
                    return _RemoteUri;
                }

                var uri = new NetUri();
                uri.ProtocolType = ProtocolType;
                var socket = Socket;
                try
                {
                    if (socket != null && socket.Connected) uri.EndPoint = socket.RemoteEndPoint as IPEndPoint;
                }
                catch (ObjectDisposedException) { }

                return _RemoteUri = uri;
            }
        }

        //private Int32 _BufferSize = 10240;
        //! 注意:大于85K会进入LOH(大对象堆)
        //private Int32 _BufferSize = 80 * 1024;
        private Int32 _BufferSize = 8 * 1024;
        /// <summary>缓冲区大小</summary>
        public Int32 BufferSize { get { return _BufferSize; } set { _BufferSize = value; } }

        private Boolean _NoDelay = true;
        /// <summary>禁用接收延迟,收到数据后马上建立异步读取再处理本次数据</summary>
        public Boolean NoDelay { get { return _NoDelay; } set { _NoDelay = value; } }

        private Boolean _UseThreadPool;
        /// <summary>是否使用线程池处理事件。建议仅在事件处理非常耗时时使用线程池来处理。</summary>
        public Boolean UseThreadPool { get { return _UseThreadPool; } set { _UseThreadPool = value; } }
        #endregion

        #region 扩展属性
        private Boolean _ReuseAddress;
        /// <summary>允许将套接字绑定到已在使用中的地址。</summary>
        public Boolean ReuseAddress
        {
            get
            {
                if (Socket == null) return _ReuseAddress;

                Object value = Socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress);
                return value != null && Convert.ToBoolean(value);
            }
            set
            {
                if (Socket != null) Socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, value);
                _ReuseAddress = value;
            }
        }

        private IDictionary _Items;
        /// <summary>数据字典</summary>
        public IDictionary Items { get { return _Items ?? (_Items = new Hashtable(StringComparer.OrdinalIgnoreCase)); } }

        private Int32 _AsyncCount;
        /// <summary>异步操作计数</summary>
        public virtual Int32 AsyncCount { get { return _AsyncCount; } /*set { _AsyncCount = value; }*/ }
        #endregion

        #region 构造
        static SocketBase()
        {
            // 注册接口实现
            NetService.Install();

            // 自动收缩内存,调试状态1分钟一次,非调试状态10分钟一次
            Int32 time = NetHelper.Debug ? 60000 : 600000;
            new TimerX(s => Runtime.ReleaseMemory(), null, time, time, false);

            try
            {
                // 写在另外一个方法里面,保证不会在当前方法编译的时候就报错
                CheckNet21();

                // 回收一次,引发编译NetEventArgs的析构函数
                GC.Collect();
            }
            catch (TypeLoadException ex)
            {
                if (ex.TypeName.Contains("SocketAsyncEventArgs"))
                    throw new XException("NewLife.Net网络库需要.Net2.0 Sp1支持!", ex);
                else
                    throw ex;
            }
        }

        static void CheckNet21()
        {
            var e = new NetEventArgs();
            //(e as IDisposable).Dispose();
            e.AcceptSocket = null;
        }

        /// <summary>实例化</summary>
        public SocketBase() { SetShowEventLog(); }
        #endregion

        #region 释放资源
        /// <summary>关闭网络操作</summary>
        public void Close() { Dispose(); }

        /// <summary>子类重载实现资源释放逻辑</summary>
        /// <param name="disposing">从Dispose调用(释放所有资源)还是析构函数调用(释放非托管资源)</param>
        protected override void OnDispose(bool disposing)
        {
            base.OnDispose(disposing);

            var socket = Socket;
            if (socket == null) return;

            var hand = SafeHandle;
            if (hand != null && !hand.IsClosed)
            {
                // 先用Shutdown禁用Socket(发送未完成发送的数据),再用Close关闭,这是一种比较优雅的关闭Socket的方法
                if (socket.Connected)
                {
                    try
                    {
                        socket.Disconnect(ReuseAddress);
                        socket.Shutdown(SocketShutdown.Both);
                    }
                    catch (SocketException ex2)
                    {
                        if (ex2.SocketErrorCode != SocketError.NotConnected) WriteLog(ex2.ToString());
                    }
                    catch (ObjectDisposedException) { }
                    catch (Exception ex3)
                    {
                        if (NetHelper.Debug) WriteLog(ex3.ToString());
                    }
                }

                socket.Close();
            }
            Socket = null;

            if (_Statistics != null)
            {
                IDisposable dp = _Statistics as IDisposable;
                if (dp != null) dp.Dispose();
                _Statistics = null;
            }
        }

        private static MemberInfoX[] _mSafeHandle;
        /// <summary>SafeHandle字段</summary>
        private static MemberInfoX mSafeHandle
        {
            get
            {
                if (_mSafeHandle != null && _mSafeHandle.Length > 0) return _mSafeHandle[0];

                MemberInfoX pix = FieldInfoX.Create(typeof(Socket), "m_Handle");
                if (pix == null) pix = PropertyInfoX.Create(typeof(Socket), "SafeHandle");
                _mSafeHandle = new MemberInfoX[] { pix };

                return pix;
            }
        }

        /// <summary>安全句柄</summary>
        private SafeHandle SafeHandle
        {
            get
            {
                if (Socket == null) return null;

                var pix = mSafeHandle;
                if (pix != null) return pix.GetValue(Socket) as SafeHandle;

                return null;
            }
        }
        #endregion

        #region 方法
        /// <summary>确保创建基础Socket对象</summary>
        protected virtual void EnsureCreate()
        {
            if (Socket != null || Disposed) return;

            Socket socket = null;
            var addrf = AddressFamily;
            var protocol = ProtocolType;
            switch (ProtocolType)
            {
                case ProtocolType.Tcp:
                    socket = new Socket(addrf, SocketType.Stream, protocol);
                    socket.SetTcpKeepAlive(true);
                    break;
                case ProtocolType.Udp:
                    socket = new Socket(addrf, SocketType.Dgram, protocol);
                    break;
                default:
                    socket = new Socket(addrf, SocketType.Unknown, protocol);
                    break;
            }

            // 设置超时时间
            socket.SendTimeout = 10000;
            socket.ReceiveTimeout = 10000;

            // 加大接收缓冲区,为BufferSize的10倍,但不超过80k
            var bufsize = BufferSize * 10;
            if (bufsize > 80 * 1024) bufsize = 80 * 1024;
            socket.ReceiveBufferSize = bufsize;

            Socket = socket;

            if (_ReuseAddress) ReuseAddress = true;
        }

        /// <summary>绑定本地终结点</summary>
        public virtual void Bind()
        {
            if (Disposed) return;

            EnsureCreate();
            var socket = Socket;
            if (socket != null && !socket.IsBound)
            {
                var ep = new IPEndPoint(Address, Port);
                socket.Bind(ep);

                _LocalUri = _RemoteUri = null;
            }
        }

        /// <summary>开始异步操作。由用户调用,所以该方法在异常时需要回收网络事件</summary>
        /// <param name="callback"></param>
        /// <param name="needBuffer">是否需要缓冲区,默认需要,只有Accept不需要</param>
        internal protected void StartAsync(Func<NetEventArgs, Boolean> callback, Boolean needBuffer = true)
        {
            //! 特别需要注意:外部已经把网络事件转交给当前函数,当前函数要负责,所以退出前要回收
            if (Disposed) return;

            EnsureCreate();
            var socket = Socket;
            if (socket == null) return;

            // Accepti得到的socket不需要绑定
            if (!socket.IsBound) Bind();

            // 如果没有传入网络事件参数,从对象池借用
            var e = Pop();

            e.SetBuffer(needBuffer ? BufferSize : 0);

            Boolean result;
            try
            {
                // 除非同步返回,否则这里以后不允许再碰这个e,用result保存,就是考虑到异步完成,e将具有不可预测性
                result = callback(e);
            }
            catch
            {
                // 如果callback或RaiseCompleteAsync成功,都将由里面的方法负责回收参数;
                // 否则,这里需要自己回收参数。
                Push(e);
                throw;
            }

            // 如果立即返回,则异步处理完成事件
            if (!result)
            {
                // 如果已销毁或取消,则不处理
                if (!e.Cancel)
                    ThreadPool.QueueUserWorkItem(state => RaiseComplete(state as NetEventArgs), e);
                else
                    Push(e);
            }
            else
                // 异步开始,增加一个计数
                Interlocked.Increment(ref _AsyncCount);
        }
        #endregion

        #region 完成事件
        /// <summary>从池里拿一个对象。回收原则参考<see cref="Push"/></summary>
        /// <returns></returns>
        public NetEventArgs Pop()
        {
            var e = NetEventArgs.Pop();

            e.AcceptSocket = Socket;
            e.Socket = this;
            e.Completed += OnCompleted;

            return e;
        }

        /// <summary>把对象归还到池里</summary>
        /// <remarks>
        /// 网络事件参数使用原则:
        /// 1,得到者负责回收(通过方法参数得到)
        /// 2,正常执行时自己负责回收,异常时顶级或OnError负责回收
        /// 3,把回收责任交给别的方法
        /// 4,事件订阅者不允许回收,不允许另作他用
        /// </remarks>
        /// <param name="e"></param>
        public void Push(NetEventArgs e)
        {
            e.Completed -= OnCompleted;
            NetEventArgs.Push(e);
        }

        /// <summary>完成事件,将在工作线程中被调用,不要占用太多时间。</summary>
        public event EventHandler<NetEventArgs> Completed;

        private void OnCompleted(Object sender, NetEventArgs e)
        {
            // 异步完成,减少一个计数
            Interlocked.Decrement(ref _AsyncCount);

            if (e is NetEventArgs)
                RaiseComplete(e as NetEventArgs);
            else
                throw new InvalidOperationException("所有套接字事件参数必须来自于事件参数池Pool!");
        }

        /// <summary>触发完成事件。如果是异步返回,则在IO线程池中执行;如果是同步返回,则在用户线程池中执行。
        /// 可能由工作线程(事件触发)调用,也可能由用户线程通过线程池线程调用。
        /// 作为顶级,将会处理所有异常并调用OnError,其中OnError有能力回收参数e。
        /// </summary>
        /// <param name="e"></param>
        void RaiseComplete(NetEventArgs e)
        {
            if (ShowEventLog && EnableLog) ShowEvent(e);

            try
            {
                if (Completed != null)
                {
                    e.Cancel = false;
                    Completed(this, e);
                    if (e.Cancel)
                    {
                        Push(e);
                        return;
                    }
                }

                // 这里直接处理操作取消
                if (e.SocketError != SocketError.OperationAborted)
                    OnComplete(e);
                else
                    OnError(e, null);
                // 这里可以改造为使用多线程处理事件
                //ThreadPoolCallback(OnCompleted, e);
            }
            catch (Exception ex)
            {
                OnError(e, ex);
                // 都是在线程池线程里面了,不要往外抛出异常
                //throw;
            }
        }

        /// <summary>完成事件分发中心。
        /// 正常执行时OnComplete必须保证回收参数e,异常时RaiseComplete将能够代为回收
        /// </summary>
        /// <param name="e"></param>
        protected virtual void OnComplete(NetEventArgs e) { }
        #endregion

        #region 异步结果处理
        /// <summary>处理异步结果。重点涉及<see cref="NoDelay"/>。内部负责回收参数</summary>
        /// <param name="e">事件参数</param>
        /// <param name="start">开始新异步操作的委托</param>
        /// <param name="process">处理结果的委托</param>
        protected virtual void Process(NetEventArgs e, Func start, Action<NetEventArgs> process)
        {
            if (UseThreadPool)
            {
                ThreadPool.QueueUserWorkItem(s =>
                {
                    OnProcess(e, start, process);
                });
            }
            else
            {
                OnProcess(e, start, process);
            }
        }

        void OnProcess(NetEventArgs e, Func start, Action<NetEventArgs> process)
        {
            // 再次开始,如果异常,记录异常信息,待业务处理完成后再抛出异常
            Exception err = null;
            var isAborted = e.SocketError == SocketError.OperationAborted;
            if (NoDelay && !isAborted && !Disposed)
            {
                try
                {
                    start();
                }
                catch (Exception ex) { err = ex; }
            }

            // Socket错误由各个处理器来处理
            if (e.SocketError != SocketError.Success)
            {
                OnError(e, null);
                return;
            }

            Boolean result = false;
            try
            {
                // 业务处理的任何异常,都将引发Error事件,但不会影响重新建立新的异步操作
                process(e);

                result = true;
            }
            catch (Exception ex)
            {
                try
                {
                    OnError(e, ex);
                }
                catch { }
            }

            // 每次用完都还,保证不出错丢失
            if (result) Push(e);

            // 重新抛出前面捕获的异常
            if (err != null) throw err;

            // 如果不是操作取消,在处理业务完成后再开始异步操作
            if (!NoDelay && !isAborted && !Disposed) start();
        }
        #endregion

        #region 错误处理
        /// <summary>错误发生/断开连接时</summary>
        public event EventHandler<NetEventArgs> Error;

        /// <summary>错误发生/断开连接时。拦截Error事件中的所有异常,不外抛,防止因为Error异常导致多次调用OnError</summary>
        /// <param name="e"></param>
        /// <param name="ex"></param>
        protected void ProcessError(NetEventArgs e, Exception ex)
        {
            if (Error != null)
            {
                if (ex != null)
                {
                    if (e == null) e = Pop();
                    e.Error = ex;
                }

                try
                {
                    Error(this, e);
                }
                catch (Exception ex2)
                {
                    WriteLog(ex2.ToString());
                }
            }

            // 不管有没有外部事件,都要归还网络事件参数,那是对象池的东西,不是你的
            if (e != null) Push(e);
        }

        /// <summary>错误发生时。负责调用Error事件以及回收网络事件参数</summary>
        /// <remarks>OnError除了会调用ProcessError外,还会关闭Socket</remarks>
        /// <param name="e"></param>
        /// <param name="ex"></param>
        protected virtual void OnError(NetEventArgs e, Exception ex)
        {
            try
            {
                ProcessError(e, ex);
            }
            finally
            {
                Close();
            }
        }
        #endregion

        #region 辅助
        /// <summary>检查缓冲区大小</summary>
        /// <param name="e"></param>
        [Conditional("DEBUG")]
        internal protected void CheckBufferSize(NetEventArgs e)
        {
            //Int32 n = e.BytesTransferred;
            //if (n >= e.Buffer.Length || ProtocolType == ProtocolType.Tcp && n >= 1460 || ProtocolType == ProtocolType.Udp && n >= 1464)
            //{
            //    WriteLog("接收的实际数据大小{0}超过了缓冲区大小,需要根据真实MTU调整缓冲区大小以提高效率!", n);
            //}
        }

        private Boolean _ShowEventLog;
        /// <summary>是否显示事件日志</summary>
        public Boolean ShowEventLog { get { return _ShowEventLog; } set { _ShowEventLog = value; } }

        [Conditional("DEBUG")]
        void SetShowEventLog() { ShowEventLog = true; }

        void ShowEvent(NetEventArgs e)
        {
            //WriteLog("Completed[{4}] {0} {1} {2} [{3}]", this, e.LastOperation, e.SocketError, e.BytesTransferred, e.ID);
            var sb = new StringBuilder();
            sb.AppendFormat("[{0}] {1} {2}://{3}", e.ID, e.LastOperation, ProtocolType, LocalEndPoint);
            var ep = e.RemoteIPEndPoint;
            if (ep == null || ep.Address.IsAny()) ep = RemoteEndPoint;
            if ((ep == null || ep.Address.IsAny()) && e.LastOperation == SocketAsyncOperation.Accept && e.AcceptSocket != null) ep = e.AcceptSocket.RemoteEndPoint as IPEndPoint;
            if (ep != null && !ep.Address.IsAny()) sb.AppendFormat("=>{0}", ep);
            //sb.AppendFormat(" {0}", e.LastOperation);
            if (e.SocketError != SocketError.Success) sb.AppendFormat(" {0}", e.SocketError);
            sb.AppendFormat(" [{0}]", e.BytesTransferred);
            WriteLog(sb.ToString());
        }

        /// <summary>已重载。</summary>
        /// <returns></returns>
        public override string ToString()
        {
            //if (RemoteEndPoint == null)
            return String.Format("{0}://{1}", ProtocolType, LocalEndPoint);
            //else
            //    return String.Format("{0}://{1}=>{2}", ProtocolType, LocalEndPoint, RemoteEndPoint);
        }
        #endregion

        #region 统计
        private IStatistics _Statistics;
        /// <summary>统计信息,默认关闭,通过<see cref="IStatistics.Enable"/>打开。</summary>
        public IStatistics Statistics { get { return _Statistics ?? (_Statistics = NetService.Resolve<IStatistics>()); } }

        /// <summary>增加计数</summary>
        protected void IncCounter() { Statistics.Increment(); }
        #endregion
    }
}