fix(metadata): ensure that the consumer offset is deleted along with the deletion of the topic (#1262)

Signed-off-by: SSpirits <[email protected]>
ShadowySpirits authored May 15, 2024
1 parent a10e0de commit 33b7b82
Showing 3 changed files with 51 additions and 62 deletions.
5 changes: 1 addition & 4 deletions automq-shell/build.gradle
@@ -2,9 +2,6 @@ plugins {
id 'java'
}

group = 'org.apache.kafka'
version = '3.8.0-SNAPSHOT'

project(':automq-shell') {
archivesBaseName = "automq-shell"
}
@@ -57,7 +54,7 @@ test {
}

jar {
enabled false
dependsOn(':s3stream:jar', ':clients:shadowJar')
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
from {
configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
BrokerMetadataPublisher.scala
@@ -326,57 +326,59 @@ class BrokerMetadataPublisher(
private def handleTopicsDelta(deltaName: String, topicsDelta: TopicsDelta, delta: MetadataDelta,
newImage: MetadataImage): Unit = {
// Callback for each topic delta.
def callback(topicDelta: TopicDelta, partition: Int): Unit = {
if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) {
try {
// Handle the case where the group metadata topic was deleted
if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(partition)
if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
groupCoordinator.onResignation(partition, OptionalInt.of(partitionRegistration.leaderEpoch))
}
}
def callback(topicPartition: TopicPartition): Unit = {
// Handle the case where the group metadata topic was deleted
if (topicsDelta.topicWasDeleted(Topic.GROUP_METADATA_TOPIC_NAME)) {
val partitionRegistration = topicsDelta.image.getTopic(Topic.GROUP_METADATA_TOPIC_NAME).partitions.get(topicPartition.partition())
if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
groupCoordinator.onResignation(topicPartition.partition(), OptionalInt.of(partitionRegistration.leaderEpoch))
}
}

// Update the group coordinator of local changes
updateCoordinator(
topicDelta,
partition,
groupCoordinator.onElection,
(partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in $deltaName", t)
// Handle the case where the transaction state topic was deleted
if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(topicPartition.partition())
if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
groupCoordinator.onResignation(topicPartition.partition(), OptionalInt.of(partitionRegistration.leaderEpoch))
}
}

if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) {
try {
// Handle the case where the transaction state topic was deleted
if (topicsDelta.topicWasDeleted(Topic.TRANSACTION_STATE_TOPIC_NAME)) {
val partitionRegistration = topicsDelta.image.getTopic(Topic.TRANSACTION_STATE_TOPIC_NAME).partitions.get(partition)
if (partitionRegistration != null && partitionRegistration.leader == brokerId) {
groupCoordinator.onResignation(partition, OptionalInt.of(partitionRegistration.leaderEpoch))
}
getTopicDelta(topicPartition.topic(), newImage, delta).foreach(topicDelta => {
if (Topic.GROUP_METADATA_TOPIC_NAME.equals(topicDelta.name())) {
try {
// Update the group coordinator of local changes
updateCoordinator(
topicDelta,
topicPartition.partition(),
groupCoordinator.onElection,
(partitionIndex, leaderEpochOpt) => groupCoordinator.onResignation(partitionIndex, toOptionalInt(leaderEpochOpt))
)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in $deltaName", t)
}
}

// Update the transaction coordinator of local changes
updateCoordinator(
topicDelta,
partition,
txnCoordinator.onElection,
txnCoordinator.onResignation)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
s"coordinator with local changes in $deltaName", t)
if (Topic.TRANSACTION_STATE_TOPIC_NAME.equals(topicDelta.name())) {
try {
// Update the transaction coordinator of local changes
updateCoordinator(
topicDelta,
topicPartition.partition(),
txnCoordinator.onElection,
txnCoordinator.onResignation)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
s"coordinator with local changes in $deltaName", t)
}
}
}
})

try {
// Notify the group coordinator about deleted topics.
if (topicsDelta.deletedTopicIds().contains(topicDelta.id())) {
if (topicsDelta.topicWasDeleted(topicPartition.topic())) {
groupCoordinator.onPartitionsDeleted(
util.List.of(new TopicPartition(topicDelta.name(), partition)),
util.List.of(new TopicPartition(topicPartition.topic(), topicPartition.partition())),
RequestLocal.NoCaching.bufferSupplier)
}
} catch {
ElasticReplicaManager.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.utils.{ThreadUtils, Time}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicDelta, TopicsDelta}
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants
import org.apache.kafka.server.common.{DirectoryEventHandler, MetadataVersion}
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsConstants._
@@ -939,7 +939,7 @@ class ElasticReplicaManager(
* @param newImage The new metadata image.
*/
override def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
asyncApplyDelta(delta, newImage, (_, _) => {}).get()
asyncApplyDelta(delta, newImage, _ => {}).get()
}

/**
@@ -950,10 +950,10 @@
* @return A future which completes when the partition has been deleted.
*/
private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition): CompletableFuture[Void] = {
doPartitionDeletionAsyncLocked(stopPartition, null, null, (_, _) => {})
doPartitionDeletionAsyncLocked(stopPartition, _ => {})
}

private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition, delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = {
private def doPartitionDeletionAsyncLocked(stopPartition: StopPartition, callback: TopicPartition => Unit): CompletableFuture[Void] = {
val prevOp = partitionOpMap.getOrDefault(stopPartition.topicPartition, CompletableFuture.completedFuture(null))
val opCf = new CompletableFuture[Void]()
val tracker = partitionStatusTracker.tracker(stopPartition.topicPartition)
Expand All @@ -975,9 +975,7 @@ class ElasticReplicaManager(
s"we got an unexpected ${e.getClass.getName} exception: ${e.getMessage}")
}
}
if (delta != null && newImage != null) {
getTopicDelta(stopPartition.topicPartition.topic(), newImage, delta).foreach(callback(_, stopPartition.topicPartition.partition()))
}
callback(stopPartition.topicPartition)
} finally {
opCf.complete(null)
partitionOpMap.remove(stopPartition.topicPartition, opCf)
@@ -990,21 +988,13 @@
opCf
}

def getTopicDelta(topicName: String, newImage: MetadataImage, delta: TopicsDelta): Option[TopicDelta] = {
Option(newImage.topics().getTopic(topicName)).flatMap {
topicImage => Option(delta).flatMap {
topicDelta => Option(topicDelta.changedTopic(topicImage.id()))
}
}
}

/**
* Apply a KRaft topic change delta.
*
* @param delta The delta to apply.
* @param newImage The new metadata image.
*/
def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: (TopicDelta, Int) => Unit): CompletableFuture[Void] = {
def asyncApplyDelta(delta: TopicsDelta, newImage: MetadataImage, callback: TopicPartition => Unit): CompletableFuture[Void] = {
// Before taking the lock, compute the local changes
val localChanges = delta.localChanges(config.nodeId)
val metadataVersion = newImage.features().metadataVersion()
@@ -1029,7 +1019,7 @@
def doPartitionDeletion(): Unit = {
stateChangeLogger.info(s"Deleting ${deletes.size} partition(s).")
deletes.foreach(stopPartition => {
val opCf = doPartitionDeletionAsyncLocked(stopPartition, delta, newImage, callback)
val opCf = doPartitionDeletionAsyncLocked(stopPartition, callback)
opCfList.add(opCf)
})
}
@@ -1065,7 +1055,7 @@
applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, leader, directoryIds)
tracker.opened()
// Apply the delta before elect leader.
getTopicDelta(tp.topic(), newImage, delta).foreach(callback(_, tp.partition()))
callback(tp)
// Elect the leader to let client can find the partition by metadata.
if (info.partition().leader < 0) {
// The tryElectLeader may be failed, tracker will check the partition status and elect leader if needed.
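
Note on the overall change: the per-partition callback threaded through asyncApplyDelta now takes a TopicPartition instead of (TopicDelta, Int), and it fires after the local partition work (including deletion) has completed. A fully deleted topic has no TopicDelta in the new metadata image, so the old getTopicDelta(...).foreach(callback(...)) path silently skipped it and the group coordinator was never told to drop that topic's consumer offsets. Below is a minimal sketch of how a caller might wire the new signature; topicsDelta, newImage, groupCoordinator, and replicaManager stand in for the collaborators shown in the diff, and the actual call site in BrokerMetadataPublisher is not part of the hunks above.

import java.util
import java.util.concurrent.CompletableFuture
import org.apache.kafka.common.TopicPartition

// Sketch only: the replica manager invokes this once local deletion of the
// partition has finished (see doPartitionDeletionAsyncLocked above).
def onPartitionChanged(topicPartition: TopicPartition): Unit = {
  // If the owning topic was deleted, ask the group coordinator to purge the
  // consumer offsets it stored for this partition.
  if (topicsDelta.topicWasDeleted(topicPartition.topic())) {
    groupCoordinator.onPartitionsDeleted(
      util.List.of(new TopicPartition(topicPartition.topic(), topicPartition.partition())),
      RequestLocal.NoCaching.bufferSupplier) // RequestLocal as used in the diff above
  }
}

// Hand the callback to the replica manager together with the delta and new image.
val done: CompletableFuture[Void] =
  replicaManager.asyncApplyDelta(topicsDelta, newImage, onPartitionChanged)
done.get()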