diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index fa1d4b5d8c..c80b5ef3b5 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -469,11 +469,14 @@ class ElasticReplicaManager( // 1) fetch request does not want to wait // 2) fetch request does not require any data // 3) has enough data to respond - // 4) some error happens while reading data - // 5) we found a diverging epoch - // 6) has a preferred read replica - if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData || - hasDivergingEpoch || hasPreferredReadReplica)) { + // 4) does not have enough data to respond but it's a catch-up read (this means we reached + // the end of a segment) + // 5) some error happens while reading data + // 6) we found a diverging epoch + // 7) has a preferred read replica + if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty + || (bytesReadable >= params.minBytes || (bytesReadable < params.minBytes && !ReadHint.isFastRead)) + || errorReadingData || hasDivergingEpoch || hasPreferredReadReplica)) { val fetchPartitionData = logReadResults.map { case (tp, result) => val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId) tp -> result.toFetchPartitionData(isReassignmentFetch)