Skip to content

Commit

Permalink
Caplin: under the hood block downloading (erigontech#8459)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored Oct 16, 2023
1 parent f265bd8 commit 9e42b70
Show file tree
Hide file tree
Showing 25 changed files with 427 additions and 293 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ COPY --from=builder /app/build/bin/sentry /usr/local/bin/sentry
COPY --from=builder /app/build/bin/state /usr/local/bin/state
COPY --from=builder /app/build/bin/txpool /usr/local/bin/txpool
COPY --from=builder /app/build/bin/verkle /usr/local/bin/verkle
COPY --from=builder /app/build/bin/caplin-phase1 /usr/local/bin/caplin-phase1
COPY --from=builder /app/build/bin/caplin /usr/local/bin/caplin
COPY --from=builder /app/build/bin/caplin-regression /usr/local/bin/caplin-regression


Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.debian
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ COPY --from=builder /app/build/bin/sentry /usr/local/bin/sentry
COPY --from=builder /app/build/bin/state /usr/local/bin/state
COPY --from=builder /app/build/bin/txpool /usr/local/bin/txpool
COPY --from=builder /app/build/bin/verkle /usr/local/bin/verkle
COPY --from=builder /app/build/bin/caplin-phase1 /usr/local/bin/caplin-phase1
COPY --from=builder /app/build/bin/caplin /usr/local/bin/caplin
COPY --from=builder /app/build/bin/caplin-regression /usr/local/bin/caplin-regression

EXPOSE 8545 \
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ COMMANDS += txpool
COMMANDS += verkle
COMMANDS += evm
COMMANDS += sentinel
COMMANDS += caplin-phase1
COMMANDS += caplin
COMMANDS += caplin-regression


Expand Down
4 changes: 4 additions & 0 deletions cl/persistence/block_saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (m *mockEngine) ForkChoiceUpdate(finalized libcommon.Hash, head libcommon.H
panic("unimplemented")
}

func (m *mockEngine) FrozenBlocks() uint64 {
panic("unimplemented")
}

func (m *mockEngine) NewPayload(payload *cltypes.Eth1Block, beaconParentRoot *libcommon.Hash) (bool, error) {
panic("unimplemented")
}
Expand Down
49 changes: 26 additions & 23 deletions cl/persistence/snapshot_format/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"

"github.com/golang/snappy"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
)
Expand Down Expand Up @@ -40,11 +41,11 @@ func writeExecutionBlockPtr(w io.Writer, p *cltypes.Eth1Block) error {
temp := make([]byte, 8)
binary.BigEndian.PutUint64(temp, p.BlockNumber)

return writeChunk(w, temp, pointerDataType, false)
return writeChunk(w, temp, pointerDataType)
}

func readExecutionBlockPtr(r io.Reader) (uint64, error) {
b, dT, err := readChunk(r, false)
b, dT, err := readChunk(r)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -90,11 +91,15 @@ func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error
currentChunkLength += uint64(body.AttesterSlashings.EncodingSizeSSZ())

// Write the chunk and chunk attestations
if err := writeChunk(w, encoded[:currentChunkLength], chunkDataType, false); err != nil {
if err := writeChunk(w, encoded[:currentChunkLength], chunkDataType); err != nil {
return err
}
encoded = encoded[currentChunkLength:]
if err := writeChunk(w, encoded[:uint64(body.Attestations.EncodingSizeSSZ())], chunkDataType, true); err != nil {
snappyWriter := snappy.NewBufferedWriter(w)
if err := writeChunk(snappyWriter, encoded[:uint64(body.Attestations.EncodingSizeSSZ())], chunkDataType); err != nil {
return err
}
if err := snappyWriter.Close(); err != nil {
return err
}
encoded = encoded[body.Attestations.EncodingSizeSSZ():]
Expand All @@ -103,7 +108,7 @@ func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error
currentChunkLength += uint64(body.Deposits.EncodingSizeSSZ())
currentChunkLength += uint64(body.VoluntaryExits.EncodingSizeSSZ())

if err := writeChunk(w, encoded[:currentChunkLength], chunkDataType, false); err != nil {
if err := writeChunk(w, encoded[:currentChunkLength], chunkDataType); err != nil {
return err
}
// we are done if we are before altair
Expand All @@ -117,42 +122,48 @@ func WriteBlockForSnapshot(block *cltypes.SignedBeaconBlock, w io.Writer) error
if version <= clparams.BellatrixVersion {
return nil
}
return writeChunk(w, encoded, chunkDataType, false)
return writeChunk(w, encoded, chunkDataType)
}

func ReadBlockFromrSnapshot(r io.Reader, executionReader ExecutionBlockReaderByNumber, cfg *clparams.BeaconChainConfig) (*cltypes.SignedBeaconBlock, error) {
func ReadBlockFromSnapshot(r io.Reader, executionReader ExecutionBlockReaderByNumber, cfg *clparams.BeaconChainConfig) (*cltypes.SignedBeaconBlock, error) {
plainSSZ := []byte{}

block := cltypes.NewSignedBeaconBlock(cfg)
// Metadata section is just the current hardfork of the block. TODO(give it a useful purpose)
v, err := readMetadataForBlock(r)
if err != nil {
return nil, err
}

// Read the first chunk
chunk1, dT1, err := readChunk(r, false)
chunk1, dT1, err := readChunk(r)
if err != nil {
return nil, err
}
if dT1 != chunkDataType {
return nil, fmt.Errorf("malformed beacon block, invalid chunk 1 type %d, expected: %d", dT1, chunkDataType)
}
plainSSZ = append(plainSSZ, chunk1...)
// Read the attestation chunk (2nd chunk)
chunk2, dT2, err := readChunk(r, true)
chunk2, dT2, err := readChunk(snappy.NewReader(r))
if err != nil {
return nil, err
}
if dT2 != chunkDataType {
return nil, fmt.Errorf("malformed beacon block, invalid chunk 2 type %d, expected: %d", dT2, chunkDataType)
}
plainSSZ = append(plainSSZ, chunk2...)
// Read the 3rd chunk
chunk3, dT3, err := readChunk(r, false)
chunk3, dT3, err := readChunk(r)
if err != nil {
return nil, err
}
if dT3 != chunkDataType {
return nil, fmt.Errorf("malformed beacon block, invalid chunk 3 type %d, expected: %d", dT3, chunkDataType)
}
plainSSZ = append(plainSSZ, chunk3...)
if v <= clparams.AltairVersion {
return blockFromChunks(v, cfg, chunk1, chunk2, chunk3)
return block, block.DecodeSSZ(plainSSZ, int(v))
}
// Read the block pointer and retrieve chunk4 from the execution reader
blockPointer, err := readExecutionBlockPtr(r)
Expand All @@ -168,28 +179,20 @@ func ReadBlockFromrSnapshot(r io.Reader, executionReader ExecutionBlockReaderByN
if err != nil {
return nil, err
}
plainSSZ = append(plainSSZ, chunk4...)
if v <= clparams.BellatrixVersion {
return blockFromChunks(v, cfg, chunk1, chunk2, chunk3, chunk4)
return block, block.DecodeSSZ(plainSSZ, int(v))
}

// Read the 5h chunk
chunk5, dT5, err := readChunk(r, false)
chunk5, dT5, err := readChunk(r)
if err != nil {
return nil, err
}
if dT5 != chunkDataType {
return nil, fmt.Errorf("malformed beacon block, invalid chunk 5 type %d, expected: %d", dT5, chunkDataType)
}
plainSSZ = append(plainSSZ, chunk5...)

return blockFromChunks(v, cfg, chunk1, chunk2, chunk3, chunk4, chunk5)
}

func blockFromChunks(v clparams.StateVersion, cfg *clparams.BeaconChainConfig, chunks ...[]byte) (*cltypes.SignedBeaconBlock, error) {
block := cltypes.NewSignedBeaconBlock(cfg)
plainSSZ := []byte{}
for _, chunk := range chunks {
plainSSZ = append(plainSSZ, chunk...)
}
return block, block.DecodeSSZ(plainSSZ, int(v))

}
2 changes: 1 addition & 1 deletion cl/persistence/snapshot_format/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestBlockSnapshotEncoding(t *testing.T) {
}
var b bytes.Buffer
require.NoError(t, snapshot_format.WriteBlockForSnapshot(blk, &b))
blk2, err := snapshot_format.ReadBlockFromrSnapshot(&b, &br, &clparams.MainnetBeaconConfig)
blk2, err := snapshot_format.ReadBlockFromSnapshot(&b, &br, &clparams.MainnetBeaconConfig)
require.NoError(t, err)
_ = blk2
hash1, err := blk.HashSSZ()
Expand Down
24 changes: 9 additions & 15 deletions cl/persistence/snapshot_format/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"

"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/utils"
)

type dataType int
Expand All @@ -17,10 +16,7 @@ const (
)

// writeChunk writes a chunk to the writer.
func writeChunk(w io.Writer, buf []byte, t dataType, snappy bool) error {
if snappy {
buf = utils.CompressSnappy(buf)
}
func writeChunk(w io.Writer, buf []byte, t dataType) error {
// prefix is type of chunk + length of chunk
prefix := make([]byte, 8)
binary.BigEndian.PutUint64(prefix, uint64(len(buf)))
Expand All @@ -34,22 +30,20 @@ func writeChunk(w io.Writer, buf []byte, t dataType, snappy bool) error {
return nil
}

func readChunk(r io.Reader, snappy bool) (buf []byte, t dataType, err error) {
func readChunk(r io.Reader) (buf []byte, t dataType, err error) {
prefix := make([]byte, 8)
if _, err := r.Read(prefix); err != nil {
return nil, dataType(0), err
fmt.Println("A")
if _, err = r.Read(prefix); err != nil {
return
}

t = dataType(prefix[0])
prefix[0] = 0
fmt.Println(binary.BigEndian.Uint64(prefix))
buf = make([]byte, binary.BigEndian.Uint64(prefix))
if _, err := r.Read(buf); err != nil {
return nil, t, err
}
if snappy {
buf, err = utils.DecompressSnappy(buf)
if _, err = r.Read(buf); err != nil {
return
}
return buf, t, err
return
}

func readMetadataForBlock(r io.Reader) (clparams.StateVersion, error) {
Expand Down
4 changes: 4 additions & 0 deletions cl/phase1/execution_client/execution_client_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ func (cc *ExecutionClientDirect) GetBodiesByRange(start, count uint64) ([]*types
func (cc *ExecutionClientDirect) GetBodiesByHashes(hashes []libcommon.Hash) ([]*types.RawBody, error) {
return cc.chainRW.GetBodiesByHases(hashes), nil
}

func (cc *ExecutionClientDirect) FrozenBlocks() uint64 {
return cc.chainRW.FrozenBlocks()
}
4 changes: 4 additions & 0 deletions cl/phase1/execution_client/execution_client_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,7 @@ func (cc *ExecutionClientRpc) GetBodiesByHashes(hashes []libcommon.Hash) ([]*typ
}
return ret, nil
}

func (cc *ExecutionClientRpc) FrozenBlocks() uint64 {
panic("unimplemented")
}
2 changes: 2 additions & 0 deletions cl/phase1/execution_client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ type ExecutionEngine interface {
// Range methods
GetBodiesByRange(start, count uint64) ([]*types.RawBody, error)
GetBodiesByHashes(hashes []libcommon.Hash) ([]*types.RawBody, error)
// Snapshots
FrozenBlocks() uint64
}
2 changes: 1 addition & 1 deletion cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (g *GossipManager) onRecv(ctx context.Context, data *sentinel.GossipData, l
if err := operationsContract[*cltypes.AttesterSlashing](ctx, g, l, data, int(version), "attester slashing", g.forkChoice.OnAttesterSlashing); err != nil {
return err
}
case sentinel.GossipType_BlsToExecutionChangeType:
case sentinel.GossipType_BlsToExecutionChangeGossipType:
if err := operationsContract[*cltypes.SignedBLSToExecutionChange](ctx, g, l, data, int(version), "bls to execution change", g.forkChoice.OnBlsToExecutionChange); err != nil {
return err
}
Expand Down
21 changes: 12 additions & 9 deletions cl/phase1/stages/clstages.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type Args struct {

targetEpoch, seenEpoch uint64
targetSlot, seenSlot uint64

downloadedHistory bool
}

func ClStagesCfg(
Expand Down Expand Up @@ -93,11 +95,11 @@ const (
minPeersForDownload = uint64(4)
)

func MetaCatchingUp(args Args, hasDownloaded bool) StageName {
func MetaCatchingUp(args Args) StageName {
if args.peers < minPeersForDownload {
return WaitForPeers
}
if !hasDownloaded {
if !args.downloadedHistory {
return DownloadHistoricalBlocks
}
if args.seenEpoch < args.targetEpoch {
Expand Down Expand Up @@ -188,6 +190,7 @@ func ConsensusClStages(ctx context.Context,
log.Error("failed to get sentinel peer count", "err", err)
args.peers = 0
}
args.downloadedHistory = downloaded
args.seenSlot = cfg.forkChoice.HighestSeen()
args.seenEpoch = args.seenSlot / cfg.beaconCfg.SlotsPerEpoch
args.targetSlot = utils.GetCurrentSlot(cfg.genesisCfg.GenesisTime, cfg.beaconCfg.SecondsPerSlot)
Expand All @@ -199,7 +202,7 @@ func ConsensusClStages(ctx context.Context,
WaitForPeers: {
Description: `wait for enough peers. This is also a safe stage to go to when unsure of what stage to use`,
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
if x := MetaCatchingUp(args, downloaded); x != "" {
if x := MetaCatchingUp(args); x != "" {
return x
}
return CatchUpBlocks
Expand Down Expand Up @@ -232,7 +235,7 @@ func ConsensusClStages(ctx context.Context,
DownloadHistoricalBlocks: {
Description: "Download historical blocks",
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
if x := MetaCatchingUp(args, downloaded); x != "" {
if x := MetaCatchingUp(args); x != "" {
return x
}
return CatchUpBlocks
Expand All @@ -256,7 +259,7 @@ func ConsensusClStages(ctx context.Context,
CatchUpEpochs: {
Description: `if we are 1 or more epochs behind, we download in parallel by epoch`,
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
if x := MetaCatchingUp(args, downloaded); x != "" {
if x := MetaCatchingUp(args); x != "" {
return x
}
return CatchUpBlocks
Expand Down Expand Up @@ -321,7 +324,7 @@ func ConsensusClStages(ctx context.Context,
CatchUpBlocks: {
Description: `if we are within the epoch but not at head, we run catchupblocks`,
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
if x := MetaCatchingUp(args, downloaded); x != "" {
if x := MetaCatchingUp(args); x != "" {
return x
}
return ForkChoice
Expand Down Expand Up @@ -378,7 +381,7 @@ func ConsensusClStages(ctx context.Context,
Description: `fork choice stage. We will send all fork choise things here
also, we will wait up to delay seconds to deal with attestations + side forks`,
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
if x := MetaCatchingUp(args, downloaded); x != "" {
if x := MetaCatchingUp(args); x != "" {
return x
}
return ListenForForks
Expand Down Expand Up @@ -466,7 +469,7 @@ func ConsensusClStages(ctx context.Context,
defer func() {
shouldForkChoiceSinceReorg = false
}()
if x := MetaCatchingUp(args, downloaded); x != "" {
if x := MetaCatchingUp(args); x != "" {
return x
}
if shouldForkChoiceSinceReorg {
Expand Down Expand Up @@ -512,7 +515,7 @@ func ConsensusClStages(ctx context.Context,
CleanupAndPruning: {
Description: `cleanup and pruning is done here`,
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
if x := MetaCatchingUp(args, downloaded); x != "" {
if x := MetaCatchingUp(args); x != "" {
return x
}
return SleepForSlot
Expand Down
Loading

0 comments on commit 9e42b70

Please sign in to comment.