diff --git a/core/src/integration/java/com/radixdlt/integration/steady_state/deterministic/consensus/DivergentExecutionLivenessBreakTest.java b/core/src/integration/java/com/radixdlt/integration/steady_state/deterministic/consensus/DivergentExecutionLivenessBreakTest.java index 54e2f77ba3..4fe001f045 100644 --- a/core/src/integration/java/com/radixdlt/integration/steady_state/deterministic/consensus/DivergentExecutionLivenessBreakTest.java +++ b/core/src/integration/java/com/radixdlt/integration/steady_state/deterministic/consensus/DivergentExecutionLivenessBreakTest.java @@ -96,6 +96,7 @@ import com.radixdlt.rev2.REv2StateComputer; import com.radixdlt.rev2.REv2TransactionsAndProofReader; import com.radixdlt.transactions.RawNotarizedTransaction; +import com.radixdlt.utils.WrappedByteArray; import java.util.List; import java.util.Random; import java.util.function.Consumer; @@ -195,7 +196,8 @@ public StateComputerLedger.StateComputerPrepareResult prepare( @Override public LedgerProofBundle commit( - LedgerExtension ledgerExtension, Option serializedVertexStoreState) { + LedgerExtension ledgerExtension, + Option serializedVertexStoreState) { return underlyingStateComputer.commit(ledgerExtension, serializedVertexStoreState); } }; diff --git a/core/src/main/java/com/radixdlt/consensus/bft/BFTHighQCUpdate.java b/core/src/main/java/com/radixdlt/consensus/bft/BFTHighQCUpdate.java index c5cc9ac910..6d249a574b 100644 --- a/core/src/main/java/com/radixdlt/consensus/bft/BFTHighQCUpdate.java +++ b/core/src/main/java/com/radixdlt/consensus/bft/BFTHighQCUpdate.java @@ -70,6 +70,7 @@ import com.radixdlt.consensus.vertexstore.ExecutedVertex; import com.radixdlt.lang.Option; import com.radixdlt.utils.WrappedByteArray; +import java.util.AbstractCollection; /** * An event emitted when vertex store updates its highQC, which possibly results in some vertices @@ -79,4 +80,15 @@ public record BFTHighQCUpdate( HighQC newHighQc, Option> committedVertices, WrappedByteArray serializedVertexStoreState) - implements LocalEvent {} + implements LocalEvent { + + @Override + public String toString() { + return String.format( + "%s[newHighQc=%s numCommittedVertices=%s serializedVertexStoreStateSize=%s]", + getClass().getSimpleName(), + newHighQc, + committedVertices.map(AbstractCollection::size).orElse(0), + serializedVertexStoreState.size()); + } +} diff --git a/core/src/main/java/com/radixdlt/consensus/bft/BFTInsertUpdate.java b/core/src/main/java/com/radixdlt/consensus/bft/BFTInsertUpdate.java index a7c4e144ba..e42c5906fc 100644 --- a/core/src/main/java/com/radixdlt/consensus/bft/BFTInsertUpdate.java +++ b/core/src/main/java/com/radixdlt/consensus/bft/BFTInsertUpdate.java @@ -71,4 +71,11 @@ /** An event emitted after a vertex has been inserted into the vertex store. */ public record BFTInsertUpdate( ExecutedVertex insertedVertex, WrappedByteArray serializedVertexStoreState) - implements LocalEvent {} + implements LocalEvent { + @Override + public String toString() { + return String.format( + "%s[insertedVertex=%s serializedVertexStoreStateSize=%s]", + getClass().getSimpleName(), insertedVertex(), serializedVertexStoreState.size()); + } +} diff --git a/core/src/main/java/com/radixdlt/consensus/bft/BFTRebuildUpdate.java b/core/src/main/java/com/radixdlt/consensus/bft/BFTRebuildUpdate.java index 8881395334..a821d4ea40 100644 --- a/core/src/main/java/com/radixdlt/consensus/bft/BFTRebuildUpdate.java +++ b/core/src/main/java/com/radixdlt/consensus/bft/BFTRebuildUpdate.java @@ -71,4 +71,12 @@ /** An even emitted when the vertex store has been rebuilt. */ public record BFTRebuildUpdate( VertexStoreState vertexStoreState, WrappedByteArray serializedVertexStoreState) - implements LocalEvent {} + implements LocalEvent { + + @Override + public String toString() { + return String.format( + "%s[serializedVertexStoreStateSize=%s]", + getClass().getSimpleName(), serializedVertexStoreState.size()); + } +} diff --git a/core/src/main/java/com/radixdlt/consensus/sync/BFTSync.java b/core/src/main/java/com/radixdlt/consensus/sync/BFTSync.java index 259e6aa206..3762bc78c6 100644 --- a/core/src/main/java/com/radixdlt/consensus/sync/BFTSync.java +++ b/core/src/main/java/com/radixdlt/consensus/sync/BFTSync.java @@ -244,7 +244,7 @@ public SyncResult syncToQC(HighQC highQC, @Nullable NodeId author, HighQcSource return SyncResult.INVALID; } - return switch (vertexStore.insertQc(qc)) { + return switch (vertexStore.insertQuorumCertificate(qc)) { case VertexStore.InsertQcResult.Inserted ignored -> { // QC was inserted, try TC too (as it can be higher), and then process a new highQC highQC.highestTC().map(vertexStore::insertTimeoutCertificate); diff --git a/core/src/main/java/com/radixdlt/consensus/vertexstore/VertexStoreAdapter.java b/core/src/main/java/com/radixdlt/consensus/vertexstore/VertexStoreAdapter.java index e1a3482ab7..d00a7416d8 100644 --- a/core/src/main/java/com/radixdlt/consensus/vertexstore/VertexStoreAdapter.java +++ b/core/src/main/java/com/radixdlt/consensus/vertexstore/VertexStoreAdapter.java @@ -133,10 +133,15 @@ public boolean insertTimeoutCertificate(TimeoutCertificate timeoutCertificate) { }; } - public VertexStore.InsertQcResult insertQc(QuorumCertificate qc) { + public VertexStore.InsertQcResult insertQuorumCertificate(QuorumCertificate qc) { final var result = vertexStore.insertQc(qc); - if (result instanceof VertexStore.InsertQcResult.Inserted inserted) { - dispatchPostQcInsertionEvents(inserted); + switch (result) { + case VertexStore.InsertQcResult.Inserted inserted -> this.highQCUpdateDispatcher.dispatch( + new BFTHighQCUpdate( + inserted.newHighQc(), + inserted.committedUpdate().map(VertexStore.CommittedUpdate::committedVertices), + inserted.serializedVertexStoreState())); + default -> {} // no-op } return result; } @@ -151,18 +156,21 @@ public List getPathFromRoot(HashCode vertexHash) { public void insertVertexChain(VertexChain vertexChain) { final var result = vertexStore.insertVertexChain(vertexChain); - result.insertedQcs().forEach(this::dispatchPostQcInsertionEvents); + result + .insertedQcs() + .forEach( + inserted -> { + this.highQCUpdateDispatcher.dispatch( + new BFTHighQCUpdate( + inserted.newHighQc(), + inserted + .committedUpdate() + .map(VertexStore.CommittedUpdate::committedVertices), + inserted.serializedVertexStoreState())); + }); result.insertUpdates().forEach(bftUpdateDispatcher::dispatch); } - private void dispatchPostQcInsertionEvents(VertexStore.InsertQcResult.Inserted inserted) { - this.highQCUpdateDispatcher.dispatch( - new BFTHighQCUpdate( - inserted.newHighQc(), - inserted.committedUpdate().map(VertexStore.CommittedUpdate::committedVertices), - inserted.serializedVertexStoreState())); - } - public Optional> getVertices(HashCode vertexId, int count) { return vertexStore.getVertices(vertexId, count).toOptional(); } diff --git a/core/src/main/java/com/radixdlt/consensus/vertexstore/VertexStoreJavaImpl.java b/core/src/main/java/com/radixdlt/consensus/vertexstore/VertexStoreJavaImpl.java index e74230ba74..e4cca32231 100644 --- a/core/src/main/java/com/radixdlt/consensus/vertexstore/VertexStoreJavaImpl.java +++ b/core/src/main/java/com/radixdlt/consensus/vertexstore/VertexStoreJavaImpl.java @@ -425,7 +425,10 @@ private void removeVertexAndPruneInternal(HashCode vertexId, Optional return; } - final var children = vertexChildren.removeAll(vertexId); + // Note that we need a copy of the children list here (.stream().toList()) - that is because + // we're removing the items in the loop (otherwise, this could lead to + // ConcurrentModificationException) + final var children = vertexChildren.get(vertexId).stream().toList(); for (HashCode child : children) { if (!Optional.of(child).equals(skip)) { removeVertexAndPruneInternal(child, Optional.empty()); diff --git a/core/src/main/java/com/radixdlt/ledger/StateComputerLedger.java b/core/src/main/java/com/radixdlt/ledger/StateComputerLedger.java index a8cb50d9a5..2460bfd189 100644 --- a/core/src/main/java/com/radixdlt/ledger/StateComputerLedger.java +++ b/core/src/main/java/com/radixdlt/ledger/StateComputerLedger.java @@ -85,6 +85,7 @@ import com.radixdlt.transactions.RawLedgerTransaction; import com.radixdlt.transactions.RawNotarizedTransaction; import com.radixdlt.utils.TimeSupplier; +import com.radixdlt.utils.WrappedByteArray; import java.util.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -162,7 +163,7 @@ StateComputerPrepareResult prepare( RoundDetails roundDetails); LedgerProofBundle commit( - LedgerExtension ledgerExtension, Option serializedVertexStoreState); + LedgerExtension ledgerExtension, Option serializedVertexStoreState); } private final StateComputer stateComputer; @@ -301,40 +302,21 @@ public EventProcessor bftHighQcUpdateEventProcessor() { .ifPresent( committedVertices -> { updateCommittedVerticesMetrics(committedVertices); - - final ImmutableList transactions = - committedVertices.stream() - .flatMap(ExecutedVertex::successfulTransactions) - .map(ExecutedTransaction::transaction) - .collect(ImmutableList.toImmutableList()); - highQcUpdate .newHighQc() .highestCommittedQC() .getProcessedCommit(hasher) .ifPresentOrElse( processedQcCommit -> { - switch (processedQcCommit) { - case ProcessedQcCommit.OfConensusQc ofConensusQc -> { - final var ledgerExtension = - new LedgerExtension(transactions, ofConensusQc.ledgerProof()); - metrics - .ledger() - .commit() - .measure( - () -> - this.commit( - ledgerExtension, - Option.some( - highQcUpdate - .serializedVertexStoreState() - .value()))); - } - case ProcessedQcCommit.OfInitialEpochQc ofInitialEpochQc -> { - // no-op, ignore - this.metrics.ledger().ignoredBftCommittedUpdates().inc(); - } - } + final var transactions = + committedVertices.stream() + .flatMap(ExecutedVertex::successfulTransactions) + .map(ExecutedTransaction::transaction) + .collect(ImmutableList.toImmutableList()); + processQcCommit( + processedQcCommit, + transactions, + highQcUpdate.serializedVertexStoreState()); }, () -> { // no-op, ignore @@ -344,6 +326,25 @@ public EventProcessor bftHighQcUpdateEventProcessor() { }; } + private void processQcCommit( + ProcessedQcCommit processedQcCommit, + ImmutableList transactions, + WrappedByteArray serializedVertexStoreState) { + switch (processedQcCommit) { + case ProcessedQcCommit.OfConensusQc ofConensusQc -> { + final var ledgerExtension = new LedgerExtension(transactions, ofConensusQc.ledgerProof()); + metrics + .ledger() + .commit() + .measure(() -> this.commit(ledgerExtension, Option.some(serializedVertexStoreState))); + } + case ProcessedQcCommit.OfInitialEpochQc ofInitialEpochQc -> { + // no-op, ignore + this.metrics.ledger().ignoredBftCommittedUpdates().inc(); + } + } + } + private void updateCommittedVerticesMetrics(ImmutableList committedVertices) { final var numCommittedFallbackVertices = committedVertices.stream().filter(v -> v.vertex().isFallback()).count(); @@ -396,7 +397,8 @@ public EventProcessor syncEventProcessor() { * InvalidCommitRequestException} is propagated from the Rust state computer. * */ - private void commit(LedgerExtension ledgerExtension, Option serializedVertexStoreState) { + private void commit( + LedgerExtension ledgerExtension, Option serializedVertexStoreState) { final var proofToCommit = ledgerExtension.proof(); final int extensionTransactionCount; // for metrics purposes only diff --git a/core/src/main/java/com/radixdlt/rev2/REv2StateComputer.java b/core/src/main/java/com/radixdlt/rev2/REv2StateComputer.java index 7d97f5fee2..d6795406db 100644 --- a/core/src/main/java/com/radixdlt/rev2/REv2StateComputer.java +++ b/core/src/main/java/com/radixdlt/rev2/REv2StateComputer.java @@ -90,6 +90,7 @@ import com.radixdlt.transactions.PreparedNotarizedTransaction; import com.radixdlt.transactions.RawNotarizedTransaction; import com.radixdlt.utils.UInt64; +import com.radixdlt.utils.WrappedByteArray; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -283,7 +284,7 @@ public StateComputerLedger.StateComputerPrepareResult prepare( @Override public LedgerProofBundle commit( - LedgerExtension ledgerExtension, Option serializedVertexStoreState) { + LedgerExtension ledgerExtension, Option serializedVertexStoreState) { final var proof = ledgerExtension.proof(); final var header = proof.ledgerHeader(); @@ -291,7 +292,7 @@ public LedgerProofBundle commit( new CommitRequest( ledgerExtension.transactions(), proof, - serializedVertexStoreState, + serializedVertexStoreState.map(WrappedByteArray::value), Option.from(selfValidatorId)); final var result = stateComputer.commit(commitRequest); diff --git a/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/LivenessInvariant.java b/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/LivenessInvariant.java index 094da7c690..af160b94af 100644 --- a/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/LivenessInvariant.java +++ b/core/src/test-core/java/com/radixdlt/harness/simulation/monitors/consensus/LivenessInvariant.java @@ -101,11 +101,6 @@ public Observable check(RunningNetwork network) { emitter.onNext(highQCUpdate.newHighQc().highestQC()); }, BFTHighQCUpdate.class); - nodeEvents.addListener( - (node, highQcUpdate) -> { - emitter.onNext(highQcUpdate.newHighQc().highestQC()); - }, - BFTHighQCUpdate.class); }) .serialize() .map(QuorumCertificate::getProposedHeader) diff --git a/core/src/test-core/java/com/radixdlt/statecomputer/MockedMempoolStateComputer.java b/core/src/test-core/java/com/radixdlt/statecomputer/MockedMempoolStateComputer.java index e4f7ac0052..9dda57b234 100644 --- a/core/src/test-core/java/com/radixdlt/statecomputer/MockedMempoolStateComputer.java +++ b/core/src/test-core/java/com/radixdlt/statecomputer/MockedMempoolStateComputer.java @@ -78,6 +78,7 @@ import com.radixdlt.p2p.NodeId; import com.radixdlt.targeted.mempool.SimpleMempool; import com.radixdlt.transactions.RawNotarizedTransaction; +import com.radixdlt.utils.WrappedByteArray; import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -137,7 +138,7 @@ public StateComputerPrepareResult prepare( @Override public LedgerProofBundle commit( - LedgerExtension ledgerExtension, Option serializedVertexStoreState) { + LedgerExtension ledgerExtension, Option serializedVertexStoreState) { final var proof = this.stateComputer.commit(ledgerExtension, serializedVertexStoreState); this.mempool.handleTransactionsCommitted( ledgerExtension.transactions().stream() diff --git a/core/src/test-core/java/com/radixdlt/statecomputer/MockedStateComputer.java b/core/src/test-core/java/com/radixdlt/statecomputer/MockedStateComputer.java index 19214e2190..46a257bda1 100644 --- a/core/src/test-core/java/com/radixdlt/statecomputer/MockedStateComputer.java +++ b/core/src/test-core/java/com/radixdlt/statecomputer/MockedStateComputer.java @@ -82,6 +82,7 @@ import com.radixdlt.statecomputer.commit.CommitSummary; import com.radixdlt.transactions.RawNotarizedTransaction; import com.radixdlt.utils.UInt32; +import com.radixdlt.utils.WrappedByteArray; import java.util.List; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -125,7 +126,7 @@ public StateComputerLedger.StateComputerPrepareResult prepare( @Override public LedgerProofBundle commit( - LedgerExtension ledgerExtension, Option serializedVertexStoreState) { + LedgerExtension ledgerExtension, Option serializedVertexStoreState) { latestProof = new LedgerProofBundle( ledgerExtension.proof(), diff --git a/core/src/test-core/java/com/radixdlt/statecomputer/MockedStateComputerWithEpochs.java b/core/src/test-core/java/com/radixdlt/statecomputer/MockedStateComputerWithEpochs.java index d8659096be..f89f5312eb 100644 --- a/core/src/test-core/java/com/radixdlt/statecomputer/MockedStateComputerWithEpochs.java +++ b/core/src/test-core/java/com/radixdlt/statecomputer/MockedStateComputerWithEpochs.java @@ -80,6 +80,7 @@ import com.radixdlt.mempool.MempoolAdd; import com.radixdlt.p2p.NodeId; import com.radixdlt.transactions.RawNotarizedTransaction; +import com.radixdlt.utils.WrappedByteArray; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -147,7 +148,7 @@ public StateComputerPrepareResult prepare( @Override public LedgerProofBundle commit( - LedgerExtension ledgerExtension, Option serializedVertexStoreState) { + LedgerExtension ledgerExtension, Option serializedVertexStoreState) { return this.stateComputer.commit(ledgerExtension, serializedVertexStoreState); } } diff --git a/core/src/test-core/java/com/radixdlt/statecomputer/StatelessComputer.java b/core/src/test-core/java/com/radixdlt/statecomputer/StatelessComputer.java index 0cda0cb637..75bed7c9ab 100644 --- a/core/src/test-core/java/com/radixdlt/statecomputer/StatelessComputer.java +++ b/core/src/test-core/java/com/radixdlt/statecomputer/StatelessComputer.java @@ -81,6 +81,7 @@ import com.radixdlt.statecomputer.commit.CommitSummary; import com.radixdlt.transactions.RawNotarizedTransaction; import com.radixdlt.utils.UInt32; +import com.radixdlt.utils.WrappedByteArray; import java.util.ArrayList; import java.util.List; @@ -204,7 +205,7 @@ private LedgerUpdate generateLedgerUpdate(LedgerExtension ledgerExtension) { @Override public LedgerProofBundle commit( - LedgerExtension ledgerExtension, Option serializedVertexStoreState) { + LedgerExtension ledgerExtension, Option serializedVertexStoreState) { var ledgerUpdate = this.generateLedgerUpdate(ledgerExtension); ledgerUpdateDispatcher.dispatch(ledgerUpdate); return ledgerUpdate.committedProof(); diff --git a/core/src/test/java/com/radixdlt/consensus/epoch/EpochManagerTest.java b/core/src/test/java/com/radixdlt/consensus/epoch/EpochManagerTest.java index dcb005bfe4..d3e01fcd51 100644 --- a/core/src/test/java/com/radixdlt/consensus/epoch/EpochManagerTest.java +++ b/core/src/test/java/com/radixdlt/consensus/epoch/EpochManagerTest.java @@ -118,6 +118,7 @@ import com.radixdlt.utils.TimeSupplier; import com.radixdlt.utils.UInt192; import com.radixdlt.utils.UInt32; +import com.radixdlt.utils.WrappedByteArray; import java.util.List; import java.util.Optional; import java.util.function.Consumer; @@ -168,7 +169,7 @@ public StateComputerPrepareResult prepare( @Override public LedgerProofBundle commit( - LedgerExtension ledgerExtension, Option serializedVertexStoreState) { + LedgerExtension ledgerExtension, Option serializedVertexStoreState) { // No-op // `closestEpochProofOnOrBefore` isn't really correct here, but that's fine return new LedgerProofBundle( diff --git a/core/src/test/java/com/radixdlt/consensus/vertexstore/VertexStoreTest.java b/core/src/test/java/com/radixdlt/consensus/vertexstore/VertexStoreTest.java index 7044c938a4..1b301b2692 100644 --- a/core/src/test/java/com/radixdlt/consensus/vertexstore/VertexStoreTest.java +++ b/core/src/test/java/com/radixdlt/consensus/vertexstore/VertexStoreTest.java @@ -210,7 +210,7 @@ public void adding_a_qc_should_update_highest_qc() { // Act QuorumCertificate qc = vertices.get(1).vertex().getQCToParent(); - vertexStoreAdapter.insertQc(qc); + vertexStoreAdapter.insertQuorumCertificate(qc); // Assert assertThat(vertexStoreAdapter.highQC().highestQC()).isEqualTo(qc); @@ -278,7 +278,7 @@ removed from the vertex store (including their children). .getQCToParent(); // Act - final var insertQcResult = vertexStoreAdapter.insertQc(qcForVertexG); + final var insertQcResult = vertexStoreAdapter.insertQuorumCertificate(qcForVertexG); assertTrue(insertQcResult instanceof VertexStore.InsertQcResult.Inserted); assertEquals(vertexStoreAdapter.getRoot().hash(), vertexD.hash()); @@ -318,7 +318,7 @@ public void adding_a_qc_with_commit_should_commit_vertices_to_ledger() { // Act QuorumCertificate qc = vertices.get(3).vertex().getQCToParent(); - final var insertQcResult = vertexStoreAdapter.insertQc(qc); + final var insertQcResult = vertexStoreAdapter.insertQuorumCertificate(qc); // Assert assertTrue(insertQcResult instanceof VertexStore.InsertQcResult.Inserted); @@ -359,7 +359,7 @@ public void adding_a_qc_which_needs_sync_should_return_a_matching_result() { // Act QuorumCertificate qc = this.nextVertex.get().vertex().getQCToParent(); - final var insertQcResult = vertexStoreAdapter.insertQc(qc); + final var insertQcResult = vertexStoreAdapter.insertQuorumCertificate(qc); // Assert assertTrue(insertQcResult instanceof VertexStore.InsertQcResult.VertexIsMissing);