[feat] ApiHttpClient新增DownloadFileAsync,支持从多个地址下载文件到本地。适用于内网集群部署时的文件下载,主要用在星尘中,某节点内网地址和公网地址并存时。
智能大石头 authored at 2025-12-16 18:33:15
8.76 KiB
X
using NewLife.Data;
using NewLife.Log;
using NewLife.Threading;

namespace NewLife.Net.Handlers;

/// <summary>消息匹配队列接口。用于把响应数据包配对到请求包</summary>
public interface IMatchQueue
{
    /// <summary>加入请求队列</summary>
    /// <param name="owner">拥有者</param>
    /// <param name="request">请求消息</param>
    /// <param name="msTimeout">超时取消时间</param>
    /// <param name="source">任务源</param>
    Task<Object> Add(Object? owner, Object request, Int32 msTimeout, TaskCompletionSource<Object> source);

    /// <summary>检查请求队列是否有匹配该响应的请求</summary>
    /// <param name="owner">拥有者</param>
    /// <param name="response">响应消息</param>
    /// <param name="result">任务结果</param>
    /// <param name="callback">用于检查匹配的回调</param>
    /// <returns></returns>
    Boolean Match(Object? owner, Object response, Object result, Func<Object?, Object?, Boolean> callback);

    /// <summary>清空队列</summary>
    void Clear();
}

/// <summary>消息匹配队列。子类可重载以自定义请求响应匹配逻辑</summary>
public class DefaultMatchQueue : IMatchQueue
{
    private struct ItemWrap
    {
        public Item? Value;
    }

    class Item
    {
        public Object? Owner { get; set; }
        public Object? Request { get; set; }
        public Int64 EndTime { get; set; }
        public TaskCompletionSource<Object>? Source { get; set; }
        public ISpan? Span { get; set; }
    }

    // 固定槽位数组 + 计数。使用CAS清理,避免多线程重复清理造成 _Count 不一致
    private readonly ItemWrap[] Items;
    private Int32 _Count;
    private TimerX? _Timer;

    // 追加一个游标,减少每次从0开始扫描导致的热点
    private Int32 _cursor;

    /// <summary>按指定大小来初始化队列</summary>
    /// <param name="size"></param>
    public DefaultMatchQueue(Int32 size = 256) => Items = new ItemWrap[size];

    /// <summary>加入请求队列</summary>
    /// <param name="owner">拥有者</param>
    /// <param name="request">请求的数据</param>
    /// <param name="msTimeout">超时取消时间</param>
    /// <param name="source">任务源</param>
    public virtual Task<Object> Add(Object? owner, Object request, Int32 msTimeout, TaskCompletionSource<Object> source)
    {
        var now = Runtime.TickCount64;

        // 控制超时时间,默认15秒
        if (msTimeout <= 10) msTimeout = 15_000;

        var ext = owner as IExtend;
        var qi = new Item
        {
            Owner = owner,
            Request = request,
            EndTime = now + msTimeout,
            Source = source,
            Span = ext?["Span"] as ISpan,
        };

        // 若计数已接近容量,先做一次快速清理以回收过期项,避免“看似满”的误判
        var items = Items;
        if (Volatile.Read(ref _Count) >= items.Length)
        {
            Check(null);
        }

        // 加入队列(从游标位置开始扫描,避免总是从0导致争用)
        var len = items.Length;
        var start = _cursor;
        for (var offset = 0; offset < len; ++offset)
        {
            var i = start + offset;
            if (i >= len) i -= len;

            if (Interlocked.CompareExchange(ref items[i].Value, qi, null) == null)
            {
                Interlocked.Increment(ref _Count);

                // 推进游标
                if (++i >= len) i = 0;
                _cursor = i;

                if (_Timer == null)
                {
                    lock (this)
                    {
                        _Timer ??= new TimerX(Check, null, 1000, 1000, "Match") { Async = true };
                    }
                }

                return source.Task;
            }
        }

        // 第一次扫描失败后,再进行一次同步清理并重试,最后才认为真的满
        Check(null);

        // 重试一次
        items = Items; // 允许未来可能的扩容,这里重新读取引用
        len = items.Length;
        start = _cursor;
        for (var offset = 0; offset < len; ++offset)
        {
            var i = start + offset;
            if (i >= len) i -= len;

            if (Interlocked.CompareExchange(ref items[i].Value, qi, null) == null)
            {
                Interlocked.Increment(ref _Count);

                if (++i >= len) i = 0;
                _cursor = i;

                if (_Timer == null)
                {
                    lock (this)
                    {
                        _Timer ??= new TimerX(Check, null, 1000, 1000, "Match") { Async = true };
                    }
                }

                return source.Task;
            }
        }

        DefaultTracer.Instance?.NewError("net:MatchQueue:IsFull", new { items.Length });
        throw new XException("The matching queue is full [{0}]", items.Length);
    }

    /// <summary>检查请求队列是否有匹配该响应的请求</summary>
    /// <param name="owner">拥有者</param>
    /// <param name="response">响应消息</param>
    /// <param name="result">任务结果</param>
    /// <param name="callback">用于检查匹配的回调</param>
    /// <returns></returns>
    public virtual Boolean Match(Object? owner, Object response, Object result, Func<Object?, Object?, Boolean> callback)
    {
        if (Volatile.Read(ref _Count) <= 0) return false;

        // 直接遍历,队列不会很长
        var qs = Items;
        for (var i = 0; i < qs.Length; i++)
        {
            var qi = Volatile.Read(ref qs[i].Value);
            if (qi == null) continue;

            if (qi.Owner == owner && callback(qi.Request, response))
            {
                // CAS 置空,确保仅一次成功清理,避免并发重复清理造成 _Count 错乱
                if (Interlocked.CompareExchange(ref qs[i].Value, null, qi) != qi) continue;

                Interlocked.Decrement(ref _Count);

                // 异步设置完成结果,否则可能会在当前线程恢复上层await,导致堵塞当前任务
                var src = qi.Source;
                if (src != null && !src.Task.IsCompleted)
                {
                    qi.Span?.AppendTag($"{Runtime.TickCount64} MatchQueue.SetResult(Matched)");
#if NET45
                    Task.Factory.StartNew(() => src.TrySetResult(result));
#else
                    src.TrySetResult(result);
#endif
                }

                return true;
            }
        }

        if (SocketSetting.Current.Debug)
            XTrace.WriteLine("MatchQueue.Check 失败 [{0}] result={1} Items={2}", response, result, _Count);

        return false;
    }

    /// <summary>定时检查发送队列,超时未收到响应则重发</summary>
    /// <param name="state"></param>
    void Check(Object? state)
    {
        if (Volatile.Read(ref _Count) <= 0) return;

        // 直接遍历,队列不会很长
        var now = Runtime.TickCount64;
        var qs = Items;
        for (var i = 0; i < qs.Length; i++)
        {
            var qi = Volatile.Read(ref qs[i].Value);
            if (qi == null) continue;

            // 过期取消
            if (qi.EndTime <= now)
            {
                if (Interlocked.CompareExchange(ref qs[i].Value, null, qi) != qi) continue;

                Interlocked.Decrement(ref _Count);

                // 异步取消任务,避免在当前线程执行上层await的延续任务
                var src = qi.Source;
                if (src != null && !src.Task.IsCompleted)
                {
                    qi.Span?.AppendTag($"{Runtime.TickCount64} MatchQueue.Expired({qi.EndTime}<={now})");

#if NET45
                    Task.Factory.StartNew(() => src.TrySetCanceled());
#else
                    src.TrySetCanceled();
#endif
                }
            }
        }
    }

    /// <summary>清空队列</summary>
    public virtual void Clear()
    {
        var qs = Items;
        for (var i = 0; i < qs.Length; ++i)
        {
            var qi = Interlocked.Exchange(ref qs[i].Value, null);
            if (qi == null) continue;

            Interlocked.Decrement(ref _Count);

            // 异步取消任务,避免在当前线程执行上层await的延续任务
            var src = qi.Source;
            if (src != null && !src.Task.IsCompleted)
            {
                qi.Span?.AppendTag("MatchQueue.Clear()");

#if NET45
                Task.Factory.StartNew(() => src.TrySetCanceled());
#else
                src.TrySetCanceled();
#endif
            }
        }
        _Count = 0;
    }
}