Kafka知识点总结

1.Kafka是什么?

kafka 是一款由 scala 语言编写的,基于 ”发布/订阅“ 模式的高性能消息中间件。它的组成部分包括了 providerconsumerpartition(分区)、topic(主题)、broker(节点)几个。

kafka 中,topicpartition 之间是一对多的关系,一条消息只会发送到一个特定的 topic 中,随后根据 kafka 的负载均衡策略,分配到不同的 partition 中(每一个 partition 中的数据是有序的),而 partition 又分可以分为 主分区、副本分区,用于实现多个机器上的分布式存储,以此实现了高可用。

kafka 的常见应用场景有两种,一种是当做常规的消息中间件来使用,另一种是作为流处理技术使用它的 kafka stream(未使用过)

思维导图

2. kafka 的高性能是如何保障的

大致有下几点,最重要和经常说的应该是 “零拷贝 + 顺序写”

  • 零拷贝,利用操作系统的优点,例如 linux 上的 mmapsendfile ,加快数据的复制速度;
  • 顺序写,可以类比 mysqlredolog 以及 redis 中的 AOF 日志,顺序写盘相对于磁盘随机读取来说,IO 效率会高上不少;
  • 批量提交,kafka 不会为每一条消息都去写一次,会暂时等待一小波数据(linger.ms),或者达到批量条数后(batch.size),然后批量一起写。
  • 数据压缩,Kafka 提供了数据压缩参数,采用数据压缩能减少数据传输量,提高效率。(题外话:个人认为压缩可以节省网络传输的耗时,但是相对的 压缩/解压 的过程也会耗时间,应该适用于网络状况不太好,或者节省 broker 服务器资源的场景下使用,对于 consumer 来说,消费速度肯定是会受到影响的)

  • 日志分段存储,只允许写最新的段日志文件,旧的段数据只读,这样做的好处是避免文件越来越大。除此之外,kafka 给每个文件都保存了偏移量和索引信息,并且会将索引信息全部装入到内存中,减少磁盘 IO 操作。

  • Page Cache,这一点基本上主流的中间件都会用到,在写数据时,可以只将数据写到 Page Cache 便返回成功,不等待执行 fsync 将数据刷到磁盘的过程。(这是配置项,可以加速吞吐量,但也会增加数据丢失的风险)

3.如何理解零拷贝?

一般的数据从网络到磁盘,或者从磁盘到网络,都需要经过四次拷贝。比如说磁盘到网络,要经过:

  • 磁盘到内核缓冲区
  • 内核缓存区到应用缓冲区
  • 应用缓冲区到内核缓存区
  • 内核缓存区到网络缓冲区

而零拷贝省去了第二步和第三步:

直接在内核中完成了数据的拷贝,无需要再经过一次应用缓冲区的中转。

Kafka 利用了两项零拷贝技术,mmapsendfile。前者是用于解决网络数据落盘的,Kafka 直接利用内存映射,完成了“写入操作”,对于 Kafka 来说,完成了网络缓冲区到磁盘缓冲区的“写入”,之后强制调用 flush或者等操作系统自动刷盘(有参数控制)。Java 提供了 FileChannelMappedByteBuffer两项技术来实现 mmap

sendfile是另外一种零拷贝实现,主要解决磁盘到网络的数据传输(数据推送)。操作系统读取磁盘数据到内存缓冲,直接丢过去socket buffer,而后发送出去。很多中间件,例如 Nignx, tomcat 都采用了类似的技术。

4.kafka 的 ISR 是什么?如何工作的?

ISR 又称为分区同步队列,Kafka 为每个主分区维护了一个 ISR,处于 ISR 的分区意味着与主分区保持了同步(所以主分区也在 ISR 里面)。

Producer 写入消息的时候,需要等 ISR 里面分区的确认,当 ISR 确认之后,就被认为消息已经提交成功了。

ISR 里面的分区会定时从主分区里面拉取数据,如果长时间未拉取,或者数据落后太多(已被取消),分区会被移出 ISRISR 里面分区已经同步的偏移量被称为 LEO(Log End Offset),最小的 LEO 称为 HW(木桶最短的那块板),也就是消费者可以消费的最新消息。

当主分区挂掉的时候,会从 ISR 里面选举一个新的主分区出来。

写入消息的时候, 我们可以在 Producer 里面控制 ACK 机制。Producer 可以配置成三种:

  1. Producer 发出去就算成功;
  2. Producer 发出去,主分区写入本地磁盘就算成功;
  3. Producer 发出去,ISR 所有的分区都写入磁盘,就算成功;

三种配置的性能依次下降,但是可靠性依次上升。

因为 ISR 里面包含了主分区,也就是说,如果整个 ISR 只有主分区,那么全部写入就退化为主分区写入。所以在可靠性要求非常高的情况下,我们要求 ISR 中分区不能少于三个。该参数可以在 Broker 中配置(min.insync.replicas

ISR 的同步机制和其它中间件机制也是类似的,在涉及主从同步的时候都要在性能和可靠性之间做取舍。通常的选项都是:

  1. 主写入就认为成功
  2. 主写入,至少一个从写入就认为成功;
  3. 主写入,大部分从库写入就认为成功(一般“大部分”是可以配置的,从这个意义上来说,2 和 3 可以合并为一点);
  4. 主写入,所有从库写入就认为成功;

而“写入”也会有不同语义:

  1. 中间件写到日志缓存就认为写入了;
  2. 中间件写入到系统缓存(page cache)就认为写入了;
  3. 中间件强制刷新到磁盘(发起了 fsync)就认为写入了;

都是性能到可靠性的取舍。

5.什么时候分区会被移出 ISR?

在旧版本中是两种情况:

  • 消息落后太多,由参数 replica.lag.max.messages 进行控制,在 0.9 以后就已经取消了

    为什么要删除? 因为这个参数本身很难给出一个合适的值。以默认的值 4000 为例,对于消息流入速度很低的主题(比如 TPS 为 10 ),这个参数就没什么用;对于消息流入速度很高的主题(比如 TPS 为 2000 ),这个参数的取值又会引入 ISR 的频繁变动( ISR 需要在 Zookeeper 中维护)。所以从 0.9x 版本开始,Kafka 就彻底移除了这一个参数。

  • 分区长时间没有发起fetch请求,由参数 replica.lag.time.max.ms 控制。

基本上,除非是新的 Broker,否则几乎都是由网络、磁盘 IOGC 引起的,大多数情况下,是负载过高导致的

这两个参数,设置过小会导致 ISR 频繁变化,过大会导致可靠性降低,存在数据丢失的风险。

6.Kafka 的负载均衡策略有哪些?

一般来说有两种,一种是轮询,即 Producer 轮流挑选不同的 Partition;另外一种是 Hash 取余,这要求我们提供 Key

选取 Hash key 时, 一般会根据业务 id 取余,或者拼接某几个字段在一起。这样一来可以将业务上的同类数据发往同一个 partition (也保证了顺序),缺点在于可能会导致某个分区的数据特别多,类似 redis 的热点 key 问题。

这些负载均衡其实都只考虑了发送端,而没有考虑到 consumer 的负载,假如消息分发已经很均匀了,但不同消费者之间的消费速度不一致,也有可能引起问题。

7.为什么 Kafka 的从 Partition 不能读取?

首先是 Kafka 自身的限制,即 Kafka 强制要求一个 Partition 只能有一个 Consumer,因此 Consumer 天然只需要消费主 Partition 就可以。

那么假如说 Kafka 放开这种限制,比如说有多个 Consumer,分别从主 Partition 和从 Partition 上读取数据,那么会出现一个问题:即偏移量如何同步的问题。例如一个 ConsumerPartition A 读取了 0 - 100 的消息,那么另外一个 ConsumerPartition B 上读取,就只能读取 100 之后的数据。那么 Kafka 就需要在不同的 Partition 之间协调这个已读取偏移量。而这是分布式一致性的问题,难以解决。

MySQL 的主从模式比起来,并没有这种问题,即 MySQL 不需要进行类似偏移量的协商。

而从另外一个角度来说,Kafka 的读取压力是远小于 MySQL 的,毕竟一个 Topic,是不会有特别多的消费者的。并且 Kafka 也不需要支持复杂查询,所以完全没必要读取从 Partition 的数据。

8.为什么 Kafka 在消费者端采用了拉(PULL)模型?

采用拉模型的核心原因在于,消费者的消费速率不同。在拉模型之下,消费者自己消费完毕就自己再去拉一批,那么这种速率是由消费者自己控制的,所需要的控制信息也是由消费者自己保存的。而采用推模型,就意味着中间件要和消费者就速率问题进行协商,否则容易导致要么推送过快,要么推送过慢的问题。

推模型的一个极大的好处是避免竞争,例如在多个消费者拉同一主题的消息的时候,就需要保证,不同消费者不会引起并发问题。而 Kafka 不会有类似的问题,因为 Kafka 限制了一个 Partition 只能有一个消费者,所以拉模型反而更加合适。

9.分区过多问题如何解决

对于 Producer 来说,它采用的是批量发送的机制,那么分区数量多的话,就需要消耗大量的内存来维护这些缓存的消息。同时,也增大了数据丢失的风险。

对于 Consumer 来说,分区数量多意味着要么部署非常多的实例,要么开启非常多的线程,无论是哪一种方案,都是开销巨大。

对于 Broker 来说,分区特别多而对应的 Broker 数量又不足的话,那么意味着一个 Broker 上分布着大量的分区,那么一次宕机就会引起 Kafka 延时猛增。同时,每一个分区都要求 Broker 开启三个句柄,那么会引起 Broker 上的文件句柄被急速消耗,可能导致程序崩溃。还要考虑到,Kafka 虽然采用了顺序写,但是这是指在一个分区内部顺序写,在多个分区之间,是无法做到顺序写的。

(注意,对于 Broker 来说,如果你的集群规模非常大,以至于虽然有一万个分区,但是每个 Broker 上只有寥寥几个分区,那么分区数量对 Broker 来说是没影响的。我们这里的讨论,都是建立在我一个 Broker 上放了很多分区的基础上)

(分区数量和性能的关系类似一个二次函数,随着分区增长会慢慢变好,但是到达一个临界点之后,就会开始衰退)

解决

  • 增加 Broker,确保 Broker 上不会存在很多的分区。这可以避免 Broker 上文件句柄数量过多,顺序写退化为随机写,以及宕机影响范围太大的问题。
  • 其次可以考虑拆分 Topic 并且部署到不同的集群。(光是拆分没啥用, 还得加 broker

10.Rebalance 问题

rebanlance 通常发生在以下两种情况:

  • Topic 或者分区的数量变化(例如增加新的分区)
  • 消费者数量变化(加入或者退出)。超时又可以细分为两个:一个是消费超时(max.poll.interval.ms),一个是心跳超时(session.timeout.ms

rebanlance 的步骤:

  • 新的消费者向协调者上报自己的订阅信息;
  • 协调者强制别的消费者发起一轮 rebalance,上报自己的订阅信息;
  • 协调者从消费者中挑选一个 leader,注意这里是挑选了消费者中的 leader
  • 协调者将订阅信息发给 leader,让 leader 来制作分配方案;
  • leader 上报自己的方案;
  • 协调者同步方案给别的消费者

rebanlance 会导致的问题

  1. 重复消费:如果在消费者已经消费了,但是还没提交,这个时候发生了 rebalance,那么别的消费者可能会再一次消费;
  2. 影响性能:rebalance 的过程,一般是在几十毫秒到上百毫秒。这个过程会导致集群处于一种不稳定状态中,影响消费者的吞吐量;

如何避免 rebalance?

首先,Topic 或者分区变化,引起 rebalance 是无法避免的,因为一般都是因为业务变化引起的。比如说,随着流量增加,我们要增加分区。

能够避免的就是防止消费者出现消费超时或者心跳超时。消费超时可以增大 max.poll.interval.ms 参数,避免被协调者踢掉。或者优化消费逻辑,使得消费者能够快速消费,拉取下一批消息

11. 事务消息问题

Kafka 的事务消息语义是:一次发送多条,要么一起成功要么一起失败。不是业务处理结果,与消息发送成功、失败绑定这种事务语义(rocketmq)。

rocketmq 的事务消息逻辑:

  • 发送半消息,此时内容对于消费者不可见,
  • 查询本地事务执行结果
  • 如果本地事务执行成功(提交半消息),如果执行失败(标记半消息为删除)。如果查询超时(重试)

要点:rocketmq 的半消息与普通消息其实无异,只是包装了一下,放在了特殊的队列中(不可见)提交的时候再发送到真正的队列中(延时消息有同样的思路!!!)

rocketmq 的事务消息执行流程:

0%