diff --git a/CHANGELOG.md b/CHANGELOG.md index ca0ec70..5798aa7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. +## Unreleased + +* [Operator] Node Manager HTTP `/v1/resume` call now accepts `extra-env=KEY=VALUE&extra-env=KEY2=VALUE2` (the `extra-env` parameter can be repeated), allowing you to override environment variables for the next restart **only**. Use `curl -XPOST "http://localhost:10011/v1/resume?sync=true&extra-env=NODE_DEBUG=true"` (change `localhost:10011` according to your setup). + + > This is **not** persistent upon restart! + ## v1.6.4 ### Substreams fixes @@ -19,7 +25,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s ### Substreams fixes * Fix "cannot resolve 'old cursor' from files in passthrough mode" error on some requests with an old cursor -* Fix handling of 'special case' substreams module with only "params" as its input: should not skip this execution (used in graph-node for head tracking) +* Fix handling of 'special case' substreams module with only "params" as its input: should not skip this execution (used in graph-node for head tracking) -> empty files in module cache with hash `d3b1920483180cbcd2fd10abcabbee431146f4c8` should be deleted for consistency * Fix bug where some invalid cursors may be sent (with 'LIB' being above the block being sent) and add safeguard/loggin if the bug appears again * Fix panic in the whole tier2 process when stores go above the size limit while being read from "kvops" cached changes @@ -66,7 +72,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s * Substreams: add `--common-tmp-dir` flag and activate local caching of pre-compiled WASM modules through wazero feature * Substreams: revert 
module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module. * Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling -* * Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead. +* Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead. 
## v1.5.7 diff --git a/consolereader.go b/consolereader.go index f84f3ce..e1908ed 100644 --- a/consolereader.go +++ b/consolereader.go @@ -187,8 +187,7 @@ func (r *ConsoleReader) readInit(line string) error { switch r.readerProtocolVersion { // Implementation of RPC poller were set to use 1.0 so we keep support for it for now case "1.0", "3.0": - r.logger.Info("console reader protocol version set", zap.String("version", r.readerProtocolVersion)) - + // Supported default: return fmt.Errorf("major version of Firehose exchange protocol is unsupported (expected: one of [1.0, 3.0], found %s), you are most probably running an incompatible version of the Firehose aware node client/node poller", r.readerProtocolVersion) } @@ -200,6 +199,11 @@ func (r *ConsoleReader) readInit(line string) error { r.setProtoMessageType(protobufFullyQualifiedName) + r.logger.Info("console reader protocol version init", + zap.String("version", r.readerProtocolVersion), + zap.String("protobuf_fully_qualified_name", protobufFullyQualifiedName), + ) + return nil } diff --git a/devel/standard/standard.yaml b/devel/standard/standard.yaml index 28b0613..e83984d 100644 --- a/devel/standard/standard.yaml +++ b/devel/standard/standard.yaml @@ -20,6 +20,6 @@ start: start --tracer=firehose --store-dir="{node-data-dir}" - --block-rate=1200 + --block-rate=120 --genesis-height=0 --genesis-block-burst=1000 diff --git a/node-manager/operator/http_server.go b/node-manager/operator/http_server.go index f06f9f9..376126f 100644 --- a/node-manager/operator/http_server.go +++ b/node-manager/operator/http_server.go @@ -148,8 +148,8 @@ func (o *Operator) listBackupsHandler(w http.ResponseWriter, r *http.Request) { o.triggerWebCommand("list", params, w, r) } -func getRequestParams(r *http.Request, terms ...string) map[string]string { - params := make(map[string]string) +func getRequestParams(r *http.Request, terms ...string) map[string]any { + params := make(map[string]any) for _, p := range terms { val := 
r.FormValue(p) if val != "" { @@ -168,18 +168,33 @@ func (o *Operator) maintenanceHandler(w http.ResponseWriter, r *http.Request) { } func (o *Operator) resumeHandler(w http.ResponseWriter, r *http.Request) { - params := map[string]string{ + params := map[string]any{ "debug-firehose-logs": r.FormValue("debug-firehose-logs"), } + env := map[string]string{} + for _, rawValue := range r.Form["extra-env"] { + key, value, found := strings.Cut(rawValue, "=") + if !found { + http.Error(w, "invalid extra-env format, must be key=value", http.StatusBadRequest) + return + } + + env[key] = value + } + if params["debug-firehose-logs"] == "" { params["debug-firehose-logs"] = "false" } + if len(env) > 0 { + params["extra-env"] = env + } + o.triggerWebCommand("resume", params, w, r) } -func (o *Operator) triggerWebCommand(cmdName string, params map[string]string, w http.ResponseWriter, r *http.Request) { +func (o *Operator) triggerWebCommand(cmdName string, params map[string]any, w http.ResponseWriter, r *http.Request) { c := &Command{cmd: cmdName, logger: o.zlogger} c.params = params sync := r.FormValue("sync") diff --git a/node-manager/operator/operator.go b/node-manager/operator/operator.go index 1667a99..d20ffe1 100644 --- a/node-manager/operator/operator.go +++ b/node-manager/operator/operator.go @@ -59,7 +59,7 @@ type Options struct { type Command struct { cmd string - params map[string]string + params map[string]any returnch chan error closer sync.Once logger *zap.Logger @@ -71,6 +71,16 @@ func (c *Command) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } +func GetCommandParamOr[T any](c *Command, key string, defaultValue T) T { + if value, found := c.params[key]; found { + if v, ok := value.(T); ok { + return v + } + } + + return defaultValue +} + func New(zlogger *zap.Logger, chainSuperviser nodeManager.ChainSuperviser, chainReadiness nodeManager.Readiness, options *Options) (*Operator, error) { zlogger.Info("creating operator", zap.Reflect("options", 
options)) @@ -209,7 +219,7 @@ func (o *Operator) runCommand(cmd *Command) error { o.zlogger.Info("successfully put in maintenance") case "restore": - restoreMod, err := selectRestoreModule(o.backupModules, cmd.params["name"]) + restoreMod, err := selectRestoreModule(o.backupModules, GetCommandParamOr(cmd, "name", "")) if err != nil { cmd.Return(err) return nil @@ -222,12 +232,7 @@ func (o *Operator) runCommand(cmd *Command) error { } } - backupName := "latest" - if b, ok := cmd.params["backupName"]; ok { - backupName = b - } - - if err := restoreMod.Restore(backupName); err != nil { + if err := restoreMod.Restore(GetCommandParamOr(cmd, "backupName", "latest")); err != nil { return err } @@ -238,7 +243,7 @@ func (o *Operator) runCommand(cmd *Command) error { return nil case "backup": - backupMod, err := selectBackupModule(o.backupModules, cmd.params["name"]) + backupMod, err := selectBackupModule(o.backupModules, GetCommandParamOr(cmd, "name", "")) if err != nil { cmd.Return(err) return nil @@ -379,6 +384,12 @@ func (o *Operator) runCommand(cmd *Command) error { } } + if value, found := cmd.params["extra-env"]; found { + if env, ok := value.(map[string]string); ok && len(env) > 0 { + options = append(options, nodeManager.ExtraEnvOption(env)) + } + } + if err := o.Superviser.Start(options...); err != nil { return fmt.Errorf("error starting chain superviser: %w", err) } @@ -419,7 +430,7 @@ func (o *Operator) LaunchBackupSchedules() { } } - cmdParams := map[string]string{"name": sched.BackuperName} + cmdParams := map[string]any{"name": sched.BackuperName} if sched.TimeBetweenRuns > time.Second { //loose validation of not-zero (I've seen issues with .IsZero()) o.zlogger.Info("starting time-based schedule for backup", @@ -438,7 +449,7 @@ func (o *Operator) LaunchBackupSchedules() { } } -func (o *Operator) RunEveryPeriod(period time.Duration, commandName string, params map[string]string) { +func (o *Operator) RunEveryPeriod(period time.Duration, commandName string, params 
map[string]any) { for { time.Sleep(100 * time.Microsecond) @@ -456,7 +467,7 @@ func (o *Operator) RunEveryPeriod(period time.Duration, commandName string, para } } -func (o *Operator) RunEveryXBlock(freq uint32, commandName string, params map[string]string) { +func (o *Operator) RunEveryXBlock(freq uint32, commandName string, params map[string]any) { var lastHeadReference uint64 for { time.Sleep(1 * time.Second) diff --git a/node-manager/superviser.go b/node-manager/superviser.go index 3b2aaeb..4791e92 100644 --- a/node-manager/superviser.go +++ b/node-manager/superviser.go @@ -20,10 +20,33 @@ import ( logplugin "github.com/streamingfast/firehose-core/node-manager/log_plugin" ) -type StartOption string +type StartOptions struct { + EnableDebugDeepmind bool + ExtraEnv map[string]string +} + +type StartOption interface { + Apply(opts *StartOptions) +} -var EnableDebugDeepmindOption = StartOption("enable-debug-firehose-logs") -var DisableDebugDeepmindOption = StartOption("disable-debug-firehose-logs") +type startOptionFunc func(opts *StartOptions) + +func (f startOptionFunc) Apply(opts *StartOptions) { + f(opts) +} + +var EnableDebugDeepmindOption = startOptionFunc(func(opts *StartOptions) { + opts.EnableDebugDeepmind = true +}) +var DisableDebugDeepmindOption = startOptionFunc(func(opts *StartOptions) { + opts.EnableDebugDeepmind = false +}) + +type ExtraEnvOption map[string]string + +func (f ExtraEnvOption) Apply(opts *StartOptions) { + opts.ExtraEnv = map[string]string(f) +} type ShutterInterface interface { Shutdown(error) diff --git a/node-manager/superviser/superviser.go b/node-manager/superviser/superviser.go index ad433ed..acf596a 100644 --- a/node-manager/superviser/superviser.go +++ b/node-manager/superviser/superviser.go @@ -16,6 +16,7 @@ package superviser import ( "fmt" + "os" "strings" "sync" "time" @@ -151,13 +152,9 @@ func (s *Superviser) LastSeenBlockNum() uint64 { } func (s *Superviser) Start(options ...nodeManager.StartOption) error { + var 
startOptions nodeManager.StartOptions for _, opt := range options { - if opt == nodeManager.EnableDebugDeepmindOption { - s.setDeepMindDebug(true) - } - if opt == nodeManager.DisableDebugDeepmindOption { - s.setDeepMindDebug(false) - } + opt.Apply(&startOptions) } for _, plugin := range s.logPlugins { @@ -179,19 +176,51 @@ func (s *Superviser) Start(options ...nodeManager.StartOption) error { } } - s.Logger.Info("creating new command instance and launch read loop", zap.String("binary", s.Binary), zap.Strings("arguments", s.Arguments)) - var args []interface{} - for _, a := range s.Arguments { - args = append(args, a) + s.Logger.Info("creating new command instance and launch read loop", + zap.String("binary", s.Binary), + zap.Strings("arguments", s.Arguments), + zap.Any("env", "")) + + env := s.Env + envToLog := []string{fmt.Sprintf("={%d vars}", len(os.Environ()))} + if len(env) > 0 { + envToLog = env + } + + if env == nil && len(startOptions.ExtraEnv) >= 1 { + // If there is extra env to add and the s.Env is nil, we need to inherit from the parent process + // otherwise, we would start with an empty env and have the extra env only. 
+ env = os.Environ() + } + + for k, v := range startOptions.ExtraEnv { + entry := fmt.Sprintf("%s=%s", k, v) + + env = append(env, entry) + envToLog = append(envToLog, entry) } - s.cmd = overseer.NewCmd(s.Binary, s.Arguments, overseer.Options{Streaming: true, Env: s.Env}) + s.Logger.Info("creating new command instance and launch read loop", + zap.String("binary", s.Binary), + zap.Strings("arguments", s.Arguments), + zap.Any("env", explodeToMap(envToLog))) + + s.cmd = overseer.NewCmd(s.Binary, s.Arguments, overseer.Options{Streaming: true, Env: env}) go s.start(s.cmd) return nil } +func explodeToMap(env []string) map[string]string { + out := make(map[string]string, len(env)) + for _, entry := range env { + left, right, _ := strings.Cut(entry, "=") + out[left] = right + } + return out +} + func (s *Superviser) Stop() error { s.cmdLock.Lock() defer s.cmdLock.Unlock() diff --git a/reader_node_bootstrap.go b/reader_node_bootstrap.go index f7136ae..740a68d 100644 --- a/reader_node_bootstrap.go +++ b/reader_node_bootstrap.go @@ -31,9 +31,12 @@ func noOpReaderNodeBootstrapperFactory(ctx context.Context, logger *zap.Logger, func DefaultReaderNodeBootstrapDataURLFlagDescription() string { return cli.Dedent(` - When specified, if the reader node is emtpy (e.g. that 'reader-node-data-dir' location doesn't exist - or has no file within it), the 'reader-node' is going to be boostrapped from it. The exact bootstrapping - behavior depends on the URL received. + When specified, if the reader node is empty (e.g. that 'reader-node-data-dir' location doesn't exist + or has no file within it), the 'reader-node' is going to instantiate a bootstraper based on the URL + provided and will execute node enabling you to restore from a backup or run a script before starting + an empty node, maybe to fetch the initial state from a remote source. + + The exact bootstrapping behavior depends on the URL received. 
If the bootstrap URL is of the form 'bash:///?', the bash script at '' will be executed. The script is going to receive in environment variables the resolved @@ -50,10 +53,10 @@ func DefaultReaderNodeBootstrapDataURLFlagDescription() string { If the bootstrap URL ends with 'tar.zst' or 'tar.zstd', the archive is read and extracted into the 'reader-node-data-dir' location. The archive is expected to contain the full content of the 'reader-node-data-dir' and is expanded as is. - `) + "\n" + `) } -// DefaultReaderNodeBootstrapper is a constrtuction you can when you want the default bootstrapper logic to be applied +// DefaultReaderNodeBootstrapper is a construction you can use when you want the default bootstrapper logic to be applied // but you need support new bootstrap data URL(s) format or override the default behavior for some type. // // The `overrideFactory` argument is a factory function that will be called first, if it returns a non-nil bootstrapper,