默认分区器的实现
分区器的作用就是为消息分配分区
消息指定partition字段
当发送消息时指定了partition字段,则不需要经过分区器,因为消息会直接发到partition指定的分区
// 指定partition
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic,2, "不同的key","demo" + i);
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("消息所在分区:"+metadata.partition());
}
});
}
消息没有指定partition字段
如果没有指定partition字段,则需要依赖分区器,根据Key字段来计算partition值
key 为空
如key为空,消息将通过轮询的方式发往主题内各个可用分区,这就无法保证消息顺序性了,并
// 堆代码 duidaima.com
// 不指定key和分区
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "demo" + i);
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("消息所在分区:"+metadata.partition());
}
});
}
key不为空
当key不为空,那么默认分区器会对key进行hash,采用MurmurHash2算法,最终根据hash值计算分区号,如果指定了相同的key,消息将会发到相同的分区,但是这些分区有可能不可用,因为kafka是不会干涉你指定的分区,不关心它是否可用。
指定相同的key
// 指定相同的key
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "相同的key","demo" + i);
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("消息所在分区:"+metadata.partition());
}
});
}
指定不同的key
// 指定不同的key
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "不同的key"+i,"demo" + i);
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("消息所在分区:"+metadata.partition());
}
});
}
实现自定义分区器
默认是根据key来计算分区号
分区器的接口定义
分区器接口是org che.kafka.clients.producer intemals DefaultPartitioner,有三个重要方法
1. public int partition(String topic , Object key , byte[] keyBytes, Object value , byte[] valueBytes, Cluster cluster) :用来计算分区数值
2. close():回收一些资源
3. void configure(Map<String, ?> configs) :获取配置信息及初始化数据
需求
我想根据value值来计算分区号
代码实现
分区器实现
主要实现partition方法,实现计算逻辑
package org.example.day8;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
import java.util.Objects;
/**
* @Description TODO
* @Author 堆代码 duidaima.com
* @Date 2023/2/14 09:23
*/
public class CustomPartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (Objects.nonNull(value)) {
int partitions = cluster.availablePartitionsForTopic(topic).size();
int partition = (int) value % partitions;
System.out.println(String.format("当前值:%s,选择分区:%s", value,partition));
return partition;
}
System.out.println("使用默认分区0");
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
生产者配置修改
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class); 添加这个配置,用来配置自定义分区器,因为消息发送的value在这里指定的Integer类型,所以也需要相应的改动序列化器。
package org.example.day8;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* @Description TODO
* @Author 堆代码 duidaima.com
* @Date 2023/2/14 09:27
*/
public class DemoCustomPartiton {
public static void main(String[] args) throws ExecutionException, InterruptedException {
String serverList = "192.168.64.3:9092";
String topic = "topic_test";
Properties prop = new Properties();
// 2.给 Kafka 配置对象添加配置信息
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
// 指定分区实现配置
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class);
// 2.2 配置 key.serializer 和 value.serializer
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
// 3.创建 Kafka 生产者对象
KafkaProducer<String, Integer> kafkaProducer = new KafkaProducer<String, Integer>(prop);
// 4.调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
ProducerRecord<String, Integer> record = new ProducerRecord<String, Integer>(topic,"不同的key",i);
kafkaProducer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("消息所在分区:"+metadata.partition());
}
});
}
Thread.sleep(10000);
}
}