refactor: rename NovaDbXxx to NovaXxx across entire project
copilot-swe-agent[bot] authored at 2026-02-18 20:12:17
19.82 KiB
NewLife.NovaDb
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using NewLife.NovaDb.Core;
using NewLife.NovaDb.Engine.Flux;

namespace XUnitTest.Engine.Flux;

/// <summary>StreamManager 单元测试</summary>
public class StreamManagerTests : IDisposable
{
    private readonly String _testDir;

    public StreamManagerTests()
    {
        _testDir = Path.Combine(Path.GetTempPath(), $"StreamManagerTests_{Guid.NewGuid():N}");
        Directory.CreateDirectory(_testDir);
    }

    public void Dispose()
    {
        if (Directory.Exists(_testDir))
        {
            try { Directory.Delete(_testDir, recursive: true); }
            catch { }
        }
    }

    private (FluxEngine engine, StreamManager manager) CreateManager()
    {
        var options = new DbOptions { FluxPartitionHours = 1 };
        var engine = new FluxEngine(_testDir, options);
        var manager = new StreamManager(engine);
        return (engine, manager);
    }

    [Fact(DisplayName = "测试发布消息")]
    public void TestPublish()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;
            var entry = new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["msg"] = "hello" }
            };

            var id = manager.Publish(entry);

            Assert.Equal(now, id.Timestamp);
            Assert.Equal(0, id.Sequence);
            Assert.Equal(1, engine.GetEntryCount());
        }
    }

    [Fact(DisplayName = "测试消息 ID 自增")]
    public void TestMessageIdAutoIncrement()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;

            var id1 = manager.Publish(new FluxEntry { Timestamp = now, Fields = new Dictionary<String, Object?> { ["v"] = 1 } });
            var id2 = manager.Publish(new FluxEntry { Timestamp = now, Fields = new Dictionary<String, Object?> { ["v"] = 2 } });
            var id3 = manager.Publish(new FluxEntry { Timestamp = now, Fields = new Dictionary<String, Object?> { ["v"] = 3 } });

            Assert.Equal(0, id1.Sequence);
            Assert.Equal(1, id2.Sequence);
            Assert.Equal(2, id3.Sequence);
        }
    }

    [Fact(DisplayName = "测试创建消费组")]
    public void TestCreateConsumerGroup()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            manager.CreateConsumerGroup("group1");
            manager.CreateConsumerGroup("group2");

            var names = manager.GetConsumerGroupNames();
            Assert.Equal(2, names.Count);
            Assert.Contains("group1", names);
            Assert.Contains("group2", names);
        }
    }

    [Fact(DisplayName = "测试消费组读取")]
    public void TestReadGroup()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;

            // 发布 5 条消息
            for (var i = 0; i < 5; i++)
            {
                manager.Publish(new FluxEntry
                {
                    Timestamp = now + i,
                    Fields = new Dictionary<String, Object?> { ["v"] = i }
                });
            }

            manager.CreateConsumerGroup("g1");

            // 读取 3 条
            var messages = manager.ReadGroup("g1", "consumer1", 3);
            Assert.Equal(3, messages.Count);

            // 再读取剩余
            var remaining = manager.ReadGroup("g1", "consumer1", 10);
            Assert.Equal(2, remaining.Count);

            // 没有更多消息
            var empty = manager.ReadGroup("g1", "consumer1", 10);
            Assert.Empty(empty);
        }
    }

    [Fact(DisplayName = "测试确认消息")]
    public void TestAcknowledge()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;
            var id = manager.Publish(new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["v"] = 1 }
            });

            manager.CreateConsumerGroup("g1");
            manager.ReadGroup("g1", "consumer1", 1);

            // 确认前有 1 条 pending
            var pending = manager.GetPendingEntries("g1");
            Assert.Single(pending);

            // 确认消息
            var acked = manager.Acknowledge("g1", id);
            Assert.True(acked);

            // 确认后 pending 为空
            pending = manager.GetPendingEntries("g1");
            Assert.Empty(pending);
        }
    }

    [Fact(DisplayName = "测试待确认条目")]
    public void TestPendingEntries()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;

            for (var i = 0; i < 3; i++)
            {
                manager.Publish(new FluxEntry
                {
                    Timestamp = now + i,
                    Fields = new Dictionary<String, Object?> { ["v"] = i }
                });
            }

            manager.CreateConsumerGroup("g1");
            manager.ReadGroup("g1", "consumer1", 3);

            var pending = manager.GetPendingEntries("g1");
            Assert.Equal(3, pending.Count);

            // 每条 pending 都有消费者信息
            foreach (var p in pending)
            {
                Assert.Equal("consumer1", p.Consumer);
                Assert.Equal(1, p.DeliveryCount);
                Assert.NotNull(p.Id);
            }
        }
    }

    [Fact(DisplayName = "测试消费组不存在异常")]
    public void TestConsumerGroupNotFound()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var ex = Assert.Throws<NovaException>(() => manager.ReadGroup("nonexistent", "c1", 1));
            Assert.Equal(ErrorCode.ConsumerGroupNotFound, ex.Code);
        }
    }

    [Fact(DisplayName = "测试多消费组独立消费")]
    public void TestMultipleConsumerGroups()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;

            for (var i = 0; i < 3; i++)
            {
                manager.Publish(new FluxEntry
                {
                    Timestamp = now + i,
                    Fields = new Dictionary<String, Object?> { ["v"] = i }
                });
            }

            manager.CreateConsumerGroup("g1");
            manager.CreateConsumerGroup("g2");

            // 两个组各自独立读取
            var g1Messages = manager.ReadGroup("g1", "c1", 10);
            var g2Messages = manager.ReadGroup("g2", "c2", 10);

            Assert.Equal(3, g1Messages.Count);
            Assert.Equal(3, g2Messages.Count);
        }
    }

    #region 阻塞读取测试
    [Fact(DisplayName = "阻塞读取-有消息时立即返回")]
    public async Task TestBlockingReadWithAvailableMessages()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;
            manager.Publish(new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["v"] = 1 }
            });

            manager.CreateConsumerGroup("g1");

            var sw = Stopwatch.StartNew();
            var result = await manager.ReadGroupAsync("g1", "c1", 10, TimeSpan.FromSeconds(5));
            sw.Stop();

            Assert.Single(result);
            // 有消息时应立即返回,不应等待
            Assert.True(sw.ElapsedMilliseconds < 1000);
        }
    }

    [Fact(DisplayName = "阻塞读取-超时返回空列表")]
    public async Task TestBlockingReadTimeout()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            manager.CreateConsumerGroup("g1");

            var sw = Stopwatch.StartNew();
            var result = await manager.ReadGroupAsync("g1", "c1", 10, TimeSpan.FromMilliseconds(200));
            sw.Stop();

            Assert.Empty(result);
            // 应等待约 200ms
            Assert.True(sw.ElapsedMilliseconds >= 150);
        }
    }

    [Fact(DisplayName = "阻塞读取-取消令牌")]
    public async Task TestBlockingReadCancellation()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            manager.CreateConsumerGroup("g1");

            using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));

            await Assert.ThrowsAsync<OperationCanceledException>(async () =>
            {
                await manager.ReadGroupAsync("g1", "c1", 10, TimeSpan.FromSeconds(10), cts.Token);
            });
        }
    }

    [Fact(DisplayName = "阻塞读取-新消息唤醒")]
    public async Task TestBlockingReadWakesOnPublish()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            manager.CreateConsumerGroup("g1");

            // 后台延迟发布消息
            var publishTask = Task.Run(async () =>
            {
                await Task.Delay(200);
                manager.Publish(new FluxEntry
                {
                    Timestamp = DateTime.UtcNow.Ticks,
                    Fields = new Dictionary<String, Object?> { ["v"] = "wakeup" }
                });
            });

            var sw = Stopwatch.StartNew();
            var result = await manager.ReadGroupAsync("g1", "c1", 10, TimeSpan.FromSeconds(5));
            sw.Stop();

            await publishTask;

            Assert.Single(result);
            // 应在约 200ms 后被唤醒,而不是等待 5 秒
            Assert.True(sw.ElapsedMilliseconds < 3000);
        }
    }
    #endregion

    #region 延迟消息测试
    [Fact(DisplayName = "延迟消息-发布和投递")]
    public void TestDelayedMessagePublishAndDelivery()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;
            var entry = new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["v"] = "delayed" }
            };

            // 发布延迟消息(延迟 0 秒,立即可投递)
            manager.PublishDelayed(entry, TimeSpan.Zero);

            // 尚未投递,引擎中无条目
            Assert.Equal(0, engine.GetEntryCount());
            Assert.Equal(1, manager.GetDelayedMessageCount());

            // 投递到期消息
            var delivered = manager.DeliverDueMessages();
            Assert.Equal(1, delivered);
            Assert.Equal(1, engine.GetEntryCount());
            Assert.Equal(0, manager.GetDelayedMessageCount());
        }
    }

    [Fact(DisplayName = "定时投递-过去时间立即投递")]
    public void TestScheduledMessageDelivery()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var entry = new FluxEntry
            {
                Timestamp = DateTime.UtcNow.Ticks,
                Fields = new Dictionary<String, Object?> { ["v"] = "scheduled" }
            };

            // 投递时间设为过去
            manager.PublishScheduled(entry, DateTime.UtcNow.AddSeconds(-1));

            // 投递到期消息
            var delivered = manager.DeliverDueMessages();
            Assert.Equal(1, delivered);
            Assert.Equal(1, engine.GetEntryCount());
        }
    }

    [Fact(DisplayName = "延迟消息-未到期不投递")]
    public void TestDelayedMessageNotDeliveredBeforeDue()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var entry = new FluxEntry
            {
                Timestamp = DateTime.UtcNow.Ticks,
                Fields = new Dictionary<String, Object?> { ["v"] = "future" }
            };

            // 延迟 1 小时
            manager.PublishDelayed(entry, TimeSpan.FromHours(1));

            var delivered = manager.DeliverDueMessages();
            Assert.Equal(0, delivered);
            Assert.Equal(0, engine.GetEntryCount());
            Assert.Equal(1, manager.GetDelayedMessageCount());
        }
    }

    [Fact(DisplayName = "延迟消息-ReadGroup自动投递到期消息")]
    public void TestDelayedMessageAutoDeliverOnRead()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var entry = new FluxEntry
            {
                Timestamp = DateTime.UtcNow.Ticks,
                Fields = new Dictionary<String, Object?> { ["v"] = "auto-deliver" }
            };

            // 投递时间设为过去,消息即将到期
            manager.PublishScheduled(entry, DateTime.UtcNow.AddSeconds(-1));

            manager.CreateConsumerGroup("g1");

            // ReadGroup 内部会调用 DeliverDueMessages
            var messages = manager.ReadGroup("g1", "c1", 10);
            Assert.Single(messages);
        }
    }
    #endregion

    #region 死信队列测试
    [Fact(DisplayName = "死信队列-手动移入和读取")]
    public void TestDeadLetterManualMoveAndRead()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;
            var id = manager.Publish(new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["v"] = 1 }
            });

            manager.CreateConsumerGroup("g1");
            manager.ReadGroup("g1", "c1", 1);

            // 移入死信队列
            manager.MoveToDeadLetter("g1", id, "处理失败");

            // 原 pending 应被清除
            var pending = manager.GetPendingEntries("g1");
            Assert.Empty(pending);

            // 读取死信
            var deadLetters = manager.ReadDeadLetters("g1");
            Assert.Single(deadLetters);
            Assert.Equal(id, deadLetters[0].Id);
            Assert.Equal("处理失败", deadLetters[0].Reason);
            Assert.Equal(1, deadLetters[0].FailureCount);
        }
    }

    [Fact(DisplayName = "死信队列-重新投递")]
    public void TestDeadLetterRetry()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;
            var id = manager.Publish(new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["v"] = 1 }
            });

            manager.CreateConsumerGroup("g1");
            manager.ReadGroup("g1", "c1", 1);

            // 移入死信队列
            manager.MoveToDeadLetter("g1", id, "error");

            // 重新投递
            var retried = manager.RetryDeadLetter("g1", id);
            Assert.True(retried);

            // 死信队列应为空
            var deadLetters = manager.ReadDeadLetters("g1");
            Assert.Empty(deadLetters);

            // 新消息应可被消费
            Assert.Equal(2, engine.GetEntryCount());
        }
    }

    [Fact(DisplayName = "死信队列-删除死信")]
    public void TestDeadLetterDelete()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;
            var id = manager.Publish(new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["v"] = 1 }
            });

            manager.CreateConsumerGroup("g1");
            manager.ReadGroup("g1", "c1", 1);
            manager.MoveToDeadLetter("g1", id, "error");

            var deleted = manager.DeleteDeadLetter("g1", id);
            Assert.True(deleted);

            var deadLetters = manager.ReadDeadLetters("g1");
            Assert.Empty(deadLetters);

            // 删除不存在的死信应返回 false
            var deletedAgain = manager.DeleteDeadLetter("g1", id);
            Assert.False(deletedAgain);
        }
    }

    [Fact(DisplayName = "死信队列-ProcessExpiredPending按重试次数移入")]
    public void TestProcessExpiredPendingByRetryCount()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            manager.MaxRetryCount = 2;

            var now = DateTime.UtcNow.Ticks;
            var id = manager.Publish(new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["v"] = 1 }
            });

            manager.CreateConsumerGroup("g1");
            manager.ReadGroup("g1", "c1", 1);

            // 模拟多次投递:手动增加 DeliveryCount
            var pending = manager.GetPendingEntries("g1");
            Assert.Single(pending);
            Assert.Equal(1, pending[0].DeliveryCount);

            // 通过 ConsumerGroup.AddPending 增加投递次数
            // 这里我们直接修改 pending entry 来模拟(因为 ReadGroup 不会重复投递同一条已投递消息)
            // 实际上需要通过 AddPending 来增加,但 StreamManager 的 ReadGroup 跳过已投递消息
            // 所以我们通过 ProcessExpiredPending 的 maxAge 来测试
            var moved = manager.ProcessExpiredPending("g1", TimeSpan.Zero);

            // maxAge=0 表示所有 pending 都已过期(因为投递时间已在过去)
            Assert.Equal(1, moved);

            var deadLetters = manager.ReadDeadLetters("g1");
            Assert.Single(deadLetters);
            Assert.Contains("age exceeded", deadLetters[0].Reason);
        }
    }

    [Fact(DisplayName = "死信队列-ProcessExpiredPending按年龄移入")]
    public void TestProcessExpiredPendingByAge()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;

            for (var i = 0; i < 3; i++)
            {
                manager.Publish(new FluxEntry
                {
                    Timestamp = now + i,
                    Fields = new Dictionary<String, Object?> { ["v"] = i }
                });
            }

            manager.CreateConsumerGroup("g1");
            manager.ReadGroup("g1", "c1", 3);

            // 所有消息 age > 0,使用 maxAge=0 应全部移入死信
            var moved = manager.ProcessExpiredPending("g1", TimeSpan.Zero);
            Assert.Equal(3, moved);

            var pending = manager.GetPendingEntries("g1");
            Assert.Empty(pending);

            var deadLetters = manager.ReadDeadLetters("g1");
            Assert.Equal(3, deadLetters.Count);
        }
    }

    [Fact(DisplayName = "死信队列-未过期不处理")]
    public void TestProcessExpiredPendingNotExpired()
    {
        var (engine, manager) = CreateManager();
        using (engine)
        using (manager)
        {
            var now = DateTime.UtcNow.Ticks;
            manager.Publish(new FluxEntry
            {
                Timestamp = now,
                Fields = new Dictionary<String, Object?> { ["v"] = 1 }
            });

            manager.CreateConsumerGroup("g1");
            manager.ReadGroup("g1", "c1", 1);

            // 使用足够长的 maxAge,消息不应被移入死信
            var moved = manager.ProcessExpiredPending("g1", TimeSpan.FromHours(1));
            Assert.Equal(0, moved);

            var pending = manager.GetPendingEntries("g1");
            Assert.Single(pending);
        }
    }
    #endregion
}