合并主线最近几年来的主要更新,重点推进TinyHttpClient,替代HttpClient
大石头 authored at 2023-03-08 18:39:12
21.50 KiB
X_NET20
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.InteropServices;

namespace System.Threading.Tasks
{
	public static class Parallel
	{
		[StructLayout(LayoutKind.Explicit)]
		private struct StealValue64
		{
			[FieldOffset(0)]
			public long Value;

			[FieldOffset(0)]
			public int Actual;

			[FieldOffset(4)]
			public int Stolen;
		}

		private class StealRange
		{
			public StealValue64 V64 = default(StealValue64);

			public StealRange(int fromInclusive, int i, int step)
			{
				V64.Actual = fromInclusive + i * step;
			}
		}

		private static readonly bool sixtyfour = IntPtr.Size == 8;

		internal static int GetBestWorkerNumber()
		{
			return GetBestWorkerNumber(TaskScheduler.Current);
		}

		internal static int GetBestWorkerNumber(TaskScheduler scheduler)
		{
			return scheduler.MaximumConcurrencyLevel;
		}

		private static int GetBestWorkerNumber(int from, int to, ParallelOptions options, out int step)
		{
			int num = GetBestWorkerNumber(options.TaskScheduler);
			if (options != null && options.MaxDegreeOfParallelism != -1)
			{
				num = options.MaxDegreeOfParallelism;
			}
			if ((step = (to - from) / num) < 5)
			{
				step = 5;
				num = (to - from) / 5;
				if (num < 1)
				{
					num = 1;
				}
			}
			return num;
		}

		private static void HandleExceptions(IEnumerable<Task> tasks)
		{
			HandleExceptions(tasks, null);
		}

		private static void HandleExceptions(IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
		{
			List<Exception> exs = new List<Exception>();
			foreach (Task t in tasks)
			{
				if (t.Exception != null)
				{
					exs.Add(t.Exception);
				}
			}
			if (exs.Count > 0)
			{
				if (infos != null)
				{
					infos.IsExceptional = true;
				}
				throw new AggregateException(exs).Flatten();
			}
		}

		private static void InitTasks(Task[] tasks, int count, Action action, ParallelOptions options)
		{
			TaskCreationOptions creation = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
			for (int i = 0; i < count; i++)
			{
				if (options == null)
				{
					tasks[i] = Task.Factory.StartNew(action, creation);
				}
				else
				{
					tasks[i] = Task.Factory.StartNew(action, options.CancellationToken, creation, options.TaskScheduler);
				}
			}
		}

		public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body)
		{
			return For(fromInclusive, toExclusive, ParallelOptions.Default, body);
		}

		public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
		{
			return For(fromInclusive, toExclusive, ParallelOptions.Default, body);
		}

		public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
		{
			return For(fromInclusive, toExclusive, parallelOptions, delegate(int index, ParallelLoopState state)
			{
				body(index);
			});
		}

		public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
		{
			return For(fromInclusive, toExclusive, parallelOptions, () => null, delegate(int i, ParallelLoopState s, object l)
			{
				body(i, s);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			return For(fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
		}

		public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			if (localInit == null)
			{
				throw new ArgumentNullException("localInit");
			}
			if (localFinally == null)
			{
				throw new ArgumentNullException("localFinally");
			}
			if (parallelOptions == null)
			{
				throw new ArgumentNullException("options");
			}
			if (fromInclusive >= toExclusive)
			{
				return new ParallelLoopResult(null, isCompleted: true);
			}
			int step;
			int num = GetBestWorkerNumber(fromInclusive, toExclusive, parallelOptions, out step);
			Task[] tasks = new Task[num];
			StealRange[] ranges = new StealRange[num];
			for (int i = 0; i < num; i++)
			{
				ranges[i] = new StealRange(fromInclusive, i, step);
			}
			ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos();
			int currentIndex = -1;
			Action workerMethod = delegate
			{
				int num2 = Interlocked.Increment(ref currentIndex);
				StealRange stealRange = ranges[num2];
				int actual = stealRange.V64.Actual;
				int num3 = ((num2 + 1 == num) ? toExclusive : Math.Min(toExclusive, actual + step));
				TLocal val = localInit();
				ParallelLoopState parallelLoopState = new ParallelLoopState(infos);
				CancellationToken cancellationToken = parallelOptions.CancellationToken;
				try
				{
					for (int num4 = actual; num4 < num3; num4 = (stealRange.V64.Actual = num4 + 1))
					{
						if (infos.IsStopped)
						{
							return;
						}
						cancellationToken.ThrowIfCancellationRequested();
						if (num4 >= num3 - stealRange.V64.Stolen)
						{
							break;
						}
						if (infos.LowestBreakIteration.HasValue && infos.LowestBreakIteration > num4)
						{
							return;
						}
						parallelLoopState.CurrentIteration = num4;
						val = body(num4, parallelLoopState, val);
						if (num4 + 1 >= num3 - stealRange.V64.Stolen)
						{
							break;
						}
					}
					int num5 = num + num2;
					for (int j = num2 + 1; j < num5; j++)
					{
						int num6 = j % num;
						stealRange = ranges[num6];
						num3 = ((num6 + 1 == num) ? toExclusive : Math.Min(toExclusive, fromInclusive + (num6 + 1) * step));
						int num7 = -1;
						while (true)
						{
							StealValue64 stealValue = default(StealValue64);
							long num8 = (stealValue.Value = (sixtyfour ? stealRange.V64.Value : Interlocked.CompareExchange(ref stealRange.V64.Value, 0L, 0L)));
							if (stealValue.Actual >= num3 - stealValue.Stolen - 2)
							{
								break;
							}
							num7 = ++stealValue.Stolen;
							if (Interlocked.CompareExchange(ref stealRange.V64.Value, stealValue.Value, num8) == num8)
							{
								num7 = num3 - num7;
								if (num7 <= stealRange.V64.Actual)
								{
									break;
								}
								val = body(num7, parallelLoopState, val);
							}
						}
					}
				}
				finally
				{
					localFinally(val);
				}
			};
			InitTasks(tasks, num, workerMethod, parallelOptions);
			try
			{
				Task.WaitAll(tasks);
			}
			catch
			{
				HandleExceptions(tasks, infos);
			}
			return new ParallelLoopResult(infos.LowestBreakIteration, !infos.IsStopped && !infos.IsExceptional);
		}

		public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long> body)
		{
			return For(fromInclusive, toExclusive, ParallelOptions.Default, body);
		}

		public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
		{
			return For(fromInclusive, toExclusive, ParallelOptions.Default, body);
		}

		public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
		{
			return For(fromInclusive, toExclusive, parallelOptions, delegate(long index, ParallelLoopState state)
			{
				body(index);
			});
		}

		public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
		{
			return For(fromInclusive, toExclusive, parallelOptions, () => null, delegate(long i, ParallelLoopState s, object l)
			{
				body(i, s);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult For<TLocal>(long fromInclusive, long toExclusive, Func<TLocal> localInit, Func<long, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			return For(fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
		}

		public static ParallelLoopResult For<TLocal>(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<long, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			if (localInit == null)
			{
				throw new ArgumentNullException("localInit");
			}
			if (localFinally == null)
			{
				throw new ArgumentNullException("localFinally");
			}
			if (parallelOptions == null)
			{
				throw new ArgumentNullException("options");
			}
			if (fromInclusive >= toExclusive)
			{
				return new ParallelLoopResult(null, isCompleted: true);
			}
			throw new NotImplementedException();
		}

		private static ParallelLoopResult ForEach<TSource, TLocal>(Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options, Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action, Action<TLocal> destruct)
		{
			if (enumerable == null)
			{
				throw new ArgumentNullException("source");
			}
			if (options == null)
			{
				throw new ArgumentNullException("options");
			}
			if (action == null)
			{
				throw new ArgumentNullException("action");
			}
			if (init == null)
			{
				throw new ArgumentNullException("init");
			}
			if (destruct == null)
			{
				throw new ArgumentNullException("destruct");
			}
			int num = Math.Min(GetBestWorkerNumber(), (options != null && options.MaxDegreeOfParallelism != -1) ? options.MaxDegreeOfParallelism : int.MaxValue);
			Task[] tasks = new Task[num];
			ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos();
			SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource>(num);
			IList<IEnumerator<TSource>> slices = enumerable(num);
			int sliceIndex = -1;
			Action workerMethod = delegate
			{
				IEnumerator<TSource> enumerator = slices[Interlocked.Increment(ref sliceIndex)];
				TLocal val = init();
				ParallelLoopState arg = new ParallelLoopState(infos);
				int nextIndex = bag.GetNextIndex();
				CancellationToken cancellationToken = options.CancellationToken;
				try
				{
					bool flag = true;
					TSource value;
					while (flag)
					{
						if (infos.IsStopped || infos.IsBroken.Value)
						{
							return;
						}
						cancellationToken.ThrowIfCancellationRequested();
						for (int i = 0; i < 5; i++)
						{
							if (!(flag = enumerator.MoveNext()))
							{
								break;
							}
							bag.Add(nextIndex, enumerator.Current);
						}
						for (int j = 0; j < 5; j++)
						{
							if (!bag.TryTake(nextIndex, out value))
							{
								break;
							}
							if (infos.IsStopped)
							{
								return;
							}
							cancellationToken.ThrowIfCancellationRequested();
							val = action(value, arg, val);
						}
					}
					while (bag.TrySteal(nextIndex, out value))
					{
						cancellationToken.ThrowIfCancellationRequested();
						val = action(value, arg, val);
						if (infos.IsStopped || infos.IsBroken.Value)
						{
							break;
						}
					}
				}
				finally
				{
					destruct(val);
				}
			};
			InitTasks(tasks, num, workerMethod, options);
			try
			{
				Task.WaitAll(tasks);
			}
			catch
			{
				HandleExceptions(tasks, infos);
			}
			return new ParallelLoopResult(infos.LowestBreakIteration, !infos.IsStopped && !infos.IsExceptional);
		}

		public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(Partitioner.Create(source), ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(Partitioner.Create(source), ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e, s);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(Partitioner.Create(source), ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e, s, -1L);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource, ParallelLoopState> body)
		{
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(source, ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e, s);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(OrderablePartitioner<TSource> source, Action<TSource, ParallelLoopState, long> body)
		{
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(source, ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, long i, object l)
			{
				body(e, s, i);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)
		{
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(source, ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource> body)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(Partitioner.Create(source), parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(Partitioner.Create(source), parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e, s);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState, long> body)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(Partitioner.Create(source), parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, long i, object l)
			{
				body(e, s, i);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(OrderablePartitioner<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState, long> body)
		{
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(source, parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, long i, object l)
			{
				body(e, s, i);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, ParallelOptions parallelOptions, Action<TSource> body)
		{
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(source, parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body)
		{
			return ForEach(source, parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, object l)
			{
				body(e, s);
				return null;
			}, delegate
			{
			});
		}

		public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			return ForEach(Partitioner.Create(source), ParallelOptions.Default, localInit, body, localFinally);
		}

		public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			return ForEach(Partitioner.Create(source), ParallelOptions.Default, localInit, body, localFinally);
		}

		public static ParallelLoopResult ForEach<TSource, TLocal>(OrderablePartitioner<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			return ForEach(source, ParallelOptions.Default, localInit, body, localFinally);
		}

		public static ParallelLoopResult ForEach<TSource, TLocal>(Partitioner<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			return ForEach(source, ParallelOptions.Default, localInit, body, localFinally);
		}

		public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			return ForEach(Partitioner.Create(source), parallelOptions, localInit, body, localFinally);
		}

		public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			return ForEach(Partitioner.Create(source), parallelOptions, localInit, body, localFinally);
		}

		public static ParallelLoopResult ForEach<TSource, TLocal>(Partitioner<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(source.GetPartitions, parallelOptions, localInit, body, localFinally);
		}

		public static ParallelLoopResult ForEach<TSource, TLocal>(OrderablePartitioner<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally)
		{
			if (source == null)
			{
				throw new ArgumentNullException("source");
			}
			if (body == null)
			{
				throw new ArgumentNullException("body");
			}
			return ForEach(source.GetOrderablePartitions, parallelOptions, localInit, (KeyValuePair<long, TSource> e, ParallelLoopState s, TLocal l) => body(e.Value, s, e.Key, l), localFinally);
		}

		public static void Invoke(params Action[] actions)
		{
			if (actions == null)
			{
				throw new ArgumentNullException("actions");
			}
			Invoke(ParallelOptions.Default, actions);
		}

		public static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
		{
			if (parallelOptions == null)
			{
				throw new ArgumentNullException("parallelOptions");
			}
			if (actions == null)
			{
				throw new ArgumentNullException("actions");
			}
			if (actions.Length == 0)
			{
				throw new ArgumentException("actions is empty");
			}
			foreach (Action a in actions)
			{
				if (a == null)
				{
					throw new ArgumentException("One action in actions is null", "actions");
				}
			}
			if (actions.Length == 1)
			{
				actions[0]();
				return;
			}
			Task[] ts = new Task[actions.Length];
			for (int i = 0; i < ts.Length; i++)
			{
				ts[i] = Task.Factory.StartNew(actions[i], parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler);
			}
			try
			{
				Task.WaitAll(ts, parallelOptions.CancellationToken);
			}
			catch
			{
				HandleExceptions(ts);
			}
		}

		internal static Task[] SpawnBestNumber(Action action, Action callback)
		{
			return SpawnBestNumber(action, -1, callback);
		}

		internal static Task[] SpawnBestNumber(Action action, int dop, Action callback)
		{
			return SpawnBestNumber(action, dop, wait: false, callback);
		}

		internal static Task[] SpawnBestNumber(Action action, int dop, bool wait, Action callback)
		{
			int num = ((dop != -1) ? dop : (wait ? (GetBestWorkerNumber() + 1) : GetBestWorkerNumber()));
			CountdownEvent evt = new CountdownEvent(num);
			Task[] tasks = new Task[num];
			for (int i = 0; i < num; i++)
			{
				tasks[i] = Task.Factory.StartNew(delegate
				{
					action();
					evt.Signal();
					if (callback != null && evt.IsSet)
					{
						callback();
					}
				});
			}
			if (wait)
			{
				Task.WaitAll(tasks);
			}
			return tasks;
		}
	}
}