Added possibility to override env on the fly when using node-manager HTTP interface #65

Merged · 3 commits · Oct 2, 2024
10 changes: 8 additions & 2 deletions CHANGELOG.md
@@ -8,6 +8,12 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 versions to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased

* [Operator] The Node Manager HTTP `/v1/resume` call now accepts `extra-env=<key>=<value>&extra-env=<keyN>=<valueN>` parameters, enabling you to override environment variables for the next restart **only**. Use `curl -XPOST "http://localhost:10011/v1/resume?sync=true&extra-env=NODE_DEBUG=true"` (adjust `localhost:10011` to your setup).

> This is **not** persistent upon restart!
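For operators scripting against this endpoint, a minimal Go sketch of how the call above can be assembled — `buildResumeURL` is a hypothetical helper name, and `localhost:10011` is the example address from the changelog entry. Note that `url.Values.Encode` percent-encodes the `=` inside each value as `%3D`, which the server decodes back when parsing the form:

```go
package main

import (
	"fmt"
	"net/url"
)

// buildResumeURL builds the node-manager resume call described above: each
// extra environment variable is passed as a repeated `extra-env=<key>=<value>`
// query parameter.
func buildResumeURL(base string, sync bool, extraEnv map[string]string) string {
	q := url.Values{}
	if sync {
		q.Set("sync", "true")
	}
	for k, v := range extraEnv {
		// Repeated parameter, one entry per variable to override.
		q.Add("extra-env", k+"="+v)
	}
	return base + "/v1/resume?" + q.Encode()
}

func main() {
	fmt.Println(buildResumeURL("http://localhost:10011", true, map[string]string{"NODE_DEBUG": "true"}))
}
```

The resulting URL can then be POSTed exactly as the `curl` example does; remember the override lasts for the next restart only.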

## v1.6.4

### Substreams fixes
@@ -19,7 +25,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
### Substreams fixes

* Fix "cannot resolve 'old cursor' from files in passthrough mode" error on some requests with an old cursor
* Fix handling of 'special case' substreams module with only "params" as its input: should not skip this execution (used in graph-node for head tracking)
-> empty files in module cache with hash `d3b1920483180cbcd2fd10abcabbee431146f4c8` should be deleted for consistency
* Fix bug where some invalid cursors may be sent (with 'LIB' being above the block being sent) and add safeguard/logging if the bug appears again
* Fix panic in the whole tier2 process when stores go above the size limit while being read from "kvops" cached changes
@@ -66,7 +72,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
* Substreams: add `--common-tmp-dir` flag and activate local caching of pre-compiled WASM modules through wazero feature
* Substreams: revert module hash calculation from `v1.5.5`, when using a non-zero firstStreamableBlock. Hashes will now be the same even if the chain's first streamable block affects the initialBlock of a module.
* Substreams: add `--substreams-block-execution-timeout` flag (default 3 minutes) to prevent requests stalling
* * Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.
* Metering update: more detailed metering with addition of new metrics (`live_uncompressed_read_bytes`, `live_uncompressed_read_forked_bytes`, `file_uncompressed_read_bytes`, `file_uncompressed_read_forked_bytes`, `file_compressed_read_forked_bytes`, `file_compressed_read_bytes`). *DEPRECATION WARNING*: `bytes_read` and `bytes_written` metrics will be removed in the future, please use the new metrics for metering instead.

## v1.5.7

8 changes: 6 additions & 2 deletions consolereader.go
@@ -187,8 +187,7 @@ func (r *ConsoleReader) readInit(line string) error {
switch r.readerProtocolVersion {
// Implementation of RPC poller were set to use 1.0 so we keep support for it for now
case "1.0", "3.0":
r.logger.Info("console reader protocol version set", zap.String("version", r.readerProtocolVersion))

// Supported
default:
return fmt.Errorf("major version of Firehose exchange protocol is unsupported (expected: one of [1.0, 3.0], found %s), you are most probably running an incompatible version of the Firehose aware node client/node poller", r.readerProtocolVersion)
}
@@ -200,6 +199,11 @@ func (r *ConsoleReader) readInit(line string) error {

r.setProtoMessageType(protobufFullyQualifiedName)

r.logger.Info("console reader protocol version init",
zap.String("version", r.readerProtocolVersion),
zap.String("protobuf_fully_qualified_name", protobufFullyQualifiedName),
)

return nil
}

2 changes: 1 addition & 1 deletion devel/standard/standard.yaml
@@ -20,6 +20,6 @@ start:
start
--tracer=firehose
--store-dir="{node-data-dir}"
--block-rate=1200
--block-rate=120
--genesis-height=0
--genesis-block-burst=1000
23 changes: 19 additions & 4 deletions node-manager/operator/http_server.go
@@ -148,8 +148,8 @@ func (o *Operator) listBackupsHandler(w http.ResponseWriter, r *http.Request) {
o.triggerWebCommand("list", params, w, r)
}

func getRequestParams(r *http.Request, terms ...string) map[string]string {
params := make(map[string]string)
func getRequestParams(r *http.Request, terms ...string) map[string]any {
params := make(map[string]any)
for _, p := range terms {
val := r.FormValue(p)
if val != "" {
@@ -168,18 +168,33 @@ func (o *Operator) maintenanceHandler(w http.ResponseWriter, r *http.Request) {
}

func (o *Operator) resumeHandler(w http.ResponseWriter, r *http.Request) {
params := map[string]string{
params := map[string]any{
"debug-firehose-logs": r.FormValue("debug-firehose-logs"),
}

env := map[string]string{}
for _, rawValue := range r.Form["extra-env"] {
key, value, found := strings.Cut(rawValue, "=")
if !found {
http.Error(w, "invalid extra-env format, must be key=value", http.StatusBadRequest)
return
}

env[key] = value
}

if params["debug-firehose-logs"] == "" {
params["debug-firehose-logs"] = "false"
}

if len(env) > 0 {
params["extra-env"] = env
}

o.triggerWebCommand("resume", params, w, r)
}

func (o *Operator) triggerWebCommand(cmdName string, params map[string]string, w http.ResponseWriter, r *http.Request) {
func (o *Operator) triggerWebCommand(cmdName string, params map[string]any, w http.ResponseWriter, r *http.Request) {
c := &Command{cmd: cmdName, logger: o.zlogger}
c.params = params
sync := r.FormValue("sync")
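Extracted as a standalone sketch, the parsing rule the resume handler above applies to repeated `extra-env` values: split each raw entry on the first `=` with `strings.Cut`, and reject entries without one (the handler answers 400 Bad Request in that case). `parseExtraEnv` is an illustrative name, not a function in the PR:

```go
package main

import (
	"fmt"
	"strings"
)

// parseExtraEnv splits each "key=value" entry on the FIRST '=' only, so values
// may themselves contain '=' (e.g. "RUST_LOG=info=debug"). Entries without any
// '=' are rejected, mirroring the handler's 400 response.
func parseExtraEnv(rawValues []string) (map[string]string, error) {
	env := map[string]string{}
	for _, rawValue := range rawValues {
		key, value, found := strings.Cut(rawValue, "=")
		if !found {
			return nil, fmt.Errorf("invalid extra-env format, must be key=value")
		}
		env[key] = value
	}
	return env, nil
}

func main() {
	env, err := parseExtraEnv([]string{"NODE_DEBUG=true", "RUST_LOG=info=debug"})
	fmt.Println(env, err)
}
```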
35 changes: 23 additions & 12 deletions node-manager/operator/operator.go
@@ -59,7 +59,7 @@ type Options struct {

type Command struct {
cmd string
params map[string]string
params map[string]any
returnch chan error
closer sync.Once
logger *zap.Logger
@@ -71,6 +71,16 @@ func (c *Command) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
return nil
}

func GetCommandParamOr[T any](c *Command, key string, defaultValue T) T {
if value, found := c.params[key]; found {
if v, ok := value.(T); ok {
return v
}
}

return defaultValue
}

func New(zlogger *zap.Logger, chainSuperviser nodeManager.ChainSuperviser, chainReadiness nodeManager.Readiness, options *Options) (*Operator, error) {
zlogger.Info("creating operator", zap.Reflect("options", options))

@@ -209,7 +219,7 @@ func (o *Operator) runCommand(cmd *Command) error {
o.zlogger.Info("successfully put in maintenance")

case "restore":
restoreMod, err := selectRestoreModule(o.backupModules, cmd.params["name"])
restoreMod, err := selectRestoreModule(o.backupModules, GetCommandParamOr(cmd, "name", ""))
if err != nil {
cmd.Return(err)
return nil
@@ -222,12 +232,7 @@ }
}
}

backupName := "latest"
if b, ok := cmd.params["backupName"]; ok {
backupName = b
}

if err := restoreMod.Restore(backupName); err != nil {
if err := restoreMod.Restore(GetCommandParamOr(cmd, "backupName", "latest")); err != nil {
return err
}

@@ -238,7 +243,7 @@
return nil

case "backup":
backupMod, err := selectBackupModule(o.backupModules, cmd.params["name"])
backupMod, err := selectBackupModule(o.backupModules, GetCommandParamOr(cmd, "name", ""))
if err != nil {
cmd.Return(err)
return nil
@@ -379,6 +384,12 @@ }
}
}

if value, found := cmd.params["extra-env"]; found {
if env, ok := value.(map[string]string); ok && len(env) > 0 {
options = append(options, nodeManager.ExtraEnvOption(env))
}
}

if err := o.Superviser.Start(options...); err != nil {
return fmt.Errorf("error starting chain superviser: %w", err)
}
@@ -419,7 +430,7 @@ func (o *Operator) LaunchBackupSchedules() {
}
}

cmdParams := map[string]string{"name": sched.BackuperName}
cmdParams := map[string]any{"name": sched.BackuperName}

if sched.TimeBetweenRuns > time.Second { //loose validation of not-zero (I've seen issues with .IsZero())
o.zlogger.Info("starting time-based schedule for backup",
@@ -438,7 +449,7 @@
}
}

func (o *Operator) RunEveryPeriod(period time.Duration, commandName string, params map[string]string) {
func (o *Operator) RunEveryPeriod(period time.Duration, commandName string, params map[string]any) {
for {
time.Sleep(100 * time.Microsecond)

@@ -456,7 +467,7 @@
}
}

func (o *Operator) RunEveryXBlock(freq uint32, commandName string, params map[string]string) {
func (o *Operator) RunEveryXBlock(freq uint32, commandName string, params map[string]any) {
var lastHeadReference uint64
for {
time.Sleep(1 * time.Second)
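The generic `GetCommandParamOr` helper above replaces direct `cmd.params[...]` lookups now that params are `map[string]any`: the stored value is returned only if it is present *and* of the requested type, otherwise the default is used. A self-contained sketch ("mybackup" is an illustrative name, not one defined by firehose-core):

```go
package main

import "fmt"

// Command mirrors the relevant field of the operator's Command type.
type Command struct {
	params map[string]any
}

// GetCommandParamOr returns the param under key when it exists and type-asserts
// to T; in every other case (absent key, wrong type) the default is returned.
func GetCommandParamOr[T any](c *Command, key string, defaultValue T) T {
	if value, found := c.params[key]; found {
		if v, ok := value.(T); ok {
			return v
		}
	}
	return defaultValue
}

func main() {
	cmd := &Command{params: map[string]any{
		"name":      "mybackup",
		"extra-env": map[string]string{"NODE_DEBUG": "true"},
	}}

	fmt.Println(GetCommandParamOr(cmd, "name", ""))             // present with matching type
	fmt.Println(GetCommandParamOr(cmd, "backupName", "latest")) // absent, default used
	fmt.Println(GetCommandParamOr(cmd, "name", 42))             // present but wrong type, default used
}
```

This is what lets the restore/backup cases keep reading plain strings while the resume path smuggles a `map[string]string` through the same params map.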
29 changes: 26 additions & 3 deletions node-manager/superviser.go
@@ -20,10 +20,33 @@ import (
logplugin "github.com/streamingfast/firehose-core/node-manager/log_plugin"
)

type StartOption string
type StartOptions struct {
EnableDebugDeepmind bool
ExtraEnv map[string]string
}

type StartOption interface {
Apply(opts *StartOptions)
}

var EnableDebugDeepmindOption = StartOption("enable-debug-firehose-logs")
var DisableDebugDeepmindOption = StartOption("disable-debug-firehose-logs")
type startOptionFunc func(opts *StartOptions)

func (f startOptionFunc) Apply(opts *StartOptions) {
f(opts)
}

var EnableDebugDeepmindOption = startOptionFunc(func(opts *StartOptions) {
opts.EnableDebugDeepmind = true
})
var DisableDebugDeepmindOption = startOptionFunc(func(opts *StartOptions) {
opts.EnableDebugDeepmind = false
})

type ExtraEnvOption map[string]string

func (f ExtraEnvOption) Apply(opts *StartOptions) {
opts.ExtraEnv = map[string]string(f)
}

type ShutterInterface interface {
Shutdown(error)
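A condensed, runnable sketch of the functional-option refactor above: start options are now values implementing `Apply(*StartOptions)` instead of opaque strings, which is what lets an option like `ExtraEnvOption` carry data (the environment map) rather than just a flag:

```go
package main

import "fmt"

// StartOptions is the accumulated configuration a superviser starts with.
type StartOptions struct {
	EnableDebugDeepmind bool
	ExtraEnv            map[string]string
}

// StartOption is anything that can mutate StartOptions.
type StartOption interface {
	Apply(opts *StartOptions)
}

// startOptionFunc adapts a plain function into a StartOption.
type startOptionFunc func(opts *StartOptions)

func (f startOptionFunc) Apply(opts *StartOptions) { f(opts) }

var EnableDebugDeepmindOption = startOptionFunc(func(opts *StartOptions) {
	opts.EnableDebugDeepmind = true
})

// ExtraEnvOption is itself the data it applies: a map of env overrides.
type ExtraEnvOption map[string]string

func (f ExtraEnvOption) Apply(opts *StartOptions) { opts.ExtraEnv = map[string]string(f) }

func main() {
	var opts StartOptions
	for _, opt := range []StartOption{
		EnableDebugDeepmindOption,
		ExtraEnvOption{"NODE_DEBUG": "true"},
	} {
		opt.Apply(&opts)
	}
	fmt.Printf("%+v\n", opts)
}
```

The caller-side loop in `Superviser.Start` then collapses to `opt.Apply(&startOptions)`, as the superviser.go diff shows.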
51 changes: 40 additions & 11 deletions node-manager/superviser/superviser.go
@@ -16,6 +16,7 @@ package superviser

import (
"fmt"
"os"
"strings"
"sync"
"time"
@@ -151,13 +152,9 @@ func (s *Superviser) LastSeenBlockNum() uint64 {
}

func (s *Superviser) Start(options ...nodeManager.StartOption) error {
var startOptions nodeManager.StartOptions
for _, opt := range options {
if opt == nodeManager.EnableDebugDeepmindOption {
s.setDeepMindDebug(true)
}
if opt == nodeManager.DisableDebugDeepmindOption {
s.setDeepMindDebug(false)
}
opt.Apply(&startOptions)
}

for _, plugin := range s.logPlugins {
@@ -179,19 +176,51 @@ }
}
}

s.Logger.Info("creating new command instance and launch read loop", zap.String("binary", s.Binary), zap.Strings("arguments", s.Arguments))
var args []interface{}
for _, a := range s.Arguments {
args = append(args, a)

env := s.Env
envToLog := []string{fmt.Sprintf("<inherited from process>={%d vars}", len(os.Environ()))}
if len(env) > 0 {
envToLog = env
}

if env == nil && len(startOptions.ExtraEnv) >= 1 {
// If there is extra env to add and the s.Env is nil, we need to inherit from the parent process
// otherwise, we would start with an empty env and have the extra env only.
env = os.Environ()
}

for k, v := range startOptions.ExtraEnv {
entry := fmt.Sprintf("%s=%s", k, v)

env = append(env, entry)
envToLog = append(envToLog, entry)
}

s.cmd = overseer.NewCmd(s.Binary, s.Arguments, overseer.Options{Streaming: true, Env: s.Env})
s.Logger.Info("creating new command instance and launch read loop",
zap.String("binary", s.Binary),
zap.Strings("arguments", s.Arguments),
zap.Any("env", explodeToMap(envToLog)))

s.cmd = overseer.NewCmd(s.Binary, s.Arguments, overseer.Options{Streaming: true, Env: env})

go s.start(s.cmd)

return nil
}

func explodeToMap(env []string) map[string]string {
out := make(map[string]string, len(env))
for _, entry := range env {
left, right, _ := strings.Cut(entry, "=")
out[left] = right
}
return out
}

func (s *Superviser) Stop() error {
s.cmdLock.Lock()
defer s.cmdLock.Unlock()
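The env resolution in `Start` above can be isolated as follows. `resolveEnv` is a hypothetical name, and a fixed slice stands in for `os.Environ()` to keep the sketch deterministic; the key behavior is that when no explicit base env is configured but extra variables were requested, the parent process env is inherited first, so the child does not start with *only* the extra entries:

```go
package main

import (
	"fmt"
	"strings"
)

// resolveEnv computes the child process environment: inherit processEnv when
// there is no explicit base env but extra overrides exist, then append the
// overrides as "KEY=VALUE" entries (later entries win in most exec semantics).
func resolveEnv(base []string, extra map[string]string, processEnv []string) []string {
	env := base
	if env == nil && len(extra) >= 1 {
		env = processEnv
	}
	for k, v := range extra {
		env = append(env, fmt.Sprintf("%s=%s", k, v))
	}
	return env
}

// explodeToMap converts "KEY=VALUE" entries to a map, as the diff does for
// structured logging of the effective environment.
func explodeToMap(env []string) map[string]string {
	out := make(map[string]string, len(env))
	for _, entry := range env {
		left, right, _ := strings.Cut(entry, "=")
		out[left] = right
	}
	return out
}

func main() {
	env := resolveEnv(nil, map[string]string{"NODE_DEBUG": "true"}, []string{"PATH=/usr/bin"})
	fmt.Println(explodeToMap(env))
}
```

When an explicit base env *is* set, it is used as-is and only the extra entries are appended — the process env is deliberately not merged in.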
13 changes: 8 additions & 5 deletions reader_node_bootstrap.go
@@ -31,9 +31,12 @@ func noOpReaderNodeBootstrapperFactory(ctx context.Context, logger *zap.Logger,

func DefaultReaderNodeBootstrapDataURLFlagDescription() string {
return cli.Dedent(`
When specified, if the reader node is emtpy (e.g. that 'reader-node-data-dir' location doesn't exist
or has no file within it), the 'reader-node' is going to be boostrapped from it. The exact bootstrapping
behavior depends on the URL received.
When specified, if the reader node is empty (e.g. the 'reader-node-data-dir' location doesn't exist
or has no file within it), the 'reader-node' is going to instantiate a bootstrapper based on the URL
provided and execute it before the node starts, enabling you to restore from a backup or run a script
before starting an empty node, for example to fetch the initial state from a remote source.

The exact bootstrapping behavior depends on the URL received.

If the bootstrap URL is of the form 'bash:///<path/to/script>?<parameters>', the bash script at
'<path/to/script>' will be executed. The script is going to receive in environment variables the resolved
@@ -50,10 +53,10 @@ func DefaultReaderNodeBootstrapDataURLFlagDescription() string {
If the bootstrap URL ends with 'tar.zst' or 'tar.zstd', the archive is read and extracted into the
'reader-node-data-dir' location. The archive is expected to contain the full content of the 'reader-node-data-dir'
and is expanded as is.
`) + "\n"
`)
}

// DefaultReaderNodeBootstrapper is a constrtuction you can when you want the default bootstrapper logic to be applied
// DefaultReaderNodeBootstrapper is a construction you can use when you want the default bootstrapper logic to be applied
// but you need to support new bootstrap data URL format(s) or override the default behavior for some type.
//
// The `overrideFactory` argument is a factory function that will be called first, if it returns a non-nil bootstrapper,
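A hypothetical sketch of the URL dispatch the flag description above implies — the real factory in `reader_node_bootstrap.go` may differ in names and details; only the `bash://` scheme and the `tar.zst`/`tar.zstd` suffixes come from the documented behavior:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// bootstrapKind classifies a bootstrap data URL the way the flag description
// outlines: bash script URLs run a script, zstd tarballs are extracted into
// 'reader-node-data-dir', anything else is rejected here for simplicity.
func bootstrapKind(dataURL string) (string, error) {
	u, err := url.Parse(dataURL)
	if err != nil {
		return "", err
	}
	switch {
	case u.Scheme == "bash":
		return "bash-script", nil
	case strings.HasSuffix(u.Path, ".tar.zst"), strings.HasSuffix(u.Path, ".tar.zstd"):
		return "tar-archive", nil
	default:
		return "", fmt.Errorf("unsupported bootstrap data URL %q", dataURL)
	}
}

func main() {
	kind, _ := bootstrapKind("bash:///opt/bootstrap.sh?network=mainnet")
	fmt.Println(kind)
}
```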