v7.3.2018.0614   重构高性能资源池,减少GC压力,增加线程池,让异步任务得到平等竞争CPU的机会
大石头 编写于 2018-06-14 17:56:44
X
using System;
using System.Collections.Generic;
using System.Text;
using NewLife.Data;
using NewLife.Net.MQTT.Packets;

namespace NewLife.Net.MQTT
{
    public sealed class MqttEncoder : MessageToMessageEncoder<DataPacket>
    {
        public static readonly MqttEncoder Instance = new MqttEncoder();
        const Int32 PacketIdLength = 2;
        const Int32 StringSizeLength = 2;
        const Int32 MaxVariableLength = 4;

        protected override void Encode(IHandlerContext context, DataPacket message, List<Object> output) => DoEncode(context.Allocator, message, output);

        public override Boolean IsSharable => true;

        /// <summary>
        ///     This is the main encoding method.
        ///     It's only visible for testing.
        ///     @param bufferAllocator Allocates ByteBuf
        ///     @param packet MQTT packet to encode
        ///     @return ByteBuf with encoded bytes
        /// </summary>
        internal static void DoEncode(IByteBufferAllocator bufferAllocator, DataPacket packet, List<Object> output)
        {
            switch (packet.PacketType)
            {
                case PacketType.CONNECT:
                    EncodeConnectMessage(bufferAllocator, (ConnectPacket)packet, output);
                    break;
                case PacketType.CONNACK:
                    EncodeConnAckMessage(bufferAllocator, (ConnAckPacket)packet, output);
                    break;
                case PacketType.PUBLISH:
                    EncodePublishMessage(bufferAllocator, (PublishPacket)packet, output);
                    break;
                case PacketType.PUBACK:
                case PacketType.PUBREC:
                case PacketType.PUBREL:
                case PacketType.PUBCOMP:
                case PacketType.UNSUBACK:
                    EncodePacketWithIdOnly(bufferAllocator, (PacketWithId)packet, output);
                    break;
                case PacketType.SUBSCRIBE:
                    EncodeSubscribeMessage(bufferAllocator, (SubscribePacket)packet, output);
                    break;
                case PacketType.SUBACK:
                    EncodeSubAckMessage(bufferAllocator, (SubAckPacket)packet, output);
                    break;
                case PacketType.UNSUBSCRIBE:
                    EncodeUnsubscribeMessage(bufferAllocator, (UnsubscribePacket)packet, output);
                    break;
                case PacketType.PINGREQ:
                case PacketType.PINGRESP:
                case PacketType.DISCONNECT:
                    EncodePacketWithFixedHeaderOnly(bufferAllocator, packet, output);
                    break;
                default:
                    throw new ArgumentException("Unknown packet type: " + packet.PacketType, nameof(packet));
            }
        }

        static void EncodeConnectMessage(IByteBufferAllocator bufferAllocator, ConnectPacket packet, List<Object> output)
        {
            var payloadBufferSize = 0;

            // Client id
            var clientId = packet.ClientId;
            Util.ValidateClientId(clientId);
            var clientIdBytes = EncodeStringInUtf8(clientId);
            payloadBufferSize += StringSizeLength + clientIdBytes.Length;

            Byte[] willTopicBytes;
            Packet willMessage;
            if (packet.HasWill)
            {
                // Will topic and message
                var willTopic = packet.WillTopicName;
                willTopicBytes = EncodeStringInUtf8(willTopic);
                willMessage = packet.WillMessage;
                payloadBufferSize += StringSizeLength + willTopicBytes.Length;
                payloadBufferSize += 2 + willMessage.ReadableBytes;
            }
            else
            {
                willTopicBytes = null;
                willMessage = null;
            }

            var userName = packet.Username;
            Byte[] userNameBytes;
            if (packet.HasUsername)
            {
                userNameBytes = EncodeStringInUtf8(userName);
                payloadBufferSize += StringSizeLength + userNameBytes.Length;
            }
            else
            {
                userNameBytes = null;
            }

            Byte[] passwordBytes;
            if (packet.HasPassword)
            {
                var password = packet.Password;
                passwordBytes = EncodeStringInUtf8(password);
                payloadBufferSize += StringSizeLength + passwordBytes.Length;
            }
            else
            {
                passwordBytes = null;
            }

            // Fixed header
            var protocolNameBytes = EncodeStringInUtf8(Util.ProtocolName);
            var variableHeaderBufferSize = StringSizeLength + protocolNameBytes.Length + 4;
            var variablePartSize = variableHeaderBufferSize + payloadBufferSize;
            var fixedHeaderBufferSize = 1 + MaxVariableLength;
            Packet buf = null;
            try
            {
                buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize);
                buf.WriteByte(CalculateFirstByteOfFixedHeader(packet));
                WriteVariableLengthInt(buf, variablePartSize);

                buf.WriteShort(protocolNameBytes.Length);
                buf.WriteBytes(protocolNameBytes);

                buf.WriteByte(Util.ProtocolLevel);
                buf.WriteByte(CalculateConnectFlagsByte(packet));
                buf.WriteShort(packet.KeepAliveInSeconds);

                // Payload
                buf.WriteShort(clientIdBytes.Length);
                buf.WriteBytes(clientIdBytes, 0, clientIdBytes.Length);
                if (packet.HasWill)
                {
                    buf.WriteShort(willTopicBytes.Length);
                    buf.WriteBytes(willTopicBytes, 0, willTopicBytes.Length);
                    buf.WriteShort(willMessage.ReadableBytes);
                    if (willMessage.IsReadable())
                    {
                        buf.WriteBytes(willMessage);
                    }
                    willMessage.Release();
                    willMessage = null;
                }
                if (packet.HasUsername)
                {
                    buf.WriteShort(userNameBytes.Length);
                    buf.WriteBytes(userNameBytes, 0, userNameBytes.Length);

                    if (packet.HasPassword)
                    {
                        buf.WriteShort(passwordBytes.Length);
                        buf.WriteBytes(passwordBytes, 0, passwordBytes.Length);
                    }
                }

                output.Add(buf);
                buf = null;
            }
            finally
            {
                buf?.SafeRelease();
                willMessage?.SafeRelease();
            }
        }

        static Int32 CalculateConnectFlagsByte(ConnectPacket packet)
        {
            var flagByte = 0;
            if (packet.HasUsername)
            {
                flagByte |= 0x80;
            }
            if (packet.HasPassword)
            {
                flagByte |= 0x40;
            }
            if (packet.HasWill)
            {
                flagByte |= 0x04;
                flagByte |= ((Int32)packet.WillQualityOfService & 0x03) << 3;
                if (packet.WillRetain)
                {
                    flagByte |= 0x20;
                }
            }
            if (packet.CleanSession)
            {
                flagByte |= 0x02;
            }
            return flagByte;
        }

        static void EncodeConnAckMessage(IByteBufferAllocator bufferAllocator, ConnAckPacket message, List<Object> output)
        {
            Packet buffer = null;
            try
            {
                buffer = bufferAllocator.Buffer(4);
                buffer.WriteByte(CalculateFirstByteOfFixedHeader(message));
                buffer.WriteByte(2); // remaining length
                if (message.SessionPresent)
                {
                    buffer.WriteByte(1); // 7 reserved 0-bits and SP = 1
                }
                else
                {
                    buffer.WriteByte(0); // 7 reserved 0-bits and SP = 0
                }
                buffer.WriteByte((Byte)message.ReturnCode);


                output.Add(buffer);
                buffer = null;
            }
            finally
            {
                buffer?.SafeRelease();
            }
        }

        static void EncodePublishMessage(IByteBufferAllocator bufferAllocator, PublishPacket packet, List<Object> output)
        {
            Packet payload = packet.Payload ?? Unpooled.Empty;

            var topicName = packet.TopicName;
            Util.ValidateTopicName(topicName);
            var topicNameBytes = EncodeStringInUtf8(topicName);

            Int32 variableHeaderBufferSize = StringSizeLength + topicNameBytes.Length +
                (packet.QualityOfService > QualityOfService.AtMostOnce ? PacketIdLength : 0);
            Int32 payloadBufferSize = payload.ReadableBytes;
            var variablePartSize = variableHeaderBufferSize + payloadBufferSize;
            var fixedHeaderBufferSize = 1 + MaxVariableLength;

            Packet buf = null;
            try
            {
                buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize);
                buf.WriteByte(CalculateFirstByteOfFixedHeader(packet));
                WriteVariableLengthInt(buf, variablePartSize);
                buf.WriteShort(topicNameBytes.Length);
                buf.WriteBytes(topicNameBytes);
                if (packet.QualityOfService > QualityOfService.AtMostOnce)
                {
                    buf.WriteShort(packet.PacketId);
                }

                output.Add(buf);
                buf = null;
            }
            finally
            {
                buf?.SafeRelease();
            }

            if (payload.IsReadable())
            {
                output.Add(payload.Retain());
            }
        }

        static void EncodePacketWithIdOnly(IByteBufferAllocator bufferAllocator, PacketWithId packet, List<Object> output)
        {
            var msgId = packet.PacketId;

            const Int32 VariableHeaderBufferSize = PacketIdLength; // variable part only has a packet id
            var fixedHeaderBufferSize = 1 + MaxVariableLength;
            Packet buffer = null;
            try
            {
                buffer = bufferAllocator.Buffer(fixedHeaderBufferSize + VariableHeaderBufferSize);
                buffer.WriteByte(CalculateFirstByteOfFixedHeader(packet));
                WriteVariableLengthInt(buffer, VariableHeaderBufferSize);
                buffer.WriteShort(msgId);

                output.Add(buffer);
                buffer = null;
            }
            finally
            {
                buffer?.SafeRelease();
            }
        }

        static void EncodeSubscribeMessage(IByteBufferAllocator bufferAllocator, SubscribePacket packet, List<Object> output)
        {
            const Int32 VariableHeaderSize = PacketIdLength;
            var payloadBufferSize = 0;

            ThreadLocalObjectList encodedTopicFilters = ThreadLocalObjectList.NewInstance();

            Packet buf = null;
            try
            {
                foreach (var topic in packet.Requests)
                {
                    var topicFilterBytes = EncodeStringInUtf8(topic.TopicFilter);
                    payloadBufferSize += StringSizeLength + topicFilterBytes.Length + 1; // length, value, QoS
                    encodedTopicFilters.Add(topicFilterBytes);
                }

                var variablePartSize = VariableHeaderSize + payloadBufferSize;
                var fixedHeaderBufferSize = 1 + MaxVariableLength;

                buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize);
                buf.WriteByte(CalculateFirstByteOfFixedHeader(packet));
                WriteVariableLengthInt(buf, variablePartSize);

                // Variable Header
                buf.WriteShort(packet.PacketId); // todo: review: validate?

                // Payload
                for (var i = 0; i < encodedTopicFilters.Count; i++)
                {
                    var topicFilterBytes = (Byte[])encodedTopicFilters[i];
                    buf.WriteShort(topicFilterBytes.Length);
                    buf.WriteBytes(topicFilterBytes, 0, topicFilterBytes.Length);
                    buf.WriteByte((Int32)packet.Requests[i].QualityOfService);
                }

                output.Add(buf);
                buf = null;
            }
            finally
            {
                buf?.SafeRelease();
                encodedTopicFilters.Return();
            }
        }

        static void EncodeSubAckMessage(IByteBufferAllocator bufferAllocator, SubAckPacket message, List<Object> output)
        {
            var payloadBufferSize = message.ReturnCodes.Count;
            var variablePartSize = PacketIdLength + payloadBufferSize;
            var fixedHeaderBufferSize = 1 + MaxVariableLength;
            Packet buf = null;
            try
            {
                buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize);
                buf.WriteByte(CalculateFirstByteOfFixedHeader(message));
                WriteVariableLengthInt(buf, variablePartSize);
                buf.WriteShort(message.PacketId);
                foreach (QualityOfService qos in message.ReturnCodes)
                {
                    buf.WriteByte((Byte)qos);
                }

                output.Add(buf);
                buf = null;

            }
            finally
            {
                buf?.SafeRelease();
            }
        }

        static void EncodeUnsubscribeMessage(IByteBufferAllocator bufferAllocator, UnsubscribePacket packet, List<Object> output)
        {
            const Int32 VariableHeaderSize = 2;
            var payloadBufferSize = 0;

            ThreadLocalObjectList encodedTopicFilters = ThreadLocalObjectList.NewInstance();

            Packet buf = null;
            try
            {
                foreach (var topic in packet.TopicFilters)
                {
                    var topicFilterBytes = EncodeStringInUtf8(topic);
                    payloadBufferSize += StringSizeLength + topicFilterBytes.Length; // length, value
                    encodedTopicFilters.Add(topicFilterBytes);
                }

                var variablePartSize = VariableHeaderSize + payloadBufferSize;
                var fixedHeaderBufferSize = 1 + MaxVariableLength;

                buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize);
                buf.WriteByte(CalculateFirstByteOfFixedHeader(packet));
                WriteVariableLengthInt(buf, variablePartSize);

                // Variable Header
                buf.WriteShort(packet.PacketId); // todo: review: validate?

                // Payload
                for (var i = 0; i < encodedTopicFilters.Count; i++)
                {
                    var topicFilterBytes = (Byte[])encodedTopicFilters[i];
                    buf.WriteShort(topicFilterBytes.Length);
                    buf.WriteBytes(topicFilterBytes, 0, topicFilterBytes.Length);
                }

                output.Add(buf);
                buf = null;
            }
            finally
            {
                buf?.SafeRelease();
                encodedTopicFilters.Return();
            }
        }

        static void EncodePacketWithFixedHeaderOnly(IByteBufferAllocator bufferAllocator, DataPacket packet, List<Object> output)
        {
            Packet buffer = null;
            try
            {
                buffer = bufferAllocator.Buffer(2);
                buffer.WriteByte(CalculateFirstByteOfFixedHeader(packet));
                buffer.WriteByte(0);

                output.Add(buffer);
                buffer = null;
            }
            finally
            {
                buffer?.SafeRelease();
            }
        }

        static Int32 CalculateFirstByteOfFixedHeader(DataPacket packet)
        {
            var ret = 0;
            ret |= (Int32)packet.PacketType << 4;
            if (packet.Duplicate)
            {
                ret |= 0x08;
            }
            ret |= (Int32)packet.QualityOfService << 1;
            if (packet.RetainRequested)
            {
                ret |= 0x01;
            }
            return ret;
        }

        static void WriteVariableLengthInt(Packet buffer, Int32 value)
        {
            do
            {
                var digit = value % 128;
                value /= 128;
                if (value > 0)
                {
                    digit |= 0x80;
                }
                buffer.WriteByte(digit);
            }
            while (value > 0);
        }

        static Byte[] EncodeStringInUtf8(String s)
        {
            // todo: validate against extra limitations per MQTT's UTF-8 string definition
            return Encoding.UTF8.GetBytes(s);
        }
    }
}