NewLife/X

[feat]v10.10 新增WebSocketClient,通信测试通过
大石头 authored at 2024-04-27 20:56:50
4dbfb2c
Tree
1 Parent(s) a1e5873
Summary: 10 changed files with 293 additions and 51 deletions.
Added +0 -0
Modified +2 -3
Modified +26 -9
Modified +16 -16
Modified +14 -0
Modified +1 -1
Modified +8 -13
Modified +186 -5
Modified +32 -2
Modified +8 -2
Added +0 -0
diff --git "a/Doc/WebSocket\345\217\214\345\220\221\351\200\232\344\277\241.pcapng" "b/Doc/WebSocket\345\217\214\345\220\221\351\200\232\344\277\241.pcapng"
new file mode 100644
index 0000000..5a73b41
Binary files /dev/null and "b/Doc/WebSocket\345\217\214\345\220\221\351\200\232\344\277\241.pcapng" differ
Modified +2 -3
diff --git a/NewLife.Core/Http/HttpRequest.cs b/NewLife.Core/Http/HttpRequest.cs
index 3c6bb85..800b283 100644
--- a/NewLife.Core/Http/HttpRequest.cs
+++ b/NewLife.Core/Http/HttpRequest.cs
@@ -76,8 +76,7 @@ public class HttpRequest : HttpBase
         if (Method.IsNullOrEmpty()) Method = length > 0 ? "POST" : "GET";
 
         // 分解主机和资源
-        var uri = RequestUri;
-        if (uri == null) uri = new Uri("/");
+        var uri = RequestUri ?? new Uri("/");
 
         if (Host.IsNullOrEmpty())
         {
@@ -101,7 +100,7 @@ public class HttpRequest : HttpBase
 
         // 构建头部
         var sb = Pool.StringBuilder.Get();
-        sb.AppendFormat("{0} {1} HTTP/{2}\r\n", Method, uri, Version);
+        sb.AppendFormat("{0} {1} HTTP/{2}\r\n", Method, uri.PathAndQuery, Version);
         sb.AppendFormat("Host: {0}\r\n", Host);
 
         // 内容长度
Modified +26 -9
diff --git a/NewLife.Core/Http/HttpSession.cs b/NewLife.Core/Http/HttpSession.cs
index c4b2a35..8085f6c 100644
--- a/NewLife.Core/Http/HttpSession.cs
+++ b/NewLife.Core/Http/HttpSession.cs
@@ -1,4 +1,5 @@
-using System.Net;
+using System.Drawing;
+using System.Net;
 using System.Web;
 using NewLife.Data;
 using NewLife.Log;
@@ -153,28 +154,34 @@ public class HttpSession : INetHandler
         using var span = _session.Host.Tracer?.NewSpan(path);
         if (span != null)
         {
-            // 解析上游请求链路
+            span.Tag = $"{_session.Remote.EndPoint} {request.Method} {request.RequestUri}";
             span.Detach(request.Headers);
+            span.Value = request.ContentLength;
 
             if (span is DefaultSpan ds && ds.TraceFlag > 0)
             {
-                var tag = $"{_session.Remote.EndPoint} {request.Method} {request.RequestUri.OriginalString}";
-
+                var flag = false;
                 if (request.BodyLength > 0 &&
                     request.Body != null &&
                     request.Body.Total < 8 * 1024 &&
                     request.ContentType.EqualIgnoreCase(TagTypes))
                 {
-                    tag += "\r\n" + request.Body.ToStr(null, 0, 1024);
+                    span.AppendTag("\r\n<=\r\n" + request.Body.ToStr(null, 0, 1024));
+                    flag = true;
                 }
 
-                if (tag.Length < 500)
+                if (span.Tag.Length < 500)
                 {
+                    if (!flag) span.AppendTag("\r\n<=");
                     var vs = request.Headers.Where(e => !e.Key.EqualIgnoreCase(ExcludeHeaders)).ToDictionary(e => e.Key, e => e.Value + "");
-                    tag += "\r\n" + vs.Join("\r\n", e => $"{e.Key}: {e.Value}");
+                    span.AppendTag("\r\n" + vs.Join(Environment.NewLine, e => $"{e.Key}:{e.Value}"));
+                }
+                else if (!flag)
+                {
+                    span.AppendTag("\r\n<=\r\n");
+                    span.AppendTag($"ContentLength: {request.ContentLength}\r\n");
+                    span.AppendTag($"ContentType: {request.ContentType}");
                 }
-
-                span.SetTag(tag);
             }
         }
 
@@ -199,6 +206,16 @@ public class HttpSession : INetHandler
             _websocket ??= WebSocket.Handshake(context);
 
             handler.ProcessRequest(context);
+
+            // 根据状态码识别异常
+            if (span != null)
+            {
+                var res = context.Response;
+                span.Value += res.ContentLength;
+                var code = res.StatusCode;
+                if (code == HttpStatusCode.BadRequest || code > HttpStatusCode.NotFound)
+                    span.SetError(new HttpRequestException($"Http Error {(Int32)code} {code}"), null);
+            }
         }
         catch (Exception ex)
         {
Modified +16 -16
diff --git a/NewLife.Core/Log/ISpan.cs b/NewLife.Core/Log/ISpan.cs
index 33593f8..3a198e6 100644
--- a/NewLife.Core/Log/ISpan.cs
+++ b/NewLife.Core/Log/ISpan.cs
@@ -311,22 +311,22 @@ public static class SpanExtension
         return request;
     }
 
-    ///// <summary>把片段信息附加到http请求头上</summary>
-    ///// <param name="span">片段</param>
-    ///// <param name="headers">http请求头</param>
-    ///// <returns></returns>
-    //public static HttpRequestHeaders Attach(this ISpan span, HttpRequestHeaders headers)
-    //{
-    //    if (span == null || headers == null) return headers;
-
-    //    // 注入参数名
-    //    var name = GetAttachParameter(span);
-    //    if (name.IsNullOrEmpty()) return headers;
-
-    //    if (!headers.Contains(name)) headers.Add(name, span.ToString());
-
-    //    return headers;
-    //}
+    /// <summary>把片段信息附加到http请求头上</summary>
+    /// <param name="span">片段</param>
+    /// <param name="headers">http请求头</param>
+    /// <returns></returns>
+    public static IDictionary<String, String> Attach(this ISpan span, IDictionary<String, String> headers)
+    {
+        //if (span == null || headers == null) return headers;
+
+        // 注入参数名
+        var name = GetAttachParameter(span);
+        if (name.IsNullOrEmpty()) return headers;
+
+        if (!headers.ContainsKey(name)) headers.Add(name, span.ToString()!);
+
+        return headers;
+    }
 
     /// <summary>把片段信息附加到http请求头上</summary>
     /// <param name="span">片段</param>
Modified +14 -0
diff --git a/NewLife.Core/Net/NetHelper.cs b/NewLife.Core/Net/NetHelper.cs
index e0b5745..f82d376 100644
--- a/NewLife.Core/Net/NetHelper.cs
+++ b/NewLife.Core/Net/NetHelper.cs
@@ -657,6 +657,20 @@ public static class NetHelper
             };
     }
 
+    /// <summary>根据Uri创建客户端,主要支持Http/WebSocket</summary>
+    /// <param name="uri"></param>
+    /// <returns></returns>
+    /// <exception cref="NotSupportedException"></exception>
+    public static ISocketClient CreateRemote(this Uri uri)
+    {
+        return uri.Scheme switch
+        {
+            "ws" => new WebSocketClient(uri) { SslProtocol = SslProtocols.Tls12 },
+            "wss" => new WebSocketClient(uri),
+            _ => throw new NotSupportedException($"The {uri.Scheme} protocol is not supported"),
+        };
+    }
+
     internal static Socket CreateTcp(Boolean ipv4 = true) => new(ipv4 ? AddressFamily.InterNetwork : AddressFamily.InterNetworkV6, SocketType.Stream, ProtocolType.Tcp);
 
     internal static Socket CreateUdp(Boolean ipv4 = true) => new(ipv4 ? AddressFamily.InterNetwork : AddressFamily.InterNetworkV6, SocketType.Dgram, ProtocolType.Udp);
Modified +1 -1
diff --git a/NewLife.Core/Net/SessionBase.cs b/NewLife.Core/Net/SessionBase.cs
index 5e1794d..4ddee0f 100644
--- a/NewLife.Core/Net/SessionBase.cs
+++ b/NewLife.Core/Net/SessionBase.cs
@@ -751,7 +751,7 @@ public abstract class SessionBase : DisposeBase, ISocketClient, ITransport, ILog
     #region 日志
 
     /// <summary>日志前缀</summary>
-    public virtual String LogPrefix { get; set; }
+    public virtual String? LogPrefix { get; set; }
 
     /// <summary>日志对象。禁止设为空对象</summary>
     public ILog Log { get; set; } = Logger.Null;
Modified +8 -13
diff --git a/NewLife.Core/Net/TcpSession.cs b/NewLife.Core/Net/TcpSession.cs
index 8d942be..e3e14de 100644
--- a/NewLife.Core/Net/TcpSession.cs
+++ b/NewLife.Core/Net/TcpSession.cs
@@ -480,22 +480,18 @@ public class TcpSession : SessionBase, ISocketSession
     #endregion 自动重连
 
     #region 辅助
-
-    private String? _LogPrefix;
-
     /// <summary>日志前缀</summary>
-    public override String LogPrefix
+    public override String? LogPrefix
     {
         get
         {
-            if (_LogPrefix == null)
-            {
-                var name = _Server == null ? "" : _Server.Name;
-                _LogPrefix = $"{name}[{ID}].";
-            }
-            return _LogPrefix;
+            var pf = base.LogPrefix;
+            if (pf == null && _Server != null)
+                pf = base.LogPrefix = $"{_Server.Name}[{ID}].";
+
+            return pf;
         }
-        set { _LogPrefix = value; }
+        set { base.LogPrefix = value; }
     }
 
     /// <summary>已重载。</summary>
@@ -509,6 +505,5 @@ public class TcpSession : SessionBase, ISocketSession
 
         return _Server == null ? $"{local}=>{remote}" : $"{local}<={remote}";
     }
-
-    #endregion 辅助
+    #endregion
 }
\ No newline at end of file
Modified +186 -5
diff --git a/NewLife.Core/Net/WebSocketClient.cs b/NewLife.Core/Net/WebSocketClient.cs
index 1f7505d..962fc65 100644
--- a/NewLife.Core/Net/WebSocketClient.cs
+++ b/NewLife.Core/Net/WebSocketClient.cs
@@ -1,12 +1,193 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+using System.Net;
+using System.Security.Cryptography;
+using NewLife.Data;
+using NewLife.Http;
+using NewLife.Log;
+using NewLife.Security;
+#if !NET45
+using TaskEx = System.Threading.Tasks.Task;
+#endif
 
 namespace NewLife.Net;
 
 /// <summary>WebSocket客户端</summary>
 public class WebSocketClient : TcpSession
 {
+    #region 属性
+    /// <summary>资源地址</summary>
+    public Uri Uri { get; set; } = null!;
+
+    private String? _Key;
+    #endregion
+
+    #region 构造
+    /// <summary>实例化</summary>
+    public WebSocketClient() { }
+
+    /// <summary>实例化</summary>
+    /// <param name="uri"></param>
+    public WebSocketClient(Uri uri)
+    {
+        Uri = uri;
+
+        Remote = new NetUri(uri.ToString());
+    }
+
+    /// <summary>实例化</summary>
+    /// <param name="url"></param>
+    public WebSocketClient(String url) : this(new Uri(url)) { }
+    #endregion
+
+    /// <summary>打开连接,建立WebSocket请求</summary>
+    /// <returns></returns>
+    protected override Boolean OnOpen()
+    {
+        var remote = Remote;
+        if (remote == null || remote.Address.IsAny() || remote.Port == 0)
+        {
+            remote = Remote = new NetUri(Uri.ToString());
+        }
+
+        if (!base.OnOpen()) return false;
+
+        // 连接必须是ws/wss协议
+        if (remote.Type != NetType.WebSocket) return false;
+
+        //todo 建立WebSocket请求
+        var request = new HttpRequest
+        {
+            Method = "GET",
+            RequestUri = Uri
+        };
+        request.Headers["Connection"] = "Upgrade";
+        request.Headers["Upgrade"] = "websocket";
+        request.Headers["Sec-WebSocket-Version"] = "13";
+
+        _Key = Rand.NextBytes(16).ToBase64();
+        request.Headers["Sec-WebSocket-Key"] = _Key;
+
+        // 注入链路跟踪标记
+        DefaultSpan.Current?.Attach(request.Headers);
+
+        // 设置为激活
+        Active = true;
+
+        using var span = Tracer?.NewSpan($"net:{Name}:WebSocket", Uri + "");
+        try
+        {
+            // 发送请求
+            var req = request.Build();
+            Send(req);
+
+            // 接收响应
+            var rs = Receive();
+            if (rs == null || rs.Count == 0) return false;
+
+            // 解析响应
+            var res = new HttpResponse();
+            if (!res.Parse(rs)) return false;
+
+            //if (res.StatusCode != HttpStatusCode.OK) throw new Exception($"{(Int32)res.StatusCode} {res.StatusDescription}");
+            if (res.StatusCode != HttpStatusCode.SwitchingProtocols) throw new Exception("WebSocket握手失败!" + res.StatusDescription);
+
+            // 检查响应头
+            if (!res.Headers.TryGetValue("Sec-WebSocket-Accept", out var accept) ||
+                accept != SHA1.Create().ComputeHash((_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").GetBytes()).ToBase64())
+                throw new Exception("WebSocket握手失败!");
+        }
+        catch (Exception ex)
+        {
+            span?.SetError(ex, null);
+            WriteLog("WebSocket握手失败!" + ex.Message);
+
+            Close("WebSocket");
+            Dispose();
+
+            return false;
+        }
+
+        Active = false;
+
+        return true;
+    }
+
+    /// <summary>接收WebSocket消息</summary>
+    /// <param name="cancellationToken"></param>
+    /// <returns></returns>
+    public virtual async Task<WebSocketMessage?> ReceiveMessageAsync(CancellationToken cancellationToken = default)
+    {
+        var rs = await base.ReceiveAsync(cancellationToken);
+        if (rs == null) return null;
+
+        var msg = new WebSocketMessage();
+        if (!msg.Read(rs)) return null;
+
+        return msg;
+    }
+
+    /// <summary>发送消息</summary>
+    /// <param name="message"></param>
+    /// <param name="cancellationToken"></param>
+    /// <returns></returns>
+    public Task SendMessageAsync(WebSocketMessage message, CancellationToken cancellationToken = default)
+    {
+        var pk = message.ToPacket();
+        Send(pk);
+        //SendMessage(message);
+
+        return TaskEx.CompletedTask;
+    }
+
+    /// <summary>发送文本</summary>
+    /// <param name="data"></param>
+    /// <param name="cancellationToken"></param>
+    /// <returns></returns>
+    public Task SendTextAsync(Packet data, CancellationToken cancellationToken = default)
+    {
+        var msg = new WebSocketMessage
+        {
+            Type = WebSocketMessageType.Text,
+            Payload = data,
+        };
+
+        return SendMessageAsync(msg);
+    }
+
+    /// <summary>发送文本</summary>
+    /// <param name="text"></param>
+    /// <param name="cancellationToken"></param>
+    /// <returns></returns>
+    public Task SendTextAsync(String text, CancellationToken cancellationToken = default) => SendTextAsync(text.GetBytes(), cancellationToken);
+
+    /// <summary>发送二进制数据</summary>
+    /// <param name="data"></param>
+    /// <param name="cancellationToken"></param>
+    /// <returns></returns>
+    public Task SendBinaryAsync(Packet data, CancellationToken cancellationToken = default)
+    {
+        var msg = new WebSocketMessage
+        {
+            Type = WebSocketMessageType.Binary,
+            Payload = data,
+        };
+
+        return SendMessageAsync(msg);
+    }
+
+    /// <summary>发送关闭</summary>
+    /// <param name="closeStatus"></param>
+    /// <param name="statusDescription"></param>
+    /// <param name="cancellationToken"></param>
+    /// <returns></returns>
+    public Task CloseAsync(Int32 closeStatus, String? statusDescription = null, CancellationToken cancellationToken = default)
+    {
+        var msg = new WebSocketMessage
+        {
+            Type = WebSocketMessageType.Close,
+            CloseStatus = closeStatus,
+            StatusDescription = statusDescription,
+        };
+
+        return SendMessageAsync(msg);
+    }
 }
Modified +32 -2
diff --git a/Samples/Zero.HttpServer/ClientTest.cs b/Samples/Zero.HttpServer/ClientTest.cs
index 42eb0e0..486e431 100644
--- a/Samples/Zero.HttpServer/ClientTest.cs
+++ b/Samples/Zero.HttpServer/ClientTest.cs
@@ -3,6 +3,7 @@ using System.Text;
 using NewLife;
 using NewLife.Data;
 using NewLife.Log;
+using NewLife.Net;
 using NewLife.Remoting;
 using NewLife.Serialization;
 
@@ -27,6 +28,7 @@ static class ClientTest
 
         // Api接口请求
         var http = new ApiHttpClient("http://127.0.0.5:8080");
+        http.Log = XTrace.Log;
 
         // 请求接口,返回data部分
         var rs = await http.GetAsync<String>("/user", new { act = "Delete", uid = 1234 });
@@ -39,9 +41,9 @@ static class ClientTest
         client.Dispose();
     }
 
-    public static async Task WebSocketClientTest()
+    public static async Task WebSocketTest()
     {
-        await Task.Delay(5_000);
+        await Task.Delay(3_000);
         XTrace.WriteLine("");
         XTrace.WriteLine("WebSocket客户端开始连接!");
 
@@ -59,4 +61,32 @@ static class ClientTest
 
         client.Dispose();
     }
+
+    public static async Task WebSocketClientTest()
+    {
+        await Task.Delay(5_000);
+        XTrace.WriteLine("");
+        XTrace.WriteLine("WebSocketClient开始连接!");
+
+        var client = new WebSocketClient("ws://127.0.0.7:8080/ws")
+        {
+            Name = "小ws客户",
+            Log = XTrace.Log
+        };
+        if (client is TcpSession tcp) tcp.MaxAsync = 0;
+
+        await client.SendTextAsync("Hello NewLife");
+
+        var rs = await client.ReceiveMessageAsync(default);
+        XTrace.WriteLine(rs.Payload.ToStr());
+
+        // 关闭连接
+        await client.CloseAsync(1000, "通信完成", default);
+        XTrace.WriteLine("Close");
+
+        rs = await client.ReceiveMessageAsync(default);
+        XTrace.WriteLine("Close [{0}] {1}", rs.CloseStatus, rs.StatusDescription);
+
+        client.Dispose();
+    }
 }
Modified +8 -2
diff --git a/Samples/Zero.HttpServer/Program.cs b/Samples/Zero.HttpServer/Program.cs
index 5fc8fc6..346c785 100644
--- a/Samples/Zero.HttpServer/Program.cs
+++ b/Samples/Zero.HttpServer/Program.cs
@@ -24,9 +24,13 @@ services.AddSingleton<ICacheProvider, RedisCacheProvider>();
 // 创建Http服务器
 var server = new HttpServer
 {
+    Name = "新生命Http服务器",
     Port = 8080,
+
     Log = XTrace.Log,
-    //SessionLog = XTrace.Log,
+#if DEBUG
+    SessionLog = XTrace.Log,
+#endif
     Tracer = star.Tracer,
 };
 
@@ -48,12 +52,14 @@ server.Map("/my", new MyHttpHandler());
 server.Map("/ws", new WebSocketHandler());
 
 server.Start();
+XTrace.WriteLine("服务端启动完成!");
 
 // 注册到星尘,非必须
 await star.Service?.RegisterAsync("Zero.HttpServer", $"http://*:{server.Port}");
 
 // 客户端测试,非服务端代码,正式使用时请注释掉
-_ = Task.Run(ClientTest.HttpClientTest);
+//_ = Task.Run(ClientTest.HttpClientTest);
+//_ = Task.Run(ClientTest.WebSocketTest);
 _ = Task.Run(ClientTest.WebSocketClientTest);
 
 // 异步阻塞,友好退出