[feat]RPC通信支持基于websocket,原来支持tcp/udp。启用ws后,原来的http无法访问,兼容性未做好,待评估是否继续保留http智能大石头 编写于 2024-05-08 23:32:29
diff --git a/NewLife.Remoting/ApiClient.cs b/NewLife.Remoting/ApiClient.cs
index 4d1d838..bdb3c63 100644
--- a/NewLife.Remoting/ApiClient.cs
+++ b/NewLife.Remoting/ApiClient.cs
@@ -2,7 +2,9 @@
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;
+using NewLife.Model;
using NewLife.Net;
+using NewLife.Remoting.Http;
using NewLife.Threading;
namespace NewLife.Remoting;
@@ -395,10 +397,21 @@ public class ApiClient : ApiHost, IApiClient
/// <param name="svr"></param>
protected virtual ISocketClient OnCreate(String svr)
{
- var client = new NetUri(svr).CreateRemote();
+ var uri = new NetUri(svr);
+ var client = uri.Type == NetType.WebSocket ?
+ new Uri(svr).CreateRemote() :
+ uri.CreateRemote();
+
+ if (uri.Type == NetType.WebSocket && client.Pipeline is Pipeline pipe)
+ {
+ pipe.Handlers.Clear();
+ client.Add<WebSocketClientCodec>();
+ }
+
// 网络层采用消息层超时
client.Timeout = Timeout;
client.Tracer = Tracer;
+ client.Log = SocketLog;
if (Local != null) client.Local = Local;
@@ -433,4 +446,9 @@ public class ApiClient : ApiHost, IApiClient
WriteLog(msg);
}
#endregion
+
+ #region 日志
+ /// <summary>Socket层日志</summary>
+ public ILog SocketLog { get; set; } = Logger.Null;
+ #endregion
}
\ No newline at end of file
diff --git a/NewLife.Remoting/ApiHost.cs b/NewLife.Remoting/ApiHost.cs
index 7343850..9fe8e5c 100644
--- a/NewLife.Remoting/ApiHost.cs
+++ b/NewLife.Remoting/ApiHost.cs
@@ -57,7 +57,7 @@ public abstract class ApiHost : DisposeBase, IApiHost, IExtend, ILogFeature, ITr
/// <summary>写日志</summary>
/// <param name="format"></param>
/// <param name="args"></param>
- public void WriteLog(String format, params Object[] args) => Log?.Info(Name + " " + format, args);
+ public void WriteLog(String format, params Object[] args) => Log?.Info($"[{Name}]{format}", args);
/// <summary>已重载。返回具有本类特征的字符串</summary>
/// <returns>String</returns>
diff --git a/NewLife.Remoting/ApiNetServer.cs b/NewLife.Remoting/ApiNetServer.cs
index d563fb8..e555264 100644
--- a/NewLife.Remoting/ApiNetServer.cs
+++ b/NewLife.Remoting/ApiNetServer.cs
@@ -1,6 +1,8 @@
-using NewLife.Log;
+using NewLife.Http;
+using NewLife.Log;
using NewLife.Messaging;
using NewLife.Net;
+using NewLife.Net.Handlers;
using NewLife.Remoting.Http;
namespace NewLife.Remoting;
@@ -32,14 +34,19 @@ class ApiNetServer : NetServer<ApiNetSession>, IApiServer
//if (Local.Host.IsNullOrEmpty() || Local.Host == "*") AddressFamily = System.Net.Sockets.AddressFamily.Unspecified;
// Http封包协议
- //Add<HttpCodec>();
- Add(new HttpCodec { AllowParseHeader = true });
+ //Add(new HttpCodec { AllowParseHeader = true });
+ Add<WebSocketServerCodec>();
// 新生命标准网络封包协议
Add(Host.GetMessageCodec());
return true;
}
+
+ /// <summary>为会话创建网络数据处理器。可作为业务处理实现,也可以作为前置协议解析</summary>
+ /// <param name="session"></param>
+ /// <returns></returns>
+ public override INetHandler? CreateHandler(INetSession session) => new HttpSession();
}
class ApiNetSession : NetSession<ApiNetServer>, IApiSession
@@ -65,27 +72,6 @@ class ApiNetSession : NetSession<ApiNetServer>, IApiSession
base.Start();
}
- ///// <summary>查找Api动作</summary>
- ///// <param name="action"></param>
- ///// <returns></returns>
- //public virtual ApiAction? FindAction(String action) => _Host.Manager.Find(action);
-
- ///// <summary>创建控制器实例</summary>
- ///// <param name="api"></param>
- ///// <returns></returns>
- //public virtual Object CreateController(ApiAction api)
- //{
- // var controller = api.Controller;
- // if (controller != null) return controller;
-
- // controller = _Host.ServiceProvider?.GetService(api.Type);
-
- // controller ??= api.Type.CreateInstance();
- // if (controller == null) throw new InvalidDataException($"无法创建[{api.Type.FullName}]的实例");
-
- // return controller;
- //}
-
protected override void OnReceive(ReceivedEventArgs e)
{
LastActive = DateTime.Now;
diff --git a/NewLife.Remoting/Http/WebSocketClientCodec.cs b/NewLife.Remoting/Http/WebSocketClientCodec.cs
new file mode 100644
index 0000000..5703cd2
--- /dev/null
+++ b/NewLife.Remoting/Http/WebSocketClientCodec.cs
@@ -0,0 +1,67 @@
+using NewLife.Data;
+using NewLife.Http;
+using NewLife.Model;
+using NewLife.Net;
+
+namespace NewLife.Remoting.Http;
+
+/// <summary>WebSocket消息编码器</summary>
+public class WebSocketClientCodec : Handler
+{
+ /// <summary>打开连接</summary>
+ /// <param name="context">上下文</param>
+ public override Boolean Open(IHandlerContext context)
+ {
+ if (context.Owner is ISocketClient client)
+ {
+ // 连接必须是ws/wss协议
+ if (client.Remote.Type == NetType.WebSocket && client is WebSocketClient ws)
+ {
+ WebSocketClient.Handshake(client, ws.Uri);
+ }
+ }
+
+ return base.Open(context);
+ }
+
+ /// <summary>连接关闭时,清空粘包编码器</summary>
+ /// <param name="context"></param>
+ /// <param name="reason"></param>
+ /// <returns></returns>
+ public override Boolean Close(IHandlerContext context, String reason)
+ {
+ if (context.Owner is IExtend ss) ss["Codec"] = null;
+
+ return base.Close(context, reason);
+ }
+
+ /// <summary>读取数据</summary>
+ /// <param name="context"></param>
+ /// <param name="message"></param>
+ /// <returns></returns>
+ public override Object? Read(IHandlerContext context, Object message)
+ {
+ if (message is Packet pk)
+ {
+ var msg = new WebSocketMessage();
+ if (msg.Read(pk)) message = msg.Payload;
+ }
+
+ return base.Read(context, message);
+ }
+
+ /// <summary>发送消息时,写入数据</summary>
+ /// <param name="context"></param>
+ /// <param name="message"></param>
+ /// <returns></returns>
+ public override Object? Write(IHandlerContext context, Object message)
+ {
+ if (message is Packet pk)
+ message = new WebSocketMessage { Type = WebSocketMessageType.Binary, Payload = pk };
+
+ if (message is WebSocketMessage msg)
+ message = msg.ToPacket();
+
+ return base.Write(context, message);
+ }
+}
diff --git a/NewLife.Remoting/Http/WebSocketServerCodec.cs b/NewLife.Remoting/Http/WebSocketServerCodec.cs
new file mode 100644
index 0000000..0274cce
--- /dev/null
+++ b/NewLife.Remoting/Http/WebSocketServerCodec.cs
@@ -0,0 +1,91 @@
+using NewLife.Data;
+using NewLife.Http;
+using NewLife.Model;
+using NewLife.Net;
+using WebSocket = NewLife.Http.WebSocket;
+using WebSocketMessageType = NewLife.Http.WebSocketMessageType;
+
+namespace NewLife.Remoting.Http;
+
+/// <summary>WebSocket消息编码器</summary>
+public class WebSocketServerCodec : Handler
+{
+ /// <summary>连接关闭时,清空粘包编码器</summary>
+ /// <param name="context"></param>
+ /// <param name="reason"></param>
+ /// <returns></returns>
+ public override Boolean Close(IHandlerContext context, String reason)
+ {
+ if (context.Owner is IExtend ss) ss["_websocket"] = null;
+
+ return base.Close(context, reason);
+ }
+
+ /// <summary>读取数据</summary>
+ /// <param name="context"></param>
+ /// <param name="message"></param>
+ /// <returns></returns>
+ public override Object? Read(IHandlerContext context, Object message)
+ {
+ if (message is not Packet pk) return base.Read(context, message);
+
+ // 连接必须是ws/wss协议
+ if (context.Owner is not ISocketRemote session || session.Remote.Type != NetType.Tcp) return base.Read(context, message);
+ if (context.Owner is not IExtend ss) return base.Read(context, message);
+
+ var websocket = ss["_websocket"] as WebSocket;
+ if (websocket == null)
+ {
+ var request = new HttpRequest();
+ if (request.Parse(pk) && request.IsCompleted)
+ {
+ var ctx = new DefaultHttpContext(null, request, null, null)
+ {
+ ServiceProvider = session as IServiceProvider
+ };
+
+ // 处理 WebSocket 握手
+ websocket = WebSocket.Handshake(ctx);
+ if (websocket == null) return message;
+
+ var rs = ctx.Response;
+ if (rs == null) return message;
+
+ session.Send(rs.Build());
+
+ ss["_websocket"] = websocket;
+ return null;
+ }
+ }
+ if (websocket != null)
+ {
+ var msg = new WebSocketMessage();
+ if (msg.Read(pk)) message = msg.Payload;
+ }
+
+ return base.Read(context, message);
+ }
+
+ /// <summary>发送消息时,写入数据</summary>
+ /// <param name="context"></param>
+ /// <param name="message"></param>
+ /// <returns></returns>
+ public override Object? Write(IHandlerContext context, Object message)
+ {
+ // 仅编码websocket连接
+ if (context.Owner is IExtend ss && ss["_websocket"] is WebSocket)
+ {
+ if (message is Packet pk)
+ {
+ var msg = new WebSocketMessage
+ {
+ Type = WebSocketMessageType.Binary,
+ Payload = pk,
+ };
+ message = msg.ToPacket();
+ }
+ }
+
+ return base.Write(context, message);
+ }
+}
diff --git a/Samples/Zero.RpcServer/ClientTest.cs b/Samples/Zero.RpcServer/ClientTest.cs
index 6c5de2a..587d204 100644
--- a/Samples/Zero.RpcServer/ClientTest.cs
+++ b/Samples/Zero.RpcServer/ClientTest.cs
@@ -1,7 +1,4 @@
-using System.Net.Sockets;
-using System.Text;
-using NewLife;
-using NewLife.Log;
+using NewLife.Log;
using NewLife.Remoting;
using NewLife.Security;
using NewLife.Serialization;
@@ -18,8 +15,8 @@ static class ClientTest
XTrace.WriteLine("Tcp开始连接!");
// 连接服务端
- var client = new ApiClient("tcp://127.0.0.2:12346");
- client.Open();
+ var client = new ApiClient($"tcp://127.0.0.2:{port}");
+ client.Name = "小Tcp";
await Process(client);
@@ -35,8 +32,10 @@ static class ClientTest
XTrace.WriteLine("Udp开始连接!");
// 连接服务端
- var client = new ApiClient("udp://127.0.0.2:12346");
- client.Open();
+ //var client = new ApiClient($"udp://127.0.0.3:{port}");
+ // UDP协议使用127.0.0.3地址后,服务端响应时发往127.0.0.1,导致客户端无法匹配接收
+ var client = new ApiClient($"udp://127.0.0.1:{port}");
+ client.Name = "小Udp";
await Process(client);
@@ -52,8 +51,8 @@ static class ClientTest
XTrace.WriteLine("WebSocket开始连接!");
// 连接服务端
- var client = new ApiClient("ws://127.0.0.2:12346");
- client.Open();
+ var client = new ApiClient($"ws://127.0.0.4:{port}");
+ client.Name = "小Ws";
await Process(client);
@@ -63,14 +62,50 @@ static class ClientTest
static async Task Process(ApiClient client)
{
- // 获取所有接口
- var apis = await client.InvokeAsync<String[]>("api/all");
+ try
+ {
+ client.Log = XTrace.Log;
+#if DEBUG
+ client.EncoderLog = XTrace.Log;
+ client.SocketLog = XTrace.Log;
+#endif
+ client.Open();
+
+ // 获取所有接口
+ client.WriteLog("获取所有接口");
+ var apis = await client.InvokeAsync<String[]>("api/all");
+ client.WriteLog("共有接口数:{0}", apis.Length);
+
+ // 获取服务端信息
+ client.WriteLog("获取服务端信息");
+ var state = Rand.NextString(8);
+ var state2 = Rand.NextString(8);
+ var infs = await client.InvokeAsync<IDictionary<String, Object>>("api/info", new { state, state2 });
+ client.WriteLog("服务端信息:{0}", infs.ToJson(true));
+ }
+ catch (Exception ex)
+ {
+ XTrace.WriteException(ex);
+ }
+ }
+
+ /// <summary>Http连接ApiServer</summary>
+ public static async void HttpTest(Int32 port)
+ {
+ await Task.Delay(4_000);
+ XTrace.WriteLine("");
+ XTrace.WriteLine("Http开始连接!");
+
+ // 连接服务端
+ var client = new ApiHttpClient($"http://127.0.0.2:{port}");
+ client.Log = XTrace.Log;
+
+ var apis = await client.GetAsync<String[]>("api/all");
client.WriteLog("共有接口数:{0}", apis.Length);
- // 获取服务端信息
var state = Rand.NextString(8);
var state2 = Rand.NextString(8);
- var infs = await client.InvokeAsync<IDictionary<String, Object>>("api/info", new { state, state2 });
+ var infs = await client.PostAsync<IDictionary<String, Object>>("api/info", new { state, state2 });
client.WriteLog("服务端信息:{0}", infs.ToJson(true));
}
}
diff --git a/Samples/Zero.RpcServer/Program.cs b/Samples/Zero.RpcServer/Program.cs
index 9c7db62..36b2472 100644
--- a/Samples/Zero.RpcServer/Program.cs
+++ b/Samples/Zero.RpcServer/Program.cs
@@ -1,5 +1,8 @@
-using NewLife.Caching;
+using System.Diagnostics;
+using NewLife;
+using NewLife.Caching;
using NewLife.Caching.Services;
+using NewLife.Configuration;
using NewLife.Log;
using NewLife.Model;
using NewLife.Remoting;
@@ -20,8 +23,10 @@ services.AddSingleton<ICacheProvider, RedisCacheProvider>();
// 引入Redis,用于消息队列和缓存,单例,带性能跟踪。一般使用上面的ICacheProvider替代
//services.AddRedis("127.0.0.1:6379", "123456", 3, 5000);
+var port = 12345;
+
// 实例化RPC服务端,指定端口,同时在Tcp/Udp/IPv4/IPv6上监听
-var server = new ApiServer(12346)
+var server = new ApiServer(port)
{
Name = "银河服务端",
@@ -33,7 +38,7 @@ var server = new ApiServer(12346)
Tracer = star.Tracer,
};
-// 注册服务控制器
+// 注册服务控制器,其中提供各种接口服务
server.Register<MyController>();
server.Register<UserController>();
server.Register<AreaController>();
@@ -51,9 +56,10 @@ XTrace.WriteLine("服务端启动完成!");
star?.Service?.Register("MyRpcServer", () => $"tcp://*:{server.Port},udp://*:{server.Port}");
// 客户端测试,非服务端代码,正式使用时请注释掉
-_ = Task.Run(() => ClientTest.TcpTest(server.Port));
-_ = Task.Run(() => ClientTest.UdpTest(server.Port));
-_ = Task.Run(() => ClientTest.WebSocketTest(server.Port));
+_ = Task.Run(() => ClientTest.TcpTest(port));
+_ = Task.Run(() => ClientTest.UdpTest(port));
+_ = Task.Run(() => ClientTest.WebSocketTest(port));
+_ = Task.Run(() => ClientTest.HttpTest(port));
// 阻塞,等待友好退出
var host = services.BuildHost();