diff --git a/integration-tests/go.mod b/integration-tests/go.mod index b7ef4de1bae..3aeb40dd0a0 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -46,6 +46,7 @@ require ( github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.2 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.18 github.com/smartcontractkit/chainlink-testing-framework/lib/grafana v1.50.0 + github.com/smartcontractkit/chainlink-testing-framework/sentinel v0.0.0-00010101000000-000000000000 github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.9 github.com/smartcontractkit/chainlink-testing-framework/wasp v1.50.2 github.com/smartcontractkit/chainlink/deployment v0.0.0-00010101000000-000000000000 @@ -547,6 +548,6 @@ replace ( github.com/btcsuite/btcd/btcec/v2 => github.com/btcsuite/btcd/btcec/v2 v2.3.2 // replicating the replace directive on cosmos SDK github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 - + github.com/smartcontractkit/chainlink-testing-framework/sentinel => ../../chainlink-testing-framework/sentinel github.com/sourcegraph/sourcegraph/lib => github.com/sourcegraph/sourcegraph-public-snapshot/lib v0.0.0-20240822153003-c864f15af264 ) diff --git a/integration-tests/testsetups/ocr.go b/integration-tests/testsetups/ocr.go index 7a90c38fdd0..12ca38135fb 100644 --- a/integration-tests/testsetups/ocr.go +++ b/integration-tests/testsetups/ocr.go @@ -21,6 +21,7 @@ import ( geth "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/pelletier/go-toml/v2" "github.com/rs/zerolog" "github.com/stretchr/testify/require" @@ -45,6 +46,9 @@ import ( "github.com/smartcontractkit/chainlink-testing-framework/lib/networks" reportModel "github.com/smartcontractkit/chainlink-testing-framework/lib/testreporters" "github.com/smartcontractkit/chainlink-testing-framework/lib/utils/testcontext" + "github.com/smartcontractkit/chainlink-testing-framework/sentinel" + sentinelapi "github.com/smartcontractkit/chainlink-testing-framework/sentinel/api" + "github.com/smartcontractkit/chainlink-testing-framework/sentinel/blockchain_client_wrapper" "github.com/smartcontractkit/chainlink/deployment/environment/nodeclient" "github.com/smartcontractkit/chainlink/integration-tests/actions" @@ -66,11 +70,13 @@ type OCRSoakTest struct { OperatorForwarderFlow bool sethClient *seth.Client OCRVersion string + sc *SentinelCoordinator t *testing.T startTime time.Time timeLeft time.Duration startingBlockNum uint64 + endBlockNum uint64 startingValue int testEnvironment *environment.Environment namespace string @@ -80,6 +86,7 @@ type OCRSoakTest struct { mockServer *ctf_client.MockserverClient filterQuery geth.FilterQuery + ocrStateMutex sync.Mutex // Protects access to ocrRoundStates and testIssues ocrRoundStates []*testreporters.OCRRoundState testIssues []*testreporters.TestIssue @@ -98,6 +105,14 @@ type OCRSoakTest struct { chaosList []*havoc.Chaos // list of chaos simulations to run during the test } +type SentinelCoordinator struct { + sentinel *sentinel.Sentinel + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + log zerolog.Logger +} + type OCRSoakTestOption = func(c *OCRSoakTest) func WithChaos(chaosList []*havoc.Chaos) OCRSoakTestOption { @@ -120,6 +135,7 @@ func WithForwarderFlow(forwarderFlow bool) OCRSoakTestOption { // NewOCRSoakTest creates a new OCR soak test to setup and run func NewOCRSoakTest(t *testing.T, config *tc.TestConfig, opts ...OCRSoakTestOption) (*OCRSoakTest, error) { + sc := NewSentinelCoordinator(logging.GetTestLogger(t)) test := &OCRSoakTest{ Config: config, TestReporter: testreporters.OCRSoakTestReporter{ @@ -132,6 +148,7 @@ func NewOCRSoakTest(t *testing.T, config *tc.TestConfig, opts ...OCRSoakTestOpti ocrRoundStates: make([]*testreporters.OCRRoundState, 0), ocrV1InstanceMap: make(map[string]contracts.OffchainAggregator), ocrV2InstanceMap: make(map[string]contracts.OffchainAggregatorV2), + sc: sc, } ocrVersion := "1" @@ -147,6 +164,7 @@ func NewOCRSoakTest(t *testing.T, config *tc.TestConfig, opts ...OCRSoakTestOpti } t.Cleanup(func() { test.deleteChaosSimulations() + test.sc.Shutdown() }) return test, test.ensureInputValues() } @@ -283,6 +301,174 @@ func (o *OCRSoakTest) Environment() *environment.Environment { return o.testEnvironment } +func NewSentinelCoordinator(log zerolog.Logger) *SentinelCoordinator { + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + s := sentinel.NewSentinel(sentinel.SentinelConfig{Logger: log}) + + return &SentinelCoordinator{ + sentinel: s, + wg: wg, + ctx: ctx, + cancel: cancel, + log: log, + } +} + +// setupSentinel initializes Sentinel, adds chains, and subscribes to required events. +func (o *OCRSoakTest) setupSentinel() error { + blockchainClient := blockchain_client_wrapper.NewGethClientWrapper(o.sethClient.Client) // Wrap ethClient to implement api.BlockchainClient + // Define the chain poller service configuration + addChainConfig := sentinel.AddChainConfig{ + ChainID: o.rpcNetwork.ChainID, + PollInterval: 30 * time.Second, + BlockchainClient: blockchainClient, + } + // Add the chain to Sentinel + if err := o.sc.sentinel.AddChain(addChainConfig); err != nil { + return fmt.Errorf("failed to add chain to Sentinel: %w", err) + } + + // Subscribe to OCR events + ocrAddresses := o.getContractAddresses() + for _, addr := range ocrAddresses { + var topic common.Hash + if o.OCRVersion == "1" { + // Use OCRv1 specific topic + contractABI, err := offchainaggregator.OffchainAggregatorMetaData.GetAbi() + if err != nil { + return fmt.Errorf("failed to get OCRv1 contract ABI: %w", err) + } + topic = contractABI.Events["AnswerUpdated"].ID + } else if o.OCRVersion == "2" { + // Use OCRv2 specific topic + contractABI, err := ocr2aggregator.AggregatorInterfaceMetaData.GetAbi() + if err != nil { + return fmt.Errorf("failed to get OCRv2 contract ABI: %w", err) + } + topic = contractABI.Events["AnswerUpdated"].ID + } + + // Subscribe to the topic for each OCR contract + logCh, err := o.sc.sentinel.Subscribe(o.rpcNetwork.ChainID, addr, topic) + if err != nil { + return fmt.Errorf("failed to subscribe to events for contract %s: %w", addr.Hex(), err) + } + + o.sc.wg.Add(1) + + // Start a goroutine to listen for logs + go o.handleSentinelLogs(logCh) + } + + return nil +} + +// handleSentinelLogs processes logs received from Sentinel's subscription channels. +func (o *OCRSoakTest) handleSentinelLogs(logCh chan sentinelapi.Log) { + defer o.sc.wg.Done() + + for { + select { + case <-o.sc.ctx.Done(): + o.log.Info().Msg("Sentinel log handler received cancellation signal. Exiting goroutine.") + return + case log, ok := <-logCh: + if !ok { + o.log.Info().Msg("Sentinel log channel closed. Exiting goroutine.") + return + } + + typesLog, err := sentinel.ConvertAPILogToTypesLog(log) + if err != nil { + o.log.Warn().Err(err).Msg("Error converting Sentinel log to types.Log") + continue + } + + answer, err := o.parseAnswer(*typesLog) + if err != nil { + o.log.Warn().Err(err).Msg("Error parsing answer from OCR event") + continue + } + roundID, err := o.parseRoundID(*typesLog) + if err != nil { + o.log.Warn().Err(err).Msg("Error parsing roundID from OCR event") + continue + } + + event := &testreporters.FoundEvent{ + StartTime: time.Now(), + Address: log.Address.Hex(), + Answer: answer, + RoundID: roundID, + BlockNumber: log.BlockNumber, + } + + o.processFoundEvent(event) + } + } +} + +// processFoundEvent processes a single found event. +func (o *OCRSoakTest) processFoundEvent(event *testreporters.FoundEvent) { + // Lock to protect access to ocrRoundStates and testIssues + o.ocrStateMutex.Lock() + defer o.ocrStateMutex.Unlock() + + if len(o.ocrRoundStates) > 0 { + currentRound := (o.ocrRoundStates)[len(o.ocrRoundStates)-1] + currentRound.FoundEvents[event.Address] = append(currentRound.FoundEvents[event.Address], event) + currentRound.TimeLineEvents = append(currentRound.TimeLineEvents, event) + } + + o.log.Info(). + Str("Address", event.Address). + Uint64("Block Number", event.BlockNumber). + Int64("Answer", event.Answer). + Uint64("Round ID", event.RoundID). + Msg("Processed OCR AnswerUpdated Event via Sentinel") +} + +// parseAnswer extracts the answer from the log based on OCR version. +func (o *OCRSoakTest) parseAnswer(log types.Log) (int64, error) { + if o.OCRVersion == "1" { + answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(log) + if err != nil { + o.log.Warn().Err(err).Msg("Error parsing OCRv1 AnswerUpdated event") + return 0, fmt.Errorf("error parsing OCRv1 AnswerUpdated event: %w", err) + } + return answerUpdated.Current.Int64(), nil + } else if o.OCRVersion == "2" { + answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(log) + if err != nil { + o.log.Warn().Err(err).Msg("Error parsing OCRv2 AnswerUpdated event") + return 0, fmt.Errorf("error parsing OCRv2 AnswerUpdated event: %w", err) + } + return answerUpdated.Current.Int64(), nil + } + return 0, fmt.Errorf("OCR version not supported: %s", o.OCRVersion) +} + +// parseRoundID extracts the round ID from the log based on OCR version. +func (o *OCRSoakTest) parseRoundID(log types.Log) (uint64, error) { + if o.OCRVersion == "1" { + answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(log) + if err != nil { + o.log.Warn().Err(err).Msg("Error parsing OCRv1 RoundID from event") + return 0, fmt.Errorf("error parsing OCRv1 RoundID from event: %w", err) + } + return answerUpdated.RoundId.Uint64(), nil + } else if o.OCRVersion == "2" { + answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(log) + if err != nil { + o.log.Warn().Err(err).Msg("Error parsing OCRv2 RoundID from event") + return 0, fmt.Errorf("error parsing OCRv2 RoundID from event: %w", err) + } + return answerUpdated.RoundId.Uint64(), nil + } + return 0, fmt.Errorf("OCR version not supported: %s", o.OCRVersion) +} + // Setup initializes the OCR Soak Test by setting up clients, funding nodes, and deploying OCR contracts. func (o *OCRSoakTest) Setup(ocrTestConfig tt.OcrTestConfig) { o.initializeClients() @@ -298,6 +484,10 @@ func (o *OCRSoakTest) Setup(ocrTestConfig tt.OcrTestConfig) { o.setupOCRContracts(ocrTestConfig, forwarders) o.log.Info().Msg("OCR Soak Test Setup Complete") + + // Initialize Sentinel and set up subscriptions + err := o.setupSentinel() + require.NoError(o.t, err, "Failed to set up Sentinel") } // initializeClients sets up the Seth client, Chainlink nodes, and mock server. @@ -448,6 +638,9 @@ func (o *OCRSoakTest) Run() { Msg("Starting OCR Soak Test") o.testLoop(o.Config.GetActiveOCRConfig().Common.TestDuration.Duration, o.startingValue) + ctx, cancel = context.WithTimeout(testcontext.Get(o.t), time.Second*5) + endBlockNum, err := o.sethClient.Client.BlockNumber(ctx) + o.endBlockNum = endBlockNum o.complete() } @@ -682,25 +875,28 @@ func (o *OCRSoakTest) Interrupted() bool { // ****** Helpers ****** // ********************* +func (sc *SentinelCoordinator) Shutdown() { + sc.log.Info().Msg("Shutting down SentinelCoordinator.") + sc.cancel() // Signal all goroutines to stop + sc.wg.Wait() // Wait for all goroutines to finish + sc.sentinel.Close() // Close any Sentinel-specific resources + sc.log.Info().Msg("SentinelCoordinator shutdown complete.") +} + // testLoop is the primary test loop that will trigger new rounds and watch events func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { + // Gracefully shut down Sentinel when the test finishes + defer o.sc.Shutdown() + endTest := time.After(testDuration) interruption := make(chan os.Signal, 1) //nolint:staticcheck //ignore SA1016 we need to send the os.Kill signal signal.Notify(interruption, os.Kill, os.Interrupt, syscall.SIGTERM) - ctx, cancel := context.WithCancel(context.Background()) - var wg sync.WaitGroup - // Channel to signal polling to reset round event counter - resetEventCounter := make(chan struct{}) - defer close(resetEventCounter) - lastValue := 0 newRoundTrigger := time.NewTimer(0) // Want to trigger a new round ASAP defer newRoundTrigger.Stop() o.setFilterQuery() - wg.Add(1) - go o.pollingOCREvents(ctx, &wg, resetEventCounter) n := o.Config.GetNetworkConfig() @@ -778,21 +974,25 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { case <-interruption: saveStart := time.Now() o.log.Warn().Msg("Test interrupted, saving state before shut down") + + o.ocrStateMutex.Lock() o.testIssues = append(o.testIssues, &testreporters.TestIssue{ StartTime: time.Now(), Message: "Test Interrupted", }) + o.ocrStateMutex.Unlock() + if err := o.SaveState(); err != nil { o.log.Error().Err(err).Msg("Error saving state") } o.log.Warn().Str("Time Taken", time.Since(saveStart).String()).Msg("Saved state") o.deleteChaosSimulations() + o.sc.Shutdown() os.Exit(interruptedExitCode) // Exit with interrupted code to indicate test was interrupted, not just a normal failure case <-endTest: - cancel() - wg.Wait() // Wait for polling to complete return case <-newRoundTrigger.C: + o.checkNumberOfEventsForRound() err := o.triggerNewRound(newValue) timerReset := o.Config.GetActiveOCRConfig().Soak.TimeBetweenRounds.Duration if err != nil { @@ -801,8 +1001,13 @@ func (o *OCRSoakTest) testLoop(testDuration time.Duration, newValue int) { Str("Waiting", timerReset.String()). Msg("Error triggering new round, waiting and trying again. Possible connection issues with mockserver") } - // Signal polling to reset event counter - resetEventCounter <- struct{}{} + // Stop the timer to ensure it's not already expired + if !newRoundTrigger.Stop() { + select { + case <-newRoundTrigger.C: + default: + } + } newRoundTrigger.Reset(timerReset) // Change value for the next round @@ -908,175 +1113,47 @@ func (o *OCRSoakTest) setFilterQuery() { Msg("Filter Query Set") } -// pollingOCREvents Polls the blocks for OCR events and logs them to the test logger -func (o *OCRSoakTest) pollingOCREvents(ctx context.Context, wg *sync.WaitGroup, resetEventCounter <-chan struct{}) { - defer wg.Done() - // Keep track of the last processed block number - processedBlockNum := o.startingBlockNum - 1 - // TODO: Make this configurable - pollInterval := time.Second * 30 - ticker := time.NewTicker(pollInterval) - defer ticker.Stop() - - // Retrieve expected number of events per round from configuration - expectedEventsPerRound := *o.Config.GetActiveOCRConfig().Common.NumberOfContracts - eventCounter := 0 - roundTimeout := o.Config.GetActiveOCRConfig().Soak.TimeBetweenRounds.Duration - timeoutTimer := time.NewTimer(roundTimeout) - round := 0 - defer timeoutTimer.Stop() - - o.log.Info().Msg("Start Polling for Answer Updated Events") - - for { - select { - case <-resetEventCounter: - if round != 0 { - if eventCounter == expectedEventsPerRound { - o.log.Info(). - Int("Events found", eventCounter). - Int("Events Expected", expectedEventsPerRound). - Msg("All expected events found") - } else if eventCounter < expectedEventsPerRound { - o.log.Warn(). - Int("Events found", eventCounter). - Int("Events Expected", expectedEventsPerRound). - Msg("Expected to find more events") - } - } - // Reset event counter and timer for new round - eventCounter = 0 - // Safely stop and drain the timer if a value is present - if !timeoutTimer.Stop() { - <-timeoutTimer.C - } - timeoutTimer.Reset(roundTimeout) - o.log.Info().Msg("Polling for new round, event counter reset") - round++ - case <-ctx.Done(): - o.log.Info().Msg("Test duration ended, finalizing event polling") - timeoutTimer.Reset(roundTimeout) - // Wait until expected events are fetched or until timeout - for eventCounter < expectedEventsPerRound { - select { - case <-timeoutTimer.C: - o.log.Warn().Msg("Timeout reached while waiting for final events") - return - case <-ticker.C: - o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum) - } - } - o.log.Info(). - Int("Events found", eventCounter). - Int("Events Expected", expectedEventsPerRound). - Msg("Stop polling.") - return - case <-ticker.C: - o.fetchAndProcessEvents(&eventCounter, expectedEventsPerRound, &processedBlockNum) - } - } -} - -// Helper function to poll events and update eventCounter -func (o *OCRSoakTest) fetchAndProcessEvents(eventCounter *int, expectedEvents int, processedBlockNum *uint64) { - latestBlock, err := o.sethClient.Client.BlockNumber(context.Background()) - if err != nil { - o.log.Error().Err(err).Msg("Error getting latest block number") - return - } - - if *processedBlockNum == latestBlock { - o.log.Debug(). - Uint64("Latest Block", latestBlock). - Uint64("Last Processed Block Number", *processedBlockNum). - Msg("No new blocks since last poll") +// Checks if the current round has received all expected events +func (o *OCRSoakTest) checkNumberOfEventsForRound() { + o.ocrStateMutex.Lock() + defer o.ocrStateMutex.Unlock() + if len(o.ocrRoundStates) < 1 { return } + currentRound := o.ocrRoundStates[len(o.ocrRoundStates)-1] - // Check if the latest block is behind processedBlockNum due to possible reorgs - if *processedBlockNum > latestBlock { - o.log.Error(). - Uint64("From Block", *processedBlockNum). - Uint64("To Block", latestBlock). - Msg("The latest block is behind the processed block. This could happen due to RPC issues or possibly a reorg") - *processedBlockNum = latestBlock - return + // Calculate total received events for the current round + totalReceivedEvents := 0 + for _, addresses := range currentRound.FoundEvents { + for range addresses { + totalReceivedEvents++ + } } + numberOfContracts := len(o.getContractAddresses()) - fromBlock := *processedBlockNum + 1 - o.filterQuery.FromBlock = big.NewInt(0).SetUint64(fromBlock) - o.filterQuery.ToBlock = big.NewInt(0).SetUint64(latestBlock) - - o.log.Debug(). - Uint64("From Block", fromBlock). - Uint64("To Block", latestBlock). - Msg("Fetching logs for the specified range") - - logs, err := o.sethClient.Client.FilterLogs(context.Background(), o.filterQuery) - if err != nil { - o.log.Error().Err(err).Msg("Error fetching logs") + if totalReceivedEvents == numberOfContracts { + o.log.Info(). + Int("Expected Events", numberOfContracts). + Int("Received Events", totalReceivedEvents). + Msg("Received all expected events for OCR round") return + } else if totalReceivedEvents < numberOfContracts { + o.log.Warn(). + Int("Expected Events", numberOfContracts). + Int("Received Events", totalReceivedEvents). + Msg("Expected to find more events") + } else { + o.log.Warn(). + Int("Expected Events", numberOfContracts). + Int("Received Events", totalReceivedEvents). + Msg("Exceeded expected number of events for OCR round") } - - for _, event := range logs { - *eventCounter++ - if o.OCRVersion == "1" { - answerUpdated, err := o.ocrV1Instances[0].ParseEventAnswerUpdated(event) - if err != nil { - o.log.Warn(). - Err(err). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Msg("Error parsing event as AnswerUpdated") - continue - } - if *eventCounter <= expectedEvents { - o.log.Info(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Answer Updated Event") - } else { - o.log.Error(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Excess event detected, beyond expected count") - } - } else if o.OCRVersion == "2" { - answerUpdated, err := o.ocrV2Instances[0].ParseEventAnswerUpdated(event) - if err != nil { - o.log.Warn(). - Err(err). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Msg("Error parsing event as AnswerUpdated") - continue - } - if *eventCounter <= expectedEvents { - o.log.Info(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Answer Updated Event") - } else { - o.log.Error(). - Str("Address", event.Address.Hex()). - Uint64("Block Number", event.BlockNumber). - Uint64("Round ID", answerUpdated.RoundId.Uint64()). - Int64("Answer", answerUpdated.Current.Int64()). - Msg("Excess event detected, beyond expected count") - } - } - } - *processedBlockNum = latestBlock } // triggers a new OCR round by setting a new mock adapter value func (o *OCRSoakTest) triggerNewRound(newValue int) error { + o.ocrStateMutex.Lock() + defer o.ocrStateMutex.Unlock() if len(o.ocrRoundStates) > 0 { o.ocrRoundStates[len(o.ocrRoundStates)-1].EndTime = time.Now() } @@ -1115,6 +1192,8 @@ func (o *OCRSoakTest) triggerNewRound(newValue int) error { func (o *OCRSoakTest) collectEvents() error { start := time.Now() + o.ocrStateMutex.Lock() + defer o.ocrStateMutex.Unlock() if len(o.ocrRoundStates) == 0 { return fmt.Errorf("error collecting on-chain events, no rounds have been started") } @@ -1123,6 +1202,7 @@ func (o *OCRSoakTest) collectEvents() error { // Set from block to be starting block before filtering o.filterQuery.FromBlock = big.NewInt(0).SetUint64(o.startingBlockNum) + o.filterQuery.ToBlock = big.NewInt(0).SetUint64(o.endBlockNum) // We must retrieve the events, use exponential backoff for timeout to retry timeout := time.Second * 15