Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipelining followups #8

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions conduit/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

log "github.com/sirupsen/logrus"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/data"
)

Expand All @@ -20,14 +22,14 @@ func HandlePanic(logger *log.Logger) {
type empty struct{}

type pluginInput interface {
uint64 | data.BlockData | string
uint64 | data.BlockData | string | empty
}

type pluginOutput interface {
pluginInput | empty
pluginInput | *sdk.Genesis
}

// Retries is a wrapper for retrying a function call f() with a cancellation context,
// retries is a wrapper for retrying a function call f() with a cancellation context,
// a delay and a max retry count. It attempts to call the wrapped function at least once
// and only after the first attempt will pay attention to a context cancellation.
// This can allow the pipeline to receive a cancellation and guarantee attempting to finish
Expand All @@ -45,7 +47,7 @@ type pluginOutput interface {
// - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries
// - when p.cfg.retryCount == 0, the error will be the last error encountered
// - the returned duration dur is the total time spent in the function, including retries
func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
func retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) {
start := time.Now()

for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ {
Expand Down Expand Up @@ -74,9 +76,19 @@ func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipe
return
}

// RetriesNoOutput applies the same logic as Retries, but for functions that return no output.
func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := Retries(func(x X) (empty, error) {
// TODO: probably the following function and its unit test should be axed
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

^^^^ !!!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we probably don't need a no-input function right now -- is this+tests+above interface going to be reverted?

// retriesNoInput adapts a function that takes no input so that it can be run
// through retries, and therefore shares its retry, delay and cancellation logic.
// It returns f's output, the duration reported by retries, and the resulting error.
//
//nolint:unused
func retriesNoInput[Y pluginOutput](f func() (Y, error), p *pipelineImpl, msg string) (Y, time.Duration, error) {
	// retries expects a one-argument function, so wrap f in an adapter that
	// discards a placeholder value of the empty type.
	adapter := func(empty) (Y, error) { return f() }
	return retries(adapter, empty{}, p, msg)
}

// retriesNoOutput applies the same logic as retries, but for functions that return no output.
func retriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) {
_, d, err := retries(func(x X) (empty, error) {
return empty{}, f(x)
}, a, p, msg)
return d, err
Expand Down
85 changes: 77 additions & 8 deletions conduit/pipeline/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ func TestRetries(t *testing.T) {
}
}

succeedAfterFactoryNoInput := func(succeedAfter uint64, never bool) func() (uint64, error) {
tries := uint64(0)

return func() (uint64, error) {
if tries >= succeedAfter && !never {
return tries + 1, nil
}
tries++
return 0, fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1)
}
}

cases := []struct {
name string
retryCount uint64
Expand Down Expand Up @@ -106,8 +118,8 @@ func TestRetries(t *testing.T) {
for _, tc := range cases {
tc := tc

// run cases for Retries()
t.Run("Retries() "+tc.name, func(t *testing.T) {
// run cases for retries()
t.Run("retries() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
Expand All @@ -127,7 +139,7 @@ func TestRetries(t *testing.T) {
yChan := make(chan uint64)
errChan := make(chan error)
go func() {
y, _, err := Retries(succeedAfter, 0, p, "test")
y, _, err := retries(succeedAfter, 0, p, "test")
yChan <- y
errChan <- err
}()
Expand All @@ -144,7 +156,7 @@ func TestRetries(t *testing.T) {
return
}

y, _, err := Retries(succeedAfter, 0, p, "test")
y, _, err := retries(succeedAfter, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)

Expand All @@ -163,8 +175,8 @@ func TestRetries(t *testing.T) {
}
})

// run cases for RetriesNoOutput()
t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) {
// run cases for retriesNoOutput()
t.Run("retriesNoOutput() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
Expand All @@ -183,7 +195,7 @@ func TestRetries(t *testing.T) {

errChan := make(chan error)
go func() {
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
_, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test")
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
Expand All @@ -197,7 +209,7 @@ func TestRetries(t *testing.T) {
return
}

_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test")
_, err := retriesNoOutput(succeedAfterNoOutput, 0, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)
} else { // retryCount > 0 so doesn't retry forever
Expand All @@ -209,5 +221,62 @@ func TestRetries(t *testing.T) {
}
}
})

// run case for retriesNoInput()
t.Run("retriesNoInput() "+tc.name, func(t *testing.T) {
t.Parallel()
ctx, ccf := context.WithCancelCause(context.Background())
p := &pipelineImpl{
ctx: ctx,
ccf: ccf,
logger: log.New(),
cfg: &data.Config{
RetryCount: tc.retryCount,
RetryDelay: 1 * time.Millisecond,
},
}
succeedAfterNoInput := succeedAfterFactoryNoInput(tc.succeedAfter, tc.neverSucceed)

if tc.retryCount == 0 && tc.neverSucceed {
// avoid infinite loop by cancelling the context

errChan := make(chan error)
yChan := make(chan uint64)
go func() {
out, _, err := retriesNoInput(succeedAfterNoInput, p, "test")
yChan <- out
errChan <- err
}()
time.Sleep(5 * time.Millisecond)
errTestCancelled := errors.New("test cancelled")
go func() {
ccf(errTestCancelled)
}()
y := <-yChan
err := <-errChan
require.ErrorIs(t, err, errTestCancelled, tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
require.Zero(t, y, tc.name)
return
}

y, _, err := retriesNoInput(succeedAfterNoInput, p, "test")
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false
require.NoError(t, err, tc.name)

// note we subtract 1 from y below because succeedAfter has added 1 to its output
require.Equal(t, tc.succeedAfter, y-1, tc.name)
} else { // retryCount > 0 so doesn't retry forever
if tc.neverSucceed || tc.succeedAfter > tc.retryCount {
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name)
require.ErrorIs(t, err, errSentinelCause, tc.name)
require.Zero(t, y, tc.name)
} else { // !tc.neverSucceed && succeedAfter <= retryCount
require.NoError(t, err, tc.name)
require.Equal(t, tc.succeedAfter, y-1, tc.name)
}
}
})

}
}
15 changes: 6 additions & 9 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ func (p *pipelineImpl) importerHandler(importer importers.Importer, roundChan <-
totalSelectWait += waitTime
p.logger.Tracef("importer handler waited %dms to receive round %d", waitTime.Milliseconds(), rnd)

blkData, importTime, lastError := Retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
blkData, importTime, lastError := retries(importer.GetBlock, rnd, p, importer.Metadata().Name)
if lastError != nil {
p.cancelWithProblem(fmt.Errorf("importer %s handler (%w): failed to import round %d after %dms: %w", importer.Metadata().Name, errImporterCause, rnd, importTime.Milliseconds(), lastError))
return
Expand Down Expand Up @@ -533,7 +533,7 @@ func (p *pipelineImpl) processorHandler(idx int, proc processors.Processor, blkI

var procTime time.Duration
var lastError error
blk, procTime, lastError = Retries(proc.Process, blk, p, proc.Metadata().Name)
blk, procTime, lastError = retries(proc.Process, blk, p, proc.Metadata().Name)
if lastError != nil {
p.cancelWithProblem(fmt.Errorf("processor[%d] %s handler (%w): failed to process round %d after %dms: %w", idx, proc.Metadata().Name, errProcessorCause, lastRnd, procTime.Milliseconds(), lastError))
return
Expand Down Expand Up @@ -598,7 +598,7 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug
}

var exportTime time.Duration
exportTime, lastError = RetriesNoOutput(exporter.Receive, blk, p, eName)
exportTime, lastError = retriesNoOutput(exporter.Receive, blk, p, eName)
if lastError != nil {
lastError = fmt.Errorf("aborting after failing to export round %d: %w", lastRound, lastError)
return
Expand Down Expand Up @@ -640,16 +640,15 @@ func (p *pipelineImpl) exporterHandler(exporter exporters.Exporter, blkChan plug
// WARNING: removing/re-log-levelling the following will BREAK:
// - the E2E test (Search for "Pipeline round" in subslurp.py)
// - the internal tools logstats collector (See func ConduitCollector in logstats.go of internal-tools repo)
p.logger.Infof(logstatsE2Elog(nextRound, lastRound, len(blk.Payset), exportTime))
p.logger.Infof(logstatsE2Elog(lastRound, len(blk.Payset), exportTime))
}
}
}()
}

func logstatsE2Elog(nextRound, lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string {
func logstatsE2Elog(lastRound uint64, topLevelTxnCount int, exportTime time.Duration) string {
return fmt.Sprintf(
"UPDATED Pipeline NextRound=%d. FINISHED Pipeline round r=%d (%d txn) exported in %s",
nextRound,
"FINISHED Pipeline round r=%d (%d txn) exported in %s",
lastRound,
topLevelTxnCount,
exportTime,
Expand Down Expand Up @@ -696,8 +695,6 @@ func (p *pipelineImpl) Start() {
}
}
}(p.pipelineMetadata.NextRound)

<-p.ctx.Done()
Comment on lines -699 to -700
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the health endpoint bugfix.

}

func (p *pipelineImpl) Wait() {
Expand Down
5 changes: 2 additions & 3 deletions conduit/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,13 +990,12 @@ func TestMetrics(t *testing.T) {
}

func TestLogStatsE2Elog(t *testing.T) {
nextRound := uint64(1337)
round := uint64(42)
numTxns := 13
duration := 12345600 * time.Microsecond

expectedLog := "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
log := logstatsE2Elog(nextRound, round, numTxns, duration)
expectedLog := "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
log := logstatsE2Elog(round, numTxns, duration)
require.Equal(t, expectedLog, log)

logstatsRex, err := regexp.Compile(`round r=(\d+) \((\d+) txn\) exported in (.*)`)
Expand Down
1 change: 1 addition & 0 deletions conduit/plugins/importers/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ import (
// Call package wide init function
_ "github.com/algorand/conduit/conduit/plugins/importers/algod"
_ "github.com/algorand/conduit/conduit/plugins/importers/filereader"
_ "github.com/algorand/conduit/conduit/plugins/importers/noop"
)
81 changes: 81 additions & 0 deletions conduit/plugins/importers/noop/noop_importer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package noop

import (
"context"
_ "embed" // used to embed config
"fmt"
"time"

"github.com/sirupsen/logrus"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

"github.com/algorand/conduit/conduit/data"
"github.com/algorand/conduit/conduit/plugins"
"github.com/algorand/conduit/conduit/plugins/importers"
)

// PluginName is the name to use when configuring the noop importer; it is also
// the name the importer is registered under and reports in its metadata.
var PluginName = "noop"

// sleepForGetBlock is how long GetBlock sleeps on every call before returning its block.
const sleepForGetBlock = 100 * time.Millisecond

// noopImporter is an importer whose GetBlock never returns an error: for any requested
// round it fabricates a block whose header contains only that round number.
// This means it will also process out of order blocks, which may or may not be desirable
// for different use cases--it can hide errors in actual importers expecting in order
// block processing.
// The noopImporter maintains its round state according to the round of the last block it produced.
// It also sleeps 100 milliseconds (sleepForGetBlock) in every GetBlock call to slow down the pipeline.
type noopImporter struct {
// round is the configured starting round after Init, then the round most recently passed to GetBlock.
round uint64
// cfg is the plugin configuration unmarshalled by Init.
cfg ImporterConfig
}

// sampleConfig holds the contents of sample.yaml, embedded at build time.
//
//go:embed sample.yaml
var sampleConfig string

// metadata is the static plugin description returned by Metadata: the plugin
// name, a short description, the deprecation flag and the embedded sample config.
var metadata = plugins.Metadata{
Name: PluginName,
Description: "noop importer",
Deprecated: false,
SampleConfig: sampleConfig,
}

// Metadata returns the package-level plugin metadata for the noop importer.
func (imp *noopImporter) Metadata() plugins.Metadata {
return metadata
}

// Init unmarshals the plugin configuration into imp.cfg and seeds the importer's
// round from the configured Round value. The context, init provider and logger
// arguments are ignored.
//
// It returns an error only when the configuration cannot be unmarshalled; the
// underlying error is wrapped so callers can inspect it with errors.Is/errors.As.
func (imp *noopImporter) Init(_ context.Context, _ data.InitProvider, cfg plugins.PluginConfig, _ *logrus.Logger) error {
	if err := cfg.UnmarshalConfig(&imp.cfg); err != nil {
		// %w (rather than %v) keeps the cause in the error chain; the message text is unchanged.
		return fmt.Errorf("init failure in unmarshalConfig: %w", err)
	}
	imp.round = imp.cfg.Round
	return nil
}

// Close does nothing and always returns nil.
func (imp *noopImporter) Close() error {
return nil
}

// GetGenesis returns a pointer to a newly allocated, zero-valued sdk.Genesis and a nil error.
func (imp *noopImporter) GetGenesis() (*sdk.Genesis, error) {
return &sdk.Genesis{}, nil
}

// GetBlock fabricates the block for round rnd. It first sleeps for
// sleepForGetBlock to slow the pipeline down, records rnd as the importer's
// current round, and then returns block data whose header carries only that
// round number. The returned error is always nil.
func (imp *noopImporter) GetBlock(rnd uint64) (data.BlockData, error) {
	time.Sleep(sleepForGetBlock)
	imp.round = rnd

	// Start from the zero value and fill in the one field this importer sets.
	var blk data.BlockData
	blk.BlockHeader.Round = sdk.Round(rnd)
	return blk, nil
}

// Round returns the importer's current round: the configured starting round
// after Init, or the round most recently passed to GetBlock.
func (imp *noopImporter) Round() uint64 {
return imp.round
}

// init registers the noop importer under PluginName with a constructor that
// returns a new, zero-valued noopImporter on every call.
func init() {
	newNoopImporter := func() importers.Importer {
		return &noopImporter{}
	}
	importers.Register(PluginName, importers.ImporterConstructorFunc(newNoopImporter))
}
7 changes: 7 additions & 0 deletions conduit/plugins/importers/noop/noop_importer_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package noop

// ImporterConfig holds the configuration specific to the noop importer.
type ImporterConfig struct {
// Round optionally specifies the round to start on; it is read from the
// "round" key of the plugin's YAML config.
Round uint64 `yaml:"round"`
}
3 changes: 3 additions & 0 deletions conduit/plugins/importers/noop/sample.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name: noop
# noop has a single optional setting, `round`: the round to start on
config:
2 changes: 1 addition & 1 deletion e2e_tests/src/e2e_conduit/subslurp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
logger = logging.getLogger(__name__)

# Matches conduit log output:
# "UPDATED Pipeline NextRound=1337. FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
# "FINISHED Pipeline round r=42 (13 txn) exported in 12.3456s"
# Raw bytes literal so that \d reaches the regex engine as written, instead of
# relying on Python's fallback for invalid escape sequences in normal literals.
FINISH_ROUND: re.Pattern = re.compile(rb"FINISHED Pipeline round r=(\d+)")


Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func runConduitCmdWithConfig(args *data.Args) error {

// Start server
if pCfg.API.Address != "" {
logger.Infof("starting API server on %s", pCfg.API.Address)
shutdown, err := api.StartServer(logger, pline, pCfg.API.Address)
if err != nil {
// Suppress log, it is about to be printed to stderr.
Expand All @@ -114,6 +115,8 @@ func runConduitCmdWithConfig(args *data.Args) error {
return fmt.Errorf("failed to start API server: %w", err)
}
defer shutdown(context.Background())
} else {
logger.Info("API server is disabled")
}

pline.Wait()
Expand Down
Loading