Skip to content

Commit

Permalink
Merge branch 'main' into block_cache
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Nov 17, 2023
2 parents ddfbaf0 + 3b0b582 commit b6d3f98
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@ public class Failover {
private static final Logger LOGGER = LoggerFactory.getLogger(Failover.class);
private final ExecutorService executor = Threads.newFixedThreadPool(1, ThreadUtils.createThreadFactory("wal-failover-%d", true), LOGGER);
private final FailoverFactory factory;
private final FailoverManager failoverManager;
private final WALRecover walRecover;

public Failover(FailoverFactory factory, FailoverManager failoverManager, WALRecover walRecover) {
public Failover(FailoverFactory factory, WALRecover walRecover) {
this.factory = factory;
this.failoverManager = failoverManager;
this.walRecover = walRecover;
}

Expand All @@ -71,6 +69,10 @@ protected void fence(FailoverRequest request) {
// TODO: run command to fence the device
}

protected void complete(FailoverRequest request) {
// TODO: run command to delete the volume
}

class FailoverTask {
private final FailoverRequest request;
private int nodeId = NOOP_NODE_ID;
Expand Down Expand Up @@ -102,9 +104,8 @@ public FailoverResponse failover() throws Throwable {
ObjectManager objectManager = factory.getObjectManager(nodeId, epoch);
LOGGER.info("failover start recover {}", request);
walRecover.recover(wal, streamManager, objectManager, taskLogger);

// notify controller failover completed and controller could delete the volume.
failoverManager.completeFailover(new CompleteFailoverRequest(nodeId, request.getVolumeId())).get();
// delete the volume
complete(request);
LOGGER.info("failover done {}", request);
} finally {
wal.shutdownGracefully();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,28 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class FailoverTest {
String path;
FailoverFactory failoverFactory;
FailoverManager failoverManager;
WALRecover walRecover;
Failover failover;

@BeforeEach
public void setup() {
path = "/tmp/" + System.currentTimeMillis() + "/failover_test_wal";
failoverFactory = mock(FailoverFactory.class);
failoverManager = mock(FailoverManager.class);
walRecover = mock(WALRecover.class);
failover = new Failover(failoverFactory, failoverManager, walRecover);
failover = spy(new Failover(failoverFactory, walRecover));
}

@AfterEach
Expand Down Expand Up @@ -84,13 +80,12 @@ public void test() throws IOException, ExecutionException, InterruptedException,
Assertions.assertTrue(exceptionThrown);

// node match
when(failoverManager.completeFailover(any())).thenReturn(CompletableFuture.completedFuture(null));
request.setNodeId(233);
failover.failover(request).get(1, TimeUnit.SECONDS);

ArgumentCaptor<CompleteFailoverRequest> ac = ArgumentCaptor.forClass(CompleteFailoverRequest.class);
verify(failoverManager, times(1)).completeFailover(ac.capture());
CompleteFailoverRequest req = ac.getValue();
ArgumentCaptor<FailoverRequest> ac = ArgumentCaptor.forClass(FailoverRequest.class);
verify(failover, times(1)).complete(ac.capture());
FailoverRequest req = ac.getValue();
assertEquals(233, req.getNodeId());
assertEquals("test_volume_id", req.getVolumeId());
}
Expand Down

0 comments on commit b6d3f98

Please sign in to comment.