闽公网安备 35020302035485号


对于主键相同的数据,kafka 是不会重复持久化的,它只会接收一条。
3.2 read_committed:消费端应用只能消费到提交的事务内的消息。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
Properties prodcuerProps = new Properties();
// 堆代码 duidaima.com
// kafka地址
prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
// key序列化
prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value序列化
prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 启用幂等性
producerProps.put("enable.idempotence", "true");
// 设置事务id
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put("group.id", "my-group-id");
// 设置consumer手动提交
consumerProps.put("enable.auto.commit", "false");
// 设置隔离级别,读取事务已提交的消息
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
//订阅主题
consumer.subscribe(Collections.singletonList("topic1"));
enable.auto.commit=false,设置手动提交消费者offset// 初始化事务
producer.initTransactions();
while(true) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
if(!records.isEmpty()){
// 准备一个 hashmap 来记录:"分区-消费位移" 键值对
HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
// 开启事务
producer.beginTransaction();
try {
// 获取本批消息中所有的分区
Set<TopicPartition> partitions = records.partitions();
// 遍历每个分区
for (TopicPartition partition : partitions) {
// 获取该分区的消息
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// 遍历每条消息
for (ConsumerRecord<String, String> record : partitionRecords) {
// 执行数据的业务处理逻辑
ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase());
// 将处理结果写入 kafka
producer.send(outRecord);
}
// 将处理完的本分区对应的消费位移记录到 hashmap 中
long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 事务提交的是即将到来的偏移量,这意味着我们需要加 1
offsetsMap.put(partition,new OffsetAndMetadata(offset+1));
}
// 向事务管理器提交消费位移
producer.sendOffsetsToTransaction(offsetsMap,"groupid");
// 提交事务
producer.commitTransaction();
} catch(Exeception e) {
e.printStackTrace();
// 终止事务
producer.abortTransaction();
}
}
}
initTransactions(): 初始化事务abortTransaction(): 放弃事务

7.持久化第6步中的事务成功或者失败的信息, 如果kafka broker配置max.transaction.timeout.ms之前既不提交也不中止事务, kafka broker将中止事务本身。 此属性的默认值为 15 分钟。