京东-优惠雷达
新人页面
精选商品
首月0月租体验,领12个月京东PLUS
自营热卖

Kafka 2.8.0 JAVA API基本使用

红尘几度欢颜笑 2月前   阅读数 42 0

以下测试皆在windows下进行,请根据自己情况酌情配置kafka zookeeper等环境

本人使用的是jdk11,代码中可能存在jdk9的新特性,使用jdk9以前的jdk的朋友请自行转换

kafka环境变量等暂时略过

1.java导入依赖

        <!--导入kafka依赖-->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.8.0</version>
        </dependency>

kafka Producer

导入相关依赖后,创建测试类ProducerDemo;

  • 创建生产者对象

    使用KafkaProducer 创建kafka生产者对象,这时可以发现kafka不允许我们使用空构造来创建对象;

    那么我们就选用传入properties的方式创建kafka生产者

    创建生产者的时候,跟控制台命令一样,我们需要指定集群名称以及序列化器,而这些相关设置都会存储在我们的配置文件中;

    kafka给我们提供了ProducerConfig类,并在其中已经给我们提前准备好了我们所需要的key,在向properties中put键值时,可以直接使用producerConfig的静态常量作为key;并传入相应value

  • 向kafka中发送信息

    使用kafkaProducer向kafka中发送信息,可以使用其提供的send()方法 ;使用时可以看到其需要传入ProducerRecord以及一个可选的Callback

    ProducerRecord: 即为每条数据所封装成的对象

    CallBack:可选;获取函数的回调

  • close()

    在真实生产环境中,我们可能不需要手动调用close方法关闭kafkaProducer,但是目前的测试阶段,如果不使用close关闭,可能会导致发送的信息在设置等待的时间内,不会被真正的发送;

    流在关闭的时候会对数据进行回收操作

/**
 * 描述:kafkaProducer生产者
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID     DATE          PERSON          REASON
 *  1      2021/8/10 23:14    Bambi        Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ProducerPartitionerDemo01 {
    public static void main(String[] args) {
        
        Properties properties = new Properties();
        //自行修改为对应的集群地址 kafka默认为9092,此处我没有更改
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");   
        //需要传入序列化器的全类名,kafka需要通过反射全类名去获取序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
​
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        for (int i = 0; i < 10; i++) {
            //使用callBack收集回调信息,使用了lamdba表达式
            kafkaProducer.send(new ProducerRecord("此处使用自己存在的主题","value"),((metadata, exception) ->{
                if(exception==null){
                    System.out.println("没有错误,数据添加成功");
                }
            } ));
        }
        //关闭
        kafkaProducer.close();
    }
}

自定义分区器

如果想要自己根据业务需求编写自定义的分区规则,可以自定义分区器;

说到自定义,就势必需要去实现某个接口或者继承某个类

这里, 我们需要实现的是kafka给我们的Partitioner接口,实现后重写方法

/**
 * 描述: 自定义分区器
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID     DATE          PERSON          REASON
 *  1      2021/8/10 22:24    Bambi        Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class MyPartitioner implements Partitioner {
​
    /**
     * 编写分区规则
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //根据业务需求编写分区规则
        return 0;
    }
​
    @Override
    public void close() {
​
    }
​
    /**
     * 读取配置信息
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {
​
    }
}

在编写规则时可以参考Kafka对Partitioner的默认实现 DefaultPartitioner

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        //如果key也不存在,则会对可用分区进行轮询
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        //如果没有指定分区,且存在key值,则会根据key的hash进行取模来选择分区
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

实现同步发送

正常情况下kafka生产者发送信息采用的是异步发送的方式,主线程将信息发送给共享变量 RecordAccumulator ,Sender线程不同的从共享变量中拉取数据发送到broker上;

实现逻辑

在两个线程其中一个执行的时候去阻塞另一个线程,实现串行

我们可以发现kafkaProducer的send()方法是存在返回值 Future 的;

而我们知道,当future对象调用get()方法时,不仅会获得当前线程回调的对象,还会阻塞当前线程

我们便使用这个方法来实现同步发送

同步发送的使用场景相对较少,我们可以使用同步发送来确保区内有序,即当上一条信息发送后,未接收到ack之前,阻塞发送线程,不继续发送,从而实现有序


消费者API

编写消费者api的逻辑与生产者十分的相像,使用kafka提供的 KafkaConsumer 来创建消费者对象

并在配置文件中传递对应信息,可以使用ConsumerConfig中的静态属性充当key值

/**
 * 描述:kafka消费者
 *
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID     DATE          PERSON          REASON
 *  1      2021/8/11 0:21    Bambi        Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ConsumerTest {
​
    public static void main(String[] args) {
        Properties properties = new Properties();
        //连接的集群
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //生产者需要指定序列化器,那么消费者就需要指定对应的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
​
        //自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //自动提交的延迟,提交的是消费者的offset
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
​
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"consumerGroup01");
​
        //创建消费者
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        //订阅主题
        //此处可以添加多个集群
        //可以看到这里没有返回值,也就是说,这里只是单纯的指定了主题,如果想获取主题中的信息,需要使用别的方法
        kafkaConsumer.subscribe(List.of("你的主题"));
​
​
        while (true){
            //获取的类型与Producer类似,不过为ConsumerRecords类,想要得到单个数据,需要遍历输出
            //新版本建议使用传入Duration的方式,直接传入毫秒数的方式以过时
            ConsumerRecords<String,String> consumerRecords = kafkaConsumer.poll(Duration.ZERO);
            consumerRecords.forEach(stringStringConsumerRecord -> {
                //可以看到,使用consumerRecord去调用方法的时候,可以获取到Key,所以key并不只是用来划分分区之用,如果没有指定key,会输出null
                System.out.println(stringStringConsumerRecord.key()+":"+stringStringConsumerRecord.value()+"    :"+stringStringConsumerRecord.offset());
            });
        }
        //consumer进行订阅拉去信息的时候不需要手动关闭,因为顺序执行完毕后,jvm会关闭;所以可以使用一个while循环来持续消费
    }
}

启动消费者,会发现我们可以连接到对应的主题,但是不会获取到先前已经存在的信息;

在我们使用控制台调用消费者时,如果我们想获取该分区已经存在的信息,我们可以使用 --from-beginning指令将offset放到最前端从头获取;

java api中也是一样;

kafka中 命令行能做的事情,在配置文件中应该都有相关的配置

我们进入ConsumerConfig,可以查看到其已经给我们提供了 AUTO_OFFSET_RESET_CONFIG这个属性;

根据下方的doc描述,该属性默认值为lastest,这也是我们为什么在不设置的时候会无法获取已存在信息的原因,我们可以手动在配置文件中传入

earierlast

此处注意这个指定的生效条件:

  • 只有当当前消费者/消费者组第一次消费(即还没有offset时),或当前的offset在这个server中不存在时,指令才会生效

    这里解释一下为什么会不存在,kafka的数据默认时7天清空一次,如果我们拿着已经清空的数据的offset去寻找数据,就会出现offset在server中不存在的现象,此时AUTO_OFFSET_RESET_CONFIG就会生效

关于offset的手动提交

我们为什么需要手动提交? 自动提交无法保证准确的提交时机

  • 如果设置的提交延时过短,会丢是数据

  • 如果设置的延时过长,会导致数据重复

1.在配置文件中关闭自动提交

既然我们需要手动提交,则必然需要在配置文件中将自动提交置为false

ENABLE_AUTO_COMMIT_CONFIG,<----将它改成false

  1. 在消费结束后进行手动提交

    使用consumer的 commitSync() 同步提交,或commitAsync() 异步提交

    • commitSyn:

      相比于异步提交,因为其提交offset时自带失败重试的机制,相对更加可靠

    • commitAsync:

      同步提交相对可靠,但是会阻塞当先线程,影响吞吐量;

      在大多数情况下,我们会选用异步提交的方式

自定义存储offset

手动提交虽然可以解决丢是数据的问题,但是仍然会存在数据重复的现象;

kafka也早已考虑到这种情况,所以允许我们自定义存储offset的规则,(比如我们可以和MySQL的写入操作进行事务绑定...)

但是相对于自定义分区器,自定义存储offset要相对麻烦一些;在0.9版本之后,kafka会将offset暂存在kafka内置的一个主题中,想要去维护一个offset,就需要考虑到消费者的Rebalance问题

即,如果当前消费者所消费的分区挂掉了,消费者需要转移到另一分区去消费,此时的offset需要定位到这个分区最近提交的offset

为此,我们需要实现kafka提供的ConsumerRebalanceListener

/**
 * 描述: 自定义存储Offset
 *
 * <pre>
 * HISTORY
 * ****************************************************************************
 *  ID     DATE          PERSON          REASON
 *  1      2021/8/11 23:26    Bambi        Create
 * ****************************************************************************
 * </pre>
 *
 * @author Bambi
 * @since 1.0
 */
public class ConsumerConfigOffset {
    //创建一个Map在暂存当前offset
    private static Map<TopicPartition,Long> currentOffset = new ConcurrentHashMap<>();
    
    public static void main(String[] args) {
        //配置文件较为冗长,我写了个工具类进行配置,相关配置内容已经提到过,就不再赘述
        PropertiesUtils propertiesUtils = new PropertiesUtils();
        Properties properties = propertiesUtils.ConsumerProperties("localhost:9092", "bambiOffset", "false", "100", 1);
​
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
​
        //在此处创建ConsumerRebalanceListener类
        kafkaConsumer.subscribe(List.of("solo1"), new ConsumerRebalanceListener() {
            //在Rebalance之前调用
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }
​
            //在Rebalance之后调用
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                partitions.forEach(partition -> {
                    //定位到分区中最近的offset,继续消费
                    kafkaConsumer.seek(partition,getOffset(partition));
                });
            }
        });
​
        while (true){
            ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofMillis(1000));
            poll.forEach(consumerRecord ->{
                System.out.printf("offset =  %d %n",consumerRecord.offset());
                System.out.printf("key = %s %n",consumerRecord.key());
                System.out.printf("value = %s %n",consumerRecord.value());
​
                //将下标缓存到offset中
                currentOffset.put(new TopicPartition(consumerRecord.topic(),consumerRecord.partition()),consumerRecord.offset());
            });
            commitOffset(currentOffset);
        }
    }
​
    /**
     * 提交当前offset
     * @param currentOffset
     */
    private static void commitOffset(Map<TopicPartition , Long> currentOffset){
        //处理异步提交的业务逻辑
    }
​
    //获取当前分区的offset
    private static long getOffset(TopicPartition partition){
        return 0;
    }
}


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: