京东-优惠雷达
新人页面
精选商品
首月0月租体验,领12个月京东PLUS
自营热卖

初步认识Kafka

浅笑、念伊人 1年前   阅读数 146 0

1.什么是Kafka

Apache Kafka是一个 开源的分布式消息队列 (生产者消费者模式) Apache Kafka 目标:构建企业中统一的. 高通量的,低延时的消息平台 ; 大多的消息队列是基于JMS标准实现的Apache Kafka l类似于JMS的实现.

2.Kafka的特点

作为缓冲(流量消减),来异构,解耦系统

3.基本架构

Kafka Cluster:由多个服务器组成。每个服务器单独的名字broker(掮客)。

kafka broker:kafka集群中包含的服务器

Kafka Producer:消息生产者、发布消息到 kafka 集群的终端或服务。

Kafka consumer:消息消费者、负责消费数据。

Kafka Topic: 主题,一类消息的名称。存储数据时将一类数据存放在某个topic下,消费数据也是消费一类数据。

​ 订单系统:创建一个topic,叫做order。

​ 用户系统:创建一个topic,叫做user。

​ 商品系统:创建一个topic,叫做product。

 

注意:Kafka的元数据都是存放在zookeeper中。

4.Kafka的基本使用

4.1使用脚本操作Kafka

1) 创建一个topic

./kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic order

2) 使用Kafka 自带一个命令客户端启动一个生产者,生产数据
./kafka-console-producer.sh --broker-list node01:9092 --topic order

3) 使用Kafka自带一个命令客户端启动一个消费者,消费数据

./kafka-console-consumer.sh --bootstrap-server node01:9092  --topic order
该消费语句,只能获取最新的数据,要想历史数据,需要添加选项-

4)查看有哪些topic

./kafka-topics.sh --list --zookeeper node01:2181

5) 查看某一个具体的Topic的详细信息

./kafka-topics.sh --describe --topic order --zookeeper node01:2181

6) 删除topic

./kafka-topics.sh --delete --topic order --zookeeper node01:2181

4.2 使用java API 操作kafka

第一步: 添加kafka相关的依赖

<dependency>     
    <groupId>org.apache.kafka</groupId>     
    <artifactId>kafka-clients</artifactId>     
    <version>0.11.0.1</version>   
</dependency>

4.2.1 编写生产者 :

// kakfa的生产者
public class KafkaProducerTest {


    public static void main(String[] args) {

        //1. 创建 kafka的生产者的对象

        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
        props.put("acks", "all");  // 消息确认机制 : all 最高级别, 保证数据不会丢失
        props.put("retries", 0); // 重试 : 0 表示发送失败, 不会重试
        props.put("batch.size", 16384);  // 发送数据时候 一批数据的大小    默认值: 16384字节(16)
        props.put("linger.ms", 1);    // 每次发送数据间隔时间
        props.put("buffer.memory", 33554432); //  缓存池的大小: 默认值:  33554432   32M
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 效率, java的序列化慢
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
        //2. 发送消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("order", "我是JavaAPI发过来...");
        producer.send(record);
        //3. 关闭资源

        producer.close();
    }
}

4.2.2 编写消费者

package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

// kafka的消费者
public class KafkaConsumerTest {


    public static void main(String[] args) {


        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
        props.put("group.id", "test"); // 组id号
        props.put("enable.auto.commit", "true");  // 开启自动提交
        props.put("auto.commit.interval.ms", "1000"); //  每隔多长时间自动提交一次
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //1. 创建 kafka的消费者的对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //2.  让消费者订阅一个topic
        consumer.subscribe(Arrays.asList("order"));

        //3. 获取消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100); // 取出元素,
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            }
        }

    }
}

5 Apache Kafka原理

5.1 分片与副本机制 :

​ 分片机制:主要解决了单台服务器存储容量有限的问题

​ 当数据量非常大的时候,一个服务器存放不了,就将数据分成两个或者多个部分,存放在多台服务器上。每个服务器上的数据,叫做一个分片

副本:副本备份机制解决了数据存储的高可用问题

​ 当数据只保存一份的时候,有丢失的风险。为了更好的容错和容灾,将数据拷贝几份,保存到不同的机器上。在放置副本的时候:

​ 假设有三个分片, 三个副本, 共计9个节点

​ 在三台服务器上 各放置一个分片的副本

​ 第二个副本放置在和这台服务器同机架上

​ 第三个副本放置在不同的机架的服务器上

5.2 Kafka 保证数据不丢失机制

5.2.1保证生产端数据不丢失机制

1)消息生产分为同步方式和异步方式

2)消息确认分为三个状态

a)0:生产者只负责发送数据

b)1:某个分片的leader收到数据给出响应

C)-1:某个分片的副本都收到数据后给出响应

3在同步方式下

生产者等待10秒,如果broke没有给出ack响应,就认为失败

生产者主动将进行重试3次如果还没有响应就报错

在异步模式下

a)先将数据保存在生产端的buffer中 Buffer大小是2万条

b)满足数据阀值或者数量(时间)阀值其中的一个条件就可以发送数据

c 发送一批数据的大小是500条

5,2,2 broker端消息不丢失

broker端的信息不丢失其实就是用partition和副本机制(高可用)来保证

producer ack -1(all)能够保证所有的副本都同步好了数据,其中一台机器了并不影响数据的完整性

5.2.3 消费端消息不丢失

offSet:偏移量

通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。

5.3 消息存储及查询机制

5.3.1 文件存储机制

数据都是顺序存储在磁盘中

segment段中有两个核心的文件一个是log,一个是index。 当log文件等于1G时,新的消息会写入到下一个segment中。

5.3.2 文件查询机制

需求: 读取 offset=368776 的message消息数据

5.4 生产者数据分发策略

1) 如果是用户制定了parttion ,数据分发策略的时候可以指定数据发往哪个parttion

当producerRecord 的构造参数中有partition的时候就可以发送到对应的parttion上

2) 当用户指定key, 默认使用hash算法  可以定义分发策略的算法

3) 当用户既没有指定partition 也没有Key 使用轮询的方式发送数据

5.5 消费者负载均衡机制

一个parttion可以被一个组中的某一个成员消费

所以如果消费组中有多于partition数量的消费者,那么一定会有消费者无法消费数据。 Kafka 消费者在消费数据的时候 如果数据量过大 如何解决消费者慢的问题

1)在尽可能硬件满足的情况下 多加分片 并将同组中消费者数量和分片数量保持一致

2) 提高消费者处理数据的能力优化代码

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: