using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NewLife;
using NewLife.Caching;
using NewLife.Log;
using NewLife.Security;
using NewLife.Serialization;
using Xunit;
namespace XUnitTest.Queues;
//[Collection("Queue")]
public class ReliableQueueTests
{
private readonly FullRedis _redis;
public ReliableQueueTests()
{
var config = BasicTest.GetConfig();
_redis = new FullRedis();
_redis.Init(config);
_redis.Log = XTrace.Log;
#if DEBUG
_redis.ClientLog = XTrace.Log;
#endif
}
[Fact]
public void Queue_Normal()
{
var key = "ReliableQueue";
// 删除已有
_redis.Remove(key);
var queue = _redis.GetReliableQueue<String>(key);
queue.RetryInterval = 5;
// 发现回滚
var rcount = queue.RollbackAllAck();
if (rcount > 0)
{
XTrace.WriteLine("回滚:{0}", rcount);
Assert.Equal(rcount, queue.Count);
var rcount2 = _redis.Remove(key);
Assert.Equal(1, rcount2);
}
// 取出个数
var count = queue.Count;
Assert.True(queue.IsEmpty);
Assert.Equal(0, count);
// 添加
var vs = new[] { "1234", "abcd", "新生命团队", "ABEF" };
foreach (var item in vs)
queue.Add(item);
// 取出来
var vs2 = new[] { queue.TakeOne(), queue.TakeOne(), queue.TakeOne(), };
Assert.Equal(3, vs2.Length);
Assert.Equal("1234", vs2[0]);
Assert.Equal("abcd", vs2[1]);
Assert.Equal("新生命团队", vs2[2]);
Assert.Equal(1, queue.Count);
// 检查Ack队列
var ackList = _redis.GetList<String>(queue.AckKey);
Assert.Equal(vs2.Length, ackList.Count);
// 确认两个,留下一个未确认消息在Ack队列
var rs = queue.Acknowledge(vs2[0]) + queue.Acknowledge(vs2[1]);
Assert.Equal(2, rs);
Assert.Single(ackList);
// 捞出来Ack最后一个
var vs3 = queue.TakeAck(3).ToArray();
Assert.Empty(ackList);
Assert.Single(vs3);
Assert.Equal("新生命团队", vs3[0]);
// 读取队列最后一个,但不确认,留给下一次回滚用
var v4 = queue.TakeOne();
Assert.NotNull(v4);
// 现在队列没有数据,再取一个
var v5 = queue.TakeOne(1);
Assert.Null(v5);
}
[Fact]
public void Queue_Batch()
{
var key = "ReliableQueue_batch";
// 删除已有
_redis.Remove(key);
var queue = _redis.GetReliableQueue<String>(key);
queue.RetryInterval = 5;
// 发现回滚
var rcount = queue.RollbackAllAck();
if (rcount > 0)
{
XTrace.WriteLine("回滚:{0}", rcount);
Assert.Equal(rcount, queue.Count);
var rcount2 = _redis.Remove(key);
Assert.Equal(1, rcount2);
}
// 取出个数
var count = queue.Count;
Assert.True(queue.IsEmpty);
Assert.Equal(0, count);
// 添加
var vs = new[] { "1234", "abcd", "新生命团队", "ABEF" };
queue.Add(vs);
// 取出来
var vs2 = queue.Take(3).ToArray();
Assert.Equal(3, vs2.Length);
Assert.Equal("1234", vs2[0]);
Assert.Equal("abcd", vs2[1]);
Assert.Equal("新生命团队", vs2[2]);
Assert.Equal(1, queue.Count);
// 检查确认队列
var q2 = _redis.GetList<String>(queue.AckKey);
Assert.Equal(vs2.Length, q2.Count);
// 确认两个
var rs = queue.Acknowledge(vs2.Take(2).ToArray());
Assert.Equal(2, rs);
Assert.Single(q2);
// 捞出来Ack最后一个
var vs3 = queue.TakeAck(3).ToArray();
Assert.Empty(q2);
Assert.Single(vs3);
Assert.Equal("新生命团队", vs3[0]);
// 读取队列最后一个,但不确认,留给下一次回滚用
var vs4 = queue.Take(4).ToArray();
Assert.Single(vs4);
}
[Fact]
public void Queue_Block()
{
var key = "ReliableQueue_block";
// 删除已有
_redis.Remove(key);
var queue = _redis.GetReliableQueue<String>(key);
// 回滚死信,然后清空
var dead = queue.RollbackAllAck();
if (dead > 0) _redis.Remove(key);
// 取出个数
var count = queue.Count;
Assert.True(queue.IsEmpty);
Assert.Equal(0, count);
// 添加
var vs = new[] { "1234", "abcd", "新生命团队", "ABEF" };
foreach (var item in vs)
queue.Add(item);
// 对比个数
var count2 = queue.Count;
Assert.False(queue.IsEmpty);
Assert.Equal(vs.Length, count2);
// 取出来
Assert.Equal(vs[0], queue.TakeOne());
Assert.Equal(vs[1], queue.TakeOne());
Assert.Equal(vs[2], queue.TakeOne());
Assert.Equal(vs[3], queue.TakeOne());
queue.Acknowledge(vs);
// 延迟2秒生产消息
ThreadPool.QueueUserWorkItem(s => { Thread.Sleep(2000); queue.Add("xxyy"); });
var sw = Stopwatch.StartNew();
var rs = queue.TakeOne(3);
sw.Stop();
Assert.Equal("xxyy", rs);
Assert.True(sw.ElapsedMilliseconds >= 2000);
}
[Fact]
public void Queue_NotEnough()
{
var key = "ReliableQueue_not_enough";
// 删除已有
_redis.Remove(key);
var queue = _redis.GetReliableQueue<String>(key);
// 回滚死信,然后清空
var dead = queue.RollbackAllAck();
if (dead > 0) _redis.Remove(key);
// 取出个数
var count = queue.Count;
Assert.True(queue.IsEmpty);
Assert.Equal(0, count);
// 添加
var vs = new[] { "1234", "abcd" };
queue.Add(vs);
// 取出来
var vs2 = queue.Take(3).ToArray();
Assert.Equal(2, vs2.Length);
Assert.Equal("1234", vs2[0]);
Assert.Equal("abcd", vs2[1]);
queue.Acknowledge(vs2);
// 再取,这个时候已经没有元素
var vs4 = queue.Take(3).ToArray();
Assert.Empty(vs4);
// 管道批量获取
var vs3 = queue.Take(5).ToArray();
Assert.Empty(vs3);
// 对比个数
var count3 = queue.Count;
Assert.True(queue.IsEmpty);
Assert.Equal(count, count3);
}
/// <summary>AckKey独一无二,一百万个key测试</summary>
[Fact]
public void UniqueAckKey()
{
var key = "ReliableQueue_unique";
var hash = new HashSet<String>();
for (var i = 0; i < 1_000_000; i++)
{
var q = _redis.GetReliableQueue<String>(key);
//Assert.DoesNotContain(q.AckKey, hash);
var rs = hash.Contains(q.AckKey);
Assert.False(rs);
hash.Add(q.AckKey);
}
}
[Fact]
public void Queue_Benchmark()
{
var key = "ReliableQueue_benchmark";
_redis.Remove(key);
var q = _redis.GetReliableQueue<String>(key);
for (var i = 0; i < 1_000; i++)
{
var list = new List<String>();
for (var j = 0; j < 100; j++)
list.Add(Rand.NextString(32));
q.Add(list.ToArray());
}
Assert.Equal(1_000 * 100, q.Count);
var count = 0;
while (true)
{
var n = Rand.Next(1, 100);
var list = q.Take(n).ToList();
if (list.Count == 0) break;
var n2 = q.Acknowledge(list.ToArray());
Assert.Equal(list.Count, n2);
count += list.Count;
}
Assert.Equal(1_000 * 100, count);
}
[Fact]
public async Task Queue_Benchmark_Mutilate()
{
var key = "ReliableQueue_benchmark_mutilate";
_redis.Remove(key);
var queue = _redis.GetReliableQueue<String>(key);
// 回滚死信,然后清空
var dead = queue.RollbackAllAck();
if (dead > 0) _redis.Remove(key);
for (var i = 0; i < 1_000; i++)
{
var list = new List<String>();
for (var j = 0; j < 100; j++)
list.Add($"msgContent-{i}-{j}");
queue.Add(list.ToArray());
}
Assert.Equal(1_000 * 100, queue.Count);
//var count = 0;
var ths = new List<Task<Int32>>();
for (var i = 0; i < 16; i++)
{
ths.Add(Task.Run(() =>
{
var count = 0;
var queue2 = _redis.GetReliableQueue<String>(key);
while (true)
{
var n = Rand.Next(1, 100);
var list = queue2.Take(n).Where(e => !e.IsNullOrEmpty()).ToList();
if (list.Count == 0) break;
var n2 = queue2.Acknowledge(list.ToArray());
// Ack返回值似乎没那么准
//Assert.Equal(list.Count, n2);
//Interlocked.Add(ref count, list.Count);
count += list.Count;
}
return count;
}));
}
//Task.WaitAll(ths.ToArray());
var rs = (await Task.WhenAll(ths)).Sum();
Assert.Equal(1_000 * 100, rs);
}
[Fact]
public void RetryDeadAck()
{
var key = "ReliableQueue_RetryDeadAck";
_redis.Remove(key);
var queue = _redis.GetReliableQueue<String>(key);
queue.RetryInterval = 5;
// 清空
queue.ClearAllAck();
// 生产几个消息,消费但不确认
var list = new List<String>();
for (var i = 0; i < 5; i++)
list.Add(Rand.NextString(32));
queue.Add(list.ToArray());
var list2 = queue.Take(10).ToList();
Assert.Equal(list.Count, list2.Count);
// 确认队列里面有几个
var q2 = _redis.GetList<String>(queue.AckKey);
Assert.Equal(list.Count, q2.Count);
// 马上消费,消费不到
var vs3 = queue.Take(100).ToArray();
Assert.Empty(vs3);
// 等一定时间再消费
Thread.Sleep(queue.RetryInterval * 1000 + 10);
// 再次消费,应该有了
var vs4 = queue.Take(100).ToArray();
Assert.Equal(list.Count, vs4.Length);
// 确认队列里面的私信重新进入主队列,消费时再次进入确认队列
Assert.Equal(vs4.Length, q2.Count);
// 全部确认
queue.Acknowledge(vs4);
// 确认队列应该空了
Assert.Empty(q2);
}
[Fact]
public async Task Queue_Async()
{
var key = "ReliableQueue_Async";
// 删除已有
_redis.Remove(key);
var q = _redis.GetReliableQueue<String>(key);
// 发现回滚
var rcount = q.RollbackAllAck();
if (rcount > 0)
{
XTrace.WriteLine("回滚:{0}", rcount);
Assert.Equal(rcount, q.Count);
var rcount2 = _redis.Remove(key);
Assert.Equal(1, rcount2);
}
// 添加
var vs = new[] { "1234", "abcd", "新生命团队", "ABEF" };
q.Add(vs);
// 取出来
Assert.Equal("1234", await q.TakeOneAsync(0));
Assert.Equal("abcd", await q.TakeOneAsync(0));
Assert.Equal("新生命团队", await q.TakeOneAsync(0));
Assert.Equal("ABEF", await q.TakeOneAsync(0));
// 空消息
var sw = Stopwatch.StartNew();
var rs = await q.TakeOneAsync(2);
sw.Stop();
Assert.Null(rs);
Assert.True(sw.ElapsedMilliseconds >= 2000);
// 延迟2秒生产消息
ThreadPool.QueueUserWorkItem(s => { Thread.Sleep(2000); q.Add("xxyy"); });
sw = Stopwatch.StartNew();
rs = await q.TakeOneAsync(3);
sw.Stop();
Assert.Equal("xxyy", rs);
Assert.True(sw.ElapsedMilliseconds >= 2000);
}
[Fact]
public async Task Queue_PublishAndConsume()
{
var key = "ReliableQueue_PublishAndConsume";
// 删除已有
_redis.Remove(key);
var q = _redis.GetReliableQueue<MyModel>(key);
// 改变有效期
//q.BodyExpire = 5 * 60;
var expire = 5 * 60;
var dic = new Dictionary<String, MyModel>
{
["aaa"] = new MyModel { Id = 123, Name = "a123" },
["bbb"] = new MyModel { Id = 456, Name = "b456" },
["ccc"] = new MyModel { Id = 789, Name = "c789" },
};
// 生产
var rs = q.Publish(dic, expire);
Assert.Equal(dic.Count, rs);
// 查看并干掉第二项
var v2 = _redis.Get<MyModel>("bbb");
Assert.NotNull(v2);
Assert.Equal(456, v2.Id);
Assert.Equal("b456", v2.Name);
var ttl = _redis.GetExpire("bbb");
Assert.True(ttl.TotalSeconds <= expire);
Assert.True(ttl.TotalSeconds >= expire - 2);
rs = _redis.Remove("bbb");
// 消费第一项
XTrace.WriteLine("消费第一项");
var v1 = await q.ConsumeAsync(ProcessAsync, 3);
Assert.NotNull(v1);
Assert.Equal(123, v1.Id);
// 消费第二项
XTrace.WriteLine("消费第二项");
v2 = await q.ConsumeAsync(ProcessAsync, 3);
Assert.Null(v2);
// 消费第三项
XTrace.WriteLine("消费第三项");
var v3 = await q.ConsumeAsync(ProcessAsync, 3);
Assert.NotNull(v3);
Assert.Equal(789, v3.Id);
}
private Task<MyModel> ProcessAsync(MyModel msg)
{
switch (msg.Id)
{
case 123:
Assert.NotNull(msg);
Assert.Equal(123, msg.Id);
Assert.Equal("a123", msg.Name);
break;
case 456:
Assert.Null(msg);
break;
case 789:
Assert.NotNull(msg);
Assert.Equal(789, msg.Id);
Assert.Equal("c789", msg.Name);
break;
}
return Task.FromResult(msg);
}
private class MyModel
{
public Int32 Id { get; set; }
public String Name { get; set; }
public String TraceParent { get; set; }
}
[Fact]
public void AttachTraceId()
{
var key = "ReliableQueue_AttachTraceId";
_redis.Remove(key);
var queue = _redis.GetReliableQueue<MyModel>(key);
_redis.Tracer = new DefaultTracer { MaxSamples = 100 };
var queue2 = _redis.GetReliableQueue<String>(key);
// 清空
queue.ClearAllAck();
// 生产对象消息
var traceParent = "";
{
using var span = _redis.Tracer.NewSpan("test");
traceParent = span.ToString();
var model = new MyModel { Id = 1234, Name = "Stone" };
queue.Add(model);
}
{
var json = queue2.TakeOne();
Assert.NotNull(json);
var dic = JsonParser.Decode(json);
Assert.NotNull(dic["traceparent"]);
Assert.NotEqual(traceParent, dic["traceparent"]);
}
// 生产json消息
traceParent = "";
{
using var span = _redis.Tracer.NewSpan("test");
traceParent = span.ToString();
var model = new MyModel { Id = 1234, Name = "Stone" };
queue2.Add(model.ToJson());
}
{
var json = queue2.TakeOne();
Assert.NotNull(json);
var dic = JsonParser.Decode(json);
Assert.NotEqual(traceParent, dic["traceparent"]);
}
// 生产普通字符串消息
traceParent = "";
{
using var span = _redis.Tracer.NewSpan("test");
traceParent = span.ToString();
queue2.Add("Stone");
}
{
var msg = queue2.TakeOne();
Assert.Equal("Stone", msg);
}
}
[Fact]
public async Task AddDelay()
{
var key = "ReliableQueue_addDelay";
// 删除已有
_redis.Remove(key);
var queue = _redis.GetReliableQueue<String>(key);
queue.RetryInterval = 5;
// 发现回滚
var rcount = queue.RollbackAllAck();
if (rcount > 0)
{
XTrace.WriteLine("回滚:{0}", rcount);
Assert.Equal(rcount, queue.Count);
var rcount2 = _redis.Remove(key);
Assert.Equal(1, rcount2);
}
// 取出个数
var count = queue.Count;
Assert.True(queue.IsEmpty);
Assert.Equal(0, count);
// 初始化延迟
var dq = queue.InitDelay();
dq.TransferInterval = 2;
// 添加延迟消息
var sw = Stopwatch.StartNew();
var vs = new[] { "1234", "abcd", "新生命团队", "ABEF" };
foreach (var item in vs)
queue.AddDelay(item, 2);
// 可信队列消费
var v1 = await queue.TakeOneAsync(-1);
Assert.Null(v1);
// 到期以后
XTrace.WriteLine("可信队列阻塞消费");
var v2 = await queue.TakeOneAsync(5);
sw.Stop();
Assert.Equal("1234", v2);
// 延迟队列没有阻塞方法,需要等1秒
Assert.True(sw.ElapsedMilliseconds >= 2000);
queue.Acknowledge(v2);
}
[Fact]
public async Task BlockTest()
{
// 一个队列两个消费,阻塞是否叠加
var key = "ReliableQueue_BlockTest";
_redis.Timeout = 15_000;
var sw = Stopwatch.StartNew();
var t1 = Task.Run(async () =>
{
var queue = _redis.GetReliableQueue<String>(key);
await queue.TakeOneAsync(3);
});
var t2 = Task.Run(async () =>
{
var queue = _redis.GetReliableQueue<String>(key);
await queue.TakeOneAsync(3);
});
await Task.WhenAll(t1, t2);
sw.Stop();
XTrace.WriteLine("ReliableQueue_BlockTest: {0}", sw.Elapsed);
Assert.True(sw.ElapsedMilliseconds < 3_000 + 500);
}
private class RedisMessage<T> { public MyModel Data { get; set; } }
[Fact]
public async Task TakeOneNotDataAsync()
{
var queue = _redis.GetReliableQueue<RedisMessage<MyModel>>("TakeOneNotDataAsync");
queue.RetryInterval = 60;//重新处理确认队列中死信的间隔。默认60s
RedisMessage<MyModel>? message = await queue.TakeOneAsync(10);
Assert.Null(message);
var queue2 = _redis.GetReliableQueue<Int32>("TakeOneNotDataAsync_Int32");
queue2.RetryInterval = 60;//重新处理确认队列中死信的间隔。默认60s
int messageInt = await queue2.TakeOneAsync(10);
Assert.Equal(0, messageInt);
}
}
|