发布0115
大石头 authored at 2020-01-15 01:11:37
15.71 KiB
X
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Collections;
using NewLife.Data;
using NewLife.Model;
using NewLife.Reflection;
using NewLife.Serialization;

namespace XCode.DataAccessLayer
{
    partial class DAL
    {
        #region 备份
        /// <summary>备份单表数据</summary>
        /// <remarks>
        /// 最大支持21亿行
        /// </remarks>
        /// <param name="table">数据表</param>
        /// <param name="stream">目标数据流</param>
        /// <param name="progress">进度回调,参数为已处理行数和当前页表</param>
        /// <returns></returns>
        public Int32 Backup(String table, Stream stream, Action<Int32, DbTable> progress = null)
        {
            var writeFile = new WriteFileActor
            {
                Stream = stream,

                // 最多同时堆积数页
                BoundedCapacity = 4,
            };
            //writeFile.Start();

            var sb = new SelectBuilder
            {
                Table = Db.FormatTableName(table)
            };

            // 总行数
            writeFile.Total = SelectCount(sb);
            WriteLog("备份[{0}/{1}]开始,共[{2:n0}]行", table, ConnName, writeFile.Total);

            var row = 0;
            var pageSize = 10_000;
            var total = 0;
            var sw = Stopwatch.StartNew();
            while (true)
            {
                // 分页
                var sb2 = PageSplit(sb, row, pageSize);

                // 查询数据
                var dt = Session.Query(sb2.ToString(), null);
                if (dt == null) break;

                var count = dt.Rows.Count;
                WriteLog("备份[{0}/{1}]数据 {2:n0} + {3:n0}", table, ConnName, row, count);

                // 进度报告
                progress?.Invoke(row, dt);

                // 消费数据
                writeFile.Tell(dt);

                // 下一页
                total += count;
                if (count < pageSize) break;
                row += pageSize;
            }

            // 通知写入完成
            writeFile.Stop(-1);

            sw.Stop();
            var ms = sw.Elapsed.TotalMilliseconds;
            WriteLog("备份[{0}/{1}]完成,共[{2:n0}]行,耗时{3:n0}ms,速度{4:n0}tps", table, ConnName, total, ms, total * 1000L / ms);

            // 返回总行数
            return total;
        }

        /// <summary>备份单表数据到文件</summary>
        /// <param name="table"></param>
        /// <param name="file"></param>
        /// <returns></returns>
        public Int32 Backup(String table, String file = null)
        {
            if (file.IsNullOrEmpty()) file = table + ".table";

            var file2 = file.GetFullPath();
            file2.EnsureDirectory(true);

            WriteLog("备份[{0}/{1}]到文件 {2}", table, ConnName, file2);

            using (var fs = new FileStream(file2, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite))
            {
                var rs = 0;
                if (file.EndsWithIgnoreCase(".gz"))
                {
#if NET4
                    using (var gs = new GZipStream(fs, CompressionMode.Compress, true))
#else
                    using (var gs = new GZipStream(fs, CompressionLevel.Optimal, true))
#endif
                    {
                        rs = Backup(table, gs);
                    }
                }
                else
                {
                    rs = Backup(table, fs);
                }

                // 截断文件
                fs.SetLength(fs.Position);

                return rs;
            }
        }

        /// <summary>备份一批表到指定目录</summary>
        /// <param name="tables"></param>
        /// <param name="dir"></param>
        /// <param name="backupSchema">备份架构</param>
        /// <returns></returns>
        public IDictionary<String, Int32> BackupAll(String[] tables, String dir, Boolean backupSchema = true)
        {
            var dic = new Dictionary<String, Int32>();

            IList<IDataTable> tbls = null;
            if (tables == null)
            {
                tbls = Tables;
                tables = tbls.Select(e => e.TableName).ToArray();
            }
            if (tables != null && tables.Length > 0)
            {
                // 备份架构
                if (backupSchema)
                {
                    if (tbls == null) tbls = Tables;
                    var bs = tables.Select(e => tbls.FirstOrDefault(t => e.EqualIgnoreCase(t.Name, t.TableName))).Where(e => e != null).ToArray();

                    var xml = Export(bs);
                    dir.EnsureDirectory(false);
                    File.WriteAllText(dir.CombinePath(ConnName + ".xml"), xml);
                }

                foreach (var item in tables)
                {
                    dic[item] = Backup(item, dir.CombinePath(item + ".table"));
                }
            }

            return dic;
        }

        class WriteFileActor : Actor
        {
            public Stream Stream { get; set; }
            public Int32 Total { get; set; }

            private Binary _Binary;
            private Boolean _writeHeader;

            public override Task Start()
            {
                // 二进制读写器
                _Binary = new Binary
                {
                    EncodeInt = true,
                    Stream = Stream,
                };

                return base.Start();
            }

            protected override void Receive(ActorContext context)
            {
                var dt = context.Message as DbTable;
                var bn = _Binary;

                // 写头部结构。没有数据时可以备份结构
                if (!_writeHeader)
                {
                    dt.Total = Total;
                    dt.WriteHeader(bn);

                    // 输出日志
                    var cs = dt.Columns;
                    var ts = dt.Types;
                    WriteLog("字段[{0}]:{1}", cs.Length, cs.Join());
                    WriteLog("类型[{0}]:{1}", ts.Length, ts.Join(",", e => e.Name));

                    _writeHeader = true;
                }

                var rs = dt.Rows;
                if (rs == null || rs.Count == 0) return;

                // 写入文件
                dt.WriteData(bn);
            }
        }
        #endregion

        #region 恢复
        /// <summary>从数据流恢复数据</summary>
        /// <param name="stream">数据流</param>
        /// <param name="table">数据表</param>
        /// <param name="progress">进度回调,参数为已处理行数和当前页表</param>
        /// <returns></returns>
        public Int32 Restore(Stream stream, IDataTable table, Action<Int32, DbTable> progress = null)
        {
            var writeDb = new WriteDbActor
            {
                Table = table,
                Dal = this,

                // 最多同时堆积数页
                BoundedCapacity = 4,
            };
            //writeDb.Start();

            var sw = Stopwatch.StartNew();

            // 二进制读写器
            var bn = new Binary
            {
                EncodeInt = true,
                Stream = stream,
            };

            var dt = new DbTable();
            dt.ReadHeader(bn);
            WriteLog("恢复[{0}/{1}]开始,共[{2:n0}]行", table.Name, ConnName, dt.Total);

            // 输出日志
            var cs = dt.Columns;
            var ts = dt.Types;
            WriteLog("字段[{0}]:{1}", cs.Length, cs.Join());
            WriteLog("类型[{0}]:{1}", ts.Length, ts.Join(",", e => e.Name));

            var row = 0;
            var pageSize = 10_000;
            var total = 0;
            while (true)
            {
                // 读取数据
                var count = dt.ReadData(bn, Math.Min(dt.Total - row, pageSize));

                var rs = dt.Rows;
                if (rs == null || rs.Count == 0) break;

                WriteLog("恢复[{0}/{1}]数据 {2:n0} + {3:n0}", table.Name, ConnName, row, rs.Count);

                // 进度报告
                progress?.Invoke(row, dt);

                // 批量写入数据库。克隆对象,避免被修改
                writeDb.Tell(dt.Clone());

                // 下一页
                total += count;
                if (count < pageSize) break;
                row += pageSize;
            }

            // 通知写入完成
            writeDb.Stop(-1);

            sw.Stop();
            var ms = sw.Elapsed.TotalMilliseconds;
            WriteLog("恢复[{0}/{1}]完成,共[{2:n0}]行,耗时{3:n0}ms,速度{4:n0}tps", table.Name, ConnName, total, ms, total * 1000L / ms);

            // 返回总行数
            return total;
        }

        /// <summary>从文件恢复数据</summary>
        /// <param name="file"></param>
        /// <param name="table"></param>
        /// <returns></returns>
        public Int64 Restore(String file, IDataTable table = null)
        {
            if (file.IsNullOrEmpty() || !File.Exists(file)) return 0;

            if (table == null)
            {
                var name = Path.GetFileNameWithoutExtension(file);
                table = Tables.FirstOrDefault(e => name.EqualIgnoreCase(e.Name, e.TableName));
            }
            else
                SetTables(table);
            if (table == null) return 0;

            var file2 = file.GetFullPath();
            file2.EnsureDirectory(true);

            WriteLog("恢复[{2}]到[{0}/{1}]", table, ConnName, file);

            var compressed = file.EndsWithIgnoreCase(".gz");
            return file2.AsFile().OpenRead(compressed, s => Restore(s, table));
            //using (var fs = new FileStream(file2, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
            //{
            //    if (file.EndsWithIgnoreCase(".gz"))
            //    {
            //        using (var gs = new GZipStream(fs, CompressionMode.Decompress, true))
            //        {
            //            return Restore(gs, table);
            //        }
            //    }
            //    else
            //    {
            //        return Restore(fs, table);
            //    }
            //}
        }

        /// <summary>从指定目录恢复一批数据到目标库</summary>
        /// <param name="dir"></param>
        /// <param name="tables"></param>
        /// <returns></returns>
        public IDictionary<String, Int64> RestoreAll(String dir, IDataTable[] tables = null)
        {
            var dic = new Dictionary<String, Int64>();
            if (dir.IsNullOrEmpty() || !Directory.Exists(dir)) return dic;

            if (tables == null)
            {
                var schm = dir.AsDirectory().GetAllFiles("*.xml").FirstOrDefault();
                var tbls = schm != null ? ImportFrom(schm.FullName) : Tables;
                var ts = new List<IDataTable>();
                foreach (var item in dir.AsDirectory().GetFiles("*.table"))
                {
                    var name = Path.GetFileNameWithoutExtension(item.Name);
                    var tb = tbls.FirstOrDefault(e => name.EqualIgnoreCase(e.Name, e.TableName));
                    if (tb != null) ts.Add(tb);
                }
                tables = ts.ToArray();
            }
            if (tables != null && tables.Length > 0)
            {
                foreach (var item in tables)
                {
                    dic[item.Name] = Restore(dir.CombinePath(item.Name + ".table"), item);
                }
            }

            return dic;
        }

        class WriteDbActor : Actor
        {
            public DAL Dal { get; set; }
            public IDataTable Table { get; set; }

            private String _TableName;
            private IDataColumn[] _Columns;

            protected override void Receive(ActorContext context)
            {
                if (!(context.Message is DbTable dt)) return;

                // 匹配要写入的列
                if (_TableName == null)
                {
                    _TableName = Dal.Db.FormatTableName(Table.TableName);
                    _Columns = Table.GetColumns(dt.Columns);
                }

                // 批量插入
                var ds = new List<IIndexAccessor>();
                foreach (var item in dt)
                {
                    ds.Add(item);
                }
                Dal.Session.Insert(_TableName, _Columns, ds);
            }
        }
        #endregion

        #region 同步
        /// <summary>同步单表数据</summary>
        /// <remarks>
        /// 把数据同一张表同步到另一个库
        /// </remarks>
        /// <param name="table">数据表</param>
        /// <param name="connName">目标连接名</param>
        /// <param name="syncSchema">同步架构</param>
        /// <param name="progress">进度回调,参数为已处理行数和当前页表</param>
        /// <returns></returns>
        public Int32 Sync(IDataTable table, String connName, Boolean syncSchema = true, Action<Int32, DbTable> progress = null)
        {
            var dal = connName.IsNullOrEmpty() ? null : Create(connName);

            var writeDb = new WriteDbActor
            {
                Table = table,
                Dal = dal,

                // 最多同时堆积数页
                BoundedCapacity = 4,
            };

            var sw = Stopwatch.StartNew();

            // 表结构
            if (syncSchema) dal.SetTables(table);

            var sb = new SelectBuilder
            {
                Table = Db.FormatTableName(table.TableName)
            };

            var row = 0;
            var pageSize = 10_000;
            var total = 0;
            while (true)
            {
                // 分页
                var sb2 = PageSplit(sb, row, pageSize);

                // 查询数据
                var dt = Session.Query(sb2.ToString(), null);
                if (dt == null) break;

                var count = dt.Rows.Count;
                WriteLog("同步[{0}/{1}]数据 {2:n0} + {3:n0}", table.Name, ConnName, row, count);

                // 进度报告
                progress?.Invoke(row, dt);

                // 消费数据
                writeDb.Tell(dt);

                // 下一页
                total += count;
                if (count < pageSize) break;
                row += pageSize;
            }

            // 通知写入完成
            writeDb.Stop(-1);

            sw.Stop();
            var ms = sw.Elapsed.TotalMilliseconds;
            WriteLog("同步[{0}/{1}]完成,共[{2:n0}]行,耗时{3:n0}ms,速度{4:n0}tps", table.Name, ConnName, total, ms, total * 1000L / ms);

            // 返回总行数
            return total;
        }

        /// <summary>备份一批表到另一个库</summary>
        /// <param name="tables">表名集合</param>
        /// <param name="connName">目标连接名</param>
        /// <param name="syncSchema">同步架构</param>
        /// <returns></returns>
        public IDictionary<String, Int32> SyncAll(IDataTable[] tables, String connName, Boolean syncSchema = true)
        {
            var dic = new Dictionary<String, Int32>();

            if (tables == null) tables = Tables.ToArray();
            if (tables != null && tables.Length > 0)
            {
                // 同步架构
                if (syncSchema)
                {
                    var dal = connName.IsNullOrEmpty() ? null : Create(connName);
                    dal.SetTables(tables);
                }

                foreach (var item in tables)
                {
                    dic[item.Name] = Sync(item, connName, false);
                }
            }

            return dic;
        }
        #endregion
    }
}