v10.10.2024.0701 使用IJsonHost改进Json序列化
大石头 编写于 2024-07-01 08:36:34 大石头 提交于 2024-07-01 08:48:33
X
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using NewLife;
using NewLife.Log;

namespace System.Threading.Tasks;

/// <summary>Extension helpers for <see cref="Task"/>: completion checks, exception/time logging, Task wrappers over Begin/End style stream and socket APIs, and conversion of a plain task into a typed task.</summary>
public static class TaskHelper
{
    #region Completion state
    /// <summary>Checks whether a task finished successfully.</summary>
    /// <param name="task">Task to inspect; may be null.</param>
    /// <returns>True only when <paramref name="task"/> is not null and its status is <see cref="TaskStatus.RanToCompletion"/>.</returns>
    public static Boolean IsOK(this Task task) => task != null && task.Status == TaskStatus.RanToCompletion;
    #endregion

    #region Exception logging / elapsed time
    /// <summary>Logs the first inner exception of a faulted task.</summary>
    /// <param name="task">Task to observe; null is returned unchanged.</param>
    /// <param name="log">Target log; falls back to <c>XTrace.Log</c> when null.</param>
    /// <returns>
    /// Null when <paramref name="task"/> is null; the original task when logging is disabled;
    /// otherwise a continuation that completes after the logging step. A fault in the original
    /// task is logged by that continuation and not rethrown.
    /// </returns>
    public static Task LogException(this Task task, ILog log = null)
    {
        if (task == null) return null;

        if (log == null) log = XTrace.Log;
        if (log == Logger.Null || !log.Enable) return task;

        // NotOnCanceled rather than OnlyOnFaulted: with OnlyOnFaulted the returned continuation
        // is canceled whenever the antecedent succeeds, so awaiting it would throw on success.
        return task.ContinueWith(t =>
        {
            var error = t.Exception?.InnerException;
            if (t.IsFaulted && error != null) log.Error(null, error);
        }, TaskContinuationOptions.NotOnCanceled);
    }

    /// <summary>Logs the first inner exception of a faulted task and hands back the same task.</summary>
    /// <typeparam name="TResult">Result type of the task.</typeparam>
    /// <param name="task">Task to observe; null is returned unchanged.</param>
    /// <param name="log">Target log; falls back to <c>XTrace.Log</c> when null.</param>
    /// <returns>The original <paramref name="task"/> (or null), so its result and fault state stay visible to the caller.</returns>
    public static Task<TResult> LogException<TResult>(this Task<TResult> task, ILog log = null)
    {
        if (task == null) return null;

        if (log == null) log = XTrace.Log;
        if (log == Logger.Null || !log.Enable) return task;

        // The continuation is a side effect only; the caller keeps the antecedent.
        task.ContinueWith(t =>
        {
            var error = t.Exception?.InnerException;
            if (t.IsFaulted && error != null) log.Error(null, error);
        }, TaskContinuationOptions.OnlyOnFaulted);

        return task;
    }

    /// <summary>Passes the first inner exception of a faulted task to a callback.</summary>
    /// <param name="task">Task to observe; null is returned unchanged.</param>
    /// <param name="errCallback">Invoked with the inner exception when the task faults; when null the task is returned as is.</param>
    /// <returns>
    /// Null when <paramref name="task"/> is null; the original task when <paramref name="errCallback"/> is null;
    /// otherwise a continuation that completes after the callback step. A fault in the original
    /// task is handed to the callback and not rethrown.
    /// </returns>
    public static Task LogException(this Task task, Action<Exception> errCallback)
    {
        if (task == null) return null;

        if (errCallback == null) return task;

        // NotOnCanceled rather than OnlyOnFaulted so a successful antecedent does not
        // turn into a canceled continuation for whoever awaits the returned task.
        return task.ContinueWith(t =>
        {
            var error = t.Exception?.InnerException;
            if (t.IsFaulted && error != null) errCallback(error);
        }, TaskContinuationOptions.NotOnCanceled);
    }

    /// <summary>Measures how long a task takes and writes the elapsed milliseconds to the log when it succeeds.</summary>
    /// <param name="task">Task to time; null is returned unchanged.</param>
    /// <param name="name">Label written in front of the elapsed time.</param>
    /// <param name="log">Target log; falls back to <c>XTrace.Log</c> when null.</param>
    /// <returns>The original <paramref name="task"/> (or null). Timing starts when this method is called, not when the task was created.</returns>
    public static Task LogTime(this Task task, String name, ILog log = null)
    {
        if (task == null) return null;

        if (log == null) log = XTrace.Log;
        if (log == Logger.Null || !log.Enable) return task;

        var sw = Stopwatch.StartNew();

        // Logging is attached as a side effect and the antecedent is returned, so a fault or
        // cancellation of the original task reaches the caller unchanged.
        task.ContinueWith(t =>
        {
            sw.Stop();
            log.Info("{0} 耗时 {1}ms", name, sw.ElapsedMilliseconds);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return task;
    }

    /// <summary>Measures how long a task takes and writes the elapsed milliseconds to the log when it succeeds.</summary>
    /// <typeparam name="TResult">Result type of the task.</typeparam>
    /// <param name="task">Task to time; null is returned unchanged.</param>
    /// <param name="name">Label written in front of the elapsed time.</param>
    /// <param name="log">Target log; falls back to <c>XTrace.Log</c> when null.</param>
    /// <returns>The original <paramref name="task"/> (or null). Timing starts when this method is called, not when the task was created.</returns>
    public static Task<TResult> LogTime<TResult>(this Task<TResult> task, String name, ILog log = null)
    {
        if (task == null) return null;

        if (log == null) log = XTrace.Log;
        if (log == Logger.Null || !log.Enable) return task;

        var sw = Stopwatch.StartNew();

        // Same pattern as LogException<TResult>: observe on the side, return the antecedent,
        // so the result, fault and cancellation state are all preserved for the caller.
        task.ContinueWith(t =>
        {
            sw.Stop();
            log.Info("{0} 耗时 {1}ms", name, sw.ElapsedMilliseconds);
        }, TaskContinuationOptions.OnlyOnRanToCompletion);

        return task;
    }
    #endregion

    #region Stream async
    /// <summary>Reads from a stream asynchronously by wrapping BeginRead/EndRead.</summary>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="buffer">Destination buffer.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>A task whose result is the value returned by EndRead.</returns>
    public static Task<Int32> ReadAsync(this Stream stream, Byte[] buffer, Int32 offset, Int32 count)
    {
        return Task<Int32>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, offset, count, null);
    }

    /// <summary>Reads from a stream asynchronously into the whole of a buffer.</summary>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="buffer">Destination buffer; the read may fill up to its full length.</param>
    /// <returns>A task whose result is the value returned by EndRead.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    public static Task<Int32> ReadAsync(this Stream stream, Byte[] buffer)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));

        return Task<Int32>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, 0, buffer.Length, null);
    }

    /// <summary>Reads up to <paramref name="length"/> bytes from a stream with a single read call.</summary>
    /// <param name="stream">Stream to read from.</param>
    /// <param name="length">Maximum number of bytes to read; must be positive.</param>
    /// <returns>A task whose result holds exactly the bytes that the single read returned, which may be fewer than <paramref name="length"/>.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is zero or negative.</exception>
    public static Task<Byte[]> ReadAsync(this Stream stream, Int32 length)
    {
        if (length <= 0) throw new ArgumentOutOfRangeException(nameof(length));

        var buffer = new Byte[length];
        var task = Task.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, 0, length, null);
        return task.ContinueWith(t =>
        {
            var len = t.Result;
            if (len == length) return buffer;

            // Short read: copy explicitly so the result is exactly the first len bytes.
            var data = new Byte[len];
            Array.Copy(buffer, data, len);
            return data;
        });
    }

    /// <summary>Writes to a stream asynchronously by wrapping BeginWrite/EndWrite.</summary>
    /// <param name="stream">Stream to write to.</param>
    /// <param name="buffer">Source buffer.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
    /// <param name="count">Number of bytes to write.</param>
    /// <returns>A task that completes when EndWrite returns.</returns>
    public static Task WriteAsync(this Stream stream, Byte[] buffer, Int32 offset, Int32 count)
    {
        return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, offset, count, null);
    }

    /// <summary>Writes a whole buffer to a stream asynchronously.</summary>
    /// <param name="stream">Stream to write to.</param>
    /// <param name="buffer">Source buffer; all of it is written.</param>
    /// <returns>A task that completes when EndWrite returns.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    public static Task WriteAsync(this Stream stream, Byte[] buffer)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));

        return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, 0, buffer.Length, null);
    }
    #endregion

    #region Socket async
    /// <summary>Connects a TCP client asynchronously by wrapping BeginConnect/EndConnect.</summary>
    /// <param name="client">Client to connect.</param>
    /// <param name="host">Remote host name.</param>
    /// <param name="port">Remote port.</param>
    /// <returns>A task that completes when EndConnect returns.</returns>
    public static Task ConnectAsync(this TcpClient client, String host, Int32 port)
    {
        return Task.Factory.FromAsync(client.BeginConnect, client.EndConnect, host, port, null);
    }

    /// <summary>Sends a datagram asynchronously by wrapping BeginSend/EndSend.</summary>
    /// <param name="client">UDP client used to send.</param>
    /// <param name="datagram">Data to send.</param>
    /// <param name="bytes">Number of bytes of <paramref name="datagram"/> to send.</param>
    /// <returns>A task that completes when EndSend returns.</returns>
    public static Task SendAsync(this UdpClient client, Byte[] datagram, Int32 bytes)
    {
        return Task.Factory.FromAsync(client.BeginSend, client.EndSend, datagram, bytes, null);
    }

    /// <summary>Sends a whole datagram asynchronously.</summary>
    /// <param name="client">UDP client used to send.</param>
    /// <param name="datagram">Data to send; all of it is sent.</param>
    /// <returns>A task that completes when EndSend returns.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="datagram"/> is null.</exception>
    public static Task SendAsync(this UdpClient client, Byte[] datagram)
    {
        if (datagram == null) throw new ArgumentNullException(nameof(datagram));

        return Task.Factory.FromAsync(client.BeginSend, client.EndSend, datagram, datagram.Length, null);
    }

    /// <summary>Sends a datagram to a specific endpoint asynchronously.</summary>
    /// <param name="client">UDP client used to send.</param>
    /// <param name="datagram">Data to send.</param>
    /// <param name="bytes">Number of bytes of <paramref name="datagram"/> to send.</param>
    /// <param name="endPoint">Destination endpoint.</param>
    /// <returns>A task that completes when EndSend returns.</returns>
    public static Task SendAsync(this UdpClient client, Byte[] datagram, Int32 bytes, IPEndPoint endPoint)
    {
        return Task.Factory.FromAsync(client.BeginSend, client.EndSend, datagram, bytes, endPoint, null);
    }

    /// <summary>Receives a datagram asynchronously by wrapping BeginReceive/EndReceive.</summary>
    /// <param name="client">UDP client used to receive.</param>
    /// <returns>A task whose result is the received bytes. The sender endpoint reported by EndReceive is discarded.</returns>
    public static Task<Byte[]> ReceiveAsync(this UdpClient client)
    {
        return Task<Byte[]>.Factory.FromAsync(client.BeginReceive, ar =>
        {
            // EndReceive requires a ref endpoint; this helper does not expose it.
            IPEndPoint remote = null;
            return client.EndReceive(ar, ref remote);
        }, null);
    }
    #endregion

    #region Task conversion
    /// <summary>Converts a plain task into a typed task that yields a fixed result once the source succeeds.</summary>
    /// <typeparam name="TResult">Result type of the returned task.</typeparam>
    /// <param name="task">Source task; null is returned unchanged.</param>
    /// <param name="cancellationToken">
    /// Consulted only when <paramref name="task"/> has already completed without faulting: a requested
    /// cancellation then yields a canceled task. It is not observed while the source is still running.
    /// </param>
    /// <param name="result">Value produced when the source runs to completion.</param>
    /// <returns>A task that mirrors the fault or cancellation of the source, or completes with <paramref name="result"/>.</returns>
    public static Task<TResult> ToTask<TResult>(this Task task, CancellationToken cancellationToken = default, TResult result = default)
    {
        if (task == null) return null;

        // Fast path: the source is already finished, so no continuation is needed.
        if (task.IsCompleted)
        {
            if (task.IsFaulted) return FromErrors<TResult>(task.Exception.InnerExceptions);

            if (task.IsCanceled || cancellationToken.IsCancellationRequested) return Canceled<TResult>();

            if (task.Status == TaskStatus.RanToCompletion) return FromResult<TResult>(result);
        }

        return ToTaskContinuation(task, result);
    }

    /// <summary>Creates a faulted task carrying the given exceptions.</summary>
    private static Task<TResult> FromErrors<TResult>(IEnumerable<Exception> exceptions)
    {
        var source = new TaskCompletionSource<TResult>();
        source.SetException(exceptions);
        return source.Task;
    }

    /// <summary>Returns the shared canceled task for <typeparamref name="TResult"/>.</summary>
    private static Task<TResult> Canceled<TResult>() => CancelCache<TResult>.Canceled;

    /// <summary>Holds one canceled task per result type so it is created only once.</summary>
    private static class CancelCache<TResult>
    {
        public static readonly Task<TResult> Canceled = GetCancelledTask();

        private static Task<TResult> GetCancelledTask()
        {
            var source = new TaskCompletionSource<TResult>();
            source.SetCanceled();
            return source.Task;
        }
    }

    /// <summary>Creates a task that is already completed with <paramref name="result"/>.</summary>
    private static Task<TResult> FromResult<TResult>(TResult result)
    {
        var source = new TaskCompletionSource<TResult>();
        source.SetResult(result);
        return source.Task;
    }

    /// <summary>Builds the typed task for a source that has not finished yet.</summary>
    private static Task<TResult> ToTaskContinuation<TResult>(Task task, TResult result)
    {
        var tcs = new TaskCompletionSource<TResult>();

        // innerTask is the same instance as task; it is handed in by ContinueWith once complete.
        task.ContinueWith(innerTask =>
        {
            if (innerTask.Status == TaskStatus.RanToCompletion)
                tcs.TrySetResult(result);
            else
                tcs.TrySetFromTask(innerTask);
        }, TaskContinuationOptions.ExecuteSynchronously);

        return tcs.Task;
    }

    /// <summary>Copies the final state of <paramref name="source"/> into the completion source.</summary>
    /// <returns>True when the state was transferred; false when the source is not in a final state or the completion source was already completed.</returns>
    private static Boolean TrySetFromTask<TResult>(this TaskCompletionSource<TResult> tcs, Task source)
    {
        if (source.Status == TaskStatus.Canceled) return tcs.TrySetCanceled();

        if (source.Status == TaskStatus.Faulted) return tcs.TrySetException(source.Exception.InnerExceptions);

        if (source.Status == TaskStatus.RanToCompletion)
        {
            // An untyped or differently typed source has no usable result, so default is used.
            return tcs.TrySetResult(source is Task<TResult> typed ? typed.Result : default);
        }

        return false;
    }
    #endregion
}