RPC远程过程调用,二进制封装,提供高吞吐低延迟的高性能RPC框架
大石头 authored at 2022-08-10 13:26:19
11.04 KiB
NewLife.Remoting
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NewLife;
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;
using NewLife.Remoting;
using Xunit;

namespace XUnitTest.Remoting;

/// <summary>服务端主动下发与双向通信集成测试</summary>
/// <remarks>
/// 验证服务端向指定会话下发消息、客户端注册 Action 供服务端回调、
/// 广播场景、断开重连后恢复通信等行为。
/// </remarks>
public class BidirectionalIntegrationTests : DisposeBase
{
    private readonly ApiServer _Server;
    private readonly Int32 _Port;

    public BidirectionalIntegrationTests()
    {
        _Server = new ApiServer(0)
        {
            Log = XTrace.Log,
            ShowError = true,
        };
        _Server.Register<BidiController>();
        _Server.Start();

        _Port = _Server.Port;
    }

    protected override void Dispose(Boolean disposing)
    {
        base.Dispose(disposing);
        _Server.TryDispose();
    }

    #region 服务端下发到指定会话
    [Fact(DisplayName = "服务端下发_指定会话收到消息")]
    public async Task ServerPushToSpecificSessionTest()
    {
        var received = new TaskCompletionSource<String>();

        using var client = new CallbackApiClient($"tcp://127.0.0.1:{_Port}");
        client.MessageCallback = (action, data) =>
        {
            if (action == "Notify") received.TrySetResult(data ?? "");
        };

        // 建立连接
        await client.InvokeAsync<String[]>("Api/All");

        // 找到该会话并下发
        var session = _Server.Server.AllSessions.FirstOrDefault();
        Assert.NotNull(session);

        session.InvokeOneWay("Notify", new { content = "TargetMsg" });

        var result = await Task.WhenAny(received.Task, Task.Delay(3000));
        Assert.True(received.Task.IsCompleted, "指定会话应收到下发消息");
        Assert.Contains("TargetMsg", await received.Task);
    }

    [Fact(DisplayName = "服务端下发_仅目标会话收到")]
    public async Task ServerPushOnlyTargetReceivesTest()
    {
        var client1Received = 0;
        var client2Received = 0;

        using var client1 = new CallbackApiClient($"tcp://127.0.0.1:{_Port}");
        client1.MessageCallback = (_, _) => Interlocked.Increment(ref client1Received);

        using var client2 = new CallbackApiClient($"tcp://127.0.0.1:{_Port}");
        client2.MessageCallback = (_, _) => Interlocked.Increment(ref client2Received);

        await client1.InvokeAsync<String[]>("Api/All");
        await client2.InvokeAsync<String[]>("Api/All");

        var sessions = _Server.Server.AllSessions;
        Assert.Equal(2, sessions.Length);

        // 仅向第一个会话下发
        sessions[0].InvokeOneWay("TargetOnly", new { msg = "OnlyFirst" });

        await Task.Delay(500);

        // 第一个收到,第二个不应收到
        Assert.True(client1Received > 0 || client2Received > 0, "至少一个客户端应收到消息");
        // 两个加起来应只有1条
        Assert.Equal(1, client1Received + client2Received);
    }
    #endregion

    #region 广播与选择性广播
    [Fact(DisplayName = "广播_所有客户端都收到")]
    public async Task BroadcastAllReceiveTest()
    {
        var receivedCounts = new Int32[3];
        var allDone = new TaskCompletionSource<Boolean>();
        var clients = new List<CallbackApiClient>();

        try
        {
            for (var i = 0; i < 3; i++)
            {
                var index = i;
                var client = new CallbackApiClient($"tcp://127.0.0.1:{_Port}");
                client.MessageCallback = (_, _) =>
                {
                    Interlocked.Increment(ref receivedCounts[index]);
                    if (receivedCounts.All(c => c >= 1))
                        allDone.TrySetResult(true);
                };
                await client.InvokeAsync<String[]>("Api/All");
                clients.Add(client);
            }

            // 广播
            var count = _Server.InvokeAll("BroadcastAction", new { data = "Hello" });
            Assert.Equal(3, count);

            await Task.WhenAny(allDone.Task, Task.Delay(3000));
            for (var i = 0; i < 3; i++)
            {
                Assert.True(receivedCounts[i] >= 1, $"客户端{i}应收到广播消息");
            }
        }
        finally
        {
            foreach (var c in clients) c.TryDispose();
        }
    }

    [Fact(DisplayName = "广播_零客户端时返回0")]
    public void BroadcastWithNoClientsTest()
    {
        var count = _Server.InvokeAll("NoClients", new { data = "test" });
        Assert.Equal(0, count);
    }
    #endregion

    #region 连续下发
    [Fact(DisplayName = "连续下发_多条消息按序到达")]
    public async Task SequentialPushTest()
    {
        var messages = new List<String>();
        var allReceived = new TaskCompletionSource<Boolean>();

        using var client = new CallbackApiClient($"tcp://127.0.0.1:{_Port}");
        client.MessageCallback = (action, data) =>
        {
            lock (messages)
            {
                messages.Add($"{action}:{data}");
                if (messages.Count >= 5) allReceived.TrySetResult(true);
            }
        };

        await client.InvokeAsync<String[]>("Api/All");
        var session = _Server.Server.AllSessions.FirstOrDefault();
        Assert.NotNull(session);

        // 连续下发5条
        for (var i = 0; i < 5; i++)
        {
            session.InvokeOneWay("Seq", new { index = i });
        }

        await Task.WhenAny(allReceived.Task, Task.Delay(3000));
        Assert.True(allReceived.Task.IsCompleted, "应收到全部5条消息");
        Assert.Equal(5, messages.Count);
    }
    #endregion

    #region Received事件完整性
    [Fact(DisplayName = "Received事件_服务端下发触发客户端Received")]
    public async Task ClientReceivedEventFromPushTest()
    {
        // 使用独立 server 避免跨测试干扰
        using var server = new ApiServer(0) { Log = XTrace.Log, ShowError = true };
        server.Register<BidiController>();
        server.Start();

        var receivedActions = new List<String>();
        var allDone = new TaskCompletionSource<Boolean>();

        using var client = new CallbackApiClient($"tcp://127.0.0.1:{server.Port}");
        client.MessageCallback = (action, _) =>
        {
            lock (receivedActions)
            {
                receivedActions.Add(action);
                if (receivedActions.Count >= 2) allDone.TrySetResult(true);
            }
        };

        // 建立连接
        await client.InvokeAsync<String[]>("Api/All");

        // 等待连接就绪
        await Task.Delay(100);

        var session = server.Server.AllSessions.FirstOrDefault();
        Assert.NotNull(session);

        // 服务端主动下发2条消息
        session.InvokeOneWay("Push1", new { data = "A" });
        session.InvokeOneWay("Push2", new { data = "B" });

        await Task.WhenAny(allDone.Task, Task.Delay(5000));
        Assert.Contains("Push1", receivedActions);
        Assert.Contains("Push2", receivedActions);
    }

    [Fact(DisplayName = "Received事件_服务端侧触发")]
    public async Task ServerReceivedEventTest()
    {
        using var server = new ApiServer(0) { Log = XTrace.Log, ShowError = true };
        server.Register<BidiController>();

        var interceptedActions = new List<String>();
        server.Received += (s, e) =>
        {
            if (e.ApiMessage != null)
            {
                lock (interceptedActions)
                    interceptedActions.Add(e.ApiMessage.Action);
            }
        };

        server.Start();

        using var client = new ApiClient($"tcp://127.0.0.1:{server.Port}");
        await client.InvokeAsync<String>("Bidi/Ping");
        await client.InvokeAsync<Int32>("Bidi/Add", new { a = 1, b = 2 });

        Assert.Contains("Bidi/Ping", interceptedActions);
        Assert.Contains("Bidi/Add", interceptedActions);
    }
    #endregion

    #region 客户端重新连接
    [Fact(DisplayName = "重新连接_新客户端连接成功")]
    public async Task NewClientAfterDisconnectTest()
    {
        // 第一个客户端连接并调用
        var client1 = new ApiClient($"tcp://127.0.0.1:{_Port}");
        var result1 = await client1.InvokeAsync<String>("Bidi/Ping");
        Assert.Equal("Pong", result1);
        Assert.Single(_Server.Server.AllSessions);

        // 关闭第一个客户端
        client1.Close("测试断开");
        client1.Dispose();

        await Task.Delay(300);
        Assert.Empty(_Server.Server.AllSessions);

        // 第二个客户端连接并调用
        using var client2 = new ApiClient($"tcp://127.0.0.1:{_Port}");
        var result2 = await client2.InvokeAsync<String>("Bidi/Ping");
        Assert.Equal("Pong", result2);
        Assert.Single(_Server.Server.AllSessions);
    }

    [Fact(DisplayName = "重新连接_SetServer后可正常调用")]
    public async Task ReconnectViaSetServerTest()
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        var result1 = await client.InvokeAsync<String>("Bidi/Ping");
        Assert.Equal("Pong", result1);

        // 使用 SetServer 重置连接
        client.SetServer($"tcp://127.0.0.1:{_Port}");

        var result2 = await client.InvokeAsync<String>("Bidi/Ping");
        Assert.Equal("Pong", result2);
    }
    #endregion

    #region 高频快速下发
    [Fact(DisplayName = "高频下发_大量消息不丢失")]
    public async Task HighFrequencyPushTest()
    {
        var receivedCount = 0;
        var total = 100;
        var allDone = new TaskCompletionSource<Boolean>();

        using var client = new CallbackApiClient($"tcp://127.0.0.1:{_Port}");
        client.MessageCallback = (_, _) =>
        {
            if (Interlocked.Increment(ref receivedCount) >= total)
                allDone.TrySetResult(true);
        };

        await client.InvokeAsync<String[]>("Api/All");
        var session = _Server.Server.AllSessions.FirstOrDefault();
        Assert.NotNull(session);

        // 高频下发
        for (var i = 0; i < total; i++)
        {
            session.InvokeOneWay("HF", new { index = i });
        }

        await Task.WhenAny(allDone.Task, Task.Delay(5000));
        Assert.True(receivedCount >= total * 0.9, $"应至少收到{total * 0.9}条,实际收到{receivedCount}条");
    }
    #endregion

    #region 辅助类
    class BidiController
    {
        public String Ping() => "Pong";

        public Int32 Add(Int32 a, Int32 b) => a + b;
    }

    class CallbackApiClient : ApiClient
    {
        public Action<String, String?>? MessageCallback;

        public CallbackApiClient(String uri) : base(uri) { }

        protected override void OnReceive(IMessage message, ApiReceivedEventArgs e)
        {
            base.OnReceive(message, e);

            if (message.Reply) return;

            var action = e.ApiMessage?.Action ?? "";
            var data = e.ApiMessage?.Data?.ToStr();
            MessageCallback?.Invoke(action, data);
        }
    }
    #endregion
}