EventHub在订阅之前,支持注册topic和注销topic
石头 authored at 2025-12-20 17:05:02
11.89 KiB
X
using System.Text;
using NewLife.Collections;
using NewLife.Data;
using NewLife.Model;

namespace NewLife.Net;

/// <summary>远程通信Socket,仅具有收发功能</summary>
/// <remarks>
/// <para>提供基于网络连接的双向通信能力,专注于数据收发操作。</para>
/// <para>设计理念:轻量化接口,统一TCP/UDP处理模式,支持异步和事件驱动编程。</para>
/// <para>继承自 <see cref="ISocket"/> 和 <see cref="IExtend"/>,具备完整的Socket基础功能和扩展能力。</para>
/// </remarks>
public interface ISocketRemote : ISocket, IExtend
{
    #region 属性
    /// <summary>会话标识符</summary>
    /// <remarks>用于在多会话环境中唯一标识当前连接,便于日志追踪和会话管理</remarks>
    Int32 ID { get; }

    /// <summary>远程终结点地址</summary>
    /// <remarks>
    /// <para>发送操作的目标地址,同时也是接收数据的来源标识。</para>
    /// <para>对于TCP连接,该地址在建立连接后固定;对于UDP,可动态设置不同的目标。</para>
    /// </remarks>
    NetUri Remote { get; set; }

    /// <summary>最后一次通信时间</summary>
    /// <remarks>
    /// <para>记录最近一次成功收发数据的时间戳,用于会话活跃度检测。</para>
    /// <para>包括发送和接收操作,可用于实现超时断开、心跳检测等机制。</para>
    /// </remarks>
    DateTime LastTime { get; }
    #endregion

    #region 数据发送
    /// <summary>发送数据包</summary>
    /// <param name="data">要发送的数据包</param>
    /// <returns>实际发送的字节数,失败时返回负数</returns>
    /// <remarks>目标地址由 <see cref="Remote"/> 属性决定</remarks>
    Int32 Send(IPacket data);

    /// <summary>发送字节数组</summary>
    /// <param name="data">字节数组</param>
    /// <param name="offset">数据起始偏移量</param>
    /// <param name="count">发送字节数,-1表示发送从偏移量开始的所有数据</param>
    /// <returns>实际发送的字节数,失败时返回负数</returns>
    /// <remarks>目标地址由 <see cref="Remote"/> 属性决定</remarks>
    Int32 Send(Byte[] data, Int32 offset = 0, Int32 count = -1);

    /// <summary>发送数组段</summary>
    /// <param name="data">数组段</param>
    /// <returns>实际发送的字节数,失败时返回负数</returns>
    /// <remarks>目标地址由 <see cref="Remote"/> 属性决定</remarks>
    Int32 Send(ArraySegment<Byte> data);

    /// <summary>发送只读内存段</summary>
    /// <param name="data">只读内存段</param>
    /// <returns>实际发送的字节数,失败时返回负数</returns>
    /// <remarks>
    /// <para>目标地址由 <see cref="Remote"/> 属性决定。</para>
    /// <para>高性能API,避免不必要的内存拷贝,适用于.NET Core/.NET 5+环境。</para>
    /// </remarks>
    Int32 Send(ReadOnlySpan<Byte> data);
    #endregion

    #region 数据接收
    /// <summary>同步接收数据包</summary>
    /// <returns>接收到的数据包,无数据时返回null</returns>
    /// <remarks>
    /// <para>该方法会阻塞当前线程直到有数据到达或连接关闭。</para>
    /// <para>返回的数据包需要在使用完毕后正确释放,避免内存泄漏。</para>
    /// </remarks>
    IOwnerPacket? Receive();

    /// <summary>异步接收数据包</summary>
    /// <param name="cancellationToken">取消令牌</param>
    /// <returns>接收到的数据包,无数据时返回null</returns>
    /// <remarks>
    /// <para>推荐的异步接收方式,不会阻塞调用线程。</para>
    /// <para>返回的数据包需要在使用完毕后正确释放,避免内存泄漏。</para>
    /// </remarks>
    Task<IOwnerPacket?> ReceiveAsync(CancellationToken cancellationToken = default);

    /// <summary>数据接收事件</summary>
    /// <remarks>
    /// <para>当有新数据到达时触发,适用于事件驱动编程模式。</para>
    /// <para>事件参数包含原始数据包和经过管道处理后的消息对象。</para>
    /// </remarks>
    event EventHandler<ReceivedEventArgs> Received;
    #endregion

    #region 消息处理
    /// <summary>异步发送消息并等待响应</summary>
    /// <param name="message">要发送的消息对象</param>
    /// <returns>响应消息对象</returns>
    /// <remarks>
    /// <para>高级消息通信API,支持请求-响应模式。</para>
    /// <para>消息将经过 <see cref="ISocket.Pipeline"/> 编码后发送,响应经解码后返回。</para>
    /// </remarks>
    Task<Object> SendMessageAsync(Object message);

    /// <summary>异步发送消息并等待响应</summary>
    /// <param name="message">要发送的消息对象</param>
    /// <param name="cancellationToken">取消令牌</param>
    /// <returns>响应消息对象</returns>
    /// <remarks>
    /// <para>高级消息通信API,支持请求-响应模式和超时控制。</para>
    /// <para>消息将经过 <see cref="ISocket.Pipeline"/> 编码后发送,响应经解码后返回。</para>
    /// </remarks>
    Task<Object> SendMessageAsync(Object message, CancellationToken cancellationToken);

    /// <summary>发送消息,不等待响应</summary>
    /// <param name="message">要发送的消息对象</param>
    /// <returns>实际发送的字节数,失败时返回负数</returns>
    /// <remarks>
    /// <para>单向消息发送,适用于通知、推送等场景。</para>
    /// <para>消息将经过 <see cref="ISocket.Pipeline"/> 编码后发送。</para>
    /// </remarks>
    Int32 SendMessage(Object message);

    /// <summary>处理接收到的消息数据帧</summary>
    /// <param name="data">消息数据帧</param>
    /// <remarks>
    /// <para>供内部管道处理器调用,用于处理解码后的消息。</para>
    /// <para>通常由框架内部调用,应用代码一般无需直接使用。</para>
    /// </remarks>
    void Process(IData data);
    #endregion
}

/// <summary>Socket远程通信扩展方法</summary>
/// <remarks>
/// <para>提供便捷的数据发送、接收和消息处理功能。</para>
/// <para>包含字符串发送、流传输、文件传输等高级功能。</para>
/// </remarks>
public static class SocketRemoteHelper
{
    #region 扩展发送方法
    /// <summary>发送数据流</summary>
    /// <param name="session">Socket会话</param>
    /// <param name="stream">数据流</param>
    /// <param name="bufferSize"></param>
    /// <returns>实际发送的字节数</returns>
    /// <remarks>
    /// <para>以8KB缓冲区分块读取并发送流数据,适用于大文件传输。</para>
    /// <para>发送过程中如果出现错误会立即停止并返回已发送的字节数。</para>
    /// </remarks>
    public static Int32 Send(this ISocketRemote session, Stream stream, Int32 bufferSize = 8192)
    {
        var totalSent = 0;
        var buffer = Pool.Shared.Rent(bufferSize);

        try
        {
            while (true)
            {
                var bytesRead = stream.Read(buffer, 0, buffer.Length);
                if (bytesRead <= 0) break;

                var sent = session.Send(buffer, 0, bytesRead);
                if (sent < 0) break;

                totalSent += sent;
                if (bytesRead < buffer.Length) break;
            }
        }
        finally
        {
            Pool.Shared.Return(buffer);
        }

        return totalSent;
    }

    /// <summary>发送字符串</summary>
    /// <param name="session">Socket会话</param>
    /// <param name="message">要发送的字符串</param>
    /// <param name="encoding">文本编码,null表示UTF-8</param>
    /// <returns>实际发送的字节数</returns>
    public static Int32 Send(this ISocketRemote session, String message, Encoding? encoding = null)
    {
        if (String.IsNullOrEmpty(message))
            return session.Send(Pool.Empty);

        encoding ??= Encoding.UTF8;
        return session.Send(encoding.GetBytes(message));
    }
    #endregion

    #region 扩展接收方法
    /// <summary>接收字符串数据</summary>
    /// <param name="session">Socket会话</param>
    /// <param name="encoding">文本编码,null表示UTF-8</param>
    /// <returns>接收到的字符串,无数据时返回空字符串</returns>
    /// <remarks>该方法会阻塞当前线程直到有数据到达</remarks>
    public static String ReceiveString(this ISocketRemote session, Encoding? encoding = null)
    {
        using var packet = session.Receive();
        if (packet == null || packet.Length == 0) return String.Empty;

        return packet.ToStr(encoding ?? Encoding.UTF8);
    }
    #endregion

    #region 管道处理器扩展
    /// <summary>添加类型化处理器</summary>
    /// <typeparam name="THandler">处理器类型</typeparam>
    /// <param name="session">Socket会话</param>
    /// <remarks>处理器必须有无参构造函数</remarks>
    public static void Add<THandler>(this ISocket session) where THandler : IPipelineHandler, new() => GetPipeline(session).Add(new THandler());

    /// <summary>添加处理器实例</summary>
    /// <param name="session">Socket会话</param>
    /// <param name="handler">处理器实例</param>
    public static void Add(this ISocket session, IPipelineHandler handler) => GetPipeline(session).Add(handler);

#pragma warning disable CS0618 // 类型或成员已过时
    /// <summary>添加处理器实例</summary>
    /// <param name="session">Socket会话</param>
    /// <param name="handler">处理器实例</param>
    [Obsolete("=>IPipelineHandler")]
    public static void Add(this ISocket session, IHandler handler) => GetPipeline(session).Add(handler);
#pragma warning restore CS0618 // 类型或成员已过时

    /// <summary>获取或创建消息管道</summary>
    private static IPipeline GetPipeline(ISocket session) => session.Pipeline ??= new Pipeline();
    #endregion

    #region 高级消息传输
    /// <summary>分块发送数据流为多个消息包</summary>
    /// <param name="session">Socket会话</param>
    /// <param name="stream">数据流</param>
    /// <returns>发送的消息包数量</returns>
    /// <remarks>
    /// <para>将大数据流切分为多个消息包发送,接收方可按顺序重组。</para>
    /// <para>每个消息包会添加标准4字节消息头,由StandardCodec处理器负责编解码。</para>
    /// </remarks>
    public static Int32 SendMessages(this ISocketRemote session, Stream stream)
    {
        var messageCount = 0;
        var bufferSize = SocketSetting.Current.BufferSize;
        var buffer = Pool.Shared.Rent(bufferSize);

        try
        {
            while (true)
            {
                // 预留4字节消息头空间
                var bytesRead = stream.Read(buffer, 4, bufferSize - 4);
                if (bytesRead <= 0) break;

                // StandardCodec将在头部添加4字节长度信息
                var packet = new ArrayPacket(buffer, 4, bytesRead);
                session.SendMessage(packet);
                messageCount++;
            }
        }
        finally
        {
            Pool.Shared.Return(buffer);
        }

        return messageCount;
    }

    /// <summary>以消息包形式发送文件</summary>
    /// <param name="session">Socket会话</param>
    /// <param name="filePath">文件路径</param>
    /// <param name="compressed">是否启用压缩</param>
    /// <returns>发送的消息包数量</returns>
    /// <remarks>
    /// <para>自动处理文件读取和分块传输,支持可选的压缩功能。</para>
    /// <para>接收方需要按相同顺序重组消息包以还原完整文件。</para>
    /// </remarks>
    public static Int32 SendFile(this ISocketRemote session, String filePath, Boolean compressed = false)
    {
        var messageCount = 0;
        filePath.AsFile().OpenRead(compressed, stream =>
        {
            messageCount = session.SendMessages(stream);
        });
        return messageCount;
    }
    #endregion
}