RPC远程过程调用,二进制封装,提供高吞吐低延迟的高性能RPC框架
大石头 authored at 2022-08-10 13:26:19
6.07 KiB
NewLife.Remoting
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NewLife;
using NewLife.Log;
using NewLife.Net;
using NewLife.Remoting;
using Xunit;

namespace XUnitTest.Integration;

/// <summary>Integration tests for ApiClient connection-pool and cluster behaviour.</summary>
/// <remarks>
/// Runs real RPC calls against an in-process ApiServer with ApiClient in UsePool mode
/// (ClientPoolCluster) and in single-connection mode (ClientSingleCluster), covering
/// basic calls, connection reuse, concurrent calls, multiple server addresses,
/// failover past an unreachable address, and switching servers through SetServer.
/// </remarks>
public class ClusterIntegrationTests : DisposeBase
{
    private const String EchoAction = "ClusterTest/Echo";
    private const String AddAction = "ClusterTest/Add";

    private readonly ApiServer _Server;
    private readonly Int32 _Port;

    /// <summary>Starts a fresh server on an OS-assigned port (port 0) for each test instance.</summary>
    public ClusterIntegrationTests()
    {
        _Server = NewServer();
        Launch(_Server);

        // Port 0 was requested, so read back the port the server actually reports.
        _Port = _Server.Port;
    }

    /// <summary>Tears down the server owned by this test instance.</summary>
    /// <param name="disposing">Forwarded unchanged to the base implementation.</param>
    protected override void Dispose(Boolean disposing)
    {
        base.Dispose(disposing);
        _Server.TryDispose();
    }

    #region Single-connection mode
    [Fact(DisplayName = "单连接模式_基本调用")]
    public async Task SingleClusterBasicCallTest()
    {
        using var rpc = Connect(LocalUri(_Port), pooled: false);

        var reply = await EchoAsync(rpc, "Hello");

        Assert.Equal("Echo:Hello", reply);
    }

    [Fact(DisplayName = "单连接模式_多次调用复用连接")]
    public async Task SingleClusterReuseConnectionTest()
    {
        using var rpc = Connect(LocalUri(_Port), pooled: false);

        // Sequential calls are expected to travel over the same connection.
        for (var n = 0; n < 10; n++)
        {
            var sum = await AddAsync(rpc, n, 1);
            Assert.Equal(n + 1, sum);
        }

        // Single-connection mode should leave exactly one session on the server.
        Assert.Single(_Server.Server.AllSessions);
    }

    [Fact(DisplayName = "单连接模式_并发调用")]
    public async Task SingleClusterConcurrentCallTest()
    {
        using var rpc = Connect(LocalUri(_Port), pooled: false);

        await AssertConcurrentAddsAsync(rpc, 50);
    }
    #endregion

    #region Connection-pool mode
    [Fact(DisplayName = "连接池模式_基本调用")]
    public async Task PoolClusterBasicCallTest()
    {
        using var rpc = Connect(LocalUri(_Port), pooled: true);

        var reply = await EchoAsync(rpc, "Hello");

        Assert.Equal("Echo:Hello", reply);
    }

    [Fact(DisplayName = "连接池模式_并发调用可复用连接")]
    public async Task PoolClusterConcurrentCallTest()
    {
        using var rpc = Connect(LocalUri(_Port), pooled: true);

        await AssertConcurrentAddsAsync(rpc, 20);
    }
    #endregion

    #region Multiple server addresses
    [Fact(DisplayName = "多服务器_地址列表")]
    public async Task MultiServerAddressListTest()
    {
        // Bring up a second server next to the shared one.
        using var second = NewServer();
        Launch(second);

        // Comma-separated address list naming both servers.
        var first = LocalUri(_Port);
        var other = LocalUri(second.Port);

        using var rpc = Connect($"{first},{other}", pooled: false);

        // The call should succeed no matter which server ends up handling it.
        var reply = await EchoAsync(rpc, "Hello");
        Assert.Equal("Echo:Hello", reply);
    }

    [Fact(DisplayName = "单连接故障转移_不可用地址跳过")]
    public async Task SingleClusterFailoverTest()
    {
        // First address is unusable (port 1 is normally closed); the second one is live.
        const String deadUri = "tcp://127.0.0.1:1";
        var liveUri = LocalUri(_Port);

        using var rpc = Connect($"{deadUri},{liveUri}", pooled: false);

        // Failover: the unreachable address is skipped and the live one answers.
        var reply = await EchoAsync(rpc, "Failover");
        Assert.Equal("Echo:Failover", reply);
    }
    #endregion

    #region SetServer dynamic switching
    [Fact(DisplayName = "SetServer_动态切换服务器地址")]
    public async Task SetServerDynamicSwitchTest()
    {
        using var rpc = Connect(LocalUri(_Port), pooled: false);

        // First call goes to the original server.
        var reply = await EchoAsync(rpc, "First");
        Assert.Equal("Echo:First", reply);

        // Bring up a replacement server.
        using var replacement = NewServer();
        Launch(replacement);

        // Re-point the client at the replacement.
        rpc.SetServer(LocalUri(replacement.Port));

        // The next call should still succeed after the switch.
        reply = await EchoAsync(rpc, "Second");
        Assert.Equal("Echo:Second", reply);
    }
    #endregion

    #region Helpers
    /// <summary>Builds the loopback TCP address for the given port.</summary>
    private static String LocalUri(Int32 port) => $"tcp://127.0.0.1:{port}";

    /// <summary>Creates an unstarted server on port 0 with logging and error display enabled.</summary>
    private static ApiServer NewServer()
    {
        return new ApiServer(0)
        {
            Log = XTrace.Log,
            ShowError = true,
        };
    }

    /// <summary>Registers the test controller on the server and starts it.</summary>
    private static void Launch(ApiServer server)
    {
        server.Register<ClusterTestController>();
        server.Start();
    }

    /// <summary>Creates a client for the given address list with the requested UsePool setting.</summary>
    private static ApiClient Connect(String servers, Boolean pooled)
    {
        return new ApiClient(servers) { UsePool = pooled };
    }

    /// <summary>Invokes the Echo action with the given message.</summary>
    private static Task<String> EchoAsync(ApiClient client, String msg)
    {
        return client.InvokeAsync<String>(EchoAction, new { msg });
    }

    /// <summary>Invokes the Add action with the given operands.</summary>
    private static Task<Int32> AddAsync(ApiClient client, Int32 a, Int32 b)
    {
        return client.InvokeAsync<Int32>(AddAction, new { a, b });
    }

    /// <summary>
    /// Issues <paramref name="count"/> Add calls (n + 1 for n = 0..count-1) without awaiting
    /// between them, then checks that every result lines up with its request.
    /// </summary>
    private static async Task AssertConcurrentAddsAsync(ApiClient client, Int32 count)
    {
        // ToList forces every call to be issued before any of them is awaited.
        var pending = Enumerable.Range(0, count)
            .Select(n => AddAsync(client, n, 1))
            .ToList();

        var sums = await Task.WhenAll(pending);

        for (var n = 0; n < count; n++)
        {
            Assert.Equal(n + 1, sums[n]);
        }
    }

    /// <summary>Minimal controller exposed by the test servers.</summary>
    private class ClusterTestController
    {
        /// <summary>Returns the message prefixed with "Echo:".</summary>
        public String Echo(String msg)
        {
            return $"Echo:{msg}";
        }

        /// <summary>Returns the sum of the two operands.</summary>
        public Int32 Add(Int32 a, Int32 b)
        {
            return a + b;
        }
    }
    #endregion
}