From 7c4b57d5c64fb400af7290360e00eb9677316a2b Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 4 Jul 2024 11:01:07 +0100 Subject: [PATCH] fix: working listener + more control --- replay/replayer.go | 91 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 83 insertions(+), 8 deletions(-) diff --git a/replay/replayer.go b/replay/replayer.go index 12a7e45..49716e8 100644 --- a/replay/replayer.go +++ b/replay/replayer.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/ecdsa" + "encoding/hex" "fmt" "time" @@ -27,6 +28,9 @@ func Follow( targetBlobstreamContractAddress string, targetChainGatewayAddress string, privateKey *ecdsa.PrivateKey, + headerRangeFunctionID [32]byte, + nextHeaderFunctionID [32]byte, + filterRange int64, ) error { logger.Info("listening for new proofs on the source chain") sourceBlobstreamX, err := blobstreamxwrapper.NewBlobstreamX(ethcmn.HexToAddress(sourceBlobstreamContractAddress), sourceEVMClient) @@ -69,10 +73,33 @@ func Follow( continue } else if event.StartBlock > latestTargetContractBlock { logger.Info("the target contract needs to catchup", "event_start_block", event.StartBlock, "target_contract_latest_block", latestTargetContractBlock) - err = Catchup(ctx, logger, verify, trpc, sourceEVMClient, targetEVMClient, sourceBlobstreamContractAddress, targetBlobstreamContractAddress, targetChainGatewayAddress, privateKey) + err = Catchup( + ctx, + logger, + verify, + trpc, + sourceEVMClient, + targetEVMClient, + sourceBlobstreamContractAddress, + targetBlobstreamContractAddress, + targetChainGatewayAddress, + privateKey, + headerRangeFunctionID, + nextHeaderFunctionID, + filterRange, + ) if err != nil { return err } + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if event.EndBlock == latestTargetContractBlock { + // the contract is already up to date + logger.Info("contract up to date", "target_contract_latest_block", event.EndBlock) + continue + } } logger.Debug("getting transaction containing the proof", "nonce", event.ProofNonce.Int64(), "hash", event.Raw.TxHash.Hex(), "start_block", event.StartBlock) tx, _, err := sourceEVMClient.TransactionByHash(ctx, event.Raw.TxHash) @@ -82,7 +109,8 @@ func Follow( logger.Debug("decoding the proof") rawMap := make(map[string]interface{}) - err = abi.UnpackIntoMap(rawMap, "fulfillCall", tx.Data()[4:]) + inputArgs := abi.Methods["fulfillCall"].Inputs + err = inputArgs.UnpackIntoMap(rawMap, tx.Data()[4:]) if err != nil { return err } @@ -94,6 +122,13 @@ func Follow( // update the address to be the target blobstreamX contract for the callback decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) + if event.EndBlock-event.StartBlock > 1 { + // this is a header range proof + decodedArgs.FunctionID = headerRangeFunctionID + } else { + // this is a next header proof + decodedArgs.FunctionID = nextHeaderFunctionID + } logger.Info("replaying the proof", "nonce", event.ProofNonce.Int64()) opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) @@ -130,9 +165,10 @@ func Catchup( targetBlobstreamContractAddress string, targetChainGatewayAddress string, privateKey *ecdsa.PrivateKey, + headerRangeFunctionID [32]byte, + nextHeaderFunctionID [32]byte, + filterRange int64, ) error { - filterRange := int64(5000) - lookupStartHeight, err := sourceEVMClient.BlockNumber(ctx) if err != nil { return err @@ -165,7 +201,6 @@ func Catchup( return err } - // TODO: this could be improved in the future to only get the events needed dataCommitmentEvents, err := getAllDataCommitmentStoredEvents( ctx, logger, @@ -173,6 +208,7 @@ func Catchup( int64(lookupStartHeight), filterRange, latestSourceContractNonce.Int64(), + int64(latestTargetContractBlock), ) if err != nil { return err @@ -202,7 +238,19 @@ func Catchup( if bytes.Equal(coreDataCommitment.DataCommitment.Bytes(), event.DataCommitment[:]) { logger.Info("data commitment verified") } else { - logger.Error("data commitment mismatch!! quitting", "proof_nonce_in_source_contract", event.ProofNonce, "start_block", event.StartBlock, "end_block", event.EndBlock) + logger.Error( + "data commitment mismatch!! quitting", + "proof_nonce_in_source_contract", + event.ProofNonce, + "start_block", + event.StartBlock, + "end_block", + event.EndBlock, + "expected_data_commitment", + hex.EncodeToString(coreDataCommitment.DataCommitment.Bytes()), + "actual_data_commitment", + hex.EncodeToString(event.DataCommitment[:]), + ) return fmt.Errorf("data commitment mistmatch. start height %d end height %d", event.StartBlock, event.EndBlock) } } @@ -243,6 +291,14 @@ func Catchup( // update the address to be the target blobstreamX contract for the callback decodedArgs.CallbackAddress = ethcmn.HexToAddress(targetBlobstreamContractAddress) + if event.EndBlock-event.StartBlock > 1 { + // this is a header range proof + decodedArgs.FunctionID = headerRangeFunctionID + } else { + // this is a next header proof + decodedArgs.FunctionID = nextHeaderFunctionID + } + logger.Info("replaying the proof", "startHeight", startHeight) opts, err := newTransactOptsBuilder(privateKey)(ctx, targetEVMClient, 25000000) if err != nil { @@ -262,7 +318,17 @@ func Catchup( if err != nil { return err } - startHeight = event.EndBlock + // make sure the contract was updated + latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) + if err != nil { + return err + } + if latestTargetContractBlock == event.EndBlock { + // contract updated successfully, we can advance + startHeight = event.EndBlock + } else { + logger.Error("contract did not update successfully, retrying the same proof", "expected_target_height", event.EndBlock, "actual_target_height", latestTargetContractBlock) + } } latestTargetContractBlock, err = targetBlobstreamX.LatestBlock(&bind.CallOpts{Context: ctx}) @@ -270,7 +336,7 @@ func Catchup( return err } - logger.Info("contract up to date", "latest_nonce", latestTargetContractBlock) + logger.Info("contract up to date", "latest_target_contract_block", latestTargetContractBlock) return nil } @@ -281,6 +347,7 @@ func getAllDataCommitmentStoredEvents( lookupStartHeight int64, filterRange int64, latestSourceContractNonce int64, + latestTargetContractBlock int64, ) (map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored, error) { logger.Info("querying all the data commitment stored events in the source contract...") dataCommitmentEvents := make(map[int64]blobstreamxwrapper.BlobstreamXDataCommitmentStored) @@ -302,6 +369,7 @@ func getAllDataCommitmentStoredEvents( return nil, err } + gatheredTheNecessaryEvents := false for { if events.Event != nil { _, exists := dataCommitmentEvents[int64(events.Event.StartBlock)] @@ -309,6 +377,9 @@ func getAllDataCommitmentStoredEvents( continue } else { dataCommitmentEvents[int64(events.Event.StartBlock)] = *events.Event + if int64(events.Event.StartBlock) < latestTargetContractBlock { + gatheredTheNecessaryEvents = true + } } } if !events.Next() { @@ -320,6 +391,10 @@ func getAllDataCommitmentStoredEvents( logger.Info("found all events", "count", len(dataCommitmentEvents)) break } + if gatheredTheNecessaryEvents { + logger.Info("found enough events to cover the needed range", "count", len(dataCommitmentEvents)) + break + } logger.Info("found events", "count", len(dataCommitmentEvents)) } return dataCommitmentEvents, nil