v10.10.2024.0701 使用IJsonHost改进Json序列化
大石头 编写于 2024-07-01 08:36:34 大石头 提交于 2024-07-01 08:48:33
X
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;

namespace System.Collections.Concurrent;
using OperationCanceledException = NewLife.OperationCanceledException;

[ComVisible(false)]
[DebuggerDisplay("Count={Count}")]
public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
{
	private readonly IProducerConsumerCollection<T> underlyingColl;

	private readonly int upperBound;

	private AtomicBoolean isComplete;

	private long completeId;

	private int addId = int.MinValue;

	private int removeId = int.MinValue;

	[ThreadStatic]
	private SpinWait sw;

	public int BoundedCapacity => upperBound;

	public int Count => underlyingColl.Count;

	public bool IsAddingCompleted => isComplete.Value;

	public bool IsCompleted
	{
		get
		{
			if (isComplete.Value)
			{
				return addId == removeId;
			}
			return false;
		}
	}

	object ICollection.SyncRoot => underlyingColl.SyncRoot;

	bool ICollection.IsSynchronized => underlyingColl.IsSynchronized;

	public BlockingCollection()
		: this((IProducerConsumerCollection<T>)new ConcurrentQueue<T>(), -1)
	{
	}

	public BlockingCollection(int upperBound)
		: this((IProducerConsumerCollection<T>)new ConcurrentQueue<T>(), upperBound)
	{
	}

	public BlockingCollection(IProducerConsumerCollection<T> underlyingColl)
		: this(underlyingColl, -1)
	{
	}

	public BlockingCollection(IProducerConsumerCollection<T> underlyingColl, int upperBound)
	{
		this.underlyingColl = underlyingColl;
		this.upperBound = upperBound;
		isComplete = new AtomicBoolean();
	}

	public void Add(T item)
	{
		Add(item, null);
	}

	public void Add(T item, CancellationToken token)
	{
		Add(item, () => token.IsCancellationRequested);
	}

	private void Add(T item, Func<bool> cancellationFunc)
	{
		while (true)
		{
			int num = addId;
			int num2 = removeId;
			if (upperBound != -1 && num - num2 > upperBound)
			{
				Block();
				continue;
			}
			if (isComplete.Value && num >= completeId)
			{
				throw new InvalidOperationException("The BlockingCollection<T> has been marked as complete with regards to additions.");
			}
			if (Interlocked.CompareExchange(ref addId, num + 1, num) == num)
			{
				break;
			}
			if (cancellationFunc != null && cancellationFunc())
			{
				throw new OperationCanceledException("CancellationToken triggered");
			}
		}
		if (!underlyingColl.TryAdd(item))
		{
			throw new InvalidOperationException("The underlying collection didn't accept the item.");
		}
	}

	public T Take()
	{
		return Take(null);
	}

	public T Take(CancellationToken token)
	{
		return Take(() => token.IsCancellationRequested);
	}

	private T Take(Func<bool> cancellationFunc)
	{
		while (true)
		{
			int num = removeId;
			int num2 = addId;
			if (num == num2)
			{
				if (IsCompleted)
				{
					throw new OperationCanceledException("The BlockingCollection<T> has been marked as complete with regards to additions.");
				}
				Block();
				continue;
			}
			if (Interlocked.CompareExchange(ref removeId, num + 1, num) == num)
			{
				break;
			}
			if (cancellationFunc == null || !cancellationFunc())
			{
				continue;
			}
			throw new OperationCanceledException("The CancellationToken has had cancellation requested.");
		}
		T item;
		while (!underlyingColl.TryTake(out item))
		{
		}
		return item;
	}

	public bool TryAdd(T item)
	{
		return TryAdd(item, null, null);
	}

	private bool TryAdd(T item, Func<bool> contFunc, CancellationToken? token)
	{
		do
		{
			if (token.HasValue && token.Value.IsCancellationRequested)
			{
				throw new OperationCanceledException("The CancellationToken has had cancellation requested.");
			}
			int num = addId;
			int num2 = removeId;
			if (upperBound != -1 && num - num2 > upperBound)
			{
				continue;
			}
			if (isComplete.Value && num >= completeId)
			{
				throw new InvalidOperationException("The BlockingCollection<T> has been marked as complete with regards to additions.");
			}
			if (Interlocked.CompareExchange(ref addId, num + 1, num) == num)
			{
				if (!underlyingColl.TryAdd(item))
				{
					throw new InvalidOperationException("The underlying collection didn't accept the item.");
				}
				return true;
			}
		}
		while (contFunc != null && contFunc());
		return false;
	}

	public bool TryAdd(T item, TimeSpan ts)
	{
		return TryAdd(item, (int)ts.TotalMilliseconds);
	}

	public bool TryAdd(T item, int millisecondsTimeout)
	{
		Watch stopwatch = Watch.StartNew();
		return TryAdd(item, () => stopwatch.ElapsedMilliseconds < millisecondsTimeout, null);
	}

	public bool TryAdd(T item, int millisecondsTimeout, CancellationToken token)
	{
		Watch stopwatch = Watch.StartNew();
		return TryAdd(item, () => stopwatch.ElapsedMilliseconds < millisecondsTimeout, token);
	}

	public bool TryTake(out T item)
	{
		return TryTake(out item, null, null);
	}

	private bool TryTake(out T item, Func<bool> contFunc, CancellationToken? token)
	{
		item = default(T);
		do
		{
			if (token.HasValue && token.Value.IsCancellationRequested)
			{
				throw new OperationCanceledException("The CancellationToken has had cancellation requested.");
			}
			int num = removeId;
			int num2 = addId;
			if (num == num2)
			{
				if (IsCompleted)
				{
					return false;
				}
			}
			else if (Interlocked.CompareExchange(ref removeId, num + 1, num) == num)
			{
				return underlyingColl.TryTake(out item);
			}
		}
		while (contFunc != null && contFunc());
		return false;
	}

	public bool TryTake(out T item, TimeSpan ts)
	{
		return TryTake(out item, (int)ts.TotalMilliseconds);
	}

	public bool TryTake(out T item, int millisecondsTimeout)
	{
		item = default(T);
		Watch sw = Watch.StartNew();
		return TryTake(out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
	}

	public bool TryTake(out T item, int millisecondsTimeout, CancellationToken token)
	{
		item = default(T);
		Watch sw = Watch.StartNew();
		return TryTake(out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
	}

	private static void CheckArray(BlockingCollection<T>[] collections)
	{
		if (collections == null)
		{
			throw new ArgumentNullException("collections");
		}
		if (collections.Length == 0 || IsThereANullElement(collections))
		{
			throw new ArgumentException("The collections argument is a 0-length array or contains a null element.", "collections");
		}
	}

	private static bool IsThereANullElement(BlockingCollection<T>[] collections)
	{
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection == null)
			{
				return true;
			}
		}
		return false;
	}

	public static int AddToAny(BlockingCollection<T>[] collections, T item)
	{
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			try
			{
				blockingCollection.Add(item);
				return num;
			}
			catch
			{
			}
			num++;
		}
		return -1;
	}

	public static int AddToAny(BlockingCollection<T>[] collections, T item, CancellationToken token)
	{
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			try
			{
				blockingCollection.Add(item, token);
				return num;
			}
			catch
			{
			}
			num++;
		}
		return -1;
	}

	public static int TryAddToAny(BlockingCollection<T>[] collections, T item)
	{
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection.TryAdd(item))
			{
				return num;
			}
			num++;
		}
		return -1;
	}

	public static int TryAddToAny(BlockingCollection<T>[] collections, T item, TimeSpan ts)
	{
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection.TryAdd(item, ts))
			{
				return num;
			}
			num++;
		}
		return -1;
	}

	public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
	{
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection.TryAdd(item, millisecondsTimeout))
			{
				return num;
			}
			num++;
		}
		return -1;
	}

	public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken token)
	{
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection.TryAdd(item, millisecondsTimeout, token))
			{
				return num;
			}
			num++;
		}
		return -1;
	}

	public static int TakeFromAny(BlockingCollection<T>[] collections, out T item)
	{
		item = default(T);
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			try
			{
				item = blockingCollection.Take();
				return num;
			}
			catch
			{
			}
			num++;
		}
		return -1;
	}

	public static int TakeFromAny(BlockingCollection<T>[] collections, out T item, CancellationToken token)
	{
		item = default(T);
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			try
			{
				item = blockingCollection.Take(token);
				return num;
			}
			catch
			{
			}
			num++;
		}
		return -1;
	}

	public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item)
	{
		item = default(T);
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection.TryTake(out item))
			{
				return num;
			}
			num++;
		}
		return -1;
	}

	public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, TimeSpan ts)
	{
		item = default(T);
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection.TryTake(out item, ts))
			{
				return num;
			}
			num++;
		}
		return -1;
	}

	public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
	{
		item = default(T);
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection.TryTake(out item, millisecondsTimeout))
			{
				return num;
			}
			num++;
		}
		return -1;
	}

	public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, CancellationToken token)
	{
		item = default(T);
		CheckArray(collections);
		int num = 0;
		foreach (BlockingCollection<T> blockingCollection in collections)
		{
			if (blockingCollection.TryTake(out item, millisecondsTimeout, token))
			{
				return num;
			}
			num++;
		}
		return -1;
	}

	public void CompleteAdding()
	{
		completeId = addId;
		isComplete.Value = true;
	}

	void ICollection.CopyTo(Array array, int index)
	{
		underlyingColl.CopyTo(array, index);
	}

	public void CopyTo(T[] array, int index)
	{
		underlyingColl.CopyTo(array, index);
	}

	public IEnumerable<T> GetConsumingEnumerable()
	{
		return GetConsumingEnumerable(Take);
	}

	public IEnumerable<T> GetConsumingEnumerable(CancellationToken token)
	{
		return GetConsumingEnumerable(() => Take(token));
	}

	private IEnumerable<T> GetConsumingEnumerable(Func<T> getFunc)
	{
		while (true)
		{
			T item;
			try
			{
				item = getFunc();
			}
			catch
			{
				break;
			}
			yield return item;
		}
	}

	IEnumerator IEnumerable.GetEnumerator()
	{
		return ((IEnumerable)underlyingColl).GetEnumerator();
	}

	IEnumerator<T> IEnumerable<T>.GetEnumerator()
	{
		return underlyingColl.GetEnumerator();
	}

	public void Dispose()
	{
	}

	protected virtual void Dispose(bool managedRes)
	{
	}

	public T[] ToArray()
	{
		return underlyingColl.ToArray();
	}

	private void Block()
	{
		sw.SpinOnce();
	}
}