4.消息积压:如果消费者处理速度跟不上生产者的发送速率,就会导致消息积压。优化消费者性能、增加消费者实例数量、合理调整队列的预取参数等措施有助于缓解这一情况。
WaitForAll RequiredAcks = -1:当所有的Leader和Follower都接收成功时,才会返回。
设置为-1,可以很好的确认Kafka集群是否已经完成消息的接收和本地化存储,并且可以在Producer发送失败时进行重试。
@Configuration @EnableKafka public class ManualConsumerConfig { @Value("${kafka.bootstrap-servers}") private String bootstrapServers; @Value("${kafka.topic.manual}") private String topic; /** * 堆代码 duidaima.com * RECORD:每处理一条commit一次 * BATCH(默认):每次poll的时候批量提交一次,频率取决于每次poll的调用频率 * TIME:每次间隔ackTime的时间去commit * COUNT:累积达到ackCount次的ack去commit * COUNT_TIME:ackTime或ackCount哪个条件先满足,就commit * MANUAL:listener负责ack,但是背后也是批量上去 * MANUAL_IMMEDIATE:listner负责ack,每调用一次,就立即commit **/ @Bean public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-group"); // 手动提交 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps)); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } } @Component public class ManualConsumer { @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory") public void receive(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer, Acknowledgment ack) { System.out.println(String.format("From partition %d : %s", partition, message)); // 同步提交 consumer.commitSync(); } }重复消费
这个值默认是0,意思就是消息必须立即被发送,但是这是不对的。一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch批次,如果100毫秒内,这个batch批次满了16kb,自然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。
3.消费端弄丢了数,。刚消费到,还没处理,结果进程挂了,比如重启了。
@Configuration public class RabbitMQConfig { /** * 配置事务管理器 */ @Bean public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } } @Service public class RabbitMQServiceImpl { @Autowired private RabbitTemplate rabbitTemplate; @Transactional // 事务注解 public void sendMessage() { // 开启事务 rabbitTemplate.setChannelTransacted(true); // 发送消息 rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message); } }发送方确认机制,保证消息能从交换机路由到指定队列
spring: rabbitmq: publisher-confirm-type: correlated # 开启发送方确认机制 publisher-returns: true # 开启消息返回 template: mandatory: true # 消息投递失败返回客户端 @Component @Slf4j public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void initRabbitTemplate(){ rabbitTemplate.setConfirmCallback(this); //设置回退消息交给谁处理 rabbitTemplate.setReturnsCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id=correlationData!=null?correlationData.getId():""; if(ack){ log.info("交换机已经收到 id 为:{}的消息",id); }else{ log.info("交换机还未收到 id 为:{}消息,原因:{}",id,cause); } } //当消息无法路由的时候的回调方法 @Override public void returnedMessage(ReturnedMessage returnedMessage) { Message message = returnedMessage.getMessage(); String exchange = returnedMessage.getExchange(); String replyText = returnedMessage.getReplyText(); final String routingKey = returnedMessage.getRoutingKey(); log.error(" 消息 {}, 被交换机 {} 退回,退回原因 :{}, 路由 key:{}",new String(message.getBody()),exchange,replyText,routingKey); } }保证消息在 RabbitMQ Server 中的持久化
/** * 消息队列 */ @Bean public Queue queue() { // 四个参数:name(队列名)、durable(持久化)、 exclusive(独占)、autoDelete(自动删除) return new Queue(MESSAGE_QUEUE, true); } /** * 直接交换机 */ @Bean public DirectExchange exchange() { // 四个参数:name(交换机名)、durable(持久化)、autoDelete(自动删除)、arguments(额外参数) return new DirectExchange(Direct_Exchange, true, false); }消费者
spring: rabbitmq: publisher-confirm-type: correlated # 开启发送方确认机制 publisher-returns: true # 开启消息返回 template: mandatory: true # 消息投递失败返回客户端 listener: simple: acknowledge-mode: manual # 开启手动确认消费机制 # retry: # enabled: true # 是否开启消费者重试(为false时关闭消费者重试,意思不是不重试,而是一直收到消息直到ack确认或者一直到超时) # max-attempts: 5 # 最大重试次数,代码中不能使用try/catch捕获异常,否则重试机制失效 # max-interval: 5000 # 重试间隔时间(单位毫秒) @Slf4j @Component public class ReceiveHandler { @Autowired private RabbitTemplate rabbitTemplate; //监听user队列 @RabbitListener(queues = {RabbitmqConfig.QUEUE_USER}) public void receive_user(String msg, Message message, Channel channel) throws Exception{ String msg_id = message.getMessageProperties().getMessageId(); // TODO 判断消息是否被消费 try { int i = 1/0; /** * channel.basicAck(deliveryTag,multiple) * deliveryTag:消息的index * multiple:是否批量-true将一次性确认ack小于deliveryTag的消息 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); // TODO 被消费存到一个地方 }catch (Exception e){ /** * channel.basicNack(deliveryTag,multiple,requeue) * deliveryTag:消息的index * multiple:是否批量-true将一次性拒绝所有ack小于deliveryTag的消息 * requeue:是否重新入队列 */ if(CommonConstants.mess_id_map.getOrDefault(msg_id,0) <= 3){ Integer count = CommonConstants.mess_id_map.getOrDefault(msg_id,1); CommonConstants.mess_id_map.put(msg_id,++count); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); log.info("当前时间:{},次数:{}", new Date(),count); }else{ channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); rabbitTemplate.convertAndSend(RabbitmqConfig.DELAYED_EXCHANGE_USER, "delayed.user", msg, correlationData ->{ correlationData.getMessageProperties().setDelay(1000 * 10); return correlationData; }); } // 触发重试机制 // throw e; } } @RabbitListener(queues = {RabbitmqConfig.DELAYED_QUEUE_USER}) public void receive_delayed_user(String msg, Message message, Channel channel) throws Exception{ log.info("延迟队列:当前时间:{},消息:{}", new Date(),msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
private Queue creatQueue(String name) { // 创建一个 单活模式 队列 HashMap<String, Object> args = new HashMap<>(); args.put("x-single-active-consumer", true); return new Queue(name, true, false, false, args); }利用消息的路由键(Routing Key):将相关的消息通过相同的路由键发送到同一个队列,从而确保消息在队列中保持顺序。
基于spring-cloud-starter-stream-rabbit实现分区消费。
使用延迟队列:可以使用延迟队列来实现消息的延时处理,将消息发送到延迟队列中,然后在指定的时间后再进行消费,从而避免消息的积压。
// 同步设置重试次数 producer.setRetryTimesWhenSendFailed(3) // 异步设置重试次数 producer.setRetryTimesWhenSendAsyncFailed(3); // 如果发送失败,是否尝试发送到其他 Broker 节点 producer.setRetryAnotherBrokerWhenNotStoreOK(true);消费者