Skip to content

Commit

Permalink
MINOR: Various cleanups in raft (apache#15805)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mimaison authored Apr 26, 2024
1 parent 2db87f0 commit e779225
Show file tree
Hide file tree
Showing 14 changed files with 125 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private QuorumStateData readStateFromFile(File file) {
}

if (dataVersionNode.asInt() != 0) {
throw new UnsupportedVersionException("Unknown data version of " + dataVersionNode.toString());
throw new UnsupportedVersionException("Unknown data version of " + dataVersionNode);
}

final short dataVersion = dataVersionNode.shortValue();
Expand Down
10 changes: 4 additions & 6 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1881,12 +1881,10 @@ private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch snapsh
clusterId,
quorum().localIdOrSentinel(),
log.topicPartition(),
snapshotPartition -> {
return snapshotPartition
.setCurrentLeaderEpoch(quorum.epoch())
.setSnapshotId(requestSnapshotId)
.setPosition(snapshotSize);
}
snapshotPartition -> snapshotPartition
.setCurrentLeaderEpoch(quorum.epoch())
.setSnapshotId(requestSnapshotId)
.setPosition(snapshotSize)
);

return request.setReplicaId(quorum.localIdOrSentinel());
Expand Down
2 changes: 1 addition & 1 deletion raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public void ensureValid(String name, Object value) {
}

@SuppressWarnings("unchecked")
List<String> voterStrings = (List) value;
List<String> voterStrings = (List<String>) value;

// Attempt to parse the connect strings
parseVoterConnections(voterStrings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,9 @@ public void assertCantReadQuorumStateVersion(String jsonString) throws IOExcepti
// We initialized a state from the metadata log
assertTrue(stateFile.exists());

final int epoch = 3012;
final int leaderId = 9990;
final int follower1 = leaderId + 1;
final int follower2 = follower1 + 1;
Set<Integer> voters = Utils.mkSet(leaderId, follower1, follower2);
writeToStateFile(stateFile, jsonString);

assertThrows(UnsupportedVersionException.class, () -> {
stateStore.readElectionState(); });
assertThrows(UnsupportedVersionException.class, () -> stateStore.readElectionState());

stateStore.clear();
assertFalse(stateFile.exists());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public void update(Time time, MockClient.MetadataUpdate update) { }
ApiKeys.FETCH
);

private final String clusterId = "clusterId";
private final int requestTimeoutMs = 30000;
private final Time time = new MockTime();
private final MockClient client = new MockClient(time, new StubMetadataUpdater());
Expand Down Expand Up @@ -220,11 +219,11 @@ public void testFetchRequestDowngrade(short version) {
AbstractRequest request = client.requests().peek().requestBuilder().build(version);

if (version < 15) {
assertTrue(((FetchRequest) request).data().replicaId() == 1);
assertTrue(((FetchRequest) request).data().replicaState().replicaId() == -1);
assertEquals(1, ((FetchRequest) request).data().replicaId());
assertEquals(-1, ((FetchRequest) request).data().replicaState().replicaId());
} else {
assertTrue(((FetchRequest) request).data().replicaId() == -1);
assertTrue(((FetchRequest) request).data().replicaState().replicaId() == 1);
assertEquals(-1, ((FetchRequest) request).data().replicaId());
assertEquals(1, ((FetchRequest) request).data().replicaState().replicaId());
}
}

Expand Down Expand Up @@ -256,6 +255,7 @@ private void sendAndAssertErrorResponse(ApiKeys apiKey, int destinationId, Error
private ApiMessage buildTestRequest(ApiKeys key) {
int leaderEpoch = 5;
int leaderId = 1;
String clusterId = "clusterId";
switch (key) {
case BEGIN_QUORUM_EPOCH:
return BeginQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderEpoch, leaderId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1873,12 +1873,10 @@ private static FetchSnapshotRequestData fetchSnapshotRequest(
clusterId,
replicaId,
topicPartition,
snapshotPartition -> {
return snapshotPartition
.setCurrentLeaderEpoch(epoch)
.setSnapshotId(snapshotId)
.setPosition(position);
}
snapshotPartition -> snapshotPartition
.setCurrentLeaderEpoch(epoch)
.setSnapshotId(snapshotId)
.setPosition(position)
);

return request.setMaxBytes(maxBytes);
Expand Down Expand Up @@ -1963,8 +1961,8 @@ private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext conte

private final static class MemorySnapshotWriter implements RawSnapshotWriter {
private final OffsetAndEpoch snapshotId;
private final AtomicLong frozenPosition;
private ByteBuffer data;
private AtomicLong frozenPosition;

public MemorySnapshotWriter(OffsetAndEpoch snapshotId) {
this.snapshotId = snapshotId;
Expand Down
4 changes: 2 additions & 2 deletions raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public int hashCode() {
}

private DescribeQuorumResponseData.ReplicaState describeVoterState(
LeaderState state,
LeaderState<?> state,
int voterId,
long currentTimeMs
) {
Expand All @@ -580,7 +580,7 @@ private DescribeQuorumResponseData.ReplicaState describeVoterState(
}

private DescribeQuorumResponseData.ReplicaState describeObserverState(
LeaderState state,
LeaderState<?> state,
int observerId,
long currentTimeMs
) {
Expand Down
33 changes: 15 additions & 18 deletions raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -217,7 +216,7 @@ public void testAppendControlRecord() {
}

@Test
public void testAppendAsFollower() throws IOException {
public void testAppendAsFollower() {
final long initialOffset = 5;
final int epoch = 3;
SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
Expand Down Expand Up @@ -332,8 +331,6 @@ public void testMetadataValidation() {
LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
assertEquals(5L, readInfo.startOffsetMetadata.offset);
assertTrue(readInfo.startOffsetMetadata.metadata.isPresent());
MockLog.MockOffsetMetadata offsetMetadata = (MockLog.MockOffsetMetadata)
readInfo.startOffsetMetadata.metadata.get();

// Update to a high watermark with valid offset metadata
log.updateHighWatermark(readInfo.startOffsetMetadata);
Expand Down Expand Up @@ -381,7 +378,7 @@ public void testEmptyAppendNotAllowed() {
}

@Test
public void testReadOutOfRangeOffset() throws IOException {
public void testReadOutOfRangeOffset() {
final long initialOffset = 5L;
final int epoch = 3;
SimpleRecord recordFoo = new SimpleRecord("foo".getBytes());
Expand Down Expand Up @@ -431,7 +428,7 @@ public void testUnflushedRecordsLostAfterReopen() {
}

@Test
public void testCreateSnapshot() throws IOException {
public void testCreateSnapshot() {
int numberOfRecords = 10;
int epoch = 0;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(numberOfRecords, epoch);
Expand Down Expand Up @@ -588,7 +585,7 @@ public void testReadMissingSnapshot() {
}

@Test
public void testUpdateLogStartOffset() throws IOException {
public void testUpdateLogStartOffset() {
int offset = 10;
int epoch = 0;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(offset, epoch);
Expand Down Expand Up @@ -634,7 +631,7 @@ public void testUpdateLogStartOffsetWithMissingSnapshot() {
}

@Test
public void testFailToIncreaseLogStartPastHighWatermark() throws IOException {
public void testFailToIncreaseLogStartPastHighWatermark() {
int offset = 10;
int epoch = 0;
OffsetAndEpoch snapshotId = new OffsetAndEpoch(2 * offset, epoch);
Expand All @@ -653,7 +650,7 @@ public void testFailToIncreaseLogStartPastHighWatermark() throws IOException {
}

@Test
public void testTruncateFullyToLatestSnapshot() throws IOException {
public void testTruncateFullyToLatestSnapshot() {
int numberOfRecords = 10;
int epoch = 0;
OffsetAndEpoch sameEpochSnapshotId = new OffsetAndEpoch(2 * numberOfRecords, epoch);
Expand Down Expand Up @@ -686,7 +683,7 @@ public void testTruncateFullyToLatestSnapshot() throws IOException {
}

@Test
public void testDoesntTruncateFully() throws IOException {
public void testDoesntTruncateFully() {
int numberOfRecords = 10;
int epoch = 1;

Expand All @@ -710,7 +707,7 @@ public void testDoesntTruncateFully() throws IOException {
}

@Test
public void testTruncateWillRemoveOlderSnapshot() throws IOException {
public void testTruncateWillRemoveOlderSnapshot() {
int numberOfRecords = 10;
int epoch = 1;

Expand All @@ -734,7 +731,7 @@ public void testTruncateWillRemoveOlderSnapshot() throws IOException {
}

@Test
public void testUpdateLogStartOffsetWillRemoveOlderSnapshot() throws IOException {
public void testUpdateLogStartOffsetWillRemoveOlderSnapshot() {
int numberOfRecords = 10;
int epoch = 1;

Expand Down Expand Up @@ -771,7 +768,7 @@ public void testValidateEpochGreaterThanLastKnownEpoch() {
}

@Test
public void testValidateEpochLessThanOldestSnapshotEpoch() throws IOException {
public void testValidateEpochLessThanOldestSnapshotEpoch() {
int offset = 1;
int epoch = 1;

Expand All @@ -786,7 +783,7 @@ public void testValidateEpochLessThanOldestSnapshotEpoch() throws IOException {
}

@Test
public void testValidateOffsetLessThanOldestSnapshotOffset() throws IOException {
public void testValidateOffsetLessThanOldestSnapshotOffset() {
int offset = 2;
int epoch = 1;

Expand All @@ -801,7 +798,7 @@ public void testValidateOffsetLessThanOldestSnapshotOffset() throws IOException
}

@Test
public void testValidateOffsetEqualToOldestSnapshotOffset() throws IOException {
public void testValidateOffsetEqualToOldestSnapshotOffset() {
int offset = 2;
int epoch = 1;

Expand All @@ -816,7 +813,7 @@ public void testValidateOffsetEqualToOldestSnapshotOffset() throws IOException {
}

@Test
public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot() throws IOException {
public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot() {
int numberOfRecords = 5;
int offset = 10;

Expand All @@ -836,7 +833,7 @@ public void testValidateUnknownEpochLessThanLastKnownGreaterThanOldestSnapshot()
}

@Test
public void testValidateEpochLessThanFirstEpochInLog() throws IOException {
public void testValidateEpochLessThanFirstEpochInLog() {
int numberOfRecords = 5;
int offset = 10;

Expand Down Expand Up @@ -980,7 +977,7 @@ private static void validateReadRecords(List<SimpleRecord> expectedRecords, Mock
Records records = log.read(currentOffset, Isolation.UNCOMMITTED).records;
List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());

assertTrue(batches.size() > 0);
assertFalse(batches.isEmpty());
for (RecordBatch batch : batches) {
assertTrue(batch.countOrNull() > 0);
assertEquals(currentOffset, batch.baseOffset());
Expand Down
Loading

0 comments on commit e779225

Please sign in to comment.