From 3aa21a35a8b6e45a4de81a0d0de14013ffcaa28c Mon Sep 17 00:00:00 2001 From: Robin Han Date: Wed, 24 Apr 2024 10:23:02 +0800 Subject: [PATCH] fix(issues1140): fix S3ObjectControlManager NPE Signed-off-by: Robin Han --- .../kafka/controller/stream/S3ObjectControlManager.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index 9918f9b2cc..0d9c4f5341 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -300,8 +300,13 @@ public ControllerResult checkS3ObjectsLifecycle() { } // check the mark destroyed objects List requiredDeleteKeys = new LinkedList<>(); + List notExistObjects = new LinkedList<>(); for (Long objectId : this.markDestroyedObjects) { S3Object object = this.objectsMetadata.get(objectId); + if (object == null) { + notExistObjects.add(objectId); + continue; + } if (object.getMarkDestroyedTimeInMs() + (this.config.objectRetentionTimeInSecond() * 1000L) < System.currentTimeMillis()) { // exceed delete retention time, trigger the truly deletion requiredDeleteKeys.add(object.getObjectKey()); @@ -310,6 +315,9 @@ public ControllerResult checkS3ObjectsLifecycle() { break; } } + // markDestroyedObjects isn't Timeline structure, so it may contains dirty / duplicated objectId + notExistObjects.forEach(this.markDestroyedObjects::remove); + if (!requiredDeleteKeys.isEmpty()) { this.lastCleanStartTimestamp = System.currentTimeMillis(); this.lastCleanCf = this.objectCleaner.clean(requiredDeleteKeys);