fix(log): fix consumer offsets load missing (#1602)
Signed-off-by: Robin Han <[email protected]>
superhx authored Jul 20, 2024

1 parent 05e530e commit 54bc277
Showing 2 changed files with 21 additions and 13 deletions.
core/src/main/scala/kafka/log/LogCleaner.scala (6 changes: 6 additions & 0 deletions)
```diff
@@ -1115,6 +1115,9 @@ private[log] class Cleaner(val id: Int,
     }
 
     val fetchDataInfo = src.read(src.baseOffset, Integer.MAX_VALUE)
+    if (fetchDataInfo == null) {
+      return
+    }
     for (batch <- fetchDataInfo.records.batches().asScala) {
       checkDone(topicPartition)
       val records = MemoryRecords.readableRecords(batch.asInstanceOf[DefaultRecordBatch].buffer())
@@ -1148,6 +1151,9 @@ private[log] class Cleaner(val id: Int,
                      stats: CleanerStats): Boolean = {
     val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     val fetchDataInfo = segment.read(startOffset, Integer.MAX_VALUE, Long.MaxValue)
+    if (fetchDataInfo == null) {
+      return false
+    }
     for (batch <- fetchDataInfo.records.batches().asScala) {
       checkDone(topicPartition)
       throttler.maybeThrottle(batch.sizeInBytes())
```
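Both cleaner call sites previously dereferenced the result of `read` unconditionally; since a segment read can now come back `null` when the requested offsets were compacted away, each caller bails out before touching `fetchDataInfo.records`. A minimal, self-contained sketch of that caller-side contract (the `FetchDataInfo`, `read`, and offset values below are simplified stand-ins, not the real Kafka types):

```scala
// Simplified stand-ins for the real Kafka types; only the null contract matters here.
final case class FetchDataInfo(batches: List[String])

object NullableReadContract {
  // Like ElasticLogSegment.read after this fix: null when the requested
  // offset is already off the end of the (possibly compacted) segment.
  def read(startOffset: Long, nextOffset: Long): FetchDataInfo =
    if (startOffset >= nextOffset) null
    else FetchDataInfo(List(s"batch@$startOffset"))

  // The guard this commit adds at both LogCleaner call sites.
  def loadBatches(startOffset: Long, nextOffset: Long): List[String] = {
    val fetchDataInfo = read(startOffset, nextOffset)
    if (fetchDataInfo == null) {
      return Nil // nothing readable here; the caller simply moves on
    }
    fetchDataInfo.batches
  }

  def main(args: Array[String]): Unit = {
    println(loadBatches(startOffset = 50, nextOffset = 100))  // List(batch@50)
    println(loadBatches(startOffset = 120, nextOffset = 100)) // List()
  }
}
```

Returning early rather than treating the miss as an error matches the cleaner's job: if the offsets are gone, there is nothing to clean or index from that segment.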
core/src/main/scala/kafka/log/streamaspect/ElasticLogSegment.scala (28 changes: 15 additions & 13 deletions)
```diff
@@ -177,22 +177,24 @@ class ElasticLogSegment(val _meta: ElasticStreamSegmentMeta,
     if (maxSize < 0)
       return CompletableFuture.failedFuture(new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log"))
-    // Note that relativePositionInSegment here is a fake value. There are no 'position' in elastic streams.
-    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, (startOffset - this.baseOffset).toInt)
-    if (maxSize == 0) {
-      return CompletableFuture.completedFuture(FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = true))
+    // if the start offset is already off the end of the log, return null
+    if (startOffset >= _log.nextOffset()) {
+      return CompletableFuture.completedFuture(null)
     }
-    // Note that 'maxPosition' and 'minOneMessage' are not used here. 'maxOffset' is a better alternative to 'maxPosition'.
-    // 'minOneMessage' is also not used because we always read at least one message ('maxSize' is just a hint in ES SDK).
+    // Note that relativePositionInSegment here is a fake value. There are no 'position' in elastic streams.
+    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, (startOffset - this.baseOffset).toInt)
+
+    val adjustedMaxSize =
+      if (minOneMessage) math.max(maxSize, 1)
+      else maxSize
+
+    // return a log segment but with zero size in the case below
+    if (adjustedMaxSize == 0)
+      return CompletableFuture.completedFuture(FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY))
+
     _log.read(startOffset, maxOffset, maxSize)
       .thenApply(records => {
+        if (ReadHint.isReadAll() && records.sizeInBytes() == 0) {
+          // After topic compact, the read request might be out of range. Segment should return null and log will retry read next segment.
+          null
+        } else {
           // We keep 'firstEntryIncomplete' false here since real size of records may exceed 'maxSize'. It is some kind of
           // hack but we don't want to return 'firstEntryIncomplete' as true in that case.
+          FetchDataInfo(offsetMetadata, records)
+        }
-        FetchDataInfo(offsetMetadata, records)
       })
   }
```
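Returning `null` from the segment is safe because of the behavior the in-diff comment relies on: after compaction (for example of `__consumer_offsets`), a read may land past a segment's last offset, and the log layer retries the next segment instead of failing. A hedged sketch of that retry loop, using a hypothetical `Segment` model and `Option` in place of the real nullable `FetchDataInfo`:

```scala
object RetryNextSegment {
  // Hypothetical stand-in for a segment covering offsets [baseOffset, nextOffset).
  final case class Segment(baseOffset: Long, nextOffset: Long)

  // Mirrors the fixed ElasticLogSegment.read: None (null in the real code)
  // when startOffset is already off the end of this segment.
  def readSegment(seg: Segment, startOffset: Long): Option[String] =
    if (startOffset >= seg.nextOffset) None
    else Some(s"records from offset ${math.max(startOffset, seg.baseOffset)}")

  // What "log will retry read next segment" amounts to: walk the segments
  // in offset order and return the first non-empty read.
  def readLog(segments: List[Segment], startOffset: Long): Option[String] = {
    for (seg <- segments) {
      val result = readSegment(seg, startOffset)
      if (result.isDefined) return result
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val segments = List(Segment(0, 100), Segment(100, 250))
    // Offset 100 is off the end of the first segment, so the read falls
    // through to the second one instead of failing the whole fetch.
    println(readLog(segments, startOffset = 100)) // Some(records from offset 100)
  }
}
```

Modeling the miss as `None` keeps the sketch total; the production code signals it with a `null` `FetchDataInfo` inside a `CompletableFuture`, which is exactly why the `LogCleaner` guards above are needed.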
