闽公网安备 35020302035485号
4.消息积压:如果消费者处理速度跟不上生产者的发送速率,就会导致消息积压。优化消费者性能、增加消费者实例数量、合理调整队列的预取参数等措施有助于缓解这一情况。
WaitForAll RequiredAcks = -1:当所有的Leader和Follower都接收成功时,才会返回。
设置为-1,可以很好的确认Kafka集群是否已经完成消息的接收和本地化存储,并且可以在Producer发送失败时进行重试。
@Configuration
@EnableKafka
public class ManualConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.topic.manual}")
private String topic;
/**
* 堆代码 duidaima.com
* RECORD:每处理一条commit一次
* BATCH(默认):每次poll的时候批量提交一次,频率取决于每次poll的调用频率
* TIME:每次间隔ackTime的时间去commit
* COUNT:累积达到ackCount次的ack去commit
* COUNT_TIME:ackTime或ackCount哪个条件先满足,就commit
* MANUAL:listener负责ack,但是背后也是批量上去
* MANUAL_IMMEDIATE:listner负责ack,每调用一次,就立即commit
**/
@Bean
public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-group");
// 手动提交
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
@Component
public class ManualConsumer {
@KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
public void receive(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
Consumer consumer,
Acknowledgment ack) {
System.out.println(String.format("From partition %d : %s", partition, message));
// 同步提交
consumer.commitSync();
}
}
重复消费这个值默认是0,意思就是消息必须立即被发送,但是这是不对的。一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch批次,如果100毫秒内,这个batch批次满了16kb,自然就会发送出去。但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。
3.消费端弄丢了数,。刚消费到,还没处理,结果进程挂了,比如重启了。
@Configuration
public class RabbitMQConfig {
/**
* 配置事务管理器
*/
@Bean
public RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
}
@Service
public class RabbitMQServiceImpl {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional // 事务注解
public void sendMessage() {
// 开启事务
rabbitTemplate.setChannelTransacted(true);
// 发送消息
rabbitTemplate.convertAndSend(RabbitMQConfig.Direct_Exchange, routingKey, message);
}
}
发送方确认机制,保证消息能从交换机路由到指定队列spring:
rabbitmq:
publisher-confirm-type: correlated # 开启发送方确认机制
publisher-returns: true # 开启消息返回
template:
mandatory: true # 消息投递失败返回客户端
@Component
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(this);
//设置回退消息交给谁处理
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
}else{
log.info("交换机还未收到 id 为:{}消息,原因:{}",id,cause);
}
}
//当消息无法路由的时候的回调方法
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
Message message = returnedMessage.getMessage();
String exchange = returnedMessage.getExchange();
String replyText = returnedMessage.getReplyText();
final String routingKey = returnedMessage.getRoutingKey();
log.error(" 消息 {}, 被交换机 {} 退回,退回原因 :{}, 路由 key:{}",new String(message.getBody()),exchange,replyText,routingKey);
}
}
保证消息在 RabbitMQ Server 中的持久化/**
* 消息队列
*/
@Bean
public Queue queue() {
// 四个参数:name(队列名)、durable(持久化)、 exclusive(独占)、autoDelete(自动删除)
return new Queue(MESSAGE_QUEUE, true);
}
/**
* 直接交换机
*/
@Bean
public DirectExchange exchange() {
// 四个参数:name(交换机名)、durable(持久化)、autoDelete(自动删除)、arguments(额外参数)
return new DirectExchange(Direct_Exchange, true, false);
}
消费者spring:
rabbitmq:
publisher-confirm-type: correlated # 开启发送方确认机制
publisher-returns: true # 开启消息返回
template:
mandatory: true # 消息投递失败返回客户端
listener:
simple:
acknowledge-mode: manual # 开启手动确认消费机制
# retry:
# enabled: true # 是否开启消费者重试(为false时关闭消费者重试,意思不是不重试,而是一直收到消息直到ack确认或者一直到超时)
# max-attempts: 5 # 最大重试次数,代码中不能使用try/catch捕获异常,否则重试机制失效
# max-interval: 5000 # 重试间隔时间(单位毫秒)
@Slf4j
@Component
public class ReceiveHandler {
@Autowired
private RabbitTemplate rabbitTemplate;
//监听user队列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_USER})
public void receive_user(String msg, Message message, Channel channel) throws Exception{
String msg_id = message.getMessageProperties().getMessageId();
// TODO 判断消息是否被消费
try {
int i = 1/0;
/**
* channel.basicAck(deliveryTag,multiple)
* deliveryTag:消息的index
* multiple:是否批量-true将一次性确认ack小于deliveryTag的消息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
// TODO 被消费存到一个地方
}catch (Exception e){
/**
* channel.basicNack(deliveryTag,multiple,requeue)
* deliveryTag:消息的index
* multiple:是否批量-true将一次性拒绝所有ack小于deliveryTag的消息
* requeue:是否重新入队列
*/
if(CommonConstants.mess_id_map.getOrDefault(msg_id,0) <= 3){
Integer count = CommonConstants.mess_id_map.getOrDefault(msg_id,1);
CommonConstants.mess_id_map.put(msg_id,++count);
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
log.info("当前时间:{},次数:{}", new Date(),count);
}else{
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAYED_EXCHANGE_USER, "delayed.user", msg,
correlationData ->{
correlationData.getMessageProperties().setDelay(1000 * 10);
return correlationData;
});
}
// 触发重试机制
// throw e;
}
}
@RabbitListener(queues = {RabbitmqConfig.DELAYED_QUEUE_USER})
public void receive_delayed_user(String msg, Message message, Channel channel) throws Exception{
log.info("延迟队列:当前时间:{},消息:{}", new Date(),msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
private Queue creatQueue(String name) {
// 创建一个 单活模式 队列
HashMap<String, Object> args = new HashMap<>();
args.put("x-single-active-consumer", true);
return new Queue(name, true, false, false, args);
}
利用消息的路由键(Routing Key):将相关的消息通过相同的路由键发送到同一个队列,从而确保消息在队列中保持顺序。基于spring-cloud-starter-stream-rabbit实现分区消费。
使用延迟队列:可以使用延迟队列来实现消息的延时处理,将消息发送到延迟队列中,然后在指定的时间后再进行消费,从而避免消息的积压。
// 同步设置重试次数 producer.setRetryTimesWhenSendFailed(3) // 异步设置重试次数 producer.setRetryTimesWhenSendAsyncFailed(3); // 如果发送失败,是否尝试发送到其他 Broker 节点 producer.setRetryAnotherBrokerWhenNotStoreOK(true);消费者