diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 7e74ced9a0..18402e5408 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1289,7 +1289,7 @@ class KRaftClusterTest { () => admin.createTopics(newTopics).all().get()) assertNotNull(executionException.getCause) assertEquals(classOf[PolicyViolationException], executionException.getCause.getClass) - assertEquals("Unable to perform excessively large batch operation.", + assertEquals("Excessively large number of partitions per request.", executionException.getCause.getMessage) } finally { admin.close() diff --git a/gradle.properties b/gradle.properties index 88d63be550..0c14658608 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,7 @@ group=org.apache.kafka # - streams/quickstart/pom.xml # - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml # - streams/quickstart/java/pom.xml -version=3.9.0-SNAPSHOT +version=3.9.0 scalaVersion=2.13.14 # Adding swaggerVersion in gradle.properties to have a single version in place for swagger # New version of Swagger 2.2.14 requires minimum JDK 11. diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 93edecd925..7efc5386a6 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -172,6 +172,7 @@ */ public class ReplicationControlManager { static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000; + static final int MAX_PARTITIONS_PER_BATCH = 10_000; static class Builder { private SnapshotRegistry snapshotRegistry = null; @@ -687,6 +688,8 @@ ControllerResult createTopics( Map topicErrors = new HashMap<>(); List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); + validateTotalNumberOfPartitions(request, defaultNumPartitions); + // Check the topic names. validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars); @@ -1284,6 +1287,34 @@ ControllerResult alterPartition( return ControllerResult.of(records, response); } + /** + * Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch + * has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory. + * Validates an upper bound number of partitions. The actual number may be smaller if some topics are misconfigured. + * + * @param request a batch of topics to create. + * @param defaultNumPartitions default number of partitions to assign if unspecified. + * @throws PolicyViolationException if total number of partitions exceeds {@value MAX_PARTITIONS_PER_BATCH}. + */ + static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int defaultNumPartitions) { + int totalPartitions = 0; + for (CreatableTopic topic: request.topics()) { + if (topic.assignments().isEmpty()) { + if (topic.numPartitions() == -1) { + totalPartitions += defaultNumPartitions; + } else if (topic.numPartitions() > 0) { + totalPartitions += topic.numPartitions(); + } + } else { + totalPartitions += topic.assignments().size(); + } + + } + if (totalPartitions > MAX_PARTITIONS_PER_BATCH) { + throw new PolicyViolationException("Excessively large number of partitions per request."); + } + } + /** * Validate the partition information included in the alter partition request. * diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index b50d32e16b..bfc6440c4c 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -123,6 +123,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -573,6 +574,44 @@ public void configure(Map configs) { } } + @Test + public void testExcessiveNumberOfTopicsCannotBeCreated() { + // number of partitions is explicitly set without assignments + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); + ReplicationControlManager replicationControl = ctx.replicationControl; + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(5000).setReplicationFactor((short) 1)); + request.topics().add(new CreatableTopic().setName("bar"). + setNumPartitions(5000).setReplicationFactor((short) 1)); + request.topics().add(new CreatableTopic().setName("baz"). + setNumPartitions(1).setReplicationFactor((short) 1)); + ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS); + PolicyViolationException error = assertThrows( + PolicyViolationException.class, + () -> replicationControl.createTopics(requestContext, request, Stream.of("foo", "bar", "baz").collect(Collectors.toSet()))); + assertEquals(error.getMessage(), "Excessively large number of partitions per request."); + } + + @Test + public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() { + CreateTopicsRequestData request = new CreateTopicsRequestData(); + request.topics().add(new CreatableTopic().setName("foo"). + setNumPartitions(-1).setReplicationFactor((short) 1)); + CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments = + new CreateTopicsRequestData.CreatableReplicaAssignmentCollection(); + assignments.add(new CreatableReplicaAssignment().setPartitionIndex(1)); + assignments.add(new CreatableReplicaAssignment().setPartitionIndex(2)); + request.topics().add(new CreatableTopic() + .setName("baz") + .setAssignments(assignments)); + PolicyViolationException error = assertThrows( + PolicyViolationException.class, + () -> ReplicationControlManager.validateTotalNumberOfPartitions(request, 9999) + ); + assertEquals(error.getMessage(), "Excessively large number of partitions per request."); + } + @Test public void testCreateTopics() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml index 3cf31f1288..78e1a70fa7 100644 --- a/streams/quickstart/java/pom.xml +++ b/streams/quickstart/java/pom.xml @@ -26,7 +26,7 @@ org.apache.kafka streams-quickstart - 3.9.0-SNAPSHOT + 3.9.0 .. diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml index f6034a115e..2bc235eb16 100644 --- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml +++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml @@ -29,7 +29,7 @@ UTF-8 - 3.9.0-SNAPSHOT + 3.9.0 1.7.36 diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml index 46cdea42fc..b5101583ee 100644 --- a/streams/quickstart/pom.xml +++ b/streams/quickstart/pom.xml @@ -22,7 +22,7 @@ org.apache.kafka streams-quickstart pom - 3.9.0-SNAPSHOT + 3.9.0 Kafka Streams :: Quickstart diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index a4a73838de..a190e05067 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -22,4 +22,4 @@ # Instead, in development branches, the version should have a suffix of the form ".devN" # # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0" -__version__ = '3.9.0.dev0' +__version__ = '3.9.0'