Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sync progress on local chain #9184

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class MultipeerCommonAncestorFinder {
public static MultipeerCommonAncestorFinder create(
final RecentChainData recentChainData, final EventThread eventThread, final Spec spec) {
return new MultipeerCommonAncestorFinder(
recentChainData, new CommonAncestor(recentChainData), eventThread, spec);
recentChainData, new CommonAncestor(recentChainData, true), eventThread, spec);
}

public SafeFuture<UInt64> findCommonAncestor(final TargetChain targetChain) {
Expand All @@ -62,23 +62,24 @@ public SafeFuture<UInt64> findCommonAncestor(final TargetChain targetChain) {
}

return findCommonAncestor(latestFinalizedSlot, targetChain)
.thenPeek(ancestor -> LOG.trace("Found common ancestor at slot {}", ancestor));
.thenPeek(ancestor -> LOG.info("Found common ancestor at slot {}", ancestor));
}

private SafeFuture<UInt64> findCommonAncestor(
final UInt64 latestFinalizedSlot, final TargetChain targetChain) {
eventThread.checkOnEventThread();

final SyncSource source1 = targetChain.selectRandomPeer().orElseThrow();
final Optional<SyncSource> source2 = targetChain.selectRandomPeer(source1);
// Only one peer available, just go with it's common ancestor
final SafeFuture<UInt64> source1CommonAncestor =
commonAncestorFinder.getCommonAncestor(
source1, latestFinalizedSlot, targetChain.getChainHead().getSlot());
if (source2.isEmpty()) {
LOG.trace("Finding common ancestor from one peer");
LOG.info("Finding common ancestor from one peer");
return source1CommonAncestor;
}
LOG.trace("Finding common ancestor from two peers");
LOG.info("Finding common ancestor from two peers");
// Two peers available, so check they have the same common ancestor
return source1CommonAncestor
.thenCombineAsync(
Expand All @@ -90,8 +91,8 @@ private SafeFuture<UInt64> findCommonAncestor(
eventThread)
.exceptionally(
error -> {
LOG.debug("Failed to find common ancestor. Starting sync from finalized slot", error);
return latestFinalizedSlot;
LOG.info("Failed to find common ancestor. Starting sync from finalized slot", error);
throw new RuntimeException("Failed to find common ancestor.", error);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.exceptions.ExceptionUtil;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand Down Expand Up @@ -156,7 +157,8 @@ private InProgressSync startSync(final SyncTarget syncTarget) {
syncResult.finishAsync(
this::onSyncComplete,
error -> {
LOG.error("Sync process failed to complete");
LOG.error(
"Sync process failed to complete: {}", ExceptionUtil.getMessageOrSimpleName(error));
LOG.debug("Error encountered during sync", error);
onSyncComplete(SyncResult.FAILED);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public SlotAndBlockRoot getChainHead() {

public Optional<SyncSource> selectRandomPeer(final SyncSource... excluding) {
final Set<SyncSource> excludedPeers = Set.of(excluding);
return selectRandomPeer(excludedPeers);
}

public Optional<SyncSource> selectRandomPeer(final Set<SyncSource> excludedPeers) {
return peers.stream()
.filter(peer -> !excludedPeers.contains(peer))
.skip((int) ((peers.size() - excludedPeers.size()) * Math.random()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ public class CommonAncestor {

private final RecentChainData recentChainData;
private final int maxAttempts;
private final boolean fallbackToFirst;

public CommonAncestor(final RecentChainData recentChainData) {
this(recentChainData, DEFAULT_MAX_ATTEMPTS);
public CommonAncestor(final RecentChainData recentChainData, final boolean fallbackToFirst) {
this(recentChainData, DEFAULT_MAX_ATTEMPTS, fallbackToFirst);
}

@VisibleForTesting
CommonAncestor(final RecentChainData recentChainData, final int maxAttempts) {
CommonAncestor(
final RecentChainData recentChainData, final int maxAttempts, final boolean fallbackToFirst) {
this.recentChainData = recentChainData;
this.maxAttempts = maxAttempts;
this.fallbackToFirst = fallbackToFirst;
}

public SafeFuture<UInt64> getCommonAncestor(
Expand All @@ -56,7 +59,7 @@ public SafeFuture<UInt64> getCommonAncestor(

final UInt64 localNonFinalisedSlotCount = lowestHeadSlot.minusMinZero(firstNonFinalSlot);

LOG.debug(
LOG.info(
"Local head slot {}. Have {} non finalized slots, peer head is {}",
ourHeadSlot,
localNonFinalisedSlotCount,
Expand All @@ -74,13 +77,25 @@ private SafeFuture<UInt64> getCommonAncestor(
final UInt64 firstRequestedSlot,
final UInt64 firstNonFinalSlot,
final int attempt) {
if (attempt >= maxAttempts || firstRequestedSlot.isLessThanOrEqualTo(firstNonFinalSlot)) {
if (firstRequestedSlot.isLessThanOrEqualTo(firstNonFinalSlot)) {
return SafeFuture.completedFuture(firstNonFinalSlot);
}

if (attempt >= maxAttempts) {
if (fallbackToFirst) {
return SafeFuture.completedFuture(firstNonFinalSlot);
}

return SafeFuture.failedFuture(
new RuntimeException(
"Failed to find common ancestor after "
+ maxAttempts
+ " attempts. Fallback to firstNonFinalSlot is disabled."));
}

final UInt64 lastSlot = firstRequestedSlot.plus(BLOCK_COUNT_PER_ATTEMPT);

LOG.debug("Sampling ahead from {} to {}.", firstRequestedSlot, lastSlot);
LOG.info("Sampling ahead from {} to {}.", firstRequestedSlot, lastSlot);

final BestBlockListener blockResponseListener = new BestBlockListener(recentChainData);
final PeerSyncBlockListener blockListener =
Expand Down Expand Up @@ -126,6 +141,7 @@ public SafeFuture<?> onResponse(final SignedBeaconBlock block) {
bestSlot
.map(uInt64 -> uInt64.max(block.getSlot()))
.or(() -> Optional.of(block.getSlot()));
LOG.info("Found common ancestor at {}", block.toLogString());
}

return SafeFuture.COMPLETE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private SafeFuture<PeerSyncResult> executeSync(
if (!findCommonAncestor) {
return SafeFuture.completedFuture(startSlot);
}
CommonAncestor ancestor = new CommonAncestor(recentChainData);
CommonAncestor ancestor = new CommonAncestor(recentChainData, true);
return ancestor.getCommonAncestor(peer, startSlot, status.getHeadSlot());
})
.thenCompose(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void shouldUseLatestFinalizedSlotWhenMultipleSourcesDisagree() {
}

@Test
void shouldUseLatestFinalizedSlotWhenOneSourceFailsToFindCommonAncestor() {
void shouldFailWhenOneSourceFailsToFindCommonAncestor() {
final TargetChain chain =
chainWith(
new SlotAndBlockRoot(UInt64.valueOf(10_000), dataStructureUtil.randomBytes32()),
Expand All @@ -167,7 +167,7 @@ void shouldUseLatestFinalizedSlotWhenOneSourceFailsToFindCommonAncestor() {

source1CommonAncestor.completeExceptionally(new RuntimeException("Doh!"));
source2CommonAncestor.complete(UInt64.valueOf(1485));
assertThat(result).isCompletedWithValue(finalizedSlot);
assertThat(result).isCompletedExceptionally();
}

private SafeFuture<UInt64> findCommonAncestor(final TargetChain chain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@
import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.PeerStatus;

public class CommonAncestorTest extends AbstractSyncTest {

private final CommonAncestor commonAncestor = new CommonAncestor(recentChainData, 4);
private final CommonAncestor commonAncestor = new CommonAncestor(recentChainData, 4, false);
private final CommonAncestor commonAncestorForceMostRecent =
new CommonAncestor(recentChainData, 4, true);

@Test
void shouldNotSearchCommonAncestorWithoutSufficientLocalData() {
Expand All @@ -52,8 +56,13 @@
verifyNoInteractions(peer);
}

@Test
void shouldSearchStartingFromCurrentCommonHeadSlot() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void shouldSearchStartingFromCurrentCommonHeadSlot(
final boolean forceSelectingMostRecentLocalBlocks) {
final CommonAncestor commonAncestor =
forceSelectingMostRecentLocalBlocks ? commonAncestorForceMostRecent : this.commonAncestor;
Comment on lines +63 to +64

Check notice

Code scanning / CodeQL

Possible confusion of local and field Note test

Potentially confusing name: method
shouldSearchStartingFromCurrentCommonHeadSlot
also refers to field
commonAncestor
(as this.commonAncestor).

final UInt64 firstNonFinalSlot = dataStructureUtil.randomUInt64();

final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(21));
Expand Down Expand Up @@ -174,6 +183,84 @@
.isCompletedWithValue(firstNonFinalSlot);
}

@Test
void shouldFailWhenRPCFails() {
final UInt64 firstNonFinalSlot = dataStructureUtil.randomUInt64();

final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(21));
final UInt64 currentRemoteHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(20));
final UInt64 syncStartSlot = currentRemoteHead.minus(BLOCK_COUNT_PER_ATTEMPT.minus(1));
final SafeFuture<Void> requestFuture = new SafeFuture<>();
when(peer.requestBlocksByRange(eq(syncStartSlot), eq(BLOCK_COUNT_PER_ATTEMPT), any()))
.thenReturn(requestFuture);
final PeerStatus status =
withPeerHeadSlot(
currentRemoteHead,
spec.computeEpochAtSlot(currentRemoteHead),
dataStructureUtil.randomBytes32());

when(recentChainData.getHeadSlot()).thenReturn(currentLocalHead);
when(recentChainData.containsBlock(any())).thenReturn(false);

final SafeFuture<UInt64> futureSlot =
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot());

assertThat(futureSlot.isDone()).isFalse();

verify(peer)
.requestBlocksByRange(
eq(syncStartSlot),
eq(BLOCK_COUNT_PER_ATTEMPT),
blockResponseListenerArgumentCaptor.capture());

var error = new RuntimeException("Failed");

requestFuture.completeExceptionally(error);

assertThatSafeFuture(
commonAncestor.getCommonAncestor(peer, firstNonFinalSlot, status.getHeadSlot()))
.isCompletedExceptionallyWith(error);
}

@Test
void shouldFailWhenForceSelectingMostRecentLocalBlocksIsActivatedAndFails() {
final UInt64 firstNonFinalSlot = UInt64.valueOf(10_000);

final UInt64 currentLocalHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(4_001));
final UInt64 currentRemoteHead = firstNonFinalSlot.plus(BLOCK_COUNT_PER_ATTEMPT.plus(4_000));
final UInt64 syncStartSlot = currentRemoteHead.minus(BLOCK_COUNT_PER_ATTEMPT.minus(1));

final UInt64 syncStartSlotAttempt1 = syncStartSlot;

final SafeFuture<Void> requestFutureAttempt1 = new SafeFuture<>();
when(peer.requestBlocksByRange(eq(syncStartSlotAttempt1), eq(BLOCK_COUNT_PER_ATTEMPT), any()))
.thenReturn(requestFutureAttempt1);

final PeerStatus status =
withPeerHeadSlot(
currentRemoteHead,
spec.computeEpochAtSlot(currentRemoteHead),
dataStructureUtil.randomBytes32());

when(recentChainData.getHeadSlot()).thenReturn(currentLocalHead);
when(recentChainData.containsBlock(any())).thenReturn(false);

final SafeFuture<UInt64> futureSlot =
commonAncestorForceMostRecent.getCommonAncestor(
peer, firstNonFinalSlot, status.getHeadSlot());

assertThat(futureSlot.isDone()).isFalse();

verifyAndRespond(syncStartSlotAttempt1, requestFutureAttempt1);

// we ended up on the firstNonFinalSlot
assertThatSafeFuture(futureSlot)
.isCompletedExceptionallyWith(RuntimeException.class)
.hasMessage("No common ancestor found using mos recent local blocks");

verifyNoMoreInteractions(peer);
}

private void verifyAndRespond(
final UInt64 syncStartSlot, final SafeFuture<Void> requestFutureAttempt) {
verify(peer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ public SafeFuture<BlockImportResult> importBlock(
final SignedBeaconBlock block,
final Optional<BlockImportPerformance> blockImportPerformance,
final BlockBroadcastValidator blockBroadcastValidator) {
final Optional<Boolean> knownOptimistic = recentChainData.isBlockOptimistic(block.getRoot());
if (knownOptimistic.isPresent()) {
LOG.trace(
"Importing known block {}. Return successful result without re-processing.",
block::toLogString);
return SafeFuture.completedFuture(BlockImportResult.knownBlock(block, knownOptimistic.get()));
}
// final Optional<Boolean> knownOptimistic = recentChainData.isBlockOptimistic(block.getRoot());
// if (knownOptimistic.isPresent()) {
// LOG.trace(
// "Importing known block {}. Return successful result without re-processing.",
// block::toLogString);
// return SafeFuture.completedFuture(BlockImportResult.knownBlock(block, knownOptimistic.get()));
// }
if (BAD_BLOCKS.contains(block.getRoot())) {
LOG.info("Avoiding bad block from Electra holesky upgrade.");
return SafeFuture.completedFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ SafeFuture<Boolean> processHead(final UInt64 nodeSlot) {

private SafeFuture<Boolean> processHead(
final Optional<UInt64> nodeSlot, final boolean isPreProposal) {
LOG.info("processHead {}", nodeSlot);
final Checkpoint retrievedJustifiedCheckpoint =
recentChainData.getStore().getJustifiedCheckpoint();
return recentChainData
Expand Down Expand Up @@ -592,7 +593,20 @@ private BlockImportResult importBlockAndState(
blobSidecars,
earliestBlobSidecarsSlot);

final boolean reorg;
if (recentChainData
.getBestBlockRoot()
.map(root -> !root.equals(block.getParentRoot()))
.orElse(false)) {
LOG.info("Reorg detected. Parent block root: {}", block.getParentRoot());
reorg = true;
} else {
LOG.info("not reorging {} with {}", recentChainData.getBestBlockRoot(), block.toLogString());
reorg = false;
}

if (shouldApplyProposerBoost(block, transaction)) {
LOG.info("Applying proposer boost for block at slot {}", block.getSlot());
transaction.setProposerBoostRoot(block.getRoot());
}

Expand Down Expand Up @@ -624,13 +638,23 @@ private BlockImportResult importBlockAndState(
}
updateForkChoiceForImportedBlock(block, result, forkChoiceStrategy);
notifyForkChoiceUpdatedAndOptimisticSyncingChanged(Optional.empty());

if (reorg) {
processHead(block.getSlot()).ifExceptionGetsHereRaiseABug();
}

return result;
}

// from consensus-specs/fork-choice:
private boolean shouldApplyProposerBoost(
final SignedBeaconBlock block, final StoreTransaction transaction) {
// get_current_slot(store) == block.slot

LOG.info(
"shouldApplyProposerBoost - current slot: {}, block slot: {}",
spec.getCurrentSlot(transaction),
block.getSlot());
if (!spec.getCurrentSlot(transaction).equals(block.getSlot())) {
return false;
}
Expand Down