Skip to content

Commit

Permalink
Added possibility to override env on the fly when using node-manager …
Browse files Browse the repository at this point in the history
…HTTP interface (#65)

* Added possibility to override env on the fly when using node-manager HTTP interface

Use `curl -XPOST "http://localhost:10011/v1/resume?sync=true&extra-env=VALUE=one"` on resume.
  • Loading branch information
maoueh authored Oct 2, 2024
1 parent 0b386ef commit 3a03037
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 40 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased

* [Operator] Node Manager HTTP `/v1/resume` call now accepts `extra-env=<key>=<value>&extra-env=<keyN>=<valueN>` enabling to override environment variables for the next restart **only**. Use `curl -XPOST "http://localhost:10011/v1/resume?sync=true&extra-env=NODE_DEBUG=true"` (change `localhost:10011` accordingly to your setup).

> This is **not** persistent upon restart!
## v1.6.4

### Substreams fixes
Expand All @@ -19,7 +25,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
### Substreams fixes

* Fix "cannot resolve 'old cursor' from files in passthrough mode" error on some requests with an old cursor
* Fix handling of 'special case' substreams module with only "params" as its input: should not skip this execution (used in graph-node for head tracking)
* Fix handling of 'special case' substreams module with only "params" as its input: should not skip this execution (used in graph-node for head tracking)
-> empty files in module cache with hash `d3b1920483180cbcd2fd10abcabbee431146f4c8` should be deleted for consistency
* Fix bug where some invalid cursors may be sent (with 'LIB' being above the block being sent) and add safeguard/loggin if the bug appears again
* Fix panic in the whole tier2 process when stores go above the size limit while being read from "kvops" cached changes
Expand Down Expand Up @@ -66,7 +72,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
* Substreams: add `--common-tmp-dir` flag and activate local caching of pre-compiled WASM modules through wazero feature
* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling
* * Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.
* Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.

## v1.5.7

Expand Down
8 changes: 6 additions & 2 deletions consolereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ func (r *ConsoleReader) readInit(line string) error {
switch r.readerProtocolVersion {
// Implementation of RPC poller were set to use 1.0 so we keep support for it for now
case "1.0", "3.0":
r.logger.Info("console reader protocol version set", zap.String("version", r.readerProtocolVersion))

// Supported
default:
return fmt.Errorf("major version of Firehose exchange protocol is unsupported (expected: one of [1.0, 3.0], found %s), you are most probably running an incompatible version of the Firehose aware node client/node poller", r.readerProtocolVersion)
}
Expand All @@ -200,6 +199,11 @@ func (r *ConsoleReader) readInit(line string) error {

r.setProtoMessageType(protobufFullyQualifiedName)

r.logger.Info("console reader protocol version init",
zap.String("version", r.readerProtocolVersion),
zap.String("protobuf_fully_qualified_name", protobufFullyQualifiedName),
)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion devel/standard/standard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ start:
start
--tracer=firehose
--store-dir="{node-data-dir}"
--block-rate=1200
--block-rate=120
--genesis-height=0
--genesis-block-burst=1000
23 changes: 19 additions & 4 deletions node-manager/operator/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func (o *Operator) listBackupsHandler(w http.ResponseWriter, r *http.Request) {
o.triggerWebCommand("list", params, w, r)
}

func getRequestParams(r *http.Request, terms ...string) map[string]string {
params := make(map[string]string)
func getRequestParams(r *http.Request, terms ...string) map[string]any {
params := make(map[string]any)
for _, p := range terms {
val := r.FormValue(p)
if val != "" {
Expand All @@ -168,18 +168,33 @@ func (o *Operator) maintenanceHandler(w http.ResponseWriter, r *http.Request) {
}

func (o *Operator) resumeHandler(w http.ResponseWriter, r *http.Request) {
params := map[string]string{
params := map[string]any{
"debug-firehose-logs": r.FormValue("debug-firehose-logs"),
}

env := map[string]string{}
for _, rawValue := range r.Form["extra-env"] {
key, value, found := strings.Cut(rawValue, "=")
if !found {
http.Error(w, "invalid extra-env format, must be key=value", http.StatusBadRequest)
return
}

env[key] = value
}

if params["debug-firehose-logs"] == "" {
params["debug-firehose-logs"] = "false"
}

if len(env) > 0 {
params["extra-env"] = env
}

o.triggerWebCommand("resume", params, w, r)
}

func (o *Operator) triggerWebCommand(cmdName string, params map[string]string, w http.ResponseWriter, r *http.Request) {
func (o *Operator) triggerWebCommand(cmdName string, params map[string]any, w http.ResponseWriter, r *http.Request) {
c := &Command{cmd: cmdName, logger: o.zlogger}
c.params = params
sync := r.FormValue("sync")
Expand Down
35 changes: 23 additions & 12 deletions node-manager/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Options struct {

type Command struct {
cmd string
params map[string]string
params map[string]any
returnch chan error
closer sync.Once
logger *zap.Logger
Expand All @@ -71,6 +71,16 @@ func (c *Command) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
return nil
}

func GetCommandParamOr[T any](c *Command, key string, defaultValue T) T {
if value, found := c.params[key]; found {
if v, ok := value.(T); ok {
return v
}
}

return defaultValue
}

func New(zlogger *zap.Logger, chainSuperviser nodeManager.ChainSuperviser, chainReadiness nodeManager.Readiness, options *Options) (*Operator, error) {
zlogger.Info("creating operator", zap.Reflect("options", options))

Expand Down Expand Up @@ -209,7 +219,7 @@ func (o *Operator) runCommand(cmd *Command) error {
o.zlogger.Info("successfully put in maintenance")

case "restore":
restoreMod, err := selectRestoreModule(o.backupModules, cmd.params["name"])
restoreMod, err := selectRestoreModule(o.backupModules, GetCommandParamOr(cmd, "name", ""))
if err != nil {
cmd.Return(err)
return nil
Expand All @@ -222,12 +232,7 @@ func (o *Operator) runCommand(cmd *Command) error {
}
}

backupName := "latest"
if b, ok := cmd.params["backupName"]; ok {
backupName = b
}

if err := restoreMod.Restore(backupName); err != nil {
if err := restoreMod.Restore(GetCommandParamOr(cmd, "backupName", "latest")); err != nil {
return err
}

Expand All @@ -238,7 +243,7 @@ func (o *Operator) runCommand(cmd *Command) error {
return nil

case "backup":
backupMod, err := selectBackupModule(o.backupModules, cmd.params["name"])
backupMod, err := selectBackupModule(o.backupModules, GetCommandParamOr(cmd, "name", ""))
if err != nil {
cmd.Return(err)
return nil
Expand Down Expand Up @@ -379,6 +384,12 @@ func (o *Operator) runCommand(cmd *Command) error {
}
}

if value, found := cmd.params["extra-env"]; found {
if env, ok := value.(map[string]string); ok && len(env) > 0 {
options = append(options, nodeManager.ExtraEnvOption(env))
}
}

if err := o.Superviser.Start(options...); err != nil {
return fmt.Errorf("error starting chain superviser: %w", err)
}
Expand Down Expand Up @@ -419,7 +430,7 @@ func (o *Operator) LaunchBackupSchedules() {
}
}

cmdParams := map[string]string{"name": sched.BackuperName}
cmdParams := map[string]any{"name": sched.BackuperName}

if sched.TimeBetweenRuns > time.Second { //loose validation of not-zero (I've seen issues with .IsZero())
o.zlogger.Info("starting time-based schedule for backup",
Expand All @@ -438,7 +449,7 @@ func (o *Operator) LaunchBackupSchedules() {
}
}

func (o *Operator) RunEveryPeriod(period time.Duration, commandName string, params map[string]string) {
func (o *Operator) RunEveryPeriod(period time.Duration, commandName string, params map[string]any) {
for {
time.Sleep(100 * time.Microsecond)

Expand All @@ -456,7 +467,7 @@ func (o *Operator) RunEveryPeriod(period time.Duration, commandName string, para
}
}

func (o *Operator) RunEveryXBlock(freq uint32, commandName string, params map[string]string) {
func (o *Operator) RunEveryXBlock(freq uint32, commandName string, params map[string]any) {
var lastHeadReference uint64
for {
time.Sleep(1 * time.Second)
Expand Down
29 changes: 26 additions & 3 deletions node-manager/superviser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,33 @@ import (
logplugin "github.com/streamingfast/firehose-core/node-manager/log_plugin"
)

type StartOption string
type StartOptions struct {
EnableDebugDeepmind bool
ExtraEnv map[string]string
}

type StartOption interface {
Apply(opts *StartOptions)
}

var EnableDebugDeepmindOption = StartOption("enable-debug-firehose-logs")
var DisableDebugDeepmindOption = StartOption("disable-debug-firehose-logs")
type startOptionFunc func(opts *StartOptions)

func (f startOptionFunc) Apply(opts *StartOptions) {
f(opts)
}

var EnableDebugDeepmindOption = startOptionFunc(func(opts *StartOptions) {
opts.EnableDebugDeepmind = true
})
var DisableDebugDeepmindOption = startOptionFunc(func(opts *StartOptions) {
opts.EnableDebugDeepmind = false
})

type ExtraEnvOption map[string]string

func (f ExtraEnvOption) Apply(opts *StartOptions) {
opts.ExtraEnv = map[string]string(f)
}

type ShutterInterface interface {
Shutdown(error)
Expand Down
51 changes: 40 additions & 11 deletions node-manager/superviser/superviser.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package superviser

import (
"fmt"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -151,13 +152,9 @@ func (s *Superviser) LastSeenBlockNum() uint64 {
}

func (s *Superviser) Start(options ...nodeManager.StartOption) error {
var startOptions nodeManager.StartOptions
for _, opt := range options {
if opt == nodeManager.EnableDebugDeepmindOption {
s.setDeepMindDebug(true)
}
if opt == nodeManager.DisableDebugDeepmindOption {
s.setDeepMindDebug(false)
}
opt.Apply(&startOptions)
}

for _, plugin := range s.logPlugins {
Expand All @@ -179,19 +176,51 @@ func (s *Superviser) Start(options ...nodeManager.StartOption) error {
}
}

s.Logger.Info("creating new command instance and launch read loop", zap.String("binary", s.Binary), zap.Strings("arguments", s.Arguments))
var args []interface{}
for _, a := range s.Arguments {
args = append(args, a)
s.Logger.Info("creating new command instance and launch read loop",
zap.String("binary", s.Binary),
zap.Strings("arguments", s.Arguments),
zap.Any("env", ""))

env := s.Env
envToLog := []string{fmt.Sprintf("<inherited from process>={%d vars}", len(os.Environ()))}
if len(env) > 0 {
envToLog = env
}

if env == nil && len(startOptions.ExtraEnv) >= 1 {
// If there is extra env to add and the s.Env is nil, we need to inherit from the parent process
// otherwise, we would start with an empty env and have the extra env only.
env = os.Environ()
}

for k, v := range startOptions.ExtraEnv {
entry := fmt.Sprintf("%s=%s", k, v)

env = append(env, entry)
envToLog = append(envToLog, entry)
}

s.cmd = overseer.NewCmd(s.Binary, s.Arguments, overseer.Options{Streaming: true, Env: s.Env})
s.Logger.Info("creating new command instance and launch read loop",
zap.String("binary", s.Binary),
zap.Strings("arguments", s.Arguments),
zap.Any("env", explodeToMap(envToLog)))

s.cmd = overseer.NewCmd(s.Binary, s.Arguments, overseer.Options{Streaming: true, Env: env})

go s.start(s.cmd)

return nil
}

func explodeToMap(env []string) map[string]string {
out := make(map[string]string, len(env))
for _, entry := range env {
left, right, _ := strings.Cut(entry, "=")
out[left] = right
}
return out
}

func (s *Superviser) Stop() error {
s.cmdLock.Lock()
defer s.cmdLock.Unlock()
Expand Down
13 changes: 8 additions & 5 deletions reader_node_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ func noOpReaderNodeBootstrapperFactory(ctx context.Context, logger *zap.Logger,

func DefaultReaderNodeBootstrapDataURLFlagDescription() string {
return cli.Dedent(`
When specified, if the reader node is emtpy (e.g. that 'reader-node-data-dir' location doesn't exist
or has no file within it), the 'reader-node' is going to be boostrapped from it. The exact bootstrapping
behavior depends on the URL received.
When specified, if the reader node is empty (e.g. that 'reader-node-data-dir' location doesn't exist
or has no file within it), the 'reader-node' is going to instantiate a bootstraper based on the URL
provided and will execute node enabling you to restore from a backup or run a script before starting
an empty node, maybe to fetch the initial state from a remote source.
The exact bootstrapping behavior depends on the URL received.
If the bootstrap URL is of the form 'bash:///<path/to/script>?<parameters>', the bash script at
'<path/to/script>' will be executed. The script is going to receive in environment variables the resolved
Expand All @@ -50,10 +53,10 @@ func DefaultReaderNodeBootstrapDataURLFlagDescription() string {
If the bootstrap URL ends with 'tar.zst' or 'tar.zstd', the archive is read and extracted into the
'reader-node-data-dir' location. The archive is expected to contain the full content of the 'reader-node-data-dir'
and is expanded as is.
`) + "\n"
`)
}

// DefaultReaderNodeBootstrapper is a constrtuction you can when you want the default bootstrapper logic to be applied
// DefaultReaderNodeBootstrapper is a construction you can when you want the default bootstrapper logic to be applied
// but you need support new bootstrap data URL(s) format or override the default behavior for some type.
//
// The `overrideFactory` argument is a factory function that will be called first, if it returns a non-nil bootstrapper,
Expand Down

0 comments on commit 3a03037

Please sign in to comment.