rabbitmq: host: 127.0.0.1 port: 5672 username: username password: password virtual-host: /test publisher-confirm-type: correlated # 配置启用confirm机制2.使用RabbitMQConfig创建业务队列和对应死信队列
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; @Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.business.exchange"; public static final String DELAY_QUEUE_NAME = "delay.business.queue"; public static final String DELAY_QUEUE_ROUTING_KEY = "delay.business.queue.routingKey"; public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange"; public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue"; public static final String DEAD_LETTER_QUEUE_ROUTING_KEY = "dead.letter.queue.routingKey"; // 堆代码 duidaima.com // 声明延迟队列交换机 @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信队列交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } // 声明延时队列 @Bean("delayQueue") public Queue delayQueue(){ HashMap<String, Object> map = new HashMap<>(); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); // x-dead-letter-routing-key 这里声明当前队列的死信路由key map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY); // 设置该队列最大消息数 map.put("x-max-length", 10); map.put("x-overflow", "reject-publish"); return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(map).build(); } // 声明死信队列 @Bean("deadLetterQueue") public Queue deadLetterQueue(){ return new Queue(DEAD_LETTER_QUEUE_NAME); } // 声明延时队列的绑定关系 @Bean public Binding delayBinding(@Qualifier("delayExchange") DirectExchange directExchange, @Qualifier("delayQueue") Queue queue){ return BindingBuilder.bind(queue).to(directExchange).with(DELAY_QUEUE_ROUTING_KEY); } // 声明死信队列的绑定关系 @Bean public Binding deadLetterBinding(@Qualifier("deadLetterExchange") DirectExchange directExchange, @Qualifier("deadLetterQueue") Queue queue){ return BindingBuilder.bind(queue).to(directExchange).with(DEAD_LETTER_QUEUE_ROUTING_KEY); } }注意,这里我们并没有设置overflow参数,所以采用的是默认配置
import com.rabbitmq.client.Channel; import com.whs.edws.config.RabbitMQConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Slf4j @Component public class MaxLengthConsumer { @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_NAME) public void receive(Message message, Channel channel) throws IOException { String s = new String(message.getBody()); log.info("死信队列消费者接收到消息:" + s); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }4.创建测试方法发送消息
@Test void maxLengthTestPublish(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * @param correlationData 相关配置信息 * @param ack 消息队列是否成功收到消息 * @param cause 错误原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { logger.info("消息发送成功:" + correlationData.getId()); } else { logger.error("消息发送失败:" + correlationData.getId()); logger.error("错误原因:" + cause); } } }); for (int i = 0; i < 11; i++) { CorrelationData correlationData = new CorrelationData(); correlationData.setId(String.valueOf(i)); rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE_NAME, RabbitMQConfig.DELAY_QUEUE_ROUTING_KEY, String.valueOf(i), correlationData); } }5.运行结果
和官方文档说的一样,默认最旧的一条消息被放入死信队列
// 指定超过队列长度后的策略 map.put("x-overflow", "reject-publish");执行方法,打印的结果为:
2023-07-18 02:45:07.242 INFO 22328 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : 消息发送失败:10 2023-07-18 02:45:07.242 INFO 22328 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : 错误原因:null通过日志可以看到,最新插入的消息被丢弃了。至于cause为什么是null,我没找到原因,如果了解的朋友可以在评论里讨论一下
// 指定超过队列长度后的策略 map.put("x-overflow", "reject-publish-dlx");打印结果如下:
2023-07-18 02:49:13.246 INFO 10488 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : 消息发送失败:10 2023-07-18 02:49:13.246 INFO 10488 --- [nectionFactory2] o.s.amqp.rabbit.core.RabbitTemplate : 错误原因:null 2023-07-18 02:49:13.252 INFO 10488 --- [ntContainer#1-1] com.whs.edws.rabbitmq.MaxLengthConsumer : 死信队列消费者接收到消息:10通过日志可以看出,最新的一条消息被拒绝且被放入死信队列