PG分页使用 offset + limit
大石头 编写于 2022-02-15 17:14:03
X
using System;
using System.Collections.Concurrent;
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Runtime.InteropServices;
using NewLife;
using NewLife.Collections;
using NewLife.Log;
using NewLife.Net;
using TDengineDriver;
using XCode.DataAccessLayer;
using TD = TDengineDriver.TDengine;

namespace XCode.TDengine
{
    /// <summary>数据库连接</summary>
    public partial class TDengineConnection : DbConnection
    {
        #region 属性
        internal IntPtr _handler;
        internal Int32 _transactionLevel;

        /// <summary>连接字符串</summary>
        public override String ConnectionString { get; set; }

        private String _version = String.Empty;
        /// <summary>服务器版本</summary>
        public override String ServerVersion
        {
            get
            {
                if (_handler == IntPtr.Zero) throw new XCodeException("连接未打开");

                if (_version.IsNullOrEmpty())
                    _version = Marshal.PtrToStringAnsi(TD.GetServerInfo(_handler));

                return _version;
            }
        }

        private ConnectionState _state;
        /// <summary>状态</summary>
        public override ConnectionState State => _state;

        /// <summary>数据提供者工厂</summary>
        protected override DbProviderFactory DbProviderFactory => TDengineFactory.Instance;

        /// <summary>事务</summary>
        protected internal virtual TDengineTransaction Transaction { get; set; }

        private String _Database;
        /// <summary>数据库</summary>
        public override String Database => _Database;

        private String _DataSource;
        /// <summary>数据源</summary>
        public override String DataSource => _DataSource;
        #endregion

        #region 构造
        static TDengineConnection()
        {
            var configPath = "C:/TDengine/cfg";

            if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
                configPath = "/etc/taos";

            TD.Options((Int32)TDengineInitOption.TDDB_OPTION_CONFIGDIR, configPath);
            TD.Options((Int32)TDengineInitOption.TDDB_OPTION_SHELL_ACTIVITY_TIMER, "60");
            TD.Init();

            var h = TD.GetClientInfo();
            if (h != IntPtr.Zero)
            {
                var str = Marshal.PtrToStringAnsi(h);
                XTrace.WriteLine("TDengine v{0}", str);
            }

            AppDomain.CurrentDomain.DomainUnload += (s, e) => TD.Cleanup();
        }

        /// <summary>销毁</summary>
        /// <param name="disposing"></param>
        protected override void Dispose(Boolean disposing)
        {
            if (disposing) Close();

            base.Dispose(disposing);
        }
        #endregion

        #region 核心方法
        private void SetState(ConnectionState value)
        {
            var originalState = _state;
            if (originalState != value)
            {
                _state = value;
                OnStateChange(new StateChangeEventArgs(originalState, value));
            }
        }

        /// <summary>打开连接</summary>
        public override void Open()
        {
            if (State == ConnectionState.Open) return;

            var connStr = ConnectionString;
            if (connStr.IsNullOrEmpty()) throw new InvalidOperationException("未设置连接字符串");

            if (_DataSource.IsNullOrEmpty() || _Database.IsNullOrEmpty())
            {
                var builder = new ConnectionStringBuilder(connStr);
                _DataSource = builder["DataSource"] ?? builder["Server"];
                _Database = builder["Database"] ?? builder["Db"];
            }

            var pool = GetPool(ConnectionString);
            _handler = pool.Get();

            SetState(ConnectionState.Open);

            //ChangeDatabase(db);
        }

        /// <summary>关闭连接</summary>
        public override void Close()
        {
#if DEBUG
            //XTrace.WriteLine("State={1} 断开TDengine:server={0}", _DataSource, State);
#endif

            //if (State != ConnectionState.Closed) TD.Close(_handler);

            var pool = GetPool(ConnectionString);
            pool.Put(_handler);
            _handler = IntPtr.Zero;

            Transaction?.Dispose();

            SetState(ConnectionState.Closed);
        }

        /// <summary>执行</summary>
        /// <param name="sql"></param>
        /// <returns></returns>
        public Int32 Execute(String sql)
        {
            using var cmd = CreateCommand();
            cmd.CommandText = sql;
            return cmd.ExecuteNonQuery();
        }
        #endregion

        #region 连接池
        private class MyPool : ObjectPool<IntPtr>
        {
            public String ConnectionString { get; set; }

            protected override IntPtr OnCreate()
            {
                var connStr = ConnectionString;
                if (connStr.IsNullOrEmpty()) throw new InvalidOperationException("未设置连接字符串");

                var builder = new ConnectionStringBuilder(connStr);
                var dataSource = builder["DataSource"] ?? builder["Server"];
                var port = builder["Port"].ToInt();
                //if (port <= 0) port = 6030;

                var user = builder["username"] ?? builder["user"] ?? builder["uid"];
                var pass = builder["password"] ?? builder["pass"] ?? builder["pwd"];
                var db = builder["database"] ?? builder["db"];

                var uri = new NetUri(dataSource);
                if (port > 0) uri.Port = port;
#if DEBUG
                XTrace.WriteLine("连接TDengine:server={0};user={1};pass={2};db={3}", dataSource, user, pass, db);
#endif

                var handler = TD.Connect(uri.Address + "", user, pass, db, (Int16)uri.Port);
                if (handler == IntPtr.Zero) throw new XCodeException("打开数据库连接失败!");

                return handler;
            }

            protected override void OnDispose(IntPtr value)
            {
#if DEBUG
                XTrace.WriteLine("断开TDengine");
#endif
                TD.Close(value);
            }
        }

        private static ConcurrentDictionary<String, IPool<IntPtr>> _cache = new();
        internal static IPool<IntPtr> GetPool(String connStr)
        {
            return _cache.GetOrAdd(connStr, k => new MyPool
            {
                Name = "TDPool",
                ConnectionString = k,
                Min = 1,
                Max = 1000,
                IdleTime = 20,
                AllIdleTime = 120,
#if DEBUG
                //Log = XTrace.Log,
#endif
            });
        }
        #endregion

        #region 辅助方法
        /// <summary>创建命令</summary>
        /// <returns></returns>
        protected override DbCommand CreateDbCommand() => new TDengineCommand { Connection = this, Transaction = Transaction };

        /// <summary>开始事务</summary>
        /// <param name="isolationLevel"></param>
        /// <returns></returns>
        protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => Transaction = new TDengineTransaction(this, isolationLevel);

        /// <summary>改变数据库</summary>
        /// <param name="databaseName"></param>
        public override void ChangeDatabase(String databaseName)
        {
            if (_Database.IsNullOrEmpty() || _Database != databaseName)
            {
                //Int32 result = TD.SelectDatabase(_handler, databaseName);
                //if (result == 0) _Database = databaseName;
            }
        }
        #endregion
    }
}