[feat]星尘监控增加数据标量Value,例如数据库操作所影响的行数,平台负责汇总统计
智能大石头 authored at 2024-03-26 23:26:39
17.35 KiB
Stardust
using System.Collections.Concurrent;
using NewLife;
using NewLife.Log;
using NewLife.Threading;
using Stardust.Data.Monitors;
using XCode;
using XCode.Model;

namespace Stardust.Server.Services;

/// <summary>Trace statistics service.</summary>
public interface ITraceStatService
{
    /// <summary>Adds trace data that needs to be included in the statistics.</summary>
    /// <param name="traces">Trace data records to be aggregated.</param>
    void Add(IList<TraceData> traces);

    /// <summary>Requests statistics for a specific application and time.</summary>
    /// <param name="appId">Identifier of the application.</param>
    /// <param name="time">Point in time whose statistics should be computed.</param>
    void Add(Int32 appId, DateTime time);
}

/// <summary>追踪统计服务</summary>
public class TraceStatService : ITraceStatService
{
    /// <summary>Flow (incremental) computation period in seconds. Defaults to 5.</summary>
    public Int32 FlowPeriod { get; set; } = 5;

    /// <summary>Batch computation period in seconds. Defaults to 30.</summary>
    public Int32 BatchPeriod { get; set; } = 30;

    // Timers driving the flow and batch passes; both are created lazily by Init.
    private TimerX _timerFlow;
    private TimerX _timerBatch;
    // Pending "{appId}_{time}" keys waiting for the day-level and hour-level batch pass.
    private readonly ConcurrentBag<String> _bagDay = new();
    private readonly ConcurrentBag<String> _bagHour = new();
    // Pending minute slots waiting for the batch pass, grouped under an "{appId}_{yyyyMMddHH}" key.
    private readonly ConcurrentDictionary<String, ConcurrentBag<DateTime>> _bagMinute = new();
    // Trace records waiting for the incremental (flow) pass.
    private readonly ConcurrentQueue<TraceData> _queue = new();

    /* Deferred queue technique */
    // NOTE(review): the unit of Period is defined by the base queue type, which is not visible here — confirm.
    private readonly DayQueue _dayQueue = new() { Period = 60 };
    private readonly HourQueue _hourQueue = new() { Period = 60 };
    private readonly MinuteQueue _minuteQueue = new() { Period = 60 };
    private readonly AppMinuteQueue _appMinuteQueue = new() { Period = 60 };

    // Number of records currently held in _queue; updated with Interlocked alongside enqueue/dequeue.
    private Int32 _count;
    // Tracer used to open spans around the statistics passes; every use is null-conditional.
    private readonly ITracer _tracer;

    /// <summary>Creates the service.</summary>
    /// <param name="tracer">Tracer used to record spans for the statistics passes.</param>
    public TraceStatService(ITracer tracer) => _tracer = tracer;

    /// <summary>Adds trace data that needs to be included in the statistics.</summary>
    /// <remarks>
    /// Every record marks its day, hour and minute slot as pending for the batch pass.
    /// The records are additionally queued for the incremental flow pass, unless that
    /// queue already holds more than 100,000 entries.
    /// </remarks>
    /// <param name="traces">Trace records to register; a null or empty list is ignored.</param>
    public void Add(IList<TraceData> traces)
    {
        if (traces == null || traces.Count == 0) return;

        // Collapse the batch into distinct keys first. Contains on a ConcurrentBag walks a
        // snapshot of the whole bag, so probing once per distinct key instead of three times
        // per record keeps large batches cheap.
        var dayKeys = new HashSet<String>();
        var hourKeys = new HashSet<String>();
        var minuteSlots = new Dictionary<String, HashSet<DateTime>>();
        foreach (var item in traces)
        {
            dayKeys.Add($"{item.AppId}_{item.StatDate.ToFullString()}");
            hourKeys.Add($"{item.AppId}_{item.StatHour.ToFullString()}");

            var minuteKey = $"{item.AppId}_{item.StatMinute:yyyyMMddHH}";
            if (!minuteSlots.TryGetValue(minuteKey, out var slots))
            {
                slots = new HashSet<DateTime>();
                minuteSlots.Add(minuteKey, slots);
            }
            slots.Add(item.StatMinute);
        }

        // Register the pending keys for the batch pass
        foreach (var key in dayKeys)
        {
            if (!_bagDay.Contains(key)) _bagDay.Add(key);
        }
        foreach (var key in hourKeys)
        {
            if (!_bagHour.Contains(key)) _bagHour.Add(key);
        }
        foreach (var pair in minuteSlots)
        {
            // Factory overload: a new bag is only allocated when the key is actually missing
            var bag = _bagMinute.GetOrAdd(pair.Key, _ => new ConcurrentBag<DateTime>());
            foreach (var minute in pair.Value)
            {
                if (!bag.Contains(minute)) bag.Add(minute);
            }
        }

        // Cap the incremental queue so memory cannot balloon; the overflow is left to the timed batch pass
        if (_count > 100_000) return;

        // Queue the records for incremental computation
        foreach (var item in traces)
        {
            _queue.Enqueue(item);
            Interlocked.Increment(ref _count);
        }

        // Make sure the timers for the flow and batch passes exist
        Init();
    }

    /// <summary>Marks one application and time as pending for the batch statistics pass.</summary>
    /// <param name="appId">Application identifier used as the key prefix.</param>
    /// <param name="time">Time whose day, hour and five-minute slot should be recomputed.</param>
    public void Add(Int32 appId, DateTime time)
    {
        Init();

        // Truncate the timestamp to the three granularities that are tracked
        var day = time.Date;
        var hourSlot = day.AddHours(time.Hour);
        var minuteSlot = hourSlot.AddMinutes(time.Minute / 5 * 5);

        var dayKey = $"{appId}_{day.ToFullString()}";
        if (!_bagDay.Contains(dayKey)) _bagDay.Add(dayKey);

        var hourKey = $"{appId}_{hourSlot.ToFullString()}";
        if (!_bagHour.Contains(hourKey)) _bagHour.Add(hourKey);

        // Minute slots are grouped per application and hour
        var minuteKey = $"{appId}_{minuteSlot:yyyyMMddHH}";
        var slots = _bagMinute.GetOrAdd(minuteKey, new ConcurrentBag<DateTime>());
        if (!slots.Contains(minuteSlot)) slots.Add(minuteSlot);

        // Pull the next batch run forward
        _timerBatch?.SetNext(3_000);
    }

    // Dedicated gate for timer creation. Locking on a private object instead of `this` means
    // outside code that locks on the service instance can never contend with Init.
    private readonly Object _initLock = new();

    /// <summary>Initializes the timers.</summary>
    /// <remarks>
    /// Creates the flow timer and the batch timer on first use. A timer is only created while
    /// its period property is greater than zero; once created it is kept for later calls.
    /// </remarks>
    public void Init()
    {
        if (_timerFlow == null && FlowPeriod > 0)
        {
            lock (_initLock)
            {
                // Re-checked under the lock via ??= so concurrent callers create a single timer
                _timerFlow ??= new TimerX(DoFlowStat, null, 5_000, FlowPeriod * 1000) { Async = true };
            }
        }
        if (_timerBatch == null && BatchPeriod > 0)
        {
            lock (_initLock)
            {
                _timerBatch ??= new TimerX(DoBatchStat, null, 5_000, BatchPeriod * 1000) { Async = true };
            }
        }
    }

    /// <summary>Flow computation: drains queued trace records and accumulates them onto the deferred statistics rows.</summary>
    /// <param name="state">Timer state; not used.</param>
    private void DoFlowStat(Object state)
    {
        if (_queue.IsEmpty) return;

        using var span = _tracer?.NewSpan("TraceFlowStat", new { queue = _count });

        // Handle at most this many records per tick
        for (var budget = 100_000; budget > 0; budget--)
        {
            if (!_queue.TryDequeue(out var td)) break;
            Interlocked.Decrement(ref _count);

            // Skip malformed records
            var usable = td.AppId > 0 && !td.Name.IsNullOrEmpty();
            if (!usable) continue;

            // Per day
            var day = _dayQueue.GetOrAdd(td.StatDate, td.AppId, td.ItemId, out var dayKey);
            day.Total += td.Total;
            day.Errors += td.Errors;
            day.TotalCost += td.TotalCost;
            if (td.MaxCost > day.MaxCost) day.MaxCost = td.MaxCost;
            if (day.MinCost <= 0 || (td.MinCost > 0 && day.MinCost > td.MinCost)) day.MinCost = td.MinCost;
            day.TotalValue += td.TotalValue;
            _dayQueue.Commit(dayKey);

            // Per hour
            var hour = _hourQueue.GetOrAdd(td.StatHour, td.AppId, td.ItemId, out var hourKey);
            hour.Total += td.Total;
            hour.Errors += td.Errors;
            hour.TotalCost += td.TotalCost;
            if (td.MaxCost > hour.MaxCost) hour.MaxCost = td.MaxCost;
            if (hour.MinCost <= 0 || (td.MinCost > 0 && hour.MinCost > td.MinCost)) hour.MinCost = td.MinCost;
            hour.TotalValue += td.TotalValue;
            _hourQueue.Commit(hourKey);

            // Per minute slot
            var minute = _minuteQueue.GetOrAdd(td.StatMinute, td.AppId, td.ItemId, out var minuteKey);
            minute.Total += td.Total;
            minute.Errors += td.Errors;
            minute.TotalCost += td.TotalCost;
            if (td.MaxCost > minute.MaxCost) minute.MaxCost = td.MaxCost;
            if (minute.MinCost <= 0 || (td.MinCost > 0 && minute.MinCost > td.MinCost)) minute.MinCost = td.MinCost;
            minute.TotalValue += td.TotalValue;
            _minuteQueue.Commit(minuteKey);

            // Per application and minute slot (no TotalValue is accumulated at this level)
            var appMinute = _appMinuteQueue.GetOrAdd(td.StatMinute, td.AppId, out var appKey);
            appMinute.Total += td.Total;
            appMinute.Errors += td.Errors;
            appMinute.TotalCost += td.TotalCost;
            if (td.MaxCost > appMinute.MaxCost) appMinute.MaxCost = td.MaxCost;
            if (appMinute.MinCost <= 0 || (td.MinCost > 0 && appMinute.MinCost > td.MinCost)) appMinute.MinCost = td.MinCost;
            _appMinuteQueue.Commit(appKey);
        }
    }

    /// <summary>Batch computation: recomputes every pending minute, hour and day slot to fill in anything the flow pass missed.</summary>
    /// <param name="state">Timer state; not used.</param>
    private void DoBatchStat(Object state)
    {
        foreach (var minuteKey in _bagMinute.Keys)
        {
            // Detach the bag so that new slots for this key land in a fresh one
            if (!_bagMinute.TryRemove(minuteKey, out var slots)) continue;

            var appId = minuteKey.Split("_")[0].ToInt();

            // Only the earliest and latest slot matter, so track them while draining
            var found = false;
            var earliest = DateTime.MaxValue;
            var latest = DateTime.MinValue;
            while (slots.TryTake(out var slot))
            {
                found = true;
                if (slot < earliest) earliest = slot;
                if (slot > latest) latest = slot;
            }

            // Process the application in one go, from the smallest to the largest time
            if (found) ProcessMinute(appId, earliest, latest);
        }

        // Pause for 5000 ms so the minute statistics can be persisted
        Thread.Sleep(5000);

        while (_bagHour.TryTake(out var hourKey))
        {
            var parts = hourKey.Split("_");
            ProcessHour(parts[0].ToInt(), parts[1].ToDateTime());
        }

        while (_bagDay.TryTake(out var dayKey))
        {
            var parts = dayKey.Split("_");
            ProcessDay(parts[0].ToInt(), parts[1].ToDateTime());
        }
    }

    /// <summary>Recomputes the day-level statistics of one application from its hour-level rows.</summary>
    /// <param name="appId">Application identifier; values of zero or below are ignored.</param>
    /// <param name="time">Any time within the target day; years before 2000 are ignored.</param>
    private void ProcessDay(Int32 appId, DateTime time)
    {
        if (appId <= 0 || time.Year < 2000) return;

        var date = time.Date;
        using var span = _tracer?.NewSpan("TraceBatchStat-Day", time);

        // Source data: the hour-level rows of that day
        var list = TraceHourStat.FindAllByAppIdWithCache(appId, date, date.AddDays(1));
        if (list.Count == 0) return;

        span?.AppendTag($"TraceHourStat.Count={list.Count}");

        // Yesterday's rows, used for the ring ratio below
        var sts2 = TraceDayStat.FindAllByAppIdWithCache(appId, date.AddDays(-1));
        span?.AppendTag($"Yesterday={sts2.Count}");

        // Aggregate per trace item: each group holds all hour rows of one item within the day
        foreach (var item in list.GroupBy(e => e.ItemId))
        {
            if (item.Key == 0) continue;

            var st = _dayQueue.GetOrAdd(date, appId, item.Key, out var key);

            var vs = item.ToList();
            st.Total = vs.Sum(e => e.Total);
            st.Errors = vs.Sum(e => e.Errors);
            st.TotalCost = vs.Sum(e => e.TotalCost);
            st.MaxCost = vs.Max(e => e.MaxCost);
            var vs2 = vs.Where(e => e.MinCost > 0).ToList();
            if (vs2.Count > 0) st.MinCost = vs2.Min(e => e.MinCost);
            // Overwrite the value total as well, so the batch pass also repairs it when the
            // flow pass skipped records (same treatment as the other totals)
            st.TotalValue = vs.Sum(e => e.TotalValue);

            // TP99 approximation
            if (st.Total >= 50)
            {
                var totalCost = st.TotalCost;
                var ms = vs.Select(e => e.MaxCost).OrderByDescending(e => e).ToList();
                var n = (Int32)Math.Round(st.Total * 0.01);
                var i = 0;
                // Subtract the largest per-row maxima, at most 1% of the call count and
                // never more than the number of rows available
                for (i = 0; i < n && i < ms.Count; i++)
                {
                    totalCost -= ms[i];
                }

                // Recompute the average cost without the trimmed top values
                if (totalCost > 0)
                {
                    st.Cost = (Int32)Math.Round((Double)totalCost / (st.Total - i));
                }
            }

            // Copy kind and display name from the trace item definition
            var ti = TraceItem.FindById(item.Key);
            if (ti != null)
            {
                st.Type = ti.Kind;
                st.Name = ti + "";
            }

            // Ring ratio against yesterday's row of the same item; 1 when yesterday had no calls
            var st2 = sts2.FirstOrDefault(e => e.ItemId == item.Key);
            if (st2 != null) st.RingRate = st2.Total <= 0 ? 1 : (Double)st.Total / st2.Total;

            //// Force the kind calculation
            //st.Valid(false);

            _dayQueue.Commit(key);
        }
    }

    /// <summary>Recomputes the hour-level statistics of one application from its minute-level rows.</summary>
    /// <param name="appId">Application identifier; values of zero or below are ignored.</param>
    /// <param name="time">Any time within the target hour; years before 2000 are ignored.</param>
    private void ProcessHour(Int32 appId, DateTime time)
    {
        if (appId <= 0 || time.Year < 2000) return;

        using var span = _tracer?.NewSpan("TraceBatchStat-Hour", new { appId, time });
        time = time.Date.AddHours(time.Hour);

        // Source data. Minute-level rows can balloon when span names are polluted.
        // NOTE(review): the last argument looks like a row cap for the query — confirm against FindAllByAppIdWithCache.
        var list = TraceMinuteStat.FindAllByAppIdWithCache(appId, time, time.AddHours(1), 24 * 60 / 5 * 1000);
        // Keep only rows that really fall into the target hour
        list = list.Where(e => e.StatTime >= time && e.StatTime < time.AddHours(1)).ToList();
        if (list.Count == 0) return;

        span?.AppendTag($"TraceMinuteStat.Count={list.Count}");

        // Rows of the same hour on the previous day, used for the ring ratio below
        var sts2 = TraceHourStat.FindAllByAppIdWithCache(appId, time.AddDays(-1), time.AddDays(-1).AddHours(1));
        span?.AppendTag($"Yesterday={sts2.Count}");

        // Aggregate per trace item: each group holds all minute rows of one item within the hour
        foreach (var item in list.GroupBy(e => e.ItemId))
        {
            if (item.Key == 0) continue;

            var st = _hourQueue.GetOrAdd(time, appId, item.Key, out var key);

            var vs = item.ToList();
            st.Total = vs.Sum(e => e.Total);
            st.Errors = vs.Sum(e => e.Errors);
            st.TotalCost = vs.Sum(e => e.TotalCost);
            st.MaxCost = vs.Max(e => e.MaxCost);
            var vs2 = vs.Where(e => e.MinCost > 0).ToList();
            if (vs2.Count > 0) st.MinCost = vs2.Min(e => e.MinCost);
            // Overwrite the value total as well, so the batch pass also repairs it when the
            // flow pass skipped records (same treatment as the other totals)
            st.TotalValue = vs.Sum(e => e.TotalValue);

            // TP99 approximation
            if (st.Total >= 50)
            {
                var totalCost = st.TotalCost;
                var ms = vs.Select(e => e.MaxCost).OrderByDescending(e => e).ToList();
                var n = (Int32)Math.Round(st.Total * 0.01);
                var i = 0;
                // Subtract the largest per-row maxima, at most 1% of the call count and
                // never more than the number of rows available
                for (i = 0; i < n && i < ms.Count; i++)
                {
                    totalCost -= ms[i];
                }

                // Recompute the average cost without the trimmed top values
                if (totalCost > 0)
                {
                    st.Cost = (Int32)Math.Round((Double)totalCost / (st.Total - i));
                }
            }

            // Ring ratio against the previous day's row of the same item; 1 when it had no calls
            var st2 = sts2.FirstOrDefault(e => e.ItemId == item.Key);
            if (st2 != null) st.RingRate = st2.Total <= 0 ? 1 : (Double)st.Total / st2.Total;

            _hourQueue.Commit(key);
        }
    }

    /// <summary>Recomputes the minute-level statistics of one application for a time range, directly from the trace data.</summary>
    /// <param name="appId">Application identifier; values of zero or below are ignored.</param>
    /// <param name="start">Start of the range; truncated to its five-minute slot.</param>
    /// <param name="end">End of the range; truncated to its five-minute slot.</param>
    private void ProcessMinute(Int32 appId, DateTime start, DateTime end)
    {
        if (appId <= 0 || start.Year < 2000 || end.Year < 2000) return;

        using var span = _tracer?.NewSpan("TraceBatchStat-Minute", $"{start.ToFullString()}-{end.ToFullString()}");

        // Exclusion patterns. The application record may no longer exist; in that case nothing
        // is excluded, rather than throwing and aborting the batch pass for every other application.
        var app = AppTracer.FindByID(appId);
        var excludes = app?.Excludes.Split(",", ";") ?? new String[0];

        start = start.Date.AddHours(start.Hour).AddMinutes(start.Minute / 5 * 5);
        end = end.Date.AddHours(end.Hour).AddMinutes(end.Minute / 5 * 5);

        // Source data, grouped by the query; rows without a name are dropped
        var list = TraceData.SearchGroupAppAndName(appId, start, end);
        list = list.Where(e => !e.Name.IsNullOrEmpty()).ToList();
        if (list.Count == 0) return;

        // Drop the rows whose name matches an exclusion pattern (empty names are already gone)
        list = list.Where(e => !excludes.Any(y => y.IsMatch(e.Name, StringComparison.OrdinalIgnoreCase))).ToList();

        // Per trace item and minute slot: overwrite with the recomputed values
        foreach (var item in list)
        {
            var st = _minuteQueue.GetOrAdd(item.StatMinute, appId, item.ItemId, out var key);

            st.Total = item.Total;
            st.Errors = item.Errors;
            st.TotalCost = item.TotalCost;
            st.MaxCost = item.MaxCost;
            st.MinCost = item.MinCost;
            st.TotalValue = item.TotalValue;

            _minuteQueue.Commit(key);
        }

        // Per application and minute slot
        foreach (var item in list.GroupBy(e => e.AppId + "#" + e.StatMinute))
        {
            var traces = item.ToList();
            var st = _appMinuteQueue.GetOrAdd(traces[0].StatMinute, traces[0].AppId, out var key);

            st.Total = traces.Sum(e => e.Total);
            st.Errors = traces.Sum(e => e.Errors);
            st.TotalCost = traces.Sum(e => e.TotalCost);
            st.MaxCost = traces.Max(e => e.MaxCost);
            // The minimum only considers rows that actually recorded a positive minimum
            var vs2 = traces.Where(e => e.MinCost > 0).ToList();
            if (vs2.Count > 0) st.MinCost = vs2.Min(e => e.MinCost);

            // TP99 approximation
            if (st.Total >= 50)
            {
                //// Total cost; if a trace item has many samples its average was already trimmed
                //var totalCost = traces.Sum(e => e.Total >= 50 ? (e.Total - 1) * e.Cost : e.TotalCost);
                var totalCost = st.TotalCost;

                var ms = traces.Select(e => e.MaxCost).OrderByDescending(e => e).ToList();
                var n = (Int32)Math.Round(st.Total * 0.01);
                var i = 0;
                // Subtract the largest per-row maxima, at most 1% of the call count and
                // never more than the number of rows available
                for (i = 0; i < n && i < ms.Count; i++)
                {
                    totalCost -= ms[i];
                }

                // Recompute the average cost without the trimmed top values
                if (totalCost > 0)
                {
                    st.Cost = (Int32)Math.Round((Double)totalCost / (st.Total - i));
                }
            }

            _appMinuteQueue.Commit(key);
        }
    }
}

/// <summary>Deferred queue for day-level trace statistics rows.</summary>
internal class DayQueue : MyQueue
{
    /// <summary>Gets the queued day row for the given slot, loading or creating it on first access.</summary>
    /// <param name="date">Statistics time of the row.</param>
    /// <param name="appId">Application identifier.</param>
    /// <param name="itemId">Trace item identifier.</param>
    /// <param name="key">Receives the queue key, to be handed to Commit afterwards.</param>
    /// <returns>The day statistics row tracked by the queue.</returns>
    public TraceDayStat GetOrAdd(DateTime date, Int32 appId, Int32 itemId, out String key)
    {
        var query = new TraceStatModel
        {
            Time = date,
            AppId = appId,
            ItemId = itemId,
        };

        key = query.Key;

        return GetOrAdd(key, _ => TraceDayStat.FindOrAdd(query));
    }
}

/// <summary>Deferred queue for hour-level trace statistics rows.</summary>
internal class HourQueue : MyQueue
{
    /// <summary>Gets the queued hour row for the given slot, loading or creating it on first access.</summary>
    /// <param name="date">Statistics time of the row.</param>
    /// <param name="appId">Application identifier.</param>
    /// <param name="itemId">Trace item identifier.</param>
    /// <param name="key">Receives the queue key, to be handed to Commit afterwards.</param>
    /// <returns>The hour statistics row tracked by the queue.</returns>
    public TraceHourStat GetOrAdd(DateTime date, Int32 appId, Int32 itemId, out String key)
    {
        var query = new TraceStatModel
        {
            Time = date,
            AppId = appId,
            ItemId = itemId,
        };

        key = query.Key;

        return GetOrAdd(key, _ => TraceHourStat.FindOrAdd(query));
    }
}

/// <summary>Deferred queue for minute-level trace statistics rows.</summary>
internal class MinuteQueue : MyQueue
{
    /// <summary>Gets the queued minute row for the given slot, loading or creating it on first access.</summary>
    /// <param name="date">Statistics time of the row.</param>
    /// <param name="appId">Application identifier.</param>
    /// <param name="itemId">Trace item identifier.</param>
    /// <param name="key">Receives the queue key, to be handed to Commit afterwards.</param>
    /// <returns>The minute statistics row tracked by the queue.</returns>
    public TraceMinuteStat GetOrAdd(DateTime date, Int32 appId, Int32 itemId, out String key)
    {
        var query = new TraceStatModel
        {
            Time = date,
            AppId = appId,
            ItemId = itemId,
        };

        key = query.Key;

        return GetOrAdd(key, _ => TraceMinuteStat.FindOrAdd(query));
    }
}

/// <summary>Deferred queue for application-level minute statistics rows.</summary>
internal class AppMinuteQueue : MyQueue
{
    /// <summary>Gets the queued application minute row for the given slot, loading or creating it on first access.</summary>
    /// <param name="date">Statistics time of the row.</param>
    /// <param name="appId">Application identifier.</param>
    /// <param name="key">Receives the queue key, to be handed to Commit afterwards.</param>
    /// <returns>The application minute statistics row tracked by the queue.</returns>
    public AppMinuteStat GetOrAdd(DateTime date, Int32 appId, out String key)
    {
        var query = new TraceStatModel
        {
            Time = date,
            AppId = appId,
        };

        key = query.Key;

        return GetOrAdd(key, _ => AppMinuteStat.FindOrAdd(query));
    }
}

/// <summary>Deferred entity queue that persists each batch through an update of the queued entities.</summary>
internal class MyQueue : EntityDeferredQueue
{
    #region Methods
    /// <summary>Processes one batch.</summary>
    /// <param name="list">Queued objects; each one is cast to <see cref="IEntity"/>.</param>
    /// <returns>Zero for an empty batch, otherwise the result of the update call.</returns>
    public override Int32 Process(IList<Object> list)
    {
        // Nothing queued means there is nothing to update
        return list.Count == 0
            ? 0
            : list.Cast<IEntity>().Update();
    }
    #endregion
}