在进行数据迁移需要编写大量的类似代码,使用原生jdbc时,编写增删改查业务时, 编写查询的时候,需要指定返回的类型,编写新增修改时 需要指定出每一个字段,就导致需要编写大量的类似代码。为了解决这个问题定义出公共类,写一套orm框架。
为了避免编写类似的代码,最初的想法是替换一套orm框架,比如jpa ,mybatis-plus ,但是使用别人的orm框架,避免编写大量sql语句,但是会遇到数据源之间切换,可能代码会更加复杂,且别人的orm框架还是要遵守三层规范,虽然基础增删改查不用编写,数据持久层并没有帮我省去,还是要出复制粘贴,对其传输实体类进行修改,一张表进行迁移要写两套dao层,新旧表结构不同。
根据设计模式,编写一套自己的orm框架,彻底砍掉dao层,定义业务工具类关联dao抽象工具。 不在需要编写任何一行sql语句。
具体代码如下:
public class PgsqlDaoBase<T> { public final static ConcurrentHashMap<String,String > map = new ConcurrentHashMap<>(); private String UPDATE="UPDATE"; private String INSERT="INSERT"; @Resource(name = SaasBusinessDsConfig.SaasBusinessDbNamedParameterJdbcTemplateName) protected NamedParameterJdbcTemplate namedParameterJdbcTemplate; protected int[] updateExistingRecords(List<T> existingRecords ,String tableName) { Class<?> entityClass = existingRecords.get(0).getClass(); Field[] fields = entityClass.getDeclaredFields(); List<String> fieldNames = new ArrayList<>(); String sql = ""; if ( map.get(UPDATE +tableName) ==null || map.get(UPDATE+tableName).equals("")){ for (Field field : fields) { fieldNames.add(field.getName()); } StringBuilder sb = new StringBuilder(); sb.append("UPDATE ").append(tableName).append(" SET "); for (String fieldName : fieldNames) { sb.append(fieldName).append(" = :").append(fieldName).append(", "); } sb.delete(sb.length() - 2, sb.length()); // 移除最后一个逗号和空格 sb.append(" WHERE id = :id and accId = :accId"); sql = sb.toString(); }else{ sql=map.get(UPDATE+tableName); } SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(existingRecords); return namedParameterJdbcTemplate.batchUpdate(sql, batchParams); } public int [] insertNewRecords(List<T> newRecords,String name) { // 获取实体类的表名 Class<?> entityClass = newRecords.get(0).getClass(); // 获取实体类的字段信息 Field[] fields = entityClass.getDeclaredFields(); List<String> fieldNames = new ArrayList<>(); String sql = ""; if ( map.get(INSERT+name) ==null || map.get(INSERT+name).equals("")){ for (Field field : fields) { fieldNames.add(field.getName()); } StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO ").append(name).append(" ("); for (String fieldName : fieldNames) { sb.append(fieldName).append(", "); } sb.delete(sb.length() - 2, sb.length()); sb.append(") VALUES ("); for (String fieldName : fieldNames) { sb.append(":").append(fieldName).append(", "); } sb.delete(sb.length() - 2, sb.length()); sb.append(")"); }else{ sql=map.get(UPDATE+name); } // 执行插入操作 SqlParameterSource[] batchParams = SqlParameterSourceUtils.createBatch(newRecords); return namedParameterJdbcTemplate.batchUpdate(sql, batchParams); } public boolean isRecordExists(Integer id ,String name) { String sql = "SELECT COUNT(id) FROM "+name+" WHERE id = :id"; Map<String, Integer> paramMap = Collections.singletonMap("id", id); int count = namedParameterJdbcTemplate.queryForObject(sql, paramMap, Integer.class); return count > 0; } public int deleteBatch(List<Integer> filteredIds,String name) { if (filteredIds == null || filteredIds.isEmpty()) { return 0; } String sql = "delete from "+name+" where id in (:ids)"; SqlParameterSource params = new MapSqlParameterSource("ids", filteredIds); return namedParameterJdbcTemplate.update(sql, params); } }看到这里大家可能会觉得并没有省去持久层,这里只是定义了一个供所有数据库表增删改查的的工具
public abstract class BaseSyncService<K extends HasId, V> extends DataSyncHelper2<V> { protected Boolean isLock = true; @Autowired protected DiscounDaoBase<V> payRecordDaos; @Autowired protected PageBaseMiddle<K> payClassDao; // 堆代码 duidaima.com //1.0 计算总页说 //2.0 将页进行拆分,获取页中的数据List //3.0 将查询到的数据转换未目标对象 //4.0 先创建数据源 //4.1 再对插入数据源的数据进行编写sql public String NAMES; public abstract void setTableNAME(); public String LOCK_KEY = NAMES + "-task-sync-lock"; public String DATA_KEY = NAMES + "-data-task-sync"; public String INCREMENT = NAMES + "-INCREMENT"; public void unSync() { RedisLockDao.unlock(LOCK_KEY); } public void sync(SyncDataDto args, Class<V> type) { syncStart(args, Constants.MAX_PAGE_SIZE, type, NAMES); } public void taskSync(Class<V> type) { log.info("准备同步"); if (!acquireLock(LOCK_KEY)) { log.info("同步失败,请清理缓存"); return; } try { SyncDataDto syncDataDto = new SyncDataDto(); long startId = syncDataDto.getStartId(); long endId = syncDataDto.getEndId(); // 进行同步操作,使用 startId 和 endId 进行同步 // 更新同步状态 String jsonStr = RedisDao.get(DATA_KEY); startId = JsonUtils.toObject(jsonStr == null ? "{}" : jsonStr, SyncDataDto.class).getStartId(); endId = startId + Constants.MAX_PAGE_SIZE; Long pClassMaxId = payRecordDaos.getMaxId(NAMES); while (startId < pClassMaxId) { // 存储同步状态到 Redis syncDataDto.setStartId(startId); syncDataDto.setEndId(endId); sync(syncDataDto, type); startId += Constants.MAX_PAGE_SIZE; endId = startId + Constants.MAX_PAGE_SIZE; syncDataDto.setStartId(startId); syncDataDto.setEndId(endId); RedisDao.set(DATA_KEY, JsonUtils.toJsonStr(syncDataDto)); log.info("同步进度 start {} end {}", startId, endId); } } catch (Exception ex) { log.error(ex.getMessage(), ex); } finally { RedisLockDao.unlock(LOCK_KEY); } } @Override public void setVariable(String value) { this.KEYVALUE = this.LOCK_KEY; } @Override public void claneCache() { clearCache(LOCK_KEY, DATA_KEY, INCREMENT); } @Override protected V fetchById(int Id) { return null; } @Override protected int update(V elem) { return 0; } @Override protected int removeById(V ele) { return 0; } @Override protected int getCount(SyncDataDto syncData) { setVariable(LOCK_KEY); return payRecordDaos.counts(syncData, NAMES); } @Override protected List<V> getList(SyncDataDto syncData, Class<V> type) { return payRecordDaos.getListPage(syncData, NAMES, type); } @Override protected List<K> convertTo(List<V> dataList) { ArrayList<K> objects = new ArrayList<>(); for (V sqlServerEntity : dataList) { K entity = convertToMemberEntity(sqlServerEntity); if (entity != null) { objects.add(entity); } } return objects; } protected abstract K convertToMemberEntity(V sqlServerEntity); //取消增量 @Override public void StopSyncThenSync() { isLock = false; } @Override protected int[] insertBatch(List<?> elems, String name) { return payClassDao.insertBatch((List<K>) elems, name); } @Override protected int[] insertBatchs(List<?> elems, String name) { return payClassDao.insertNewRecords((List<K>) elems, name); } @Override public void SyncThenSync(Class<V> type) { if (RedisLockDao.lock(INCREMENT)) { try { //获取最大的版本号 while (isLock) { SqlSyncVersionEntity name = sqlSyncVersionDao.getName(INCREMENT); int s = Integer.parseInt(name == null ? "0" : String.valueOf(name.getVersion())); //查询到最大的版本进行保存 int maxversion = payRecordDaos.maxVersion(s - 1, NAMES); if (maxversion != s) { List<V> lastChangeVersions = payRecordDaos.getLastChangeVersions(s - 1, NAMES, type); //保存查询到的数据 //一次处理一千条数据 //再次进行一次转换 List<K> memberEntities = convertTo(lastChangeVersions); payClassDao.insertBatch(memberEntities, NAMES); //删除过的数据永远没有id 只能通过最后一条日志进行删除 //删除 新表 List<Integer> filteredIds = payRecordDaos.getDeleteIds(s - 1, NAMES); payClassDao.deleteBatch(filteredIds, NAMES); sqlSyncVersionDao.insertandupdateVersions(INCREMENT, maxversion); } Thread.sleep(5 * 60 * 1000); } } catch (Exception ex) { log.error(ex.getMessage(), ex); } finally { RedisLockDao.unlock(INCREMENT); } } log.error("正圯同步中........."); } }@Autowired 注入了一个泛型的的dao 这个dao层遇到不同的实体就会进行随意的切换,这里定义的两个抽象方法,需要自己的去实现 实体之间转换的操作 ,你想要把那个字段值的赋值给哪一个字段 ,填充数据库表的抽象 ,
最终成就:完成工具类后 ,就不是关注迁移代码的编写,而是迁移的过程中。
public List<T> getListPage(SyncDataDto args, String name ,Class<T> type ) { MapSqlParameterSource params = new MapSqlParameterSource(); StringBuilder wheres = builderWheres(args, params); StringBuilder builder = new StringBuilder(); builder.append(" SELECT * FROM( ") .append(" SELECT ROW_NUMBER() OVER(ORDER BY id ) as rowNumber, ") .append(" * ") .append(" FROM " +name) .append(" WHERE 1=1 ") .append(wheres) .append(" ) t ") .append(" WHERE t.rowNumber >= :bgNumber AND t.rowNumber < :edNumber ");对查询的条件进行优化,尽量分批查询,且一定要保证数据是顺序查询不丢失。