using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Caching;
using NewLife.Caching.Queues;
using NewLife.Log;
using NewLife.Security;
using Xunit;
namespace XUnitTest.Queues;
//[Collection("Queue")]
public class QueueTests
{
private FullRedis _redis;
public QueueTests()
{
var config = BasicTest.GetConfig();
_redis = new FullRedis();
_redis.Init(config);
_redis.Log = XTrace.Log;
#if DEBUG
_redis.ClientLog = XTrace.Log;
#endif
}
[Fact]
public void Queue_Normal()
{
var key = "Queue_normal";
// 删除已有
_redis.Remove(key);
var q = _redis.GetQueue<String>(key);
_redis.SetExpire(key, TimeSpan.FromMinutes(60));
Assert.NotNull(q as RedisQueue<String>);
// 取出个数
var count = q.Count;
Assert.True(q.IsEmpty);
Assert.Equal(0, count);
// 添加
var vs = new[] { "1234", "abcd", "新生命团队", "ABEF" };
q.Add(vs);
// 对比个数
var count2 = q.Count;
Assert.False(q.IsEmpty);
Assert.Equal(count + vs.Length, count2);
// 取出来
var vs2 = q.Take(2).ToArray();
Assert.Equal(2, vs2.Length);
Assert.Equal("1234", vs2[0]);
Assert.Equal("abcd", vs2[1]);
// 管道批量获取
var q2 = q as RedisQueue<String>;
q2.MinPipeline = 4;
var vs3 = q.Take(5).ToArray();
Assert.Equal(2, vs3.Length);
Assert.Equal("新生命团队", vs3[0]);
Assert.Equal("ABEF", vs3[1]);
// 对比个数
var count3 = q.Count;
Assert.True(q.IsEmpty);
Assert.Equal(count, count3);
}
[Fact]
public void Queue_Block()
{
var key = "Queue_block";
// 删除已有
_redis.Remove(key);
var q = _redis.GetQueue<String>(key);
_redis.SetExpire(key, TimeSpan.FromMinutes(60));
Assert.NotNull(q as RedisQueue<String>);
// 取出个数
var count = q.Count;
Assert.True(q.IsEmpty);
Assert.Equal(0, count);
// 添加
var vs = new[] { "1234", "abcd", "新生命团队", "ABEF" };
foreach (var item in vs)
q.Add(item);
// 对比个数
var count2 = q.Count;
Assert.False(q.IsEmpty);
Assert.Equal(vs.Length, count2);
// 取出来
Assert.Equal(vs[0], q.TakeOne());
Assert.Equal(vs[1], q.TakeOne());
Assert.Equal(vs[2], q.TakeOne());
Assert.Equal(vs[3], q.TakeOne());
// 延迟2秒生产消息
ThreadPool.QueueUserWorkItem(s => { Thread.Sleep(2000); q.Add("xxyy"); });
var sw = Stopwatch.StartNew();
var rs = q.TakeOne(3);
sw.Stop();
Assert.Equal("xxyy", rs);
Assert.True(sw.ElapsedMilliseconds >= 2000);
}
[Fact]
public void Queue_NotEnough()
{
var key = "Queue_not_enough";
// 删除已有
_redis.Remove(key);
var q = _redis.GetQueue<String>(key);
_redis.SetExpire(key, TimeSpan.FromMinutes(60));
Assert.NotNull(q as RedisQueue<String>);
// 取出个数
var count = q.Count;
Assert.True(q.IsEmpty);
Assert.Equal(0, count);
// 添加
var vs = new[] { "1234", "abcd" };
q.Add(vs);
// 取出来
var vs2 = q.Take(3).ToArray();
Assert.Equal(2, vs2.Length);
Assert.Equal("1234", vs2[0]);
Assert.Equal("abcd", vs2[1]);
// 再取,这个时候已经没有元素
var vs4 = q.Take(3).ToArray();
Assert.Empty(vs4);
// 管道批量获取
var vs3 = q.Take(5).ToArray();
Assert.Empty(vs3);
// 对比个数
var count3 = q.Count;
Assert.True(q.IsEmpty);
Assert.Equal(count, count3);
}
[Fact]
public void Queue_Benchmark()
{
var key = "Queue_benchmark";
_redis.Remove(key);
var q = _redis.GetQueue<String>(key);
for (var i = 0; i < 1_000; i++)
{
var list = new List<String>();
for (var j = 0; j < 100; j++)
list.Add(Rand.NextString(32));
q.Add(list.ToArray());
}
Assert.Equal(1_000 * 100, q.Count);
var count = 0;
while (true)
{
var n = Rand.Next(1, 100);
var list = q.Take(n).ToList();
if (list.Count == 0) break;
count += list.Count;
}
Assert.Equal(1_000 * 100, count);
}
[Fact]
public async Task Queue_Benchmark_Mutilate()
{
var key = "Queue_benchmark_mutilate";
_redis.Remove(key);
var q = _redis.GetQueue<String>(key);
for (var i = 0; i < 1_000; i++)
{
var list = new List<String>();
for (var j = 0; j < 100; j++)
list.Add(Rand.NextString(32));
q.Add(list.ToArray());
}
Assert.Equal(1_000 * 100, q.Count);
var count = 0;
var ths = new List<Task>();
for (var i = 0; i < 16; i++)
ths.Add(Task.Run(() =>
{
while (true)
{
var n = Rand.Next(1, 100);
var list = q.Take(n).ToList();
if (list.Count == 0) break;
Interlocked.Add(ref count, list.Count);
}
}));
await Task.WhenAll(ths.ToArray());
Assert.Equal(1_000 * 100, count);
}
[Fact]
public async Task Queue_Async()
{
var key = "Queue_Async";
// 删除已有
_redis.Remove(key);
var q = _redis.GetQueue<String>(key);
// 添加
var vs = new[] { "1234", "abcd", "新生命团队", "ABEF" };
q.Add(vs);
// 取出来
Assert.Equal("1234", await q.TakeOneAsync(0));
Assert.Equal("abcd", await q.TakeOneAsync(0));
Assert.Equal("新生命团队", await q.TakeOneAsync(0));
Assert.Equal("ABEF", await q.TakeOneAsync(0));
// 空消息
var sw = Stopwatch.StartNew();
var rs = await q.TakeOneAsync(2);
sw.Stop();
Assert.Null(rs);
Assert.True(sw.ElapsedMilliseconds >= 2000);
// 延迟2秒生产消息
ThreadPool.QueueUserWorkItem(s => { Thread.Sleep(2000); q.Add("xxyy"); });
sw = Stopwatch.StartNew();
rs = await q.TakeOneAsync(3);
sw.Stop();
Assert.Equal("xxyy", rs);
Assert.True(sw.ElapsedMilliseconds >= 2000);
}
[Fact]
public void Queue_NoAck()
{
var key = "Queue_NoAck";
// 删除已有
_redis.Remove(key);
var q = _redis.GetQueue<String>(key);
//Assert.Throws<NotSupportedException>(() => q.Acknowledge(""));
var rs = q.Acknowledge("");
Assert.Equal(-1, rs);
}
}
|