Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/IGNITE-20902' into IGNITE-20902
Browse files Browse the repository at this point in the history
# Conflicts:
#	modules/raft/src/integrationTest/java/org/apache/ignite/raft/jraft/core/ItNodeTest.java
  • Loading branch information
12rcu committed Jan 27, 2025
2 parents 5229ec4 + f1b0559 commit 3827640
Showing 1 changed file with 91 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -2717,8 +2716,7 @@ public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception {
assertEquals(20, fsm.getLogs().size());
}

@Test
@Timeout(value = 25_000, unit = TimeUnit.MILLISECONDS)
@RepeatedTest(value = 2, failureThreshold = 1)
public void testFollowerStartStopFollowing() throws Exception {
// start five nodes
List<TestPeer> peers = TestUtils.generatePeers(testInfo, 5);
Expand All @@ -2727,6 +2725,50 @@ public void testFollowerStartStopFollowing() throws Exception {

for (TestPeer peer : peers)
assertTrue(cluster.start(peer));

for (TestPeer peer: peers) {
MockStateMachine m = cluster.getFsmByPeer(peer.getPeerId());
m.setAdditionalEventHandler(new StateMachine() {
@Override
public void onApply(Iterator iter) {}

@Override
public void onShutdown() {}

@Override
public void onSnapshotSave(SnapshotWriter writer, Closure done) {}

@Override
public boolean onSnapshotLoad(SnapshotReader reader) {return false;}

@Override
public void onLeaderStart(long term) {}

@Override
public void onLeaderStop(Status status) {
if(status.getCode() == RaftError.ERAFTTIMEDOUT.getNumber()) {
fail(new TimeoutException("Unexpected election timeout!"));
}
}

@Override
public void onError(RaftException e) {}

@Override
public void onConfigurationCommitted(Configuration conf) {}

@Override
public void onStopFollowing(LeaderChangeContext ctx) {
if(ctx.getStatus().getCode() == RaftError.ERAFTTIMEDOUT.getNumber()) {
fail(new TimeoutException("Unexpected election timeout!"));
}
}

@Override
public void onStartFollowing(LeaderChangeContext ctx) {}
});
}

Node firstLeader = cluster.waitAndGetLeader();
assertNotNull(firstLeader);
cluster.ensureLeader(firstLeader);
Expand All @@ -2738,7 +2780,10 @@ public void testFollowerStartStopFollowing() throws Exception {
List<Node> firstFollowers = cluster.getFollowers();
assertEquals(4, firstFollowers.size());
for (Node node : firstFollowers) {
assertEquals(1, ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes());
System.out.println(((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes());
assertWaitForCondition(1,
() -> ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes(),
Duration.of(10_000, ChronoUnit.MILLIS));
assertEquals(0, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes());
}

Expand All @@ -2753,7 +2798,9 @@ public void testFollowerStartStopFollowing() throws Exception {
List<Node> secondFollowers = cluster.getFollowers();
assertEquals(3, secondFollowers.size());
for (Node node : secondFollowers) {
assertEquals(2, ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes());
assertWaitForCondition(2,
() -> ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes(),
Duration.of(5_000, ChronoUnit.MILLIS));
assertEquals(1, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes());
}

Expand All @@ -2770,12 +2817,16 @@ public void testFollowerStartStopFollowing() throws Exception {
for (int i = 0; i < 3; i++) {
Node follower = thirdFollowers.get(i);
if (follower.getNodeId().getPeerId().equals(secondLeader.getNodeId().getPeerId())) {
assertEquals(2, ((MockStateMachine) follower.getOptions().getFsm()).getOnStartFollowingTimes());
assertWaitForCondition(2,
() -> ((MockStateMachine) follower.getOptions().getFsm()).getOnStartFollowingTimes(),
Duration.of(5_000, ChronoUnit.MILLIS));
assertEquals(1, ((MockStateMachine) follower.getOptions().getFsm()).getOnStopFollowingTimes());
continue;
}

assertEquals(3, ((MockStateMachine) follower.getOptions().getFsm()).getOnStartFollowingTimes());
assertWaitForCondition(3,
() -> ((MockStateMachine) follower.getOptions().getFsm()).getOnStartFollowingTimes(),
Duration.of(5_000, ChronoUnit.MILLIS));
assertEquals(2, ((MockStateMachine) follower.getOptions().getFsm()).getOnStopFollowingTimes());
}

Expand Down Expand Up @@ -4498,6 +4549,37 @@ private static boolean waitForTopology(TestCluster cluster, PeerId peerId, int e
return false;
}

/**
* assert equals expected and actual and tests if this gets true within a timeout. The method will log all values the actual parameter
* has taken if the timeout is reached.
* @param expected the expected value
* @param actual the actual value, this should change over time and therefore a supplier
* @param timeout the duration within the actual value should be the same as the expected value
* @param <T> the type of expected and actual value
* @throws TimeoutException when the duration is reached
*/
@SuppressWarnings("BusyWait")
private static <T> void assertWaitForCondition(T expected, Supplier<T> actual, Duration timeout) throws TimeoutException {
long stop = System.currentTimeMillis() + timeout.toMillis();
HashSet<Object> results = new HashSet<>();
while (System.currentTimeMillis() < stop) {
T actualVal = actual.get();
try {
assertEquals(expected, actualVal);
return;
} catch (Throwable ignored) {
//no matching result, save for debug
results.add(actualVal);
}
try {
Thread.sleep(50);
} catch (InterruptedException ignored) {
}
}
throw new TimeoutException(
"Timeout reached while waiting for expected result. Expected: " + expected + ", Actual Results: " + results);
}

/**
* @param cond The condition.
* @param timeout The timeout.
Expand Down Expand Up @@ -4588,30 +4670,17 @@ private void sendTestTaskAndWait(Node node, RaftError err) throws InterruptedExc
this.sendTestTaskAndWait(node, 0, 10, err);
}

// Note that waiting for the latch when tasks are applying doesn't guarantee that FSMCallerImpl.lastAppliedIndex
// will be updated immediately.
private void sendTestTaskAndWait(Node node, int start, int amount,
RaftError err) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(amount);
MockStateMachine stateMachine = (MockStateMachine) node.getOptions().getFsm();
long appliedIndexBeforeRunCommands = stateMachine.getAppliedIndex();
for (int i = start; i < start + amount; i++) {
ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes(UTF_8));
Task task = new Task(data, new ExpectClosure(err, latch));
node.apply(task);
}
waitLatch(latch);

if (err == RaftError.SUCCESS) {
// This check is needed to avoid a race with snapshots, since the latch may complete before FSMCallerImpl#lastAppliedIndex is
// updated and the snapshot creation starts.
assertTrue(
waitForCondition(() -> stateMachine.getAppliedIndex() >= appliedIndexBeforeRunCommands + amount, 1_000),
() -> String.format(
"Failed to wait for last applied index update on node: "
+ "[node=%s, appliedIndexBeforeRunCommands=%s, lastAppliedIndex=%s, amount=%s]",
stateMachine.getPeerId(), appliedIndexBeforeRunCommands, stateMachine.getAppliedIndex(), amount
)
);
}
}

private void sendTestTaskAndWait(Node node, int start,
Expand Down

0 comments on commit 3827640

Please sign in to comment.