v9.10.2019.0101  全面巩固批量Insert/Update/Upsert,支持数据备份、恢复和同步,支持实体列表保存到文件以及加载
大石头 authored at 2019-01-01 13:38:33
7.96 KiB
X
# WebSocket双向通信 ## 概述 NewLife.Core 支持 WebSocket 协议的客户端与服务端实现,基于原有的管道(Pipeline)和 TCP 连接层进行封装。`WebSocketClient` 继承自 `TcpSession`,内置 `WebSocketCodec` 编解码器,自动完成握手、心跳(Ping/Pong)、帧编解码等底层细节,应用层只需处理消息收发。 **命名空间**:`NewLife.Net`(客户端)、`NewLife.Http`(消息类型) **文档地址**:/core/websocket ## 核心类型 | 类型 | 说明 | |------|------| | `WebSocketClient` | WebSocket 客户端,继承 `TcpSession`,支持 ws:// 和 wss:// | | `WebSocketSession` | 服务端 WebSocket 会话,适配现有 `NetServer` 体系 | | `WebSocketCodec` | 管道编解码器,封装 WebSocket 帧格式 | | `WebSocketMessage` | WebSocket 消息帧模型 | | `WebSocketMessageType` | 消息类型枚举(Text/Binary/Ping/Pong/Close) | ## WebSocketMessage 消息帧 ```csharp public class WebSocketMessage : IDisposable { /// <summary>是否最终帧(FIN bit)</summary> public Boolean Fin { get; set; } /// <summary>消息类型(Opcode)</summary> public WebSocketMessageType Type { get; set; } /// <summary>掩码密钥(客户端发送时自动设置)</summary> public Byte[]? MaskKey { get; set; } /// <summary>消息负载数据</summary> public IPacket? Payload { get; set; } /// <summary>关闭状态码(Close帧时有效)</summary> public Int32 CloseStatus { get; set; } /// <summary>关闭原因描述(Close帧时有效)</summary> public String? StatusDescription { get; set; } } ``` ### WebSocketMessageType 枚举 ```csharp public enum WebSocketMessageType { Data = 0, // 附加数据(分片续包) Text = 1, // 文本消息(UTF-8) Binary = 2, // 二进制消息 Close = 8, // 连接关闭 Ping = 9, // 心跳请求 Pong = 10, // 心跳响应 } ``` ## 快速开始 ### 客户端连接与收发 ```csharp using NewLife.Net; var client = new WebSocketClient("ws://127.0.0.1:8080/ws"); // 收到消息 client.Received += (sender, e) => { if (e.Message is WebSocketMessage msg) { if (msg.Type == WebSocketMessageType.Text) Console.WriteLine($"收到文本: {msg.Payload?.ToStr()}"); else if (msg.Type == WebSocketMessageType.Binary) Console.WriteLine($"收到二进制: {msg.Payload?.Total} 字节"); } }; await client.OpenAsync(); // 发送文本 await client.SendTextAsync("Hello Server!"); // 发送二进制 await client.SendBinaryAsync("data".GetBytes()); // 发送结构化消息 var msg2 = new WebSocketMessage { Type = WebSocketMessageType.Text, Payload = "ping".GetBytes() }; await client.SendMessageAsync(msg2); ``` ### 自定义请求头(携带 Token) ```csharp var client = new WebSocketClient("ws://api.example.com/events"); client.SetRequestHeader("Authorization", "Bearer " + accessToken); client.SetRequestHeader("X-App-Version", "2.1.0"); await client.OpenAsync(); ``` ## API 参考 ### WebSocketClient ```csharp public class WebSocketClient : TcpSession { /// <summary>服务器 URI(ws:// 或 wss://)</summary> public Uri Uri { get; set; } /// <summary>心跳间隔。默认 120 秒(2分钟发一次 Ping)</summary> public TimeSpan KeepAlive { get; set; } = TimeSpan.FromSeconds(120); /// <summary>自定义请求头(握手时携带)</summary> public IDictionary<String, String?>? RequestHeaders { get; set; } /// <summary>设置单个请求头</summary> public void SetRequestHeader(String headerName, String? headerValue); /// <summary>发送 WebSocket 消息帧</summary> public Task SendMessageAsync(WebSocketMessage message, CancellationToken cancellationToken = default); /// <summary>发送文本消息(UTF-8)</summary> public Task SendTextAsync(String text, CancellationToken cancellationToken = default); /// <summary>发送文本消息(字节)</summary> public Task SendTextAsync(Byte[] data, CancellationToken cancellationToken = default); /// <summary>发送二进制消息</summary> public Task SendBinaryAsync(IPacket data, CancellationToken cancellationToken = default); /// <summary>握手(建立 WebSocket 升级请求),Open 阶段自动调用</summary> public static Boolean Handshake(ISocketClient client, Uri uri); } ``` ### WebSocketCodec(管道处理器) ```csharp public class WebSocketCodec : Handler { /// <summary> /// 用户数据包模式。 /// false(默认):上层 Received 事件的 e.Message 为 WebSocketMessage 对象; /// true:自动提取 Payload,e.Packet 为原始数据,忽略帧类型 /// </summary> public Boolean UserPacket { get; set; } } ``` ## 握手流程(HTTP Upgrade) ``` 客户端 服务端 | TCP建连 | |>| | GET /ws HTTP/1.1 | | Upgrade: websocket | | Connection: Upgrade | | Sec-WebSocket-Key: <随机Base64> | |>| | HTTP/1.1 101 | | Sec-WebSocket-Accept: <哈希>| |<| | [WebSocket双向帧传输] | ``` 握手完全由 `WebSocketCodec.Open` 和 `WebSocketSession.HandeShake` 自动处理,无需应用层介入。 ## 使用场景 ### 场景一:实时消息推送(服务端主动推送) ```csharp // 服务端(继承 NetServer) public class PushServer : NetServer<WebSocketSession> { protected override void OnStart() { base.OnStart(); // 启动定时推送 TimerX.Delay(DoPush, 1000); } private void DoPush(Object state) { var msg = new WebSocketMessage { Type = WebSocketMessageType.Text, Payload = $"{{\"time\":\"{DateTime.Now:HH:mm:ss}\"}}".GetBytes(), }; foreach (var session in Sessions.Values.OfType<WebSocketSession>()) session.Send(msg.ToPacket()); } } ``` ### 场景二:客户端订阅实时报价 ```csharp var client = new WebSocketClient("wss://stream.example.com/quotes"); client.KeepAlive = TimeSpan.FromSeconds(30); client.Received += (sender, e) => { if (e.Message is WebSocketMessage msg && msg.Type == WebSocketMessageType.Text) { var quote = JsonHelper.ToObject<Quote>(msg.Payload?.ToStr()); UpdateUI(quote); } }; await client.OpenAsync(); // 订阅频道 await client.SendTextAsync("{\"cmd\":\"subscribe\",\"channel\":\"BTCUSDT\"}"); ``` ### 场景三:双向 RPC 通道 ```csharp // 客户端发送请求 var req = new { id = Interlocked.Increment(ref _seq), cmd = "getUserInfo", uid = 12345, }; await client.SendTextAsync(JsonHelper.ToJson(req)); // 服务端响应 server.Received += (sender, e) => { if (e.Message is WebSocketMessage msg && msg.Type == WebSocketMessageType.Text) { var request = JsonHelper.ToObject<RpcRequest>(msg.Payload?.ToStr()); var result = ProcessRpc(request); if (e.Session is WebSocketSession session) session.Send(JsonHelper.ToJson(result).GetBytes()); } }; ``` ## 注意事项 - **Payload 零拷贝,生命周期短**:`WebSocketMessage.Payload` 直接切片自接收缓冲区,不能跨异步调用持有。需要延迟使用时,先 `payload.ToArray()` 深拷贝。 - **掩码破坏性操作**:客户端服务端的帧带掩码,解帧时原地 XOR 解码,不要对同一缓冲重复解析。 - **`KeepAlive` 默认 120 秒**:若服务器配置了更短的连接超时,应缩小 `KeepAlive` 以防断连。 - **wss:// 需要 TLS 支持**:在 .NET Framework 上需要确保 TLS 可用(`SslStream`),且服务器证书有效。 - **服务端会话隔离**:每条连接对应一个 `WebSocketSession` 实例,不要在多个 Session 之间共享状态,否则需加锁。 - **消息分片**:当前实现中 `Fin=false` 的分片帧会被丢弃,仅处理完整帧(FIN=1);应用层发送时无需分片。