v10.10.2024.0701 使用IJsonHost改进Json序列化
大石头 编写于 2024-07-01 08:36:34 大石头 提交于 2024-07-01 08:48:33
X
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.InteropServices;

namespace System.Threading.Tasks;

public static class Parallel
{
	[StructLayout(LayoutKind.Explicit)]
	private struct StealValue64
	{
		[FieldOffset(0)]
		public long Value;

		[FieldOffset(0)]
		public int Actual;

		[FieldOffset(4)]
		public int Stolen;
	}

	private class StealRange
	{
		public StealValue64 V64 = default(StealValue64);

		public StealRange(int fromInclusive, int i, int step)
		{
			V64.Actual = fromInclusive + i * step;
		}
	}

	private static readonly bool sixtyfour = IntPtr.Size == 8;

	internal static int GetBestWorkerNumber()
	{
		return GetBestWorkerNumber(TaskScheduler.Current);
	}

	internal static int GetBestWorkerNumber(TaskScheduler scheduler)
	{
		return scheduler.MaximumConcurrencyLevel;
	}

	private static int GetBestWorkerNumber(int from, int to, ParallelOptions options, out int step)
	{
		int num = GetBestWorkerNumber(options.TaskScheduler);
		if (options != null && options.MaxDegreeOfParallelism != -1)
		{
			num = options.MaxDegreeOfParallelism;
		}
		if ((step = (to - from) / num) < 5)
		{
			step = 5;
			num = (to - from) / 5;
			if (num < 1)
			{
				num = 1;
			}
		}
		return num;
	}

	private static void HandleExceptions(IEnumerable<Task> tasks)
	{
		HandleExceptions(tasks, null);
	}

	private static void HandleExceptions(IEnumerable<Task> tasks, ParallelLoopState.ExternalInfos infos)
	{
		List<Exception> list = new List<Exception>();
		foreach (Task task in tasks)
		{
			if (task.Exception != null)
			{
				list.Add(task.Exception);
			}
		}
		if (list.Count > 0)
		{
			if (infos != null)
			{
				infos.IsExceptional = true;
			}
			throw new AggregateException(list).Flatten();
		}
	}

	private static void InitTasks(Task[] tasks, int count, Action action, ParallelOptions options)
	{
		TaskCreationOptions creationOptions = TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent;
		for (int i = 0; i < count; i++)
		{
			if (options == null)
			{
				tasks[i] = Task.Factory.StartNew(action, creationOptions);
			}
			else
			{
				tasks[i] = Task.Factory.StartNew(action, options.CancellationToken, creationOptions, options.TaskScheduler);
			}
		}
	}

	public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body)
	{
		return For(fromInclusive, toExclusive, ParallelOptions.Default, body);
	}

	public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)
	{
		return For(fromInclusive, toExclusive, ParallelOptions.Default, body);
	}

	public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
	{
		return For(fromInclusive, toExclusive, parallelOptions, delegate(int index, ParallelLoopState state)
		{
			body(index);
		});
	}

	public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)
	{
		return For(fromInclusive, toExclusive, parallelOptions, () => null, delegate(int i, ParallelLoopState s, object l)
		{
			body(i, s);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		return For(fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
	}

	public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		if (localInit == null)
		{
			throw new ArgumentNullException("localInit");
		}
		if (localFinally == null)
		{
			throw new ArgumentNullException("localFinally");
		}
		if (parallelOptions == null)
		{
			throw new ArgumentNullException("options");
		}
		if (fromInclusive >= toExclusive)
		{
			return new ParallelLoopResult(null, isCompleted: true);
		}
		int step;
		int num = GetBestWorkerNumber(fromInclusive, toExclusive, parallelOptions, out step);
		Task[] tasks = new Task[num];
		StealRange[] ranges = new StealRange[num];
		for (int i = 0; i < num; i++)
		{
			ranges[i] = new StealRange(fromInclusive, i, step);
		}
		ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos();
		int currentIndex = -1;
		Action action = delegate
		{
			int num2 = Interlocked.Increment(ref currentIndex);
			StealRange stealRange = ranges[num2];
			int actual = stealRange.V64.Actual;
			int num3 = ((num2 + 1 == num) ? toExclusive : Math.Min(toExclusive, actual + step));
			TLocal val = localInit();
			ParallelLoopState parallelLoopState = new ParallelLoopState(infos);
			CancellationToken cancellationToken = parallelOptions.CancellationToken;
			try
			{
				for (int num4 = actual; num4 < num3; num4 = (stealRange.V64.Actual = num4 + 1))
				{
					if (infos.IsStopped)
					{
						return;
					}
					cancellationToken.ThrowIfCancellationRequested();
					if (num4 >= num3 - stealRange.V64.Stolen)
					{
						break;
					}
					if (infos.LowestBreakIteration.HasValue && infos.LowestBreakIteration > num4)
					{
						return;
					}
					parallelLoopState.CurrentIteration = num4;
					val = body(num4, parallelLoopState, val);
					if (num4 + 1 >= num3 - stealRange.V64.Stolen)
					{
						break;
					}
				}
				int num5 = num + num2;
				for (int j = num2 + 1; j < num5; j++)
				{
					int num6 = j % num;
					stealRange = ranges[num6];
					num3 = ((num6 + 1 == num) ? toExclusive : Math.Min(toExclusive, fromInclusive + (num6 + 1) * step));
					int num7 = -1;
					while (true)
					{
						StealValue64 stealValue = default(StealValue64);
						long num8 = (stealValue.Value = (sixtyfour ? stealRange.V64.Value : Interlocked.CompareExchange(ref stealRange.V64.Value, 0L, 0L)));
						if (stealValue.Actual >= num3 - stealValue.Stolen - 2)
						{
							break;
						}
						num7 = ++stealValue.Stolen;
						if (Interlocked.CompareExchange(ref stealRange.V64.Value, stealValue.Value, num8) == num8)
						{
							num7 = num3 - num7;
							if (num7 <= stealRange.V64.Actual)
							{
								break;
							}
							val = body(num7, parallelLoopState, val);
						}
					}
				}
			}
			finally
			{
				localFinally(val);
			}
		};
		InitTasks(tasks, num, action, parallelOptions);
		try
		{
			Task.WaitAll(tasks);
		}
		catch
		{
			HandleExceptions(tasks, infos);
		}
		return new ParallelLoopResult(infos.LowestBreakIteration, !infos.IsStopped && !infos.IsExceptional);
	}

	public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long> body)
	{
		return For(fromInclusive, toExclusive, ParallelOptions.Default, body);
	}

	public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
	{
		return For(fromInclusive, toExclusive, ParallelOptions.Default, body);
	}

	public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
	{
		return For(fromInclusive, toExclusive, parallelOptions, delegate(long index, ParallelLoopState state)
		{
			body(index);
		});
	}

	public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long, ParallelLoopState> body)
	{
		return For(fromInclusive, toExclusive, parallelOptions, () => null, delegate(long i, ParallelLoopState s, object l)
		{
			body(i, s);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult For<TLocal>(long fromInclusive, long toExclusive, Func<TLocal> localInit, Func<long, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		return For(fromInclusive, toExclusive, ParallelOptions.Default, localInit, body, localFinally);
	}

	public static ParallelLoopResult For<TLocal>(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<long, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		if (localInit == null)
		{
			throw new ArgumentNullException("localInit");
		}
		if (localFinally == null)
		{
			throw new ArgumentNullException("localFinally");
		}
		if (parallelOptions == null)
		{
			throw new ArgumentNullException("options");
		}
		if (fromInclusive >= toExclusive)
		{
			return new ParallelLoopResult(null, isCompleted: true);
		}
		throw new NotImplementedException();
	}

	private static ParallelLoopResult ForEach<TSource, TLocal>(Func<int, IList<IEnumerator<TSource>>> enumerable, ParallelOptions options, Func<TLocal> init, Func<TSource, ParallelLoopState, TLocal, TLocal> action, Action<TLocal> destruct)
	{
		if (enumerable == null)
		{
			throw new ArgumentNullException("source");
		}
		if (options == null)
		{
			throw new ArgumentNullException("options");
		}
		if (action == null)
		{
			throw new ArgumentNullException("action");
		}
		if (init == null)
		{
			throw new ArgumentNullException("init");
		}
		if (destruct == null)
		{
			throw new ArgumentNullException("destruct");
		}
		int num = Math.Min(GetBestWorkerNumber(), (options != null && options.MaxDegreeOfParallelism != -1) ? options.MaxDegreeOfParallelism : int.MaxValue);
		Task[] tasks = new Task[num];
		ParallelLoopState.ExternalInfos infos = new ParallelLoopState.ExternalInfos();
		SimpleConcurrentBag<TSource> bag = new SimpleConcurrentBag<TSource>(num);
		IList<IEnumerator<TSource>> slices = enumerable(num);
		int sliceIndex = -1;
		Action action2 = delegate
		{
			IEnumerator<TSource> enumerator = slices[Interlocked.Increment(ref sliceIndex)];
			TLocal val = init();
			ParallelLoopState arg = new ParallelLoopState(infos);
			int nextIndex = bag.GetNextIndex();
			CancellationToken cancellationToken = options.CancellationToken;
			try
			{
				bool flag = true;
				TSource value;
				while (flag)
				{
					if (infos.IsStopped || infos.IsBroken.Value)
					{
						return;
					}
					cancellationToken.ThrowIfCancellationRequested();
					for (int i = 0; i < 5; i++)
					{
						if (!(flag = enumerator.MoveNext()))
						{
							break;
						}
						bag.Add(nextIndex, enumerator.Current);
					}
					for (int j = 0; j < 5; j++)
					{
						if (!bag.TryTake(nextIndex, out value))
						{
							break;
						}
						if (infos.IsStopped)
						{
							return;
						}
						cancellationToken.ThrowIfCancellationRequested();
						val = action(value, arg, val);
					}
				}
				while (bag.TrySteal(nextIndex, out value))
				{
					cancellationToken.ThrowIfCancellationRequested();
					val = action(value, arg, val);
					if (infos.IsStopped || infos.IsBroken.Value)
					{
						break;
					}
				}
			}
			finally
			{
				destruct(val);
			}
		};
		InitTasks(tasks, num, action2, options);
		try
		{
			Task.WaitAll(tasks);
		}
		catch
		{
			HandleExceptions(tasks, infos);
		}
		return new ParallelLoopResult(infos.LowestBreakIteration, !infos.IsStopped && !infos.IsExceptional);
	}

	public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(Partitioner.Create(source), ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(Partitioner.Create(source), ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e, s);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(Partitioner.Create(source), ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e, s, -1L);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource, ParallelLoopState> body)
	{
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(source, ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e, s);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(OrderablePartitioner<TSource> source, Action<TSource, ParallelLoopState, long> body)
	{
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(source, ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, long i, object l)
		{
			body(e, s, i);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)
	{
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(source, ParallelOptions.Default, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource> body)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(Partitioner.Create(source), parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(Partitioner.Create(source), parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e, s);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState, long> body)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(Partitioner.Create(source), parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, long i, object l)
		{
			body(e, s, i);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(OrderablePartitioner<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState, long> body)
	{
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(source, parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, long i, object l)
		{
			body(e, s, i);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, ParallelOptions parallelOptions, Action<TSource> body)
	{
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(source, parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body)
	{
		return ForEach(source, parallelOptions, () => null, delegate(TSource e, ParallelLoopState s, object l)
		{
			body(e, s);
			return null;
		}, delegate
		{
		});
	}

	public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		return ForEach(Partitioner.Create(source), ParallelOptions.Default, localInit, body, localFinally);
	}

	public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		return ForEach(Partitioner.Create(source), ParallelOptions.Default, localInit, body, localFinally);
	}

	public static ParallelLoopResult ForEach<TSource, TLocal>(OrderablePartitioner<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		return ForEach(source, ParallelOptions.Default, localInit, body, localFinally);
	}

	public static ParallelLoopResult ForEach<TSource, TLocal>(Partitioner<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		return ForEach(source, ParallelOptions.Default, localInit, body, localFinally);
	}

	public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		return ForEach(Partitioner.Create(source), parallelOptions, localInit, body, localFinally);
	}

	public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		return ForEach(Partitioner.Create(source), parallelOptions, localInit, body, localFinally);
	}

	public static ParallelLoopResult ForEach<TSource, TLocal>(Partitioner<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(source.GetPartitions, parallelOptions, localInit, body, localFinally);
	}

	public static ParallelLoopResult ForEach<TSource, TLocal>(OrderablePartitioner<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally)
	{
		if (source == null)
		{
			throw new ArgumentNullException("source");
		}
		if (body == null)
		{
			throw new ArgumentNullException("body");
		}
		return ForEach(source.GetOrderablePartitions, parallelOptions, localInit, (KeyValuePair<long, TSource> e, ParallelLoopState s, TLocal l) => body(e.Value, s, e.Key, l), localFinally);
	}

	public static void Invoke(params Action[] actions)
	{
		if (actions == null)
		{
			throw new ArgumentNullException("actions");
		}
		Invoke(ParallelOptions.Default, actions);
	}

	public static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
	{
		if (parallelOptions == null)
		{
			throw new ArgumentNullException("parallelOptions");
		}
		if (actions == null)
		{
			throw new ArgumentNullException("actions");
		}
		if (actions.Length == 0)
		{
			throw new ArgumentException("actions is empty");
		}
		foreach (Action action in actions)
		{
			if (action == null)
			{
				throw new ArgumentException("One action in actions is null", "actions");
			}
		}
		if (actions.Length == 1)
		{
			actions[0]();
			return;
		}
		Task[] array = new Task[actions.Length];
		for (int j = 0; j < array.Length; j++)
		{
			array[j] = Task.Factory.StartNew(actions[j], parallelOptions.CancellationToken, TaskCreationOptions.None, parallelOptions.TaskScheduler);
		}
		try
		{
			Task.WaitAll(array, parallelOptions.CancellationToken);
		}
		catch
		{
			HandleExceptions(array);
		}
	}

	internal static Task[] SpawnBestNumber(Action action, Action callback)
	{
		return SpawnBestNumber(action, -1, callback);
	}

	internal static Task[] SpawnBestNumber(Action action, int dop, Action callback)
	{
		return SpawnBestNumber(action, dop, wait: false, callback);
	}

	internal static Task[] SpawnBestNumber(Action action, int dop, bool wait, Action callback)
	{
		int num = ((dop != -1) ? dop : (wait ? (GetBestWorkerNumber() + 1) : GetBestWorkerNumber()));
		CountdownEvent evt = new CountdownEvent(num);
		Task[] array = new Task[num];
		for (int i = 0; i < num; i++)
		{
			array[i] = Task.Factory.StartNew(delegate
			{
				action();
				evt.Signal();
				if (callback != null && evt.IsSet)
				{
					callback();
				}
			});
		}
		if (wait)
		{
			Task.WaitAll(array);
		}
		return array;
	}
}