refactor: rename NovaDbXxx to NovaXxx across entire project
copilot-swe-agent[bot] authored at 2026-02-18 20:12:17
7.36 KiB
NewLife.NovaDb
using System;
using System.Collections.Generic;
using System.IO;
using NewLife.NovaDb.Cluster;
using NewLife.NovaDb.WAL;
using Xunit;

namespace XUnitTest.Cluster;

/// <summary>
/// Unit tests for <see cref="ReplicaClient"/>: construction, connect/disconnect state,
/// applying WAL records, idempotency, resume position, LSN reset, the page-apply callback,
/// and an end-to-end flow together with <see cref="ReplicationManager"/>.
/// </summary>
public class ReplicaClientTests : IDisposable
{
    private readonly ReplicaClient _client;
    private readonly NodeInfo _localNode;

    /// <summary>
    /// Builds the slave node descriptor and the client shared by the tests in this class.
    /// xUnit creates a new instance of the test class for every test, so each test starts clean.
    /// </summary>
    public ReplicaClientTests()
    {
        _localNode = new NodeInfo
        {
            NodeId = "slave-1",
            Endpoint = "127.0.0.1:9001",
            Role = NodeRole.Slave
        };
        _client = new ReplicaClient(_localNode, "127.0.0.1:9000");
    }

    /// <summary>Disposes the shared client after each test.</summary>
    public void Dispose()
    {
        _client.Dispose();
    }

    /// <summary>A newly created client exposes the constructor arguments and zeroed progress counters.</summary>
    [Fact(DisplayName = "测试创建从节点客户端")]
    public void TestCreateReplicaClient()
    {
        Assert.NotNull(_client);
        Assert.Equal("slave-1", _client.LocalNode.NodeId);
        Assert.Equal("127.0.0.1:9000", _client.MasterEndpoint);
        Assert.Equal(0UL, _client.LastAppliedLsn);
        Assert.False(_client.IsConnected);
        Assert.Equal(0, _client.AppliedRecordCount);
    }

    /// <summary>Connect/Disconnect toggle <c>IsConnected</c> and move the local node between Syncing and Offline.</summary>
    [Fact(DisplayName = "测试连接和断开")]
    public void TestConnectAndDisconnect()
    {
        _client.Connect();
        Assert.True(_client.IsConnected);
        Assert.Equal(NodeState.Syncing, _localNode.State);

        _client.Disconnect();
        Assert.False(_client.IsConnected);
        Assert.Equal(NodeState.Offline, _localNode.State);
    }

    /// <summary>Applying three records advances the applied LSN, the applied count and the node's replicated LSN.</summary>
    [Fact(DisplayName = "测试应用 WAL 记录")]
    public void TestApplyRecords()
    {
        var records = new List<WalRecord>
        {
            new() { Lsn = 1, RecordType = WalRecordType.UpdatePage, PageId = 10, Data = new Byte[] { 1, 2, 3 } },
            new() { Lsn = 2, RecordType = WalRecordType.CommitTx, TxId = 1, Data = Array.Empty<Byte>() },
            new() { Lsn = 3, RecordType = WalRecordType.UpdatePage, PageId = 20, Data = new Byte[] { 4, 5, 6 } }
        };

        _client.ApplyRecords(records);

        Assert.Equal(3UL, _client.LastAppliedLsn);
        Assert.Equal(3, _client.AppliedRecordCount);
        Assert.Equal(3UL, _localNode.ReplicatedLsn);
    }

    /// <summary>Applying the same batch twice must leave the applied count and LSN unchanged.</summary>
    [Fact(DisplayName = "测试幂等应用")]
    public void TestIdempotentApply()
    {
        var records = new List<WalRecord>
        {
            new() { Lsn = 1, RecordType = WalRecordType.UpdatePage, PageId = 10, Data = new Byte[] { 1, 2 } },
            new() { Lsn = 2, RecordType = WalRecordType.UpdatePage, PageId = 20, Data = new Byte[] { 3, 4 } }
        };

        _client.ApplyRecords(records);
        Assert.Equal(2, _client.AppliedRecordCount);

        // Re-applying the same records must not increase the count.
        _client.ApplyRecords(records);
        Assert.Equal(2, _client.AppliedRecordCount);
        Assert.Equal(2UL, _client.LastAppliedLsn);
    }

    /// <summary>A second batch that overlaps the first at LSN 2 only contributes the new record (LSN 3).</summary>
    [Fact(DisplayName = "测试断点续传")]
    public void TestResumeFromCheckpoint()
    {
        var batch1 = new List<WalRecord>
        {
            new() { Lsn = 1, RecordType = WalRecordType.UpdatePage, PageId = 10, Data = new Byte[] { 1 } },
            new() { Lsn = 2, RecordType = WalRecordType.UpdatePage, PageId = 20, Data = new Byte[] { 2 } }
        };

        _client.ApplyRecords(batch1);
        Assert.Equal(2UL, _client.GetResumePosition());

        // Resume from the checkpoint: this batch repeats LSN 2 and adds LSN 3.
        var batch2 = new List<WalRecord>
        {
            new() { Lsn = 2, RecordType = WalRecordType.UpdatePage, PageId = 20, Data = new Byte[] { 2 } },
            new() { Lsn = 3, RecordType = WalRecordType.UpdatePage, PageId = 30, Data = new Byte[] { 3 } }
        };

        _client.ApplyRecords(batch2);
        Assert.Equal(3UL, _client.GetResumePosition());
        Assert.Equal(3, _client.AppliedRecordCount);
    }

    /// <summary>Resetting to LSN 0 clears both the client's applied LSN and the node's replicated LSN.</summary>
    [Fact(DisplayName = "测试重置 LSN")]
    public void TestResetToLsn()
    {
        var records = new List<WalRecord>
        {
            new() { Lsn = 1, RecordType = WalRecordType.UpdatePage, PageId = 10, Data = new Byte[] { 1 } },
            new() { Lsn = 2, RecordType = WalRecordType.UpdatePage, PageId = 20, Data = new Byte[] { 2 } }
        };

        _client.ApplyRecords(records);
        Assert.Equal(2UL, _client.LastAppliedLsn);

        _client.ResetToLsn(0);
        Assert.Equal(0UL, _client.LastAppliedLsn);
        Assert.Equal(0UL, _localNode.ReplicatedLsn);
    }

    /// <summary>The apply callback receives the page id and data of the applied page updates.</summary>
    [Fact(DisplayName = "测试应用回调")]
    public void TestApplyCallback()
    {
        var appliedPages = new Dictionary<UInt64, Byte[]>();
        using var client = new ReplicaClient(
            new NodeInfo { NodeId = "slave-2", Endpoint = "127.0.0.1:9002", Role = NodeRole.Slave },
            "127.0.0.1:9000",
            (pageId, data) => appliedPages[pageId] = data
        );

        var records = new List<WalRecord>
        {
            new() { Lsn = 1, RecordType = WalRecordType.UpdatePage, PageId = 10, Data = new Byte[] { 1, 2, 3 } },
            new() { Lsn = 2, RecordType = WalRecordType.CommitTx, TxId = 1, Data = Array.Empty<Byte>() },
            new() { Lsn = 3, RecordType = WalRecordType.UpdatePage, PageId = 20, Data = new Byte[] { 4, 5, 6 } }
        };

        client.ApplyRecords(records);

        // Only UpdatePage records trigger the callback, so the CommitTx record adds no entry.
        Assert.Equal(2, appliedPages.Count);
        Assert.True(appliedPages.ContainsKey(10));
        Assert.True(appliedPages.ContainsKey(20));
        Assert.Equal(new Byte[] { 1, 2, 3 }, appliedPages[10]);
        Assert.Equal(new Byte[] { 4, 5, 6 }, appliedPages[20]);
    }

    /// <summary>
    /// Drives a full cycle: the master appends records, the slave pulls and applies them,
    /// the master receives the acknowledgement and finally cleans up its buffer.
    /// </summary>
    [Fact(DisplayName = "测试端到端复制")]
    public void TestEndToEndReplication()
    {
        // A unique file under the platform temp directory replaces the former hard-coded
        // "/tmp/e2e.wal": that path is not portable and was shared by concurrent or repeated
        // runs, so state persisted there could break the exact LSN assertions below.
        var walPath = Path.Combine(Path.GetTempPath(), $"e2e-{Guid.NewGuid():N}.wal");
        try
        {
            // Simulated master node.
            var masterInfo = new NodeInfo { NodeId = "master-1", Endpoint = "127.0.0.1:9000", Role = NodeRole.Master };
            using var manager = new ReplicationManager(walPath, masterInfo);

            // Simulated slave node.
            var slaveNode = new NodeInfo { NodeId = "slave-1", Endpoint = "127.0.0.1:9001", Role = NodeRole.Slave };
            var appliedPages = new Dictionary<UInt64, Byte[]>();
            using var replica = new ReplicaClient(
                slaveNode,
                "127.0.0.1:9000",
                (pageId, data) => appliedPages[pageId] = data
            );

            // Register the slave with the master. Note this is a separate NodeInfo instance
            // with the same identity, not the object handed to the replica client.
            manager.RegisterSlave(new NodeInfo { NodeId = "slave-1", Endpoint = "127.0.0.1:9001", Role = NodeRole.Slave });

            // Master writes one transaction.
            manager.AppendRecord(new WalRecord { RecordType = WalRecordType.BeginTx, TxId = 1, Data = Array.Empty<Byte>() });
            manager.AppendRecord(new WalRecord { RecordType = WalRecordType.UpdatePage, PageId = 100, Data = new Byte[] { 10, 20, 30 } });
            manager.AppendRecord(new WalRecord { RecordType = WalRecordType.CommitTx, TxId = 1, Data = Array.Empty<Byte>() });

            Assert.Equal(3UL, manager.MasterLsn);
            Assert.Equal(3UL, manager.GetReplicationLag("slave-1"));

            // Slave pulls the pending records and applies them.
            var pending = manager.GetPendingRecords("slave-1");
            Assert.Equal(3, pending.Count);

            replica.Connect();
            replica.ApplyRecords(pending);

            Assert.Equal(3UL, replica.LastAppliedLsn);
            Assert.True(appliedPages.ContainsKey(100));
            Assert.Equal(new Byte[] { 10, 20, 30 }, appliedPages[100]);

            // Acknowledge replication back to the master.
            manager.AcknowledgeReplication("slave-1", replica.LastAppliedLsn);

            Assert.Equal(0UL, manager.GetReplicationLag("slave-1"));
            Assert.True(manager.IsFullySynced(3));

            // Clean up the master's buffer.
            var cleaned = manager.CleanupBuffer();
            Assert.Equal(3, cleaned);
        }
        finally
        {
            // Runs after the using-declared manager and replica above have been disposed.
            DeleteQuietly(walPath);
        }
    }

    /// <summary>Best-effort removal of a temporary file created by a test.</summary>
    /// <param name="path">Path of the file to delete; a missing file is ignored.</param>
    private static void DeleteQuietly(String path)
    {
        try
        {
            if (File.Exists(path))
            {
                File.Delete(path);
            }
        }
        catch (IOException)
        {
            // Cleanup failure must not hide the outcome of the test itself.
        }
        catch (UnauthorizedAccessException)
        {
            // Same as above: leave the temp file behind rather than fail the test.
        }
    }
}