消息接收方:对于消息接收方而言,我们需要解决接收消息与本地事务的原子性问题,即保证接收消息成功后,本地事务也一定执行成功。
CREATE TABLE `tx_record` ( `tx_no` varchar(64) NOT NULL COMMENT '事务Id', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`tx_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='事务记录表'接下来我们要引入 RocketMQ 内置的TransactionListener接口。为了实现事务消息,开发人员的主要开发工作量就体现在对这个接口的实现过程中。TransactionListener接口的定义如下所示。
public interface TransactionListener { //当发送事务消息成功之后,该方法会被触发,本地事务将被执行 LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); // 堆代码 duidaima.com //当没有收到事务消息的响应时,服务器会发送确认消息来检查事务状态,该方法会被触发并获取本地事务状态 LocalTransactionState checkLocalTransaction(final MessageExt msg); }可以看到,TransactionListener接口的两个方法分别完成了本地事务执行和本地事务回查这两个核心操作。那么我们应该如何实现这两个方法呢?这里给出这两个方法的执行伪代码。
executeLocalTransaction { 执行本地事务 如果失败就选择回滚事务,反之提交事务 } checkLocalTransaction { 实现事务回查 根据事务执行记录判断,已执行则提交事务 }注意:这两个方法需要消息的发布者来实现,但调用方是 RocketMQ 自身,而且这个调用过程是自动触发的,不需要开发做任何干预。
@Service public class CustomerTicketServiceImpl implements ICustomerTicketService { @Autowired TxRecordMapper txRecordMapper; @Autowired RocketMQTemplate rocketMQTemplate; @Override public void generateTicket(AddCustomerTicketReqVO addCustomerTicketReqVO) { //从VO中创建TicketGeneratedEvent TicketGeneratedEvent ticketGeneratedEvent = createTicketGeneratedEvent(addCustomerTicketReqVO); //将Event转化为JSON对象 JSONObject jsonObject =new JSONObject(); jsonObject.put("ticketGeneratedEvent",ticketGeneratedEvent); String jsonString = jsonObject.toJSONString(); //生成消息对象 Message<String> message = MessageBuilder.withPayload(jsonString).build(); //发送事务消息 rocketMQTemplate.sendMessageInTransaction("producer_group_ticket","topic_ticket",message,null); } @Override @Transactional public void doGenerateTicket(TicketGeneratedEvent ticketGeneratedEvent) { //幂等判断 if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){ return ; } //插入工单 CustomerTicket customerTicket = CustomerTicketConverter.INSTANCE.convertEvent(ticketGeneratedEvent); customerTicket.setStatus(1); save(customerTicket); //添加事务日志 txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo()); } ... }上述代码展示的是一个插入客服工单(CustomerTicket)的过程,generateTicket和doGenerateTicket方法分别对应图 3 中的发送消息和执行本地事务这两个环节。
@Component @RocketMQTransactionListener(txProducerGroup = "producer_group_ticket") public class ProducerListener implements RocketMQLocalTransactionListener { @Autowired ICustomerTicketService customerTicketService; @Autowired TxRecordMapper txRecordMapper; //事务消息发送后的回调方法,当消息发送给MQ成功,此方法被回调 @Override @Transactional public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { try { //解析消息,转成Event对象 TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message); //执行本地事务 customerTicketService.doGenerateTicket(ticketGeneratedEvent); //当返回RocketMQLocalTransactionState.COMMIT,自动向MQ发送commit消息,MQ将消息的状态改为可消费 return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { e.printStackTrace(); //如果本地事务执行失败,就将消息设置为回滚状态 return RocketMQLocalTransactionState.ROLLBACK; } } //事务状态回查 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { //解析消息,转成Event对象 TicketGeneratedEvent ticketGeneratedEvent = convertEvent(message); //根据事务Id判断是否存在已执行的事务 Boolean isTxNoExisted = Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo())); //如果事务已执行则返回COMMIT,反之返回UNKNOWN状态 if(isTxNoExisted){ return RocketMQLocalTransactionState.COMMIT; }else{ return RocketMQLocalTransactionState.UNKNOWN; } } ... }这段代码清晰地展示了TransactionListener接口中两个核心方法的实现过程。在executeLocalTransaction方法中,我们通过调用CustomerTicketService业务服务类的doGenerateTicket方法完成了本地事务;而在checkLocalTransaction方法中,我们则实现了事务回查机制。这里同样展示了事务执行记录表的另一种应用场景,即实现事务回查。
@Component @RocketMQMessageListener(consumerGroup = "consumer_group_ticket",topic = "topic_ticket") public class Consumer implements RocketMQListener<String> { @Autowired IChatRecordService chatRecordService; //接收消息 @Override public void onMessage(String message) { log.info("开始消费消息:{}",message); //解析消息 JSONObject jsonObject = JSONObject.parseObject(message); String ticketGeneratedEventString = jsonObject.getString("ticketGeneratedEvent"); //转成TicketGeneratedEvent TicketGeneratedEvent ticketGeneratedEvent = JSONObject.parseObject(ticketGeneratedEventString, TicketGeneratedEvent.class); //添加本地聊天记录 chatRecordService.generateChatRecord(ticketGeneratedEvent); } }可以看到,这个消息消费者的实现过程没有任何特殊之处,我们只需要实现RocketMQListener接口的onMessage方法,并在该方法中调用业务服务实现类中的业务方法即可。消费者端的业务服务实现类的实现过程如下。
@Service public class ChatRecordServiceImpl implements IChatRecordService { @Autowired TxRecordMapper txRecordMapper; @Override @Transactional public void generateChatRecord(TicketGeneratedEvent ticketGeneratedEvent) { //幂等判断 if(Objects.nonNull(txRecordMapper.findTxRecordByTxNo(ticketGeneratedEvent.getTxNo()))){ return ; } //插入聊天记录 ChatRecord chatRecord = ChatRecordConverter.INSTANCE.convertEvent(ticketGeneratedEvent); save(chatRecord); //添加事务日志 txRecordMapper.addTxRecord(ticketGeneratedEvent.getTxNo()); } }这里同样通过事务执行记录表实现了业务层幂等控制,并最终完成本地事务的提交。