主要内容:

  • 对Kafka的日志(Log)、日志段(LogSegment)以及索引(Index)源码进行尝试性分析。

日志段及其相关代码是 Kafka 服务器源码中最为重要的组件代码之一。

1 Kafka 日志结构概览

Kafka 日志在磁盘上的组织架构:

Kafka 日志对象由多个日志段对象组成,而每个日志段对象会在磁盘上创建一组文件:

  • 消息日志文件(.log)
  • 位移索引文件(.index)
  • 时间戳索引文件(.timeindex)
  • 已中止(Aborted)事务的索引文件(.txnindex)

图中的一串数字 0 是该日志段的起始位移值(Base Offset),也就是该日志段中所存的第一条消息的位移值。

2 日志段源码介绍

位置:core/src/main/scala/kafka/log/LogSegment.scala,该文件定义了三个对象:

  • LogSegment class
  • LogSegment object
  • LogFlushStats object

源码对日志段的描述:

A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in any previous segment.

指明了日志段主要由两部分组成:日志和索引文件。每个日志段都有一个起始位移值(Base Offset),而该位移值是此日志段所有消息中最小的位移值,同时,该值却又比前面任何日志段中消息的位移值都大。

日志段类声明

class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time) extends Logging { … }

根据LogSegment的定义,一个日志段包含的信息:

  • FileRecords:实际保存Kafka消息的对象;
  • lazyOffsetIndex:位移索引文件,使用了延迟初始化;
  • lazyTimeIndex:时间戳索引文件,使用了延迟初始化;
  • txnIndex:已中止事务索引文件,Kafka事务相关;
  • baseOffset:起始位移,磁盘上看到的文件名就是 baseOffset 的值;
  • indexIntervalBytes:也是Broker 端参数 log.index.interval.bytes 值,控制日志段对象新增索引项的频率。默认情况下,日志段至少新写入 4KB 的消息数据才会新增一条索引项;
  • rollJitterMs:日志段对象新增倒计时的“扰动值“。因为目前 Broker 端日志段新增倒计时是全局设置,这就是说,在未来的某个时刻可能同时创建多个日志段对象,这将极大地增加物理磁盘 I/O 压力。有了 rollJitterMs 值的干扰,每个新增日志段在创建时会彼此岔开一小段时间,这样可以缓解物理磁盘的 I/O 负载瓶颈;
  • time :用于统计计时的一个实现类。

接下来介绍日志段的几个重要方法。

append方法

参数:

  • largestOffset:待写入的一个批次的消息的最大位移值
  • largestTimestamp:待写入的一个批次的消息的最大时间戳
  • shallowOffsetOfMaxTimestamp:待写入的一个批次的消息的最大时间戳对应消息的位移
  • records:待写入的消息集合

我将自己的理解标注在了源码里:

def append(largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    // 1 判断待写入的消息集合是否为空,不为空进行下面操作
    if (records.sizeInBytes > 0) {
      trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
            s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
      // 2 判断当前日志段是否为空
      // 如果是空,则记录要写入消息集合的最大时间戳,并将其作为后面新增日志段倒计时的依据
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)
      // 3 判断输入参数最大位移值是否合法,标准是largestOffset-baseOffset是否在整数范围内
      ensureOffsetInRange(largestOffset)

      // 4 如果合法,调用FileRecords的append方法执行真正的写入
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file} at end offset $largestOffset")
      // 5 更新当前日志段的最大时间戳和对应消息的offset
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestampSoFar = shallowOffsetOfMaxTimestamp
      }
      // 6 判断是否需要新增索引项,标准:已写入字节数是否超过4KB
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        // 当已写入字节数超过了 4KB 之后,append方法会调用索引对象的append方法新增索引项,同时清空已写入字节数,以备下次重新累积计算
        offsetIndex.append(largestOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
        bytesSinceLastIndexEntry = 0
      }
      // 若不需要新增索引项,则更新日志段已写入的字节数
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

append方法的完整执行流程:

read方法

  • startOffset:要读取的第一条消息的位移

  • maxSize:能读取的最大字节数

  • maxPosition :能读到的最大文件位置

  • minOneMessage:是否允许在消息体过大时至少返回第一条消息,确保不出现消费饿死的情况

def read(startOffset: Long,
           maxSize: Int,
           maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    // 1 判断待读取的最大字节数是否合法
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")
    // 2 调用translateOffset方法定位要读取的起始文件位置
    val startOffsetAndSize = translateOffset(startOffset)

    // 如果该位置已经在日志末尾,返回空
    if (startOffsetAndSize == null)
      return null

    val startPosition = startOffsetAndSize.position
    // 计算offst的元信息
    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
    // 3 计算调整后的能读取的最大字节数
    // 如果该值为空,则返回空
    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    if (adjustedMaxSize == 0)
      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // 4 根据计算出来的起始文件的位置,以及能读到的最大文件位置和调整后的能读到的最大字节数计算出要读取的总字节数
    // maxSize=100,maxPosition=300,startPosition=250,那么read方法只能读取50字节,因为maxPosition - startPosition = 50
    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)
    // 5 调用FileRecords的slice方法,从指定位置读取指定大小的消息集合
    FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
  }

下图展示了 read 方法的完整执行逻辑:

总结:

  • 对 Kafka 日志段源码进行了重点的分析,包括日志段的 append 方法、read 方法。
  • append 方法:分析了源码是如何写入消息到日志段的。重点关注一下写操作过程中更新索引的时机是如何设定的。
  • read 方法:分析了源码底层读取消息的完整流程。重点关注 Kafka 计算待读取消息字节数的逻辑,也就是 maxSize、maxPosition 和 startOffset 是如何共同影响 read 方法的。