v10.10.2024.0701 使用IJsonHost改进Json序列化
大石头 编写于 2024-07-01 08:36:34 大石头 提交于 2024-07-01 08:48:33
X
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using NewLife.Threading;

#if !NET4
namespace System.Collections.Concurrent
{
    /// <summary>并行队列</summary>
    /// <remarks>
    /// 非常巧妙的设计,用数据段组成链表,然后数据段内部是一个数组,既满足了并发冲突要求,又满足了性能要求。
    /// </remarks>
    /// <typeparam name="T"></typeparam>
    public class ConcurrentQueue<T> : IEnumerable<T>, ICollection, IEnumerable
    {
        #region 属性
        /// <summary>链表头部</summary>
        private Segment _head;

        /// <summary>链表尾部</summary>
        private Segment _tail;

        /// <summary>数据段大小</summary>
        private const int SEGMENT_SIZE = 0x20;

        /// <summary>当前并行数</summary>
        [NonSerialized]
        internal int _Takers;

        private Int32 _Count;
        /// <summary>元素个数</summary>
        public int Count { get { return _Count; } }
        //{
        //    get
        //    {
        //        Segment segment;
        //        Segment segment2;
        //        int num;
        //        int num2;
        //        GetHeadTailPositions(out segment, out segment2, out num, out num2);
        //        if (segment == segment2) return num2 - num + 1;
        //        int num3 = SEGMENT_SIZE - num;
        //        num3 += SEGMENT_SIZE * (((int)(segment2.m_index - segment.m_index)) - 1);
        //        return num3 + (num2 + 1);
        //    }
        //}

        /// <summary>是否空</summary>
        public bool IsEmpty { get { return _Count == 0; } }
        //{
        //    get
        //    {
        //        var head = m_head;
        //        if (head.IsEmpty)
        //        {
        //            if (head.Next == null) return true;

        //            var wait = new SpinWait();
        //            while (head.IsEmpty)
        //            {
        //                if (head.Next == null) return true;
        //                wait.SpinOnce();
        //                head = m_head;
        //            }
        //        }
        //        return false;
        //    }
        //}
        #endregion

        #region 构造
        /// <summary>实例化</summary>
        public ConcurrentQueue()
        {
            _head = _tail = new Segment(0, this);
        }

        /// <summary>使用集合初始化</summary>
        /// <param name="collection"></param>
        public ConcurrentQueue(IEnumerable<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");

            var segment = new Segment(0, this);
            _head = segment;
            int num = 0;
            foreach (var local in collection)
            {
                segment.UnsafeAdd(local);
                num++;
                _Count++;

                // 超过大小,数组增长
                if (num >= SEGMENT_SIZE)
                {
                    segment = segment.UnsafeGrow();
                    num = 0;
                }
            }
            _tail = segment;
        }
        #endregion

        #region 核心方法
        /// <summary>进入队列</summary>
        /// <param name="item"></param>
        public void Enqueue(T item)
        {
            var wait = new SpinWait();
            while (!_tail.TryAppend(item))
            {
                wait.SpinOnce();
            }

            Interlocked.Increment(ref _Count);
        }

        /// <summary>尝试出列</summary>
        /// <param name="result"></param>
        /// <returns></returns>
        public bool TryDequeue(out T result)
        {
            while (!IsEmpty)
            {
                if (_head.TryRemove(out result))
                {
                    Interlocked.Decrement(ref _Count);
                    return true;
                }
            }
            result = default(T);
            return false;
        }

        /// <summary>尝试获取一个,不出列</summary>
        /// <param name="result"></param>
        /// <returns></returns>
        public bool TryPeek(out T result)
        {
            Interlocked.Increment(ref _Takers);
            while (!IsEmpty)
            {
                if (_head.TryPeek(out result))
                {
                    Interlocked.Decrement(ref _Takers);
                    return true;
                }
            }
            result = default(T);
            Interlocked.Decrement(ref _Takers);
            return false;
        }
        #endregion

        #region 辅助方法
        /// <summary>转为数组</summary>
        /// <returns></returns>
        public T[] ToArray() { return ToList().ToArray(); }

        private List<T> ToList()
        {
            Interlocked.Increment(ref _Takers);
            var list = new List<T>();
            try
            {
                Segment seg1;
                Segment seg2;
                int num;
                int num2;
                GetHeadTailPositions(out seg1, out seg2, out num, out num2);
                if (seg1 == seg2)
                {
                    seg1.AddToList(list, num, num2);
                    return list;
                }
                seg1.AddToList(list, num, SEGMENT_SIZE - 1);
                for (var seg = seg1.Next; seg != seg2; seg = seg.Next)
                {
                    seg.AddToList(list, 0, SEGMENT_SIZE - 1);
                }
                seg2.AddToList(list, 0, num2);
            }
            finally
            {
                Interlocked.Decrement(ref _Takers);
            }
            return list;
        }

        private void GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh)
        {
            head = _head;
            tail = _tail;
            headLow = head.Low;
            tailHigh = tail.High;

            var wait = new SpinWait();
            while (head != _head || tail != _tail || headLow != head.Low || tailHigh != tail.High || head.m_index > tail.m_index)
            {
                wait.SpinOnce();
                head = _head;
                tail = _tail;
                headLow = head.Low;
                tailHigh = tail.High;
            }
        }
        #endregion

        #region 内部
        [StructLayout(LayoutKind.Sequential)]
        internal struct VolatileBool
        {
            public volatile bool m_value;

            public VolatileBool(bool value) { m_value = value; }
        }

        /// <summary>数据段。每个段有一个数组,用于保存数据</summary>
        private class Segment
        {
            internal volatile T[] m_array;
            private int m_high;
            private int m_low;
            internal readonly long m_index;

            private volatile Segment m_next;
            private volatile ConcurrentQueue<T> m_source;
            internal volatile VolatileBool[] m_state;

            internal Segment(long index, ConcurrentQueue<T> source)
            {
                m_array = new T[SEGMENT_SIZE];
                m_state = new VolatileBool[SEGMENT_SIZE];
                m_high = -1;
                m_index = index;
                m_source = source;
            }

            internal void AddToList(List<T> list, int start, int end)
            {
                for (int i = start; i <= end; i++)
                {
                    var wait = new SpinWait();
                    while (!m_state[i].m_value)
                    {
                        wait.SpinOnce();
                    }
                    list.Add(m_array[i]);
                }
            }

            internal void Grow()
            {
                var segment = new Segment(m_index + 1, m_source);
                m_next = segment;
                m_source._tail = m_next;
            }

            internal bool TryAppend(T value)
            {
                if (m_high >= SEGMENT_SIZE - 1) return false;
                int index = SEGMENT_SIZE;

                index = Interlocked.Increment(ref m_high);
                if (index <= SEGMENT_SIZE - 1)
                {
                    m_array[index] = value;
                    m_state[index].m_value = true;
                }
                if (index == SEGMENT_SIZE - 1) Grow();

                return index <= SEGMENT_SIZE - 1;
            }

            internal bool TryPeek(out T result)
            {
                result = default(T);
                int low = Low;
                if (low > High) return false;

                var wait = new SpinWait();
                while (!m_state[low].m_value)
                {
                    wait.SpinOnce();
                }
                result = m_array[low];
                return true;
            }

            internal bool TryRemove(out T result)
            {
                var wait = new SpinWait();
                int low = Low;
                for (int i = High; low <= i; i = High)
                {
                    if (Interlocked.CompareExchange(ref m_low, low + 1, low) == low)
                    {
                        var wait2 = new SpinWait();
                        while (!m_state[low].m_value)
                        {
                            wait2.SpinOnce();
                        }
                        result = m_array[low];
                        if (m_source._Takers <= 0) m_array[low] = default(T);
                        if (low + 1 >= SEGMENT_SIZE)
                        {
                            wait2 = new SpinWait();
                            while (m_next == null)
                            {
                                wait2.SpinOnce();
                            }
                            m_source._head = m_next;
                        }
                        return true;
                    }
                    wait.SpinOnce();
                    low = Low;
                }
                result = default(T);
                return false;
            }

            /// <summary>不安全添加</summary>
            /// <param name="value"></param>
            internal void UnsafeAdd(T value)
            {
                m_high++;
                m_array[m_high] = value;
                m_state[m_high].m_value = true;
            }

            /// <summary>不安全增长</summary>
            /// <returns></returns>
            internal Segment UnsafeGrow()
            {
                var segment = new Segment(m_index + 1, m_source);
                m_next = segment;
                return segment;
            }

            internal int High { get { return Math.Min(m_high, SEGMENT_SIZE - 1); } }

            internal bool IsEmpty { get { return Low > High; } }

            internal int Low { get { return Math.Min(m_low, SEGMENT_SIZE); } }

            internal Segment Next { get { return m_next; } }
        }
        #endregion

        #region IEnumerable 成员
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            Segment segment;
            Segment segment2;
            int num;
            int num2;
            Interlocked.Increment(ref _Takers);
            GetHeadTailPositions(out segment, out segment2, out num, out num2);

            for (var seg = _head; seg != null; seg = seg.Next)
            {
                for (int i = seg.Low; i < seg.High; i++)
                {
                    T rs = default(T);
                    if (seg.TryPeek(out rs)) yield return rs;
                }
            }

            yield break;
        }

        IEnumerator IEnumerable.GetEnumerator() { return (this as IEnumerable<T>).GetEnumerator(); }
        #endregion

        #region ICollection 成员
        void ICollection.CopyTo(Array array, int index)
        {
            if (array == null) throw new ArgumentNullException("array");
            //this.ToList().CopyTo(array, index);
        }

        bool ICollection.IsSynchronized { get { return false; } }

        object ICollection.SyncRoot { get { throw new NotSupportedException(); } }
        #endregion
    }
}
#endif