• 旧系统切换数据库时遇到的的两个问题
  • 发布于 2个月前
  • 430 热度
    0 评论
在对公司数据库进行热迁移的时候,遇到了很多的问题,针对如下两个大问题分析解决问题。
问题1.需要编写大量类似代码(严重影响编写效率)

在进行数据迁移需要编写大量的类似代码,使用原生jdbc时,编写增删改查业务时, 编写查询的时候,需要指定返回的类型,编写新增修改时 需要指定出每一个字段,就导致需要编写大量的类似代码。为了解决这个问题定义出公共类,写一套orm框架。


问题1.解决方案1:

为了避免编写类似的代码,最初的想法是替换一套orm框架,比如jpa ,mybatis-plus ,但是使用别人的orm框架,避免编写大量sql语句,但是会遇到数据源之间切换,可能代码会更加复杂,且别人的orm框架还是要遵守三层规范,虽然基础增删改查不用编写,数据持久层并没有帮我省去,还是要出复制粘贴,对其传输实体类进行修改,一张表进行迁移要写两套dao层,新旧表结构不同。


问题1.解决方案2

根据设计模式,编写一套自己的orm框架,彻底砍掉dao层,定义业务工具类关联dao抽象工具。 不在需要编写任何一行sql语句。    

具体代码如下:

public class PgsqlDaoBase<T> {

    public  final static ConcurrentHashMap<String,String > map = new ConcurrentHashMap<>();
    private String UPDATE="UPDATE";
    private String INSERT="INSERT";

    @Resource(name = SaasBusinessDsConfig.SaasBusinessDbNamedParameterJdbcTemplateName)
    protected NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    protected int[] updateExistingRecords(List<T> existingRecords ,String tableName) {
        Class<?> entityClass = existingRecords.get(0).getClass();
        Field[] fields = entityClass.getDeclaredFields();
        List<String> fieldNames = new ArrayList<>();
        String sql = "";
        if ( map.get(UPDATE +tableName) ==null ||  map.get(UPDATE+tableName).equals("")){
            for (Field field : fields) {
                fieldNames.add(field.getName());
            }
            StringBuilder sb = new StringBuilder();
            sb.append("UPDATE ").append(tableName).append(" SET ");
            for (String fieldName : fieldNames) {
                sb.append(fieldName).append(" = :").append(fieldName).append(", ");
            }
            sb.delete(sb.length() - 2, sb.length()); // 移除最后一个逗号和空格
            sb.append(" WHERE id = :id and accId = :accId");
            sql = sb.toString();
        }else{
            sql=map.get(UPDATE+tableName);
        }
        SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(existingRecords);
        return namedParameterJdbcTemplate.batchUpdate(sql, batchParams);
    }


    public int [] insertNewRecords(List<T> newRecords,String name) {
        // 获取实体类的表名
        Class<?> entityClass = newRecords.get(0).getClass();
        // 获取实体类的字段信息
        Field[] fields = entityClass.getDeclaredFields();
        List<String> fieldNames = new ArrayList<>();
        String sql = "";
        if ( map.get(INSERT+name) ==null ||  map.get(INSERT+name).equals("")){

            for (Field field : fields) {
                fieldNames.add(field.getName());
            }
            StringBuilder sb = new StringBuilder();
            sb.append("INSERT INTO ").append(name).append(" (");
            for (String fieldName : fieldNames) {
                sb.append(fieldName).append(", ");
            }
            sb.delete(sb.length() - 2, sb.length());
            sb.append(") VALUES (");
            for (String fieldName : fieldNames) {
                sb.append(":").append(fieldName).append(", ");
            }
            sb.delete(sb.length() - 2, sb.length());
            sb.append(")");
        }else{
            sql=map.get(UPDATE+name);
        }
        // 执行插入操作
        SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(newRecords);
        return namedParameterJdbcTemplate.batchUpdate(sql, batchParams);
    }
    public boolean isRecordExists(Integer id ,String name) {
        String sql = "SELECT COUNT(id) FROM "+name+"  WHERE id = :id";
        Map<String, Integer> paramMap = Collections.singletonMap("id", id);
        int count = namedParameterJdbcTemplate.queryForObject(sql, paramMap, Integer.class);
        return count > 0;
    }

    public int deleteBatch(List<Integer> filteredIds,String name) {
        if (filteredIds == null || filteredIds.isEmpty()) {
            return 0;
        }
        String sql = "delete from "+name+" where id in (:ids)";
        SqlParameterSource params = new MapSqlParameterSource("ids", filteredIds);
        return namedParameterJdbcTemplate.update(sql, params);
    }
}
看到这里大家可能会觉得并没有省去持久层,这里只是定义了一个供所有数据库表增删改查的的工具
public abstract class BaseSyncService<K extends HasId, V> extends DataSyncHelper2<V> {
    protected Boolean isLock = true;
    @Autowired
    protected DiscounDaoBase<V> payRecordDaos;

    @Autowired
    protected PageBaseMiddle<K> payClassDao;
    // 堆代码 duidaima.com
    //1.0 计算总页说
    //2.0 将页进行拆分,获取页中的数据List
    //3.0 将查询到的数据转换未目标对象
    //4.0 先创建数据源
    //4.1 再对插入数据源的数据进行编写sql
    public String NAMES;

    public abstract void setTableNAME();

    public String LOCK_KEY = NAMES + "-task-sync-lock";
    public String DATA_KEY = NAMES + "-data-task-sync";

    public String INCREMENT = NAMES + "-INCREMENT";

    public void unSync() {
        RedisLockDao.unlock(LOCK_KEY);
    }

    public void sync(SyncDataDto args, Class<V> type) {
        syncStart(args, Constants.MAX_PAGE_SIZE, type, NAMES);
    }

    public void taskSync(Class<V> type) {
        log.info("准备同步");
        if (!acquireLock(LOCK_KEY)) {
            log.info("同步失败,请清理缓存");
            return;
        }
        try {
            SyncDataDto syncDataDto = new SyncDataDto();
            long startId = syncDataDto.getStartId();
            long endId = syncDataDto.getEndId();
            // 进行同步操作,使用 startId 和 endId 进行同步
            // 更新同步状态
            String jsonStr = RedisDao.get(DATA_KEY);
            startId = JsonUtils.toObject(jsonStr == null ? "{}" : jsonStr, SyncDataDto.class).getStartId();
            endId = startId + Constants.MAX_PAGE_SIZE;

            Long pClassMaxId = payRecordDaos.getMaxId(NAMES);

            while (startId < pClassMaxId) {

                // 存储同步状态到 Redis
                syncDataDto.setStartId(startId);
                syncDataDto.setEndId(endId);

                sync(syncDataDto, type);

                startId += Constants.MAX_PAGE_SIZE;
                endId = startId + Constants.MAX_PAGE_SIZE;

                syncDataDto.setStartId(startId);
                syncDataDto.setEndId(endId);

                RedisDao.set(DATA_KEY, JsonUtils.toJsonStr(syncDataDto));

                log.info("同步进度 start {}  end {}", startId, endId);
            }
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        } finally {
            RedisLockDao.unlock(LOCK_KEY);
        }
    }

    @Override
    public void setVariable(String value) {
        this.KEYVALUE = this.LOCK_KEY;
    }

    @Override
    public void claneCache() {

        clearCache(LOCK_KEY, DATA_KEY, INCREMENT);
    }

    @Override
    protected V fetchById(int Id) {
        return null;
    }

    @Override
    protected int update(V elem) {
        return 0;
    }

    @Override
    protected int removeById(V ele) {
        return 0;
    }

    @Override
    protected int getCount(SyncDataDto syncData) {
        setVariable(LOCK_KEY);
        return payRecordDaos.counts(syncData, NAMES);
    }

    @Override
    protected List<V> getList(SyncDataDto syncData, Class<V> type) {
        return payRecordDaos.getListPage(syncData, NAMES, type);
    }

    @Override
    protected List<K> convertTo(List<V> dataList) {
        ArrayList<K> objects = new ArrayList<>();
        for (V sqlServerEntity : dataList) {
            K entity = convertToMemberEntity(sqlServerEntity);
            if (entity != null) {
                objects.add(entity);
            }
        }
        return objects;
    }

    protected abstract K convertToMemberEntity(V sqlServerEntity);


    //取消增量
    @Override
    public void StopSyncThenSync() {
        isLock = false;
    }

    @Override
    protected int[] insertBatch(List<?> elems, String name) {
        return payClassDao.insertBatch((List<K>) elems, name);
    }

    @Override
    protected int[] insertBatchs(List<?> elems, String name) {
        return payClassDao.insertNewRecords((List<K>) elems, name);
    }

    @Override
    public void SyncThenSync(Class<V> type) {

        if (RedisLockDao.lock(INCREMENT)) {
            try {
                //获取最大的版本号
                while (isLock) {
                    SqlSyncVersionEntity name = sqlSyncVersionDao.getName(INCREMENT);

                    int s = Integer.parseInt(name == null ? "0" : String.valueOf(name.getVersion()));
                    //查询到最大的版本进行保存
                    int maxversion = payRecordDaos.maxVersion(s - 1, NAMES);
                    if (maxversion != s) {
                        List<V> lastChangeVersions = payRecordDaos.getLastChangeVersions(s - 1, NAMES, type);
                        //保存查询到的数据    //一次处理一千条数据
                        //再次进行一次转换
                        List<K> memberEntities = convertTo(lastChangeVersions);
                        payClassDao.insertBatch(memberEntities, NAMES);
                        //删除过的数据永远没有id  只能通过最后一条日志进行删除
                        //删除     新表
                        List<Integer> filteredIds = payRecordDaos.getDeleteIds(s - 1, NAMES);
                        payClassDao.deleteBatch(filteredIds, NAMES);
                        sqlSyncVersionDao.insertandupdateVersions(INCREMENT, maxversion);
                    }
                    Thread.sleep(5 * 60 * 1000);
                }
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            } finally {
                RedisLockDao.unlock(INCREMENT);
            }
        }
        log.error("正圯同步中.........");
    }
}
@Autowired  注入了一个泛型的的dao   这个dao层遇到不同的实体就会进行随意的切换,这里定义的两个抽象方法,需要自己的去实现        实体之间转换的操作 ,你想要把那个字段值的赋值给哪一个字段  ,填充数据库表的抽象  ,

最终成就:完成工具类后 ,就不是关注迁移代码的编写,而是迁移的过程中。


问题2  性能问题:
在上篇我们通过观察ct日志,进行数据同步的时候,几乎每时每刻都在查看ct中是否有数据,准备往新的数据库中同步,在一直监听ct的过程中原表数据在修改,此时一直监听会进行等待,导致查询ct超时,同步功能异常结束。
解决方式:   减少查询的频率,减少系统的压力。增加稳定性,减少资源消耗。
public List<T> getListPage(SyncDataDto args, String name ,Class<T> type ) {
    MapSqlParameterSource params = new MapSqlParameterSource();
    StringBuilder wheres = builderWheres(args, params);
    StringBuilder builder = new StringBuilder();
    builder.append(" SELECT * FROM( ")
            .append(" SELECT ROW_NUMBER() OVER(ORDER BY id ) as rowNumber, ")
            .append(" * ")
            .append(" FROM  " +name)
            .append(" WHERE 1=1 ")
            .append(wheres)
            .append(" ) t ")
            .append(" WHERE t.rowNumber >= :bgNumber AND t.rowNumber < :edNumber ");
对查询的条件进行优化,尽量分批查询,且一定要保证数据是顺序查询不丢失。
用户评论