在公司的实际业务场景中,初始化数据是个避不过去的话题。比如项目上线初期,将数据库的商品数据同步到搜索引擎 ElasticSearcgh 、缓存 Redis 或者其它的数据库。在这种初始化同步的场景下,如果使用单线程同步,几千万或者上亿的数据可能需要同步很长时间,而且其中一些细节知识点也很值得考究。如何解决大数据深分页问题?如何防止应用内存溢出等。
让我们先来讨论深分页问题,这是一个经久不衰的问题。如果你还不了解 MySQL 深分页是什么,请继续阅读。
MySQL 大数据量深分页问题是指当使用 MySQL 数据库进行大数据量分页查询时,例如从一个包含数百万条记录的表中查询最后一小部分数据时,会出现性能问题。具体来说,当使用 LIMIT 和 OFFSET 关键字来分页查询时,MySQL 会将查询结果集中的所有记录都加载到内存中,然后再将指定的分页数据返回给客户端。这种方式在处理小数据集时是有效的,但对于大数据集来说,它会导致大量的内存使用和性能问题。
当 OFFSET 数量很大时,MySQL 必须扫描并跳过所有之前的记录,这会导致非常慢的查询速度。此外,当使用 InnoDB 存储引擎时,随着 OFFSET 增加,I/O 操作会变得更加频繁,因为 MySQL 必须从磁盘中读取更多的数据页。
select * from product where id < 3000000 limit 1;因为我们的目标是同步所有商品库的数据,所以每次取 5000 条,记住最大的 ID,下次以该 ID 充当查询条件。通过该方式完美解决深分页造成的性能损耗,也就是上文说的分段查询。
直接得出结论:如果没有分表,该方法可以有效解决深度分页的问题。然而,一旦进行了分表,该方法将不再适用。这是因为商品表的分片键不是 ID 字段,如果执行此 SQL 语句,将会查询所有商品分表。那么,就到了解决深分页的主角登场了,那就是流式查询。流式查询通过在MySQL服务器端使用游标进行数据的逐行读取,可以避免一次性将所有数据加载到内存中,从而节省内存空间并提高查询效率。
简单来说,当你调用 MySQL 流式查询时,就像建立了一个“管道”一样,可以源源不断地将数据传输到客户端。但是,流式查询不是银弹,当流式查询数据没有完全传输完时,当前的数据库连接是独占的,无法被其它线程所使用。如果说需要频繁进行流式查询操作,可以单独拆一个数据库连接池,不要和正常业务逻辑共用一个连接池。
import lombok.SneakyThrows; import java.util.concurrent.LinkedBlockingQueue; public class ProductSyncExecutor { // 堆代码 duidaima.com private static final LinkedBlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<>(10); public static void main(String[] args) { Thread product1 = new Thread(new ProductProduce()); Thread product2 = new Thread(new ProductProduce()); Thread product3 = new Thread(new ProductProduce()); Thread consumer1 = new Thread(new ProductConsumer()); Thread consumer2 = new Thread(new ProductConsumer()); Thread consumer3 = new Thread(new ProductConsumer()); product1.setName("Thread-Product-1"); product1.start(); product2.setName("Thread-Product-2"); product2.start(); product3.setName("Thread-Product-3"); product3.start(); consumer1.setName("Thread-Consumer-1"); consumer1.start(); consumer2.setName("Thread-Consumer-2"); consumer2.start(); consumer3.setName("Thread-Consumer-3"); consumer3.start(); } public static class ProductProduce implements Runnable { @SneakyThrows @Override public void run() { while (true) { blockingQueue.put(new Object()); System.out.println("【生产者:" + Thread.currentThread().getName() + "】放入一个商品,现容量:" + blockingQueue.size()); Thread.sleep(100); } } } public static class ProductConsumer implements Runnable { @SneakyThrows @Override public void run() { while (true) { blockingQueue.take(); System.out.println("【消费者:" + Thread.currentThread().getName() + "】消费了一个商品,现容量:" + blockingQueue.size()); Thread.sleep(200); } } } }
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.apache.ibatis.annotations.Options; import org.apache.ibatis.annotations.ResultType; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.mapping.ResultSetType; import org.apache.ibatis.session.ResultHandler; import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO; /** * 商品 SKU 持久层 */ public interface ProductSkuMapper extends BaseMapper<ProductSkuDO> { /** * 通过流式查询的方式获取所有商品 SKU */ @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE) @ResultType(ProductSkuDO.class) @Select("SELECT * FROM product_sku WHERE del_flag = '0'") void listAllProductSkuStreamQuery(ResultHandler<ProductSkuDO> handler); }2. 并发同步程序
import com.alibaba.fastjson.JSON; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO; import org.opengoofy.congomall.biz.product.infrastructure.dao.mapper.ProductSkuMapper; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.GetMapping; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; /** * 初始化商品任务,通过并发编程完成生产-消费模型,达到快速同步的效果 */ @Slf4j @Component @RequiredArgsConstructor public class InitializeProductJobHandlerTwo { /** * 商品 SKU 持久层 */ private final ProductSkuMapper productSkuMapper; /** * Hippo4j 线程池,执行同步程序 */ private final ThreadPoolExecutor productSkuInitSyncThreadPoolExecutor; /** * 单次同步 ElasticSearch 数量 */ private static final Integer MAX_SYNC_SIZE = 5000; /** * 阻塞队列最大容量,相当于一个缓冲池大小 */ private static final Integer MAX_POOL_SIZE = 200000; /** * 记录同步 */ private static final AtomicInteger COUNT_NUM = new AtomicInteger(0); /** * 记录实际同步数量 */ private static final LongAdder SYNC_SUM = new LongAdder(); @GetMapping("/init/product") public void execute() throws Exception { BlockingQueue<ProductSkuDO> blockingQueueCachePool = new LinkedBlockingQueue<>(MAX_POOL_SIZE); productSkuMapper.listAllProductSkuStreamQuery(resultContext -> { // 记录流式查询总数量 COUNT_NUM.incrementAndGet(); // 每次向缓冲池添加 MAX_SYNC_SIZE 记录 try { blockingQueueCachePool.put(resultContext.getResultObject()); } catch (Exception ex) { log.error("商品SKU基础数据初始化流程, 添加阻塞队列缓冲池失败, 数据记录: {}", JSON.toJSONString(resultContext.getResultObject()), ex); } // 避免请求目标数据库(ElasticSearch 或其它)次数过多,所以建议每次 MAX_SYNC_SIZE 条数,虽然可能不够这个数 if (blockingQueueCachePool.size() >= MAX_SYNC_SIZE) { productSkuInitSyncThreadPoolExecutor.execute(() -> executeTask(blockingQueueCachePool)); } }); // 兜底,将最后缓冲的任务执行 productSkuInitSyncThreadPoolExecutor.execute(() -> lastOnceExecuteTask(blockingQueueCachePool)); } private void executeTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) { List<ProductSkuDO> copyList = new ArrayList<>(MAX_SYNC_SIZE); try { int drainTo = blockingQueueCachePool.drainTo(copyList, MAX_SYNC_SIZE); if (drainTo > 0) { // 此处决定向何处同步数据 // ...... SYNC_SUM.add(drainTo); } } catch (Exception ex) { log.error("商品SKU基础数据初始化流程执行失败", ex); } } private void lastOnceExecuteTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) { List<ProductSkuDO> lastProductSkus = blockingQueueCachePool.stream().parallel().collect(Collectors.toList()); try { SYNC_SUM.add(lastProductSkus.size()); // 此处决定向何处同步数据 // ...... } catch (Exception ex) { log.error("商品SKU基础数据初始化流程执行最后一次同步失败", ex); } } }3. 任务进度监控
import com.alibaba.fastjson.JSON; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO; import org.opengoofy.congomall.biz.product.infrastructure.dao.mapper.ProductSkuMapper; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; /** * 初始化商品任务,通过并发编程完成生产-消费模型,达到快速同步的效果 */ @Slf4j @Component @RequiredArgsConstructor public class InitializeProductJobHandler extends IJobHandler { /** * 商品 SKU 持久层 */ private final ProductSkuMapper productSkuMapper; /** * Hippo4j 线程池,执行同步程序 */ private final ThreadPoolExecutor productSkuInitSyncThreadPoolExecutor; /** * 单次同步 ElasticSearch 数量 */ private static final Integer MAX_SYNC_SIZE = 5000; /** * 阻塞队列最大容量,相当于一个缓冲池大小 */ private static final Integer MAX_POOL_SIZE = 200000; /** * 记录开始时间 */ private static Long START_TIME = 0L; /** * 记录同步 */ private static final AtomicInteger COUNT_NUM = new AtomicInteger(0); /** * 记录实际同步数量 */ private static final LongAdder SYNC_SUM = new LongAdder(); /** * 打印输出监控定时器 */ private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor(); @XxlJob(value = "demoJobHandler") @Override public void execute() throws Exception { // 定时打印执行进度 printPoolAndScheduledInfo(); // 执行商品 SKU 同步程序 executeProductSkuSync(); // 释放定时器、同步线程池资源 shutdownPoolAndPrintCountSize(); } void executeProductSkuSync() { BlockingQueue<ProductSkuDO> blockingQueueCachePool = new LinkedBlockingQueue<>(MAX_POOL_SIZE); productSkuMapper.listAllProductSkuStreamQuery(resultContext -> { // 记录流式查询总数量 COUNT_NUM.incrementAndGet(); // 每次向缓冲池添加 MAX_SYNC_SIZE 记录 try { blockingQueueCachePool.put(resultContext.getResultObject()); } catch (Exception ex) { log.error("商品SKU基础数据初始化流程, 添加阻塞队列缓冲池失败, 数据记录: {}", JSON.toJSONString(resultContext.getResultObject()), ex); } // 避免请求目标数据库(ElasticSearch 或其它)次数过多,所以建议每次 MAX_SYNC_SIZE 条数,虽然可能不够这个数 if (blockingQueueCachePool.size() >= MAX_SYNC_SIZE) { productSkuInitSyncThreadPoolExecutor.execute(() -> executeTask(blockingQueueCachePool)); } }); // 兜底,将最后缓冲的任务执行 productSkuInitSyncThreadPoolExecutor.execute(() -> lastOnceExecuteTask(blockingQueueCachePool)); } private void executeTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) { List<ProductSkuDO> copyList = new ArrayList<>(MAX_SYNC_SIZE); try { int drainTo = blockingQueueCachePool.drainTo(copyList, MAX_SYNC_SIZE); if (drainTo > 0) { // 此处决定向何处同步数据 // ...... SYNC_SUM.add(drainTo); } } catch (Exception ex) { log.error("商品SKU基础数据初始化流程执行失败", ex); } } private void lastOnceExecuteTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) { List<ProductSkuDO> lastProductSkus = blockingQueueCachePool.stream().parallel().collect(Collectors.toList()); try { SYNC_SUM.add(lastProductSkus.size()); // 此处决定向何处同步数据 // ...... } catch (Exception ex) { log.error("商品SKU基础数据初始化流程执行最后一次同步失败", ex); } } private void printPoolAndScheduledInfo() { START_TIME = System.currentTimeMillis(); SCHEDULED_EXECUTOR.scheduleAtFixedRate(() -> { log.info("商品SKU基础数据初始化流程, 当前已同步总数量: {}", COUNT_NUM.get()); log.info("商品SKU基础数据初始化流程, 线程池状态打印, 当前活动线程数: {}, 当前排队任务数: {}, 执行完成线程数: {}, 线程池任务总数: {}", productSkuInitSyncThreadPoolExecutor.getActiveCount(), productSkuInitSyncThreadPoolExecutor.getQueue().size(), productSkuInitSyncThreadPoolExecutor.getCompletedTaskCount(), productSkuInitSyncThreadPoolExecutor.getTaskCount()); }, 30, 10, TimeUnit.SECONDS); } private void shutdownPoolAndPrintCountSize() { // 关闭定时器线程池 SCHEDULED_EXECUTOR.shutdown(); // 关闭数据同步线程池 productSkuInitSyncThreadPoolExecutor.shutdown(); while (true) { if (SCHEDULED_EXECUTOR.isTerminated() && productSkuInitSyncThreadPoolExecutor.isTerminated()) { log.info("商品SKU基础数据初始化流程, 总条数: {}, 同步成功数: {}, 同步执行总耗时: {}", COUNT_NUM.get(), SYNC_SUM.longValue(), System.currentTimeMillis() - START_TIME); break; } } } }
细心的同学可能发现,除了增加定时任务线程池外,还添加调用 shutdownPoolAndPrintCountSize 关闭线程池资源的流程。使用过且不会再使用的资源,及时关闭可释放系统相关资源,这是个很好的编码习惯。
之前生产环境通过文章中的流程跑过,大概一千多万的数据两分钟左右就搞定了。大家在进行使用上述程序的时候,服务器资源、线程数量以及 ElasticSearch 配置都是性能考量点,每一个步骤都会影响最终完成时间,需要不断模拟各种参数进行逐步调优。
在本文中,我们讨论了如何快速同步亿级商品数据到三方数据库的问题。为了实现高效的数据同步,我们采用了并发编程和解决深分页问题的方法。具体来说,我们通过使用线程池和多线程技术,对数据同步过程进行了并发处理,从而提高了同步效率。同时,为了解决深分页问题,我们采用了 MySQL 流式查询技术,避免了一次性将大量数据加载到内存中的问题,提高了查询效率和减少了内存占用。