• Kafka 如何实现自定义分区器
  • 发布于 1个月前
  • 55 热度
    0 评论
默认分区器的实现
分区器的作用就是为消息分配分区

消息指定partition字段
当发送消息时指定了partition字段,则不需要经过分区器,因为消息会直接发到partition指定的分区
// 指定partition
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic,2, "不同的key","demo" + i);
    kafkaProducer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println("消息所在分区:"+metadata.partition());
        }
    });
}
消息没有指定partition字段
如果没有指定partition字段,则需要依赖分区器,根据Key字段来计算partition值

key 为空
如key为空,消息将通过轮询的方式发往主题内各个可用分区,这就无法保证消息顺序性了,并
// 堆代码 duidaima.com
// 不指定key和分区
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "demo" + i);
    kafkaProducer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println("消息所在分区:"+metadata.partition());
        }
    });
}

key不为空
当key不为空,那么默认分区器会对key进行hash,采用MurmurHash2算法,最终根据hash值计算分区号,如果指定了相同的key,消息将会发到相同的分区,但是这些分区有可能不可用,因为kafka是不会干涉你指定的分区,不关心它是否可用。

指定相同的key
// 指定相同的key
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "相同的key","demo" + i);
    kafkaProducer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println("消息所在分区:"+metadata.partition());
        }
    });
}
指定不同的key
// 指定不同的key
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "不同的key"+i,"demo" + i);
    kafkaProducer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            System.out.println("消息所在分区:"+metadata.partition());
        }
    });
}
实现自定义分区器
默认是根据key来计算分区号

分区器的接口定义
分区器接口是org che.kafka.clients.producer intemals DefaultPartitioner,有三个重要方法
1. public int partition(String topic , Object key , byte[] keyBytes, Object value , byte[] valueBytes, Cluster cluster) :用来计算分区数值
2. close():回收一些资源
3. void configure(Map<String, ?> configs) :获取配置信息及初始化数据
需求
我想根据value值来计算分区号
代码实现
分区器实现
主要实现partition方法,实现计算逻辑
package org.example.day8;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Objects;

/**
 * @Description TODO
 * @Author 堆代码 duidaima.com
 * @Date 2023/2/14 09:23
 */
public class CustomPartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (Objects.nonNull(value)) {
            int partitions = cluster.availablePartitionsForTopic(topic).size();
            int partition = (int) value % partitions;
            System.out.println(String.format("当前值:%s,选择分区:%s", value,partition));
            return partition;
        }
        System.out.println("使用默认分区0");
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
生产者配置修改
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class); 添加这个配置,用来配置自定义分区器,因为消息发送的value在这里指定的Integer类型,所以也需要相应的改动序列化器。
package org.example.day8;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @Description TODO
 * @Author 堆代码 duidaima.com
 * @Date 2023/2/14 09:27
 */
public class DemoCustomPartiton {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String serverList = "192.168.64.3:9092";
        String topic = "topic_test";
        Properties prop = new Properties();

        // 2.给 Kafka 配置对象添加配置信息
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);
        // 指定分区实现配置
        prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartition.class);
        // 2.2 配置 key.serializer 和 value.serializer
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());

        // 3.创建 Kafka 生产者对象
        KafkaProducer<String, Integer> kafkaProducer = new KafkaProducer<String, Integer>(prop);
        // 4.调用 send 方法,发送消息

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, Integer> record = new ProducerRecord<String, Integer>(topic,"不同的key",i);
            kafkaProducer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("消息所在分区:"+metadata.partition());
                }
            });
        }
        Thread.sleep(10000);
    }
}

用户评论