• 为什么我不推荐面向SQL开发
  • 发布于 2个月前
  • 195 热度
    0 评论
  • Cactus
  • 20 粉丝 36 篇博客
  •   
在老东家工作 3 年了,公司的业务和技术栈相对熟练得差不多了。领导觉得我能够委以重任,便把一个新项目交给我负责,另外指派一名同事协助我。项目的重点在于数据的交互比较多,以及每天大量的数据同步和批量操作,不能出错。队友建议以短、平、快为主,能够使用已有现成的技术就用现成的技术。直接面向过程开发是人们最为舒适,是人为本能的习惯。由于他有这一种能够处理好的决心,便把数据批量操作这块委托于他。

查看了以往公司现成一些写法,一部分是直接面向 SQL 写法批量插入,面对增量同步则先查出,存在的更新,不存在的插入。一部分是通过 Kafka 和后台任务原子操作。理论上这么操作结果也能成,但是看到修改记录,我就知道面临的需求变了很多变化很快,导致大量的更改。私底下询问负责人也了解出了太多问题,原本一劳永逸赶紧写完结果反而投入了更多的精力和时间。

出于预防心理,也对那位同事进行了提醒并且加以思考再下手。不到一个月,我们就把项目上线了,并且没有出现数据上的错误,得到了领导的表扬。我们也提前收场,做一些小的优化,其余时间在摸鱼。一段时间之后,麻烦便接踵而至,其一就是开始数据量暴增,那位同事在做增量同步时进行了锁表操作,批量操作需要一些时间,在前台读取时出现响应超时。

其二就是增量同步要调整,以主库或第三方来源库为主,出现数据更新和删除的需要同步操作。

同事目前的主力放在了新项目上,把一些零散的时间用来调整需求和 bug,结果越处理,bug 出现的越多,不是数量过多卡死就是变量不对导致数据处理不对。于是到了某一时刻终于爆发,领导找到我俩,被痛批一顿,工作这么久就只会 CRUD 操作,来的实习生都会干的活,还养你们干什么。

当然,要复盘的话当然有迹可循。我想碰见这种情况还真不少,首次开发项目时一鼓作气,以“短、平、快” 战术面向过程开发,短时间内上线。但是,一个软件的生命周期可不止步于上线,还要过程运维以及面对变化。导致在二次开发的时候就脱节了,要么当时写法不符合现有业务,要么改动太多动不动就割到了大动脉大出血,要么人跑了...

所以我们会采用面向对象,抽象化编程,就是用来保稳定,预留一部分来应付变化,避免牵一发而动全身。挨完骂,也要开始收拾烂摊子。于是我打算重新组装一个通用的方法,打算一劳永逸。

首先我们定义一个接口通用思维 IDbAsyncBulk。由于源码已经发布到了github,所以一些注释写成了英文,大致也能看出蹩脚英文的注释。
public interface IDbAsyncBulk
    {
        /// <summary>
        /// default init.
        /// use reflect to auto init all type, to lower case database fileds,and  default basic type.
        /// if ignore some fileds,please use DbBulk,Ignore property to remarkable fileds.
        /// if other operating,need user-defined to init operate.
        /// </summary>
        /// <typeparam name="T">Corresponding type</typeparam>
        Task InitDefaultMappings<T>();

        /// <summary>
        /// batch operating
        /// </summary>
        /// <typeparam name="T">will operate object entity type.</typeparam>
        /// <param name="connection_string">database connecting string.</param>
        /// <param name="targetTable">target table name. </param>
        /// <param name="list">will operate data list.</param>
        Task CopyToServer<T>(string connection_string, string targetTable, List<T> list);

        /// <summary>
        /// batch operating
        /// </summary>
        /// <typeparam name="T">will operate object entity type.</typeparam>
        /// <param name="connection">database connecting string.need to check database connecting is openning.
        /// if nothing other follow-up operate, shouldn't cover this connecting.</param>
        /// <param name="targetTable">target table name.</param>
        /// <param name="list">will operate data list.</param>
        Task CopyToServer<T>(DbConnection connection, string targetTable, List<T> list);

        /// <summary>
        /// renew as it exists,insert as it not exists.
        /// follow up : 
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection_string">connecting string</param>
        /// <param name="keys">mapping orignal table and target table fileds,need primary key and data only,if not will throw error.</param>
        /// <param name="targetTable">target table name.</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="insertmapping">need to insert column,if is null,just use Mapping fileds,in order to avoid auto-create column</param>
        /// <param name="updatemapping">need to modify column,if is null,just use Mapping fileds</param>
        Task MergeToServer<T>(string connection_string, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

        /// <summary>
        /// renew as it exists,insert as it not exists.
        /// follow up : 
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection">database connecting string.need to check database connecting is openning.</param>
        /// <param name="keys">mapping orignal table and target table fileds,need primary key and data only,if not will throw error.</param>
        /// <param name="targetTable">target table name.</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="insertmapping">need to insert column,if is null,just use Mapping fileds,in order to avoid auto-create column</param>
        /// <param name="updatemapping">need to modify column,if is null,just use Mapping fileds</param>
        Task MergeToServer<T>(DbConnection connection, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

        /// <summary>
        ///  batch update operating。
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection_string">connecting string</param>
        /// <param name="where_name">matching 'where' compare fileds.</param>
        /// <param name="update_name">need to update fileds.</param>
        /// <param name="targetTable">target table name</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        Task UpdateToServer<T>(string connection_string, List<string> where_name, List<string> update_name, string targetTable, List<T> list, string tempTable = null);

        /// <summary>
        ///  batch update operating。
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection_string">connecting string</param>
        /// <param name="where_name">matching 'where' compare fileds.</param>
        /// <param name="update_name">need to update fileds.</param>
        /// <param name="targetTable">target table name</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="createtemp"> create temporary table or not </param>
        Task UpdateToServer<T>(DbConnection connection, List<string> where_name, List<string> update_name, string targetTable, List<T> list, string tempTable = null, bool createtemp = true);

        /// <summary>
        /// renew as it exists,insert as it not exists.original table not exist and  target table exist will remove.
        /// 1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// 4.will remove data that temporary data not exist and target table exist.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection_string">connecting string</param>
        /// <param name="keys">mapping orignal table and target table fileds,need primary key and data only,if not will throw error.</param>
        /// <param name="targetTable">target table name</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="insertmapping">need to insert column,if is null,just use Mapping fileds,in order to avoid auto-create column</param>
        /// <param name="updatemapping">need to modify column,if is null,just use Mapping fileds</param>
        Task MergeAndDeleteToServer<T>(string connection_string, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

        /// <summary>
        /// renew as it exists,insert as it not exists.original table not exist and  target table exist will remove.
        ///  1.create temporary table
        /// 2.put data into temporary table.
        /// 3.merge data to target table.
        /// 4.will remove data that temporary data not exist and target table exist.
        /// </summary>
        /// <typeparam name="T">data type</typeparam>
        /// <param name="connection">database connecting string.need to check database connecting is openning.</param>
        /// <param name="keys">mapping orignal table and target table fileds,need primary key and data only,if not will throw error.</param>
        /// <param name="targetTable">target table name</param>
        /// <param name="list">will operate data list.</param>
        /// <param name="tempTable">put data into temporary table,default name as 'target table name + # or _temp'</param>
        /// <param name="insertmapping">need to insert column,if is null,just use Mapping fileds,in order to avoid auto-create column</param>
        /// <param name="updatemapping">need to modify column,if is null,just use Mapping fileds</param>
        Task MergeAndDeleteToServer<T>(DbConnection connection, List<string> keys, string targetTable, List<T> list, string tempTable = null, List<string> insertmapping = null, List<string> updatemapping = null);

        /// <summary>
        /// create temporary table
        /// </summary>
        /// <param name="tempTable">create temporary table name</param>
        /// <param name="targetTable">rarget table name</param>
        /// <param name="connection">database connecting</param>
        Task CreateTempTable(string tempTable, string targetTable, DbConnection connection);
    }
解释几个方法的作用:
InitDefaultMappings:初始化映射,将目标表的字段映射到实体,在批量操作时候会根据反射进行一一匹配表字段;
CopyToServer:批量新增,在符合数据表结构时批量复制到目标表,采用官方 SqlBulkCopy 类结合实体简化操作。
MergeToServer:增量同步,需指定唯一键,存在即更新,不存在则插入。支持指定更新字段,指定插入字段。
UpdateToServer:批量更新,需指定 where 条件,以及更新的字段。
MergeAndDeleteToServer:增量同步,以数据源和目标表进行匹配,目标表存在的则更新,不存在的则插入,目标表存在,数据源不存在则目标表移除。
CreateTempTable:创建临时表。

增加实体属性标记,用来标记列名是否忽略同步数据,以及消除数据库别名,大小写的差异。
 /// <summary>
    /// 堆代码 duidaima.com
    /// 数据库批量操作标记,用于标记对象属性。
    /// </summary>
    public class DbBulkAttribute : Attribute
    {
        /// <summary>
        /// 是否忽略。忽略则其余属性不需要设置,不忽略则必须设置Type。
        /// </summary>
        public bool Ignore { get; set; }

        /// <summary>
        /// 列名,不设置则默认为实体字段名小写
        /// </summary>
        public string ColumnName { get; set; }

    }
实现类,目前仅支持 SqlServer 数据库,正在更新 MySql 和 PGSql 中。然后需要定义BatchSize(default 10000)、BulkCopyTimeout (default 300)、ColumnMappings,分别是每批次大小,允许超时时间和映射的字段。
/// <summary>
    /// sql server batch
    /// </summary>
    public class SqlServerAsyncBulk : IDbAsyncBulk
    {
        /// <summary>
        /// log recoding
        /// </summary>
        private ILogger _log;
        /// <summary>
        ///batch insert size(handle a batch every time )。default 10000。
        /// </summary>
        public int BatchSize { get; set; }
        /// <summary>
        /// overtime,default 300
        /// </summary>
        public int BulkCopyTimeout { get; set; }
        /// <summary>
        /// columns mapping
        /// </summary>
        public Dictionary<string, string> ColumnMappings { get; set; }
        /// <summary>
        /// structure function
        /// </summary>
        /// <param name="log"></param>
        public SqlServerAsyncBulk(ILogger<SqlServerAsyncBulk> log)
        {
            _log = log;
            BatchSize = 10000;
            BulkCopyTimeout = 300;
        }
        
        //...to do
使用上也非常的简便,直接在服务里注册单例模式,使用的时候直接依赖注入。
 //if you use SqlServer database, config SqlServerAsyncBulk service.
services.AddSingleton<IDbAsyncBulk, SqlServerAsyncBulk>();
public class BatchOperate
{
  private readonly IDbAsyncBulk _bulk;
  public BatchOperate(IDbAsyncBulk bulk)
  {
    _bulk = bulk;
  }
}
以 user_base 表举两个实例,目前测试几十万数据也才零点几秒。
 public async Task CopyToServerTest()
        {
            var connectStr = @"Data Source=KF009\SQLEXPRESS;Initial Catalog=MockData;User ID=xxx;Password=xxx";
            await _bulk.InitDefaultMappings<UserBaseModel>();
            var mock_list = new List<UserBaseModel>();
            for (var i = 0; i < 1000; i++) {
                mock_list.Add(new UserBaseModel
                {
                    age = i,
                    birthday = DateTime.Now.AddMonths(-i).Date,
                    education = "本科",
                    email = "xiaoyu@163.com",
                    name = $"小榆{i}",
                    nation = "汉",
                    nationality="中国"
                });
            }
            await _bulk.CopyToServer(connectStr, "user_base", mock_list);
        }
public async Task MergeToServerTest()
        {
            var connectStr = @"Data Source=KF009\SQLEXPRESS;Initial Catalog=MockData;User ID=sa;Password=root";
            await _bulk.InitDefaultMappings<UserBaseModel>();
            var mock_list = new List<UserBaseModel>();
            for (var i = 0; i < 1000; i++)
            {
                mock_list.Add(new UserBaseModel
                {
                    age = i,
                    birthday = DateTime.Now.AddMonths(-i).Date,
                    education = "本科",
                    email = "mock@163.com",
                    name = $"小榆{i}",
                    nation = "汉",
                    nationality = "中国"
                });
            }
            var insertMapping = new List<string> { "birthday", "education", "age", "email", "name", "nation", "nationality" };
            var updateMapping = new List<string> { "birthday", "education", "age", "email"};
            await _bulk.MergeToServer(connectStr,new List<string> {"id"}, "user_base", mock_list,null, insertMapping, updateMapping);
到这里,也已经完成了批量数据操作啦,不用再面对大量的sql操作啦。面向 sql 开发一时确实爽,但是面临变化或者别人接手的时候,是很痛苦的。
用户评论