Skip to content

Commit

Permalink
MINOR: refactor BuiltInPartitioner to remove mockRandom from producti…
Browse files Browse the repository at this point in the history
…on code (apache#16277)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
gongxuanzhang authored Jun 15, 2024
1 parent d239dde commit 3a9d877
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Built-in default partitioner. Note, that this is just a utility class that is used directly from
Expand All @@ -44,8 +43,6 @@ public class BuiltInPartitioner {
private volatile PartitionLoadStats partitionLoadStats = null;
private final AtomicReference<StickyPartitionInfo> stickyPartitionInfo = new AtomicReference<>();

// Visible and used for testing only.
public static volatile Supplier<Integer> mockRandom = null;

/**
* BuiltInPartitioner constructor.
Expand All @@ -66,7 +63,7 @@ public BuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSi
* Calculate the next partition for the topic based on the partition load stats.
*/
private int nextPartition(Cluster cluster) {
int random = mockRandom != null ? mockRandom.get() : Utils.toPositive(ThreadLocalRandom.current().nextInt());
int random = randomPartition();

// Cache volatile variable in local variable.
PartitionLoadStats partitionLoadStats = this.partitionLoadStats;
Expand Down Expand Up @@ -113,6 +110,10 @@ private int nextPartition(Cluster cluster) {
return partition;
}

int randomPartition() {
return Utils.toPositive(ThreadLocalRandom.current().nextInt());
}

/**
* Test-only function. When partition load stats are defined, return the end of range for the
* random number.
Expand Down Expand Up @@ -335,6 +336,7 @@ private static final class PartitionLoadStats {
public final int[] cumulativeFrequencyTable;
public final int[] partitionIds;
public final int length;

public PartitionLoadStats(int[] cumulativeFrequencyTable, int[] partitionIds, int length) {
assert cumulativeFrequencyTable.length == partitionIds.length;
assert length <= cumulativeFrequencyTable.length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public RecordAppendResult append(String topic,
boolean abortOnNewBatch,
long nowMs,
Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));

// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
Expand Down Expand Up @@ -1033,10 +1033,15 @@ public Deque<ProducerBatch> getDeque(TopicPartition tp) {
* Get the deque for the given topic-partition, creating it if necessary.
*/
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(), k -> new TopicInfo(logContext, k, batchSize));
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(tp.topic(),
k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));
return topicInfo.batches.computeIfAbsent(tp.partition(), k -> new ArrayDeque<>());
}

BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
return new BuiltInPartitioner(logContext, topic, stickyBatchSize);
}

/**
* Deallocate the record batch
*/
Expand Down Expand Up @@ -1261,8 +1266,8 @@ private static class TopicInfo {
public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
public final BuiltInPartitioner builtInPartitioner;

public TopicInfo(LogContext logContext, String topic, int stickyBatchSize) {
builtInPartitioner = new BuiltInPartitioner(logContext, topic, stickyBatchSize);
public TopicInfo(BuiltInPartitioner builtInPartitioner) {
this.builtInPartitioner = builtInPartitioner;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
Expand All @@ -47,11 +46,6 @@ public class BuiltInPartitionerTest {
static final String TOPIC_C = "topicC";
final LogContext logContext = new LogContext();

@AfterEach
public void tearDown() {
BuiltInPartitioner.mockRandom = null;
}

@Test
public void testStickyPartitioning() {
List<PartitionInfo> allPartitions = asList(new PartitionInfo(TOPIC_A, 0, NODES[0], NODES, NODES),
Expand All @@ -63,13 +57,9 @@ public void testStickyPartitioning() {
Collections.emptySet(), Collections.emptySet());

// Create partitions with "sticky" batch size to accommodate 3 records.
BuiltInPartitioner builtInPartitionerA = new BuiltInPartitioner(logContext, TOPIC_A, 3);
BuiltInPartitioner builtInPartitionerA = new SequentialPartitioner(logContext, TOPIC_A, 3);

// Test the partition is not switched until sticky batch size is reached.
// Mock random number generator with just sequential integer.
AtomicInteger mockRandom = new AtomicInteger();
BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);

BuiltInPartitioner.StickyPartitionInfo partitionInfo = builtInPartitionerA.peekCurrentPartitionInfo(testCluster);
int partA = partitionInfo.partition();
builtInPartitionerA.updatePartitionInfo(partitionInfo, 1, testCluster);
Expand All @@ -86,7 +76,7 @@ public void testStickyPartitioning() {
assertNotEquals(partA, builtInPartitionerA.peekCurrentPartitionInfo(testCluster).partition());

// Check that switching works even when there is one partition.
BuiltInPartitioner builtInPartitionerB = new BuiltInPartitioner(logContext, TOPIC_B, 1);
BuiltInPartitioner builtInPartitionerB = new SequentialPartitioner(logContext, TOPIC_B, 1);
for (int c = 10; c-- > 0; ) {
partitionInfo = builtInPartitionerB.peekCurrentPartitionInfo(testCluster);
assertEquals(0, partitionInfo.partition());
Expand Down Expand Up @@ -155,11 +145,7 @@ public void unavailablePartitionsTest() {

@Test
public void adaptivePartitionsTest() {
// Mock random number generator with just sequential integer.
AtomicInteger mockRandom = new AtomicInteger();
BuiltInPartitioner.mockRandom = () -> mockRandom.getAndAdd(1);

BuiltInPartitioner builtInPartitioner = new BuiltInPartitioner(logContext, TOPIC_A, 1);
BuiltInPartitioner builtInPartitioner = new SequentialPartitioner(logContext, TOPIC_A, 1);

// Simulate partition queue sizes.
int[] queueSizes = {5, 0, 3, 0, 1};
Expand Down Expand Up @@ -203,4 +189,19 @@ void testStickyBatchSizeMoreThatZero() {
assertThrows(IllegalArgumentException.class, () -> new BuiltInPartitioner(logContext, TOPIC_A, 0));
assertDoesNotThrow(() -> new BuiltInPartitioner(logContext, TOPIC_A, 1));
}


private static class SequentialPartitioner extends BuiltInPartitioner {

AtomicInteger mockRandom = new AtomicInteger();

public SequentialPartitioner(LogContext logContext, String topic, int stickyBatchSize) {
super(logContext, topic, stickyBatchSize);
}

@Override
int randomPartition() {
return mockRandom.getAndAdd(1);
}
}
}
Loading

0 comments on commit 3a9d877

Please sign in to comment.