using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using NewLife.NovaDb.Core;
using NewLife.NovaDb.Engine.Flux;
namespace XUnitTest.Engine.Flux;
/// <summary>StreamManager 单元测试</summary>
public class StreamManagerTests : IDisposable
{
private readonly String _testDir;
public StreamManagerTests()
{
_testDir = Path.Combine(Path.GetTempPath(), $"StreamManagerTests_{Guid.NewGuid():N}");
Directory.CreateDirectory(_testDir);
}
public void Dispose()
{
if (Directory.Exists(_testDir))
{
try { Directory.Delete(_testDir, recursive: true); }
catch { }
}
}
private (FluxEngine engine, StreamManager manager) CreateManager()
{
var options = new DbOptions { FluxPartitionHours = 1 };
var engine = new FluxEngine(_testDir, options);
var manager = new StreamManager(engine);
return (engine, manager);
}
[Fact(DisplayName = "测试发布消息")]
public void TestPublish()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
var entry = new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["msg"] = "hello" }
};
var id = manager.Publish(entry);
Assert.Equal(now, id.Timestamp);
Assert.Equal(0, id.Sequence);
Assert.Equal(1, engine.GetEntryCount());
}
}
[Fact(DisplayName = "测试消息 ID 自增")]
public void TestMessageIdAutoIncrement()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
var id1 = manager.Publish(new FluxEntry { Timestamp = now, Fields = new Dictionary<String, Object?> { ["v"] = 1 } });
var id2 = manager.Publish(new FluxEntry { Timestamp = now, Fields = new Dictionary<String, Object?> { ["v"] = 2 } });
var id3 = manager.Publish(new FluxEntry { Timestamp = now, Fields = new Dictionary<String, Object?> { ["v"] = 3 } });
Assert.Equal(0, id1.Sequence);
Assert.Equal(1, id2.Sequence);
Assert.Equal(2, id3.Sequence);
}
}
[Fact(DisplayName = "测试创建消费组")]
public void TestCreateConsumerGroup()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
manager.CreateConsumerGroup("group1");
manager.CreateConsumerGroup("group2");
var names = manager.GetConsumerGroupNames();
Assert.Equal(2, names.Count);
Assert.Contains("group1", names);
Assert.Contains("group2", names);
}
}
[Fact(DisplayName = "测试消费组读取")]
public void TestReadGroup()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
// 发布 5 条消息
for (var i = 0; i < 5; i++)
{
manager.Publish(new FluxEntry
{
Timestamp = now + i,
Fields = new Dictionary<String, Object?> { ["v"] = i }
});
}
manager.CreateConsumerGroup("g1");
// 读取 3 条
var messages = manager.ReadGroup("g1", "consumer1", 3);
Assert.Equal(3, messages.Count);
// 再读取剩余
var remaining = manager.ReadGroup("g1", "consumer1", 10);
Assert.Equal(2, remaining.Count);
// 没有更多消息
var empty = manager.ReadGroup("g1", "consumer1", 10);
Assert.Empty(empty);
}
}
[Fact(DisplayName = "测试确认消息")]
public void TestAcknowledge()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
var id = manager.Publish(new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["v"] = 1 }
});
manager.CreateConsumerGroup("g1");
manager.ReadGroup("g1", "consumer1", 1);
// 确认前有 1 条 pending
var pending = manager.GetPendingEntries("g1");
Assert.Single(pending);
// 确认消息
var acked = manager.Acknowledge("g1", id);
Assert.True(acked);
// 确认后 pending 为空
pending = manager.GetPendingEntries("g1");
Assert.Empty(pending);
}
}
[Fact(DisplayName = "测试待确认条目")]
public void TestPendingEntries()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
for (var i = 0; i < 3; i++)
{
manager.Publish(new FluxEntry
{
Timestamp = now + i,
Fields = new Dictionary<String, Object?> { ["v"] = i }
});
}
manager.CreateConsumerGroup("g1");
manager.ReadGroup("g1", "consumer1", 3);
var pending = manager.GetPendingEntries("g1");
Assert.Equal(3, pending.Count);
// 每条 pending 都有消费者信息
foreach (var p in pending)
{
Assert.Equal("consumer1", p.Consumer);
Assert.Equal(1, p.DeliveryCount);
Assert.NotNull(p.Id);
}
}
}
[Fact(DisplayName = "测试消费组不存在异常")]
public void TestConsumerGroupNotFound()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var ex = Assert.Throws<NovaException>(() => manager.ReadGroup("nonexistent", "c1", 1));
Assert.Equal(ErrorCode.ConsumerGroupNotFound, ex.Code);
}
}
[Fact(DisplayName = "测试多消费组独立消费")]
public void TestMultipleConsumerGroups()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
for (var i = 0; i < 3; i++)
{
manager.Publish(new FluxEntry
{
Timestamp = now + i,
Fields = new Dictionary<String, Object?> { ["v"] = i }
});
}
manager.CreateConsumerGroup("g1");
manager.CreateConsumerGroup("g2");
// 两个组各自独立读取
var g1Messages = manager.ReadGroup("g1", "c1", 10);
var g2Messages = manager.ReadGroup("g2", "c2", 10);
Assert.Equal(3, g1Messages.Count);
Assert.Equal(3, g2Messages.Count);
}
}
#region 阻塞读取测试
[Fact(DisplayName = "阻塞读取-有消息时立即返回")]
public async Task TestBlockingReadWithAvailableMessages()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
manager.Publish(new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["v"] = 1 }
});
manager.CreateConsumerGroup("g1");
var sw = Stopwatch.StartNew();
var result = await manager.ReadGroupAsync("g1", "c1", 10, TimeSpan.FromSeconds(5));
sw.Stop();
Assert.Single(result);
// 有消息时应立即返回,不应等待
Assert.True(sw.ElapsedMilliseconds < 1000);
}
}
[Fact(DisplayName = "阻塞读取-超时返回空列表")]
public async Task TestBlockingReadTimeout()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
manager.CreateConsumerGroup("g1");
var sw = Stopwatch.StartNew();
var result = await manager.ReadGroupAsync("g1", "c1", 10, TimeSpan.FromMilliseconds(200));
sw.Stop();
Assert.Empty(result);
// 应等待约 200ms
Assert.True(sw.ElapsedMilliseconds >= 150);
}
}
[Fact(DisplayName = "阻塞读取-取消令牌")]
public async Task TestBlockingReadCancellation()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
manager.CreateConsumerGroup("g1");
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
await manager.ReadGroupAsync("g1", "c1", 10, TimeSpan.FromSeconds(10), cts.Token);
});
}
}
[Fact(DisplayName = "阻塞读取-新消息唤醒")]
public async Task TestBlockingReadWakesOnPublish()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
manager.CreateConsumerGroup("g1");
// 后台延迟发布消息
var publishTask = Task.Run(async () =>
{
await Task.Delay(200);
manager.Publish(new FluxEntry
{
Timestamp = DateTime.UtcNow.Ticks,
Fields = new Dictionary<String, Object?> { ["v"] = "wakeup" }
});
});
var sw = Stopwatch.StartNew();
var result = await manager.ReadGroupAsync("g1", "c1", 10, TimeSpan.FromSeconds(5));
sw.Stop();
await publishTask;
Assert.Single(result);
// 应在约 200ms 后被唤醒,而不是等待 5 秒
Assert.True(sw.ElapsedMilliseconds < 3000);
}
}
#endregion
#region 延迟消息测试
[Fact(DisplayName = "延迟消息-发布和投递")]
public void TestDelayedMessagePublishAndDelivery()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
var entry = new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["v"] = "delayed" }
};
// 发布延迟消息(延迟 0 秒,立即可投递)
manager.PublishDelayed(entry, TimeSpan.Zero);
// 尚未投递,引擎中无条目
Assert.Equal(0, engine.GetEntryCount());
Assert.Equal(1, manager.GetDelayedMessageCount());
// 投递到期消息
var delivered = manager.DeliverDueMessages();
Assert.Equal(1, delivered);
Assert.Equal(1, engine.GetEntryCount());
Assert.Equal(0, manager.GetDelayedMessageCount());
}
}
[Fact(DisplayName = "定时投递-过去时间立即投递")]
public void TestScheduledMessageDelivery()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var entry = new FluxEntry
{
Timestamp = DateTime.UtcNow.Ticks,
Fields = new Dictionary<String, Object?> { ["v"] = "scheduled" }
};
// 投递时间设为过去
manager.PublishScheduled(entry, DateTime.UtcNow.AddSeconds(-1));
// 投递到期消息
var delivered = manager.DeliverDueMessages();
Assert.Equal(1, delivered);
Assert.Equal(1, engine.GetEntryCount());
}
}
[Fact(DisplayName = "延迟消息-未到期不投递")]
public void TestDelayedMessageNotDeliveredBeforeDue()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var entry = new FluxEntry
{
Timestamp = DateTime.UtcNow.Ticks,
Fields = new Dictionary<String, Object?> { ["v"] = "future" }
};
// 延迟 1 小时
manager.PublishDelayed(entry, TimeSpan.FromHours(1));
var delivered = manager.DeliverDueMessages();
Assert.Equal(0, delivered);
Assert.Equal(0, engine.GetEntryCount());
Assert.Equal(1, manager.GetDelayedMessageCount());
}
}
[Fact(DisplayName = "延迟消息-ReadGroup自动投递到期消息")]
public void TestDelayedMessageAutoDeliverOnRead()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var entry = new FluxEntry
{
Timestamp = DateTime.UtcNow.Ticks,
Fields = new Dictionary<String, Object?> { ["v"] = "auto-deliver" }
};
// 投递时间设为过去,消息即将到期
manager.PublishScheduled(entry, DateTime.UtcNow.AddSeconds(-1));
manager.CreateConsumerGroup("g1");
// ReadGroup 内部会调用 DeliverDueMessages
var messages = manager.ReadGroup("g1", "c1", 10);
Assert.Single(messages);
}
}
#endregion
#region 死信队列测试
[Fact(DisplayName = "死信队列-手动移入和读取")]
public void TestDeadLetterManualMoveAndRead()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
var id = manager.Publish(new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["v"] = 1 }
});
manager.CreateConsumerGroup("g1");
manager.ReadGroup("g1", "c1", 1);
// 移入死信队列
manager.MoveToDeadLetter("g1", id, "处理失败");
// 原 pending 应被清除
var pending = manager.GetPendingEntries("g1");
Assert.Empty(pending);
// 读取死信
var deadLetters = manager.ReadDeadLetters("g1");
Assert.Single(deadLetters);
Assert.Equal(id, deadLetters[0].Id);
Assert.Equal("处理失败", deadLetters[0].Reason);
Assert.Equal(1, deadLetters[0].FailureCount);
}
}
[Fact(DisplayName = "死信队列-重新投递")]
public void TestDeadLetterRetry()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
var id = manager.Publish(new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["v"] = 1 }
});
manager.CreateConsumerGroup("g1");
manager.ReadGroup("g1", "c1", 1);
// 移入死信队列
manager.MoveToDeadLetter("g1", id, "error");
// 重新投递
var retried = manager.RetryDeadLetter("g1", id);
Assert.True(retried);
// 死信队列应为空
var deadLetters = manager.ReadDeadLetters("g1");
Assert.Empty(deadLetters);
// 新消息应可被消费
Assert.Equal(2, engine.GetEntryCount());
}
}
[Fact(DisplayName = "死信队列-删除死信")]
public void TestDeadLetterDelete()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
var id = manager.Publish(new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["v"] = 1 }
});
manager.CreateConsumerGroup("g1");
manager.ReadGroup("g1", "c1", 1);
manager.MoveToDeadLetter("g1", id, "error");
var deleted = manager.DeleteDeadLetter("g1", id);
Assert.True(deleted);
var deadLetters = manager.ReadDeadLetters("g1");
Assert.Empty(deadLetters);
// 删除不存在的死信应返回 false
var deletedAgain = manager.DeleteDeadLetter("g1", id);
Assert.False(deletedAgain);
}
}
[Fact(DisplayName = "死信队列-ProcessExpiredPending按重试次数移入")]
public void TestProcessExpiredPendingByRetryCount()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
manager.MaxRetryCount = 2;
var now = DateTime.UtcNow.Ticks;
var id = manager.Publish(new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["v"] = 1 }
});
manager.CreateConsumerGroup("g1");
manager.ReadGroup("g1", "c1", 1);
// 模拟多次投递:手动增加 DeliveryCount
var pending = manager.GetPendingEntries("g1");
Assert.Single(pending);
Assert.Equal(1, pending[0].DeliveryCount);
// 通过 ConsumerGroup.AddPending 增加投递次数
// 这里我们直接修改 pending entry 来模拟(因为 ReadGroup 不会重复投递同一条已投递消息)
// 实际上需要通过 AddPending 来增加,但 StreamManager 的 ReadGroup 跳过已投递消息
// 所以我们通过 ProcessExpiredPending 的 maxAge 来测试
var moved = manager.ProcessExpiredPending("g1", TimeSpan.Zero);
// maxAge=0 表示所有 pending 都已过期(因为投递时间已在过去)
Assert.Equal(1, moved);
var deadLetters = manager.ReadDeadLetters("g1");
Assert.Single(deadLetters);
Assert.Contains("age exceeded", deadLetters[0].Reason);
}
}
[Fact(DisplayName = "死信队列-ProcessExpiredPending按年龄移入")]
public void TestProcessExpiredPendingByAge()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
for (var i = 0; i < 3; i++)
{
manager.Publish(new FluxEntry
{
Timestamp = now + i,
Fields = new Dictionary<String, Object?> { ["v"] = i }
});
}
manager.CreateConsumerGroup("g1");
manager.ReadGroup("g1", "c1", 3);
// 所有消息 age > 0,使用 maxAge=0 应全部移入死信
var moved = manager.ProcessExpiredPending("g1", TimeSpan.Zero);
Assert.Equal(3, moved);
var pending = manager.GetPendingEntries("g1");
Assert.Empty(pending);
var deadLetters = manager.ReadDeadLetters("g1");
Assert.Equal(3, deadLetters.Count);
}
}
[Fact(DisplayName = "死信队列-未过期不处理")]
public void TestProcessExpiredPendingNotExpired()
{
var (engine, manager) = CreateManager();
using (engine)
using (manager)
{
var now = DateTime.UtcNow.Ticks;
manager.Publish(new FluxEntry
{
Timestamp = now,
Fields = new Dictionary<String, Object?> { ["v"] = 1 }
});
manager.CreateConsumerGroup("g1");
manager.ReadGroup("g1", "c1", 1);
// 使用足够长的 maxAge,消息不应被移入死信
var moved = manager.ProcessExpiredPending("g1", TimeSpan.FromHours(1));
Assert.Equal(0, moved);
var pending = manager.GetPendingEntries("g1");
Assert.Single(pending);
}
}
#endregion
}
|