// Release 1203
// Written by 大石头 on 2019-12-03 22:57:54
#if !NET4
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using NewLife.Collections;
using NewLife.Data;
using NewLife.Log;
using NewLife.Reflection;
using NewLife.Serialization;

namespace NewLife.Remoting
{
    /// <summary>Http应用接口客户端</summary>
    public class ApiHttpClient : IApiClient
    {
        #region 属性
        /// <summary>Token. Carried with every request.</summary>
        public String Token { get; set; }

        /// <summary>Timeout in milliseconds. Defaults to 15000 ms. Applied when the HttpClient for a service is created.</summary>
        public Int32 Timeout { get; set; } = 15_000;

        /// <summary>Server source: the name of the service item currently being used.</summary>
        public String Source { get; private set; }

        /// <summary>Invocation statistics counter.</summary>
        public ICounter StatInvoke { get; set; }

        /// <summary>Slow-call tracing threshold. When a remote call takes at least this long a slow-call log entry is written. Defaults to 5000 ms; values of 0 or less disable the log.</summary>
        public Int32 SlowTrace { get; set; } = 5_000;

        /// <summary>Registered service addresses, tried in order starting from the current index.</summary>
        private readonly IList<ServiceItem> _Items = new List<ServiceItem>();
        #endregion

        #region 构造
        /// <summary>Creates a client without any service address; call <see cref="Add"/> before invoking.</summary>
        public ApiHttpClient()
        {
        }

        /// <summary>Creates a client with a single service address registered under the name "Default".</summary>
        /// <param name="url">Base URL of the service; parsed with <see cref="Uri"/>.</param>
        public ApiHttpClient(String url)
        {
            var address = new Uri(url);
            Add("Default", address);
        }
        #endregion

        #region 方法
        /// <summary>Registers a service address.</summary>
        /// <param name="name">Name of the service item.</param>
        /// <param name="address">Base address of the service.</param>
        public void Add(String name, Uri address)
        {
            var item = new ServiceItem();
            item.Name = name;
            item.Address = address;

            _Items.Add(item);
        }
        #endregion

        #region 核心方法
        /// <summary>Asynchronous GET; the arguments are encoded into the URL.</summary>
        /// <typeparam name="TResult">Expected result type.</typeparam>
        /// <param name="action">Service action.</param>
        /// <param name="args">Arguments.</param>
        /// <returns>The converted result.</returns>
        public async Task<TResult> GetAsync<TResult>(String action, Object args = null)
        {
            var result = await InvokeAsync<TResult>(HttpMethod.Get, action, args);
            return result;
        }

        /// <summary>Asynchronous POST; the arguments are packed into the request body.</summary>
        /// <typeparam name="TResult">Expected result type.</typeparam>
        /// <param name="action">Service action.</param>
        /// <param name="args">Arguments.</param>
        /// <returns>The converted result.</returns>
        public async Task<TResult> PostAsync<TResult>(String action, Object args = null)
        {
            var result = await InvokeAsync<TResult>(HttpMethod.Post, action, args);
            return result;
        }

        /// <summary>Invokes a remote action asynchronously and converts the response to <typeparamref name="TResult"/>.</summary>
        /// <remarks>
        /// When <typeparamref name="TResult"/> is <see cref="HttpResponseMessage"/> the raw response is returned and the caller owns it.
        /// Otherwise the body is read, the response is disposed, and the body is returned as raw bytes / <c>Packet</c>
        /// or parsed as a JSON envelope with <c>code</c>, <c>data</c> and <c>msg</c> members.
        /// An empty body yields <c>default</c>.
        /// </remarks>
        /// <typeparam name="TResult">Expected result type.</typeparam>
        /// <param name="method">Request method.</param>
        /// <param name="action">Service action.</param>
        /// <param name="args">Arguments.</param>
        /// <returns>The converted result.</returns>
        /// <exception cref="ApiException">The HTTP status is not 200 (with a non-empty body), or the envelope code is neither 0 nor 200.</exception>
        /// <exception cref="InvalidDataException">The body is not a JSON object, or its data is neither an object nor a list when a complex result is expected.</exception>
        public virtual async Task<TResult> InvokeAsync<TResult>(HttpMethod method, String action, Object args = null)
        {
            var rtype = typeof(TResult);

            // Send the request
            var msg = await SendAsync(method, action, args, rtype);

            // The caller asked for the raw response and therefore owns its lifetime
            if (rtype == typeof(HttpResponseMessage)) return (TResult)(Object)msg;

            // Everything needed is copied out of the response so it can be released right away
            HttpStatusCode code;
            String reason;
            Byte[] buf;
            using (msg)
            {
                code = msg.StatusCode;
                reason = msg.ReasonPhrase;
                buf = msg.Content == null ? null : await msg.Content.ReadAsByteArrayAsync();
            }
            if (buf == null || buf.Length == 0) return default;

            // Error handling for non-200 status codes
            if (code != HttpStatusCode.OK)
            {
                var invoker = _Items[_Index]?.Address + "";
                var err = buf.ToStr()?.Trim('\"');
                if (err.IsNullOrEmpty()) err = reason;
                throw new ApiException((Int32)code, $"远程[{invoker}]错误! {err}");
            }

            // Raw data
            if (rtype == typeof(Byte[])) return (TResult)(Object)buf;
            if (rtype == typeof(Packet)) return (TResult)(Object)new Packet(buf);

            var str = buf.ToStr();
            var js = new JsonParser(str).Decode() as IDictionary<String, Object>;
            // A body that is not a JSON object cannot be an envelope; fail clearly instead of with a NullReferenceException
            if (js == null) throw new InvalidDataException("未识别响应数据");

            // TryGetValue tolerates envelopes that omit a member; a missing member is treated as null
            js.TryGetValue("data", out var data);
            js.TryGetValue("code", out var rawCode);
            var code2 = rawCode.ToInt();
            if (code2 != 0 && code2 != 200)
            {
                var invoker = _Items[_Index]?.Address + "";
                if (data == null) js.TryGetValue("msg", out data);
                throw new ApiException(code2, $"远程[{invoker}]错误! {data}");
            }

            // Simple types
            if (rtype.GetTypeCode() != TypeCode.Object) return data.ChangeType<TResult>();

            // Deserialization
            if (data == null) return default;

            if (!(data is IDictionary<String, Object>) && !(data is IList<Object>)) throw new InvalidDataException("未识别响应数据");

            return JsonHelper.Convert<TResult>(data);
        }

        /// <summary>Asynchronous invocation through <c>IApiClient</c>; always sent as POST.</summary>
        /// <typeparam name="TResult">Expected result type.</typeparam>
        /// <param name="action">Service action.</param>
        /// <param name="args">Arguments.</param>
        /// <returns>The converted result.</returns>
        async Task<TResult> IApiClient.InvokeAsync<TResult>(String action, Object args)
        {
            var result = await InvokeAsync<TResult>(HttpMethod.Post, action, args);
            return result;
        }

        /// <summary>Synchronous invocation through <c>IApiClient</c>; blocks until the POST completes.</summary>
        /// <typeparam name="TResult">Expected result type.</typeparam>
        /// <param name="action">Service action.</param>
        /// <param name="args">Arguments.</param>
        /// <returns>The converted result.</returns>
        TResult IApiClient.Invoke<TResult>(String action, Object args)
        {
            // The asynchronous call is started via Task.Run and the calling thread blocks on its result
            var task = Task.Run(() => InvokeAsync<TResult>(HttpMethod.Post, action, args));
            return task.Result;
        }
        #endregion

        #region 构造请求
        /// <summary>Builds the HTTP request for an action.</summary>
        /// <remarks>
        /// For GET the arguments and the token are encoded into the query string.
        /// For any other method the arguments go into the body, the token is sent as a Bearer
        /// <c>Authorization</c> header, and the request is sent as POST.
        /// NOTE(review): methods other than GET are always sent as POST by <c>FillContent</c>; confirm this is intended.
        /// </remarks>
        /// <param name="method">Request method.</param>
        /// <param name="action">Service action, used as the request URI.</param>
        /// <param name="args">Arguments.</param>
        /// <param name="returnType">Expected result type; JSON is requested via Accept unless it is Byte[] or Packet.</param>
        /// <returns>The request message.</returns>
        protected virtual HttpRequestMessage BuildRequest(HttpMethod method, String action, Object args, Type returnType)
        {
            // Serialize the arguments and decide between GET and POST
            var request = new HttpRequestMessage(HttpMethod.Get, action);
            if (returnType != typeof(Byte[]) && returnType != typeof(Packet))
                request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

            if (method == HttpMethod.Get)
            {
                var ps = args?.ToDictionary();
                var url = GetUrl(action, ps);
                if (!Token.IsNullOrEmpty())
                {
                    url += url.Contains("?") ? "&" : "?";
                    // Escape the token: characters such as '+', '/', '=' or '&' would otherwise corrupt the query string
                    url += "token=" + Uri.EscapeDataString(Token);
                }
                request.RequestUri = new Uri(url, UriKind.RelativeOrAbsolute);
            }
            else
            {
                FillContent(request, args);
                if (!Token.IsNullOrEmpty()) request.Headers.Add("Authorization", "Bearer " + Token);
            }

            return request;
        }

        /// <summary>Fills the request body from the arguments and switches the request to POST.</summary>
        /// <param name="request">Request to fill.</param>
        /// <param name="args">Packet and Byte[] are sent as binary; any other non-null value is sent as JSON; null sends no body.</param>
        private void FillContent(HttpRequestMessage request, Object args)
        {
            ByteArrayContent body = null;
            var mediaType = "application/octet-stream";

            switch (args)
            {
                case Packet pk:
                    // A packet without a following segment is sent straight from its buffer; otherwise it is copied via ToArray
                    if (pk.Next == null)
                        body = new ByteArrayContent(pk.Data, pk.Offset, pk.Count);
                    else
                        body = new ByteArrayContent(pk.ToArray());
                    break;
                case Byte[] raw:
                    body = new ByteArrayContent(raw);
                    break;
                case null:
                    break;
                default:
                    body = new ByteArrayContent(args.ToJson().GetBytes());
                    mediaType = "application/json";
                    break;
            }

            if (body != null)
            {
                body.Headers.ContentType = new MediaTypeHeaderValue(mediaType);
                request.Content = body;
            }

            request.Method = HttpMethod.Post;
        }

        /// <summary>URL-encodes a value for use in a query string, writing spaces as '+'.</summary>
        /// <param name="data">Text to encode.</param>
        /// <returns>The encoded text; an empty string for null or empty input.</returns>
        private static String Encode(String data)
        {
            if (data == null || data.Length == 0) return String.Empty;

            var escaped = Uri.EscapeDataString(data);
            return escaped.Replace("%20", "+");
        }

        /// <summary>Appends the arguments to the action as an encoded query string.</summary>
        /// <param name="action">Service action; may already contain a query string.</param>
        /// <param name="ps">Arguments; when null or empty the action is returned unchanged.</param>
        /// <returns>The action followed by the encoded key=value pairs.</returns>
        private static String GetUrl(String action, IDictionary<String, Object> ps)
        {
            if (ps == null || ps.Count == 0) return action;

            var sb = Pool.StringBuilder.Get();
            sb.Append(action);
            // Continue an existing query string with '&', otherwise start one with '?'
            sb.Append(action.Contains("?") ? '&' : '?');

            var index = 0;
            foreach (var pair in ps)
            {
                if (index++ > 0) sb.Append('&');

                var key = Encode(pair.Key);
                var value = Encode("{0}".F(pair.Value));
                sb.Append(key).Append('=').Append(value);
            }

            return sb.Put(true);
        }
        #endregion

        #region 调度池
        /// <summary>Index into the service list of the service currently in use.</summary>
        private Int32 _Index;

        /// <summary>Sends the request, failing over to the next registered service when a call throws.</summary>
        /// <remarks>
        /// Each registered service is tried at most once, starting from the current one.
        /// A failed call clears that service's HttpClient and advances to the next service.
        /// When every service fails, the first exception is rethrown with its original stack trace.
        /// </remarks>
        /// <param name="method">Request method.</param>
        /// <param name="action">Service action.</param>
        /// <param name="args">Arguments.</param>
        /// <param name="returnType">Expected result type.</param>
        /// <returns>The response of the first service that answered successfully.</returns>
        /// <exception cref="InvalidOperationException">No service address has been added.</exception>
        protected virtual async Task<HttpResponseMessage> SendAsync(HttpMethod method, String action, Object args, Type returnType)
        {
            var ms = _Items;
            if (ms.Count == 0) throw new InvalidOperationException("未添加服务地址!");

            Exception error = null;
            for (var i = 0; i < ms.Count; i++)
            {
                // A fresh request is built for every attempt
                var request = BuildRequest(method, action, args, returnType);

                var service = ms[_Index];
                Source = service.Name;

                // Performance counter: count, TPS, average cost
                var st = StatInvoke;
                var sw = st.StartCount();
                try
                {
                    if (service.Client == null)
                    {
                        WriteLog("使用[{0}]:{1}", service.Name, service.Address);

                        service.Client = new HttpClient
                        {
                            BaseAddress = service.Address,
                            Timeout = TimeSpan.FromMilliseconds(Timeout)
                        };
                    }

                    return await SendOnServiceAsync(request, service.Name, service.Client);
                }
                catch (Exception ex)
                {
                    // Remember only the first failure; it is what gets reported when all services fail
                    if (error == null) error = ex;

                    service.Client = null;
                }
                finally
                {
                    // NOTE(review): StopCount presumably returns microseconds, hence the division — confirm
                    var msCost = st.StopCount(sw) / 1000;
                    if (SlowTrace > 0 && msCost >= SlowTrace)
                    {
                        // AbsoluteUri throws for a relative URI, which would mask the real error from this finally block
                        var uri = request.RequestUri;
                        var target = uri == null ? action : (uri.IsAbsoluteUri ? uri.AbsoluteUri : uri.OriginalString);
                        WriteLog($"慢调用[{target}],耗时{msCost:n0}ms");
                    }
                }

                _Index++;
                if (_Index >= ms.Count) _Index = 0;
            }

            // Rethrow without resetting the stack trace of the original failure
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(error).Throw();
            throw error; // unreachable; satisfies the compiler's definite-return analysis
        }

        /// <summary>Sends the request to the given service and requires a success status code.</summary>
        /// <param name="request">Request message.</param>
        /// <param name="serviceName">Name of the service item; not used by this implementation.</param>
        /// <param name="client">HTTP client bound to the service address.</param>
        /// <returns>The response, which the caller owns.</returns>
        /// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
        protected virtual async Task<HttpResponseMessage> SendOnServiceAsync(HttpRequestMessage request, String serviceName, HttpClient client)
        {
            var rs = await client.SendAsync(request);
            try
            {
                // The business layer only returns 200 OK
                rs.EnsureSuccessStatusCode();
            }
            catch
            {
                // The response is never handed to the caller on failure, so release it here
                rs.Dispose();
                throw;
            }

            return rs;
        }
        #endregion

        #region 内嵌
        /// <summary>Service item: a registered address together with the HTTP client used for it.</summary>
        class ServiceItem
        {
            /// <summary>Name of the service item.</summary>
            public String Name { get; set; }

            /// <summary>Base address of the service.</summary>
            public Uri Address { get; set; }

            /// <summary>HTTP client for this address; created on first use and reset to null after a failed call.</summary>
            public HttpClient Client { get; set; }
        }
        #endregion

        #region 日志
        /// <summary>Log sink. Defaults to <c>Logger.Null</c>.</summary>
        public ILog Log { get; set; } = Logger.Null;

        /// <summary>Writes an info-level log entry; does nothing when <see cref="Log"/> is null.</summary>
        /// <param name="format">Format string.</param>
        /// <param name="args">Format arguments.</param>
        public void WriteLog(String format, params Object[] args)
        {
            var log = Log;
            if (log != null) log.Info(format, args);
        }
        #endregion
    }
}
#endif