【Kafka源码解读和使用指南】第42篇:Kafka日志存储源码解析(一)——消息是怎么被写入磁盘的
上一篇【第41篇】Kafka API层源码解析——KafkaApisBroker的总调度室下一篇【第43篇】Kafka日志存储源码解析二——Segment分段存储的精妙设计摘要前四篇文章我们解析了Kafka服务端的网络层和API层知道了请求是怎么从客户端到达Broker、又是怎么被分发的。从本文开始我们深入Kafka最核心的模块——日志存储层。Kafka之所以能支撑百万级吞吐量很大程度上归功于它的存储设计顺序写磁盘 分段管理 稀疏索引。本文作为日志存储系列的开篇聚焦消息是怎么被写入磁盘的这一核心问题从Topic-Partition-Segment的目录结构讲起深入FileMessageSet.append()和Log.append()的源码实现并用图示解释为什么顺序写磁盘能比内存随机写还快。一、Kafka日志的目录结构——从Topic到磁盘文件在深入代码之前我们必须先搞清楚Kafka在磁盘上怎么组织消息数据的。1.1 逻辑层次Topic → Partition → Segment【Kafka日志文件组织结构】 Kafka数据目录 (log.dirs/data/kafka-logs) │ ├── order_events-0/ ← Topicorder_events, Partition0 │ ├── 00000000000000000000.log ← Segment 0 的日志文件 │ ├── 00000000000000000000.index ← Segment 0 的索引文件 │ ├── 00000000000000001024.log ← Segment 1 的日志文件 (baseOffset1024) │ ├── 00000000000000001024.index ← Segment 1 的索引文件 │ └── ... │ ├── order_events-1/ ← Topicorder_events, Partition1 │ ├── 00000000000000000000.log │ ├── 00000000000000000000.index │ └── ... │ └── user_behavior-0/ ← 另一个Topic ├── 00000000000000000000.log └── ...1.2 核心概念对应表概念对应磁盘实体说明Topic逻辑概念无直接对应目录消息的分类标签Partition磁盘上的一个目录topic_partition_id/一个分区对应一个Log对象Segment目录下的一组文件[baseOffset].log[baseOffset].index默认1GB一切防止单文件过大Message写入.log文件的一条记录包含offset、size、CRC32、key、value1.3 为什么要用Segment分段一个Partition的所有消息如果只存在一个文件里会有什么问题【单文件 vs 分段文件对比】 ❌ 单文件方案 partition-0/ └── huge.log (500GB!) → 问题1文件太大索引无法全放内存 → 问题2清理旧数据时只能全文件遍历 → 问题3崩溃恢复时需要扫描整个文件 ✅ Kafka分段方案 partition-0/ ├── 00000000.log (1GB) ├── 00000000.index ├── 00001024.log (1GB) ├── 00001024.index └── ... → 优点1旧Segment可以独立删除不影响写入 → 优点2崩溃恢复只需扫描最新的几个Segment → 优点3索引文件大小固定可以mmap进内存二、顺序写磁盘——快得违反直觉在讲源码之前必须理解一个反直觉的事实磁盘顺序写的性能可以超过内存随机写。2.1 顺序写 vs 随机写性能对比【磁盘I/O性能对比】 吞吐量 │ 顺序写磁盘 ● │ ~600 MB/s (Kafka的做法) │ │ 内存随机读写 ● │ ~50~200 MB/s (有Cache Miss) │ 磁盘随机写 ● │ ~100 KB/s ~ 1 MB/s │ └───────────────────── 时间操作类型吞吐量延迟原因磁盘顺序写~600 MB/s很低磁头不需要频繁寻道磁盘随机写~100 KB/s极高每次写都要移动磁头寻道时间5~10ms内存顺序访问~20 GB/s极低CPU Cache友好内存随机访问~200 MB/s中等Cache Miss导致访问主存2.2 Kafka的顺序写是怎么做到的Kafka的核心设计原则所有消息只追加append-only绝不修改已有数据。【Kafka写入方式纯顺序写】 磁盘盘面简化视图 ┌─────────────────────────────────────────┐ │ [msg1][msg2][msg3][msg4][msg5]... │ │ ↑ │ │ └── 永远只在末尾追加不回退修改 │ └─────────────────────────────────────────┘ ↑ 磁头只需一次寻道之后一直顺序写对比传统数据库的随机写【传统数据库写入随机写】 磁盘盘面 ┌─────────────────────────────────────────┐ │ [row1] [row3] [row2] │ │ ↓ ↓ ↓ │ │ 磁头到处跳每次写都要寻道 │ └─────────────────────────────────────────┘2.3 Page Cache——Kafka的隐形加速器Kafka写入时并不直接调fsync()而是依赖操作系统的Page Cache页缓存【Kafka写入的数据流】 生产者发送消息 │ ▼ ┌────────────────────────────────────┐ │ Kafka Broker │ │ │ │ FileMessageSet.append() │ │ │ │ │ ▼ │ │ FileChannel.write() ───► 不调fsync() │ │ │ │ │ ▼ │ │ ┌─────────────────┐ │ │ │ OS Page Cache │ ◄── 数据先到这里 │ │ │ (内存) │ │ │ └────────┬────────┘ │ │ │ │ │ [操作系统异步刷盘] │ │ │ │ │ ▼ │ │ 磁盘持久化 │ └────────────────────────────────────┘优点写入性能 内存写入速度风险如果还没刷盘就宕机最近几秒的数据会丢失控制参数log.flush.interval.messages和log.flush.interval.ms通常保持默认值即可现代操作系统Page Cache已经足够可靠三、FileMessageSet源码解析——直接操作日志文件FileMessageSet是Kafka中直接对应磁盘上.log文件的类。3.1 FileMessageSet的核心字段// FileMessageSet.scala (简化版)classFileMessageSetprivate[log](valfile:File,// 对应的磁盘文件private[log]valchannel:FileChannel,// 用于读写文件的通道private[log]valstart:Int,// 分片的起始位置非分片时为0private[log]valend:Int,// 分片的结束位置非分片时为Int.MaxValueisSlice:Boolean// 是否为日志文件的分片)extendsMessageSetwithLogging{// ★核心使用AtomicInteger保证线程安全volatileprivatevar_size:AtomicInteger_// 文件预分配NTFS/老版Linux文件系统可以提升写入性能if(preallocate){valrandomAccessFilenewRandomAccessFile(file,rw)randomAccessFile.setLength(initFileSize)}}3.2 FileMessageSet.append()——消息写入的核心// FileMessageSet.scaladefappend(messages:ByteBufferMessageSet):Unit{// ★关键将ByteBufferMessageSet中的全部数据写入FileChannelvalwrittenmessages.writeFullyTo(channel)// 原子更新文件大小_size.getAndAdd(written)// ★重要Kafka不主动调fsync()依赖OS Page Cache// 数据的持久化由操作系统异步完成}3.3 ByteBufferMessageSet.writeFullyTo()——真正的写入操作// ByteBufferMessageSet.scaladefwriteFullyTo(channel:GatheringByteChannel):Int{buffer.mark()// 记录当前positionvarwritten0// ★循环写入直到所有字节都写入Channelwhile(writtensizeInBytes){writtenchannel.write(buffer)}buffer.reset()// 重置position不影响后续使用written}【append()写入流程图解】 ByteBufferMessageSet (内存中的数据) │ ▼ writeFullyTo(channel) │ ▼ ┌──────────────────────────────────────┐ │ while(written sizeInBytes): │ │ written channel.write(buffer) │ │ │ │ [msg1][msg2][msg3]... │ │ │ │ │ ▼ │ │ FileChannel.write() │ │ │ │ │ ▼ │ │ ┌──────────────┐ │ │ │ OS Page Cache │ │ │ └──────────────┘ │ │ │ │ │ ▼ (异步) │ │ 磁盘文件 .log │ └──────────────────────────────────────┘四、Log.append()——分区级别的写入入口FileMessageSet负责单个Segment的写入而Log类负责整个Partition多个Segment的写入管理。4.1 Log类的核心字段// Log.scala (简化版)classLog(valdir:File,// 分区对应的目录volatileprivatevar_segments:ConcurrentNavigableMap[Long,LogSegment],cleanupPolicy:CleanupPolicy,// 清理策略delete 或 compactconfig:LogConfig)extendsLogging{// ★活跃Segment当前正在写入的SegmentprivatedefactiveSegmentsegments.lastEntry.getValue// LEO (Log End Offset)下一条消息的offsetvolatileprivatevarnextOffsetMetadata:LogOffsetMetadata_}4.2 Log.append()的完整流程// Log.scala (核心逻辑经过简化)defappend(records:ByteBufferMessageSet,isFromClient:Booleantrue):LogAppendResult{// 步骤1验证消息CRC32、消息大小、Magic ValuevalappendInfoanalyzeAndValidateMessageSet(records)// 步骤2如果需要分配offset生产者已分配则跳过if(needsOffsetAssignment){assignOffsets(records,appendInfo)}// 步骤3★检查是否需要滚动新SegmentvalsegmentmaybeRoll(records.sizeInBytes)// 步骤4追加到当前活跃Segmentsegment.append(firstOffset,records)// 步骤5更新LEOupdateLogEndOffset(appendInfo.lastOffset1)// 步骤6★按需flush由log.flush.interval配置控制if(unflushedMessagesconfig.flushInterval){flush()}LogAppendResult(appendInfo,errorNone)}4.3 maybeRoll()——Segment滚动机制【Segment滚动判断条件】 maybeRoll() 被调用 │ ▼ ┌──────────────────────────────────────┐ │ 条件1当前Segment大小 新消息 log.segment.bytes ? │ │ (默认1GB) │ │ YES → 需要滚动 ✓ │ │ │ │ 条件2当前Segment创建时间 log.roll.ms ?│ │ (默认7天) │ │ YES → 需要滚动 ✓ │ │ │ │ 条件3索引文件满了 │ │ YES → 需要滚动 ✓ │ │ │ │ 三个条件都不满足 → 不滚动继续用当前Segment│ └──────────────────────────────────────┘五、WALWrite-Ahead Log思想在Kafka中的体现Kafka的日志存储本质上是一种WAL预写日志模式【WAL模式图解】 传统数据库WAL 1. 写WAL日志顺序写快 2. 更新内存中的B树随机写慢...但已在内存中 3. 定期Checkpoint将内存刷到磁盘 Kafka的WAL变体 1. 消息直接append到.log文件顺序写快 2. 不修改已写入的数据不可变简单 3. 通过Segment分段 索引文件实现快速查找WAL特性传统数据库Kafka顺序写日志✓✓先写日志再更新数据✓N/AKafka日志即数据崩溃恢复重放WAL重放未flush的消息日志不可变✓✓六、性能数据对比——Kafka为什么这么快【Kafka写入性能实测数据参考值】 场景 吞吐量 ───────────────────────────────────────── 单Broker3个Partition ~100,000 msg/s 单Broker顺序写磁盘 ~600 MB/s (吞吐量瓶颈在磁盘) 3个Broker集群 ~300,000 msg/s 加上压缩(Snappy) ~200,000 msg/s (CPU瓶颈) 对比RabbitMQ写入性能 单节点 ~20,000 msg/s (主要瓶颈在内存复制fsync)本篇小结本文从Kafka的日志文件组织结构出发深入解析了消息是如何被写入磁盘的文件结构Topic → Partition目录 → Segment文件组(.log .index)分段设计让清理和恢复更高效顺序写磁盘Kafka坚持append-only模式避免了磁盘随机写的性能陷阱顺序写性能可超过600MB/sPage Cache加速Kafka依赖操作系统页缓存而非主动fsync()在保证性能的同时兼顾了可靠性FileMessageSet.append()核心写入方法通过FileChannel.write()将内存中的ByteBufferMessageSet写入磁盘文件Log.append()分区级别的写入入口负责验证消息、分配offset、滚动Segment、追加数据WAL思想Kafka的日志存储本质上是WAL模式的一个精妙变体日志即数据不可变设计极大简化了实现下一篇文章我们将深入LogSegment——Kafka日志分段存储的精妙设计看看Segment是如何管理.log和.index文件的。上一篇【第41篇】Kafka API层源码解析——KafkaApisBroker的总调度室下一篇【第43篇】Kafka日志存储源码解析二——Segment分段存储的精妙设计