Record
-
消息
-
这里的消息就是指 Kafka 处理的主要对象
Topic
-
主题
-
是承载消息的逻辑容器,在实际使用中多用来区分具体的业务
Partition
-
分区
-
一个有序不变的消息序列。每个主题下可以有多个分区
Offset
-
消息位移
-
表示分区中每条消息的位置信息,是一个单调递增且不变的值
Replica
-
副本
-
Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本
-
副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用
-
所谓副本(Replica),本质就是一个只能追加写消息的提交日志。
副本机制有什么好处呢?
-
提供数据冗余
即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性。
-
提供高伸缩性
支持横向扩展,能够通过增加机器的方式来提升读性能,进而提高读操作吞吐量。
-
改善数据局部性
允许将数据放入与用户地理位置相近的地方,从而降低系统延时。
如何确保副本中所有的数据都是一致的呢?
-
基于领导者(Leader-based)的副本机制
-
工作原理
第一,在 Kafka 中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
第二,Kafka 的副本机制比其他分布式系统要更严格一些。在 Kafka 中,追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。 所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的 Broker,由该 Broker 负责处理。追随者副本不处理客户端请求,它唯一的任务就是从领导者副本异步拉取消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
第三,当领导者副本挂掉了,或者说领导者副本所在的 Broker 宕机时,Kafka 依托于ZooKeeper 提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老 Leader 副本重启回来后,只能作为追随者副本加入到集群中。
-
好处
1.方便实现“Read-your-writes”
-
当你使用生产者 API 向 Kafka 成功写入消息后, 马上使用消费者 API 去读取刚才生产的消息。
-
举个例子,比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。如果允许追随者副本对外提供服务,由于副本同步是异步的, 因此有可能出现追随者副本还没有从领导者副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息。
2.方便实现单调读(Monotonic Reads)
-
对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。
-
如果允许追随者副本提供读服务,那么假设当前有 2 个追随者副本 F1 和 F2,它们异步地拉取领导者副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象: 第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性。
-
-
In-sync Replicas(ISR)
-
ISR 副本集合
-
ISR 不只是追随者副本集合,它必然包括 Leader 副本。甚至在某些情况下,ISR 只有 Leader 这一个副本
-
ISR 中的副本都是与 Leader 同步的副本(Leader 副本天然就在 ISR 中)
-
-
判断 Follower 是否与 Leader 同步的标准
-
Follower 副本能够落后 Leader 副本的最长时间间隔
-
ISR 是一个动态调整的集合,而非静态不变的
-
如果这个同步过程的速度持续慢于 Leader 副本的消息写入速度, 那么在 replica.lag.time.max.ms 时间后,此 Follower 副本就会被认为是与 Leader 副本不同步的,因此不能再放入 ISR 中。此时,Kafka 会自动收缩 ISR 集合,将该副本“踢出”ISR。
-
倘若该副本后面慢慢地追上了 Leader 的进度,那么它是能够重新被加回ISR 的。
-
-
-
怎么选举新 Leader
-
因为 Leader 副本天然就在 ISR 中,如果 ISR 为空了,就说明 Leader 副本也“挂掉”了,Kafka 需要重新选举一个新的 Leader
-
Unclean 领导者选举(Unclean Leader Election)
-
Kafka 把所有不在 ISR 中的存活副本都称为非同步副本
-
在 Kafka 中,选举这种副本的过程称为 Unclean 领导者选举。
-
开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean 领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
-
强烈建议你不要开启它,我们还可以通过其他的方式来提升高可用性。
-
-
-
Producer
-
生产者
-
向主题发布新消息的应用程序
Consumer
-
消费者
-
从主题订阅新消息的应用程序
Consumer Offset
-
消费者位移
-
表征消费者消费进度,每个消费者都有自己的消费者位移
-
记录了Consumer 要消费的下一条消息的位移
-
提交位移
-
Consumer 需要向 Kafka 汇报自己的位移数据
-
Consumer 需要为分配给它的每个分区提交各自的位移数据
-
从用户的角度
-
总览
-
自动提交
-
Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息(能保证不出现消费丢失的情况)
-
可能会出现重复消费
- 在默认情况下,Consumer 每 5 秒自动提交一次位移。现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。
-
-
手动提交
-
API
-
KafkaConsumer#commitSync() (同步提交)
- 在调用 commitSync() 时,Consumer 程序会处于阻塞状态, 直到远端的 Broker 返回提交结果,这个状态才会结束
-
KafkaConsumer#commitAsync() (异步提交)
- 调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。
-
组合使用
-
原因
-
我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动, Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
-
我们不希望程序总处于阻塞状态,影响 TPS。
-
-
如何将两个 API 方法结合使用
-
对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞
-
在 Consumer 要关闭前,我们调用commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据
-
-
-
-
调用 consumer.commitSync() 方法的时机,是在你处理完了 poll() 方法返回的所有消息之后
-
如果你莽撞地过早提交了位移,就可能会出现消费数据丢失的情况
-
-
-
-
重设消费者组位移
不论是哪种设置方式,重设位移大致可以从两个维度来进行。
- 位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。
- 时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。
-
重设位移策略
-
7 种重设策略
-
-
消费者 API 方式设置
Consumer Group
-
消费者组
-
多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐
-
Consumer Group
-
Kafka 提供的可扩展且具有容错性的消费者机制
-
组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)
-
每个分区只能由同一个消费者组内的一个 Consumer 实例来消费
-
特性
-
Consumer Group 下可以有一个或多个 Consumer 实例
-
Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group
-
Consumer Group 下所有实例订阅的主题的单个分区, 只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费
-
-
机制
-
如果所有实例都属于同一个 Group, 那么它实现的就是消息队列模型
-
如果所有实例分别属于不同的 Group,那么它实现的就是发布 / 订阅模型
-
-
Consumer 实例的数量应该等于该 Group 订阅主题的分区总数
-
版本
-
老版本的 Consumer Group 把位移保存在 ZooKeeper 中
-
新版本的 Consumer Group(将位移保存在 Kafka 内部主题的方法 就是让人既爱又恨的 __consumer_offsets)
-
__consumer_offsets(位移主题)
-
内部主题
-
用户不能修改(消息格式)
-
消息格式
-
不能随意地向这个主题写消息
-
Kafka Consumer 有 API 帮你提交位移
-
-
-
将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 __consumer_offsets 中
-
__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息
-
位移主题的消息格式
-
用于保存 Consumer Group 信息的消息
- 是用来注册 Consumer Group 的
-
用于删除 Group 过期位移甚至是删除 Group 的消息
-
tombstone 消息,即墓碑消息,也称delete mark
-
一旦某个 Consumer Group 下的所有 Consumer 实例都停止了,而且它们的位移数据都已被删除时,Kafka 会向位移主题的对应分区写入tombstone 消息,表明要彻底删除这个 Group 的信息
-
-
-
提交位移
-
自动提交位移 enable.auto.commit(true)
-
手动提交位移 enable.auto.commit(false)
-
-
删除位移主题中的过期消息
-
自动提交位移(只要 Consumer 一直启动着, 它就会无限期地向位移主题写入消息)
-
Compaction(压缩 压实 整理)
-
Compact 策略
-
定义 Compact 策略中的过期
- 对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息
-
Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起
-
Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据
-
后台线程叫 Log Cleaner
-
很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的
-
-
-
-
-
-
Rebalance
-
重平衡
-
Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅Topic 的每个分区。
-
消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程
-
Rebalance 是 Kafka 消费者端实现高可用的重要手段
-
-
触发条件
-
组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
-
订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生Rebalance。
-
订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。
-
-
问题
-
Rebalance 影响 Consumer 端 TPS
- Rebalance 过程对 Consumer Group 消费过程有极大的影响。如果你了解 JVM 的垃圾回收机制,你一定听过万物静止的收集方式,即著名的 stop the world,简称STW。在 STW 期间,所有应用线程都会停止工作,表现为整个应用程序僵在那边一动不动。Rebalance 过程也和这个类似,在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。
-
Rebalance 很慢
- Rebalance 实在是太慢了。曾经,有个国外用户的Group 内有几百个 Consumer 实例,成功 Rebalance 一次要几个小时!这完全是不能忍受的。最悲剧的是,目前社区对此无能为力,至少现在还没有特别好的解决方案。所谓“本事大不如不摊上”,也许最好的解决方案就是避免Rebalance 的发生吧。
-
Rebalance 效率不高
- 目前 Rebalance 的设计是所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。例如实例 A 之前负责消费分区 1、2、3,那么 Rebalance 之后,如果可能的话,最好还是让实例A 继续消费分区 1、2、3,而不是被重新分配其他的分区。 这样的话,实例 A 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。
-
-
“不必要的”
-
第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。
-
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。
-
-
协调者,在 Kafka 中对应的术语是 Coordinator,它专门为 Consumer Group 服务,负责为 Group 执行Rebalance 以及提供位移管理和组成员管理等。
-
Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移
-
当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作
-
所有 Broker 在启动时,都会创建和开启相应的 Coordinator 组件。也就是说,所有Broker 都有各自的 Coordinator 组件。
-
Consumer Group 如何确定为它服务的Coordinator 在哪台 Broker 上
-
算法
第 1 步:确定由位移主题的哪个分区来保存该 Group 数据
第 2 步:找出该分区 Leader 副本所在的 Broker,该 Broker 即为对应的 Coordinator
简单解释一下上面的算法。首先,Kafka 会计算该 Group 的 group.id 参数的哈希值。比如你有个 Group 的 group.id 设置成了“test-group”,那么它的 hashCode 值就应该是627841412。其次,Kafka 会计算 __consumer_offsets 的分区数,通常是 50 个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即 abs(627841412 % 50) = 12。此时,我们就知道了位移主题的分区 12 负责保存这个 Group 的数据。有了分区号, 算法的第 2 步就变得很简单了,我们只需要找出位移主题分区 12 的 Leader 副本在哪个Broker 上就可以了。这个 Broker,就是我们要找的 Coordinator。
-
意义
-
帮助我们解决定位问题
-
当 Consumer Group 出现问题,需要快速排查 Broker 端日志时,我们能够根据这个算法准确定位 Coordinator 对应的 Broker,不必一台 Broker 一台Broker 地盲查。
-
-
-
-
重平衡过程是如何通知到其他消费者实例
-
靠消费者端的心跳线程
-
当协调者决定开启新一轮重平衡后,它会将“REBALANCE_IN_PROGRESS”封装进心跳请求的响应中,发还给消费者实例。
-
当消费者实例发现心跳响应中包含了“REBALANCE_IN_PROGRESS”,就能立马知道重平衡又开始了,这就是重平衡的通知机制。
-
-
-
消费者组状态机
-
Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable
-
消费者组启动时的状态流转过程。一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
-
当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。
-
当所有成员都退出组后,消费者组状态变更为 Empty。
-
-
-
消费者端重平衡流程
-
重平衡的完整流程需要消费者端和协调者组件共同参与才能完成
-
从消费者的视角
-
重平衡分为两个步骤
-
加入组
- JoinGroup 请求
-
等待领导者消费者(Leader Consumer)分配方案
- SyncGroup 请求
-
-
流程
-
当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
-
通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。 领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。 选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。 在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。 值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
-
-
-
从协调者端来看
-
分几个场景
-
场景一:新成员入组
-
新成员入组是指组处于 Stable 状态后,有新成员加入。
-
当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的。
-
-
-
场景二:组成员主动离组
-
何谓主动离组?就是指消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。
-
协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员,因此我就不再赘述了,还是直接用一张图来说明。
-
-
场景三:组成员崩溃离组
-
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。
-
它和主动离组是有区别的, 因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。当然,后面处理崩溃离组的流程与之前是一样的,我们来看看下面这张图。
-
-
场景四:重平衡时协调者对组内成员提交位移的处理
-
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。还是老办法,我们使用一张图来说明。
-
-
-
评论区