聊聊kafka
PS: 本文主要讨论的是kafka的消费和读取的简单层面. 更多深入的细节还需要读者根据兴趣点去深入研究.
Kafka是什么
Apache Kafka® 是 一个分布式流处理平台
划重点:
- 分布式:
- 可靠性
- 一致性
- 高性能
后续会适当的分析在这些点上.Kafka是如何保证的.
- 流处理平台:
- 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。因为这个特性所以我们现在经常会将它充当为消息队列来使用.
- 可以储存流式的记录,并且有较好的容错性。
- 可以在流式记录产生时就进行处理。
> 因为这些个特性. Kafka会被经常的用在以下2个场景中
- 总体概览
- Kafka作为集群运行在一个或者多个机器上.
- 四大核心的API
- Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
- Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
- Streams API 允许一个应用程序作为一个_流处理器_,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
- Connector API 允许构建并运行可重用的生产者或者消费者, 可以很简单的快速定义 connectors 将大量数据从 Kafka 移入和移出. Kafka Connect 可以摄取数据库数据或者收集应用程序的 metrics 存储到 Kafka topics
作为队列来用总体架构图:
上图包含的概念:
get
- Producer, 消息生产者
- Broker, kafka的服务器节点.可以是单个或者多个.
- Consumer, 消息的消费者
- ZK, 管理和存储Kafka的一些meta信息. 比如topic, broker 之类.
往更细一些去看. 在一个broker节点中. 每个消息生产和消费如下面这样:
上面又出现了一些新的概念:
- **Topic,**每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的. 这个概念是对消息逻辑上的聚合. 实际我们应该掌握的,对消息的存储和读取至少需要了解到Partition这个级别
- **Partition: **parttion 是物理存储上的概念,每个topic都会包含一个或者多个parttion. 这些分区之间是由一个leader以及多个follow来进行协同工作. 后文中会提到
- **replica 同步: **如字面意思. 是partition的副本,同步leader的数据至各follower中
- **leader: **在一个同步关系中. follower只会从leader中同步数据. 消费者也只会同leader打交道. 它承载了实际的写入和消费. follower作为leader的备份使用.
- **follower: **参考上面leader的描述. 实际生产环境中需要去衡量有多少个follower. 因为过多的同步会带来性能上的损失. 因为kafka确认一条消息被commit是通过知道所有的leader都被同步了才会确认.
- **Consumer group: **消费者一般来说都是以消费着组的形式去提现的. 消费者组会以一些meta的形式体现在kafka的zk配置中. 现在有部分是体现在集群自身上.比如offset的信息
> 结合搭建的集群. 讲下基本的配置. 以及目录结构
数据的存储与读取
命令行demo
1 | //创建 |
简单的消息生产与消费Demo
1 | package com.example.kafkademo; |
1 | ListenableFuture<SendResult<K, V>> sendDefault(V data); |
kafka如何存储
上文提到了一个基本的消息发送和接收的Demo. 对于Kafka来说. 是如何处理这个过程的呢?
在上文和demo中我们都看到了一个概念. topic & parttion. 也知道topic是对parttion的逻辑聚合.
那么为什么kafka要分区?
- 分区的目的
如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题,Parttion分布式分布在集群的各个机器上. 最大化的避免了单节点带来的IO瓶颈
- 分区的特性
> IOPS
- 顺序读取&写入
顺序读取&写入的性能要远高于随机读取&写入. 即使在SSD上这种差异也是比较的明显. 主要原因还是因为操作系统和磁盘的工作原理. 一次完整的硬盘读取时间主要消耗在哪?
- 寻道时间 : 每片磁盘都是由很多磁道组成. 硬盘在定位需要的数据的时候需要将磁头先定位到这个磁道上. 这个时间是寻道时间.
- 旋转延迟: 定位到具体的磁道后,通过盘片的旋转.将磁头定位到数据的起始扇区
- 读取时间: 读取数据到内存中
整个时间应该是在10MS朝上. 并且大部分的时间是消耗在寻道上.
顺序读写可以认为是连续的读取一个大文件. 而随机读写可以认为是随机的去读取很多文件. 对于顺序读写来说. 磁盘的寻道时间就会非常的短. 因为只有一次. 所以大部分时间都花在了数据读取上. 相比于随机读写. 要省掉了很多的寻道时间和旋转延迟.这对于一个机械盘来说性能提升是巨大的.
另外,对于现代的磁盘来说. 当读取一块数据的时候很多时候都会有一个”预读”的动作. 就是会读取相邻扇区的数据. 这不是一个多余的功能. 一个扇区的数据量是固定的. 对于现在的应用程序来说. 当我们读取一个扇区的时候 很大概率也会需要相邻扇区的数据. 而随机读取的时候就会浪费了这个预读的功能.
- 水平扩展.
上面提到了分区的目的. 分区对于整个集群来说. 理论上来说可以无限扩展.
如何分区?
多少分区合适? : 一般来说分区数量不要超过机器数. 实际需要根据业务以及压测场景来看.
写入哪个分区?
观看上面的template的send方法.结合源码, 我们可以看到有以下的几个关键点:
- 只指定了topic
根据配置的DefaultPartitioner
或者RoundRobinPartitioner
来决定落到哪个分区,
- 黏性分区
- 指定了 key
通过实现自定义的Partitioner或者上述的方式来实现分区选取.如果没有自定义的Partitioner
那么通过简单的key进行hash来进行分区计算.
- 指定了parttion和key
优先根据指定的partition去选择分区.如果这个值为null, 参考上面的分区选择策略
如何写入?
请看时序图:
- producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
- producer 将消息发送给该 leader
- leader 将消息写入本地 log
- followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
- leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
数据的磁盘存储结构?
比如mytopic_test这个topic我们分了4个区, 那在具体的log中表现的形式就是4个目录
1 | drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-0 |
观看每个目录结构如下:
1 | 00000000000000000000.index |
宏观点看. 我们的结构表现形式是这样:
其中index是索引文件. log是以segment为单位进行存储的日志文件. 当满足如下几个条件中的其中之一,就会触发文件的切分:
- 当前日志分段文件的大小超过了 broker 端参数
log.segment.bytes
配置的值。log.segment.bytes
参数的默认值为 1073741824,即 1GB。 - 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于
log.roll.ms
或log.roll.hours
参数配置的值。如果同时配置了log.roll.ms
和log.roll.hours
参数,那么log.roll.ms
的优先级高。默认情况下,只配置了log.roll.hours
参数,其值为168,即 7 天。 - 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数
log.index.size.max.bytes
配置的值。log.index.size.max.bytes
的默认值为 10485760,即 10MB。 - 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于
Integer.MAX_VALUE
,即要追加的消息的偏移量不能转变为相对偏移量。
关于数据的持久性和可靠性保证?
措施1: 通过follower来同步leader的数据.
所有的follower都会通过pull的方式去拉取leader上的数据.拉取成功会给leader发送ack. 上文中提到了ISR的概念.
什么是ISR(In-Sync Replicas), 其实就是字面的意思同步副本队列. 一条消息被leader接受后, 会同步的被ISR列表中的其他节点pull到自己的节点上. 这个在某种程度上来看极大的增强了数据的可靠性. 但是凡事有好也有弊端. 这么做会对吞吐量形成一定的影响 . 生产上需要衡量这个参数的配置.
Leader只能从ISR列表中被选出来. 具体参考后面的选举机制
措施2: 当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:
1(默认):producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。
因为follow可能还未来得及同步数据. leader身份也没了. 就会导致数据被截断到HW位置.
0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
无需确认显然效率最高. 但是没有ack的消息无法保证是否落盘
-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据的一致性
极端场景比如这样: ISR中有一个follower还没来得及同步消息. Leader挂了, 这时候会出现2个情况. 如果这个没同步的节点被选举为leader, 那么数据不会重复,如果是其他已经同步的节点被选为leader. 因为生产方一直收不到ack.会触发重新发送. 就会导致重复数据.
如果要提高数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合
kafka是如何控制消息的消费.
消费者层面
kafka提供了2套API
1. The high-level Consumer API
2. The SimpleConsumer API
第二种方式不再我们今天范围内. 此种方案一般适用场景是你极度的需要客制化的需求. 比如对 parttion offset 甚至 broker以及 leader的控制.
为了实现这些控制你可能需要大量的编码来进行这样的控制工作.
传统的消费模式如下:
kafka 消费者有消费者组的概念,当生产者将数据发布到topic时,消费者通过pull的方式,定期从服务器拉取数据,当然在pull数据的时候,服务器会告诉consumer可消费的消息offset。
**
**PS: 所有消费者读取的数据都是从leader分区去读取数据. **
Why pull?
每个消费者的消费能力只有自己清楚. 如果采用push的模式. 可能会导致消费能力跟不上导致消息堆积 . 这些设计点都是kafka高吞吐的基本保证.
对于每个消费组. 是如何控制消费线程消费哪个分区的呢?
消费者可能随时会增加和减少. 不管是哪个动作发生了都会触发consumer rebalance
0.9x后引入了Group Coordinator 的概念. 现在已经是一个核心的不可或缺的一环. 可以参考 它主要的功能:
- 维护在组中的consumer心跳
- 新的消费者加入后重新rebalance
- 消费者离线后重新rebanlance
PS:
- 如果消费线程大于 patition 数量,则有些线程将收不到消息
- 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
- 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,kafka顺序性只对parttion生效
如何控制读? 是否收到kafka的ack后就可以读了呢?
前面提到一个概念HW
, 与之对应的其实还有一个概念叫LEO
HW: HighWatermark 高水位
LEO: LogEndOffset 就是标识最后一条消息的位置
看下图描述
思考: 如果Follower没同步完成. leader挂了怎么办?
读取的过程如何?
回到上面的目录结构.如果要读取170418这条offset的记录如何读取?
- 通过二分法定位在哪个index和log中
- 通过index定位offset对应的log的物理地址.
- 通过物理地址去读取具体的log
思考: 如何知道在log中读取多少数据呢. 怎么才能确定一条数据被读完?
选举机制
一个基本的原则就是,如果 leader 挂掉,新的 leader 必须拥有原来的 leader 已经 commit 的所有消息,这不就是 ISR 中副本的特征吗?
前面提到ISR. 与之对应的还有一个叫OSR.
replica.lag.time.max.ms 这个参数决定了ISR和OSR的关系
当一个follower在持续pull的速度变慢, 超过replica.lag.time.max.ms这个时间的时候就会被拉入OSR中. 而所有的选举都是在ISR中进行的.
会考虑少数服从多数的
选举模式吗?
这种模式下,如果我们有2f+1个副本,那么在commit之前必须保证有f+1个replica复制完消息,同时为了保证能正确选举出新的leader,失败的副本数不能超过f个.
优势: 系统的延迟取决于最快的几台机器,
劣势: 如果要更多的副本容错性. 就需要大量的副本. 因为必须接受坏掉一半后. 还能选举出一个新的leader. 过多的副本又会涉及到同步带来性能上的影响.
kafka的措施:
基本思路是按照AR(Assigned Replicas=isr+osr)集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定.
并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根据AR的顺序而不是ISR的顺序进行选举的。
还有一些会触发选举的点,但是大致的思路都如上:
- 分区进行重分配
- 发生优先副本选举
- 某节点被优雅地关闭
如果ISR列表中的全部都挂了咋办?
- 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
- 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。
默认是第二种方式.来保证可用性. 也可以通过配置更改为第一种
遗留思考
参考资料
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!