Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipelining followups #147

Merged
merged 33 commits into from
Aug 31, 2023
Merged
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
f2bc1a4
msgp in file plugins
Aug 18, 2023
18af031
lint
Aug 18, 2023
ce344d4
remove posErr
Aug 18, 2023
e41924b
Update conduit/plugins/exporters/filewriter/util.go
tzaffi Aug 18, 2023
b46628f
Update conduit/plugins/importers/filereader/test_resources/conduit_da…
tzaffi Aug 18, 2023
b7c82a1
gofmt
Aug 18, 2023
b1c2b0b
change file patterns in README's
Aug 18, 2023
5c06330
readmes
Aug 18, 2023
ae1ee48
Merge branch 'master' into file-plugins-msgp
Aug 21, 2023
d7fd948
per CR suggestion, comment out configuration not explicitly required …
Aug 22, 2023
1e71d1a
Merge remote-tracking branch 'origin/master' into file-plugins-msgp
Aug 22, 2023
497df82
per CR discussion: genesis is always `genesis.json`
Aug 22, 2023
fafc6d7
lint
Aug 22, 2023
e08d93a
per CR discussion: revert renaming/factoring of pipeline's makeConfig()
Aug 22, 2023
488f1e8
lint
Aug 22, 2023
e9af0bd
complete revert
Aug 22, 2023
873cc83
Update conduit/plugins/importers/filereader/fileReadWrite_test.go
tzaffi Aug 22, 2023
85714d1
trim the genesis - ridonculous
Aug 22, 2023
b18d85b
per CR: remove unneeded assignment
Aug 25, 2023
27b2168
Update conduit/plugins/exporters/filewriter/util.go
tzaffi Aug 25, 2023
fb7ea14
test defaults should actually test the defaults
Aug 25, 2023
14a64cd
typo
Aug 25, 2023
29ed381
privatize RetriesXYZ() + retriesNoInput()
Aug 29, 2023
78b8447
don't block inside of Start()
Aug 29, 2023
a4d7376
trim the special end of round log
Aug 29, 2023
f96dbc8
noop importer
Aug 29, 2023
d65afb4
test CLI for the health endpoint
Aug 29, 2023
f12f4e6
lint
Aug 29, 2023
4d0386a
gofmt
Aug 29, 2023
f38ed13
Update conduit/plugins/importers/noop/sample.yaml
tzaffi Aug 29, 2023
f68c929
Update pkg/cli/cli_test.go
tzaffi Aug 29, 2023
35c859c
Merge remote-tracking branch 'origin/master' into pipelining-followups
Aug 29, 2023
d635841
remove retriesNoInput() and its test
Aug 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
per CR discussion: revert renaming/factoring of pipeline's makeConfig()
  • Loading branch information
Zeph Grunschlag committed Aug 22, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit e08d93a555bf9621b4d7f9eab572b4fd1a4e3bcf
29 changes: 21 additions & 8 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
@@ -7,13 +7,15 @@ import (
"fmt"
"net/http"
"os"
"path"
"runtime/pprof"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v3"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

@@ -124,16 +126,27 @@ func (p *pipelineImpl) registerPluginMetricsCallbacks() {
}
}

// configWithLogger creates a plugin config from a name and config pair.
// makeConfig creates a plugin config from a name and config pair.
// It also creates a logger for the plugin and configures it using the pipeline's log settings.
func (p *pipelineImpl) configWithLogger(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
var dataDir string
if p.cfg.ConduitArgs != nil {
dataDir = p.cfg.ConduitArgs.ConduitDataDir
}
config, err := pluginType.GetConfig(cfg, dataDir)

configs, err := yaml.Marshal(cfg.Config)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("configWithLogger(): unable to create plugin config: %w", err)
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
}

var config plugins.PluginConfig
config.Config = string(configs)
if dataDir != "" {
config.DataDir = path.Join(dataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)
}
}

lgr := log.New()
@@ -186,7 +199,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) {
var pluginOverride uint64
var pluginOverrideName string // cache this in case of error.
for _, part := range parts {
_, config, err := p.configWithLogger(part.cfg, part.t)
_, config, err := p.makeConfig(part.cfg, part.t)
if err != nil {
return 0, err
}
@@ -317,7 +330,7 @@ func (p *pipelineImpl) Init() error {

// Initialize Importer
{
importerLogger, pluginConfig, err := p.configWithLogger(p.cfg.Importer, plugins.Importer)
importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}
@@ -350,7 +363,7 @@ func (p *pipelineImpl) Init() error {
// Initialize Processors
for idx, processor := range p.processors {
ncPair := p.cfg.Processors[idx]
logger, config, err := p.configWithLogger(ncPair, plugins.Processor)
logger, config, err := p.makeConfig(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)
}
@@ -363,7 +376,7 @@ func (p *pipelineImpl) Init() error {

// Initialize Exporter
{
logger, config, err := p.configWithLogger(p.cfg.Exporter, plugins.Exporter)
logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}
20 changes: 18 additions & 2 deletions conduit/plugins/importers/filereader/fileReadWrite_test.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ import (

logrusTest "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit"
"github.com/algorand/conduit/conduit/data"
@@ -121,6 +122,21 @@ func identicalFilesUncompressed(t *testing.T, path1, path2 string) {
}
}

func getConfig(t *testing.T, pt plugins.PluginType, cfg data.NameConfigPair, dataDir string) plugins.PluginConfig {
configs, err := yaml.Marshal(cfg.Config)
require.NoError(t, err)

var config plugins.PluginConfig
config.Config = string(configs)
if dataDir != "" {
config.DataDir = path.Join(dataDir, fmt.Sprintf("%s_%s", pt, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
require.NoError(t, err)
}

return config
}

// TestRoundTrip tests that blocks read by the filereader importer
// under the msgp.gz encoding are written to identical files by the filewriter exporter.
// This includes both a genesis block and a round-0 block with differend encodings.
@@ -159,7 +175,7 @@ func TestRoundTrip(t *testing.T) {
impCtor, err := importers.ImporterConstructorByName(plineConfig.Importer.Name)
require.NoError(t, err)
importer := impCtor.New()
impConfig, err := plugins.Importer.GetConfig(plineConfig.Importer, conduitDataDir)
impConfig := getConfig(t, plugins.Importer, plineConfig.Importer, conduitDataDir)
require.NoError(t, err)
require.Equal(t, path.Join(conduitDataDir, "importer_file_reader"), impConfig.DataDir)

@@ -189,7 +205,7 @@ func TestRoundTrip(t *testing.T) {
expCtor, err := exporters.ExporterConstructorByName(plineConfig.Exporter.Name)
require.NoError(t, err)
exporter := expCtor.New()
expConfig, err := plugins.Exporter.GetConfig(plineConfig.Exporter, conduitDataDir)
expConfig := getConfig(t, plugins.Exporter, plineConfig.Exporter, conduitDataDir)
require.NoError(t, err)
require.Equal(t, path.Join(conduitDataDir, "exporter_file_writer"), expConfig.DataDir)

29 changes: 0 additions & 29 deletions conduit/plugins/metadata.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
package plugins

import (
"fmt"
"os"
"path"

yaml "gopkg.in/yaml.v3"

"github.com/algorand/conduit/conduit/data"
)

// Metadata returns fields relevant to identification and description of plugins.
type Metadata struct {
Name string
@@ -32,22 +22,3 @@ const (
Importer PluginType = "importer"
)

// GetConfig creates an appropriate plugin config for the type.
func (pt PluginType) GetConfig(cfg data.NameConfigPair, dataDir string) (PluginConfig, error) {
configs, err := yaml.Marshal(cfg.Config)
if err != nil {
return PluginConfig{}, fmt.Errorf("GetConfig(): could not serialize config: %w", err)
}

var config PluginConfig
config.Config = string(configs)
if dataDir != "" {
config.DataDir = path.Join(dataDir, fmt.Sprintf("%s_%s", pt, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return PluginConfig{}, fmt.Errorf("GetConfig: unable to create plugin data directory: %w", err)
}
}

return config, nil
}