RPC远程过程调用,二进制封装,提供高吞吐低延迟的高性能RPC框架
大石头 authored at 2022-08-10 13:26:19
13.53 KiB
NewLife.Remoting
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NewLife;
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;
using NewLife.Remoting;
using NewLife.Security;
using Xunit;

namespace XUnitTest.Integration;

/// <summary>ApiServer与ApiClient集成测试</summary>
public class ApiServerClientIntegrationTests : DisposeBase
{
    private readonly ApiServer _Server;
    private readonly Int32 _Port;

    public ApiServerClientIntegrationTests()
    {
        _Server = new ApiServer(0)
        {
            //Multiplex = false,
            Log = XTrace.Log,
            ShowError = true,
        };
        _Server.Register<IntegrationController>();
        _Server.Start();

        _Port = _Server.Port;
    }

    protected override void Dispose(Boolean disposing)
    {
        base.Dispose(disposing);
        _Server.TryDispose();
    }

    #region 基础RPC测试
    [Fact(DisplayName = "完整RPC调用流程")]
    public async Task FullRpcFlowTest()
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        // 1. 获取API列表
        var apis = await client.InvokeAsync<String[]>("Api/All");
        Assert.NotNull(apis);
        Assert.True(apis.Length > 0);

        // 2. 简单调用
        var greeting = await client.InvokeAsync<String>("Integration/Greet", new { name = "World" });
        Assert.Equal("Hello, World!", greeting);

        // 3. 复杂参数
        var result = await client.InvokeAsync<CalculateResult>("Integration/Calculate", new
        {
            a = 10,
            b = 3,
            operation = "add"
        });
        Assert.NotNull(result);
        Assert.Equal(13, result.Value);

        // 4. 异常处理
        var ex = await Assert.ThrowsAsync<ApiException>(
            () => client.InvokeAsync<Object>("Integration/Error"));
        Assert.Equal(500, ex.Code);
    }

    [Theory(DisplayName = "多种参数类型测试")]
    [InlineData("add", 10, 5, 15)]
    [InlineData("sub", 10, 5, 5)]
    [InlineData("mul", 10, 5, 50)]
    [InlineData("div", 10, 5, 2)]
    public async Task MultipleParameterTypesTest(String operation, Int32 a, Int32 b, Int32 expected)
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        var result = await client.InvokeAsync<CalculateResult>("Integration/Calculate", new { a, b, operation });

        Assert.NotNull(result);
        Assert.Equal(expected, result.Value);
        Assert.Equal(operation, result.Operation);
    }
    #endregion

    #region 服务端下发测试
    [Fact(DisplayName = "服务端主动下发消息")]
    public async Task ServerPushMessageTest()
    {
        var receivedMessages = new List<String>();
        var messageReceived = new TaskCompletionSource<Boolean>();

        using var client = new ReceivableApiClient($"tcp://127.0.0.1:{_Port}");
        client.MessageReceived += (action, data) =>
        {
            receivedMessages.Add($"{action}:{data}");
            // 收到 ServerNotify 消息时完成
            if (action == "ServerNotify")
                messageReceived.TrySetResult(true);
        };

        // 建立连接
        await client.InvokeAsync<String[]>("Api/All");

        // 获取会话并下发消息
        var session = _Server.Server.AllSessions.FirstOrDefault();
        Assert.NotNull(session);

        session.InvokeOneWay("ServerNotify", new { message = "Hello from server" });

        // 等待消息到达
        var completed = await Task.WhenAny(messageReceived.Task, Task.Delay(2000));
        Assert.True(messageReceived.Task.IsCompleted, "应收到服务端推送");

        // 应包含 ServerNotify 消息
        Assert.Contains(receivedMessages, m => m.StartsWith("ServerNotify:"));
    }

    [Fact(DisplayName = "广播消息到所有客户端")]
    public async Task BroadcastToAllClientsTest()
    {
        var receivedCounts = new Int32[3];
        var allReceived = new TaskCompletionSource<Boolean>();
        var clients = new List<ReceivableApiClient>();

        try
        {
            // 创建多个客户端
            for (var i = 0; i < 3; i++)
            {
                var index = i;
                var client = new ReceivableApiClient($"tcp://127.0.0.1:{_Port}");
                client.MessageReceived += (action, data) =>
                {
                    Interlocked.Increment(ref receivedCounts[index]);
                    if (receivedCounts.All(c => c >= 1))
                        allReceived.TrySetResult(true);
                };
                await client.InvokeAsync<String[]>("Api/All");
                clients.Add(client);
            }

            Assert.Equal(3, _Server.Server.AllSessions.Length);

            // 广播消息
            var count = _Server.InvokeAll("Broadcast", new { data = "BroadcastData" });
            Assert.Equal(3, count);

            // 等待所有客户端收到
            await Task.WhenAny(allReceived.Task, Task.Delay(3000));

            for (var i = 0; i < 3; i++)
            {
                Assert.True(receivedCounts[i] >= 1, $"客户端{i}应收到广播消息");
            }
        }
        finally
        {
            foreach (var client in clients)
            {
                client.TryDispose();
            }
        }
    }
    #endregion

    #region 大数据传输测试
    [Theory(DisplayName = "大数据传输测试")]
    [InlineData(1024)]          // 1KB
    [InlineData(32 * 1024)]     // 32KB
    [InlineData(64 * 1024)]     // 64KB
    public async Task LargeDataTransferTest(Int32 size)
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        var data = Rand.NextBytes(size);
        var result = await client.InvokeAsync<Packet>("Integration/Echo", data);

        Assert.NotNull(result);
        Assert.Equal(size, result.Total);
        Assert.True(data.SequenceEqual(result.ToArray()));
    }

    [Fact(DisplayName = "大消息分片处理")]
    public async Task LargeMessageChunkTest()
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        // 发送多个中等大小的数据
        for (var i = 0; i < 5; i++)
        {
            var data = Rand.NextBytes(10 * 1024);
            var result = await client.InvokeAsync<Packet>("Integration/Echo", data);

            Assert.NotNull(result);
            Assert.Equal(data.Length, result.Total);
        }
    }
    #endregion

    #region 并发压力测试
    [Fact(DisplayName = "高并发调用测试")]
    public async Task HighConcurrencyTest()
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        var tasks = new List<Task<Int32>>();
        var count = 100;

        for (var i = 0; i < count; i++)
        {
            var index = i;
            tasks.Add(client.InvokeAsync<Int32>("Integration/Add", new { a = index, b = 1 }));
        }

        var results = await Task.WhenAll(tasks);

        for (var i = 0; i < count; i++)
        {
            Assert.Equal(i + 1, results[i]);
        }
    }

    [Fact(DisplayName = "多客户端并发测试", Timeout = 15000)]
    public async Task MultiClientConcurrencyTest()
    {
        var clientCount = 5;
        var callsPerClient = 20;
        var clients = new List<ApiClient>();
        var allTasks = new List<Task<Int32>>();

        try
        {
            // 等待前序测试释放资源
            await Task.Delay(100);

            // 先创建所有客户端,等待连接建立
            for (var c = 0; c < clientCount; c++)
            {
                var client = new ApiClient($"tcp://127.0.0.1:{_Port}")
                {
                    Timeout = 5000, // 增加超时时间
                };
                clients.Add(client);
            }

            // 等待连接建立
            await Task.Delay(200);

            // 批量发起请求
            for (var c = 0; c < clientCount; c++)
            {
                var client = clients[c];
                for (var i = 0; i < callsPerClient; i++)
                {
                    var index = c * callsPerClient + i;
                    allTasks.Add(client.InvokeAsync<Int32>("Integration/Add", new { a = index, b = 1 }));
                }
            }

            var results = await Task.WhenAll(allTasks);

            // 验证所有结果
            Assert.Equal(clientCount * callsPerClient, results.Length);
            for (var i = 0; i < results.Length; i++)
            {
                Assert.Equal(i + 1, results[i]);
            }
        }
        finally
        {
            foreach (var client in clients)
            {
                client.TryDispose();
            }
            // 清理后等待资源释放
            await Task.Delay(100);
        }
    }
    #endregion

    #region 连接断开重连测试
    [Fact(DisplayName = "客户端断开后服务端会话清理")]
    public async Task ClientDisconnectSessionCleanupTest()
    {
        // 创建并连接客户端
        var client = new ApiClient($"tcp://127.0.0.1:{_Port}");
        await client.InvokeAsync<String[]>("Api/All");

        Assert.Single(_Server.Server.AllSessions);

        // 关闭客户端
        client.Close("测试断开");
        client.Dispose();

        // 等待服务端检测到断开
        await Task.Delay(500);

        // 会话应被清理
        Assert.Empty(_Server.Server.AllSessions);
    }
    #endregion

    #region 异常场景测试
    [Fact(DisplayName = "参数验证异常")]
    public async Task ParameterValidationTest()
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        // 缺少必要参数
        var ex = await Assert.ThrowsAsync<ApiException>(
            () => client.InvokeAsync<CalculateResult>("Integration/Calculate", new { a = 10 }));

        // 应返回服务端异常
        Assert.True(ex.Code >= 400);
    }

    [Fact(DisplayName = "除零异常")]
    public async Task DivideByZeroTest()
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        var ex = await Assert.ThrowsAsync<ApiException>(
            () => client.InvokeAsync<CalculateResult>("Integration/Calculate", new
            {
                a = 10,
                b = 0,
                operation = "div"
            }));

        Assert.Equal(500, ex.Code);
    }
    #endregion

    #region 编码器测试
    [Fact(DisplayName = "Json编码器测试")]
    public async Task JsonEncoderTest()
    {
        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        // 复杂嵌套对象
        var request = new
        {
            user = new { id = 1, name = "Test" },
            items = new[] { "a", "b", "c" },
            count = 3
        };

        var result = await client.InvokeAsync<ComplexResult>("Integration/ProcessComplex", request);

        Assert.NotNull(result);
        Assert.Equal(1, result.UserId);
        Assert.Equal("Test", result.UserName);
        Assert.Equal(3, result.ItemCount);
    }
    #endregion

    #region 服务端处理统计测试
    [Fact(DisplayName = "服务端处理统计")]
    public async Task ServerStatProcessTest()
    {
        // 启用统计
        _Server.StatPeriod = 10;

        using var client = new ApiClient($"tcp://127.0.0.1:{_Port}");

        for (var i = 0; i < 10; i++)
        {
            await client.InvokeAsync<String>("Integration/Greet", new { name = $"User{i}" });
        }

        // 统计应有记录
        Assert.NotNull(_Server.StatProcess);
        Assert.True(_Server.StatProcess.Value >= 10);
    }
    #endregion

    #region 测试辅助类
    class IntegrationController
    {
        public String Greet(String name) => $"Hello, {name}!";

        public Int32 Add(Int32 a, Int32 b) => a + b;

        public CalculateResult Calculate(Int32 a, Int32 b, String operation)
        {
            var value = operation?.ToLower() switch
            {
                "add" => a + b,
                "sub" => a - b,
                "mul" => a * b,
                "div" => a / b,
                _ => throw new ArgumentException($"未知操作: {operation}")
            };

            return new CalculateResult
            {
                Value = value,
                Operation = operation ?? "",
                A = a,
                B = b
            };
        }

        public void Error() => throw new InvalidOperationException("测试异常");

        public IPacket Echo(IPacket pk) => pk;

        public ComplexResult ProcessComplex(IDictionary<String, Object> user, String[] items, Int32 count)
        {
            return new ComplexResult
            {
                UserId = user["id"].ToInt(),
                UserName = user["name"]?.ToString() ?? "",
                ItemCount = items?.Length ?? 0
            };
        }
    }

    class CalculateResult
    {
        public Int32 Value { get; set; }
        public String Operation { get; set; } = "";
        public Int32 A { get; set; }
        public Int32 B { get; set; }
    }

    class ComplexResult
    {
        public Int32 UserId { get; set; }
        public String UserName { get; set; } = "";
        public Int32 ItemCount { get; set; }
    }

    class ReceivableApiClient : ApiClient
    {
        public event Action<String, String?>? MessageReceived;

        public ReceivableApiClient(String uri) : base(uri) { }

        protected override void OnReceive(IMessage message, ApiReceivedEventArgs e)
        {
            base.OnReceive(message, e);

            var action = e.ApiMessage?.Action ?? "";
            var data = e.ApiMessage?.Data?.ToStr();
            MessageReceived?.Invoke(action, data);
        }
    }
    #endregion
}