使用索引,简化字符串截取
智能大石头 编写于 2021-12-23 14:06:56
X
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using NewLife;
using NewLife.Caching;
using NewLife.Collections;
using NewLife.Data;

namespace XCode.DataAccessLayer
{
    partial class DAL
    {
        #region 属性
        [ThreadStatic]
        private static Int32 _QueryTimes;
        /// <summary>查询次数</summary>
        public static Int32 QueryTimes => _QueryTimes;

        [ThreadStatic]
        private static Int32 _ExecuteTimes;
        /// <summary>执行次数</summary>
        public static Int32 ExecuteTimes => _ExecuteTimes;

        /// <summary>只读实例。读写分离时,读取操作分走</summary>
        public DAL ReadOnly { get; set; }

        /// <summary>读写分离策略。忽略时间区间和表名</summary>
        public ReadWriteStrategy Strategy { get; set; } = new ReadWriteStrategy();
        #endregion

        #region 数据操作方法
        /// <summary>根据条件把普通查询SQL格式化为分页SQL。</summary>
        /// <param name="builder">查询生成器</param>
        /// <param name="startRowIndex">开始行,0表示第一行</param>
        /// <param name="maximumRows">最大返回行数,0表示所有行</param>
        /// <returns>分页SQL</returns>
        public SelectBuilder PageSplit(SelectBuilder builder, Int64 startRowIndex, Int64 maximumRows)
        {
            if (startRowIndex <= 0 && maximumRows <= 0) return builder;

            // 2016年7月2日 HUIYUE 取消分页SQL缓存,此部分缓存提升性能不多,但有可能会造成分页数据不准确,感觉得不偿失
            return Db.PageSplit(builder.Clone(), startRowIndex, maximumRows);
        }

        /// <summary>执行SQL查询,返回记录集</summary>
        /// <param name="sql">SQL语句</param>
        /// <returns></returns>
        public DataSet Select(String sql)
        {
            return QueryByCache(sql, "", "", (s, k2, k3) => Session.Query(s), nameof(Select));
        }

        /// <summary>执行SQL查询,返回记录集</summary>
        /// <param name="builder">SQL语句</param>
        /// <param name="startRowIndex">开始行,0表示第一行</param>
        /// <param name="maximumRows">最大返回行数,0表示所有行</param>
        /// <returns></returns>
        public DataSet Select(SelectBuilder builder, Int64 startRowIndex, Int64 maximumRows)
        {
            return QueryByCache(builder, startRowIndex, maximumRows, (sb, start, max) =>
            {
                sb = PageSplit(sb, start, max);
                return Session.Query(sb.ToString(), CommandType.Text, sb.Parameters.ToArray());
            }, nameof(Select));
        }

        /// <summary>执行SQL查询,返回记录集</summary>
        /// <param name="builder">SQL语句</param>
        /// <param name="startRowIndex">开始行,0表示第一行</param>
        /// <param name="maximumRows">最大返回行数,0表示所有行</param>
        /// <returns></returns>
        public DbTable Query(SelectBuilder builder, Int64 startRowIndex, Int64 maximumRows)
        {
            return QueryByCache(builder, startRowIndex, maximumRows, (sb, start, max) =>
            {
                sb = PageSplit(sb, start, max);
                return Session.Query(sb.ToString(), sb.Parameters.ToArray());
            }, nameof(Query));
        }

        /// <summary>执行SQL查询,返回记录集</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public DbTable Query(String sql, IDictionary<String, Object> ps = null)
        {
            return QueryByCache(sql, ps, "", (s, p, k3) => Session.Query(s, Db.CreateParameters(p)), nameof(Query));
        }

        /// <summary>执行SQL查询,返回总记录数</summary>
        /// <param name="sb">查询生成器</param>
        /// <returns></returns>
        public Int32 SelectCount(SelectBuilder sb)
        {
            return (Int32)QueryByCache(sb, "", "", (s, k2, k3) => Session.QueryCount(s), nameof(SelectCount));
        }

        /// <summary>执行SQL查询,返回总记录数</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public Int32 SelectCount(String sql, CommandType type, params IDataParameter[] ps)
        {
            return (Int32)QueryByCache(sql, type, ps, (s, t, p) => Session.QueryCount(s, t, p), nameof(SelectCount));
        }

        /// <summary>执行SQL语句,返回受影响的行数</summary>
        /// <param name="sql">SQL语句</param>
        /// <returns></returns>
        public Int32 Execute(String sql)
        {
            return ExecuteByCache(sql, "", "", (s, t, p) => Session.Execute(s));
        }

        /// <summary>执行插入语句并返回新增行的自动编号</summary>
        /// <param name="sql"></param>
        /// <returns>新增行的自动编号</returns>
        public Int64 InsertAndGetIdentity(String sql)
        {
            return ExecuteByCache(sql, "", "", (s, t, p) => Session.InsertAndGetIdentity(s));
        }

        /// <summary>执行SQL查询,返回记录集</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public DataSet Select(String sql, CommandType type, params IDataParameter[] ps)
        {
            return QueryByCache(sql, type, ps, (s, t, p) => Session.Query(s, t, p), nameof(Select));
        }

        /// <summary>执行SQL语句,返回受影响的行数</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public Int32 Execute(String sql, CommandType type, params IDataParameter[] ps)
        {
            return ExecuteByCache(sql, type, ps, (s, t, p) => Session.Execute(s, t, p));
        }

        /// <summary>执行插入语句并返回新增行的自动编号</summary>
        /// <param name="sql"></param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns>新增行的自动编号</returns>
        public Int64 InsertAndGetIdentity(String sql, CommandType type, params IDataParameter[] ps)
        {
            return ExecuteByCache(sql, type, ps, (s, t, p) => Session.InsertAndGetIdentity(s, t, p));
        }

        /// <summary>执行SQL查询,返回记录集</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public DataSet Select(String sql, CommandType type, IDictionary<String, Object> ps)
        {
            return QueryByCache(sql, type, ps, (s, t, p) => Session.Query(s, t, Db.CreateParameters(p)), nameof(Select));
        }

        /// <summary>执行SQL语句,返回受影响的行数</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public Int32 Execute(String sql, CommandType type, IDictionary<String, Object> ps)
        {
            return ExecuteByCache(sql, type, ps, (s, t, p) => Session.Execute(s, t, Db.CreateParameters(p)));
        }

        /// <summary>执行SQL语句,返回受影响的行数</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="commandTimeout">命令超时时间,一般用于需要长时间执行的命令</param>
        /// <returns></returns>
        public Int32 Execute(String sql, Int32 commandTimeout)
        {
            return ExecuteByCache(sql, commandTimeout, "", (s, t, p) =>
            {
                using var cmd = Session.CreateCommand(s);
                if (t > 0) cmd.CommandTimeout = t;
                return Session.Execute(cmd);
            });
        }

        /// <summary>执行SQL语句,返回结果中的第一行第一列</summary>
        /// <typeparam name="T">返回类型</typeparam>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public T ExecuteScalar<T>(String sql, CommandType type, IDictionary<String, Object> ps)
        {
            return ExecuteByCache(sql, type, ps, (s, t, p) => Session.ExecuteScalar<T>(s, t, Db.CreateParameters(p)));
        }
        #endregion

        #region 异步操作
        /// <summary>执行SQL查询,返回记录集</summary>
        /// <param name="builder">SQL语句</param>
        /// <param name="startRowIndex">开始行,0表示第一行</param>
        /// <param name="maximumRows">最大返回行数,0表示所有行</param>
        /// <returns></returns>
        public Task<DbTable> QueryAsync(SelectBuilder builder, Int64 startRowIndex, Int64 maximumRows)
        {
            return QueryByCacheAsync(builder, startRowIndex, maximumRows, (sb, start, max) =>
            {
                sb = PageSplit(sb, start, max);
                return Session.QueryAsync(sb.ToString(), sb.Parameters.ToArray());
            }, nameof(QueryAsync));
        }

        /// <summary>执行SQL查询,返回记录集</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public Task<DbTable> QueryAsync(String sql, IDictionary<String, Object> ps = null)
        {
            return QueryByCacheAsync(sql, ps, "", (s, p, k3) => Session.QueryAsync(s, Db.CreateParameters(p)), nameof(QueryAsync));
        }

        /// <summary>执行SQL查询,返回总记录数</summary>
        /// <param name="sb">查询生成器</param>
        /// <returns></returns>
        public Task<Int64> SelectCountAsync(SelectBuilder sb)
        {
            return QueryByCacheAsync(sb, "", "", (s, k2, k3) => Session.QueryCountAsync(s), nameof(SelectCountAsync));
        }

        /// <summary>执行SQL查询,返回总记录数</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public Task<Int64> SelectCountAsync(String sql, CommandType type, params IDataParameter[] ps)
        {
            return QueryByCacheAsync(sql, type, ps, (s, t, p) => Session.QueryCountAsync(s, t, p), nameof(SelectCountAsync));
        }

        /// <summary>执行SQL语句,返回受影响的行数</summary>
        /// <param name="sql">SQL语句</param>
        /// <returns></returns>
        public Task<Int32> ExecuteAsync(String sql)
        {
            return ExecuteByCacheAsync(sql, "", "", (s, t, p) => Session.ExecuteAsync(s));
        }

        /// <summary>执行插入语句并返回新增行的自动编号</summary>
        /// <param name="sql"></param>
        /// <returns>新增行的自动编号</returns>
        public Task<Int64> InsertAndGetIdentityAsync(String sql)
        {
            return ExecuteByCacheAsync(sql, "", "", (s, t, p) => Session.InsertAndGetIdentityAsync(s));
        }

        /// <summary>执行SQL语句,返回受影响的行数</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public Task<Int32> ExecuteAsync(String sql, CommandType type, params IDataParameter[] ps)
        {
            return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => Session.ExecuteAsync(s, t, p));
        }

        /// <summary>执行插入语句并返回新增行的自动编号</summary>
        /// <param name="sql"></param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns>新增行的自动编号</returns>
        public Task<Int64> InsertAndGetIdentityAsync(String sql, CommandType type, params IDataParameter[] ps)
        {
            return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => Session.InsertAndGetIdentityAsync(s, t, p));
        }

        /// <summary>执行SQL语句,返回受影响的行数</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public Task<Int32> ExecuteAsync(String sql, CommandType type, IDictionary<String, Object> ps)
        {
            return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => Session.ExecuteAsync(s, t, Db.CreateParameters(p)));
        }

        /// <summary>执行SQL语句,返回受影响的行数</summary>
        /// <param name="sql">SQL语句</param>
        /// <param name="commandTimeout">命令超时时间,一般用于需要长时间执行的命令</param>
        /// <returns></returns>
        public Task<Int32> ExecuteAsync(String sql, Int32 commandTimeout)
        {
            return ExecuteByCacheAsync(sql, commandTimeout, "", (s, t, p) =>
            {
                using var cmd = Session.CreateCommand(s);
                if (t > 0) cmd.CommandTimeout = t;
                return Session.ExecuteAsync(cmd);
            });
        }

        /// <summary>执行SQL语句,返回结果中的第一行第一列</summary>
        /// <typeparam name="T">返回类型</typeparam>
        /// <param name="sql">SQL语句</param>
        /// <param name="type">命令类型,默认SQL文本</param>
        /// <param name="ps">命令参数</param>
        /// <returns></returns>
        public Task<T> ExecuteScalarAsync<T>(String sql, CommandType type, IDictionary<String, Object> ps)
        {
            return ExecuteByCacheAsync(sql, type, ps, (s, t, p) => Session.ExecuteScalarAsync<T>(s, t, Db.CreateParameters(p)));
        }
        #endregion

        #region 事务
        /// <summary>开始事务</summary>
        /// <remarks>
        /// Read Uncommitted: 允许读取脏数据,一个事务能看到另一个事务还没有提交的数据。(不会阻止其它操作)
        /// Read Committed: 确保事务读取的数据都必须是已经提交的数据。它限制了读取中间的,没有提交的,脏的数据。
        /// 但是它不能确保当事务重新去读取的时候,读的数据跟上次读的数据是一样的,也就是说当事务第一次读取完数据后,
        /// 该数据是可能被其他事务修改的,当它再去读取的时候,数据可能是不一样的。(数据隐藏,不阻止)
        /// Repeatable Read: 是一个更高级别的隔离级别,如果事务再去读取同样的数据,先前的数据是没有被修改过的。(阻止其它修改)
        /// Serializable: 它做出了最有力的保证,除了每次读取的数据是一样的,它还确保每次读取没有新的数据。(阻止其它添删改)
        /// </remarks>
        /// <param name="level">事务隔离等级</param>
        /// <returns>剩下的事务计数</returns>
        public Int32 BeginTransaction(IsolationLevel level = IsolationLevel.ReadCommitted)
        {
            //CheckDatabase();

            return Session.BeginTransaction(level);
        }

        /// <summary>提交事务</summary>
        /// <returns>剩下的事务计数</returns>
        public Int32 Commit() => Session.Commit();

        /// <summary>回滚事务,忽略异常</summary>
        /// <returns>剩下的事务计数</returns>
        public Int32 Rollback() => Session.Rollback();
        #endregion

        #region 缓存
        /// <summary>缓存存储</summary>
        public ICache Store { get; set; }

        /// <summary>数据层缓存。默认10秒</summary>
        public Int32 Expire { get; set; }

        private ICache GetCache()
        {
            var st = Store;
            if (st != null) return st;

            var exp = Expire;
            if (exp == 0) exp = Db.DataCache;
            if (exp == 0) exp = Setting.Current.DataCacheExpire;
            if (exp <= 0) return null;

            Expire = exp;

            lock (this)
            {
                if (Store == null)
                {
                    var p = exp / 2;
                    if (p < 30) p = 30;

                    st = Store = new MemoryCache { Period = p, Expire = exp };
                }
            }

            return st;
        }

        private TResult QueryByCache<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, TResult> callback, String action)
        {
            // 读写分离
            if (Strategy != null)
            {
                if (Strategy.Validate(this, k1 + "", action)) return ReadOnly.QueryByCache(k1, k2, k3, callback, action);
            }

            //CheckDatabase();

            // 读缓存
            var cache = GetCache();
            var key = "";
            if (cache != null)
            {
                var sb = Pool.StringBuilder.Get();
                if (!action.IsNullOrEmpty())
                {
                    sb.Append(action);
                    sb.Append('#');
                }
                Append(sb, k1);
                Append(sb, k2);
                Append(sb, k3);
                key = sb.Put(true);

                if (cache.TryGetValue<TResult>(key, out var value)) return value;
            }

            Interlocked.Increment(ref _QueryTimes);
            var rs = Invoke(k1, k2, k3, callback, action);

            cache?.Set(key, rs, Expire);

            return rs;
        }

        private TResult ExecuteByCache<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, TResult> callback)
        {
            if (Db.Readonly) throw new InvalidOperationException($"数据连接[{ConnName}]只读,禁止执行{k1}");

            //CheckDatabase();

            var rs = Invoke(k1, k2, k3, callback, "Execute");

            GetCache()?.Clear();

            Interlocked.Increment(ref _ExecuteTimes);

            return rs;
        }

        private TResult Invoke<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, TResult> callback, String action)
        {
            var tracer = Tracer ?? GlobalTracer;
            var traceName = "";
            var sql = "";

            // 从sql解析表名,作为跟踪名一部分。正则避免from前后换行的情况
            if (tracer != null)
            {
                sql = (k1 + "").Trim();
                if (action == "Execute")
                {
                    // 使用 Insert/Update/Delete 作为埋点操作名
                    var p = sql.IndexOf(' ');
                    if (p > 0) action = sql[..p];
                }

                traceName = $"db:{ConnName}:{action}";

                var tables = GetTables(sql);
                if (tables.Length > 0) traceName += ":" + tables.Join("-");
            }

            // 使用k1参数作为tag,一般是sql
            var span = tracer?.NewSpan(traceName, sql);
            try
            {
                return callback(k1, k2, k3);
            }
            catch (Exception ex)
            {
                span?.SetError(ex, null);
                throw;
            }
            finally
            {
                span?.Dispose();
            }
        }

        private async Task<TResult> QueryByCacheAsync<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, Task<TResult>> callback, String action)
        {
            // 读写分离
            if (Strategy != null)
            {
                if (Strategy.Validate(this, k1 + "", action)) return await ReadOnly.QueryByCacheAsync(k1, k2, k3, callback, action);
            }

            //CheckDatabase();

            // 读缓存
            var cache = GetCache();
            var key = "";
            if (cache != null)
            {
                var sb = Pool.StringBuilder.Get();
                if (!action.IsNullOrEmpty())
                {
                    sb.Append(action);
                    sb.Append('#');
                }
                Append(sb, k1);
                Append(sb, k2);
                Append(sb, k3);
                key = sb.Put(true);

                if (cache.TryGetValue<TResult>(key, out var value)) return value;
            }

            Interlocked.Increment(ref _QueryTimes);
            var rs = await InvokeAsync(k1, k2, k3, callback, action);

            cache?.Set(key, rs, Expire);

            return rs;
        }

        private async Task<TResult> ExecuteByCacheAsync<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, Task<TResult>> callback)
        {
            if (Db.Readonly) throw new InvalidOperationException($"数据连接[{ConnName}]只读,禁止执行{k1}");

            //CheckDatabase();

            var rs = await InvokeAsync(k1, k2, k3, callback, "Execute");

            GetCache()?.Clear();

            Interlocked.Increment(ref _ExecuteTimes);

            return rs;
        }

        private Task<TResult> InvokeAsync<T1, T2, T3, TResult>(T1 k1, T2 k2, T3 k3, Func<T1, T2, T3, Task<TResult>> callback, String action)
        {
            var tracer = Tracer ?? GlobalTracer;
            var traceName = "";
            var sql = "";

            // 从sql解析表名,作为跟踪名一部分。正则避免from前后换行的情况
            if (tracer != null)
            {
                sql = (k1 + "").Trim();
                if (action == "Execute")
                {
                    // 使用 Insert/Update/Delete 作为埋点操作名
                    var p = sql.IndexOf(' ');
                    if (p > 0) action = sql[..p];
                }

                traceName = $"db:{ConnName}:{action}";

                var tables = GetTables(sql);
                if (tables.Length > 0) traceName += ":" + tables.Join("-");
            }

            // 使用k1参数作为tag,一般是sql
            var span = tracer?.NewSpan(traceName, sql);
            try
            {
                return callback(k1, k2, k3);
            }
            catch (Exception ex)
            {
                span?.SetError(ex, null);
                throw;
            }
            finally
            {
                span?.Dispose();
            }
        }

        private static readonly Regex reg_table = new("(?:\\s+from|insert\\s+into|update|\\s+join|drop\\s+table|truncate\\s+table)\\s+[`'\"\\[]?([\\w]+)[`'\"\\[]?", RegexOptions.Compiled | RegexOptions.IgnoreCase);
        /// <summary>从Sql语句中截取表名</summary>
        /// <param name="sql"></param>
        /// <returns></returns>
        public static String[] GetTables(String sql)
        {
            var list = new List<String>();
            var ms = reg_table.Matches(sql);
            foreach (Match item in ms)
            {
                list.Add(item.Groups[1].Value);
            }
            return list.ToArray();
        }

        private static void Append(StringBuilder sb, Object value)
        {
            if (value == null) return;

            if (value is SelectBuilder builder)
            {
                sb.Append(builder);
                foreach (var item in builder.Parameters)
                {
                    sb.Append('#');
                    sb.Append(item.ParameterName);
                    sb.Append('#');
                    sb.Append(item.Value);
                }
            }
            else if (value is IDataParameter[] ps)
            {
                foreach (var item in ps)
                {
                    sb.Append('#');
                    sb.Append(item.ParameterName);
                    sb.Append('#');
                    sb.Append(item.Value);
                }
            }
            else if (value is IDictionary<String, Object> dic)
            {
                foreach (var item in dic)
                {
                    sb.Append('#');
                    sb.Append(item.Key);
                    sb.Append('#');
                    sb.Append(item.Value);
                }
            }
            else
            {
                sb.Append('#');
                sb.Append(value);
            }
        }
        #endregion
    }
}