1.Kafka是什么?
kafka
是一款由 scala
语言编写的,基于 ”发布/订阅“ 模式的高性能消息中间件。它的组成部分包括了 provider
、consumer
、partition
(分区)、topic
(主题)、broker
(节点)几个。
在 kafka
中,topic
与 partition
之间是一对多的关系,一条消息只会发送到一个特定的 topic
中,随后根据 kafka
的负载均衡策略,分配到不同的 partition
中(每一个 partition
中的数据是有序的),而 partition
又分可以分为 主分区、副本分区,用于实现多个机器上的分布式存储,以此实现了高可用。
kafka
的常见应用场景有两种,一种是当做常规的消息中间件来使用,另一种是作为流处理技术使用它的 kafka stream
(未使用过)
思维导图
2. kafka 的高性能是如何保障的
大致有下几点,最重要和经常说的应该是 “零拷贝 + 顺序写”
- 零拷贝,利用操作系统的优点,例如
linux
上的mmap
与sendfile
,加快数据的复制速度; - 顺序写,可以类比
mysql
的redolog
以及redis
中的AOF
日志,顺序写盘相对于磁盘随机读取来说,IO
效率会高上不少; - 批量提交,
kafka
不会为每一条消息都去写一次,会暂时等待一小波数据(linger.ms
),或者达到批量条数后(batch.size
),然后批量一起写。 数据压缩,
Kafka
提供了数据压缩参数,采用数据压缩能减少数据传输量,提高效率。(题外话:个人认为压缩可以节省网络传输的耗时,但是相对的 压缩/解压 的过程也会耗时间,应该适用于网络状况不太好,或者节省broker
服务器资源的场景下使用,对于consumer
来说,消费速度肯定是会受到影响的)日志分段存储,只允许写最新的段日志文件,旧的段数据只读,这样做的好处是避免文件越来越大。除此之外,
kafka
给每个文件都保存了偏移量和索引信息,并且会将索引信息全部装入到内存中,减少磁盘IO
操作。Page Cache
,这一点基本上主流的中间件都会用到,在写数据时,可以只将数据写到Page Cache
便返回成功,不等待执行fsync
将数据刷到磁盘的过程。(这是配置项,可以加速吞吐量,但也会增加数据丢失的风险)
3.如何理解零拷贝?
一般的数据从网络到磁盘,或者从磁盘到网络,都需要经过四次拷贝。比如说磁盘到网络,要经过:
- 磁盘到内核缓冲区
- 内核缓存区到应用缓冲区
- 应用缓冲区到内核缓存区
- 内核缓存区到网络缓冲区
而零拷贝省去了第二步和第三步:
直接在内核中完成了数据的拷贝,无需要再经过一次应用缓冲区的中转。
Kafka
利用了两项零拷贝技术,mmap
和 sendfile
。前者是用于解决网络数据落盘的,Kafka
直接利用内存映射,完成了“写入操作”,对于 Kafka
来说,完成了网络缓冲区到磁盘缓冲区的“写入”,之后强制调用 flush
或者等操作系统自动刷盘(有参数控制)。Java
提供了 FileChannel
和 MappedByteBuffer
两项技术来实现 mmap
。
sendfile
是另外一种零拷贝实现,主要解决磁盘到网络的数据传输(数据推送)。操作系统读取磁盘数据到内存缓冲,直接丢过去socket buffer
,而后发送出去。很多中间件,例如 Nignx
, tomcat
都采用了类似的技术。
4.kafka 的 ISR 是什么?如何工作的?
ISR
又称为分区同步队列,Kafka
为每个主分区维护了一个 ISR
,处于 ISR
的分区意味着与主分区保持了同步(所以主分区也在 ISR
里面)。
当 Producer
写入消息的时候,需要等 ISR
里面分区的确认,当 ISR
确认之后,就被认为消息已经提交成功了。
ISR
里面的分区会定时从主分区里面拉取数据,如果长时间未拉取,或者数据落后太多(已被取消),分区会被移出 ISR
。ISR
里面分区已经同步的偏移量被称为 LEO(Log End Offset)
,最小的 LEO
称为 HW
(木桶最短的那块板),也就是消费者可以消费的最新消息。
当主分区挂掉的时候,会从 ISR
里面选举一个新的主分区出来。
写入消息的时候, 我们可以在 Producer
里面控制 ACK
机制。Producer
可以配置成三种:
Producer
发出去就算成功;Producer
发出去,主分区写入本地磁盘就算成功;Producer
发出去,ISR
所有的分区都写入磁盘,就算成功;
三种配置的性能依次下降,但是可靠性依次上升。
因为 ISR
里面包含了主分区,也就是说,如果整个 ISR
只有主分区,那么全部写入就退化为主分区写入。所以在可靠性要求非常高的情况下,我们要求 ISR
中分区不能少于三个。该参数可以在 Broker
中配置(min.insync.replicas
)
ISR
的同步机制和其它中间件机制也是类似的,在涉及主从同步的时候都要在性能和可靠性之间做取舍。通常的选项都是:
- 主写入就认为成功
- 主写入,至少一个从写入就认为成功;
- 主写入,大部分从库写入就认为成功(一般“大部分”是可以配置的,从这个意义上来说,2 和 3 可以合并为一点);
- 主写入,所有从库写入就认为成功;
而“写入”也会有不同语义:
- 中间件写到日志缓存就认为写入了;
- 中间件写入到系统缓存(
page cache
)就认为写入了; - 中间件强制刷新到磁盘(发起了
fsync
)就认为写入了;
都是性能到可靠性的取舍。
5.什么时候分区会被移出 ISR?
在旧版本中是两种情况:
消息落后太多,由参数
replica.lag.max.messages
进行控制,在0.9
以后就已经取消了为什么要删除? 因为这个参数本身很难给出一个合适的值。以默认的值 4000 为例,对于消息流入速度很低的主题(比如 TPS 为 10 ),这个参数就没什么用;对于消息流入速度很高的主题(比如 TPS 为 2000 ),这个参数的取值又会引入 ISR 的频繁变动( ISR 需要在 Zookeeper 中维护)。所以从 0.9x 版本开始,Kafka 就彻底移除了这一个参数。
分区长时间没有发起
fetch
请求,由参数replica.lag.time.max.ms
控制。
基本上,除非是新的 Broker
,否则几乎都是由网络、磁盘 IO
和 GC
引起的,大多数情况下,是负载过高导致的
这两个参数,设置过小会导致 ISR
频繁变化,过大会导致可靠性降低,存在数据丢失的风险。
6.Kafka 的负载均衡策略有哪些?
一般来说有两种,一种是轮询,即 Producer
轮流挑选不同的 Partition
;另外一种是 Hash
取余,这要求我们提供 Key
。
选取 Hash key
时, 一般会根据业务 id
取余,或者拼接某几个字段在一起。这样一来可以将业务上的同类数据发往同一个 partition
(也保证了顺序),缺点在于可能会导致某个分区的数据特别多,类似 redis
的热点 key
问题。
这些负载均衡其实都只考虑了发送端,而没有考虑到 consumer
的负载,假如消息分发已经很均匀了,但不同消费者之间的消费速度不一致,也有可能引起问题。
7.为什么 Kafka 的从 Partition 不能读取?
首先是 Kafka
自身的限制,即 Kafka
强制要求一个 Partition
只能有一个 Consumer
,因此 Consumer
天然只需要消费主 Partition
就可以。
那么假如说 Kafka
放开这种限制,比如说有多个 Consumer
,分别从主 Partition
和从 Partition
上读取数据,那么会出现一个问题:即偏移量如何同步的问题。例如一个 Consumer
从 Partition A
读取了 0 - 100
的消息,那么另外一个 Consumer
从 Partition B
上读取,就只能读取 100
之后的数据。那么 Kafka
就需要在不同的 Partition
之间协调这个已读取偏移量。而这是分布式一致性的问题,难以解决。
MySQL
的主从模式比起来,并没有这种问题,即 MySQL
不需要进行类似偏移量的协商。
而从另外一个角度来说,Kafka
的读取压力是远小于 MySQL
的,毕竟一个 Topic
,是不会有特别多的消费者的。并且 Kafka
也不需要支持复杂查询,所以完全没必要读取从 Partition
的数据。
8.为什么 Kafka 在消费者端采用了拉(PULL)模型?
采用拉模型的核心原因在于,消费者的消费速率不同。在拉模型之下,消费者自己消费完毕就自己再去拉一批,那么这种速率是由消费者自己控制的,所需要的控制信息也是由消费者自己保存的。而采用推模型,就意味着中间件要和消费者就速率问题进行协商,否则容易导致要么推送过快,要么推送过慢的问题。
推模型的一个极大的好处是避免竞争,例如在多个消费者拉同一主题的消息的时候,就需要保证,不同消费者不会引起并发问题。而 Kafka
不会有类似的问题,因为 Kafka
限制了一个 Partition
只能有一个消费者,所以拉模型反而更加合适。
9.分区过多问题如何解决
对于 Producer
来说,它采用的是批量发送的机制,那么分区数量多的话,就需要消耗大量的内存来维护这些缓存的消息。同时,也增大了数据丢失的风险。
对于 Consumer
来说,分区数量多意味着要么部署非常多的实例,要么开启非常多的线程,无论是哪一种方案,都是开销巨大。
对于 Broker
来说,分区特别多而对应的 Broker
数量又不足的话,那么意味着一个 Broker
上分布着大量的分区,那么一次宕机就会引起 Kafka
延时猛增。同时,每一个分区都要求 Broker
开启三个句柄,那么会引起 Broker
上的文件句柄被急速消耗,可能导致程序崩溃。还要考虑到,Kafka
虽然采用了顺序写,但是这是指在一个分区内部顺序写,在多个分区之间,是无法做到顺序写的。
(注意,对于 Broker
来说,如果你的集群规模非常大,以至于虽然有一万个分区,但是每个 Broker
上只有寥寥几个分区,那么分区数量对 Broker
来说是没影响的。我们这里的讨论,都是建立在我一个 Broker
上放了很多分区的基础上)
(分区数量和性能的关系类似一个二次函数,随着分区增长会慢慢变好,但是到达一个临界点之后,就会开始衰退)
解决
- 增加
Broker
,确保Broker
上不会存在很多的分区。这可以避免Broker
上文件句柄数量过多,顺序写退化为随机写,以及宕机影响范围太大的问题。 - 其次可以考虑拆分
Topic
并且部署到不同的集群。(光是拆分没啥用, 还得加broker
)
10.Rebalance 问题
rebanlance
通常发生在以下两种情况:
Topic
或者分区的数量变化(例如增加新的分区)- 消费者数量变化(加入或者退出)。超时又可以细分为两个:一个是消费超时(
max.poll.interval.ms
),一个是心跳超时(session.timeout.ms
)
rebanlance 的步骤:
- 新的消费者向协调者上报自己的订阅信息;
- 协调者强制别的消费者发起一轮
rebalance
,上报自己的订阅信息; - 协调者从消费者中挑选一个
leader
,注意这里是挑选了消费者中的leader
; - 协调者将订阅信息发给
leader
,让leader
来制作分配方案; leader
上报自己的方案;- 协调者同步方案给别的消费者
rebanlance 会导致的问题
- 重复消费:如果在消费者已经消费了,但是还没提交,这个时候发生了
rebalance
,那么别的消费者可能会再一次消费; - 影响性能:
rebalance
的过程,一般是在几十毫秒到上百毫秒。这个过程会导致集群处于一种不稳定状态中,影响消费者的吞吐量;
如何避免 rebalance?
首先,Topic
或者分区变化,引起 rebalance
是无法避免的,因为一般都是因为业务变化引起的。比如说,随着流量增加,我们要增加分区。
能够避免的就是防止消费者出现消费超时或者心跳超时。消费超时可以增大 max.poll.interval.ms
参数,避免被协调者踢掉。或者优化消费逻辑,使得消费者能够快速消费,拉取下一批消息
11. 事务消息问题
Kafka
的事务消息语义是:一次发送多条,要么一起成功要么一起失败。不是业务处理结果,与消息发送成功、失败绑定这种事务语义(rocketmq
)。
rocketmq
的事务消息逻辑:
- 发送半消息,此时内容对于消费者不可见,
- 查询本地事务执行结果
- 如果本地事务执行成功(提交半消息),如果执行失败(标记半消息为删除)。如果查询超时(重试)
要点:rocketmq
的半消息与普通消息其实无异,只是包装了一下,放在了特殊的队列中(不可见)提交的时候再发送到真正的队列中(延时消息有同样的思路!!!)
rocketmq
的事务消息执行流程: