From 389f96aabd6239015dfa9d4180524dcfc5121e5c Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 19 Nov 2024 10:01:05 +0100 Subject: [PATCH] MINOR: Various cleanups in coordinator modules (#17828) Reviewers: David Jacot , Ken Huang --- .../common/runtime/CoordinatorRuntime.java | 2 +- .../common/runtime/EventAccumulatorTest.java | 4 +- .../runtime/InMemoryPartitionWriter.java | 18 ++-- .../coordinator/group/GroupCoordinator.java | 1 - .../group/GroupCoordinatorConfig.java | 21 ++--- .../group/GroupCoordinatorRecordHelpers.java | 12 +-- .../group/GroupCoordinatorShard.java | 2 +- .../group/GroupMetadataManager.java | 24 +++--- .../group/OffsetMetadataManager.java | 2 +- .../coordinator/group/assignor/RangeSet.java | 2 +- ...UniformHeterogeneousAssignmentBuilder.java | 82 ++++++++----------- .../group/classic/ClassicGroup.java | 25 +----- .../coordinator/group/modern/UnionSet.java | 2 +- .../modern/consumer/ConsumerGroupMember.java | 4 +- .../group/modern/share/ShareGroupConfig.java | 6 +- .../group/GroupCoordinatorConfigTest.java | 1 - .../group/MetadataImageBuilder.java | 2 +- .../group/OffsetMetadataManagerTest.java | 4 +- .../group/modern/UnionSetTest.java | 3 +- .../transaction/TransactionLogConfigTest.java | 3 +- 20 files changed, 89 insertions(+), 131 deletions(-) diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 2a7c3c2e508f1..34f350d385b01 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -2333,7 +2333,7 @@ public void scheduleUnloadOperation( if (context != null) { context.lock.lock(); try { - if (!partitionEpoch.isPresent() || context.epoch < partitionEpoch.getAsInt()) { + if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) { log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch); context.transitionTo(CoordinatorState.CLOSED); coordinators.remove(tp, context); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/EventAccumulatorTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/EventAccumulatorTest.java index 1f357de34e379..d35a0776db075 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/EventAccumulatorTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/EventAccumulatorTest.java @@ -37,7 +37,7 @@ public class EventAccumulatorTest { - private class MockEvent implements EventAccumulator.Event { + private static class MockEvent implements EventAccumulator.Event { int key; int value; @@ -153,7 +153,7 @@ public void testKeyConcurrentAndOrderingGuarantees() { accumulator.addLast(event2); assertEquals(3, accumulator.size()); - MockEvent event = null; + MockEvent event; // Poll event0. event = accumulator.poll(); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java index cd756a018af33..1f676ad550fc8 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/InMemoryPartitionWriter.java @@ -37,10 +37,10 @@ */ public class InMemoryPartitionWriter implements PartitionWriter { - private class PartitionState { - private ReentrantLock lock = new ReentrantLock(); - private List listeners = new ArrayList<>(); - private List entries = new ArrayList<>(); + private static class PartitionState { + private final ReentrantLock lock = new ReentrantLock(); + private final List listeners = new ArrayList<>(); + private final List entries = new ArrayList<>(); private long endOffset = 0L; private long committedOffset = 0L; } @@ -134,9 +134,8 @@ public void commit( state.lock.lock(); try { state.committedOffset = offset; - state.listeners.forEach(listener -> { - listener.onHighWatermarkUpdated(tp, state.committedOffset); - }); + state.listeners.forEach(listener -> + listener.onHighWatermarkUpdated(tp, state.committedOffset)); } finally { state.lock.unlock(); } @@ -149,9 +148,8 @@ public void commit( state.lock.lock(); try { state.committedOffset = state.endOffset; - state.listeners.forEach(listener -> { - listener.onHighWatermarkUpdated(tp, state.committedOffset); - }); + state.listeners.forEach(listener -> + listener.onHighWatermarkUpdated(tp, state.committedOffset)); } finally { state.lock.unlock(); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 87efb530dc8fa..223cff9720eaf 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -420,7 +420,6 @@ void onNewMetadataImage( * * @param groupId The group id. * @param newGroupConfig The new group config - * @return void */ void updateGroupConfig(String groupId, Properties newGroupConfig); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index e761c136d9f30..b1e41720fe30e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -78,8 +78,10 @@ public class GroupCoordinatorConfig { public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " + Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " + "The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production."; - public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = - Collections.unmodifiableList(Arrays.asList(Group.GroupType.CLASSIC.toString(), Group.GroupType.CONSUMER.toString())); + public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of( + Group.GroupType.CLASSIC.toString(), + Group.GroupType.CONSUMER.toString() + ); public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + "wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated."; @@ -120,10 +122,10 @@ public class GroupCoordinatorConfig { public static final String CONSUMER_GROUP_ASSIGNORS_CONFIG = "group.consumer.assignors"; public static final String CONSUMER_GROUP_ASSIGNORS_DOC = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor."; - public static final List CONSUMER_GROUP_ASSIGNORS_DEFAULT = Collections.unmodifiableList(Arrays.asList( - UniformAssignor.class.getName(), - RangeAssignor.class.getName() - )); + public static final List CONSUMER_GROUP_ASSIGNORS_DEFAULT = List.of( + UniformAssignor.class.getName(), + RangeAssignor.class.getName() + ); public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG = "group.consumer.migration.policy"; public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT = ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString(); @@ -380,10 +382,9 @@ public Map extractGroupConfigMap(ShareGroupConfig shareGroupCon * Copy the subset of properties that are relevant to consumer group. */ public Map extractConsumerGroupConfigMap() { - Map groupProps = new HashMap<>(); - groupProps.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs()); - groupProps.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs()); - return Collections.unmodifiableMap(groupProps); + return Map.of( + GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(), + GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs()); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index b3aa3b9db77fb..05cf4d5bd3a2b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -141,13 +141,13 @@ public static CoordinatorRecord newConsumerGroupSubscriptionMetadataRecord( Map newSubscriptionMetadata ) { ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { + newSubscriptionMetadata.forEach((topicName, topicMetadata) -> value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) - ); - }); + ) + ); return new CoordinatorRecord( new ApiMessageAndVersion( @@ -674,13 +674,13 @@ public static CoordinatorRecord newShareGroupSubscriptionMetadataRecord( Map newSubscriptionMetadata ) { ShareGroupPartitionMetadataValue value = new ShareGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> { + newSubscriptionMetadata.forEach((topicName, topicMetadata) -> value.topics().add(new ShareGroupPartitionMetadataValue.TopicMetadata() .setTopicId(topicMetadata.id()) .setTopicName(topicMetadata.name()) .setNumPartitions(topicMetadata.numPartitions()) - ); - }); + ) + ); return new CoordinatorRecord( new ApiMessageAndVersion( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java index 173ea366c7f1d..d047e5a425e12 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java @@ -116,11 +116,11 @@ public class GroupCoordinatorShard implements CoordinatorShard { private final GroupCoordinatorConfig config; + private final GroupConfigManager groupConfigManager; private LogContext logContext; private SnapshotRegistry snapshotRegistry; private Time time; private CoordinatorTimer timer; - private GroupConfigManager groupConfigManager; private CoordinatorMetrics coordinatorMetrics; private TopicPartition topicPartition; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 0d4ef16a45c24..7ad755135427e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -731,7 +731,7 @@ public List describeGroups( ClassicGroup group = classicGroup(groupId, committedOffset); if (group.isInState(STABLE)) { - if (!group.protocolName().isPresent()) { + if (group.protocolName().isEmpty()) { throw new IllegalStateException("Invalid null group protocol for stable group"); } @@ -751,7 +751,7 @@ public List describeGroups( .setGroupState(group.stateAsString()) .setProtocolType(group.protocolType().orElse("")) .setMembers(group.allMembers().stream() - .map(member -> member.describeNoMetadata()) + .map(ClassicGroupMember::describeNoMetadata) .collect(Collectors.toList()) ) ); @@ -3737,9 +3737,9 @@ public void replay( if (value != null) { Map subscriptionMetadata = new HashMap<>(); - value.topics().forEach(topicMetadata -> { - subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata)); - }); + value.topics().forEach(topicMetadata -> + subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata)) + ); group.setSubscriptionMetadata(subscriptionMetadata); } else { group.setSubscriptionMetadata(Collections.emptyMap()); @@ -3947,19 +3947,19 @@ public void onUnloaded() { case DEAD: break; case PREPARING_REBALANCE: - classicGroup.allMembers().forEach(member -> { + classicGroup.allMembers().forEach(member -> classicGroup.completeJoinFuture(member, new JoinGroupResponseData() .setMemberId(member.memberId()) - .setErrorCode(NOT_COORDINATOR.code())); - }); + .setErrorCode(NOT_COORDINATOR.code())) + ); break; case COMPLETING_REBALANCE: case STABLE: - classicGroup.allMembers().forEach(member -> { + classicGroup.allMembers().forEach(member -> classicGroup.completeSyncFuture(member, new SyncGroupResponseData() - .setErrorCode(NOT_COORDINATOR.code())); - }); + .setErrorCode(NOT_COORDINATOR.code())) + ); } break; case SHARE: @@ -6086,7 +6086,7 @@ private boolean maybeDeleteEmptyClassicGroup(Group group, List rec long currentTimestampMs = time.milliseconds(); Optional offsetExpirationCondition = group.offsetExpirationCondition(); - if (!offsetExpirationCondition.isPresent()) { + if (offsetExpirationCondition.isEmpty()) { return false; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java index 867cdd9c55619..5668bd98fdf77 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java @@ -63,7 +63,7 @@ public boolean contains(Object o) { @Override public Iterator iterator() { - return new Iterator() { + return new Iterator<>() { private int current = from; @Override diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java index ca5ba77fbbdb5..3166d775c27e0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java @@ -24,9 +24,6 @@ import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -55,7 +52,6 @@ * Balance > Stickiness. */ public class UniformHeterogeneousAssignmentBuilder { - private static final Logger LOG = LoggerFactory.getLogger(UniformHeterogeneousAssignmentBuilder.class); /** * The maximum number of iterations to perform in the final iterative balancing phase. @@ -181,50 +177,44 @@ public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopi } } - this.topicComparator = new Comparator() { - @Override - public int compare(final Uuid topic1Id, final Uuid topic2Id) { - int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id); - int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id); - int topic1SubscriberCount = topicSubscribers.get(topic1Id).size(); - int topic2SubscriberCount = topicSubscribers.get(topic2Id).size(); - - // Order by partitions per subscriber, descending. - int order = Double.compare( - (double) topic2PartitionCount / topic2SubscriberCount, - (double) topic1PartitionCount / topic1SubscriberCount - ); - - // Then order by subscriber count, ascending. - if (order == 0) { - order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount); - } - - // Then order by topic id, ascending. - if (order == 0) { - order = topic1Id.compareTo(topic2Id); - } + this.topicComparator = (topic1Id, topic2Id) -> { + int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id); + int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id); + int topic1SubscriberCount = topicSubscribers.get(topic1Id).size(); + int topic2SubscriberCount = topicSubscribers.get(topic2Id).size(); + + // Order by partitions per subscriber, descending. + int order = Double.compare( + (double) topic2PartitionCount / topic2SubscriberCount, + (double) topic1PartitionCount / topic1SubscriberCount + ); + + // Then order by subscriber count, ascending. + if (order == 0) { + order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount); + } - return order; + // Then order by topic id, ascending. + if (order == 0) { + order = topic1Id.compareTo(topic2Id); } - }; - this.memberComparator = new Comparator() { - @Override - public int compare(final Integer memberIndex1, final Integer memberIndex2) { - // Order by number of assigned partitions, ascending. - int order = Integer.compare( - memberTargetAssignmentSizes[memberIndex1], - memberTargetAssignmentSizes[memberIndex2] - ); + return order; + }; - // Then order by member index, ascending. - if (order == 0) { - order = memberIndex1.compareTo(memberIndex2); - } + this.memberComparator = (memberIndex1, memberIndex2) -> { + // Order by number of assigned partitions, ascending. + int order = Integer.compare( + memberTargetAssignmentSizes[memberIndex1], + memberTargetAssignmentSizes[memberIndex2] + ); - return order; + // Then order by member index, ascending. + if (order == 0) { + order = memberIndex1.compareTo(memberIndex2); } + + return order; }; // Initialize partition owners for the target assignments. @@ -851,14 +841,6 @@ private void assignPartition(Uuid topicId, int partition, int memberIndex) { addPartitionToTargetAssignment(topicId, partition, memberIndex); } - /** - * @param memberIndex The member index. - * @return The current assignment size for the given member. - */ - private int targetAssignmentSize(int memberIndex) { - return memberTargetAssignmentSizes[memberIndex]; - } - /** * Assigns a partition to a member and updates the current assignment size. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java index 819eb53be38b3..bc25569edba43 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java @@ -287,7 +287,7 @@ public Optional protocolType() { * @return True if the group is a simple group. */ public boolean isSimpleGroup() { - return !protocolType.isPresent() && isEmpty() && pendingJoinMembers.isEmpty(); + return protocolType.isEmpty() && isEmpty() && pendingJoinMembers.isEmpty(); } /** @@ -448,7 +448,7 @@ public void add(ClassicGroupMember member, CompletableFuture statesFilter, long committedOffset) { return statesFilter.contains(state.toLowerCaseString()); } - /** - * Verify the member id is up to date for static members. Return true if both conditions met: - * 1. given member is a known static member to group - * 2. group stored member id doesn't match with given member id - * - * @param groupInstanceId the group instance id. - * @param memberId the member id. - * @return whether the static member is fenced based on the condition above. - */ - public boolean isStaticMemberFenced( - String groupInstanceId, - String memberId - ) { - String existingMemberId = staticMemberId(groupInstanceId); - return existingMemberId != null && !existingMemberId.equals(memberId); - } - /** * @return whether the group can rebalance. */ @@ -1160,7 +1143,7 @@ public boolean isSubscribedToTopic(String topic) { * @return the subscribed topics or Empty based on the condition above. */ public Optional> computeSubscribedTopics() { - if (!protocolType.isPresent()) { + if (protocolType.isEmpty()) { return Optional.empty(); } String type = protocolType.get(); @@ -1378,7 +1361,7 @@ public static ClassicGroup fromConsumerGroup( ClassicGroupState.STABLE, time, consumerGroup.groupEpoch(), - Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE), + Optional.of(ConsumerProtocol.PROTOCOL_TYPE), Optional.empty(), Optional.empty(), Optional.of(time.milliseconds()) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java index 185142a13df47..c3dd3c71054ca 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/UnionSet.java @@ -73,7 +73,7 @@ public boolean contains(Object o) { @Override public Iterator iterator() { - return new Iterator() { + return new Iterator<>() { private final Iterator largeSetIterator = largeSet.iterator(); private final Iterator smallSetIterator = smallSet.iterator(); private T next = null; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java index 3b59d820a70f6..2f04931bda45b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java @@ -253,7 +253,7 @@ public ConsumerGroupMember build() { /** * The rebalance timeout provided by the member. */ - private int rebalanceTimeoutMs; + private final int rebalanceTimeoutMs; /** * The subscription pattern configured by the member. @@ -360,7 +360,7 @@ public JoinGroupRequestData.JoinGroupRequestProtocolCollection supportedJoinGrou */ public Optional classicProtocolSessionTimeout() { if (useClassicProtocol()) { - return Optional.ofNullable(classicMemberMetadata.sessionTimeoutMs()); + return Optional.of(classicMemberMetadata.sessionTimeoutMs()); } else { return Optional.empty(); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java index 8c7e7a15c1f39..4e34e15fee910 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java @@ -23,8 +23,6 @@ import org.apache.kafka.coordinator.group.GroupConfig; import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -179,8 +177,6 @@ private void validate() { * Copy the subset of properties that are relevant to share group. */ public Map extractShareGroupConfigMap() { - Map groupProps = new HashMap<>(); - groupProps.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupRecordLockDurationMs()); - return Collections.unmodifiableMap(groupProps); + return Map.of(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, shareGroupRecordLockDurationMs()); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 1ec6fe7d68c1a..4956acaf3866b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -36,7 +36,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -@SuppressWarnings("deprecation") public class GroupCoordinatorConfigTest { private static final List GROUP_COORDINATOR_CONFIG_DEFS = Arrays.asList( GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java index 995f1ee74a50b..2f6aacccebabc 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MetadataImageBuilder.java @@ -27,7 +27,7 @@ import java.util.Arrays; public class MetadataImageBuilder { - private MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + private final MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); public MetadataImageBuilder addTopic( Uuid topicId, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index fa6a95e741038..4aa3cd7f7005e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -102,11 +102,11 @@ public static class Builder { private final MockCoordinatorTimer timer = new MockCoordinatorTimer<>(time); private final LogContext logContext = new LogContext(); private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); + private final GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); + private final GroupConfigManager configManager = mock(GroupConfigManager.class); private GroupMetadataManager groupMetadataManager = null; private MetadataImage metadataImage = null; private GroupCoordinatorConfig config = null; - private GroupCoordinatorMetricsShard metrics = mock(GroupCoordinatorMetricsShard.class); - private GroupConfigManager configManager = mock(GroupConfigManager.class); Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) { config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 24 * 60); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java index 23986231b6501..2653c7385f184 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/UnionSetTest.java @@ -45,8 +45,7 @@ public void testUnion() { Set.of(2, 3, 4, 5) ); - List result = new ArrayList<>(); - result.addAll(union); + List result = new ArrayList<>(union); result.sort(Integer::compareTo); assertEquals(List.of(1, 2, 3, 4, 5), result); diff --git a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java index ea68367681c3c..5ed9e74c4bda0 100644 --- a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java +++ b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/TransactionLogConfigTest.java @@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -88,7 +89,7 @@ void ShouldGetDynamicValueFromAbstractConfig() { TransactionLogConfig transactionLogConfig = new TransactionLogConfig(config); - assertEquals(false, transactionLogConfig.transactionPartitionVerificationEnable()); + assertFalse(transactionLogConfig.transactionPartitionVerificationEnable()); assertEquals(88, transactionLogConfig.producerIdExpirationMs()); // If the following calls are missing, we won’t be able to distinguish whether the value is set in the constructor or if