消息队列Kafka教程
1. 简介
1.1 介绍
Kafka
Kafka是用Scala语言编写的。
Kafka最初由LinkedIn公司开发,是一个分布式流处理平台,主要使用Scala语言进行开发。由于其强大的功能和广泛的应用场景,Kafka被广泛应用于实时处理大量数据以满足各种需求场景。Scala语言的运用使得Kafka能够高效地处理流数据,同时也使其在Java虚拟机(JVM)上运行,依赖于Java环境。因此,在安装和使用Kafka之前,需要确保系统中有适当的Java开发工具包(JDK)环境,并且由于Kafka对Zookeeper的依赖,通常还需要先安装Zookeeper以确保Kafka的正常运行。
核心:一种高吞吐量的分布式流处理平台,它可以处理消费者在网站中的所有动作流数据。比如网页浏览,搜索和其他用户的行为等,应用于大数据实时处理领域。
优点
高吞吐量:可满足每秒百万级别的消息的生产和消费
持久性:具备一套完整的消息的存储机制,可以确保消息数据的高效的安全的持久化
分布式:既有扩展以及容错性。
zookeeper
ZooKeeper是一个集中服务,用于维护配置信息、命名、提供分布式同步和组服务。所有这些类型的服务都以某种形式被分布式应用程序使用。每次实施它们时,都要进行大量的工作来修复不可避免的bug和竞争条件。由于实现这类服务的困难,应用程序最初通常会对它们进行精简,这使得它们在发生变化时变得脆弱,并且难以管理。即使操作正确,在部署应用程序时,这些服务的不同实现也会导致管理复杂性。
ZooKeeper的目标是将这些不同服务的精髓提炼为一个非常简单的接口,从而实现集中式协调服务。服务本身是分布式的,并且高度可靠。共识、组管理和存在协议将由服务实现,因此应用程序不需要自己实现它们。特定于应用程序的这些使用将由Zoo Keeper的特定组件和特定于应用程序的约定的混合组成。
zookeeper的特性
- 顺序一致性:数据按照顺序分批入库
- 原子性:集体更新成功或失败,没有部分结果
- 单一视图:客户端连接集群中的任一zk服务节点,它看到的数据都是相同的。
- 可靠性:每次对zk的操作状态都会保存在服务端
- 实时性:客户端可以读取到zk服务端的最新数据
zookeeper作用
集群管理
ZooKeeper一般以集群的方式对外提供服务,一般3 ~ 5台机器就可以组成一个可用的Zookeeper集群了,每台机器都会在内存中维护当前的服务器状态,并且每台机器之间都相互保持着通信。只要集群中超过半数的机器能够正常工作,那么整个集群就能够正常对外服务
配置管理
统一配置文件管理,即只需要部署一台服务器,则可以把相同的配置文件同步更新到其他所有服务器中(发布订阅),此操作在云计算中用的特别多(假设修改了redis统一配置)
分布式锁
分布式锁是控制分布式系统或不同系统之间共同访问共享资源的一种锁实现,如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往需要互斥来防止彼此干扰来保证一致性。
消息队列的使用场景
目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ,RocketMQ 等。
在大数据场景主要采用Kafka作为消息队列。在JavaEE 开发中主要采用ActiveMMO、RabbitMQ、RocketMQ。
缓冲/消峰:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
消息队列可以实现系统之间的解耦,使得系统之间可以独立地扩展或修改,只要它们遵守同样的接口约束。例如,在订单系统中,当订单创建成功后,可以通过消息队列通知库存系统、物流系统等进行相应的处理,而不需要订单系统直接调用这些系统的接口。
异步处理
消息队列允许将不是必须的业务逻辑异步处理,从而提高系统的吞吐量和响应速度。例如,在用户注册成功后,需要发送注册邮件和短信通知,这些操作可以异步写入消息队列,然后立即返回用户注册成功的响应,而不需要等待邮件和短信发送完成。
1.2 安装
step 1: 安装jdk
cd /usr/local
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
tar -zxvf jdk-17_linux-x64_bin.tar.gz
然后在/etc/profile
中配置jdk路径
export JAVA_HOME=/usr/local/jdk-17.0.12
export PATH=$JAVA_HOME/bin
然后重启一下配置
source /etc/profile
验证jdk是否安装成功
java --version
step 2: 安装zookeeper
kafaka 3.0以后不需要安装zookeeper,也可以正常启动
建议直接下载编译好的
运行一下命令,下载zookeeper。这里我们选择镜像网站https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz
mv apache-zookeeper-3.8.4-bin/ zookeeper
然后在/etc/profile
配置环境
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
启动前,还需要配置一下zookeeoer文件
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
配置文件解释。
vim /usr/local/zookeeper/conf/zoo.cfg
记得修改日志和数据位置
# 每一次滴答的毫秒数,滴答:zookeeper基本时间单元
tickTime=2000
# 初始同步阶段可以执行的滴答数,用于集群,配置主节点等待从节点启动并完成数据同步的时间,以 tickTime 的倍数来表示,默认是10 那么就是 10*tickTime
initLimit=10
# 在发送请求和获得确认之间可以经过的滴答数,用于集群,配置 主节点 与 从节点 之间心跳检测的最大时间长度,默认值是 5,也就是 5*tickTime
syncLimit=5
# zookeeper服务存储快照目录,必须配置,不能配置在/tmp目录下,该目录下的文件会被自动删除
dataDir=/usr/local/zookeeper/dataDir
# 日志目录,如果不配置会共用dataDir目录
dataLogDir=/usr/local/zookeeper/dataLogDir
# 客户端连接的端口,服务器对外暴露的端口,默认 2181
clientPort=2181
启动zookeeper
zkServer.sh start
此命令支持以下选项:
- -conf:指定 ZooKeeper配置文件路径:
- -data :指定 ZooKeeper 数据目录路径。
- -log :指定 ZooKeeper 日志文件路径
- -nodes:指定ZooKeeper 集群中其他服务器的地址,
- -port :指定 ZooKeeper 服务监听的端口号,
- -help:显示帮助信息。
输入jps查看进程
jps
如果出现Starting zookeeper ... FAILED TO START
通过日志的报错进一步判断错误的原因
zkServer.sh start-foreground
如果是 QuorumPeeMain。那就说明是安装错了版本。应该下载编译好的。而不是源码
step 3: 安装kafka(3.7.1)
kafka-xxx-yyy:xxx是scala版本,yyy是kafka版本(scala是基于jdk开发,需要安装jdk环境)
Scala 2.12 - kafka_2.12-3.7.1.tgz (asc, sha512)
Scala 2.13 - kafka_2.13-3.7.1.tgz (asc, sha512)
我们应该知道,一个完整的Kafka实例,至少包含了3部分:
生产者-Producer
Broker
生产者-Consumer 其中生产者和消费者是使用Java语言,Broker则是使用的Scala语言,这样是不是就明白了。2.12和2.13其实就是说的Scala的版本,3.7.1就是Kafka真正的正式版本号。
wget https://downloads.apache.org/kafka/3.7.1/kafka_2.12-3.7.1.tgz
tar -zxvf kafka_2.12-3.7.1.tgz
修改解压后的文件名字
mv kafka_2.12-3.7.1/ kafka
然后在/etc/profile
配置环境
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
最终/etc/profie目录下的配置文件
然后重新启动一下配置文件
source /etc/profile
step 4: 修改配置信息
vim /usr/local/kafka/config/server.properties
解压缩完成后进入到 kafka/config目录
# brokerid,配置集群时每个服务器上的brokerid要配置成不一样的
broker.id=0
# 监听地址,一般配置成局域网地址
listeners=PLAINTEXT://:9092 # 示例:listeners=PLAINTEXT://192.168.189.82:9092
# 监听地址,一般配置成外网地址,如果没有配置将使用listeners地址
advertised.listeners=PLAINTEXT://your.host.name:9092
# 持久化存储路径,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能
log.dirs=/usr/local/kafka/data/logs
# 指定 partition 的数量
num.partitions=1
# 配置 zookeeper 连接的地址,集群连接使用英式逗号分隔
zookeeper.connect=localhost:2181
# 启动启用日志清理,true启用,false关闭
log.cleaner.enable=true
# 清理日志线程数
log.cleaner.threads=2
# 清理日志使用的策略,delete:删除,compact:压缩,根据key进行整理,有相同key不同
# value值,只保留最后一个
log.cleanup.policy=delete
# 定时任务检测删除日志间隔时间,默认5分钟
log.retention.check.interval.ms=300000
# 删除超过指定时间的消息,默认是168小时,7天
# 还有log.retention.ms, log.retention.minutes, log.retention.hours,优先级从高到低
log.retention.hours=168
# 超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没限制,log.retention.bytes
# 和log.retention.hours任意一个达到要求,都会执行删除
log.retention.bytes=1073741824
启动kafka,后面一定指定配置文件
kafka-server-start.sh /usr/local/kafka/config/server.properties
关闭kafka
kafka-server-stop.sh
后台启动
nohup kafka-server-start.sh /usr/local/kafka/config/server.properties > kafka-out.file 2>&1 &
step 4: 安装可视化工具
https://github.com/obsidiandynamics/kafdrop
启动
java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar kafdrop-4.0.2.jar --kafka.brokerConnect=127.0.0.1:9092
restart.sh
ps -ef | grep kafdrop-4.0.2.jar | grep -v grep | awk '{print $2}' | xargs kill -9
访问
http://localhost:9000/
1.3 连接工具
kafka客户端工具推荐kafdrop
,kafkamanager
,kafka Eagle
,前两个工具都比tool好用,可以试一试
安装流程上一步,已经介绍了
2. 入门
2.1 核心概念
- 为了方便拓展,并提高吞吐量,一个topic被分成多个partition
- 配合分区的设计,提出消费者组的概念,组内每个消费者并行消费
- 为提高可用性,为每个partition增加若干副本,类似NameNode HA
- ZK中记录谁是leader,Kafka2.8.0以后也可以配置不采用ZK
Broker:Kafka的服务端程序,可以认为一个mq节点就是一个broker,broker存储topic的数据
Producer生产者:
- 生产者是向 Kafka broker 发送消息的客户端。它负责将消息发布到指定的主题(Topic),并可以选择将消息发送到特定的分区(Partition)。
- 生产者通常是数据源,如应用程序、传感器、日志系统等。
- 生产者可以以异步或同步的方式发送消息,并且可以配置消息发送的确认机制以确保消息的可靠性。
Consumer消费者:消费队列中的消息
ConsumerGroup消费者组:同一个topic, 可以广播发送给不同的group,一个group中只有一个consumer可以消费此消息
Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思
Partition分区:kafka数据存储的基本单元,topic中的数据分割为一个或多个partition,每个topic至少有一个partition,是有序的一个Topic的多个partitions, 被分布在kafka集群中的多个server上。
partition是一个有序的队列,以文件夹的形式存储在Broker本机上。消费者数量 <=小于或者等于Partition数量。Kafka 采取了分片和索引机制,将每个partition分为多个segment,每个segment对应2个文件 log 和 index,log默认大小配置log.segment.bytes
Replication副本(备份) 同个Partition会有多个副本replication ,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务。 默认每个topic的副本都是1(默认是没有副本,节省资源),也可以在创建topic的时候指定如果当前kafka集群只有3个broker节点,则replication-factor最大就是3了,如果创建副本为4,则会报错。
ReplicationLeader、ReplicationFollower
Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互。 ReplicationFollower只是做一个备份,从replicationLeader进行同步
ReplicationManager
负责Broker所有分区副本信息,Replication副本状态切换
offset偏移量
每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。 partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息(记录自己消费到哪里了的标记)。可以认为offset是partition中Message的id。kafka把offset保存在消费端的消费者组里 LEO(LogEndOffset)表示每个partition的log最后一条Message的位置。
HW(HighWatermark)
表示partition各个replicas数据间同步且一致的offset位置,即表示allreplicas已经commit的位置 HW之前的数据才是Commit后的,对消费者才可见 ISR集合里面最小leo HW的作用:保证消费数据的一致性和副本数据的一致性
Follower故障 Follower发生故障后会被临时踢出ISR(动态变化),待该follower恢复后,follower会读取本地的磁盘记录的上次的HW,并将该log文件高于HW的部分截取掉,从HW开始向leader进行同步,等该follower的LEO大于等于该Partition的hw,即follower追上leader后,就可以重新加入ISR。 Leader故障 Leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于hw的部分截掉(新leader自己不会截掉),然后从新的leader同步数据。
Segment 每个topic可以有多个partition,而每个partition又由多个segment file组成。 segment file 由2部分组成,分别为index file和data file(log file),两个文件是一一对应的,后缀”.index”和”.log”分别表示索引文件和数据文件。 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1。
ISR (in-sync replica set ) leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个leader partition都有一个ISR,leader动态维护, 要保证kafka不丢失message,就要保证ISR这组集合存活(至少有一个存活),并且消息commit成功 Partition leader 保持同步的 Partition Follower 集合, 当 ISR 中的Partition Follower 完成数据的同步之后,就会给 leader 发送 ack 如果Partition follower长时间(replica.lag.time.max.ms) 未向leader同步数据,则该Partition Follower将被踢出ISR Partition Leader 发生故障之后,就会从 ISR 中选举新的 Partition Leader。 OSR (out-of-sync-replica set) 与leader副本分区 同步滞后过多的副本集合
2.2 Topic命令
参数 | 描述 |
---|---|
--bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
--topic <String: topic> | 操作的 topic 名称 |
--create | 创建主题 |
--delete | 删除主题 |
--alter | 修改主题 |
--list | 查看所有主题 |
--describe | 查看主题详细描述 |
--partitions <Integer: # of partitions> | 设置分区数 |
--replication-factor<Integer: replication factor> | 设置分区副本 |
--config String:name=value | 更新系统默认的配置 |
连接上其他集群,
kafka-topics.sh --bootstrap-server localhost:9092
如果想连接多个,添加逗号,我这个是单机,没有试过
kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop101:9092
创建topics
。一个分区,1个副本。创建3个副本
kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 1 --replication-factor 3
这个时候,为啥呢?因为我们是单机,不是集群,副本只是1。一般是多少台机子,多少个副本
kafka-topics.sh --bootstrap-server localhost:9092 --topic first --create --partitions 1 --replication-factor 1
查看某一个topics的具体信息
kafka-topics.sh --bootstrap-server localhost:9092 --topic first --describe
查看所有topics
kafka-topics.sh --bootstrap-server localhost:9092 --list
修改分区
kafka-topics.sh --bootstrap-server localhost:9092 --topic first --alter --partitions 3
2.3 生产者
命令行
发送一条数据
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
创建一个消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
但是有个问题,第一条数据hello 没有消费怎么办?
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first --from-beginning
发送数据原理
- Producer创建时,会创建⼀个sender线程并设置为守护线程。
- ⽣产消息时,内部其实是异步的;⽣产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
- 批次发送的条件为:缓冲区数据⼤⼩达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
- 批次发送后,发往指定分区,然后落盘到broker;如果⽣产者配置了retrires参数⼤于0并且失败原因允许重试,那么客户端内部会对该消息进⾏重试。
- 落盘到broker成功,返回⽣产元数据给⽣产者。
- 元数据返回有两种⽅式:⼀种是通过阻塞直接返回,另⼀种是通过回调返回。
下面是具体原理,上图为大致流程。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和发送线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息收集器(RecordAccumulator,也称为消息累加器)中。发送线程负责从消息收集器中获取消息并将其发送到 Kafka 中。
主线程中发送过来的消息都会被追加到消息收集器的某个双端队列(Deque)中,在其的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部;Sender 读取消息时,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。
通俗地说,ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。
如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在 Kafka 生产者客户端中,通过 java.io.ByteBuffer实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在 消息收集器的内部还有一个 BufferPool,它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用。不过 BufferPool 只针对特定大小的 ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小由 batch.size
参数来指定,默认值为16384B,即16KB
。我们可以适当地调大 batch.size 参数以便多缓存一些消息。
ProducerBatch 的大小和 batch.size 参数也有着密切的关系。当一条消息流入消息收集器时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。
在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建 ProducerBatch,这段内存区域不会被复用。
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本> 的保存形式转变成 的形式,其中 Node 表示 Kafka 集群的 broker 节点。
对于网络连接来说,生产者客户端是与具体的 broker 节点建立的连接,也就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成 的形式之后,Sender 还会进一步封装成 的形式,这样就可以将 Request 请求发往各个 Node 了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceReques
请求在从 Sender 线程发往 Kafka 之前还会保存到InFlightRequests中,保存对象的具体形式为 Map,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个 String 类型,表示节点的 id 编号)。
有个应答机制
0: 生产者发送过来的数据,不需要等数据落盘应答。
1: 生产者发送过来的数据,Leader收到数据后应答
-1(all): 生产者发送过来的数据,Leader和ISR队列里面的所有阶段收齐数据后应答。-1和all等价
异步发送
step 1: 创建一个java的maven项目
step 2: 切换一下maven镜像源
step 3: 然后添加最新的依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.8.0</version>
</dependency>
step 4: 编写不带回调函数的API代码
如果是远程连接的话,需要修改配置kafka配置
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://外网的ip:9092
配置讲解,看下面的博客
一文搞懂Kafka中的listeners和advertised.listeners以及其他通信配置_51CTO博客_kafka advertised.listeners
package com.lkcoffee.framework;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaProducerTest1 {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "lkcoffee" + i));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
}
}
回调异步
package com.lkcoffee.framework;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Objects;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaProducerCallBack {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "lkcoffee" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(Objects.isNull(e)){
System.out.println("主题: "+recordMetadata.topic()+" 分区: "+recordMetadata.partition());
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
}
}
下面是发送的结果
同步发送
添加get()
kafkaProducer.send(new ProducerRecord<>("first", "lkcoffee" + i)).get();
分区器
便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
(3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同) 例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到,Kafka再随机一个分区进行使用了如果还是0会继续随机)
package com.lkcoffee.framework;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Objects;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaProducerCallBackPartitions {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
// partition是从0开始的。如果是3个分区的话,最大写2
kafkaProducer.send(new ProducerRecord<>("first", 0,"","lkcoffee" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(Objects.isNull(e)){
System.out.println("主题: "+recordMetadata.topic()+" 分区: "+recordMetadata.partition());
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
}
}
自定义分区器
例如我们实现一个分区器实现,发送过来的数据中包含atguigu,就发往0号分区,不包含atguigu,就发往1号分区
package com.lkcoffee.framework;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 15:32
**/
public class MyPartitioner implements Partitioner {
/**
* 实现分区
* @param s topic主题
* @param o key 发送的key
* @param bytes 序列化的key
* @param o1 value 发送的值
* @param bytes1 序列化的主题
* @param cluster 连接主题
* @return 返回分区结果
*/
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// 获取数据
String data = o1.toString();
System.out.println("发送过来的value "+ data);
if (data.contains("atguigu")) {
return 0;
} else if (data.contains("hello")) {
return 1;
} else if (data.contains("joker")) {
return 2;
}
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
这个代码中使用,记得更换类名字
package com.lkcoffee.framework;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaProducerCallBackPartitionsTest1 {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 设置自定义分区
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.lkcoffee.framework.MyPartitioner");
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 随机制造3组数据,包含atguigu
List<String> list = Arrays.asList("atguigu", "hello", "mm", "joker");
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
// partition是从0开始的。如果是3个分区的话,最大写2
String value=list.get(i % list.size()) + i;
System.out.println("KafkaProducerCallBackPartitionsTest1 发送的value: "+value);
kafkaProducer.send(new ProducerRecord<>("first", value),new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (Objects.isNull(e)) {
System.out.println("主题: " + recordMetadata.topic() + " 分区: " + recordMetadata.partition());
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
}
}
提高吞吐量
package com.lkcoffee.framework;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Objects;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaProducerParameters {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 设置缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432L);
// 设置批量发送的大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// 设置linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "lkcoffee" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(Objects.isNull(e)){
System.out.println("主题: "+recordMetadata.topic()+" 分区: "+recordMetadata.partition());
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
}
}
数据可靠
数据完全可靠条件=ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2
可靠性总结
生产者发送过来数据就不管了,可靠性差,效率高;
- acks=0,
- acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
- acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
在生产环境中,acks=0很少使用;acks=1,-般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。
数据重复分析
设置ACK机制
package com.lkcoffee.framework;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Objects;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaProducerAck {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 设置acks
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
// 设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,"3");
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "lkcoffee" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(Objects.isNull(e)){
System.out.println("主题: "+recordMetadata.topic()+" 分区: "+recordMetadata.partition());
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
}
}
数据幂等性
介绍
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
幂等性指:无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。
先来了解下消息的三种投递语义:
最多一次(at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如:mqtt 中 QoS = 0。
至少一次(at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如:mqtt 中 QoS = 1
精确一次(Exactly Once):幂等性+至少一次( ack=-1+ 分区副本数>=2 + ISR最小副本数量>=2)。
kafka幂等机制流程
重复数据的判断标准: 具有
<PID,Partition,SeqNumber>
相同主键的消息提交时,Broker只会持久化一条。其 中PID
是Katka每次重启都会分配一个新的;Partition
表示分区号;Sequence Number
是单调自增的。
- Producer 每次启动后,会向 Broker 申请一个全局唯一的 pid。(重启后 pid 会变化,这也是弊端之一)
- Sequence Numbe:针对每个 <Topic, Partition> 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个 seq num
- 判断是否重复: 拿
<pid, seq num>
去 Broker 里对应的队列ProducerStateEntry.Queue
(默认队列长度为 5)查询是否存在- 如果
nextSeq == lastSeq + 1
,即 服务端seq + 1 == 生产传入seq
,则接收。 - 如果
nextSeq == 0 && lastSeq == Int.MaxValue
,即刚初始化,也接收。 - 反之,要么重复,要么丢消息,均拒绝。
- 如果
所以幂等性只能保证的是在单分区单会话内不重复。
如何使用幂等性?
开启参数
enable.idempotence
默认为 true。false 关闭。
java代码
// 开启幂等性
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true");
// 设置最大队列长度
properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,"5");
设置幂等,启动幂等。
配置 acks,注意:一定要设置
acks=all
,否则会抛异常。高版本不需要设置配置 max.in.flight.requests.per.connection 需要 <= 5,否则会抛异常 OutOfOrderSequenceException。
0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
Kafka >= 1.1, max.in.flight.request.per.connection <= 5
Kafka的事务
package com.lkcoffee.framework;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Objects;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaProducerTransformation {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 指定事务id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"lkcoffee");
// 2. 创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
try {
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "lkcoffee" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(Objects.isNull(e)){
System.out.println("主题: "+recordMetadata.topic()+" 分区: "+recordMetadata.partition());
}
}
});
}
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
e.printStackTrace();
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
}
}
数据有序,乱序
Kafka中的消息都是被发布到某个Topic的。一个Topic可以配置多个Partition,Partition是Kafka进行并行操作的基本单位。
对于同一个Partition来说,其中消息必须严格按照生成顺序进行 Append。这保证了每个Partition内部的消息递增有序。但是跨Partition的消息则不保证有序。不同Partition分布在不同的Broker上,彼此之间没有顺序保证。
消费消息时,每个Partition只能被同一个Consumer Group中的一个Consumer消费,这样保证了每个Partition内消息的消费顺序。但不同Partition的消息可能被同一个Group的不同Consumer并行消费,因此无法保证顺序。
如何保证??
Producer端对消息打上全局唯一的序号ID,或者使用Kafka自带的分区器按序号对消息Partition。消费时可以按序号排序。
Consumer端采用只订阅单个Partition的方式消费数据,而不是跨Partition订阅,这保证了一个Partition顺序。Consumer端自己维护偏移及排序,跨Partition订阅后MERGE排序。可在DB中存 last offset。
将相关联的消息发送到同一个Partition,不同类型消息分配到不同Partition。减小分区数可减少排序问题。采用可以重置偏移的Kafka消费者模式,以读取历史有序数据。采用Spark Streaming等支持有序接收Kafka数据并处理的框架。
设置为单节点单分区,并开启log compaction。新消息会覆盖key相同的老消息,确保单调递增顺序。调整max.message.bytes以控制批次大小,间接控制顺序粒度。
参考资料
2.4 Broker
2.4.1 zookeeper存储信息
连接zookeeper
ZooKeeper解压后,在其bin目录下包含着常用的程序,例如 zkServer.sh zkCli.sh 我们使用zkCli.sh 就可以通过命令行使用Zookeeper客户端
zkCli.sh
出现以下这种界面,基本上就算连接成功。
如果不是默认地址的话。可以指定端口
#对于本地默认端口 则可以直接 ./zkCli.sh
# -server 指定服务地址和端口
./zkCli.sh -server localhost:15881
查看节点
可以使用ls
查看子节点列表,使用 get 命令查看节点的内容
ls /
可以看到除了默认的Zookeeper节点外还有一个节点叫做kafka(之所以叫这个是因为咱们在配置文件中写的名字就是/kafka)。我什么都没有改,没有前缀信息,所有东西直接就在zookeeper的根目录下
查看kafka节点
ls /kafka
可以再往里面看看:
ls /kafka/brokers/ids
可以看到在 ids 下面有着 0 1 2
三个数字,这个其实就是我们Kafka集群中每台机器的 brokerid
。
还有其他用法
# 使用 get 命令查看节点内容 get -s 则可以附加打印节点状态信息
[zk: localhost:15881(CONNECTED) 6] get /zk-temp
data222
# stat 命令查看节点状态
[zk: localhost:15881(CONNECTED) 0] stat /zk-temp
cZxid = 0x30000000a
ctime = Wed Jul 05 10:48:44 CST 2023
mZxid = 0x30000000a
mtime = Wed Jul 05 10:48:44 CST 2023
pZxid = 0x30000000a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100008d52290003
dataLength = 7
numChildren = 0
更新节点内容 命令:set path data [version]
version表示数据版本,在zookeeper中,节点的数据是有版本概念的,这个参数⽤于指定本次更新操作是基于Znode的哪⼀个数据版本进⾏的,如果版本和最新版本对不上则会更新失败,这样可以防止覆盖最新写入的数据。
set /zk-premament 666
zookeeper里面的内容
所有东西都在zookeeper里面有体现
如果是单机的话,里面应该是没有信息的
2.4.2 工作原理
- 第一步:broker启动后在zk中注册,争抢controller
- 第二步:谁先注册Controller,谁说了算
- 第三步:由选举出来的Controller,监听brokers节点变化
- 第四步:Controller决定Leader选举
- 第五步:Controller将节点信息上传到ZK
- 第六步:其他contorller从zk同步相关信息,万一挂了他们好上位
- 第七步:假设Broker1中Leader挂了
- 第八步:Controller监听到节点变化
- 第九步:获取ISR
- 第十步:选举新的Leader(在 isr中存活为前提,按照 AR中排在前面的优先)
- 第十一步:更新Leader及ISR
- 选举规则:在isr中存活为前提,按 照AR中排在前面的优先。例如 ar[1,0,2], isr [1,0,2],那么leader 就会按照1,0,2的顺序轮询
模拟 Kafka 上下线,Zookeeper 中数据变化:
查看
/kafka/brokers/ids
路径上的节点:查看
/kafka/controller
路径上的数据:查看
/kafka/brokers/topics/first/partitions/0/state
路径上的数据:停止 hadoop104 上的 kafka:
再次查看
/kafka/brokers/ids
路径上的节点再次查看
/kafka/controller
路径上的数据再次查看
/kafka/brokers/topics/first/partitions/0/state
路径上的数据
2.4.3 Broker重要参数
参数名称 | 描述 |
---|---|
replica.lag.time.max.ms | ISR中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。 该时间阈值,默认 30s |
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡 |
leader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器 会触发 leader 的平衡 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是 指 log 日志划分成块的大小,默认值 1G |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引 |
log.retention.hours | Kafka 中数据保存的时间,默认 7 天 |
log.retention.minutes | Kafka 中数据保存的时间,分钟级别,默认关闭 |
log.retention.ms | Kafka 中数据保存的时间,毫秒级别,默认关闭 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略; 如果设置值为compact,表示所有数据启用压缩策略 |
num.io.threads | 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50% |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的 50%的1/3 |
num.network.threads | 默认是 3。数据传输线程数,这个参数占总核数的 50%的2/3 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改, 交给系统自己管理 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理 |
2.4.4 服役新节点
2.4.5 退役旧节点
2.4.6 Kafka副本
副本信息
Kafka 副本作用:提高数据可靠性。
Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;
太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
Kafka 中副本分为:Leader 和 Follower。
Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。
ISR
,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。OSR
,表示 Follower 与 Leader 副本同步时,延迟过多的副本。
选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
Leader 选举会按照 AR 的顺序进行选取,就是下图中的 Replicas
顺序:
创建一个新的topic,4个分区,4个副本
kafka-topics.sh --bootstrap-server test1:9092 --create --topic van --partitions 4 --replication-factor 4
副本数要小于连接的机子数量,不然会报错
。我这里都是单机,所以就直接抄了
查看leader分布情况
kafka-topics.sh --bootstrap-server test1:9092 --describe --topic second
停止掉 test4 的 kafka 进程,并查看 Leader 分区情况:
kafka-server-stop.sh
回到 test1 上重新看一眼:
kafka-topics.sh --bootstrap-server test1:9092 --describe --topic second
可以看到两次的 Leader 分区发生的变化:
停止掉 test3 的 kafka 进程,并查看 Leader 分区情况:
启动 test4 和 test5 的 kafka 进程,并查看 Leader 分区情况(可以看到没有变回去):
Follow故障
首先介绍2个新的概念
(Log End Offset): 每个副本的最后一个offset,
LEO
其实就是最新的offset+1
.HW(High Watermark): 所有副本中最小的LEO。
该
Follower
先被踢出 ISR 队列,然后其余的 Leader、Follower继续接受数据。如果该Follower
恢复了,会读取本地磁盘上次记录的HW
,并裁剪掉 高于 HW 的数据,从 HW 开始向 Leader 进行同步数据。
待该 Follower 的 LEO 大于等于该 Partition 的 HW,即 Follower 追上了 Leader,就可以重新加入 ISR 了。
Leader故障
broker0 一开始是 Leader,然后挂掉了,选举 broker1 为新的 Leader,然后其余的 Follower 会把各自 log 文件高于 HW 的部分裁剪掉,然后从新的 Leader 同步数据。
分区副本分配
如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka 底层如何分配存储副本呢?
创建16个分区,3个副本
kafka-topics.sh --bootstrap-server test1:9092 --create --partitions 16 --replication-factor 3 --topic second
看一下细节(颜色标好了,找规律即可):
手动调整分区副本
在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副 本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。
需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和 broker1两台服务器上。
vim increase-replication-factor.json
输入一下内容
{
"version":1,
"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
{"topic":"three","partition":1,"replicas":[0,1]},
{"topic":"three","partition":2,"replicas":[1,0]},
{"topic":"three","partition":3,"replicas":[1,0]}]
}
执行副本存储计划
kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --execute
验证副本存储计划
kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --verify
查看分区副本存储情况
bin/kafka-topics.sh --bootstrap-server test1:9092 --describe --topic three
2.4.7 Partiton 负载均衡
正常情况下,Kafka本身会自动把Leader Partition均匀分散在各个机器上,来保证每台机器的读写吞吐量都是均匀的。但是如果某些broker宕机,会导致Leader Partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
- auto.leader.rebalance.enable ,默认是true。自动 Leader Partition 平衡
- leader.imbalance.per.broker.percentage , 默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡
- leader.imbalance.check.interval.seconds , 默认值300秒。检查leader负载是否平衡的间隔时间
可以看到上图中先创建了一个topic,然后进行两次查看(第一次查看之后我手动把一台broker关掉再重启)。
第一次查看:ISR全部存活,对Replicas进行对比,谁靠前谁就是Leader。
然后去关掉一个broker,当它下线的时候,集群会选举出新的Leader。之后重启它,让它重新上线。
第二次查看:因为中途Leader的变更,所以可以看到就算Replicas中2是靠前的,但是Leader变成了3。
拿 broker0 来说,原本的Leader应该是2,但是在重新选举后变成了3,所以不平衡数加1。AR的副本数是4,所以 broker0 节点不平衡率为 1/4>10% ,需要再平衡。
Broker1 2 3 的不平衡数为0,不需要再平衡。
增加副本因子
创建topics
kafka-topics.sh --bootstrap-server test1:9092 --create --partitions 3 --replication-factor 1 --topic four
手动增加副本存储
创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中):
vim increase-replication-factor.json
内容如下
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}
执行计划
kafka-reassign-partitions.sh --bootstrap-server test1:9092 --reassignment-json-file increase-replication-factor.json --execute
2.4.8 文件存储
怎么存储
Topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号,例如:first-0。
可以用Linux的文件目录来抽象的表示一下:
Topic/
├── Partition-0
│ └── log
│ ├── *.index
│ ├── *.log
│ └── *.timeindex
├── Partition-1
│ └── log
└── Partition-2
└── log
查看数据到底存储在什么位置
查看 test1(或者 test2、test3)的/opt/module/kafka/datas/first-1 (first-0、first-2)路径上的文件:
cd /usr/local/kafka/data/logs/first-0/
这个得看启动kafka的时候,配置文件写的存储位置。
- .log 日志文件
- .index 偏移量索引文件
- .timeindex 时间戳索引文件
index 文件和 log 文件详解
- Index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引.参数log.index.interval.bytes默认4kb。
- Index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。
如何在log文件中定位到offset所对应的Record:
- 根据目标offset定位Segment文件
- 找到小于等于目标offset的最大offset对应的索引项
- 定位到log文件
- 向下遍历找到目标Record
日志存储参数配置:
参数 | 描述 |
---|---|
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就 往 index 文件里面记录一个索引。 稀疏索引 |
文件清除策略
Kafka 中默认的日志保存时间为7天
,可以通过调整如下参数修改保存时间:
- log.retention.hours ,最低优先级小时,默认 7 天。
- log.retention.minutes ,分钟。
- log.retention.ms ,最高优先级毫秒。
- log.retention.check.interval.ms ,负责设置检查周期,默认 5 分钟。
Kafka中提供的日志清理策略有 delete 和 compact 两种。
1-delete日志删除:
将过期数据删除
log.cleanup.policy = delete 所有数据启用删除策略
基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。 log.retention.bytes,默认等于-1,表示无穷大。
2-compact-日志压缩
对于相同key的不同value值,只保留最后一个版本。
log.cleanup.policy = compact 所有数据启用压缩策略。
压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大 的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。
这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息 集里就保存了所有用户最新的资料。
2.4.9 高效读写数据
Kafka本身是分布式集群,可以采用分区技术,并行度高
读数据采用稀疏索引,可以快速定位要消费的数据
顺序写磁盘
Kaka的 producer生产数据,要写入到l0g文件中,写的过程是一直追加到文件末端为顺序写。官网有数据表明,同样的磁盘,顺序写能到600Ms,而随机写只有100KS。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
页缓存+零拷贝技术
参考资料
2.5 消费者
2.5.1 消费方式
pull(拉)模式:
kafka采用拉模式,消费者主动从broker拉取数据。
拉模式的不足之处:如果kafka没有数据,那么消费者可能陷入循环中,一直返回空数据。
push(推)模式:
kafka没有采用这种方式,因为有broker决定消息发送速率,很难适应所有的消费者。
每个 消费者消费的速率不同,可能造成有的消费者非常忙碌,有的消费者很空闲。
2.5.2 消费者工作流程
新版本(0.9之后)的 offset 保存在 kafka 的 Topic里,持久化到磁盘,可靠性有保障。
老版本(0.9之前)的 offset 保存在 Zookeeper 的 consumers 节点路径下。
为什么转移了呢?如果所有的消费者都把 offset 维护在 Zookeeper 中,那么所有的消费者都需要跟 Zookeeper 进行大量的交互,就会导致网络数据传输非常频繁,压力较大。所以存储在主题里更易于维护管理。
2.5.3 消费者组
原理
- 消费者组,有多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。消费组之间互不影响。所有的消费者都属于某个消费组。即消费者组是逻辑上的一个订阅者
- 当只创建一个消费者时候,未指定groupid的时候,其实kafka会自动分配一个groupid。
- 同一个分区只可以被一个消费者组内的消费者消费,如果组内只有一个消费者,那同一主题下所有分区都被该消费者消费。如果消费者组中的消费者超过分区数,那么就有一部分消费者闲置,不会接收任何消息。
- 每个消费者消费的offset有消费者提交到系统主题中保存。_consumer_offsets
消费者组初始化流程
coordinator(协调员):辅助实现消费者组的初始化和分区的分配。每个broker都存在一个coordinator
coordinator节点选择=groupid的hashcode值**%50(50=_consumer_offsets的分区数量)。假如:groupid的hashcode值=1,1%50=1**。那么_consumer_offsets主题的1号分区在那个leader broker上,就选择这个节点的coordinator作为这个消费组的老大。消费者下的所有消费者提交的offset的时候就往这个分区去提交offset.
确定完coordinator所在的broker。
- 每个consumer都发送JoinGroup请求到coordinator
- coordinator选出一个consumer作为Leader
- coordinator将要消费的topic情况发送给Leader消费者
- leader消费者负责制定消费方案
- 将制定完的消费方案发送给coordinator
- coordinator会将消费方案下发给各个消费者
- 每个消费者都会和coordinator保持心跳(
heartbeat.interval.ms=3s
),一旦超时(session.timeout.ms=45s
),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms=5分钟
),也会出发再平衡
协议
kafka提供了5个协议来处理与消费组协调相关的问题:
- Heartbeat请求:consumer需要定期给组协调器发送心跳来表明自己还活着
- LeaveGroup请求:主动告诉组协调器我要离开消费组
- SyncGroup请求:消费组Leader把分配方案告诉组内所有成员
- JoinGroup请求:成员请求加入组
- DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用
组协调器在再均衡的时候主要用到了前面4种请求。
首先,kafka 需要和消费者组建立网络连接客户端:ConsumerNetworkClient
获取消费者分区分配策略。
创建消费者协调器(提交offset)
调用
sendFetches
用来初始化抓取数据。设置fetch.min.bytes、fetch.max.wait.ms、fetch.max.bytes
这里会设置 3 个参数:
fetch.min.bytes
:每批次最小抓取大小,默认1字节fetch.max.wait.ms
:一批数据最小值未达到的超时时间,默认500msfetch.max.bytes
:每批次最大抓取大小,默认50m
开始调用
send
方法,发送请求。通过回调方法onSuccess
把对应的结果拉取回来。一批一批的放入一个队列中。消费者调用
fetchedRecords
方法从队列中抓取数据。消费者一批次会拉取500条(max.poll.records
)。经过反序列化 -》 过滤器 -》 正常的处理数据
在生产端也有拦截器,拦截器的作用:整个 kafka 集群不会处理数据,只会存数据,那么处理数据就可以在生产端和消费端的拦截器去做,而且拦截器可以方便的监控 kafka 的运行情况。这也是 kafka 高吞吐量的原因。
消费者重要参数
案例-消费一个分区
独立消费者案例(订阅主题)
需求:创建一个独立消费者,消费 first 主题中数据。
package com.lkcoffee.framework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaCustomConsumer {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 反序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id(组名任意起名)必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
// 给value 加上时间戳
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formattedNow = now.format(formatter);
String value = "hello kafka: " + formattedNow;
kafkaProducer.send(new ProducerRecord<>("first", i % 3, "", value), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(Objects.isNull(e)){
System.out.println("发送成功 主题: "+recordMetadata.topic()+" 分区: "+recordMetadata.partition());
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
// 3. 创建消费者对象。自己发送,自己消费
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.subscribe(List.of("first"));
// 4. 开始消费
while (true) {
System.out.println("开始消费");
// 设置每过1s消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
我们可以看到3个分区的数据,都有消费
案例-消费一个分区的数据
需求:创建一个独立消费者,消费 first 主题 0 号分区的数据。
package com.lkcoffee.framework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaCustomConsumerTest1 {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 反序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id(组名任意起名)必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
// 给value 加上时间戳
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formattedNow = now.format(formatter);
String value = "hello kafka: " + formattedNow;
kafkaProducer.send(new ProducerRecord<>("first", i % 3, "", value), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(Objects.isNull(e)){
System.out.println("发送成功 主题: "+recordMetadata.topic()+" 分区: "+recordMetadata.partition());
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
// 3. 创建消费者对象。自己发送,自己消费
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 我们只消费first topic, 分区0的数据
kafkaConsumer.assign(List.of(new TopicPartition("first", 0)));
// 4. 开始消费
while (true) {
System.out.println("开始消费");
// 设置每过1s消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
我们看到只能消费到0号分区的,
案例-消费者组案例
需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
package com.lkcoffee.framework;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Desciption:
* @Author: feixiang.li
* @date: 2024-07-30 11:09
**/
public class KafkaCustomConsumerTest2 {
public static void main(String[] args) {
System.out.println("开始发送");
// 1. 创建链接配置
Properties properties = new Properties();
// 连接集群。这里我链接的是远程的。如果是本地的,换成localhost
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"101.201.104.148:9092");
// 设置序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// 反序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id(组名任意起名)必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 2. 创建kafka生产者对象
try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
// 3. 开始发送数据
for (int i = 0; i < 10; i++) {
// 给value 加上时间戳
LocalDateTime now = LocalDateTime.now();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formattedNow = now.format(formatter);
String value = "hello kafka: " + formattedNow;
kafkaProducer.send(new ProducerRecord<>("first", i % 3, "", value), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(Objects.isNull(e)){
System.out.println("发送成功 主题: "+recordMetadata.topic()+" 分区: "+recordMetadata.partition());
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 4. 关闭资源
System.out.println("结束发送");
}
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 创建3个消费者并分配给不同的线程
for (int i = 0; i < 3; i++) {
int partitionId = i % 3; // 假设我们有3个分区
executorService.submit(() -> consumeData(properties, partitionId));
}
// 等待所有任务完成
executorService.shutdown();
while (!executorService.isTerminated()) {
}
}
private static void consumeData(Properties properties, int partitionId) {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 都只注册在一个topic
kafkaConsumer.subscribe(List.of("first"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(Thread.currentThread().getName()+" - 消费于线程 -"+consumerRecord);
}
}
}
}
看下结果
2.5.4 分区的分配以及再平衡
重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区。比如一个topic有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡。
重平衡的触发条件主要有三个:
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
- 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡.
- 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡
Kafka有四种主流的分区分配策略:
- Range
- RoundRobin、
- Sticky.
- CooperativeSticky
Consumer Leader 就是根据分区分配策略,制定消费方案。
Range 以及再平衡
案例
修改主题 first 为 7 个分区
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 7
同时启动 3 个消费者:CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”
启动 CustomProducer 生产者,发送 500 条消息,随机发送到不同的分区
for (int i = 0; i < 500; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null) {
System.out.println("主题:" + metadata.topic() + "->" +
"分区:" + metadata.partition());
} else {
e.printStackTrace();
}
}
});
}
观察 3 个消费者分别消费哪些分区的数据
- 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
- 1 号消费者:消费到
3、4
号分区数据。 - 2 号消费者:消费到
5、6
号分区数据。 - 0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
- 1 号消费者:消费到
- 再次重新发送消息观看结果(45s 以后)
- 1 号消费者:消费到
0、1、2、3
号分区数据。 - 2 号消费者:消费到
4、5、6
号分区数据。 - 说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。
- 1 号消费者:消费到
RoundRobin 以及再平衡
案例
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
重启 3 个消费者,重复发送消息的步骤,观看分区结果
- 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
- 1 号消费者:消费到 2、5 号分区数据
- 2 号消费者:消费到 4、1 号分区数据
- 0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者卓和 2 号消费者消费。
- 再次重新发送消息观看结果(45s 以后)
- 1 号消费者:消费到 0、2、4、6 号分区数据
- 2 号消费者:消费到 1、3、5 号分区数据
Sticky 以及再平衡
**粘性分区定义:**可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
需求:设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。
修改分区分配策略为粘性 Sticky
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
使用同样的生产者发送 500 条消息
- 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)
- 1 号消费者:消费到 2、5、3 号分区数据。
- 2 号消费者:消费到 4、6 号分区数据。
- 0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1 号消费者或者 2 号消费者消费。
- 再次重新发送消息观看结果(45s 以后)
- 1 号消费者:消费到 2、3、5 号分区数据。
- 2 号消费者:消费到 0、1、4、6 号分区数据。
CooperativeSticky以及再平衡
上述三种分区分配策略均是基于 eager
协议,Kafka2.4.0开始引入 CooperativeSticky 策略——在不停止消费的情况下进行增量再平衡。
CooperativeSticky 与之前的 Sticky 虽然都是维持原来的分区分配方案,最大的区别是:Sticky
仍然是基于 eager
协议,分区重分配时候,都需要 consumers 先放弃当前持有的分区,重新加入consumer group
;而 CooperativeSticky
基于 cooperative
协议,该协议将原来的一次全局分区重平衡,改成多次小规模分区重平衡。
例如:一个Topic(T0,三个分区)
,两个 consumers(consumer1、consumer2)
均订阅 Topic(T0)
。
如果consumers订阅信息为:
consumer1 | T0P0、T0P2 |
---|---|
consumer2 | T0P1 |
此时,新的 consumer3 加入消费者组,那么基于 eager
协议的分区重分配策略流程:
- consumer1、 consumer2 正常发送心跳信息到 Group Coordinator。
- 随着 consumer3 加入,Group Coordinator 收到对应的 Join Group 请求,Group Coordinator 确认有新成员需要加入消费者组。
- Group Coordinator 通知 consumer1 和 consumer2,需要 rebalance 了。
- consumer1 和 consumer2 放弃(revoke)当前各自持有的已有分区,重新发送 Join Group 请求到 Group Coordinator。
- Group Coordinator 依据指定的分区分配策略的处理逻辑,生成新的分区分配方案,然后通过 Sync Group 请求,将新的分区分配方案发送给 consumer1、consumer2、consumer3。
- 所有 consumers 按照新的分区分配,重新开始消费数据。
而基于 cooperative
协议的分区分配策略的流程:
- consumer1、 consumer2 正常发送心跳信息到 Group Coordinator。
- 随着 consumer3 加入,Group Coordinator 收到对应的 Join Group 请求,Group Coordinator确认有新成员需要加入消费者组。
- Group Coordinator 通知 consumer1 和 consumer2,需要 rebalance 了。
- consumer1、consumer2 通过 Join Group 请求将已经持有的分区发送给 Group Coordinator。
- 注意:并没有放弃(revoke)已有分区。
- Group Coordinator 取消 consumer1 对分区 p2 的消费,然后发送 sync group 请求给 consumer1、consumer2。
- consumer1、consumer2 接收到分区分配方案,重新开始消费。至此,一次 rebalance 完成。
- 当前 p2 也没有被消费,再次触发下一轮 rebalance,将 p2 分配给 consumer3 消费。
2.5.5 offset位移
offset 的默认维护位置
__consumer_offsets
主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号
,value 就是当前 offset的值。每隔一段时间,kafka 内部会对这个 topic 进行 compact,也就是每个 group.id+topic+分区号
只保留最新数据。
默认情况下是看不到系统主题里的数据的
怎么解决呢??
在配置文件 config/consumer.properties 中添加配置
exclude.internal.topics=false
默认是 tmue,表示不能消费系统主题。为了査看该系统主题数据,所以该参数修改为 false。
创建一个新的topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test1 --partitions 2 --replication-factor 1
发送数据
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
启动消费者消费数据
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --group test
查看消费者消费主题__cunsumer_offsets
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --consumer.config /usr/local/kafka/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
里面有test1的offset
自动提交offset
相关参数
enable.auto.commit
:是否开启自动提交offset功能,默认是trueauto.commit.interval.ms
:自动提交offset的时间间隔,默认是5s
// 自动提交 `enable.auto.commit` 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交时间间隔 `auto.commit.interval.ms` 如果设置了 `enable.auto.commit` 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
自动提交的问题
默认情况下消费者每五秒钟会提交一次改动的偏移量, 这样做虽然说吞吐量上来了, 但是可能会出现重复消费的问题。
因为可能在下一次提交偏移量之前消费者本地消费了一些消息,然后发生了分区再均衡。假设上次提交的偏移量是 2000 在下一次提交之前 其实消费者又消费了500条数据 也就是说当前的偏移量应该是2500 但是这个2500只在消费者本地, 也就是说其他消费者去消费这个分区的时候拿到的偏移量是2000,那么又会从2000开始消费消息 那么 2000到2500之间的消息又会被消费一遍,这就是重复消费的问题.
手动提交offset
手动提交有两种。commitSync(同步提交)和commitASync(异步提交)。相同点,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试;异步提交没有失败重试机制,可能会提交失败
- commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
- commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了
同步提交offset
必须等待offset提交完毕之后再去消费下一批数据。由于存在失败重试机制,可靠性比较高。但是因为需要等待提交结果,所以效率比较低。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumerByHandSync {
public static void main(String[] args) {
// 1. 创建 kafka 消费者配置类
Properties properties = new Properties();
// 2. 添加配置参数
// 添加连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// 配置序列化 必须
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// 配置消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 3. 创建 kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 4. 设置消费主题 形参是列表
consumer.subscribe(Arrays.asList("first"));
// 5. 消费数据
while (true) {
// 读取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
// 输出消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.value());
}
// 同步提交 offset
consumer.commitSync();
}
}
}
异步提交offset
发送完提交offset请求后,就开始消费下一批数据了。异步提交offset无需等待,效率比较高
// 设置手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
// 手动提交
consumer.commitAsync();
}
指定offset消费
auto.offset.reset
: earliest | latest | none 默认是 latest。
当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
- earliest:自动将偏移量重置为最早的偏移量,
--from-beginning
- latest(默认值):自动将偏移量重置为最新偏移量
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
任意指定 offset 位移开始消费
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerSeek {
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 保证分区分配方案已经制定完毕
// 消费者初始化流程:
// 1)消费者跟coordinator汇报,我要加入消费者组
// 2)然后coordinator会选择一个Consumer Leader,把各Topic的情况给到它
// 3)Consumer Leader会制定分区分配方案,发给coordinator
// 4)coordinator再把分区分配方案下发给所有Consumer
// 所以需要等待一段时间。
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 遍历所有分区,并指定 offset 从 1700 的位置开始消费
for (TopicPartition tp : assignment) {
kafkaConsumer.seek(tp, 1700);
}
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
指定时间消费
- 需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。
- 例如要求按照时间消费前一天的数据,怎么处理?
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerSeekTime {
public static void main(String[] args) {
// 0 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
// key value 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 保证分区分配方案已经制定完毕
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 希望把时间转换为对应的offset
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
// 遍历每个分区,对每个分区设置消费时间。
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
// 根据时间指定开始消费的位置
if (offsetAndTimestamp != null) {
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
}
// 3 消费该主题数据
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
重复消费和漏消费
- 重复消费:已经消费了数据,但是 offset 没提交。
- 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
API | 说明 |
---|---|
public void assign(Collection partitions) | 给当前消费者手动分配一系列主题分区。手动分配分区不支持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。如果给出的主题分区是空的,则等价于调用unsubscribe方法。手动分配主题分区的方法不使用消费组管理功能。当消费组成员变了,或者集群或主题的元数据改变了,不会触发分区分配的再平衡。手动分区分配assign(Collection)不能和自动分区分配subscribe(Collection,ConsumerRebalanceListener)一起使用。 |
public Set assignment() | 获取给当前消费者分配的分区集合。如果订阅是通过调用assign方法直接分配主题分区,则返回相同的集合。如果使用了主题订阅,该方法返回当前分配给该消费者的主题分区集合。如果分区订阅还没开始进行分区分配,或者正在重新分配分区,则会返回none。 |
public Map<String, List> listTopics() | 获取对用户授权的所有主题分区元数据。该方法会对服务器发起远程调用。 |
public List partitionsFor(String topic) | 获取指定主题的分区元数据。如果当前消费者没有关于该主题的元数据,就会对服务器发起远程调用。 |
public Map<TopicPartition, Long> beginningOffsets(Collection partitions) | 对于给定的主题分区,列出它们第一个消息的偏移量。注意,如果指定的分区不存在,该方法可能会永远阻塞。该方法不改变分区的当前消费者偏移量。 |
public void seekToEnd(Collection partitions) | 将偏移量移动到每个给定分区的最后一个。该方法延迟执行,只有当调用过poll方法或position方法之后才可以使用。如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费偏移量移动到最后一个稳定的偏移量,即下一个要消费的消息现在还是未提交状态的事务消息。 |
public void seek(TopicPartition partition, long offset) | 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息偏移量。若该方法多次调用,则最后一次的覆盖前面的。如果在消费中间随意使用,可能会丢失数据。 |
public long position(TopicPartition partition) | 检查指定主题分区的消费偏移量 |
public void seekToBeginning(Collection partitions) | 将给定每个分区的消费者偏移量移动到它们的起始偏移量。该方法懒执行,只有当调用过poll方法或position方法之后才会执行。如果没有提供分区,则将所有分配给当前消费者的分区消费偏移量移动到起始偏移量。 |
消息积压
如果kafka消费能力不足了,则可以考虑增加topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。
如果是下游的数据处理不及时,提高每批次拉去的数据。批次拉取的数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据的积压
生产者提高吞吐量:
batch.size
:默认 16klinger.ms
:默认 0mscompression.type
:数据压缩,默认为 Nonebuffer.memory
:RecordAccumlator 缓冲区大小,默认 32M
消费者提高吞吐量:
- 增加 Topic 的分区数,同时增加消费者数量
max.poll.records
:提高每批次拉取的数量,默认500条
参考资料