9.8.2018.0630
大石头 authored at 2018-06-30 11:15:32
8.96 KiB
X
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using NewLife.Log;

namespace System.Threading.Tasks
{
    /// <summary>任务助手</summary>
    public static class TaskHelper
    {
        #region 任务已完成
        /// <summary>是否正确完成</summary>
        /// <param name="task"></param>
        /// <returns></returns>
        public static Boolean IsOK(this Task task) => task != null && task.Status == TaskStatus.RanToCompletion;
        #endregion

        #region 异常日志/执行时间
        /// <summary>捕获异常并输出日志</summary>
        /// <param name="task"></param>
        /// <param name="log"></param>
        /// <returns></returns>
        public static Task LogException(this Task task, ILog log = null)
        {
            if (task == null) return null;

            if (log == null) log = XTrace.Log;
            if (log == Logger.Null || !log.Enable) return task;

            return task.ContinueWith(t =>
            {
                if (t.IsFaulted && t.Exception != null && t.Exception.InnerException != null) log.Error(null, t.Exception.InnerException);
            }, TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>捕获异常并输出日志</summary>
        /// <param name="task"></param>
        /// <param name="log"></param>
        /// <returns></returns>
        public static Task<TResult> LogException<TResult>(this Task<TResult> task, ILog log = null)
        {
            if (task == null) return null;

            if (log == null) log = XTrace.Log;
            if (log == Logger.Null || !log.Enable) return task;

            task.ContinueWith(t =>
            {
                if (t.IsFaulted && t.Exception != null && t.Exception.InnerException != null) log.Error(null, t.Exception.InnerException);
            }, TaskContinuationOptions.OnlyOnFaulted);

            return task;
        }

        /// <summary>捕获异常并输出日志</summary>
        /// <param name="task"></param>
        /// <param name="errCallback"></param>
        /// <returns></returns>
        public static Task LogException(this Task task, Action<Exception> errCallback)
        {
            if (task == null) return null;

            if (errCallback == null) return task;

            return task.ContinueWith(t =>
            {
                if (t.IsFaulted && t.Exception != null && t.Exception.InnerException != null) errCallback(t.Exception.InnerException);
            }, TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>统计时间并输出日志</summary>
        /// <param name="task"></param>
        /// <param name="name"></param>
        /// <param name="log"></param>
        /// <returns></returns>
        public static Task LogTime(this Task task, String name, ILog log = null)
        {
            if (task == null) return null;

            if (log == null) log = XTrace.Log;
            if (log == Logger.Null || !log.Enable) return task;

            var sw = Stopwatch.StartNew();

            return task.ContinueWith(t =>
            {
                sw.Stop();
                log.Info("{0} 耗时 {0}ms", name, sw.ElapsedMilliseconds);
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
        }

        /// <summary>统计时间并输出日志</summary>
        /// <param name="task"></param>
        /// <param name="name"></param>
        /// <param name="log"></param>
        /// <returns></returns>
        public static Task<TResult> LogTime<TResult>(this Task<TResult> task, String name, ILog log = null)
        {
            if (task == null) return null;

            if (log == null) log = XTrace.Log;
            if (log == Logger.Null || !log.Enable) return task;

            var sw = Stopwatch.StartNew();

            return task.ContinueWith(t =>
            {
                sw.Stop();
                log.Info("{0} 耗时 {1}ms", name, sw.ElapsedMilliseconds);
                return t.Result;
            }, TaskContinuationOptions.OnlyOnRanToCompletion);
        }
        #endregion

        #region 数据流异步
#if !__CORE__
        /// <summary>异步读取数据流</summary>
        /// <param name="stream"></param>
        /// <param name="buffer"></param>
        /// <param name="offset"></param>
        /// <param name="count"></param>
        /// <returns></returns>
        public static Task<Int32> ReadAsync(this Stream stream, Byte[] buffer, Int32 offset, Int32 count)
        {
            return Task<Int32>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, offset, count, null);
        }

        /// <summary>异步读取数据</summary>
        /// <param name="stream"></param>
        /// <param name="length"></param>
        /// <returns></returns>
        public static Task<Byte[]> ReadAsync(this Stream stream, Int32 length)
        {
            if (length <= 0) throw new ArgumentOutOfRangeException("length");

            var buffer = new Byte[length];
            var task = Task.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, 0, length, null);
            return task.ContinueWith(t =>
            {
                var len = t.Result;
                if (len == length) return buffer;

                return buffer.ReadBytes(len);
            });
        }

        /// <summary>异步写入数据流</summary>
        /// <param name="stream"></param>
        /// <param name="buffer"></param>
        /// <param name="offset"></param>
        /// <param name="count"></param>
        /// <returns></returns>
        public static Task WriteAsync(this Stream stream, Byte[] buffer, Int32 offset, Int32 count)
        {
            return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, offset, count, null);
        }
#endif
        #endregion

        #region 任务转换
        /// <summary>任务转换</summary>
        /// <typeparam name="TResult"></typeparam>
        /// <param name="task"></param>
        /// <param name="cancellationToken"></param>
        /// <param name="result"></param>
        /// <returns></returns>
        public static Task<TResult> ToTask<TResult>(this Task task, CancellationToken cancellationToken = default(CancellationToken), TResult result = default(TResult))
        {
            if (task == null) return null;

            if (task.IsCompleted)
            {
                if (task.IsFaulted) return FromErrors<TResult>(task.Exception.InnerExceptions);

                if (task.IsCanceled || cancellationToken.IsCancellationRequested) return Canceled<TResult>();

                if (task.Status == TaskStatus.RanToCompletion) return FromResult<TResult>(result);
            }
            return ToTaskContinuation(task, result);
        }

        private static Task<TResult> FromErrors<TResult>(IEnumerable<Exception> exceptions)
        {
            var tcs = new TaskCompletionSource<TResult>();
            tcs.SetException(exceptions);
            return tcs.Task;
        }

        private static Task<TResult> Canceled<TResult>()
        {
            return CancelCache<TResult>.Canceled;
        }

        private static class CancelCache<TResult>
        {
            public static readonly Task<TResult> Canceled = GetCancelledTask();

            private static Task<TResult> GetCancelledTask()
            {
                var tcs = new TaskCompletionSource<TResult>();
                tcs.SetCanceled();
                return tcs.Task;
            }
        }

        private static Task<TResult> FromResult<TResult>(TResult result)
        {
            var tcs = new TaskCompletionSource<TResult>();
            tcs.SetResult(result);
            return tcs.Task;
        }

        private static Task<TResult> ToTaskContinuation<TResult>(Task task, TResult result)
        {
            var tcs = new TaskCompletionSource<TResult>();
            task.ContinueWith(delegate (Task innerTask)
            {
                if (task.Status == TaskStatus.RanToCompletion)
                {
                    tcs.TrySetResult(result);
                    return;
                }
                tcs.TrySetFromTask(innerTask);
            }, TaskContinuationOptions.ExecuteSynchronously);
            return tcs.Task;
        }

        private static Boolean TrySetFromTask<TResult>(this TaskCompletionSource<TResult> tcs, Task source)
        {
            if (source.Status == TaskStatus.Canceled) return tcs.TrySetCanceled();

            if (source.Status == TaskStatus.Faulted) return tcs.TrySetException(source.Exception.InnerExceptions);

            if (source.Status == TaskStatus.RanToCompletion)
            {
                var task = source as Task<TResult>;
                return tcs.TrySetResult((task == null) ? default(TResult) : task.Result);
            }
            return false;
        }
        #endregion
    }
}