Skip to content

Commit

Permalink
Merge branch '3.9.0' into merge_3.9
Browse files Browse the repository at this point in the history
  • Loading branch information
superhx committed Nov 8, 2024
2 parents 1ada92c + cc53a63 commit aa856f8
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ class KRaftClusterTest {
() => admin.createTopics(newTopics).all().get())
assertNotNull(executionException.getCause)
assertEquals(classOf[PolicyViolationException], executionException.getCause.getClass)
assertEquals("Unable to perform excessively large batch operation.",
assertEquals("Excessively large number of partitions per request.",
executionException.getCause.getMessage)
} finally {
admin.close()
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ group=org.apache.kafka
# - streams/quickstart/pom.xml
# - streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
# - streams/quickstart/java/pom.xml
version=3.9.0-SNAPSHOT
version=3.9.0
scalaVersion=2.13.14
# Adding swaggerVersion in gradle.properties to have a single version in place for swagger
# New version of Swagger 2.2.14 requires minimum JDK 11.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
*/
public class ReplicationControlManager {
static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
static final int MAX_PARTITIONS_PER_BATCH = 10_000;

static class Builder {
private SnapshotRegistry snapshotRegistry = null;
Expand Down Expand Up @@ -687,6 +688,8 @@ ControllerResult<CreateTopicsResponseData> createTopics(
Map<String, ApiError> topicErrors = new HashMap<>();
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);

validateTotalNumberOfPartitions(request, defaultNumPartitions);

// Check the topic names.
validateNewTopicNames(topicErrors, request.topics(), topicsWithCollisionChars);

Expand Down Expand Up @@ -1284,6 +1287,34 @@ ControllerResult<AlterPartitionResponseData> alterPartition(
return ControllerResult.of(records, response);
}

/**
* Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch
* has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory.
* Validates an upper bound number of partitions. The actual number may be smaller if some topics are misconfigured.
*
* @param request a batch of topics to create.
* @param defaultNumPartitions default number of partitions to assign if unspecified.
* @throws PolicyViolationException if total number of partitions exceeds {@value MAX_PARTITIONS_PER_BATCH}.
*/
static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int defaultNumPartitions) {
int totalPartitions = 0;
for (CreatableTopic topic: request.topics()) {
if (topic.assignments().isEmpty()) {
if (topic.numPartitions() == -1) {
totalPartitions += defaultNumPartitions;
} else if (topic.numPartitions() > 0) {
totalPartitions += topic.numPartitions();
}
} else {
totalPartitions += topic.assignments().size();
}

}
if (totalPartitions > MAX_PARTITIONS_PER_BATCH) {
throw new PolicyViolationException("Excessively large number of partitions per request.");
}
}

/**
* Validate the partition information included in the alter partition request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -573,6 +574,44 @@ public void configure(Map<String, ?> configs) {
}
}

@Test
public void testExcessiveNumberOfTopicsCannotBeCreated() {
// number of partitions is explicitly set without assignments
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
ReplicationControlManager replicationControl = ctx.replicationControl;
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(5000).setReplicationFactor((short) 1));
request.topics().add(new CreatableTopic().setName("bar").
setNumPartitions(5000).setReplicationFactor((short) 1));
request.topics().add(new CreatableTopic().setName("baz").
setNumPartitions(1).setReplicationFactor((short) 1));
ControllerRequestContext requestContext = anonymousContextFor(ApiKeys.CREATE_TOPICS);
PolicyViolationException error = assertThrows(
PolicyViolationException.class,
() -> replicationControl.createTopics(requestContext, request, Stream.of("foo", "bar", "baz").collect(Collectors.toSet())));
assertEquals(error.getMessage(), "Excessively large number of partitions per request.");
}

@Test
public void testExcessiveNumberOfTopicsCannotBeCreatedWithAssignments() {
CreateTopicsRequestData request = new CreateTopicsRequestData();
request.topics().add(new CreatableTopic().setName("foo").
setNumPartitions(-1).setReplicationFactor((short) 1));
CreateTopicsRequestData.CreatableReplicaAssignmentCollection assignments =
new CreateTopicsRequestData.CreatableReplicaAssignmentCollection();
assignments.add(new CreatableReplicaAssignment().setPartitionIndex(1));
assignments.add(new CreatableReplicaAssignment().setPartitionIndex(2));
request.topics().add(new CreatableTopic()
.setName("baz")
.setAssignments(assignments));
PolicyViolationException error = assertThrows(
PolicyViolationException.class,
() -> ReplicationControlManager.validateTotalNumberOfPartitions(request, 9999)
);
assertEquals(error.getMessage(), "Excessively large number of partitions per request.");
}

@Test
public void testCreateTopics() {
ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build();
Expand Down
2 changes: 1 addition & 1 deletion streams/quickstart/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<parent>
<groupId>org.apache.kafka</groupId>
<artifactId>streams-quickstart</artifactId>
<version>3.9.0-SNAPSHOT</version>
<version>3.9.0</version>
<relativePath>..</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>3.9.0-SNAPSHOT</kafka.version>
<kafka.version>3.9.0</kafka.version>
<slf4j.version>1.7.36</slf4j.version>
</properties>

Expand Down
2 changes: 1 addition & 1 deletion streams/quickstart/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>streams-quickstart</artifactId>
<packaging>pom</packaging>
<version>3.9.0-SNAPSHOT</version>
<version>3.9.0</version>

<name>Kafka Streams :: Quickstart</name>

Expand Down
2 changes: 1 addition & 1 deletion tests/kafkatest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
# Instead, in development branches, the version should have a suffix of the form ".devN"
#
# For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0"
__version__ = '3.9.0.dev0'
__version__ = '3.9.0'

0 comments on commit aa856f8

Please sign in to comment.