哪个网站有做电箱电柜的图纸,网站建设之织梦模板,开家做网站公司有哪些,wordpress 创建表系列文章目录
上手第一关#xff0c;手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么#xff0c;以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析#xff0c;打破面试难关 系列文章目录一、主题与分区1. 模型…系列文章目录
上手第一关手把手教你安装kafka与可视化工具kafka-eagle Kafka是什么以及如何使用SpringBoot对接Kafka 架构必备能力——kafka的选型对比及应用场景 Kafka存取原理与实现分析打破面试难关 系列文章目录一、主题与分区1. 模型2. 消息与分发 二、分区内数据的存储1. 消息的存储① 偏移量与日志文件② 索引的构成 2. 消息的读取① 消费偏移量的存储②Compaction策略③查找并读取消息 3. 快速存取实现 总结 在前面的几篇内容中我们依次讲了Kafka的安装、与Spring Boot的结合还有选型与应用场景。但是笔者也知道对于很多小伙伴来说原理及实现才算重头戏而且也是面试热点那么本次我们先来进行存取原理的分析当然抱着疑问去学习才是最快的因此在开始之前我也先抛出一些Kafka的重点与热点问题希望大家在学习过程中能总结印证
Kafka为什么吞吐量这么高Kafka的数据存与取有什么特点 作者简介战斧从事金融IT行业有着多年一线开发、架构经验爱好广泛乐于分享致力于创作更多高质量内容 本文收录于 kafka 专栏有需要者可直接订阅专栏实时获取更新 高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新欢迎指导 Zookeeper Redis dubbo docker netty等诸多框架以及架构与分布式专题即将上线敬请期待 一、主题与分区
1. 模型
我们其实在《架构必备能力——kafka的选型对比及应用场景》 一文中其实讲到了Kafka的模型我们这里再把老图拿出来用一遍 不难看出逻辑上的源头就是主题也即Topic而主题又划分为多个分区。我们先来谈谈主题与分区的实现在Kafka中可以使用以下命令来声明一个主题并指定分区
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic my-topic其中
–create: 声明一个新的主题。 –zookeeper: 指定 ZooKeeper 的地址和端口号。 –replication-factor: 指定副本因子即每个分区在集群中的副本数量。这里指定为1表示每个分区只有一个副本。 –partitions: 指定分区数。这里指定为4表示该主题有4个分区。 –topic: 指定主题名称这里为 my-topic。 注意如果要指定分区数量必须在创建主题的时候指定之后无法更改。因此在创建主题时应该仔细考虑分区数量以满足业务需求。
当然如果有同学还记得前面的内容应该知道我们在对接Spring Boot时并没有提前建立主题而是直接使用了。其中的原因是我们在Spring Boot中使用Kafka如果在发送消息时指定的主题不存在Kafka会自动创建该主题。在创建时Kafka将使用默认的分区数量通常为1以及默认的副本因子通常为1来创建分区。
2. 消息与分发
然后当我们发布者向某个主题发送消息时其就会被“分发”到某一个分区里 那么有小伙伴肯定会问 Kafka的主题消息会进哪个分区是我们可以决定的吗默认是进入哪个分区 答案是Kafka的主题消息可以由生产者自己决定要发送到哪个分区也可以使用Kafka提供的默认分区分配算法来自动决定消息要进入哪个分区。 指定分区如果生产者自己决定要发送到哪个分区可以在发送消息时指定消息要发送到的分区编号。此时如果指定的分区编号存在则消息会被发送到该分区如果指定的分区编号不存在则会抛出异常。 自动分区如果使用默认的分区分配算法Kafka提供了多种分配算法例如轮询Round-Robin、随机Random、哈希Hash等。默认情况下Kafka使用哈希算法将消息均匀地分配到所有可用的分区中
当然在此之前我们可以看下KafkaTemplate前面提供的API 不难知道Kafka消息除了指明主题以外还由以下要素组成
消息的key是一个可选项用于标识消息的唯一性和分区。如果不指定key则会随机分配一个key并将消息发送到随机的分区。消息的value是消息的实际内容也是必填项。消息的时间戳是可选项用于标识消息的时间戳。Kafka可以根据时间戳来处理消息的顺序、分配和延迟。消息的分区指定消息应该发送到哪个分区。如果不指定分区则使用默认的分区器来决定分区。
二、分区内数据的存储
从逻辑上来说kafka的分区是一个消息队列当我们发送的消息经由分区器进行分发后就会进入某个分区并被顺序的保存下来。在实现上Kafka的分区更像一个日志记录系统把消息当作日志顺序的写入磁盘
1. 消息的存储
我们需要知道Kafka中每个分区被组织为一组日志段Log Segment其中每个日志段都包含了一个连续的消息序列。当一个日志段被写满后它将被关闭并分配一个更高的编号新的消息将被追加到一个新的日志段中。而日志段的核心又由两个部分组成索引文件index file和数据文件data file 数据文件: 也叫日志文件数据文件是消息分区的核心部分它是以追加方式写入的文件。当有新的消息写入分区时Kafka会根据协议、消息头、消息体等信息将消息封装成字节流然后追加写入数据文件。 索引文件: 索引文件是一个不可变的有序映射它将消息偏移量映射到数据文件中的位置。当一个消费者读取一个分区的消息时它会使用偏移量读取索引文件中的位置并从该位置读取数据文件中的消息。
如下图就是我们上期发送了一条消息而建立的目录test_topic-0代表该目录是test_topic主题下的 0 号分区可以看到里面的 index文件 和 log 文件 ① 偏移量与日志文件
要想更深入的了解我们必须先解释一下kafka中消息偏移量offset 的概念当一条记录需要写入分区的时候它会被追加到 log 文件的末尾同时会被分配一个唯一的序号称为 Offset偏移量。Offset 是一个递增的、不可变的数字由 Kafka 自动维护。需要注意的是在后续内容中我们还会提到各种不同的偏移量请注意区分不要混淆了
由于Offset 初始值为 0所以当第一条消息达到分区后就会建立起 00000000000000000000.log 这样的文件来进行消息的存储后续消息将会在这个文件内追加写入直到文件大小超出限制其默认值为1GB
举个例子当第170411个消息Offset 170410来到时发现 00000000000000000000.log 已经超过了 1 G此时其就会新创建一个日志段同时以本offset为名新建一个日志文件命名为 0000000000000170410.log此时本分区就形成了两个日志段情况如下 ② 索引的构成
我们上面讲了 .log 文件也即数据文件的创建机制。但是还没讲段的另一个组成部分也即索引文件。索引其实就像字典的目录是帮助大家快速找到某条消息的工具索引文件存储的内容主要就是 消息偏移量offset 与 消息存储地址position 的映射关系。
Kafka的索引文件由多个索引条目index entry组成每个索引条目包含两个核心字段
offset消息的偏移量这里是相对偏移量每个索引文件都以0起始其对应的真实偏移量为段初始偏移 本offsetposition消息在日志文件中的磁盘位置相对偏移量偏移量仅适用于对应的日志文件 需要注意的是不是每一条消息都会有索引。这里有参数 index.interval.bytes 的控制其默认值为 4 KB即表示当前分区 log 文件写入了 4 KB 数据后才会在索引文件中增加一个索引条目
2. 消息的读取
现在我们已经存储了一些数据下面就要开始读取了我们目前掌握了这些文件那么怎么才能找到并读取消息呢
① 消费偏移量的存储
我们不难理解每个消费者负责需要消费分配给它的分区上的消息并记录自己在每个分区上消费的最新偏移量。对于消费者而言怎么知道自己应该要消费哪个offset的消息消费者可以通过以下两种方式记录消费的偏移量 手动提交偏移量消费者在消费消息时可以手动调用 consumer.commitSync() 或 consumer.commitAsync() 方法将消费的偏移量提交到 Kafka 中。该方法接收的参数表示要提交的偏移量的值提交后Kafka 会将该偏移量记录到内部的偏移量管理器中。 自动同步提交偏移量消费者可以将 enable.auto.commit 参数设置为 true开启自动提交偏移量的功能。启用该功能后Kafka 会自动记录消费者消费过的最新偏移量并定期将其定期提交到 Kafka 中。
但不管怎么样这个消费的偏移量最终都是由kafka来进行保存的那么其具体的存储是怎么实现的呢Kafka其实提供了将给定消费者组的所有偏移存储在一个叫做组协调器group coordinator的组件。 通过官方文档不难看出当组协调器收到偏移量变动的请求时会将对应数据存储在内置的主题 __consumer_offsets 中在旧版本中偏移量是存在ZK中的我们可以在ZK中看到这个主题的情况 在我们的本地目录中也能看到这个 __consumer_offsets 主题一共建了50个分区默认 当然它分区的个数可以在Kafka服务器配置文件中通过参数offsets.topic.num.partitions 进行配置。
当我们以某个消费者组消费掉某条消息并提交偏移量后偏移量会被提交到 __consumer_offsets Topic的一个特定分区该分区由所消费的主题和消费者组的哈希值决定。在我的例子里是被提交到了 __consumer_offsets-45如下 ②Compaction策略
相信你会对这种存储消费位置的方式有所困惑因为按照我们前面的说法Kafka的内容都是以日志形式存储的在使用的过程中日志岂不是会越来越大到最后找一次偏移量都很麻烦这就不得不提到Kafka中的Compaction策略
compaction是一种保留最后N个版本的消息的消息清理策略它保留特定键的最新值同时删除无用的键值从而减少存储空间。具体来说Compaction会保留每个消息主题中最新的一组键值对并删除所有键相同但值较旧的消息。
使用Compaction策略需要满足以下条件
消息的键必须是唯一的消息的键必须是可序列化的消息必须按照键进行划分消息的存储时间必须足够长以便新消息可以替换旧消息
而这些消费偏移量的数据存储的内容如下 key group.idtopic分区号 value offset 的值 这样就导致某个消费组在某个分区的消费数据只会有一条所以找起来并没有那么复杂
③查找并读取消息
上面我们讲了消费偏移量的存储其实查找偏移量的过程也是一样的同一个消费组会先从特定的 __consumer_offsets 拿取偏移量拿到偏移量以后比如偏移量是 170417我们仍以上面的文件情况为例那么它找到消息的逻辑如下 首先用二分查找确定它是在哪个Segment文件中其中0000000000000000000.index为最开始的文件第二个文件为0000000000000170410.index起始偏移为1704101 170411而第三个文件为0000000000000239430.index起始偏移为2394301 239431。所以这个offset 170417就落在第二个文件中。其他后续文件可以依此类推以起始偏移量命名并排列这些文件然后根据二分查找法就可以快速定位到具体文件位置。 用该offset减去索引文件的编号即170417 - 170410 7也用二分查找法找到索引文件中等于或者小于7的最大的那个编号。可以看出我们能够找到[4476]这组数据476即offset170410 4 170414的消息在log文件中的偏移量。 打开数据文件0000000000000170410.log从位置为476的那个地方开始顺序扫描直到找到offset为170417的那条Message。
总结来说就是通过二分法先找到index文件然后再在index文件中通过二分法找到某一条索引条目然后根据该索引条目给出的地址去log文件中快速定位最后从这个定位开始顺序扫描下去直到找到我们指定的偏移量数据
3. 快速存取实现
我们上面讲了Kafka的一大堆的奇特设计不知道小伙伴们是否产生过疑问比如为什么一个主题要分成多个分区 一个分区为什么要划成多个段以及为什么把数据存储成日志格式 其实这些都是在优化性能我们从快速存取的角度讲一下Kafka都做了哪些努力【面试重点】 多分区负载均衡Kafka支持将一个主题的数据分散至多个分区不同分区位于多个broker节点上实现了集群负载均衡从而提高了写入和读取的性能。 分段存储Kafka会将数据分段存储每个段的大小和时间可以根据需求进行配置这样可以提高读取性能并减少删除操作对IO的影响。 批量写入Kafka允许客户端一次性写入多条消息到broker减少了网络传输的时间。 零拷贝Kafka使用mmap映射磁盘上的文件到虚拟内存空间然后通过直接内存访问Direct Memory Access的方式将数据从磁盘读取到内存中还使用sendfile系统调用来实现网络发送时的零拷贝这样网络数据也可以直接从内核空间中发送避免了数据拷贝到用户空间的过程。 异步刷盘Kafka支持异步刷盘即将消息写入日志后不会立即将数据从内存刷入磁盘而是会缓存一段时间再批量写入磁盘减少了磁盘I/O的次数提高了写入性能。 稀疏索引Kafka会为每个段维护一个索引以便在读取数据时快速定位到所需数据的位置。这样可以避免全盘扫描提高数据读取性能。但如果每个消息都写进索引会导致索引文件臃肿且降低存储速度所以采用了稀疏索引的方式
如果你按照《Kafka是什么以及如何使用SpringBoot对接Kafka》中的动手操作过我们可以继续来做个实现我们先看一下log文件如下 然后我们把发送的代码改成如下这样一次发送1000条消息注意我们在这里还加上了 kafkaTemplate.flush()因为当使用Kafka Template发送消息时消息并不会立即发送到Kafka Broker而是会被缓存在Kafka Template中以减少通信次数如果我们需要立即发送这时候就可以使用kafkaTemplate.flush()方法来实现立即发送。
Service
public class KafkaService {Autowiredprivate KafkaTemplateString, String kafkaTemplate;public void sendMessage(String message) {for (int i 0; i 1000; i) {kafkaTemplate.send(another_topic2, 0,key,message i);}kafkaTemplate.flush();System.out.println(we have send message);}
}但当我们发送消息成功输出 we have send message 并又成功接收到消息后如图 我们却会看到 log 文件的大小没有发生变化即便是不停的刷新目录也无济于事 然而如果我们单击并右键选中该文件就会看到该文件被更新且大小发生变动 这就说明了其写入硬盘的过程是异步且有延迟的使用了操作系统的延迟写入delayed write机制。但其传输数据却可以脱离硬盘使用内存缓存作为收发介质直接实现传达 总结
今天我们详细讲解了消息在kafak中的存与取也介绍了不少细节点知道了Kafka采用批量传输设计减少网络访问次数然后用分区、分段、追加日志等方案来提高吞吐量并且利用了操作系统的零拷贝、异步刷盘等方式来减少磁盘写入的瓶颈最终成为了一款性能优异、吞吐量极大的中间件。希望通过今天的学习能对大家有所帮助我们将在后面继续讲解kafka的其他实现细节。如果你对此有兴趣可以直接订阅本 kafka 专栏