9.8.2018.0630
大石头 authored at 2018-06-30 11:15:32
7.52 KiB
X
using System;
using System.Collections.Generic;
using System.Data.Common;
using NewLife.Security;
using XCode.Exceptions;

namespace XCode.DataAccessLayer
{
    /// <summary>
    /// 分布式数据库。同时操作多个数据库,多读多写,读写分析。
    /// 支持:
    /// 1,一主多从写入(主库同步写入从库异步写入);
    /// 2,多主多从写入(主库同步写入从库异步写入);
    /// 3,按权重分布式读取;
    /// </summary>
    /// <remarks>
    /// 1,通过连接字符串配置读写服务器组,并加上权重,如“WriteServer='connA*1,connC*0' ReadServer='connB*8,connD'”;
    /// 2,对于写服务器,权重大于0表示作为主服务器,操作返回值取主服务器操作总和,等于0表示作为从服务器,采用异步方式写入,不设置权重表示0,全部不设置权重表示1;
    /// 3,对于读服务器,默认根据权重进行随机分配,不设置表示1;
    /// 4,对于读服务器,可优先考虑最近使用的数据库
    /// </remarks>
    class Distributed : DbBase
    {
        #region 属性
        /// <summary>返回数据库类型。</summary>
        public override DatabaseType Type
        {
            get { return DatabaseType.Distributed; }
        }

        /// <summary>工厂</summary>
        public override DbProviderFactory Factory
        {
            get { throw new NotSupportedException(); }
        }
        #endregion

        #region 方法
        /// <summary>创建数据库会话</summary>
        /// <returns></returns>
        protected override IDbSession OnCreateSession() { return new DistributedDbSession(this); }

        /// <summary>创建元数据对象</summary>
        /// <returns></returns>
        protected override IMetaData OnCreateMetaData()
        {
            return new DistributedDbMetaData();
        }
        #endregion

        #region 读写服务器集合
        private Server[] _WriteServers;
        /// <summary>写入服务器集合</summary>
        public Server[] WriteServers { get { return _WriteServers; } set { _WriteServers = value; } }

        private Server[] _ReadServers;
        /// <summary>读取服务器集合</summary>
        public Server[] ReadServers { get { return _ReadServers; } set { _ReadServers = value; } }

        const String ExceptionMessage1 = "缺少写入服务器配置WriteServer!";
        const String ExceptionMessage2 = "缺少读取服务器配置ReadServer!";

        void LoadConfig(String connStr)
        {
            if (String.IsNullOrEmpty(connStr)) return;

            var builder = new XDbConnectionStringBuilder();
            builder.ConnectionString = connStr;

            var ws = "";
            var rs = "";
            if (!builder.TryGetValue("WriteServer", out ws) || String.IsNullOrEmpty(ws)) throw new XDbException(this, ExceptionMessage1);
            if (!builder.TryGetValue("ReadServer", out rs) || String.IsNullOrEmpty(rs)) throw new XDbException(this, ExceptionMessage2);

            #region 加载写入服务器
            String[] ss = ws.Split(new Char[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
            if (ss == null || ss.Length < 1) throw new XDbException(this, ExceptionMessage1);

            List<Server> list = new List<Server>();
            foreach (String item in ss)
            {
                String name = item.Trim();
                if (String.IsNullOrEmpty(name)) continue;

                Int32 p = name.IndexOf("*");
                if (p > 0)
                    list.Add(new Server(name.Substring(0, p), Int32.Parse(name.Substring(p + 1))));
                else
                    list.Add(new Server(name, 0));
            }
            // 按权重降序
            list.Sort(delegate (Server item1, Server item2) { return -1 * item1.Weight.CompareTo(item2.Weight); });
            WriteServers = list.ToArray();
            #endregion

            #region 加载读取服务器
            ss = rs.Split(new Char[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
            if (ss == null || ss.Length < 1) throw new XDbException(this, ExceptionMessage2);

            list.Clear();
            foreach (String item in ss)
            {
                String name = item.Trim();
                if (String.IsNullOrEmpty(name)) continue;

                Int32 p = name.IndexOf("*");
                if (p > 0)
                    list.Add(new Server(name.Substring(0, p), Int32.Parse(name.Substring(p + 1))));
                else
                    list.Add(new Server(name, 1));
            }
            // 按权重降序
            list.Sort(delegate (Server item1, Server item2) { return -1 * item1.Weight.CompareTo(item2.Weight); });
            ReadServers = list.ToArray();
            #endregion
        }

        #region 服务器配置
        public class Server
        {
            private String _ConnName;
            /// <summary>连接名</summary>
            public String ConnName
            {
                get { return _ConnName; }
                set { _ConnName = value; }
            }

            private Int32 _Weight;
            /// <summary>权重</summary>
            public Int32 Weight
            {
                get { return _Weight; }
                set { _Weight = value; }
            }

            private IDatabase _Db;
            /// <summary>数据库对象</summary>
            public IDatabase Db
            {
                get
                {
                    if (_Db == null)
                    {
                        _Db = DAL.Create(ConnName).Db;
                    }
                    return _Db;
                }
                //set { _Db = value; }
            }

            public Server(String connname, Int32 weight)
            {
                ConnName = connname;
                Weight = weight;
            }
        }
        #endregion
        #endregion

        #region 获取数据库操作接口
        private Int32 _Inited = 0;
        /// <summary>初始化</summary>
        void Init()
        {
            if (_Inited > 0) return;
            _Inited++;

            LoadConfig(ConnectionString);
        }

        //Random _Rnd;
        ///// <summary>随机数产生器</summary>
        ///// <returns></returns>
        //Random GetRnd()
        //{
        //    if (_Rnd == null) _Rnd = new Random((Int32)DateTime.Now.Ticks);
        //    return _Rnd;
        //}

        /// <summary>获取一个用于读取的数据库对象</summary>
        /// <returns></returns>
        public IDatabase GetReadDb()
        {
            Init();

            // 计算权重总和
            Int32 weight = 0;
            foreach (Server item in ReadServers)
            {
                weight += item.Weight;
            }

            // 随机抽取
            var index = Rand.Next(0, weight);
            foreach (Server item in ReadServers)
            {
                if (index < item.Weight) return item.Db;

                index -= item.Weight;
            }

            throw new XCodeException("设计错误,不应该到达这里!");
        }
        #endregion
    }

    class DistributedDbSession : DbSession
    {
        #region 构造函数
        public DistributedDbSession(IDatabase db) : base(db) { }
        #endregion
    }

    class DistributedDbMetaData : DbMetaData
    {

    }
}