闽公网安备 35020302035485号
今天我给为大家讲讲RabbitMQ中的一个关键问题 - 消息重复消费。我们会了解什么是消息重复消费问题,如何保证消息的幂等性,并讲一讲对非幂等性操作可能带来的数据不一致性问题以及如何解决这些问题。
在分布式系统中,消息队列扮演着重要的角色,它能够实现系统间的解耦和异步通信。然而,由于网络、硬件和服务故障等原因,消息在传递过程中可能会发生重复消费的情况。这会导致系统中的数据出现不一致性,并产生预期之外的副作用。
2.幂等操作设计: 我们需要设计操作具备幂等性,也就是说,无论我们对相同的消息进行多次操作,最终的结果都应该是一样的。这可能需要对数据和业务逻辑进行合理的设计,例如使用乐观锁、唯一索引或者版本控制等手段。
如果我们对非幂等性的操作多次消费消息,将会引发数据不一致性的问题。这是因为每次操作都会对数据进行修改,导致最终结果与预期不符。例如,扣除用户余额是一个非幂等性操作,如果多次消费相同的消息,用户的余额将会被重复扣除,导致错误的结果。
在消息发送时,为消息设置一定的过期时间,超过该时间的消息将自动过期。这样即便消息被重复消费,也不会对系统产生实质性的影响。
通过将消息ID储存到缓存中,消费者可以在处理每条消息之前,快速检查是否已经处理过该消息,从而实现消息去重。使用缓存能够提供高效的查询和插入操作,并且具备较快的响应速度,适用于高并发的场景。
需要注意的是,缓存中的消息ID可能会占用一定的存储空间,因此需要根据实际情况设置合理的过期时间和缓存大小,以及定期清理不需要的数据,以避免存储空间的过度占用。
@SpringBootTest
@Slf4j
public class SpringAmqpTest {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* @throws Exception
*/
@Test
public void sendOrderInfoDesc() throws Exception {
//模拟发送订单信息给消费者扣减库存
//订单参数
Order order = Order.builder()
.id(15L)
.goodsId(9L)
.number(2)
.build();
String json = JSON.toJSONString(order);
//消息id 保证消息的唯一性
String messageId = UUID.randomUUID().toString().replace("-", "");
//设置消息id
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(messageId);
// 创建消息对象
Message message = new Message(json.getBytes(), messageProperties);
// 发送消息
rabbitTemplate.send("order.direct", "test", message);
}
}
消费者/**
*
* @ 堆代码 duidaima.com
*/
@Configuration
public class RabbitConfig {
//声明队列
@Bean
public Queue errorQueue(){
return QueueBuilder.durable("order.test.queue").build();
}
//声明交换机
@Bean
public DirectExchange errorDirect(){
return ExchangeBuilder.directExchange("order.direct").build();
}
//绑定队列与交换机
@Bean
public Binding errorQueueToErrorDirect(Queue errorQueue, DirectExchange errorDirect){
return BindingBuilder.bind(errorQueue).to(errorDirect).with("test");
}
}
@Component
@Slf4j
public class MessageListener {
@Autowired
private RedisTemplate redisTemplate;
@RabbitListener(queues = "order.test.queue")
public void listenerGoodsQueue(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) {
String messageId = message.getMessageProperties().getMessageId();
if (Objects.nonNull(messageId)){
//在Redis中设置缓存键,如果已存在,则表示已处理过该消息
boolean flag = redisTemplate.opsForValue().setIfAbsent(messageId, "0",30, TimeUnit.SECONDS);
if (flag) {
//缓存设置成功
String value = String.valueOf(redisTemplate.opsForValue().get(messageId));
log.warn("redis缓存设置成功--->key={},value={}",messageId,value);
try {
String json = new String(message.getBody());
Order order = JSON.parseObject(json, Order.class);
log.warn("处理消息----> 订单id:" + order.getId() + " 商品id:" + order.getGoodsId() + " 扣减库存数量:" + order.getNumber());
//消息处理完毕将缓存中messageId的值设置为1
redisTemplate.opsForValue().set(messageId, "1");
value = String.valueOf(redisTemplate.opsForValue().get(messageId));
log.warn("消息处理完毕修改缓存messageId的值为:{}",value);
//消息处理完成 手动ack
channel.basicAck(tag, false);
} catch (IOException e) {
e.printStackTrace();
}
} else {
//缓存设置失败
//获取缓存中messageId的值 判断是0还是1
String value = String.valueOf(redisTemplate.opsForValue().get(messageId));
if ("1".equals(value)) {
// 缓存中的值为1,表示消息已被处理过,手动签收返回ack
try {
channel.basicAck(tag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
//如果是0 就什么都不做
}
}
}
}