diff --git a/cli/cli.go b/cli/cli.go index 02802eae..64a53801 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -23,6 +23,7 @@ func Execute(ctx context.Context) { cmdConfig(), cmdResourceCommand(), cmdModuleCommand(), + cmdWorker(), ) cmdx.SetHelp(rootCmd) diff --git a/cli/config.go b/cli/config.go index 5958f947..3e53b306 100644 --- a/cli/config.go +++ b/cli/config.go @@ -30,11 +30,18 @@ type Config struct { } type syncerConf struct { - SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"` - RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"` - ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"` - SyncBackoffInterval time.Duration `mapstructure:"sync_backoff_interval" default:"5s"` - MaxRetries int `mapstructure:"max_retries" default:"5"` + SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"` + RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"` + ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"` + SyncBackoffInterval time.Duration `mapstructure:"sync_backoff_interval" default:"5s"` + MaxRetries int `mapstructure:"max_retries" default:"5"` + WorkerModules []workerModule `mapstructure:"worker_modules"` +} + +type workerModule struct { + Name string `mapstructure:"name"` + Count int `mapstructure:"count" default:"1"` + Scope map[string][]string `mapstructure:"labels"` } type serveConfig struct { diff --git a/cli/serve.go b/cli/serve.go index 17ea57ed..c6ecda06 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -61,11 +61,7 @@ func cmdServe() *cobra.Command { } if spawnWorker { - go func() { - if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil { - zap.L().Error("syncer exited with error", zap.Error(err)) - } - }() + go spawnWorkers(cmd.Context(), resourceService, cfg.Syncer.WorkerModules, cfg.Syncer.SyncInterval) } return entropyserver.Serve(cmd.Context(), diff --git a/cli/worker.go b/cli/worker.go new file mode 100644 index 00000000..fd6c3e77 --- /dev/null +++ b/cli/worker.go @@ -0,0 +1,67 @@ +package cli + +import ( + "context" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/MakeNowJust/heredoc" + "github.com/goto/entropy/core" + "github.com/goto/entropy/core/module" + "github.com/spf13/cobra" + "go.uber.org/zap" +) + +func cmdWorker() *cobra.Command { + cmd := &cobra.Command{ + Use: "worker", + Short: "Start workers", + Example: heredoc.Doc(` + $ entropy worker + `), + Annotations: map[string]string{ + "group:other": "server", + }, + } + + cmd.RunE = handleErr(func(cmd *cobra.Command, args []string) error { + cfg, err := loadConfig(cmd) + if err != nil { + return err + } + + store := setupStorage(cfg.PGConnStr, cfg.Syncer, cfg.Service) + moduleService := module.NewService(setupRegistry(), store) + resourceService := core.New(store, moduleService, time.Now, cfg.Syncer.SyncBackoffInterval, cfg.Syncer.MaxRetries) + + var wg *sync.WaitGroup + go func() { + wg = spawnWorkers(cmd.Context(), resourceService, cfg.Syncer.WorkerModules, cfg.Syncer.SyncInterval) + }() + + quitChannel := make(chan os.Signal, 1) + signal.Notify(quitChannel, syscall.SIGINT, syscall.SIGTERM) + <-quitChannel + + wg.Wait() + zap.L().Info("all syncer workers exited") + + return nil + }) + + return cmd +} + +func spawnWorkers(ctx context.Context, resourceService *core.Service, workerModules []workerModule, syncInterval time.Duration) (wg *sync.WaitGroup) { + if len(workerModules) == 0 { + wg = resourceService.RunSyncer(ctx, 1, syncInterval, map[string][]string{}) + } else { + for _, module := range workerModules { + wg = resourceService.RunSyncer(ctx, module.Count, syncInterval, module.Scope) + } + } + return +} diff --git a/core/mocks/resource_store.go b/core/mocks/resource_store.go index 6e3a0c42..df3aebe5 100644 --- a/core/mocks/resource_store.go +++ b/core/mocks/resource_store.go @@ -324,17 +324,17 @@ func (_c *ResourceStore_Revisions_Call) RunAndReturn(run func(context.Context, r return _c } -// SyncOne provides a mock function with given fields: ctx, syncFn -func (_m *ResourceStore) SyncOne(ctx context.Context, syncFn resource.SyncFn) error { - ret := _m.Called(ctx, syncFn) +// SyncOne provides a mock function with given fields: ctx, scope, syncFn +func (_m *ResourceStore) SyncOne(ctx context.Context, scope map[string][]string, syncFn resource.SyncFn) error { + ret := _m.Called(ctx, scope, syncFn) if len(ret) == 0 { panic("no return value specified for SyncOne") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, resource.SyncFn) error); ok { - r0 = rf(ctx, syncFn) + if rf, ok := ret.Get(0).(func(context.Context, map[string][]string, resource.SyncFn) error); ok { + r0 = rf(ctx, scope, syncFn) } else { r0 = ret.Error(0) } @@ -349,14 +349,15 @@ type ResourceStore_SyncOne_Call struct { // SyncOne is a helper method to define mock.On call // - ctx context.Context +// - scope map[string][]string // - syncFn resource.SyncFn -func (_e *ResourceStore_Expecter) SyncOne(ctx interface{}, syncFn interface{}) *ResourceStore_SyncOne_Call { - return &ResourceStore_SyncOne_Call{Call: _e.mock.On("SyncOne", ctx, syncFn)} +func (_e *ResourceStore_Expecter) SyncOne(ctx interface{}, scope interface{}, syncFn interface{}) *ResourceStore_SyncOne_Call { + return &ResourceStore_SyncOne_Call{Call: _e.mock.On("SyncOne", ctx, scope, syncFn)} } -func (_c *ResourceStore_SyncOne_Call) Run(run func(ctx context.Context, syncFn resource.SyncFn)) *ResourceStore_SyncOne_Call { +func (_c *ResourceStore_SyncOne_Call) Run(run func(ctx context.Context, scope map[string][]string, syncFn resource.SyncFn)) *ResourceStore_SyncOne_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(resource.SyncFn)) + run(args[0].(context.Context), args[1].(map[string][]string), args[2].(resource.SyncFn)) }) return _c } @@ -366,7 +367,7 @@ func (_c *ResourceStore_SyncOne_Call) Return(_a0 error) *ResourceStore_SyncOne_C return _c } -func (_c *ResourceStore_SyncOne_Call) RunAndReturn(run func(context.Context, resource.SyncFn) error) *ResourceStore_SyncOne_Call { +func (_c *ResourceStore_SyncOne_Call) RunAndReturn(run func(context.Context, map[string][]string, resource.SyncFn) error) *ResourceStore_SyncOne_Call { _c.Call.Return(run) return _c } diff --git a/core/resource/resource.go b/core/resource/resource.go index 65166deb..32879d48 100644 --- a/core/resource/resource.go +++ b/core/resource/resource.go @@ -27,7 +27,7 @@ type Store interface { Revisions(ctx context.Context, selector RevisionsSelector) ([]Revision, error) - SyncOne(ctx context.Context, syncFn SyncFn) error + SyncOne(ctx context.Context, scope map[string][]string, syncFn SyncFn) error } type SyncFn func(ctx context.Context, res Resource) (*Resource, error) diff --git a/core/sync.go b/core/sync.go index d4d17795..d1d27d91 100644 --- a/core/sync.go +++ b/core/sync.go @@ -2,6 +2,7 @@ package core import ( "context" + "sync" "time" "go.uber.org/zap" @@ -12,7 +13,7 @@ import ( // RunSyncer runs the syncer thread that keeps performing resource-sync at // regular intervals. -func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error { +func (svc *Service) runSyncer(ctx context.Context, _ int, interval time.Duration, scope map[string][]string) error { tick := time.NewTimer(interval) defer tick.Stop() @@ -24,7 +25,7 @@ func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error case <-tick.C: tick.Reset(interval) - err := svc.store.SyncOne(ctx, svc.handleSync) + err := svc.store.SyncOne(ctx, scope, svc.handleSync) if err != nil { zap.L().Warn("SyncOne() failed", zap.Error(err)) } @@ -32,6 +33,22 @@ func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error } } +func (svc *Service) RunSyncer(ctx context.Context, workerCount int, interval time.Duration, scope map[string][]string) *sync.WaitGroup { + wg := &sync.WaitGroup{} + for i := 0; i < workerCount; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + if err := svc.runSyncer(ctx, id, interval, scope); err != nil { + zap.L().Error("worker-%d failed", zap.Error(err)) + } + }(i) + } + + return wg +} + func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*resource.Resource, error) { logEntry := zap.L().With( zap.String("resource_urn", res.URN), diff --git a/internal/store/postgres/resource_store.go b/internal/store/postgres/resource_store.go index 9d33563a..c68c76c2 100644 --- a/internal/store/postgres/resource_store.go +++ b/internal/store/postgres/resource_store.go @@ -264,8 +264,8 @@ func (st *Store) Delete(ctx context.Context, urn string, hooks ...resource.Mutat return withinTx(ctx, st.db, false, deleteFn) } -func (st *Store) SyncOne(ctx context.Context, syncFn resource.SyncFn) error { - urn, err := st.fetchResourceForSync(ctx) +func (st *Store) SyncOne(ctx context.Context, scope map[string][]string, syncFn resource.SyncFn) error { + urn, err := st.fetchResourceForSync(ctx, scope) if err != nil { if errors.Is(err, sql.ErrNoRows) { // No resource available for sync. @@ -298,7 +298,7 @@ func (st *Store) handleDequeued(baseCtx context.Context, res resource.Resource, return fn(runCtx, res) } -func (st *Store) fetchResourceForSync(ctx context.Context) (string, error) { +func (st *Store) fetchResourceForSync(ctx context.Context, scope map[string][]string) (string, error) { var urn string // find a resource ready for sync, extend it next sync time atomically. @@ -310,6 +310,10 @@ func (st *Store) fetchResourceForSync(ctx context.Context) (string, error) { Where(sq.Expr("state_next_sync <= current_timestamp")). Suffix("FOR UPDATE SKIP LOCKED") + for key, value := range scope { + builder = builder.Where(sq.Eq{key: value}) + } + query, args, err := builder.PlaceholderFormat(sq.Dollar).ToSql() if err != nil { return err