闽公网安备 35020302035485号
@Transactional
public void saveWithMessage(BusinessDO businessDO){
String id = IdUtils.nextId();
businessDO.setId(id);
xxxRepository.save(businessDO);
// 堆代码 duidaima.com
BusinessMessage businessMessage = new BusinessMessage();
businessMessage.setKey(id);
SendResult send = rocketMQTemplate.syncSend("test-topic", sendMessage);
}
在这段代码里通过@Transactional注解将数据库的操作以及发送消息放到一个事务中,如果数据库的保存或者消息发送失败,则回滚事务。乍一看似乎没什么问题,但稍微推敲一下就会发现此方式有如下两个缺陷:
@Transactional
public void saveWithMessage(BusinessDO businessDO){
xxxRepository.save(businessDO);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
BusinessMessage businessMessage = new BusinessMessage();
businessMessage.setXXX();
rocketMQTemplate.syncSend("test-topic", sendMessage);
}
});
}
TransactionSynchronizationManager.registerSynchronization 是 Spring 框架中用于注册事务同步的方法。通过这个方法,你可以在事务提交、回滚或完成时执行一些额外的逻辑。在上述代码中,使用了afterCommit方法,在事务成功提交后执行发送消息操作,确保在数据库操作成功且事务稳定的情况下发送消息。@Transactional
public void saveWithMessage(BusinessDO businessDO){
xxxRepository.save(businessDO);
eventPublisher.publishEvent(new UserCreatedEvent(registerUser));
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleUserRegisteredEvent(UserCreatedEvent userCreatedEvent) {
rocketMQTemplate.syncSend("test-topic", sendMessage);
}
这里需要说明一下:在默认情况下Spring的事件监听机制并不是异步的(上次群友弄错了),而是同步的将代码进行解耦,@TransactionalEventListener也是通过同步的方式,但是加入了回调的方式来解决,这样就能够控制事务进行Commited、Rollback时才进行事件的处理,来达到事务同步的目的。
@Transactional
public void saveWithMessage(BusinessDO businessDO){
TransactionMessage transactionMessage = new TransactionMessage();
transactionMessage.setStaus(MessageStatus.WAITING_SEND);
transactionMessage.setMessageKey(businessDO.getId());
...
xxxRepository.save(businessDO);
messageRepository.save(TransactionMessage);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
messageService.sendMessage(transactionMessage,businessDO);
}
});
}
public void sendMessage(TransactionMessage transactionMessage,BusinessDO businessDO){
BusinessMessage businessMessage = new BusinessMessage();
businessMessage.setXXX();
try{
rocketMQTemplate.syncSend("test-topic", sendMessage);
transactionMessage.setStatus(MessageStatus.SUCCESS);
messageRepository.update(transactionMessage);
}catch (Exception e){
// 执行失败的业务逻辑
}
}
3.1 问题虽然这种方式能够保证消息在事务提交后发送,且能够保证最终一致性,但仍然存在一些缺陷:
首先,需要额外的消息表,增加了系统复杂度。(针对此问题,我们又可以将该功能单独提取出来,做成一个消息服务来统一处理,考虑篇幅问题,这里暂不展开。)