聊聊kafka

PS: 本文主要讨论的是kafka的消费和读取的简单层面. 更多深入的细节还需要读者根据兴趣点去深入研究.

Kafka是什么

Apache Kafka® 是 一个分布式流处理平台


划重点:

  1. 分布式:
    • 可靠性
    • 一致性
    • 高性能

后续会适当的分析在这些点上.Kafka是如何保证的.

  1. 流处理平台:
    1. 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。因为这个特性所以我们现在经常会将它充当为消息队列来使用.
    2. 可以储存流式的记录,并且有较好的容错性。
    3. 可以在流式记录产生时就进行处理。

> 因为这些个特性. Kafka会被经常的用在以下2个场景中
  • 消息队列->本文今天的讨论范围
  • 流处理,构建实时流式应用程序,通过kafka 提供的stream API进行流的操作与组合

    全局架构&相关的概念

  1. 总体概览

image.png

  • Kafka作为集群运行在一个或者多个机器上.
  • 四大核心的API
    •  Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
    •  Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
    •  Streams API 允许一个应用程序作为一个_流处理器_,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
    • Connector API 允许构建并运行可重用的生产者或者消费者, 可以很简单的快速定义 connectors 将大量数据从 Kafka 移入和移出. Kafka Connect 可以摄取数据库数据或者收集应用程序的 metrics 存储到 Kafka topics

作为队列来用总体架构图:
image.png


上图包含的概念:
get

  1. Producer, 消息生产者
  2. Broker, kafka的服务器节点.可以是单个或者多个.
  3. Consumer, 消息的消费者
  4. ZK, 管理和存储Kafka的一些meta信息. 比如topic, broker 之类.

往更细一些去看. 在一个broker节点中. 每个消息生产和消费如下面这样:

image.png


上面又出现了一些新的概念:

  • **Topic,**每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的. 这个概念是对消息逻辑上的聚合. 实际我们应该掌握的,对消息的存储和读取至少需要了解到Partition这个级别
  • **Partition: **parttion 是物理存储上的概念,每个topic都会包含一个或者多个parttion. 这些分区之间是由一个leader以及多个follow来进行协同工作. 后文中会提到
  • **replica 同步: **如字面意思. 是partition的副本,同步leader的数据至各follower中
  • **leader: **在一个同步关系中. follower只会从leader中同步数据. 消费者也只会同leader打交道. 它承载了实际的写入和消费. follower作为leader的备份使用.
  • **follower: **参考上面leader的描述. 实际生产环境中需要去衡量有多少个follower. 因为过多的同步会带来性能上的损失. 因为kafka确认一条消息被commit是通过知道所有的leader都被同步了才会确认.
  • **Consumer group: **消费者一般来说都是以消费着组的形式去提现的. 消费者组会以一些meta的形式体现在kafka的zk配置中. 现在有部分是体现在集群自身上.比如offset的信息

> 结合搭建的集群. 讲下基本的配置. 以及目录结构

数据的存储与读取

命令行demo

1
2
3
4
5
6
7
8
//创建
kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
//描述
kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
//生产
kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
//消费
kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

简单的消息生产与消费Demo

参考文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.example.kafkademo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
@Slf4j
public class KafkaDemoApplication implements CommandLineRunner {

@Resource
private KafkaTemplate<String, String> template;

private final CountDownLatch latch = new CountDownLatch(3);

public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}


@Override
public void run(String... args) throws Exception {
this.template.send("xxx", "123");
this.template.send("xxx", "456");
this.template.send("xxx", "789");
latch.await(60, TimeUnit.SECONDS);
log.info("All received");
}

@KafkaListener(topics = "xxx")
public void listen(ConsumerRecord<?, ?> cr) throws Exception {
log.info("receive msg: {}", cr.toString());
latch.countDown();
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

// Flush the producer.

void flush();

interface ProducerCallback<K, V, T> {

T doInKafka(Producer<K, V> producer);

}

kafka如何存储


上文提到了一个基本的消息发送和接收的Demo. 对于Kafka来说. 是如何处理这个过程的呢?


在上文和demo中我们都看到了一个概念. topic & parttion. 也知道topic是对parttion的逻辑聚合.

那么为什么kafka要分区?

  • 分区的目的

如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题,Parttion分布式分布在集群的各个机器上. 最大化的避免了单节点带来的IO瓶颈

  • 分区的特性

> IOPS
  1. 顺序读取&写入

顺序读取&写入的性能要远高于随机读取&写入. 即使在SSD上这种差异也是比较的明显. 主要原因还是因为操作系统和磁盘的工作原理. 一次完整的硬盘读取时间主要消耗在哪?

  • 寻道时间 : 每片磁盘都是由很多磁道组成. 硬盘在定位需要的数据的时候需要将磁头先定位到这个磁道上. 这个时间是寻道时间.
  • 旋转延迟: 定位到具体的磁道后,通过盘片的旋转.将磁头定位到数据的起始扇区
  • 读取时间: 读取数据到内存中

整个时间应该是在10MS朝上. 并且大部分的时间是消耗在寻道上.
顺序读写可以认为是连续的读取一个大文件. 而随机读写可以认为是随机的去读取很多文件. 对于顺序读写来说. 磁盘的寻道时间就会非常的短. 因为只有一次. 所以大部分时间都花在了数据读取上. 相比于随机读写. 要省掉了很多的寻道时间和旋转延迟.这对于一个机械盘来说性能提升是巨大的.
另外,对于现代的磁盘来说. 当读取一块数据的时候很多时候都会有一个”预读”的动作. 就是会读取相邻扇区的数据. 这不是一个多余的功能. 一个扇区的数据量是固定的. 对于现在的应用程序来说. 当我们读取一个扇区的时候 很大概率也会需要相邻扇区的数据. 而随机读取的时候就会浪费了这个预读的功能.

  1. 水平扩展.

上面提到了分区的目的. 分区对于整个集群来说. 理论上来说可以无限扩展.

如何分区?

多少分区合适? : 一般来说分区数量不要超过机器数. 实际需要根据业务以及压测场景来看.

写入哪个分区?


观看上面的template的send方法.结合源码, 我们可以看到有以下的几个关键点:

  • 只指定了topic

根据配置的DefaultPartitioner 或者RoundRobinPartitioner来决定落到哪个分区,

  - 黏性分区
  • 指定了 key

通过实现自定义的Partitioner或者上述的方式来实现分区选取.如果没有自定义的Partitioner那么通过简单的key进行hash来进行分区计算.

  • 指定了parttion和key

优先根据指定的partition去选择分区.如果这个值为null, 参考上面的分区选择策略

如何写入?

请看时序图:




image.png

  1. producer 先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader
  2. producer 将消息发送给该 leader
  3. leader 将消息写入本地 log
  4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
  5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

数据的磁盘存储结构?

比如mytopic_test这个topic我们分了4个区, 那在具体的log中表现的形式就是4个目录

1
2
3
4
drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-0
drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-1
drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-2
drwxr-xr-x 2 root root 4096 Apr 15 13:21 mytopic_test-3

观看每个目录结构如下:

1
2
3
4
5
6
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

宏观点看. 我们的结构表现形式是这样:
image.png


其中index是索引文件. log是以segment为单位进行存储的日志文件. 当满足如下几个条件中的其中之一,就会触发文件的切分:

  1. 当前日志分段文件的大小超过了 broker 端参数 log.segment.bytes 配置的值。log.segment.bytes 参数的默认值为 1073741824,即 1GB。
  2. 当前日志分段中消息的最大时间戳与当前系统的时间戳的差值大于 log.roll.mslog.roll.hours 参数配置的值。如果同时配置了 log.roll.mslog.roll.hours 参数,那么 log.roll.ms 的优先级高。默认情况下,只配置了 log.roll.hours 参数,其值为168,即 7 天。
  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数 log.index.size.max.bytes 配置的值。log.index.size.max.bytes 的默认值为 10485760,即 10MB。
  4. 追加的消息的偏移量与当前日志分段的偏移量之间的差值大于 Integer.MAX_VALUE,即要追加的消息的偏移量不能转变为相对偏移量。

关于数据的持久性和可靠性保证?

措施1: 通过follower来同步leader的数据.

所有的follower都会通过pull的方式去拉取leader上的数据.拉取成功会给leader发送ack. 上文中提到了ISR的概念.


什么是ISR(In-Sync Replicas), 其实就是字面的意思同步副本队列. 一条消息被leader接受后, 会同步的被ISR列表中的其他节点pull到自己的节点上. 这个在某种程度上来看极大的增强了数据的可靠性. 但是凡事有好也有弊端. 这么做会对吞吐量形成一定的影响 . 生产上需要衡量这个参数的配置.


Leader只能从ISR列表中被选出来. 具体参考后面的选举机制


措施2: 当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

  • 1(默认):producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。

    因为follow可能还未来得及同步数据. leader身份也没了. 就会导致数据被截断到HW位置.

  • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。

    无需确认显然效率最高. 但是没有ack的消息无法保证是否落盘

  • -1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据的一致性

    极端场景比如这样: ISR中有一个follower还没来得及同步消息. Leader挂了, 这时候会出现2个情况. 如果这个没同步的节点被选举为leader, 那么数据不会重复,如果是其他已经同步的节点被选为leader. 因为生产方一直收不到ack.会触发重新发送. 就会导致重复数据.

如果要提高数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(可以在broker或者topic层面进行设置)的配合

kafka是如何控制消息的消费.

消费者层面


kafka提供了2套API
1. The high-level Consumer API
2. The SimpleConsumer API


第二种方式不再我们今天范围内. 此种方案一般适用场景是你极度的需要客制化的需求. 比如对 parttion offset 甚至 broker以及 leader的控制.
为了实现这些控制你可能需要大量的编码来进行这样的控制工作.


传统的消费模式如下:
image.png
        kafka 消费者有消费者组的概念,当生产者将数据发布到topic时,消费者通过pull的方式,定期从服务器拉取数据,当然在pull数据的时候,服务器会告诉consumer可消费的消息offset。
**
**PS: 所有消费者读取的数据都是从leader分区去读取数据. **
Why pull?
每个消费者的消费能力只有自己清楚. 如果采用push的模式. 可能会导致消费能力跟不上导致消息堆积 . 这些设计点都是kafka高吞吐的基本保证.

对于每个消费组. 是如何控制消费线程消费哪个分区的呢?


消费者可能随时会增加和减少. 不管是哪个动作发生了都会触发consumer rebalance

0.9x后引入了Group Coordinator 的概念. 现在已经是一个核心的不可或缺的一环. 可以参考 它主要的功能:

  1. 维护在组中的consumer心跳
  2. 新的消费者加入后重新rebalance
  3. 消费者离线后重新rebanlance


PS:

  1. 如果消费线程大于 patition 数量,则有些线程将收不到消息
  2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
  3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,kafka顺序性只对parttion生效

如何控制读? 是否收到kafka的ack后就可以读了呢?

前面提到一个概念HW, 与之对应的其实还有一个概念叫LEO
HW: HighWatermark  高水位
LEO: LogEndOffset 就是标识最后一条消息的位置
看下图描述
image.png

思考: 如果Follower没同步完成. leader挂了怎么办?

读取的过程如何?

回到上面的目录结构.如果要读取170418这条offset的记录如何读取?
image.png

  • 通过二分法定位在哪个index和log中
  • 通过index定位offset对应的log的物理地址.
  • 通过物理地址去读取具体的log


思考: 如何知道在log中读取多少数据呢. 怎么才能确定一条数据被读完?

选举机制

一个基本的原则就是,如果 leader 挂掉,新的 leader 必须拥有原来的 leader 已经 commit 的所有消息,这不就是 ISR 中副本的特征吗?


前面提到ISR. 与之对应的还有一个叫OSR.

replica.lag.time.max.ms 这个参数决定了ISR和OSR的关系

当一个follower在持续pull的速度变慢, 超过replica.lag.time.max.ms这个时间的时候就会被拉入OSR中. 而所有的选举都是在ISR中进行的.

会考虑少数服从多数的选举模式吗?


这种模式下,如果我们有2f+1个副本,那么在commit之前必须保证有f+1个replica复制完消息,同时为了保证能正确选举出新的leader,失败的副本数不能超过f个.
优势: 系统的延迟取决于最快的几台机器,
劣势: 如果要更多的副本容错性. 就需要大量的副本. 因为必须接受坏掉一半后. 还能选举出一个新的leader. 过多的副本又会涉及到同步带来性能上的影响.


kafka的措施:
基本思路是按照AR(Assigned Replicas=isr+osr)集合中副本的顺序查找第一个存活的副本,并且这个副本在ISR集合中。一个分区的AR集合在分配的时候就被指定.
并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的ISR集合中副本的顺序可能会改变。注意这里是根据AR的顺序而不是ISR的顺序进行选举的。

还有一些会触发选举的点,但是大致的思路都如上:

  • 分区进行重分配
  • 发生优先副本选举
  • 某节点被优雅地关闭


如果ISR列表中的全部都挂了咋办?

  1. 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
  2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。


默认是第二种方式.来保证可用性. 也可以通过配置更改为第一种

遗留思考

  1. 如何保证消息的顺序性?

  2. 为何follower不承担数据读取服务?

    附录

  3. ZK的meta说明

image.png

参考资料


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!