Skip to content

Commit

Permalink
MINOR: Various cleanups in coordinator modules (apache#17828)
Browse files Browse the repository at this point in the history
Reviewers: David Jacot <[email protected]>, Ken Huang <[email protected]>
  • Loading branch information
mimaison authored Nov 19, 2024
1 parent 624cd4f commit 389f96a
Show file tree
Hide file tree
Showing 20 changed files with 89 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,7 @@ public void scheduleUnloadOperation(
if (context != null) {
context.lock.lock();
try {
if (!partitionEpoch.isPresent() || context.epoch < partitionEpoch.getAsInt()) {
if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) {
log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
context.transitionTo(CoordinatorState.CLOSED);
coordinators.remove(tp, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

public class EventAccumulatorTest {

private class MockEvent implements EventAccumulator.Event<Integer> {
private static class MockEvent implements EventAccumulator.Event<Integer> {
int key;
int value;

Expand Down Expand Up @@ -153,7 +153,7 @@ public void testKeyConcurrentAndOrderingGuarantees() {
accumulator.addLast(event2);
assertEquals(3, accumulator.size());

MockEvent event = null;
MockEvent event;

// Poll event0.
event = accumulator.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
*/
public class InMemoryPartitionWriter implements PartitionWriter {

private class PartitionState {
private ReentrantLock lock = new ReentrantLock();
private List<Listener> listeners = new ArrayList<>();
private List<MemoryRecords> entries = new ArrayList<>();
private static class PartitionState {
private final ReentrantLock lock = new ReentrantLock();
private final List<Listener> listeners = new ArrayList<>();
private final List<MemoryRecords> entries = new ArrayList<>();
private long endOffset = 0L;
private long committedOffset = 0L;
}
Expand Down Expand Up @@ -134,9 +134,8 @@ public void commit(
state.lock.lock();
try {
state.committedOffset = offset;
state.listeners.forEach(listener -> {
listener.onHighWatermarkUpdated(tp, state.committedOffset);
});
state.listeners.forEach(listener ->
listener.onHighWatermarkUpdated(tp, state.committedOffset));
} finally {
state.lock.unlock();
}
Expand All @@ -149,9 +148,8 @@ public void commit(
state.lock.lock();
try {
state.committedOffset = state.endOffset;
state.listeners.forEach(listener -> {
listener.onHighWatermarkUpdated(tp, state.committedOffset);
});
state.listeners.forEach(listener ->
listener.onHighWatermarkUpdated(tp, state.committedOffset));
} finally {
state.lock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,6 @@ void onNewMetadataImage(
*
* @param groupId The group id.
* @param newGroupConfig The new group config
* @return void
*/
void updateGroupConfig(String groupId, Properties newGroupConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ public class GroupCoordinatorConfig {
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols. Supported protocols: " +
Arrays.stream(Group.GroupType.values()).map(Group.GroupType::toString).collect(Collectors.joining(",")) + ". " +
"The " + Group.GroupType.SHARE + " rebalance protocol is in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT =
Collections.unmodifiableList(Arrays.asList(Group.GroupType.CLASSIC.toString(), Group.GroupType.CONSUMER.toString()));
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
Group.GroupType.CLASSIC.toString(),
Group.GroupType.CONSUMER.toString()
);
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
"wait for writes to accumulate before flushing them to disk. Transactional writes are not accumulated.";
Expand Down Expand Up @@ -120,10 +122,10 @@ public class GroupCoordinatorConfig {

public static final String CONSUMER_GROUP_ASSIGNORS_CONFIG = "group.consumer.assignors";
public static final String CONSUMER_GROUP_ASSIGNORS_DOC = "The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor.";
public static final List<String> CONSUMER_GROUP_ASSIGNORS_DEFAULT = Collections.unmodifiableList(Arrays.asList(
UniformAssignor.class.getName(),
RangeAssignor.class.getName()
));
public static final List<String> CONSUMER_GROUP_ASSIGNORS_DEFAULT = List.of(
UniformAssignor.class.getName(),
RangeAssignor.class.getName()
);

public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG = "group.consumer.migration.policy";
public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT = ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString();
Expand Down Expand Up @@ -380,10 +382,9 @@ public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig shareGroupCon
* Copy the subset of properties that are relevant to consumer group.
*/
public Map<String, Integer> extractConsumerGroupConfigMap() {
Map<String, Integer> groupProps = new HashMap<>();
groupProps.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs());
groupProps.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
return Collections.unmodifiableMap(groupProps);
return Map.of(
GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(),
GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,13 @@ public static CoordinatorRecord newConsumerGroupSubscriptionMetadataRecord(
Map<String, TopicMetadata> newSubscriptionMetadata
) {
ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
);
});
)
);

return new CoordinatorRecord(
new ApiMessageAndVersion(
Expand Down Expand Up @@ -674,13 +674,13 @@ public static CoordinatorRecord newShareGroupSubscriptionMetadataRecord(
Map<String, TopicMetadata> newSubscriptionMetadata
) {
ShareGroupPartitionMetadataValue value = new ShareGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
value.topics().add(new ShareGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
);
});
)
);

return new CoordinatorRecord(
new ApiMessageAndVersion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ public class GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord

public static class Builder implements CoordinatorShardBuilder<GroupCoordinatorShard, CoordinatorRecord> {
private final GroupCoordinatorConfig config;
private final GroupConfigManager groupConfigManager;
private LogContext logContext;
private SnapshotRegistry snapshotRegistry;
private Time time;
private CoordinatorTimer<Void, CoordinatorRecord> timer;
private GroupConfigManager groupConfigManager;
private CoordinatorMetrics coordinatorMetrics;
private TopicPartition topicPartition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
ClassicGroup group = classicGroup(groupId, committedOffset);

if (group.isInState(STABLE)) {
if (!group.protocolName().isPresent()) {
if (group.protocolName().isEmpty()) {
throw new IllegalStateException("Invalid null group protocol for stable group");
}

Expand All @@ -751,7 +751,7 @@ public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
.setGroupState(group.stateAsString())
.setProtocolType(group.protocolType().orElse(""))
.setMembers(group.allMembers().stream()
.map(member -> member.describeNoMetadata())
.map(ClassicGroupMember::describeNoMetadata)
.collect(Collectors.toList())
)
);
Expand Down Expand Up @@ -3737,9 +3737,9 @@ public void replay(

if (value != null) {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
value.topics().forEach(topicMetadata -> {
subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata));
});
value.topics().forEach(topicMetadata ->
subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata))
);
group.setSubscriptionMetadata(subscriptionMetadata);
} else {
group.setSubscriptionMetadata(Collections.emptyMap());
Expand Down Expand Up @@ -3947,19 +3947,19 @@ public void onUnloaded() {
case DEAD:
break;
case PREPARING_REBALANCE:
classicGroup.allMembers().forEach(member -> {
classicGroup.allMembers().forEach(member ->
classicGroup.completeJoinFuture(member, new JoinGroupResponseData()
.setMemberId(member.memberId())
.setErrorCode(NOT_COORDINATOR.code()));
});
.setErrorCode(NOT_COORDINATOR.code()))
);

break;
case COMPLETING_REBALANCE:
case STABLE:
classicGroup.allMembers().forEach(member -> {
classicGroup.allMembers().forEach(member ->
classicGroup.completeSyncFuture(member, new SyncGroupResponseData()
.setErrorCode(NOT_COORDINATOR.code()));
});
.setErrorCode(NOT_COORDINATOR.code()))
);
}
break;
case SHARE:
Expand Down Expand Up @@ -6086,7 +6086,7 @@ private boolean maybeDeleteEmptyClassicGroup(Group group, List<CoordinatorRecord
if (isEmptyClassicGroup(group)) {
// Delete the classic group by adding tombstones.
// There's no need to remove the group as the replay of tombstones removes it.
if (group != null) createGroupTombstoneRecords(group, records);
createGroupTombstoneRecords(group, records);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
long currentTimestampMs = time.milliseconds();
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();

if (!offsetExpirationCondition.isPresent()) {
if (offsetExpirationCondition.isEmpty()) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public boolean contains(Object o) {

@Override
public Iterator<Integer> iterator() {
return new Iterator<Integer>() {
return new Iterator<>() {
private int current = from;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -55,7 +52,6 @@
* Balance > Stickiness.
*/
public class UniformHeterogeneousAssignmentBuilder {
private static final Logger LOG = LoggerFactory.getLogger(UniformHeterogeneousAssignmentBuilder.class);

/**
* The maximum number of iterations to perform in the final iterative balancing phase.
Expand Down Expand Up @@ -181,50 +177,44 @@ public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopi
}
}

this.topicComparator = new Comparator<Uuid>() {
@Override
public int compare(final Uuid topic1Id, final Uuid topic2Id) {
int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id);
int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id);
int topic1SubscriberCount = topicSubscribers.get(topic1Id).size();
int topic2SubscriberCount = topicSubscribers.get(topic2Id).size();

// Order by partitions per subscriber, descending.
int order = Double.compare(
(double) topic2PartitionCount / topic2SubscriberCount,
(double) topic1PartitionCount / topic1SubscriberCount
);

// Then order by subscriber count, ascending.
if (order == 0) {
order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount);
}

// Then order by topic id, ascending.
if (order == 0) {
order = topic1Id.compareTo(topic2Id);
}
this.topicComparator = (topic1Id, topic2Id) -> {
int topic1PartitionCount = subscribedTopicDescriber.numPartitions(topic1Id);
int topic2PartitionCount = subscribedTopicDescriber.numPartitions(topic2Id);
int topic1SubscriberCount = topicSubscribers.get(topic1Id).size();
int topic2SubscriberCount = topicSubscribers.get(topic2Id).size();

// Order by partitions per subscriber, descending.
int order = Double.compare(
(double) topic2PartitionCount / topic2SubscriberCount,
(double) topic1PartitionCount / topic1SubscriberCount
);

// Then order by subscriber count, ascending.
if (order == 0) {
order = Integer.compare(topic1SubscriberCount, topic2SubscriberCount);
}

return order;
// Then order by topic id, ascending.
if (order == 0) {
order = topic1Id.compareTo(topic2Id);
}
};

this.memberComparator = new Comparator<Integer>() {
@Override
public int compare(final Integer memberIndex1, final Integer memberIndex2) {
// Order by number of assigned partitions, ascending.
int order = Integer.compare(
memberTargetAssignmentSizes[memberIndex1],
memberTargetAssignmentSizes[memberIndex2]
);
return order;
};

// Then order by member index, ascending.
if (order == 0) {
order = memberIndex1.compareTo(memberIndex2);
}
this.memberComparator = (memberIndex1, memberIndex2) -> {
// Order by number of assigned partitions, ascending.
int order = Integer.compare(
memberTargetAssignmentSizes[memberIndex1],
memberTargetAssignmentSizes[memberIndex2]
);

return order;
// Then order by member index, ascending.
if (order == 0) {
order = memberIndex1.compareTo(memberIndex2);
}

return order;
};

// Initialize partition owners for the target assignments.
Expand Down Expand Up @@ -851,14 +841,6 @@ private void assignPartition(Uuid topicId, int partition, int memberIndex) {
addPartitionToTargetAssignment(topicId, partition, memberIndex);
}

/**
* @param memberIndex The member index.
* @return The current assignment size for the given member.
*/
private int targetAssignmentSize(int memberIndex) {
return memberTargetAssignmentSizes[memberIndex];
}

/**
* Assigns a partition to a member and updates the current assignment size.
*
Expand Down
Loading

0 comments on commit 389f96a

Please sign in to comment.