KAFKA-17951: Share partition rotate strategy (apache#18651)
Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit <[email protected]>
apoorvmittal10 authored Jan 28, 2025
1 parent f32932c commit c7619ef
Showing 11 changed files with 455 additions and 169 deletions.
11 changes: 9 additions & 2 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -50,6 +50,8 @@
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
import org.apache.kafka.server.share.fetch.PartitionRotateStrategy;
import org.apache.kafka.server.share.fetch.PartitionRotateStrategy.PartitionRotateMetadata;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
@@ -261,14 +263,19 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
String groupId,
String memberId,
FetchParams fetchParams,
int sessionEpoch,
int batchSize,
Map<TopicIdPartition, Integer> partitionMaxBytes
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes
) {
log.trace("Fetch request for topicIdPartitions: {} with groupId: {} fetch params: {}",
partitionMaxBytes.keySet(), groupId, fetchParams);

LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions = PartitionRotateStrategy
.type(PartitionRotateStrategy.StrategyType.ROUND_ROBIN)
.rotate(partitionMaxBytes, new PartitionRotateMetadata(sessionEpoch));

CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, batchSize, maxFetchRecords, brokerTopicStats));
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, topicIdPartitions, batchSize, maxFetchRecords, brokerTopicStats));

return future;
}
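The heart of the change is in fetchMessages above: the ordered partitionMaxBytes map is rotated round-robin by the share session epoch before the fetch is processed, so consecutive fetches within a session start from different partitions instead of always favouring the first one. A minimal sketch of such a rotation, assuming the strategy simply shifts the insertion-ordered map left by sessionEpoch % size (this helper is illustrative, not the actual PartitionRotateStrategy implementation):

import java.util.LinkedHashMap;
import java.util.Map;

public final class RoundRobinRotationSketch {

    // Illustrative stand-in for a ROUND_ROBIN rotation (assumption: the real strategy
    // rotates the insertion-ordered map left by sessionEpoch % size).
    static <K, V> LinkedHashMap<K, V> rotate(LinkedHashMap<K, V> partitionMaxBytes, int sessionEpoch) {
        int size = partitionMaxBytes.size();
        if (size == 0) {
            return partitionMaxBytes;
        }
        int shift = Math.abs(sessionEpoch) % size;
        if (shift == 0) {
            // Epoch 0 (the initial fetch) or a multiple of the size keeps the original order.
            return partitionMaxBytes;
        }
        LinkedHashMap<K, V> rotated = new LinkedHashMap<>(size);
        int index = 0;
        // Entries at positions >= shift come first ...
        for (Map.Entry<K, V> entry : partitionMaxBytes.entrySet()) {
            if (index++ >= shift) {
                rotated.put(entry.getKey(), entry.getValue());
            }
        }
        index = 0;
        // ... and the skipped head entries are appended at the tail.
        for (Map.Entry<K, V> entry : partitionMaxBytes.entrySet()) {
            if (index++ < shift) {
                rotated.put(entry.getKey(), entry.getValue());
            }
        }
        return rotated;
    }
}

Under that assumption, partitions p0, p1, p2 at session epoch 1 would be served in the order p1, p2, p0, and at epoch 2 in the order p2, p0, p1. This is also why the signature is tightened from Map to LinkedHashMap along the whole call chain: rotation is only meaningful when iteration order is preserved.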
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2796,9 +2796,9 @@ class KafkaApis(val requestChannel: RequestChannel,

// Handling the Fetch from the ShareFetchRequest.
// Variable to store the topic partition wise result of fetching.
val fetchResult: CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] =
handleFetchFromShareFetchRequest(
val fetchResult: CompletableFuture[Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = handleFetchFromShareFetchRequest(
request,
shareSessionEpoch,
erroneousAndValidPartitionData,
sharePartitionManager,
authorizedTopics
@@ -2893,6 +2893,7 @@ class KafkaApis(val requestChannel: RequestChannel,

// Visible for Testing
def handleFetchFromShareFetchRequest(request: RequestChannel.Request,
shareSessionEpoch: Int,
erroneousAndValidPartitionData: ErroneousAndValidPartitionData,
sharePartitionManagerInstance: SharePartitionManager,
authorizedTopics: Set[String]
@@ -2954,6 +2955,7 @@
groupId,
shareFetchRequest.data.memberId,
params,
shareSessionEpoch,
shareFetchRequest.data.batchSize,
interestedWithMaxBytes
).thenApply{ result =>
70 changes: 18 additions & 52 deletions core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -54,7 +54,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -72,6 +71,7 @@
import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -115,9 +115,7 @@ public void testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartit
Uuid topicId = Uuid.randomUuid();
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -155,9 +153,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -217,9 +213,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -274,9 +268,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -323,9 +315,7 @@ public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -369,9 +359,7 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -419,8 +407,7 @@ public void testToCompleteAnAlreadyCompletedFuture() {
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);

SharePartition sp0 = mock(SharePartition.class);

@@ -469,9 +456,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2));
Map<TopicIdPartition, Integer> partitionMaxBytes1 = new HashMap<>();
partitionMaxBytes1.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes1.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes1 = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -513,9 +498,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
assertTrue(delayedShareFetch1.lock().tryLock());
delayedShareFetch1.lock().unlock();

Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes2 = orderedMap(PARTITION_MAX_BYTES, tp1, tp2);
ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), partitionMaxBytes2, BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);
@@ -561,9 +544,7 @@ public void testCombineLogReadResponse() {
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -619,8 +600,7 @@ public void testExceptionInMinBytesCalculation() {
Uuid topicId = Uuid.randomUuid();
ReplicaManager replicaManager = mock(ReplicaManager.class);
TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);

SharePartition sp0 = mock(SharePartition.class);

@@ -697,7 +677,7 @@ public void testLocksReleasedForCompletedFetch() {
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);

ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), orderedMap(PARTITION_MAX_BYTES, tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);

DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
@@ -728,7 +708,7 @@ public void testLocksReleasedAcquireException() {
sharePartitions.put(tp0, sp0);

ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(),
new CompletableFuture<>(), Map.of(tp0, PARTITION_MAX_BYTES), BATCH_SIZE, MAX_FETCH_RECORDS,
new CompletableFuture<>(), orderedMap(PARTITION_MAX_BYTES, tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
BROKER_TOPIC_STATS);

DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
@@ -747,8 +727,7 @@ public void testTryCompleteWhenPartitionMaxBytesStrategyThrowsException() {
String groupId = "grp";
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
SharePartition sp0 = mock(SharePartition.class);
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);

when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp0.canAcquireRecords()).thenReturn(true);
@@ -804,12 +783,7 @@ public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirab
SharePartition sp3 = mock(SharePartition.class);
SharePartition sp4 = mock(SharePartition.class);

Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp4, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1, tp2, tp3, tp4);

when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@@ -907,12 +881,7 @@ public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirab
SharePartition sp3 = mock(SharePartition.class);
SharePartition sp4 = mock(SharePartition.class);

Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp4, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1, tp2, tp3, tp4);

when(sp0.maybeAcquireFetchLock()).thenReturn(true);
when(sp1.maybeAcquireFetchLock()).thenReturn(true);
@@ -992,10 +961,7 @@ public void testPartitionMaxBytesFromUniformStrategyInCombineLogReadResponse() {
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1, tp2);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
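Throughout these tests, hand-built HashMaps are replaced with the new ShareFetchTestUtils.orderedMap helper so that partition order, which the rotation above now depends on, is explicit and deterministic. A plausible sketch of such a helper, assuming it simply maps every partition to the same max-bytes value in the order the arguments are given (illustrative only, not the actual ShareFetchTestUtils code):

import java.util.LinkedHashMap;

public final class OrderedMapSketch {

    // Assumed shape of a helper like ShareFetchTestUtils.orderedMap: every key gets the
    // same value and the LinkedHashMap preserves the order of the varargs.
    @SafeVarargs
    static <K> LinkedHashMap<K, Integer> orderedMap(int maxBytes, K... keys) {
        LinkedHashMap<K, Integer> map = new LinkedHashMap<>();
        for (K key : keys) {
            map.put(key, maxBytes);
        }
        return map;
    }
}

Called as orderedMap(PARTITION_MAX_BYTES, tp0, tp1), it yields a LinkedHashMap with tp0 ahead of tp1, matching the LinkedHashMap parameter that ShareFetch and SharePartitionManager.fetchMessages now expect.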
19 changes: 6 additions & 13 deletions core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -53,6 +53,7 @@
import java.util.function.BiConsumer;

import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.orderedMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -84,9 +85,7 @@ public void testProcessFetchResponse() {
String memberId = Uuid.randomUuid().toString();
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -151,9 +150,7 @@ public void testProcessFetchResponseWithEmptyRecords() {
String memberId = Uuid.randomUuid().toString();
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = mock(SharePartition.class);
SharePartition sp1 = mock(SharePartition.class);
@@ -199,9 +196,7 @@ public void testProcessFetchResponseWithLsoMovementForTopicPartition() {
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));

Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = Mockito.mock(SharePartition.class);
SharePartition sp1 = Mockito.mock(SharePartition.class);
@@ -295,7 +290,7 @@ public void testProcessFetchResponseWhenNoRecordsAreAcquired() {
String groupId = "grp";

TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
Map<TopicIdPartition, Integer> partitionMaxBytes = Collections.singletonMap(tp0, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0);

SharePartition sp0 = Mockito.mock(SharePartition.class);
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
@@ -357,9 +352,7 @@ public void testProcessFetchResponseWithMaxFetchRecords() {
TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));

Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes = orderedMap(PARTITION_MAX_BYTES, tp0, tp1);

SharePartition sp0 = Mockito.mock(SharePartition.class);
SharePartition sp1 = Mockito.mock(SharePartition.class);