在所有Task异步任务中,长耗时(>500ms)的任务需要使用LongRunning来创建,避免占用线程池影响高并发吞吐
石头 authored at 2025-11-12 15:47:48
4.63 KiB
X
using System.Diagnostics.CodeAnalysis;
using NewLife.Log;
using NewLife.Messaging;

namespace NewLife.Caching;

/// <summary>消息队列事件总线。通过消息队列来发布和订阅消息</summary>
/// <remarks>
/// - 使用 <see cref="ICache.GetQueue{T}(String)"/> 获取后端队列;
/// - 订阅时启动后台消费循环,将队列消息分发给本地订阅者;
/// - 支持取消令牌优雅停止后台循环。
/// </remarks>
public class QueueEventBus<TEvent>(ICache cache, String topic) : EventBus<TEvent>
{
    private IProducerConsumer<TEvent>? _queue;
    private CancellationTokenSource? _source;
    private Task? _consumerTask;

    /// <summary>销毁。先取消后台任务,再释放资源</summary>
    /// <param name="disposing">是否由 <see cref="DisposeBase.Dispose()"/> 调用</param>
    protected override void Dispose(Boolean disposing)
    {
        base.Dispose(disposing);

        // 取消后台消费循环,并等待其退出后再释放 CTS
        var src = Interlocked.Exchange(ref _source, null);
        if (src != null)
        {
            try
            {
                if (!src.IsCancellationRequested) src.Cancel();
            }
            catch (ObjectDisposedException) { }
        }

        var task = Interlocked.Exchange(ref _consumerTask, null);
        if (task != null)
        {
            try { task.Wait(3_000); }
            catch (AggregateException) { }
        }

        src?.Dispose();
    }

    /// <summary>初始化:按需创建队列实例</summary>
    [MemberNotNull(nameof(_queue))]
    protected virtual void Init() => _queue ??= cache.GetQueue<TEvent>(topic);

    /// <summary>发布消息到消息队列</summary>
    /// <param name="event">事件</param>
    /// <param name="context">上下文</param>
    /// <param name="cancellationToken">取消令牌</param>
    public override Task<Int32> PublishAsync(TEvent @event, IEventContext<TEvent>? context = null, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
        {
#if NET45
            var tcs = new TaskCompletionSource<Int32>();
            tcs.SetCanceled();
            return tcs.Task;
#else
            return Task.FromCanceled<Int32>(cancellationToken);
#endif
        }

        Init();
        var rs = _queue.Add(@event);

        return Task.FromResult(rs);
    }

    /// <summary>订阅消息。启动后台循环,从消息队列订阅消息并分发到本地订阅者</summary>
    /// <param name="handler">处理器</param>
    /// <param name="clientId">客户标识。每个客户只能订阅一次,重复订阅将会挤掉前一次订阅</param>
    public override Boolean Subscribe(IEventHandler<TEvent> handler, String clientId = "")
    {
        if (_source == null)
        {
            var source = new CancellationTokenSource();
            if (Interlocked.CompareExchange(ref _source, source, null) == null)
            {
                Init();
                // 将固定的 source 传入,避免闭包捕获 _source 产生竞争
                var t = Task.Factory.StartNew(() => ConsumeMessage(source), source.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap();
                Interlocked.Exchange(ref _consumerTask, t);
            }
            else
            {
                // 如果竞争失败,释放本地创建的 source
                source.Dispose();
            }
        }

        return base.Subscribe(handler, clientId);
    }

    /// <summary>从队列消费消息并通过事件总线分发给本地订阅者</summary>
    /// <param name="source">取消令牌源</param>
    protected virtual async Task ConsumeMessage(CancellationTokenSource source)
    {
        DefaultSpan.Current = null;
        var cancellationToken = source.Token;
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var msg = await _queue!.TakeOneAsync(15, cancellationToken).ConfigureAwait(false);
                if (msg != null)
                {
                    // 发布到事件总线
                    await DispatchAsync(msg, null, cancellationToken).ConfigureAwait(false);
                }
                else
                {
                    await Task.Delay(1_000, cancellationToken).ConfigureAwait(false);
                }
            }
        }
        catch (TaskCanceledException) { }
        catch (OperationCanceledException) { }
        catch (Exception ex)
        {
            XTrace.WriteException(ex);
        }
        finally
        {
            // 不在此处二次取消,生命周期由外部(Dispose/取消订阅)管理
        }
    }
}