NewLife/NewLife.Remoting

[feat]RPC通信支持基于websocket,原来支持tcp/udp。启用ws后,原来的http无法访问,兼容性未做好,待评估是否继续保留http
智能大石头 编写于 2024-05-08 23:32:29
共计: 修改7个文件,增加249行、删除46行。
修改 +19 -1
修改 +1 -1
修改 +10 -24
增加 +67 -0
增加 +91 -0
修改 +49 -14
修改 +12 -6
修改 +19 -1
diff --git a/NewLife.Remoting/ApiClient.cs b/NewLife.Remoting/ApiClient.cs
index 4d1d838..bdb3c63 100644
--- a/NewLife.Remoting/ApiClient.cs
+++ b/NewLife.Remoting/ApiClient.cs
@@ -2,7 +2,9 @@
 using NewLife.Data;
 using NewLife.Log;
 using NewLife.Messaging;
+using NewLife.Model;
 using NewLife.Net;
+using NewLife.Remoting.Http;
 using NewLife.Threading;
 
 namespace NewLife.Remoting;
@@ -395,10 +397,21 @@ public class ApiClient : ApiHost, IApiClient
     /// <param name="svr"></param>
     protected virtual ISocketClient OnCreate(String svr)
     {
-        var client = new NetUri(svr).CreateRemote();
+        var uri = new NetUri(svr);
+        var client = uri.Type == NetType.WebSocket ?
+            new Uri(svr).CreateRemote() :
+            uri.CreateRemote();
+
+        if (uri.Type == NetType.WebSocket && client.Pipeline is Pipeline pipe)
+        {
+            pipe.Handlers.Clear();
+            client.Add<WebSocketClientCodec>();
+        }
+
         // 网络层采用消息层超时
         client.Timeout = Timeout;
         client.Tracer = Tracer;
+        client.Log = SocketLog;
 
         if (Local != null) client.Local = Local;
 
@@ -433,4 +446,9 @@ public class ApiClient : ApiHost, IApiClient
         WriteLog(msg);
     }
     #endregion
+
+    #region 日志
+    /// <summary>Socket层日志</summary>
+    public ILog SocketLog { get; set; } = Logger.Null;
+    #endregion
 }
\ No newline at end of file
修改 +1 -1
diff --git a/NewLife.Remoting/ApiHost.cs b/NewLife.Remoting/ApiHost.cs
index 7343850..9fe8e5c 100644
--- a/NewLife.Remoting/ApiHost.cs
+++ b/NewLife.Remoting/ApiHost.cs
@@ -57,7 +57,7 @@ public abstract class ApiHost : DisposeBase, IApiHost, IExtend, ILogFeature, ITr
     /// <summary>写日志</summary>
     /// <param name="format"></param>
     /// <param name="args"></param>
-    public void WriteLog(String format, params Object[] args) => Log?.Info(Name + " " + format, args);
+    public void WriteLog(String format, params Object[] args) => Log?.Info($"[{Name}]{format}", args);
 
     /// <summary>已重载。返回具有本类特征的字符串</summary>
     /// <returns>String</returns>
修改 +10 -24
diff --git a/NewLife.Remoting/ApiNetServer.cs b/NewLife.Remoting/ApiNetServer.cs
index d563fb8..e555264 100644
--- a/NewLife.Remoting/ApiNetServer.cs
+++ b/NewLife.Remoting/ApiNetServer.cs
@@ -1,6 +1,8 @@
-using NewLife.Log;
+using NewLife.Http;
+using NewLife.Log;
 using NewLife.Messaging;
 using NewLife.Net;
+using NewLife.Net.Handlers;
 using NewLife.Remoting.Http;
 
 namespace NewLife.Remoting;
@@ -32,14 +34,19 @@ class ApiNetServer : NetServer<ApiNetSession>, IApiServer
         //if (Local.Host.IsNullOrEmpty() || Local.Host == "*") AddressFamily = System.Net.Sockets.AddressFamily.Unspecified;
 
         // Http封包协议
-        //Add<HttpCodec>();
-        Add(new HttpCodec { AllowParseHeader = true });
+        //Add(new HttpCodec { AllowParseHeader = true });
+        Add<WebSocketServerCodec>();
 
         // 新生命标准网络封包协议
         Add(Host.GetMessageCodec());
 
         return true;
     }
+
+    /// <summary>为会话创建网络数据处理器。可作为业务处理实现,也可以作为前置协议解析</summary>
+    /// <param name="session"></param> 
+    /// <returns></returns>
+    public override INetHandler? CreateHandler(INetSession session) => new HttpSession();
 }
 
 class ApiNetSession : NetSession<ApiNetServer>, IApiSession
@@ -65,27 +72,6 @@ class ApiNetSession : NetSession<ApiNetServer>, IApiSession
         base.Start();
     }
 
-    ///// <summary>查找Api动作</summary>
-    ///// <param name="action"></param>
-    ///// <returns></returns>
-    //public virtual ApiAction? FindAction(String action) => _Host.Manager.Find(action);
-
-    ///// <summary>创建控制器实例</summary>
-    ///// <param name="api"></param>
-    ///// <returns></returns>
-    //public virtual Object CreateController(ApiAction api)
-    //{
-    //    var controller = api.Controller;
-    //    if (controller != null) return controller;
-
-    //    controller = _Host.ServiceProvider?.GetService(api.Type);
-
-    //    controller ??= api.Type.CreateInstance();
-    //    if (controller == null) throw new InvalidDataException($"无法创建[{api.Type.FullName}]的实例");
-
-    //    return controller;
-    //}
-
     protected override void OnReceive(ReceivedEventArgs e)
     {
         LastActive = DateTime.Now;
增加 +67 -0
diff --git a/NewLife.Remoting/Http/WebSocketClientCodec.cs b/NewLife.Remoting/Http/WebSocketClientCodec.cs
new file mode 100644
index 0000000..5703cd2
--- /dev/null
+++ b/NewLife.Remoting/Http/WebSocketClientCodec.cs
@@ -0,0 +1,67 @@
+using NewLife.Data;
+using NewLife.Http;
+using NewLife.Model;
+using NewLife.Net;
+
+namespace NewLife.Remoting.Http;
+
+/// <summary>WebSocket消息编码器</summary>
+public class WebSocketClientCodec : Handler
+{
+    /// <summary>打开连接</summary>
+    /// <param name="context">上下文</param>
+    public override Boolean Open(IHandlerContext context)
+    {
+        if (context.Owner is ISocketClient client)
+        {
+            // 连接必须是ws/wss协议
+            if (client.Remote.Type == NetType.WebSocket && client is WebSocketClient ws)
+            {
+                WebSocketClient.Handshake(client, ws.Uri);
+            }
+        }
+
+        return base.Open(context);
+    }
+
+    /// <summary>连接关闭时,清空粘包编码器</summary>
+    /// <param name="context"></param>
+    /// <param name="reason"></param>
+    /// <returns></returns>
+    public override Boolean Close(IHandlerContext context, String reason)
+    {
+        if (context.Owner is IExtend ss) ss["Codec"] = null;
+
+        return base.Close(context, reason);
+    }
+
+    /// <summary>读取数据</summary>
+    /// <param name="context"></param>
+    /// <param name="message"></param>
+    /// <returns></returns>
+    public override Object? Read(IHandlerContext context, Object message)
+    {
+        if (message is Packet pk)
+        {
+            var msg = new WebSocketMessage();
+            if (msg.Read(pk)) message = msg.Payload;
+        }
+
+        return base.Read(context, message);
+    }
+
+    /// <summary>发送消息时,写入数据</summary>
+    /// <param name="context"></param>
+    /// <param name="message"></param>
+    /// <returns></returns>
+    public override Object? Write(IHandlerContext context, Object message)
+    {
+        if (message is Packet pk)
+            message = new WebSocketMessage { Type = WebSocketMessageType.Binary, Payload = pk };
+
+        if (message is WebSocketMessage msg)
+            message = msg.ToPacket();
+
+        return base.Write(context, message);
+    }
+}
增加 +91 -0
diff --git a/NewLife.Remoting/Http/WebSocketServerCodec.cs b/NewLife.Remoting/Http/WebSocketServerCodec.cs
new file mode 100644
index 0000000..0274cce
--- /dev/null
+++ b/NewLife.Remoting/Http/WebSocketServerCodec.cs
@@ -0,0 +1,91 @@
+using NewLife.Data;
+using NewLife.Http;
+using NewLife.Model;
+using NewLife.Net;
+using WebSocket = NewLife.Http.WebSocket;
+using WebSocketMessageType = NewLife.Http.WebSocketMessageType;
+
+namespace NewLife.Remoting.Http;
+
+/// <summary>WebSocket消息编码器</summary>
+public class WebSocketServerCodec : Handler
+{
+    /// <summary>连接关闭时,清空粘包编码器</summary>
+    /// <param name="context"></param>
+    /// <param name="reason"></param>
+    /// <returns></returns>
+    public override Boolean Close(IHandlerContext context, String reason)
+    {
+        if (context.Owner is IExtend ss) ss["_websocket"] = null;
+
+        return base.Close(context, reason);
+    }
+
+    /// <summary>读取数据</summary>
+    /// <param name="context"></param>
+    /// <param name="message"></param>
+    /// <returns></returns>
+    public override Object? Read(IHandlerContext context, Object message)
+    {
+        if (message is not Packet pk) return base.Read(context, message);
+
+        // 连接必须是ws/wss协议
+        if (context.Owner is not ISocketRemote session || session.Remote.Type != NetType.Tcp) return base.Read(context, message);
+        if (context.Owner is not IExtend ss) return base.Read(context, message);
+
+        var websocket = ss["_websocket"] as WebSocket;
+        if (websocket == null)
+        {
+            var request = new HttpRequest();
+            if (request.Parse(pk) && request.IsCompleted)
+            {
+                var ctx = new DefaultHttpContext(null, request, null, null)
+                {
+                    ServiceProvider = session as IServiceProvider
+                };
+
+                // 处理 WebSocket 握手
+                websocket = WebSocket.Handshake(ctx);
+                if (websocket == null) return message;
+
+                var rs = ctx.Response;
+                if (rs == null) return message;
+
+                session.Send(rs.Build());
+
+                ss["_websocket"] = websocket;
+                return null;
+            }
+        }
+        if (websocket != null)
+        {
+            var msg = new WebSocketMessage();
+            if (msg.Read(pk)) message = msg.Payload;
+        }
+
+        return base.Read(context, message);
+    }
+
+    /// <summary>发送消息时,写入数据</summary>
+    /// <param name="context"></param>
+    /// <param name="message"></param>
+    /// <returns></returns>
+    public override Object? Write(IHandlerContext context, Object message)
+    {
+        // 仅编码websocket连接
+        if (context.Owner is IExtend ss && ss["_websocket"] is WebSocket)
+        {
+            if (message is Packet pk)
+            {
+                var msg = new WebSocketMessage
+                {
+                    Type = WebSocketMessageType.Binary,
+                    Payload = pk,
+                };
+                message = msg.ToPacket();
+            }
+        }
+
+        return base.Write(context, message);
+    }
+}
修改 +49 -14
diff --git a/Samples/Zero.RpcServer/ClientTest.cs b/Samples/Zero.RpcServer/ClientTest.cs
index 6c5de2a..587d204 100644
--- a/Samples/Zero.RpcServer/ClientTest.cs
+++ b/Samples/Zero.RpcServer/ClientTest.cs
@@ -1,7 +1,4 @@
-using System.Net.Sockets;
-using System.Text;
-using NewLife;
-using NewLife.Log;
+using NewLife.Log;
 using NewLife.Remoting;
 using NewLife.Security;
 using NewLife.Serialization;
@@ -18,8 +15,8 @@ static class ClientTest
         XTrace.WriteLine("Tcp开始连接!");
 
         // 连接服务端
-        var client = new ApiClient("tcp://127.0.0.2:12346");
-        client.Open();
+        var client = new ApiClient($"tcp://127.0.0.2:{port}");
+        client.Name = "小Tcp";
 
         await Process(client);
 
@@ -35,8 +32,10 @@ static class ClientTest
         XTrace.WriteLine("Udp开始连接!");
 
         // 连接服务端
-        var client = new ApiClient("udp://127.0.0.2:12346");
-        client.Open();
+        //var client = new ApiClient($"udp://127.0.0.3:{port}");
+        // UDP协议使用127.0.0.3地址后,服务端响应时发往127.0.0.1,导致客户端无法匹配接收
+        var client = new ApiClient($"udp://127.0.0.1:{port}");
+        client.Name = "小Udp";
 
         await Process(client);
 
@@ -52,8 +51,8 @@ static class ClientTest
         XTrace.WriteLine("WebSocket开始连接!");
 
         // 连接服务端
-        var client = new ApiClient("ws://127.0.0.2:12346");
-        client.Open();
+        var client = new ApiClient($"ws://127.0.0.4:{port}");
+        client.Name = "小Ws";
 
         await Process(client);
 
@@ -63,14 +62,50 @@ static class ClientTest
 
     static async Task Process(ApiClient client)
     {
-        // 获取所有接口
-        var apis = await client.InvokeAsync<String[]>("api/all");
+        try
+        {
+            client.Log = XTrace.Log;
+#if DEBUG
+            client.EncoderLog = XTrace.Log;
+            client.SocketLog = XTrace.Log;
+#endif
+            client.Open();
+
+            // 获取所有接口
+            client.WriteLog("获取所有接口");
+            var apis = await client.InvokeAsync<String[]>("api/all");
+            client.WriteLog("共有接口数:{0}", apis.Length);
+
+            // 获取服务端信息
+            client.WriteLog("获取服务端信息");
+            var state = Rand.NextString(8);
+            var state2 = Rand.NextString(8);
+            var infs = await client.InvokeAsync<IDictionary<String, Object>>("api/info", new { state, state2 });
+            client.WriteLog("服务端信息:{0}", infs.ToJson(true));
+        }
+        catch (Exception ex)
+        {
+            XTrace.WriteException(ex);
+        }
+    }
+
+    /// <summary>Http连接ApiServer</summary>
+    public static async void HttpTest(Int32 port)
+    {
+        await Task.Delay(4_000);
+        XTrace.WriteLine("");
+        XTrace.WriteLine("Http开始连接!");
+
+        // 连接服务端
+        var client = new ApiHttpClient($"http://127.0.0.2:{port}");
+        client.Log = XTrace.Log;
+
+        var apis = await client.GetAsync<String[]>("api/all");
         client.WriteLog("共有接口数:{0}", apis.Length);
 
-        // 获取服务端信息
         var state = Rand.NextString(8);
         var state2 = Rand.NextString(8);
-        var infs = await client.InvokeAsync<IDictionary<String, Object>>("api/info", new { state, state2 });
+        var infs = await client.PostAsync<IDictionary<String, Object>>("api/info", new { state, state2 });
         client.WriteLog("服务端信息:{0}", infs.ToJson(true));
     }
 }
修改 +12 -6
diff --git a/Samples/Zero.RpcServer/Program.cs b/Samples/Zero.RpcServer/Program.cs
index 9c7db62..36b2472 100644
--- a/Samples/Zero.RpcServer/Program.cs
+++ b/Samples/Zero.RpcServer/Program.cs
@@ -1,5 +1,8 @@
-using NewLife.Caching;
+using System.Diagnostics;
+using NewLife;
+using NewLife.Caching;
 using NewLife.Caching.Services;
+using NewLife.Configuration;
 using NewLife.Log;
 using NewLife.Model;
 using NewLife.Remoting;
@@ -20,8 +23,10 @@ services.AddSingleton<ICacheProvider, RedisCacheProvider>();
 // 引入Redis,用于消息队列和缓存,单例,带性能跟踪。一般使用上面的ICacheProvider替代
 //services.AddRedis("127.0.0.1:6379", "123456", 3, 5000);
 
+var port = 12345;
+
 // 实例化RPC服务端,指定端口,同时在Tcp/Udp/IPv4/IPv6上监听
-var server = new ApiServer(12346)
+var server = new ApiServer(port)
 {
     Name = "银河服务端",
 
@@ -33,7 +38,7 @@ var server = new ApiServer(12346)
     Tracer = star.Tracer,
 };
 
-// 注册服务控制器
+// 注册服务控制器,其中提供各种接口服务
 server.Register<MyController>();
 server.Register<UserController>();
 server.Register<AreaController>();
@@ -51,9 +56,10 @@ XTrace.WriteLine("服务端启动完成!");
 star?.Service?.Register("MyRpcServer", () => $"tcp://*:{server.Port},udp://*:{server.Port}");
 
 // 客户端测试,非服务端代码,正式使用时请注释掉
-_ = Task.Run(() => ClientTest.TcpTest(server.Port));
-_ = Task.Run(() => ClientTest.UdpTest(server.Port));
-_ = Task.Run(() => ClientTest.WebSocketTest(server.Port));
+_ = Task.Run(() => ClientTest.TcpTest(port));
+_ = Task.Run(() => ClientTest.UdpTest(port));
+_ = Task.Run(() => ClientTest.WebSocketTest(port));
+_ = Task.Run(() => ClientTest.HttpTest(port));
 
 // 阻塞,等待友好退出
 var host = services.BuildHost();