[feat]v10.10 新增WebSocketClient,通信测试通过大石头 authored at 2024-04-27 20:56:50
diff --git "a/Doc/WebSocket\345\217\214\345\220\221\351\200\232\344\277\241.pcapng" "b/Doc/WebSocket\345\217\214\345\220\221\351\200\232\344\277\241.pcapng"
new file mode 100644
index 0000000..5a73b41
Binary files /dev/null and "b/Doc/WebSocket\345\217\214\345\220\221\351\200\232\344\277\241.pcapng" differ
diff --git a/NewLife.Core/Http/HttpRequest.cs b/NewLife.Core/Http/HttpRequest.cs
index 3c6bb85..800b283 100644
--- a/NewLife.Core/Http/HttpRequest.cs
+++ b/NewLife.Core/Http/HttpRequest.cs
@@ -76,8 +76,7 @@ public class HttpRequest : HttpBase
if (Method.IsNullOrEmpty()) Method = length > 0 ? "POST" : "GET";
// 分解主机和资源
- var uri = RequestUri;
- if (uri == null) uri = new Uri("/");
+ var uri = RequestUri ?? new Uri("/");
if (Host.IsNullOrEmpty())
{
@@ -101,7 +100,7 @@ public class HttpRequest : HttpBase
// 构建头部
var sb = Pool.StringBuilder.Get();
- sb.AppendFormat("{0} {1} HTTP/{2}\r\n", Method, uri, Version);
+ sb.AppendFormat("{0} {1} HTTP/{2}\r\n", Method, uri.PathAndQuery, Version);
sb.AppendFormat("Host: {0}\r\n", Host);
// 内容长度
diff --git a/NewLife.Core/Http/HttpSession.cs b/NewLife.Core/Http/HttpSession.cs
index c4b2a35..8085f6c 100644
--- a/NewLife.Core/Http/HttpSession.cs
+++ b/NewLife.Core/Http/HttpSession.cs
@@ -1,4 +1,5 @@
-using System.Net;
+using System.Drawing;
+using System.Net;
using System.Web;
using NewLife.Data;
using NewLife.Log;
@@ -153,28 +154,34 @@ public class HttpSession : INetHandler
using var span = _session.Host.Tracer?.NewSpan(path);
if (span != null)
{
- // 解析上游请求链路
+ span.Tag = $"{_session.Remote.EndPoint} {request.Method} {request.RequestUri}";
span.Detach(request.Headers);
+ span.Value = request.ContentLength;
if (span is DefaultSpan ds && ds.TraceFlag > 0)
{
- var tag = $"{_session.Remote.EndPoint} {request.Method} {request.RequestUri.OriginalString}";
-
+ var flag = false;
if (request.BodyLength > 0 &&
request.Body != null &&
request.Body.Total < 8 * 1024 &&
request.ContentType.EqualIgnoreCase(TagTypes))
{
- tag += "\r\n" + request.Body.ToStr(null, 0, 1024);
+ span.AppendTag("\r\n<=\r\n" + request.Body.ToStr(null, 0, 1024));
+ flag = true;
}
- if (tag.Length < 500)
+ if (span.Tag.Length < 500)
{
+ if (!flag) span.AppendTag("\r\n<=");
var vs = request.Headers.Where(e => !e.Key.EqualIgnoreCase(ExcludeHeaders)).ToDictionary(e => e.Key, e => e.Value + "");
- tag += "\r\n" + vs.Join("\r\n", e => $"{e.Key}: {e.Value}");
+ span.AppendTag("\r\n" + vs.Join(Environment.NewLine, e => $"{e.Key}:{e.Value}"));
+ }
+ else if (!flag)
+ {
+ span.AppendTag("\r\n<=\r\n");
+ span.AppendTag($"ContentLength: {request.ContentLength}\r\n");
+ span.AppendTag($"ContentType: {request.ContentType}");
}
-
- span.SetTag(tag);
}
}
@@ -199,6 +206,16 @@ public class HttpSession : INetHandler
_websocket ??= WebSocket.Handshake(context);
handler.ProcessRequest(context);
+
+ // 根据状态码识别异常
+ if (span != null)
+ {
+ var res = context.Response;
+ span.Value += res.ContentLength;
+ var code = res.StatusCode;
+ if (code == HttpStatusCode.BadRequest || code > HttpStatusCode.NotFound)
+ span.SetError(new HttpRequestException($"Http Error {(Int32)code} {code}"), null);
+ }
}
catch (Exception ex)
{
diff --git a/NewLife.Core/Log/ISpan.cs b/NewLife.Core/Log/ISpan.cs
index 33593f8..3a198e6 100644
--- a/NewLife.Core/Log/ISpan.cs
+++ b/NewLife.Core/Log/ISpan.cs
@@ -311,22 +311,22 @@ public static class SpanExtension
return request;
}
- ///// <summary>把片段信息附加到http请求头上</summary>
- ///// <param name="span">片段</param>
- ///// <param name="headers">http请求头</param>
- ///// <returns></returns>
- //public static HttpRequestHeaders Attach(this ISpan span, HttpRequestHeaders headers)
- //{
- // if (span == null || headers == null) return headers;
-
- // // 注入参数名
- // var name = GetAttachParameter(span);
- // if (name.IsNullOrEmpty()) return headers;
-
- // if (!headers.Contains(name)) headers.Add(name, span.ToString());
-
- // return headers;
- //}
+ /// <summary>把片段信息附加到http请求头上</summary>
+ /// <param name="span">片段</param>
+ /// <param name="headers">http请求头</param>
+ /// <returns></returns>
+ public static IDictionary<String, String> Attach(this ISpan span, IDictionary<String, String> headers)
+ {
+ //if (span == null || headers == null) return headers;
+
+ // 注入参数名
+ var name = GetAttachParameter(span);
+ if (name.IsNullOrEmpty()) return headers;
+
+ if (!headers.ContainsKey(name)) headers.Add(name, span.ToString()!);
+
+ return headers;
+ }
/// <summary>把片段信息附加到http请求头上</summary>
/// <param name="span">片段</param>
diff --git a/NewLife.Core/Net/NetHelper.cs b/NewLife.Core/Net/NetHelper.cs
index e0b5745..f82d376 100644
--- a/NewLife.Core/Net/NetHelper.cs
+++ b/NewLife.Core/Net/NetHelper.cs
@@ -657,6 +657,20 @@ public static class NetHelper
};
}
+ /// <summary>根据Uri创建客户端,主要支持Http/WebSocket</summary>
+ /// <param name="uri"></param>
+ /// <returns></returns>
+ /// <exception cref="NotSupportedException"></exception>
+ public static ISocketClient CreateRemote(this Uri uri)
+ {
+ return uri.Scheme switch
+ {
+ "ws" => new WebSocketClient(uri) { SslProtocol = SslProtocols.Tls12 },
+ "wss" => new WebSocketClient(uri),
+ _ => throw new NotSupportedException($"The {uri.Scheme} protocol is not supported"),
+ };
+ }
+
internal static Socket CreateTcp(Boolean ipv4 = true) => new(ipv4 ? AddressFamily.InterNetwork : AddressFamily.InterNetworkV6, SocketType.Stream, ProtocolType.Tcp);
internal static Socket CreateUdp(Boolean ipv4 = true) => new(ipv4 ? AddressFamily.InterNetwork : AddressFamily.InterNetworkV6, SocketType.Dgram, ProtocolType.Udp);
diff --git a/NewLife.Core/Net/SessionBase.cs b/NewLife.Core/Net/SessionBase.cs
index 5e1794d..4ddee0f 100644
--- a/NewLife.Core/Net/SessionBase.cs
+++ b/NewLife.Core/Net/SessionBase.cs
@@ -751,7 +751,7 @@ public abstract class SessionBase : DisposeBase, ISocketClient, ITransport, ILog
#region 日志
/// <summary>日志前缀</summary>
- public virtual String LogPrefix { get; set; }
+ public virtual String? LogPrefix { get; set; }
/// <summary>日志对象。禁止设为空对象</summary>
public ILog Log { get; set; } = Logger.Null;
diff --git a/NewLife.Core/Net/TcpSession.cs b/NewLife.Core/Net/TcpSession.cs
index 8d942be..e3e14de 100644
--- a/NewLife.Core/Net/TcpSession.cs
+++ b/NewLife.Core/Net/TcpSession.cs
@@ -480,22 +480,18 @@ public class TcpSession : SessionBase, ISocketSession
#endregion 自动重连
#region 辅助
-
- private String? _LogPrefix;
-
/// <summary>日志前缀</summary>
- public override String LogPrefix
+ public override String? LogPrefix
{
get
{
- if (_LogPrefix == null)
- {
- var name = _Server == null ? "" : _Server.Name;
- _LogPrefix = $"{name}[{ID}].";
- }
- return _LogPrefix;
+ var pf = base.LogPrefix;
+ if (pf == null && _Server != null)
+ pf = base.LogPrefix = $"{_Server.Name}[{ID}].";
+
+ return pf;
}
- set { _LogPrefix = value; }
+ set { base.LogPrefix = value; }
}
/// <summary>已重载。</summary>
@@ -509,6 +505,5 @@ public class TcpSession : SessionBase, ISocketSession
return _Server == null ? $"{local}=>{remote}" : $"{local}<={remote}";
}
-
- #endregion 辅助
+ #endregion
}
\ No newline at end of file
diff --git a/NewLife.Core/Net/WebSocketClient.cs b/NewLife.Core/Net/WebSocketClient.cs
index 1f7505d..962fc65 100644
--- a/NewLife.Core/Net/WebSocketClient.cs
+++ b/NewLife.Core/Net/WebSocketClient.cs
@@ -1,12 +1,193 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+using System.Net;
+using System.Security.Cryptography;
+using NewLife.Data;
+using NewLife.Http;
+using NewLife.Log;
+using NewLife.Security;
+#if !NET45
+using TaskEx = System.Threading.Tasks.Task;
+#endif
namespace NewLife.Net;
/// <summary>WebSocket客户端</summary>
public class WebSocketClient : TcpSession
{
+ #region 属性
+ /// <summary>资源地址</summary>
+ public Uri Uri { get; set; } = null!;
+
+ private String? _Key;
+ #endregion
+
+ #region 构造
+ /// <summary>实例化</summary>
+ public WebSocketClient() { }
+
+ /// <summary>实例化</summary>
+ /// <param name="uri"></param>
+ public WebSocketClient(Uri uri)
+ {
+ Uri = uri;
+
+ Remote = new NetUri(uri.ToString());
+ }
+
+ /// <summary>实例化</summary>
+ /// <param name="url"></param>
+ public WebSocketClient(String url) : this(new Uri(url)) { }
+ #endregion
+
+ /// <summary>打开连接,建立WebSocket请求</summary>
+ /// <returns></returns>
+ protected override Boolean OnOpen()
+ {
+ var remote = Remote;
+ if (remote == null || remote.Address.IsAny() || remote.Port == 0)
+ {
+ remote = Remote = new NetUri(Uri.ToString());
+ }
+
+ if (!base.OnOpen()) return false;
+
+ // 连接必须是ws/wss协议
+ if (remote.Type != NetType.WebSocket) return false;
+
+ //todo 建立WebSocket请求
+ var request = new HttpRequest
+ {
+ Method = "GET",
+ RequestUri = Uri
+ };
+ request.Headers["Connection"] = "Upgrade";
+ request.Headers["Upgrade"] = "websocket";
+ request.Headers["Sec-WebSocket-Version"] = "13";
+
+ _Key = Rand.NextBytes(16).ToBase64();
+ request.Headers["Sec-WebSocket-Key"] = _Key;
+
+ // 注入链路跟踪标记
+ DefaultSpan.Current?.Attach(request.Headers);
+
+ // 设置为激活
+ Active = true;
+
+ using var span = Tracer?.NewSpan($"net:{Name}:WebSocket", Uri + "");
+ try
+ {
+ // 发送请求
+ var req = request.Build();
+ Send(req);
+
+ // 接收响应
+ var rs = Receive();
+ if (rs == null || rs.Count == 0) return false;
+
+ // 解析响应
+ var res = new HttpResponse();
+ if (!res.Parse(rs)) return false;
+
+ //if (res.StatusCode != HttpStatusCode.OK) throw new Exception($"{(Int32)res.StatusCode} {res.StatusDescription}");
+ if (res.StatusCode != HttpStatusCode.SwitchingProtocols) throw new Exception("WebSocket握手失败!" + res.StatusDescription);
+
+ // 检查响应头
+ if (!res.Headers.TryGetValue("Sec-WebSocket-Accept", out var accept) ||
+ accept != SHA1.Create().ComputeHash((_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").GetBytes()).ToBase64())
+ throw new Exception("WebSocket握手失败!");
+ }
+ catch (Exception ex)
+ {
+ span?.SetError(ex, null);
+ WriteLog("WebSocket握手失败!" + ex.Message);
+
+ Close("WebSocket");
+ Dispose();
+
+ return false;
+ }
+
+ Active = false;
+
+ return true;
+ }
+
+ /// <summary>接收WebSocket消息</summary>
+ /// <param name="cancellationToken"></param>
+ /// <returns></returns>
+ public virtual async Task<WebSocketMessage?> ReceiveMessageAsync(CancellationToken cancellationToken = default)
+ {
+ var rs = await base.ReceiveAsync(cancellationToken);
+ if (rs == null) return null;
+
+ var msg = new WebSocketMessage();
+ if (!msg.Read(rs)) return null;
+
+ return msg;
+ }
+
+ /// <summary>发送消息</summary>
+ /// <param name="message"></param>
+ /// <param name="cancellationToken"></param>
+ /// <returns></returns>
+ public Task SendMessageAsync(WebSocketMessage message, CancellationToken cancellationToken = default)
+ {
+ var pk = message.ToPacket();
+ Send(pk);
+ //SendMessage(message);
+
+ return TaskEx.CompletedTask;
+ }
+
+ /// <summary>发送文本</summary>
+ /// <param name="data"></param>
+ /// <param name="cancellationToken"></param>
+ /// <returns></returns>
+ public Task SendTextAsync(Packet data, CancellationToken cancellationToken = default)
+ {
+ var msg = new WebSocketMessage
+ {
+ Type = WebSocketMessageType.Text,
+ Payload = data,
+ };
+
+ return SendMessageAsync(msg);
+ }
+
+ /// <summary>发送文本</summary>
+ /// <param name="text"></param>
+ /// <param name="cancellationToken"></param>
+ /// <returns></returns>
+ public Task SendTextAsync(String text, CancellationToken cancellationToken = default) => SendTextAsync(text.GetBytes(), cancellationToken);
+
+ /// <summary>发送二进制数据</summary>
+ /// <param name="data"></param>
+ /// <param name="cancellationToken"></param>
+ /// <returns></returns>
+ public Task SendBinaryAsync(Packet data, CancellationToken cancellationToken = default)
+ {
+ var msg = new WebSocketMessage
+ {
+ Type = WebSocketMessageType.Binary,
+ Payload = data,
+ };
+
+ return SendMessageAsync(msg);
+ }
+
+ /// <summary>发送关闭</summary>
+ /// <param name="closeStatus"></param>
+ /// <param name="statusDescription"></param>
+ /// <param name="cancellationToken"></param>
+ /// <returns></returns>
+ public Task CloseAsync(Int32 closeStatus, String? statusDescription = null, CancellationToken cancellationToken = default)
+ {
+ var msg = new WebSocketMessage
+ {
+ Type = WebSocketMessageType.Close,
+ CloseStatus = closeStatus,
+ StatusDescription = statusDescription,
+ };
+
+ return SendMessageAsync(msg);
+ }
}
diff --git a/Samples/Zero.HttpServer/ClientTest.cs b/Samples/Zero.HttpServer/ClientTest.cs
index 42eb0e0..486e431 100644
--- a/Samples/Zero.HttpServer/ClientTest.cs
+++ b/Samples/Zero.HttpServer/ClientTest.cs
@@ -3,6 +3,7 @@ using System.Text;
using NewLife;
using NewLife.Data;
using NewLife.Log;
+using NewLife.Net;
using NewLife.Remoting;
using NewLife.Serialization;
@@ -27,6 +28,7 @@ static class ClientTest
// Api接口请求
var http = new ApiHttpClient("http://127.0.0.5:8080");
+ http.Log = XTrace.Log;
// 请求接口,返回data部分
var rs = await http.GetAsync<String>("/user", new { act = "Delete", uid = 1234 });
@@ -39,9 +41,9 @@ static class ClientTest
client.Dispose();
}
- public static async Task WebSocketClientTest()
+ public static async Task WebSocketTest()
{
- await Task.Delay(5_000);
+ await Task.Delay(3_000);
XTrace.WriteLine("");
XTrace.WriteLine("WebSocket客户端开始连接!");
@@ -59,4 +61,32 @@ static class ClientTest
client.Dispose();
}
+
+ public static async Task WebSocketClientTest()
+ {
+ await Task.Delay(5_000);
+ XTrace.WriteLine("");
+ XTrace.WriteLine("WebSocketClient开始连接!");
+
+ var client = new WebSocketClient("ws://127.0.0.7:8080/ws")
+ {
+ Name = "小ws客户",
+ Log = XTrace.Log
+ };
+ if (client is TcpSession tcp) tcp.MaxAsync = 0;
+
+ await client.SendTextAsync("Hello NewLife");
+
+ var rs = await client.ReceiveMessageAsync(default);
+ XTrace.WriteLine(rs.Payload.ToStr());
+
+ // 关闭连接
+ await client.CloseAsync(1000, "通信完成", default);
+ XTrace.WriteLine("Close");
+
+ rs = await client.ReceiveMessageAsync(default);
+ XTrace.WriteLine("Close [{0}] {1}", rs.CloseStatus, rs.StatusDescription);
+
+ client.Dispose();
+ }
}
diff --git a/Samples/Zero.HttpServer/Program.cs b/Samples/Zero.HttpServer/Program.cs
index 5fc8fc6..346c785 100644
--- a/Samples/Zero.HttpServer/Program.cs
+++ b/Samples/Zero.HttpServer/Program.cs
@@ -24,9 +24,13 @@ services.AddSingleton<ICacheProvider, RedisCacheProvider>();
// 创建Http服务器
var server = new HttpServer
{
+ Name = "新生命Http服务器",
Port = 8080,
+
Log = XTrace.Log,
- //SessionLog = XTrace.Log,
+#if DEBUG
+ SessionLog = XTrace.Log,
+#endif
Tracer = star.Tracer,
};
@@ -48,12 +52,14 @@ server.Map("/my", new MyHttpHandler());
server.Map("/ws", new WebSocketHandler());
server.Start();
+XTrace.WriteLine("服务端启动完成!");
// 注册到星尘,非必须
await star.Service?.RegisterAsync("Zero.HttpServer", $"http://*:{server.Port}");
// 客户端测试,非服务端代码,正式使用时请注释掉
-_ = Task.Run(ClientTest.HttpClientTest);
+//_ = Task.Run(ClientTest.HttpClientTest);
+//_ = Task.Run(ClientTest.WebSocketTest);
_ = Task.Run(ClientTest.WebSocketClientTest);
// 异步阻塞,友好退出