@Transactional public void saveWithMessage(BusinessDO businessDO){ String id = IdUtils.nextId(); businessDO.setId(id); xxxRepository.save(businessDO); // 堆代码 duidaima.com BusinessMessage businessMessage = new BusinessMessage(); businessMessage.setKey(id); SendResult send = rocketMQTemplate.syncSend("test-topic", sendMessage); }在这段代码里通过@Transactional注解将数据库的操作以及发送消息放到一个事务中,如果数据库的保存或者消息发送失败,则回滚事务。乍一看似乎没什么问题,但稍微推敲一下就会发现此方式有如下两个缺陷:
@Transactional public void saveWithMessage(BusinessDO businessDO){ xxxRepository.save(businessDO); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { BusinessMessage businessMessage = new BusinessMessage(); businessMessage.setXXX(); rocketMQTemplate.syncSend("test-topic", sendMessage); } }); }TransactionSynchronizationManager.registerSynchronization 是 Spring 框架中用于注册事务同步的方法。通过这个方法,你可以在事务提交、回滚或完成时执行一些额外的逻辑。在上述代码中,使用了afterCommit方法,在事务成功提交后执行发送消息操作,确保在数据库操作成功且事务稳定的情况下发送消息。
@Transactional public void saveWithMessage(BusinessDO businessDO){ xxxRepository.save(businessDO); eventPublisher.publishEvent(new UserCreatedEvent(registerUser)); } @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void handleUserRegisteredEvent(UserCreatedEvent userCreatedEvent) { rocketMQTemplate.syncSend("test-topic", sendMessage); }这里需要说明一下:在默认情况下Spring的事件监听机制并不是异步的(上次群友弄错了),而是同步的将代码进行解耦,@TransactionalEventListener也是通过同步的方式,但是加入了回调的方式来解决,这样就能够控制事务进行Commited、Rollback时才进行事件的处理,来达到事务同步的目的。
@Transactional public void saveWithMessage(BusinessDO businessDO){ TransactionMessage transactionMessage = new TransactionMessage(); transactionMessage.setStaus(MessageStatus.WAITING_SEND); transactionMessage.setMessageKey(businessDO.getId()); ... xxxRepository.save(businessDO); messageRepository.save(TransactionMessage); TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { messageService.sendMessage(transactionMessage,businessDO); } }); } public void sendMessage(TransactionMessage transactionMessage,BusinessDO businessDO){ BusinessMessage businessMessage = new BusinessMessage(); businessMessage.setXXX(); try{ rocketMQTemplate.syncSend("test-topic", sendMessage); transactionMessage.setStatus(MessageStatus.SUCCESS); messageRepository.update(transactionMessage); }catch (Exception e){ // 执行失败的业务逻辑 } }3.1 问题
虽然这种方式能够保证消息在事务提交后发送,且能够保证最终一致性,但仍然存在一些缺陷:
首先,需要额外的消息表,增加了系统复杂度。(针对此问题,我们又可以将该功能单独提取出来,做成一个消息服务来统一处理,考虑篇幅问题,这里暂不展开。)