Skip to content

Commit

Permalink
KAFKA-15022: tests for HA assignor and StickyTaskAssignor (apache#14921)
Browse files Browse the repository at this point in the history
Part of KIP-925.

Tests for HAAssignor and StickyAssignor.

Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
lihaosky authored Dec 6, 2023
1 parent 9658942 commit 6be2e5c
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ private void constructEdges(
final UUID processId = clientList.get(clientIndex);
final int clientNodeId = getClientNodeId(clientIndex, taskIdList, clientList, topicGroupIndex);
int startingTaskNodeId = taskNodeId;
boolean hasTask = false;
int validTaskCount = 0;
for (final TaskId taskId : taskIds) {
// It's possible some taskId is not in the tasks we want to assign. For example, taskIdSet is only stateless tasks,
// but the tasks in subtopology map contains all tasks including stateful ones.
if (!taskIdSet.contains(taskId)) {
continue;
}
hasTask = true;
validTaskCount++;
final boolean inCurrentAssignment = hasAssignedTask.test(clientStates.get(processId), taskId);
graph.addEdge(startingTaskNodeId, clientNodeId, 1, costFunction.getCost(taskId, processId, inCurrentAssignment, trafficCost, nonOverlapCost, isStandby), 0);
startingTaskNodeId++;
Expand All @@ -212,13 +212,12 @@ private void constructEdges(
}
}

if (hasTask) {
if (validTaskCount > 0) {
final int secondStageClientNodeId = getSecondStageClientNodeId(taskIdList,
clientList, tasksForTopicGroup, clientIndex);
final int capacity =
originalAssignedTaskNumber.containsKey(processId) ? (int) Math.ceil(
originalAssignedTaskNumber.get(processId) * 1.0 / taskIdList.size()
* taskIds.size()) : 0;
originalAssignedTaskNumber.containsKey(processId) ?
(int) Math.ceil(originalAssignedTaskNumber.get(processId) * 1.0 / taskIdList.size() * validTaskCount) : 0;
graph.addEdge(clientNodeId, secondStageClientNodeId, capacity, 0, 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,12 +504,12 @@ V detectNegativeCycles(final V source, final Map<V, V> parentNodes, final Map<V,
final Long distanceEnd = distance.get(end);
// There's a path to u and either we haven't computed V or distance to V is shorter
if (distanceStart != null && (distanceEnd == null || distanceEnd > distanceStart + edge.cost)) {
if (i == nodeCount - 1) {
return end;
}
distance.put(end, distanceStart + edge.cost);
parentNodes.put(end, start);
parentEdges.put(end, edge);
if (i == nodeCount - 1) {
return end;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
);

public static Stream<Arguments> data() {
return Stream.of(Arguments.of(true), Arguments.of(false));
return Stream.of(
Arguments.of(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE),
Arguments.of(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
Arguments.of(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)
);
}

@BeforeAll
Expand All @@ -112,24 +116,25 @@ public static void closeCluster() {

@ParameterizedTest
@MethodSource("data")
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException {
public void shouldScaleOutWithWarmupTasksAndInMemoryStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo, enableRackAwareAssignor);
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName)), testInfo, rackAwareStrategy);
}

@ParameterizedTest
@MethodSource("data")
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final boolean enableRackAwareAssignor, final TestInfo testInfo) throws InterruptedException {
public void shouldScaleOutWithWarmupTasksAndPersistentStores(final String rackAwareStrategy, final TestInfo testInfo) throws InterruptedException {
// NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum
// value is one minute
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo, enableRackAwareAssignor);
shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName)), testInfo, rackAwareStrategy);
}

private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction,
final TestInfo testInfo,
final boolean enableRackAwareAssignor) throws InterruptedException {
final String testId = safeUniqueTestName(getClass(), testInfo);
final String rackAwareStrategy) throws InterruptedException {
// Replace "balance_subtopology" with shorter name since max name length is 249
final String testId = safeUniqueTestName(getClass(), testInfo).replaceAll("balance_subtopology", "balance");
final String appId = "appId_" + System.currentTimeMillis() + "_" + testId;
final String inputTopic = "input" + testId;
final Set<TopicPartition> inputTopicPartitions = mkSet(
Expand Down Expand Up @@ -170,8 +175,8 @@ private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<O

produceTestData(inputTopic, numberOfRecords);

try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, enableRackAwareAssignor, AssignmentTestUtils.RACK_0));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, enableRackAwareAssignor, AssignmentTestUtils.RACK_1));
try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, rackAwareStrategy, AssignmentTestUtils.RACK_0));
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener, rackAwareStrategy, AssignmentTestUtils.RACK_1));
final Consumer<String, String> consumer = new KafkaConsumer<>(getConsumerProperties())) {
kafkaStreams0.start();

Expand Down Expand Up @@ -312,9 +317,8 @@ private static void assertFalseNoRetry(final boolean assertion, final String mes

private static Properties streamsProperties(final String appId,
final AssignmentListener configuredAssignmentListener,
final boolean enableRackAwareAssignor,
final String rackAwareStrategy,
final String rack) {
final String rackAwareStrategy = enableRackAwareAssignor ? StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC : StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE;
return mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,13 +495,17 @@ static void assertBalancedActiveAssignment(final Map<UUID, ClientState> clientSt
}

static void assertBalancedTasks(final Map<UUID, ClientState> clientStates) {
final TaskSkewReport taskSkewReport = analyzeTaskAssignmentBalance(clientStates);
assertBalancedTasks(clientStates, 1);
}

static void assertBalancedTasks(final Map<UUID, ClientState> clientStates, final int skewThreshold) {
final TaskSkewReport taskSkewReport = analyzeTaskAssignmentBalance(clientStates, skewThreshold);
if (taskSkewReport.totalSkewedTasks() > 0) {
fail("Expected a balanced task assignment, but was: " + taskSkewReport);
}
}

static TaskSkewReport analyzeTaskAssignmentBalance(final Map<UUID, ClientState> clientStates) {
static TaskSkewReport analyzeTaskAssignmentBalance(final Map<UUID, ClientState> clientStates, final int skewThreshold) {
final Function<Integer, Map<UUID, AtomicInteger>> initialClientCounts =
i -> clientStates.keySet().stream().collect(Collectors.toMap(c -> c, c -> new AtomicInteger(0)));

Expand Down Expand Up @@ -531,7 +535,7 @@ static TaskSkewReport analyzeTaskAssignmentBalance(final Map<UUID, ClientState>
}
final int taskSkew = max - min;
maxTaskSkew = Math.max(maxTaskSkew, taskSkew);
if (taskSkew > 1) {
if (taskSkew > skewThreshold) {
skewedSubtopologies.add(entry.getKey());
}
}
Expand Down Expand Up @@ -1013,12 +1017,16 @@ static Map<UUID, Map<String, Optional<String>>> getProcessRacksForAllProcess() {
}

static RackAwareTaskAssignor getRackAwareTaskAssignor(final AssignmentConfigs configs) {
return getRackAwareTaskAssignor(configs, mkMap());
}

static RackAwareTaskAssignor getRackAwareTaskAssignor(final AssignmentConfigs configs, final Map<Subtopology, Set<TaskId>> taskForTopicGroup) {
return spy(
new RackAwareTaskAssignor(
getClusterForAllTopics(),
getTaskTopicPartitionMapForAllTasks(),
getTaskChangelogMapForAllTasks(),
new HashMap<>(),
taskForTopicGroup,
getProcessRacksForAllProcess(),
mockInternalTopicManagerForChangelog(),
configs,
Expand Down
Loading

0 comments on commit 6be2e5c

Please sign in to comment.