From 884347f81a5f2953fb75c78273814cfac75c63d3 Mon Sep 17 00:00:00 2001 From: ldebruijn Date: Mon, 2 Oct 2023 17:21:37 +0200 Subject: [PATCH 1/7] feat(reloading): Add remote reloading for persisted operations --- .gitignore | 3 +- cmd/main.go | 19 +-- cmd/main_test.go | 7 +- .../block_field_suggestions.go | 4 +- .../persisted_operations/dir_loader.go | 4 +- .../gcp_storage_loader.go | 43 ++++-- .../business/persisted_operations/loader.go | 32 ++--- .../persisted_operations/memory_loader.go | 2 +- .../persisted_operations.go | 129 +++++++++++++++--- .../persisted_operations_test.go | 2 +- internal/business/proxy/proxy.go | 5 +- 11 files changed, 187 insertions(+), 63 deletions(-) diff --git a/.gitignore b/.gitignore index 722dd90..4476cfc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ armor.yml operations.json -TODO.md \ No newline at end of file +TODO.md +main \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index 776b7da..1689dd9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -59,9 +59,16 @@ func run(ctx context.Context, log *slog.Logger, cfg *config.Config, shutdown cha return nil } + poLoader, err := persisted_operations.RemoteLoaderFromConfig(cfg.PersistedOperations) + if err != nil { + log.Error("Unable to determine loading strategy for persisted operations", "err", err) + } + + po, _ := persisted_operations.NewPersistedOperations(log, cfg.PersistedOperations, persisted_operations.NewLocalDirLoader(cfg.PersistedOperations), poLoader) + mux := http.NewServeMux() - mid := middleware(log, cfg) + mid := middleware(log, cfg, po) mux.Handle(cfg.Web.Path, mid(Handler(pxy))) api := http.Server{ @@ -91,6 +98,8 @@ func run(ctx context.Context, log *slog.Logger, cfg *config.Config, shutdown cha ctx, cancel := context.WithTimeout(context.Background(), cfg.Web.ShutdownTimeout) defer cancel() + po.Shutdown() + if err := api.Shutdown(ctx); err != nil { _ = api.Close() return fmt.Errorf("could not stop server gracefully: %w", err) @@ -100,14 +109,8 @@ func run(ctx context.Context, log *slog.Logger, cfg *config.Config, shutdown cha return nil } -func middleware(log *slog.Logger, cfg *config.Config) func(next http.Handler) http.Handler { - poLoader, err := persisted_operations.DetermineLoaderFromConfig(cfg.PersistedOperations) - if err != nil { - log.Error("Unable to determine loading strategy for persisted operations", "err", err) - } - +func middleware(log *slog.Logger, cfg *config.Config, po *persisted_operations.PersistedOperationsHandler) func(next http.Handler) http.Handler { rec := middleware2.Recover(log) - po, _ := persisted_operations.NewPersistedOperations(log, cfg.PersistedOperations, poLoader) fn := func(next http.Handler) http.Handler { return rec(po.Execute(next)) diff --git a/cmd/main_test.go b/cmd/main_test.go index 99f2a1b..4998c24 100644 --- a/cmd/main_test.go +++ b/cmd/main_test.go @@ -41,6 +41,7 @@ func TestHttpServerIntegration(t *testing.T) { }(), cfgOverrides: func(cfg *config.Config) *config.Config { cfg.PersistedOperations.Enabled = true + cfg.PersistedOperations.Store = "./" return cfg }, mockResponse: map[string]interface{}{ @@ -85,6 +86,7 @@ func TestHttpServerIntegration(t *testing.T) { }(), cfgOverrides: func(cfg *config.Config) *config.Config { cfg.PersistedOperations.Enabled = true + cfg.PersistedOperations.Store = "./" return cfg }, mockResponse: map[string]interface{}{ @@ -127,6 +129,8 @@ func TestHttpServerIntegration(t *testing.T) { }(), cfgOverrides: func(cfg *config.Config) *config.Config { cfg.PersistedOperations.Enabled = true + cfg.PersistedOperations.Store = "./" + cfg.PersistedOperations.FailUnknownOperations = false return cfg }, mockResponse: map[string]interface{}{ @@ -160,7 +164,8 @@ func TestHttpServerIntegration(t *testing.T) { }, } ex, _ := json.Marshal(expected) - actual, _ := io.ReadAll(response.Body) + actual, err := io.ReadAll(response.Body) + assert.NoError(t, err) // perform string comparisons as map[string]interface seems incomparable assert.Equal(t, string(ex), string(actual)) }, diff --git a/internal/business/block_field_suggestions/block_field_suggestions.go b/internal/business/block_field_suggestions/block_field_suggestions.go index 4f8371a..b23e0bd 100644 --- a/internal/business/block_field_suggestions/block_field_suggestions.go +++ b/internal/business/block_field_suggestions/block_field_suggestions.go @@ -51,13 +51,13 @@ func (b *BlockFieldSuggestionsHandler) processErrors(payload interface{}) interf func (b *BlockFieldSuggestionsHandler) processError(err map[string]interface{}) map[string]interface{} { if msg, ok4 := err["message"]; ok4 { if message, ok := msg.(string); ok { - err["message"] = b.ReplaceSuggestions(message) + err["message"] = b.replaceSuggestions(message) } } return err } -func (b *BlockFieldSuggestionsHandler) ReplaceSuggestions(message string) string { +func (b *BlockFieldSuggestionsHandler) replaceSuggestions(message string) string { if strings.HasPrefix(message, "Did you mean") { return b.cfg.Mask } diff --git a/internal/business/persisted_operations/dir_loader.go b/internal/business/persisted_operations/dir_loader.go index 5ee39ff..6625c48 100644 --- a/internal/business/persisted_operations/dir_loader.go +++ b/internal/business/persisted_operations/dir_loader.go @@ -16,9 +16,9 @@ type DirLoader struct { path string } -func newDirLoader(cfg Config) *DirLoader { +func NewLocalDirLoader(cfg Config) *DirLoader { return &DirLoader{ - path: cfg.Store.Dir, + path: cfg.Store, } } diff --git a/internal/business/persisted_operations/gcp_storage_loader.go b/internal/business/persisted_operations/gcp_storage_loader.go index cb7f51f..2b70a54 100644 --- a/internal/business/persisted_operations/gcp_storage_loader.go +++ b/internal/business/persisted_operations/gcp_storage_loader.go @@ -3,11 +3,12 @@ package persisted_operations import ( "cloud.google.com/go/storage" "context" - "encoding/json" "errors" "fmt" "google.golang.org/api/iterator" "io" + "os" + "time" ) // GcpStorageLoader loads persisted operations from a GCP Storage bucket. @@ -18,7 +19,7 @@ type GcpStorageLoader struct { bucket string } -func NewGcpStorageLoader(ctx context.Context, bucket string) (*GcpStorageLoader, error) { +func NewGcpStorageLoader(ctx context.Context, bucket string, cfg Config) (*GcpStorageLoader, error) { client, err := storage.NewClient(ctx) if err != nil { return nil, err @@ -29,38 +30,52 @@ func NewGcpStorageLoader(ctx context.Context, bucket string) (*GcpStorageLoader, bucket: bucket, }, nil } - -func (g *GcpStorageLoader) Load(ctx context.Context) (map[string]string, error) { - var store map[string]string - +func (g *GcpStorageLoader) Load(ctx context.Context) error { it := g.client.Bucket(g.bucket).Objects(ctx, &storage.Query{ MatchGlob: "*.json", }) + + var errs []error for { attrs, err := it.Next() if errors.Is(err, iterator.Done) { break } if err != nil { - continue + break } - rc, err := g.client.Bucket(g.bucket).Object(attrs.Name).NewReader(ctx) + ctx, cancel := context.WithTimeout(ctx, time.Second*50) + + // include path + f, err := os.Create(attrs.Name) if err != nil { + cancel() + errs = append(errs, fmt.Errorf("os.Create: %w", err)) continue } - data, err := io.ReadAll(rc) + rc, err := g.client.Bucket(g.bucket).Object(attrs.Name).NewReader(ctx) if err != nil { - return nil, fmt.Errorf("ioutil.ReadAll: %w", err) + cancel() + errs = append(errs, fmt.Errorf("Object(%q).NewReader: %w", attrs.Name, err)) + continue } - _ = rc.Close() - err = json.Unmarshal(data, &store) - if err != nil { + if _, err := io.Copy(f, rc); err != nil { + cancel() + errs = append(errs, fmt.Errorf("io.Copy: %w", err)) continue } + + if err = f.Close(); err != nil { + cancel() + errs = append(errs, fmt.Errorf("f.Close: %w", err)) + } + + cancel() + _ = rc.Close() } - return store, nil + return errors.Join(errs...) } diff --git a/internal/business/persisted_operations/loader.go b/internal/business/persisted_operations/loader.go index 82f9a5d..a675710 100644 --- a/internal/business/persisted_operations/loader.go +++ b/internal/business/persisted_operations/loader.go @@ -5,11 +5,19 @@ import ( "errors" ) -var ErrNoLoaderSpecified = errors.New("no loaders specified") +type LocalLoader interface { + Load(ctx context.Context) (map[string]string, error) +} + +type RemoteLoader interface { + Load(ctx context.Context) error +} + +var ErrNoRemoteLoaderSpecified = errors.New("no remote loader specified") -// DetermineLoaderFromConfig looks at the configuration applied and figures out which loader to initialize and return -// If no loader is configured an error is returned -func DetermineLoaderFromConfig(cfg Config) (PersistedOperationsLoader, error) { +// RemoteLoaderFromConfig looks at the configuration applied and figures out which remoteLoader to initialize and return +// If no remoteLoader is configured an error is returned +func RemoteLoaderFromConfig(cfg Config) (RemoteLoader, error) { loader, err := determineLoader(cfg) if err != nil { return loader, err @@ -18,21 +26,13 @@ func DetermineLoaderFromConfig(cfg Config) (PersistedOperationsLoader, error) { } // load loads persisted operations from various sources -func determineLoader(cfg Config) (PersistedOperationsLoader, error) { - if cfg.Store.Dir != "" { - loader := newDirLoader(cfg) - if loader == nil { - return nil, errors.New("unable to instantiate DirLoader") - } - - return loader, nil - } - if cfg.Store.GcpBucket != "" { - loader, err := NewGcpStorageLoader(context.Background(), cfg.Store.GcpBucket) +func determineLoader(cfg Config) (RemoteLoader, error) { + if cfg.Remote.GcpBucket != "" { + loader, err := NewGcpStorageLoader(context.Background(), cfg.Remote.GcpBucket, cfg) if err != nil { return nil, errors.New("unable to instantiate GcpBucketLoader") } return loader, nil } - return newMemoryLoader(map[string]string{}), ErrNoLoaderSpecified + return nil, ErrNoRemoteLoaderSpecified } diff --git a/internal/business/persisted_operations/memory_loader.go b/internal/business/persisted_operations/memory_loader.go index 034defe..9749dfe 100644 --- a/internal/business/persisted_operations/memory_loader.go +++ b/internal/business/persisted_operations/memory_loader.go @@ -4,7 +4,7 @@ import ( "context" ) -// MemoryLoader is a loader for testing purposes +// MemoryLoader is a remoteLoader for testing purposes // It allows the user to specify operations in memory type MemoryLoader struct { store map[string]string diff --git a/internal/business/persisted_operations/persisted_operations.go b/internal/business/persisted_operations/persisted_operations.go index 22add9d..93c85c6 100644 --- a/internal/business/persisted_operations/persisted_operations.go +++ b/internal/business/persisted_operations/persisted_operations.go @@ -8,6 +8,8 @@ import ( "io" "log/slog" "net/http" + "sync" + "time" ) type RequestPayload struct { @@ -29,20 +31,24 @@ type ErrorPayload struct { } `json:"errors"` } -type PersistedOperationsLoader interface { - Load(ctx context.Context) (map[string]string, error) -} - type Config struct { Enabled bool `conf:"default:false" yaml:"enabled"` - Store struct { + // The location on which persisted operations are stored + Store string `conf:"./store" yaml:"store"` + // Configuration for auto-reloading persisted operations + Reload struct { + Enabled bool `conf:"default:false" yaml:"enabled"` + Interval time.Duration `conf:"default:5m" yaml:"interval"` + Timeout time.Duration `conf:"default:10s" yaml:"timeout"` + } + // Remote strategies for fetching persisted operations + Remote struct { GcpBucket string `conf:"gs://something/foo" yaml:"gcp_bucket"` - Dir string `conf:"" yaml:"dir"` } FailUnknownOperations bool `conf:"default:false" yaml:"fail_unknown_operations"` } -var ErrNoLoaderSupplied = errors.New("no loader supplied") +var ErrNoLoaderSupplied = errors.New("no remoteLoader supplied") var ErrNoHashFound = errors.New("no hash found") type PersistedOperationsHandler struct { @@ -51,15 +57,38 @@ type PersistedOperationsHandler struct { // this has the opportunity to grow indefinitely, might wat to replace with a fixed-cap cache // or something like an LRU with a TTL cache map[string]string - // not sure if keeping a reference to this is required, might be nice for refreshing during runtime - loader PersistedOperationsLoader + // Strategy for loading persisted operations from a remote location + remoteLoader RemoteLoader + refreshTicker *time.Ticker + + dirLoader LocalLoader + done chan bool + lock sync.RWMutex } -func NewPersistedOperations(log *slog.Logger, cfg Config, loader PersistedOperationsLoader) (*PersistedOperationsHandler, error) { +func NewPersistedOperations(log *slog.Logger, cfg Config, loader LocalLoader, remoteLoader RemoteLoader) (*PersistedOperationsHandler, error) { if loader == nil { return nil, ErrNoLoaderSupplied } + if remoteLoader != nil { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, cfg.Reload.Timeout) + defer cancel() + err := remoteLoader.Load(ctx) + if err != nil { + return nil, err + } + } + + if cfg.Reload.Interval < 10*time.Second { + cfg.Reload.Interval = 10 * time.Second + log.Warn("Reload interval cannot be less than every 10 seconds, manually overwrote to 10 seconds") + } + + refreshTicker := time.NewTicker(cfg.Reload.Interval) + done := make(chan bool) + cache, err := loader.Load(context.Background()) if err != nil { return nil, err @@ -67,12 +96,21 @@ func NewPersistedOperations(log *slog.Logger, cfg Config, loader PersistedOperat log.Info("Loaded persisted operations", "amount", len(cache)) - return &PersistedOperationsHandler{ - log: log, - cfg: cfg, - cache: cache, - loader: loader, - }, nil + poh := &PersistedOperationsHandler{ + log: log, + cfg: cfg, + cache: cache, + remoteLoader: remoteLoader, + dirLoader: loader, + refreshTicker: refreshTicker, + done: done, + lock: sync.RWMutex{}, + } + + // start reloader + poh.reload() + + return poh, nil } // Execute runs of the persisted operations handler @@ -113,7 +151,10 @@ func (p *PersistedOperationsHandler) Execute(next http.Handler) http.Handler { return } + p.lock.RLock() query, ok := p.cache[hash] + p.lock.RUnlock() + if !ok { // hash not found, fail p.log.Warn("Unknown hash, persisted operation not found ", "hash", hash) @@ -141,6 +182,62 @@ func (p *PersistedOperationsHandler) Execute(next http.Handler) http.Handler { return http.HandlerFunc(fn) } +func (p *PersistedOperationsHandler) reloadFromLocalDir() error { + dirLoader := NewLocalDirLoader(p.cfg) + if dirLoader == nil { + return errors.New("dir loader is nil") + } + + cache, err := dirLoader.Load(context.Background()) + if err != nil { + return err + } + p.lock.Lock() + p.cache = cache + p.lock.Unlock() + + p.log.Info("Loaded persisted operations", "amount", len(cache)) + + return nil +} + +func (p *PersistedOperationsHandler) reload() { + if !p.cfg.Reload.Enabled { + return + } + + go func() { + for { + select { + case <-p.done: + return + case _ = <-p.refreshTicker.C: + p.reloadFromRemote() + } + } + }() +} + +func (p *PersistedOperationsHandler) reloadFromRemote() { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, p.cfg.Reload.Timeout) + err := p.remoteLoader.Load(ctx) + cancel() + if err != nil { + return + } + + err = p.reloadFromLocalDir() + if err != nil { + p.log.Error("Error loading from local dir", "err", err) + return + } +} + +func (p *PersistedOperationsHandler) Shutdown() { + p.done <- true +} + func hashFromPayload(payload RequestPayload) (string, error) { if payload.Extensions.PersistedQuery == nil { return "", ErrNoHashFound diff --git a/internal/business/persisted_operations/persisted_operations_test.go b/internal/business/persisted_operations/persisted_operations_test.go index 40a8d23..7beb393 100644 --- a/internal/business/persisted_operations/persisted_operations_test.go +++ b/internal/business/persisted_operations/persisted_operations_test.go @@ -141,7 +141,7 @@ func TestNewPersistedOperations(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { log := slog.Default() - po, _ := NewPersistedOperations(log, tt.args.cfg, newMemoryLoader(tt.args.cache)) + po, _ := NewPersistedOperations(log, tt.args.cfg, newMemoryLoader(tt.args.cache), nil) po.cache = tt.args.cache bts, err := json.Marshal(&tt.args.payload) diff --git a/internal/business/proxy/proxy.go b/internal/business/proxy/proxy.go index 0ae30cc..02ebc62 100644 --- a/internal/business/proxy/proxy.go +++ b/internal/business/proxy/proxy.go @@ -9,6 +9,7 @@ import ( "net/http" "net/http/httputil" "net/url" + "strconv" "time" ) @@ -39,6 +40,7 @@ func NewProxy(cfg Config, blockFieldSuggestions *block_field_suggestions.BlockFi } decoder := json.NewDecoder(res.Body) + defer res.Body.Close() var response map[string]interface{} err := decoder.Decode(&response) @@ -55,8 +57,9 @@ func NewProxy(cfg Config, blockFieldSuggestions *block_field_suggestions.BlockFi } buffer := bytes.NewBuffer(bts) - res.Body = io.NopCloser(buffer) res.ContentLength = int64(buffer.Len()) + res.Header.Set("Content-Length", strconv.Itoa(buffer.Len())) + res.Body = io.NopCloser(buffer) return nil } From 338b1c000be011c31b3afa075385125334819776 Mon Sep 17 00:00:00 2001 From: ldebruijn Date: Mon, 2 Oct 2023 21:40:09 +0200 Subject: [PATCH 2/7] feat(reloading): Fix failing test --- cmd/main.go | 6 +++++- cmd/main_test.go | 11 +++++++---- .../persisted_operations/persisted_operations.go | 13 +++++++++---- 3 files changed, 21 insertions(+), 9 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 1689dd9..d38ed57 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -64,7 +64,11 @@ func run(ctx context.Context, log *slog.Logger, cfg *config.Config, shutdown cha log.Error("Unable to determine loading strategy for persisted operations", "err", err) } - po, _ := persisted_operations.NewPersistedOperations(log, cfg.PersistedOperations, persisted_operations.NewLocalDirLoader(cfg.PersistedOperations), poLoader) + po, err := persisted_operations.NewPersistedOperations(log, cfg.PersistedOperations, persisted_operations.NewLocalDirLoader(cfg.PersistedOperations), poLoader) + if err != nil { + log.Error("Error creating Persisted Operations", "err", err) + return nil + } mux := http.NewServeMux() diff --git a/cmd/main_test.go b/cmd/main_test.go index 4998c24..0d242a2 100644 --- a/cmd/main_test.go +++ b/cmd/main_test.go @@ -63,7 +63,8 @@ func TestHttpServerIntegration(t *testing.T) { }, } ex, _ := json.Marshal(expected) - actual, _ := io.ReadAll(response.Body) + actual, err := io.ReadAll(response.Body) + assert.NoError(t, err) // perform string comparisons as map[string]interface seems incomparable assert.Equal(t, string(ex), string(actual)) }, @@ -107,7 +108,9 @@ func TestHttpServerIntegration(t *testing.T) { }, } ex, _ := json.Marshal(expected) - actual, _ := io.ReadAll(response.Body) + actual, err := io.ReadAll(response.Body) + assert.NoError(t, err) + ac := string(actual) ac = strings.TrimSuffix(ac, "\n") @@ -178,7 +181,7 @@ func TestHttpServerIntegration(t *testing.T) { _, _ = w.Write(bts) })) - defer mockServer.Close() + //defer mockServer.Close() shutdown := make(chan os.Signal, 1) @@ -195,7 +198,7 @@ func TestHttpServerIntegration(t *testing.T) { url := "http://localhost:8080" + tt.args.request.URL.String() res, err := http.Post(url, tt.args.request.Header.Get("Content-Type"), tt.args.request.Body) if err != nil { - assert.NoError(t, err) + assert.NoError(t, err, tt.name) } tt.want(t, res) diff --git a/internal/business/persisted_operations/persisted_operations.go b/internal/business/persisted_operations/persisted_operations.go index 93c85c6..b35cb24 100644 --- a/internal/business/persisted_operations/persisted_operations.go +++ b/internal/business/persisted_operations/persisted_operations.go @@ -50,6 +50,7 @@ type Config struct { var ErrNoLoaderSupplied = errors.New("no remoteLoader supplied") var ErrNoHashFound = errors.New("no hash found") +var ErrReloadIntervalTooShort = errors.New("reload interval cannot be less than 10 seconds") type PersistedOperationsHandler struct { log *slog.Logger @@ -81,12 +82,16 @@ func NewPersistedOperations(log *slog.Logger, cfg Config, loader LocalLoader, re } } - if cfg.Reload.Interval < 10*time.Second { - cfg.Reload.Interval = 10 * time.Second - log.Warn("Reload interval cannot be less than every 10 seconds, manually overwrote to 10 seconds") + if cfg.Reload.Enabled && cfg.Reload.Interval < 10*time.Second { + return nil, ErrReloadIntervalTooShort } - refreshTicker := time.NewTicker(cfg.Reload.Interval) + refreshTicker := func() *time.Ticker { + if !cfg.Reload.Enabled { + return nil + } + return time.NewTicker(cfg.Reload.Interval) + }() done := make(chan bool) cache, err := loader.Load(context.Background()) From 1ba7e0ca197334c6e7e2c9e7533956d75d29f35c Mon Sep 17 00:00:00 2001 From: ldebruijn Date: Mon, 2 Oct 2023 22:06:57 +0200 Subject: [PATCH 3/7] feat(reloading): Update docs, cleanup --- cmd/main.go | 4 +-- docs/persisted_operations.md | 22 ++++++++++---- .../persisted_operations.go | 29 ++++++++++--------- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index d38ed57..230bc8d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -72,7 +72,7 @@ func run(ctx context.Context, log *slog.Logger, cfg *config.Config, shutdown cha mux := http.NewServeMux() - mid := middleware(log, cfg, po) + mid := middleware(log, po) mux.Handle(cfg.Web.Path, mid(Handler(pxy))) api := http.Server{ @@ -113,7 +113,7 @@ func run(ctx context.Context, log *slog.Logger, cfg *config.Config, shutdown cha return nil } -func middleware(log *slog.Logger, cfg *config.Config, po *persisted_operations.PersistedOperationsHandler) func(next http.Handler) http.Handler { +func middleware(log *slog.Logger, po *persisted_operations.PersistedOperationsHandler) func(next http.Handler) http.Handler { rec := middleware2.Recover(log) fn := func(next http.Handler) http.Handler { diff --git a/docs/persisted_operations.md b/docs/persisted_operations.md index a6af734..0d97cad 100644 --- a/docs/persisted_operations.md +++ b/docs/persisted_operations.md @@ -20,12 +20,14 @@ persisted_operations: enabled: true # Fail unknown operations, disable this feature to allow unknown operations to reach your GraphQL API fail_unknown_operations: true - # Determines the strategy for loading the supported operations. - # Only one store will be used - store: - # Load persisted operations from a directory on the local filesystem. - # Will look at all files in the directory and attempt to load any file with a `.json` extension - dir: "./my-dir" + # Store is the location on local disk where go-graphql-armor can find the persisted operations, it loads any `*.json` files on disk + store: "./store" + reload: + enabled: true + interval: 5m + # The timeout for the remote operation + timeout: 10s + remote: # Load persisted operations from a GCP Cloud Storage bucket. # Will look at all the objects in the bucket and try to load any object with a `.json` extension gcp_bucket: "gs://somebucket" @@ -33,6 +35,14 @@ persisted_operations: # ... ``` +## How it works + +`go-graphql-armor` looks at the store location on local disk to find any `*.json` files it can parse for persisted operations. + +It can be configured to look at this directory and reload based on the files on local disk. + +Additionally, it can be configured to fetch operations from a remote location onto the local disk. + ## Parsing Structure To be able to parse Persisted Operations go-graphql-armor expects a `key-value` structure for `hash-operation` in the files. diff --git a/internal/business/persisted_operations/persisted_operations.go b/internal/business/persisted_operations/persisted_operations.go index b35cb24..5ea8c7b 100644 --- a/internal/business/persisted_operations/persisted_operations.go +++ b/internal/business/persisted_operations/persisted_operations.go @@ -112,6 +112,12 @@ func NewPersistedOperations(log *slog.Logger, cfg Config, loader LocalLoader, re lock: sync.RWMutex{}, } + poh.reloadFromRemote() + err = poh.reloadFromLocalDir() + if err != nil { + return nil, err + } + // start reloader poh.reload() @@ -188,12 +194,7 @@ func (p *PersistedOperationsHandler) Execute(next http.Handler) http.Handler { } func (p *PersistedOperationsHandler) reloadFromLocalDir() error { - dirLoader := NewLocalDirLoader(p.cfg) - if dirLoader == nil { - return errors.New("dir loader is nil") - } - - cache, err := dirLoader.Load(context.Background()) + cache, err := p.dirLoader.Load(context.Background()) if err != nil { return err } @@ -218,23 +219,25 @@ func (p *PersistedOperationsHandler) reload() { return case _ = <-p.refreshTicker.C: p.reloadFromRemote() + err := p.reloadFromLocalDir() + if err != nil { + p.log.Warn("Error loading from local dir", "err", err) + } } } }() } func (p *PersistedOperationsHandler) reloadFromRemote() { - ctx := context.Background() - ctx, cancel := context.WithTimeout(ctx, p.cfg.Reload.Timeout) - err := p.remoteLoader.Load(ctx) - cancel() - if err != nil { + if p.remoteLoader == nil { return } + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, p.cfg.Reload.Timeout) + defer cancel() - err = p.reloadFromLocalDir() + err := p.remoteLoader.Load(ctx) if err != nil { - p.log.Error("Error loading from local dir", "err", err) return } } From 0ba21e2792812f19e2296389fab0f1eb31a2396f Mon Sep 17 00:00:00 2001 From: ldebruijn Date: Mon, 2 Oct 2023 22:31:11 +0200 Subject: [PATCH 4/7] feat(reloading): Fix app shutdown and tests --- cmd/main.go | 12 +++++------- cmd/main_test.go | 5 ++--- .../persisted_operations/gcp_storage_loader.go | 8 +++++--- internal/business/persisted_operations/loader.go | 2 +- .../persisted_operations/persisted_operations.go | 3 ++- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 230bc8d..c9780ab 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,8 +22,6 @@ import ( var build = "develop" func main() { - ctx := context.Background() - log := slog.Default() // cfg @@ -40,13 +38,13 @@ func main() { shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM) - if err := run(ctx, log, cfg, shutdown); err != nil { + if err := run(log, cfg, shutdown); err != nil { log.Error("startup", "msg", err) os.Exit(1) } } -func run(ctx context.Context, log *slog.Logger, cfg *config.Config, shutdown chan os.Signal) error { +func run(log *slog.Logger, cfg *config.Config, shutdown chan os.Signal) error { log.Info("startup", "GOMAXPROCS", runtime.GOMAXPROCS(0)) log.Info("Starting proxy", "target", cfg.Target.Host) @@ -59,12 +57,12 @@ func run(ctx context.Context, log *slog.Logger, cfg *config.Config, shutdown cha return nil } - poLoader, err := persisted_operations.RemoteLoaderFromConfig(cfg.PersistedOperations) + remoteLoader, err := persisted_operations.RemoteLoaderFromConfig(cfg.PersistedOperations) if err != nil { - log.Error("Unable to determine loading strategy for persisted operations", "err", err) + log.Warn("Error initializing remote loader", "err", err) } - po, err := persisted_operations.NewPersistedOperations(log, cfg.PersistedOperations, persisted_operations.NewLocalDirLoader(cfg.PersistedOperations), poLoader) + po, err := persisted_operations.NewPersistedOperations(log, cfg.PersistedOperations, persisted_operations.NewLocalDirLoader(cfg.PersistedOperations), remoteLoader) if err != nil { log.Error("Error creating Persisted Operations", "err", err) return nil diff --git a/cmd/main_test.go b/cmd/main_test.go index 0d242a2..39fbc4c 100644 --- a/cmd/main_test.go +++ b/cmd/main_test.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "context" "encoding/json" "github.com/ldebruijn/go-graphql-armor/internal/app/config" "github.com/stretchr/testify/assert" @@ -181,7 +180,7 @@ func TestHttpServerIntegration(t *testing.T) { _, _ = w.Write(bts) })) - //defer mockServer.Close() + defer mockServer.Close() shutdown := make(chan os.Signal, 1) @@ -192,7 +191,7 @@ func TestHttpServerIntegration(t *testing.T) { cfg.Target.Host = mockServer.URL go func() { - _ = run(context.Background(), slog.Default(), cfg, shutdown) + _ = run(slog.Default(), cfg, shutdown) }() url := "http://localhost:8080" + tt.args.request.URL.String() diff --git a/internal/business/persisted_operations/gcp_storage_loader.go b/internal/business/persisted_operations/gcp_storage_loader.go index 2b70a54..5179652 100644 --- a/internal/business/persisted_operations/gcp_storage_loader.go +++ b/internal/business/persisted_operations/gcp_storage_loader.go @@ -8,6 +8,7 @@ import ( "google.golang.org/api/iterator" "io" "os" + "path/filepath" "time" ) @@ -17,9 +18,10 @@ import ( type GcpStorageLoader struct { client *storage.Client bucket string + store string } -func NewGcpStorageLoader(ctx context.Context, bucket string, cfg Config) (*GcpStorageLoader, error) { +func NewGcpStorageLoader(ctx context.Context, bucket string, store string) (*GcpStorageLoader, error) { client, err := storage.NewClient(ctx) if err != nil { return nil, err @@ -28,6 +30,7 @@ func NewGcpStorageLoader(ctx context.Context, bucket string, cfg Config) (*GcpSt return &GcpStorageLoader{ client: client, bucket: bucket, + store: store, }, nil } func (g *GcpStorageLoader) Load(ctx context.Context) error { @@ -47,8 +50,7 @@ func (g *GcpStorageLoader) Load(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, time.Second*50) - // include path - f, err := os.Create(attrs.Name) + f, err := os.Create(filepath.Join(g.store, attrs.Name)) if err != nil { cancel() errs = append(errs, fmt.Errorf("os.Create: %w", err)) diff --git a/internal/business/persisted_operations/loader.go b/internal/business/persisted_operations/loader.go index a675710..b39f14a 100644 --- a/internal/business/persisted_operations/loader.go +++ b/internal/business/persisted_operations/loader.go @@ -28,7 +28,7 @@ func RemoteLoaderFromConfig(cfg Config) (RemoteLoader, error) { // load loads persisted operations from various sources func determineLoader(cfg Config) (RemoteLoader, error) { if cfg.Remote.GcpBucket != "" { - loader, err := NewGcpStorageLoader(context.Background(), cfg.Remote.GcpBucket, cfg) + loader, err := NewGcpStorageLoader(context.Background(), cfg.Remote.GcpBucket, cfg.Store) if err != nil { return nil, errors.New("unable to instantiate GcpBucketLoader") } diff --git a/internal/business/persisted_operations/persisted_operations.go b/internal/business/persisted_operations/persisted_operations.go index 5ea8c7b..196abb3 100644 --- a/internal/business/persisted_operations/persisted_operations.go +++ b/internal/business/persisted_operations/persisted_operations.go @@ -92,7 +92,8 @@ func NewPersistedOperations(log *slog.Logger, cfg Config, loader LocalLoader, re } return time.NewTicker(cfg.Reload.Interval) }() - done := make(chan bool) + // buffered in case we dont have reloading enabled + done := make(chan bool, 1) cache, err := loader.Load(context.Background()) if err != nil { From 001f228098988c387b3f705131ac970f9c5cf361 Mon Sep 17 00:00:00 2001 From: ldebruijn Date: Mon, 2 Oct 2023 22:44:17 +0200 Subject: [PATCH 5/7] feat(reloading): Fix test --- .../persisted_operations/persisted_operations.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/internal/business/persisted_operations/persisted_operations.go b/internal/business/persisted_operations/persisted_operations.go index 196abb3..b378f17 100644 --- a/internal/business/persisted_operations/persisted_operations.go +++ b/internal/business/persisted_operations/persisted_operations.go @@ -95,17 +95,10 @@ func NewPersistedOperations(log *slog.Logger, cfg Config, loader LocalLoader, re // buffered in case we dont have reloading enabled done := make(chan bool, 1) - cache, err := loader.Load(context.Background()) - if err != nil { - return nil, err - } - - log.Info("Loaded persisted operations", "amount", len(cache)) - poh := &PersistedOperationsHandler{ log: log, cfg: cfg, - cache: cache, + cache: map[string]string{}, remoteLoader: remoteLoader, dirLoader: loader, refreshTicker: refreshTicker, @@ -114,7 +107,7 @@ func NewPersistedOperations(log *slog.Logger, cfg Config, loader LocalLoader, re } poh.reloadFromRemote() - err = poh.reloadFromLocalDir() + err := poh.reloadFromLocalDir() if err != nil { return nil, err } From e9512b7df7c005a7465916d87b50e5151b25d2b3 Mon Sep 17 00:00:00 2001 From: ldebruijn Date: Mon, 2 Oct 2023 22:46:55 +0200 Subject: [PATCH 6/7] feat(reloading): Undo change --- internal/business/persisted_operations/memory_loader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/business/persisted_operations/memory_loader.go b/internal/business/persisted_operations/memory_loader.go index 9749dfe..034defe 100644 --- a/internal/business/persisted_operations/memory_loader.go +++ b/internal/business/persisted_operations/memory_loader.go @@ -4,7 +4,7 @@ import ( "context" ) -// MemoryLoader is a remoteLoader for testing purposes +// MemoryLoader is a loader for testing purposes // It allows the user to specify operations in memory type MemoryLoader struct { store map[string]string From 406c12ad6fcc63ff13047b47a4dea77ff8fe065e Mon Sep 17 00:00:00 2001 From: ldebruijn Date: Tue, 3 Oct 2023 08:52:13 +0200 Subject: [PATCH 7/7] feat(reloading): Update docs --- cmd/main.go | 5 +++-- docs/persisted_operations.md | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index c9780ab..667bb67 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "fmt" "github.com/ardanlabs/conf/v3" "github.com/ldebruijn/go-graphql-armor/internal/app/config" @@ -58,13 +59,13 @@ func run(log *slog.Logger, cfg *config.Config, shutdown chan os.Signal) error { } remoteLoader, err := persisted_operations.RemoteLoaderFromConfig(cfg.PersistedOperations) - if err != nil { + if err != nil && !errors.Is(err, persisted_operations.ErrNoRemoteLoaderSpecified) { log.Warn("Error initializing remote loader", "err", err) } po, err := persisted_operations.NewPersistedOperations(log, cfg.PersistedOperations, persisted_operations.NewLocalDirLoader(cfg.PersistedOperations), remoteLoader) if err != nil { - log.Error("Error creating Persisted Operations", "err", err) + log.Error("Error initializing Persisted Operations", "err", err) return nil } diff --git a/docs/persisted_operations.md b/docs/persisted_operations.md index 0d97cad..c268f1f 100644 --- a/docs/persisted_operations.md +++ b/docs/persisted_operations.md @@ -24,6 +24,7 @@ persisted_operations: store: "./store" reload: enabled: true + # The interval in which the local store dir is read and refreshes the internal state interval: 5m # The timeout for the remote operation timeout: 10s @@ -37,7 +38,7 @@ persisted_operations: ## How it works -`go-graphql-armor` looks at the store location on local disk to find any `*.json` files it can parse for persisted operations. +`go-graphql-armor` looks at the `store` location on local disk to find any `*.json` files it can parse for persisted operations. It can be configured to look at this directory and reload based on the files on local disk.