消除各种编译警告
智能大石头 authored at 2023-03-08 21:47:28
11.80 KiB
X_NET20
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;

namespace System.Collections.Concurrent
{
	[ComVisible(false)]
	[DebuggerDisplay("Count={Count}")]
	public class BlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
	{
		// Backing store that actually holds the items.
		private readonly IProducerConsumerCollection<T> underlyingColl;

		// Completion flag; set by CompleteAdding() and read by the add paths and completion properties.
		private AtomicBoolean isComplete;

		// Value of addId captured by CompleteAdding().
		private Int64 completeId;

		// Counter advanced with a compare-exchange each time an add reserves a slot.
		private Int32 addId = Int32.MinValue;

		// Counter advanced with a compare-exchange each time a take reserves an item.
		private Int32 removeId = Int32.MinValue;

		// Spinner used by Block(). [ThreadStatic] is only honoured on static fields, so the field
		// must be static for each thread to get its own spin state instead of sharing one struct.
		[ThreadStatic]
		private static SpinWait sw;

        /// <summary>Upper bound supplied at construction; the add paths treat -1 as "no bound".</summary>
        public Int32 BoundedCapacity { get; }

        /// <summary>Item count reported by the underlying collection.</summary>
        public Int32 Count => underlyingColl.Count;

        /// <summary>True once the completion flag has been set.</summary>
        public Boolean IsAddingCompleted => isComplete.Value;

        /// <summary>True when the completion flag is set and the add and remove counters are equal.</summary>
        public Boolean IsCompleted => isComplete.Value && addId == removeId;

        /// <summary>Synchronization root forwarded from the underlying collection.</summary>
        Object ICollection.SyncRoot => underlyingColl.SyncRoot;

        /// <summary>Synchronization flag forwarded from the underlying collection.</summary>
        Boolean ICollection.IsSynchronized => underlyingColl.IsSynchronized;

		/// <summary>Creates an unbounded collection backed by a new <see cref="ConcurrentQueue{T}"/>.</summary>
		public BlockingCollection()
			: this((IProducerConsumerCollection<T>)new ConcurrentQueue<T>(), -1)
		{
		}

		/// <summary>Creates a collection backed by a new <see cref="ConcurrentQueue{T}"/> with the given bound.</summary>
		/// <param name="upperBound">Value stored in <see cref="BoundedCapacity"/>; -1 means no bound.</param>
		public BlockingCollection(Int32 upperBound)
			: this((IProducerConsumerCollection<T>)new ConcurrentQueue<T>(), upperBound)
		{
		}

		/// <summary>Creates an unbounded collection on top of <paramref name="underlyingColl"/>.</summary>
		/// <exception cref="ArgumentNullException"><paramref name="underlyingColl"/> is null.</exception>
		public BlockingCollection(IProducerConsumerCollection<T> underlyingColl)
			: this(underlyingColl, -1)
		{
		}

		/// <summary>Creates a collection on top of <paramref name="underlyingColl"/> with the given bound.</summary>
		/// <param name="underlyingColl">Collection used as the backing store.</param>
		/// <param name="upperBound">Value stored in <see cref="BoundedCapacity"/>; -1 means no bound.</param>
		/// <exception cref="ArgumentNullException"><paramref name="underlyingColl"/> is null.</exception>
		public BlockingCollection(IProducerConsumerCollection<T> underlyingColl, Int32 upperBound)
		{
			// Fail at construction instead of with a NullReferenceException on first use.
			if (underlyingColl == null)
			{
				throw new ArgumentNullException(nameof(underlyingColl));
			}
			this.underlyingColl = underlyingColl;
			BoundedCapacity = upperBound;
			isComplete = new AtomicBoolean();
		}

		/// <summary>Adds <paramref name="item"/> with no cancellation probe.</summary>
		public void Add(T item) => Add(item, null);

		/// <summary>Adds <paramref name="item"/>, using <paramref name="token"/> as the cancellation probe.</summary>
		public void Add(T item, CancellationToken token)
		{
			Func<Boolean> isCancelled = () => token.IsCancellationRequested;
			Add(item, isCancelled);
		}

		/// <summary>
		/// Reserves a slot by advancing <c>addId</c> with a compare-exchange, then stores
		/// <paramref name="item"/> in the underlying collection. Waits while the collection is full.
		/// </summary>
		/// <param name="item">Item to store.</param>
		/// <param name="cancellationFunc">Optional probe; a true result abandons the operation.</param>
		/// <exception cref="InvalidOperationException">
		/// Adding has been completed, or the underlying collection rejected the item.
		/// </exception>
		/// <exception cref="OperationCanceledException">The probe reported cancellation.</exception>
		private void Add(T item, Func<Boolean> cancellationFunc)
		{
			while (true)
			{
				var cachedAddId = addId;
				var cachedRemoveId = removeId;
				// The collection is full once the outstanding count reaches the bound; comparing
				// with '>' would admit BoundedCapacity + 1 items.
				if (BoundedCapacity != -1 && cachedAddId - cachedRemoveId >= BoundedCapacity)
				{
					// Probe cancellation while waiting for room, otherwise a cancelled caller
					// would stay blocked until a consumer happens to take an item.
					if (cancellationFunc != null && cancellationFunc())
					{
						throw new OperationCanceledException("CancellationToken triggered");
					}
					Block();
					continue;
				}
				if (isComplete.Value && cachedAddId >= completeId)
				{
					throw new InvalidOperationException("The BlockingCollection<T> has been marked as complete with regards to additions.");
				}
				// Winning the compare-exchange reserves the slot for this caller.
				if (Interlocked.CompareExchange(ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
				{
					break;
				}
				// Lost the race against another producer: probe cancellation before retrying.
				if (cancellationFunc != null && cancellationFunc())
				{
					throw new OperationCanceledException("CancellationToken triggered");
				}
			}
			if (!underlyingColl.TryAdd(item))
			{
				throw new InvalidOperationException("The underlying collection didn't accept the item.");
			}
		}

		/// <summary>Removes and returns an item, with no cancellation probe.</summary>
		public T Take() => Take(null);

		/// <summary>Removes and returns an item, using <paramref name="token"/> as the cancellation probe.</summary>
		public T Take(CancellationToken token)
		{
			Func<Boolean> isCancelled = () => token.IsCancellationRequested;
			return Take(isCancelled);
		}

		/// <summary>
		/// Reserves an item by advancing <c>removeId</c> with a compare-exchange, then pulls it
		/// from the underlying collection. Waits while the counters show nothing to take.
		/// </summary>
		/// <param name="cancellationFunc">Optional probe; a true result abandons the operation.</param>
		/// <returns>The item removed from the underlying collection.</returns>
		/// <exception cref="OperationCanceledException">
		/// The collection is completed and drained, or the probe reported cancellation.
		/// </exception>
		private T Take(Func<Boolean> cancellationFunc)
		{
			while (true)
			{
				var cachedRemoveId = removeId;
				var cachedAddId = addId;
				if (cachedRemoveId == cachedAddId)
				{
					if (IsCompleted)
					{
						throw new OperationCanceledException("The BlockingCollection<T> has been marked as complete with regards to additions.");
					}
					// Probe cancellation while waiting for an item, otherwise a cancelled caller
					// would stay blocked on an empty collection indefinitely.
					if (cancellationFunc != null && cancellationFunc())
					{
						throw new OperationCanceledException("The CancellationToken has had cancellation requested.");
					}
					Block();
					continue;
				}
				// Winning the compare-exchange reserves one item for this caller.
				if (Interlocked.CompareExchange(ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
				{
					break;
				}
				// Lost the race against another consumer: probe cancellation before retrying.
				if (cancellationFunc == null || !cancellationFunc())
				{
					continue;
				}
				throw new OperationCanceledException("The CancellationToken has had cancellation requested.");
			}
			// Producers advance addId before storing the item, so it may not be visible yet.
			T item;
			while (!underlyingColl.TryTake(out item))
			{
			}
			return item;
		}

		/// <summary>Makes a single attempt to add <paramref name="item"/>; no retry predicate, no token.</summary>
		/// <returns>True if the item was stored.</returns>
		public Boolean TryAdd(T item) => TryAdd(item, null, null);

		/// <summary>
		/// Attempts to reserve a slot and store <paramref name="item"/>, retrying for as long as
		/// <paramref name="contFunc"/> returns true.
		/// </summary>
		/// <param name="item">Item to store.</param>
		/// <param name="contFunc">Retry predicate; null means a single attempt.</param>
		/// <param name="token">Optional token checked at the start of every attempt.</param>
		/// <returns>True if the item was stored; false if the attempts ran out.</returns>
		/// <exception cref="OperationCanceledException">The token has cancellation requested.</exception>
		/// <exception cref="InvalidOperationException">
		/// Adding has been completed, or the underlying collection rejected the item.
		/// </exception>
		private Boolean TryAdd(T item, Func<Boolean> contFunc, CancellationToken? token)
		{
			do
			{
				if (token.HasValue && token.Value.IsCancellationRequested)
				{
					throw new OperationCanceledException("The CancellationToken has had cancellation requested.");
				}
				var cachedAddId = addId;
				var cachedRemoveId = removeId;
				// The collection is full once the outstanding count reaches the bound; comparing
				// with '>' would admit BoundedCapacity + 1 items.
				if (BoundedCapacity != -1 && cachedAddId - cachedRemoveId >= BoundedCapacity)
				{
					// 'continue' in a do/while jumps to the retry predicate below.
					continue;
				}
				if (isComplete.Value && cachedAddId >= completeId)
				{
					throw new InvalidOperationException("The BlockingCollection<T> has been marked as complete with regards to additions.");
				}
				// Winning the compare-exchange reserves the slot for this caller.
				if (Interlocked.CompareExchange(ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
				{
					if (!underlyingColl.TryAdd(item))
					{
						throw new InvalidOperationException("The underlying collection didn't accept the item.");
					}
					return true;
				}
			}
			while (contFunc != null && contFunc());
			return false;
		}

		/// <summary>Tries to add <paramref name="item"/>, using the whole milliseconds of <paramref name="ts"/> as the timeout.</summary>
		public Boolean TryAdd(T item, TimeSpan ts) => TryAdd(item, (Int32)ts.TotalMilliseconds);

		/// <summary>
		/// Tries to add <paramref name="item"/>, retrying while the elapsed milliseconds are below
		/// <paramref name="millisecondsTimeout"/>.
		/// </summary>
		public Boolean TryAdd(T item, Int32 millisecondsTimeout)
		{
			var timer = Watch.StartNew();
			Func<Boolean> withinTimeout = () => timer.ElapsedMilliseconds < millisecondsTimeout;
			return TryAdd(item, withinTimeout, null);
		}

		/// <summary>
		/// Tries to add <paramref name="item"/>, retrying while the elapsed milliseconds are below
		/// <paramref name="millisecondsTimeout"/> and checking <paramref name="token"/> on every attempt.
		/// </summary>
		public Boolean TryAdd(T item, Int32 millisecondsTimeout, CancellationToken token)
		{
			var timer = Watch.StartNew();
			Func<Boolean> withinTimeout = () => timer.ElapsedMilliseconds < millisecondsTimeout;
			return TryAdd(item, withinTimeout, token);
		}

		/// <summary>Makes a single attempt to take an item; no retry predicate, no token.</summary>
		/// <param name="item">Receives the taken item, or the default value when nothing was taken.</param>
		/// <returns>True if an item was taken.</returns>
		public Boolean TryTake(out T item) => TryTake(out item, null, null);

		/// <summary>
		/// Attempts to reserve and remove an item, retrying for as long as
		/// <paramref name="contFunc"/> returns true.
		/// </summary>
		/// <param name="item">Receives the taken item, or the default value when nothing was taken.</param>
		/// <param name="contFunc">Retry predicate; null means a single attempt.</param>
		/// <param name="token">Optional token checked at the start of every attempt.</param>
		/// <returns>True if an item was taken; false if the collection is completed or the attempts ran out.</returns>
		/// <exception cref="OperationCanceledException">The token has cancellation requested.</exception>
		private Boolean TryTake(out T item, Func<Boolean> contFunc, CancellationToken? token)
		{
			item = default(T);
			do
			{
				if (token.HasValue && token.Value.IsCancellationRequested)
				{
					throw new OperationCanceledException("The CancellationToken has had cancellation requested.");
				}
				var cachedRemoveId = removeId;
				var cachedAddId = addId;
				if (cachedRemoveId == cachedAddId)
				{
					if (IsCompleted)
					{
						return false;
					}
				}
				else if (Interlocked.CompareExchange(ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
				{
					// The item is now reserved for this caller. Producers advance addId before
					// storing the item, so it may not be visible yet; returning false here would
					// leave removeId advanced with the item still in the underlying collection.
					// Wait for it, the same way Take() does.
					while (!underlyingColl.TryTake(out item))
					{
						Block();
					}
					return true;
				}
			}
			while (contFunc != null && contFunc());
			return false;
		}

		/// <summary>Tries to take an item, using the whole milliseconds of <paramref name="ts"/> as the timeout.</summary>
		public Boolean TryTake(out T item, TimeSpan ts) => TryTake(out item, (Int32)ts.TotalMilliseconds);

		/// <summary>
		/// Tries to take an item, retrying while the elapsed milliseconds are below
		/// <paramref name="millisecondsTimeout"/>.
		/// </summary>
		public Boolean TryTake(out T item, Int32 millisecondsTimeout)
		{
			var timer = Watch.StartNew();
			Func<Boolean> withinTimeout = () => timer.ElapsedMilliseconds < millisecondsTimeout;
			return TryTake(out item, withinTimeout, null);
		}

		/// <summary>
		/// Tries to take an item, retrying while the elapsed milliseconds are below
		/// <paramref name="millisecondsTimeout"/> and checking <paramref name="token"/> on every attempt.
		/// </summary>
		public Boolean TryTake(out T item, Int32 millisecondsTimeout, CancellationToken token)
		{
			var timer = Watch.StartNew();
			Func<Boolean> withinTimeout = () => timer.ElapsedMilliseconds < millisecondsTimeout;
			return TryTake(out item, withinTimeout, token);
		}

		/// <summary>Validates the array passed to the *Any helpers.</summary>
		/// <exception cref="ArgumentNullException"><paramref name="collections"/> is null.</exception>
		/// <exception cref="ArgumentException"><paramref name="collections"/> is empty or has a null entry.</exception>
		private static void CheckArray(BlockingCollection<T>[] collections)
		{
			if (collections == null)
			{
				throw new ArgumentNullException(nameof(collections));
			}

			var unusable = collections.Length == 0 || IsThereANullElement(collections);
			if (unusable)
			{
				throw new ArgumentException("The collections argument is a 0-length array or contains a null element.", nameof(collections));
			}
		}

		/// <summary>Returns true if any entry of <paramref name="collections"/> is null.</summary>
		private static Boolean IsThereANullElement(BlockingCollection<T>[] collections)
		{
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i] == null)
				{
					return true;
				}
			}
			return false;
		}

		/// <summary>
		/// Adds <paramref name="item"/> to the first collection whose <c>Add</c> does not throw.
		/// </summary>
		/// <returns>Index of the collection that took the item, or -1 if every attempt threw.</returns>
		public static Int32 AddToAny(BlockingCollection<T>[] collections, T item)
		{
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				try
				{
					collections[i].Add(item);
					return i;
				}
				catch
				{
					// Any exception from this collection just moves on to the next one.
				}
			}
			return -1;
		}

		/// <summary>
		/// Adds <paramref name="item"/> to the first collection whose <c>Add</c> (called with
		/// <paramref name="token"/>) does not throw.
		/// </summary>
		/// <returns>Index of the collection that took the item, or -1 if every attempt threw.</returns>
		public static Int32 AddToAny(BlockingCollection<T>[] collections, T item, CancellationToken token)
		{
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				try
				{
					collections[i].Add(item, token);
					return i;
				}
				catch
				{
					// Any exception from this collection just moves on to the next one.
				}
			}
			return -1;
		}

		/// <summary>Offers <paramref name="item"/> to each collection in order with a single attempt each.</summary>
		/// <returns>Index of the first collection that accepted the item, or -1 if none did.</returns>
		public static Int32 TryAddToAny(BlockingCollection<T>[] collections, T item)
		{
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i].TryAdd(item))
				{
					return i;
				}
			}
			return -1;
		}

		/// <summary>Offers <paramref name="item"/> to each collection in order, passing <paramref name="ts"/> to each attempt.</summary>
		/// <returns>Index of the first collection that accepted the item, or -1 if none did.</returns>
		public static Int32 TryAddToAny(BlockingCollection<T>[] collections, T item, TimeSpan ts)
		{
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i].TryAdd(item, ts))
				{
					return i;
				}
			}
			return -1;
		}

		/// <summary>Offers <paramref name="item"/> to each collection in order, passing <paramref name="millisecondsTimeout"/> to each attempt.</summary>
		/// <returns>Index of the first collection that accepted the item, or -1 if none did.</returns>
		public static Int32 TryAddToAny(BlockingCollection<T>[] collections, T item, Int32 millisecondsTimeout)
		{
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i].TryAdd(item, millisecondsTimeout))
				{
					return i;
				}
			}
			return -1;
		}

		/// <summary>
		/// Offers <paramref name="item"/> to each collection in order, passing
		/// <paramref name="millisecondsTimeout"/> and <paramref name="token"/> to each attempt.
		/// </summary>
		/// <returns>Index of the first collection that accepted the item, or -1 if none did.</returns>
		public static Int32 TryAddToAny(BlockingCollection<T>[] collections, T item, Int32 millisecondsTimeout, CancellationToken token)
		{
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i].TryAdd(item, millisecondsTimeout, token))
				{
					return i;
				}
			}
			return -1;
		}

		/// <summary>Takes an item from the first collection whose <c>Take</c> does not throw.</summary>
		/// <param name="collections">Collections to try, in order.</param>
		/// <param name="item">Receives the taken item; the default value if nothing was taken.</param>
		/// <returns>Index of the collection the item came from, or -1 if every attempt threw.</returns>
		public static Int32 TakeFromAny(BlockingCollection<T>[] collections, out T item)
		{
			item = default(T);
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				try
				{
					item = collections[i].Take();
					return i;
				}
				catch
				{
					// Any exception from this collection just moves on to the next one.
				}
			}
			return -1;
		}

		/// <summary>
		/// Takes an item from the first collection whose <c>Take</c> (called with
		/// <paramref name="token"/>) does not throw.
		/// </summary>
		/// <param name="collections">Collections to try, in order.</param>
		/// <param name="item">Receives the taken item; the default value if nothing was taken.</param>
		/// <param name="token">Token handed to each <c>Take</c> call.</param>
		/// <returns>Index of the collection the item came from, or -1 if every attempt threw.</returns>
		public static Int32 TakeFromAny(BlockingCollection<T>[] collections, out T item, CancellationToken token)
		{
			item = default(T);
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				try
				{
					item = collections[i].Take(token);
					return i;
				}
				catch
				{
					// Any exception from this collection just moves on to the next one.
				}
			}
			return -1;
		}

		/// <summary>Asks each collection in order for an item with a single attempt each.</summary>
		/// <param name="collections">Collections to try, in order.</param>
		/// <param name="item">Receives the value produced by the last <c>TryTake</c> call made.</param>
		/// <returns>Index of the first collection that yielded an item, or -1 if none did.</returns>
		public static Int32 TryTakeFromAny(BlockingCollection<T>[] collections, out T item)
		{
			item = default(T);
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i].TryTake(out item))
				{
					return i;
				}
			}
			return -1;
		}

		/// <summary>Asks each collection in order for an item, passing <paramref name="ts"/> to each attempt.</summary>
		/// <param name="collections">Collections to try, in order.</param>
		/// <param name="item">Receives the value produced by the last <c>TryTake</c> call made.</param>
		/// <param name="ts">Timeout handed to each attempt.</param>
		/// <returns>Index of the first collection that yielded an item, or -1 if none did.</returns>
		public static Int32 TryTakeFromAny(BlockingCollection<T>[] collections, out T item, TimeSpan ts)
		{
			item = default(T);
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i].TryTake(out item, ts))
				{
					return i;
				}
			}
			return -1;
		}

		/// <summary>Asks each collection in order for an item, passing <paramref name="millisecondsTimeout"/> to each attempt.</summary>
		/// <param name="collections">Collections to try, in order.</param>
		/// <param name="item">Receives the value produced by the last <c>TryTake</c> call made.</param>
		/// <param name="millisecondsTimeout">Timeout handed to each attempt.</param>
		/// <returns>Index of the first collection that yielded an item, or -1 if none did.</returns>
		public static Int32 TryTakeFromAny(BlockingCollection<T>[] collections, out T item, Int32 millisecondsTimeout)
		{
			item = default(T);
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i].TryTake(out item, millisecondsTimeout))
				{
					return i;
				}
			}
			return -1;
		}

		/// <summary>
		/// Asks each collection in order for an item, passing <paramref name="millisecondsTimeout"/>
		/// and <paramref name="token"/> to each attempt.
		/// </summary>
		/// <param name="collections">Collections to try, in order.</param>
		/// <param name="item">Receives the value produced by the last <c>TryTake</c> call made.</param>
		/// <param name="millisecondsTimeout">Timeout handed to each attempt.</param>
		/// <param name="token">Token handed to each attempt.</param>
		/// <returns>Index of the first collection that yielded an item, or -1 if none did.</returns>
		public static Int32 TryTakeFromAny(BlockingCollection<T>[] collections, out T item, Int32 millisecondsTimeout, CancellationToken token)
		{
			item = default(T);
			CheckArray(collections);
			for (var i = 0; i < collections.Length; i++)
			{
				if (collections[i].TryTake(out item, millisecondsTimeout, token))
				{
					return i;
				}
			}
			return -1;
		}

		/// <summary>
		/// Marks the collection as complete for adding: records the current value of
		/// <c>addId</c> in <c>completeId</c>, then sets the completion flag.
		/// </summary>
		public void CompleteAdding()
		{
			// Snapshot first, flag second: the add paths only compare against completeId
			// after they have observed the flag as set.
			completeId = addId;
			isComplete.Value = true;
		}

		/// <summary>Copies the underlying collection's contents into <paramref name="array"/> starting at <paramref name="index"/>.</summary>
		void ICollection.CopyTo(Array array, Int32 index) => underlyingColl.CopyTo(array, index);

		/// <summary>Copies the underlying collection's contents into <paramref name="array"/> starting at <paramref name="index"/>.</summary>
		public void CopyTo(T[] array, Int32 index) => underlyingColl.CopyTo(array, index);

		/// <summary>Returns a sequence that removes items with <c>Take()</c> as it is enumerated.</summary>
		public IEnumerable<T> GetConsumingEnumerable() => GetConsumingEnumerable(Take);

		/// <summary>Returns a sequence that removes items with <c>Take(token)</c> as it is enumerated.</summary>
		public IEnumerable<T> GetConsumingEnumerable(CancellationToken token) => GetConsumingEnumerable(() => Take(token));

		/// <summary>
		/// Yields the results of <paramref name="getFunc"/> one by one; the sequence ends the first
		/// time <paramref name="getFunc"/> throws, whatever the exception.
		/// </summary>
		private IEnumerable<T> GetConsumingEnumerable(Func<T> getFunc)
		{
			for (; ; )
			{
				T next;
				try
				{
					next = getFunc();
				}
				catch
				{
					// Leaving the loop ends the iterator, which completes the sequence.
					break;
				}
				yield return next;
			}
		}

		/// <summary>Non-generic enumerator obtained from the underlying collection; enumerating does not go through Take.</summary>
		IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable)underlyingColl).GetEnumerator();

		/// <summary>Generic enumerator obtained from the underlying collection; enumerating does not go through Take.</summary>
		IEnumerator<T> IEnumerable<T>.GetEnumerator() => underlyingColl.GetEnumerator();

		/// <summary>
		/// Releases the collection by routing through <see cref="Dispose(Boolean)"/>, so that
		/// overrides in derived classes actually run.
		/// </summary>
		public void Dispose()
		{
			Dispose(true);
			// This class declares no finalizer; the call matters for derived types that add one.
			GC.SuppressFinalize(this);
		}

		/// <summary>Cleanup hook for derived classes; this implementation does nothing.</summary>
		/// <param name="managedRes">True when called from <see cref="Dispose()"/>.</param>
		protected virtual void Dispose(Boolean managedRes)
		{
		}

		/// <summary>Returns the array produced by the underlying collection's <c>ToArray</c>.</summary>
		public T[] ToArray() => underlyingColl.ToArray();

		/// <summary>Performs one spin on the <c>sw</c> spinner; called by the blocking loops between retries.</summary>
		private void Block() => sw.SpinOnce();
	}
}