Skip to content

Commit

Permalink
Address code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasGasior1 committed Jun 12, 2024
1 parent b707db8 commit e791b13
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import com.radixdlt.rev2.REv2StateComputer;
import com.radixdlt.rev2.REv2TransactionsAndProofReader;
import com.radixdlt.transactions.RawNotarizedTransaction;
import com.radixdlt.utils.WrappedByteArray;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;
Expand Down Expand Up @@ -195,7 +196,8 @@ public StateComputerLedger.StateComputerPrepareResult prepare(

@Override
public LedgerProofBundle commit(
LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState) {
LedgerExtension ledgerExtension,
Option<WrappedByteArray> serializedVertexStoreState) {
return underlyingStateComputer.commit(ledgerExtension, serializedVertexStoreState);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.radixdlt.consensus.vertexstore.ExecutedVertex;
import com.radixdlt.lang.Option;
import com.radixdlt.utils.WrappedByteArray;
import java.util.AbstractCollection;

/**
* An event emitted when vertex store updates its highQC, which possibly results in some vertices
Expand All @@ -79,4 +80,15 @@ public record BFTHighQCUpdate(
HighQC newHighQc,
Option<ImmutableList<ExecutedVertex>> committedVertices,
WrappedByteArray serializedVertexStoreState)
implements LocalEvent {}
implements LocalEvent {

@Override
public String toString() {
return String.format(
"%s[newHighQc=%s numCommittedVertices=%s serializedVertexStoreStateSize=%s]",
getClass().getSimpleName(),
newHighQc,
committedVertices.map(AbstractCollection::size).orElse(0),
serializedVertexStoreState.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,11 @@
/** An event emitted after a vertex has been inserted into the vertex store. */
public record BFTInsertUpdate(
ExecutedVertex insertedVertex, WrappedByteArray serializedVertexStoreState)
implements LocalEvent {}
implements LocalEvent {
@Override
public String toString() {
return String.format(
"%s[insertedVertex=%s serializedVertexStoreStateSize=%s]",
getClass().getSimpleName(), insertedVertex(), serializedVertexStoreState.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,12 @@
/** An even emitted when the vertex store has been rebuilt. */
public record BFTRebuildUpdate(
VertexStoreState vertexStoreState, WrappedByteArray serializedVertexStoreState)
implements LocalEvent {}
implements LocalEvent {

@Override
public String toString() {
return String.format(
"%s[serializedVertexStoreStateSize=%s]",
getClass().getSimpleName(), serializedVertexStoreState.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public SyncResult syncToQC(HighQC highQC, @Nullable NodeId author, HighQcSource
return SyncResult.INVALID;
}

return switch (vertexStore.insertQc(qc)) {
return switch (vertexStore.insertQuorumCertificate(qc)) {
case VertexStore.InsertQcResult.Inserted ignored -> {
// QC was inserted, try TC too (as it can be higher), and then process a new highQC
highQC.highestTC().map(vertexStore::insertTimeoutCertificate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,15 @@ public boolean insertTimeoutCertificate(TimeoutCertificate timeoutCertificate) {
};
}

public VertexStore.InsertQcResult insertQc(QuorumCertificate qc) {
public VertexStore.InsertQcResult insertQuorumCertificate(QuorumCertificate qc) {
final var result = vertexStore.insertQc(qc);
if (result instanceof VertexStore.InsertQcResult.Inserted inserted) {
dispatchPostQcInsertionEvents(inserted);
switch (result) {
case VertexStore.InsertQcResult.Inserted inserted -> this.highQCUpdateDispatcher.dispatch(
new BFTHighQCUpdate(
inserted.newHighQc(),
inserted.committedUpdate().map(VertexStore.CommittedUpdate::committedVertices),
inserted.serializedVertexStoreState()));
default -> {} // no-op
}
return result;
}
Expand All @@ -151,18 +156,21 @@ public List<ExecutedVertex> getPathFromRoot(HashCode vertexHash) {

public void insertVertexChain(VertexChain vertexChain) {
final var result = vertexStore.insertVertexChain(vertexChain);
result.insertedQcs().forEach(this::dispatchPostQcInsertionEvents);
result
.insertedQcs()
.forEach(
inserted -> {
this.highQCUpdateDispatcher.dispatch(
new BFTHighQCUpdate(
inserted.newHighQc(),
inserted
.committedUpdate()
.map(VertexStore.CommittedUpdate::committedVertices),
inserted.serializedVertexStoreState()));
});
result.insertUpdates().forEach(bftUpdateDispatcher::dispatch);
}

private void dispatchPostQcInsertionEvents(VertexStore.InsertQcResult.Inserted inserted) {
this.highQCUpdateDispatcher.dispatch(
new BFTHighQCUpdate(
inserted.newHighQc(),
inserted.committedUpdate().map(VertexStore.CommittedUpdate::committedVertices),
inserted.serializedVertexStoreState()));
}

public Optional<ImmutableList<VertexWithHash>> getVertices(HashCode vertexId, int count) {
return vertexStore.getVertices(vertexId, count).toOptional();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,10 @@ private void removeVertexAndPruneInternal(HashCode vertexId, Optional<HashCode>
return;
}

final var children = vertexChildren.removeAll(vertexId);
// Note that we need a copy of the children list here (.stream().toList()) - that is because
// we're removing the items in the loop (otherwise, this could lead to
// ConcurrentModificationException)
final var children = vertexChildren.get(vertexId).stream().toList();
for (HashCode child : children) {
if (!Optional.of(child).equals(skip)) {
removeVertexAndPruneInternal(child, Optional.empty());
Expand Down
62 changes: 32 additions & 30 deletions core/src/main/java/com/radixdlt/ledger/StateComputerLedger.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import com.radixdlt.transactions.RawLedgerTransaction;
import com.radixdlt.transactions.RawNotarizedTransaction;
import com.radixdlt.utils.TimeSupplier;
import com.radixdlt.utils.WrappedByteArray;
import java.util.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -162,7 +163,7 @@ StateComputerPrepareResult prepare(
RoundDetails roundDetails);

LedgerProofBundle commit(
LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState);
LedgerExtension ledgerExtension, Option<WrappedByteArray> serializedVertexStoreState);
}

private final StateComputer stateComputer;
Expand Down Expand Up @@ -301,40 +302,21 @@ public EventProcessor<BFTHighQCUpdate> bftHighQcUpdateEventProcessor() {
.ifPresent(
committedVertices -> {
updateCommittedVerticesMetrics(committedVertices);

final ImmutableList<RawLedgerTransaction> transactions =
committedVertices.stream()
.flatMap(ExecutedVertex::successfulTransactions)
.map(ExecutedTransaction::transaction)
.collect(ImmutableList.toImmutableList());

highQcUpdate
.newHighQc()
.highestCommittedQC()
.getProcessedCommit(hasher)
.ifPresentOrElse(
processedQcCommit -> {
switch (processedQcCommit) {
case ProcessedQcCommit.OfConensusQc ofConensusQc -> {
final var ledgerExtension =
new LedgerExtension(transactions, ofConensusQc.ledgerProof());
metrics
.ledger()
.commit()
.measure(
() ->
this.commit(
ledgerExtension,
Option.some(
highQcUpdate
.serializedVertexStoreState()
.value())));
}
case ProcessedQcCommit.OfInitialEpochQc ofInitialEpochQc -> {
// no-op, ignore
this.metrics.ledger().ignoredBftCommittedUpdates().inc();
}
}
final var transactions =
committedVertices.stream()
.flatMap(ExecutedVertex::successfulTransactions)
.map(ExecutedTransaction::transaction)
.collect(ImmutableList.toImmutableList());
processQcCommit(
processedQcCommit,
transactions,
highQcUpdate.serializedVertexStoreState());
},
() -> {
// no-op, ignore
Expand All @@ -344,6 +326,25 @@ public EventProcessor<BFTHighQCUpdate> bftHighQcUpdateEventProcessor() {
};
}

private void processQcCommit(
ProcessedQcCommit processedQcCommit,
ImmutableList<RawLedgerTransaction> transactions,
WrappedByteArray serializedVertexStoreState) {
switch (processedQcCommit) {
case ProcessedQcCommit.OfConensusQc ofConensusQc -> {
final var ledgerExtension = new LedgerExtension(transactions, ofConensusQc.ledgerProof());
metrics
.ledger()
.commit()
.measure(() -> this.commit(ledgerExtension, Option.some(serializedVertexStoreState)));
}
case ProcessedQcCommit.OfInitialEpochQc ofInitialEpochQc -> {
// no-op, ignore
this.metrics.ledger().ignoredBftCommittedUpdates().inc();
}
}
}

private void updateCommittedVerticesMetrics(ImmutableList<ExecutedVertex> committedVertices) {
final var numCommittedFallbackVertices =
committedVertices.stream().filter(v -> v.vertex().isFallback()).count();
Expand Down Expand Up @@ -396,7 +397,8 @@ public EventProcessor<LedgerExtension> syncEventProcessor() {
* InvalidCommitRequestException} is propagated from the Rust state computer.
* </ul>
*/
private void commit(LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState) {
private void commit(
LedgerExtension ledgerExtension, Option<WrappedByteArray> serializedVertexStoreState) {
final var proofToCommit = ledgerExtension.proof();

final int extensionTransactionCount; // for metrics purposes only
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/com/radixdlt/rev2/REv2StateComputer.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import com.radixdlt.transactions.PreparedNotarizedTransaction;
import com.radixdlt.transactions.RawNotarizedTransaction;
import com.radixdlt.utils.UInt64;
import com.radixdlt.utils.WrappedByteArray;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -283,15 +284,15 @@ public StateComputerLedger.StateComputerPrepareResult prepare(

@Override
public LedgerProofBundle commit(
LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState) {
LedgerExtension ledgerExtension, Option<WrappedByteArray> serializedVertexStoreState) {
final var proof = ledgerExtension.proof();
final var header = proof.ledgerHeader();

var commitRequest =
new CommitRequest(
ledgerExtension.transactions(),
proof,
serializedVertexStoreState,
serializedVertexStoreState.map(WrappedByteArray::value),
Option.from(selfValidatorId));

final var result = stateComputer.commit(commitRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,6 @@ public Observable<TestInvariantError> check(RunningNetwork network) {
emitter.onNext(highQCUpdate.newHighQc().highestQC());
},
BFTHighQCUpdate.class);
nodeEvents.addListener(
(node, highQcUpdate) -> {
emitter.onNext(highQcUpdate.newHighQc().highestQC());
},
BFTHighQCUpdate.class);
})
.serialize()
.map(QuorumCertificate::getProposedHeader)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import com.radixdlt.p2p.NodeId;
import com.radixdlt.targeted.mempool.SimpleMempool;
import com.radixdlt.transactions.RawNotarizedTransaction;
import com.radixdlt.utils.WrappedByteArray;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -137,7 +138,7 @@ public StateComputerPrepareResult prepare(

@Override
public LedgerProofBundle commit(
LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState) {
LedgerExtension ledgerExtension, Option<WrappedByteArray> serializedVertexStoreState) {
final var proof = this.stateComputer.commit(ledgerExtension, serializedVertexStoreState);
this.mempool.handleTransactionsCommitted(
ledgerExtension.transactions().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import com.radixdlt.statecomputer.commit.CommitSummary;
import com.radixdlt.transactions.RawNotarizedTransaction;
import com.radixdlt.utils.UInt32;
import com.radixdlt.utils.WrappedByteArray;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -125,7 +126,7 @@ public StateComputerLedger.StateComputerPrepareResult prepare(

@Override
public LedgerProofBundle commit(
LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState) {
LedgerExtension ledgerExtension, Option<WrappedByteArray> serializedVertexStoreState) {
latestProof =
new LedgerProofBundle(
ledgerExtension.proof(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import com.radixdlt.mempool.MempoolAdd;
import com.radixdlt.p2p.NodeId;
import com.radixdlt.transactions.RawNotarizedTransaction;
import com.radixdlt.utils.WrappedByteArray;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -147,7 +148,7 @@ public StateComputerPrepareResult prepare(

@Override
public LedgerProofBundle commit(
LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState) {
LedgerExtension ledgerExtension, Option<WrappedByteArray> serializedVertexStoreState) {
return this.stateComputer.commit(ledgerExtension, serializedVertexStoreState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import com.radixdlt.statecomputer.commit.CommitSummary;
import com.radixdlt.transactions.RawNotarizedTransaction;
import com.radixdlt.utils.UInt32;
import com.radixdlt.utils.WrappedByteArray;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -204,7 +205,7 @@ private LedgerUpdate generateLedgerUpdate(LedgerExtension ledgerExtension) {

@Override
public LedgerProofBundle commit(
LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState) {
LedgerExtension ledgerExtension, Option<WrappedByteArray> serializedVertexStoreState) {
var ledgerUpdate = this.generateLedgerUpdate(ledgerExtension);
ledgerUpdateDispatcher.dispatch(ledgerUpdate);
return ledgerUpdate.committedProof();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import com.radixdlt.utils.TimeSupplier;
import com.radixdlt.utils.UInt192;
import com.radixdlt.utils.UInt32;
import com.radixdlt.utils.WrappedByteArray;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
Expand Down Expand Up @@ -168,7 +169,7 @@ public StateComputerPrepareResult prepare(

@Override
public LedgerProofBundle commit(
LedgerExtension ledgerExtension, Option<byte[]> serializedVertexStoreState) {
LedgerExtension ledgerExtension, Option<WrappedByteArray> serializedVertexStoreState) {
// No-op
// `closestEpochProofOnOrBefore` isn't really correct here, but that's fine
return new LedgerProofBundle(
Expand Down
Loading

0 comments on commit e791b13

Please sign in to comment.