Skip to content

Commit

Permalink
feat: modular worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Femi Novia Lina committed Aug 13, 2024
1 parent 136b628 commit 4895f60
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 24 deletions.
1 change: 1 addition & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func Execute(ctx context.Context) {
cmdConfig(),
cmdResourceCommand(),
cmdModuleCommand(),
cmdWorker(),
)

cmdx.SetHelp(rootCmd)
Expand Down
17 changes: 12 additions & 5 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,18 @@ type Config struct {
}

type syncerConf struct {
SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"`
RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"`
ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"`
SyncBackoffInterval time.Duration `mapstructure:"sync_backoff_interval" default:"5s"`
MaxRetries int `mapstructure:"max_retries" default:"5"`
SyncInterval time.Duration `mapstructure:"sync_interval" default:"1s"`
RefreshInterval time.Duration `mapstructure:"refresh_interval" default:"3s"`
ExtendLockBy time.Duration `mapstructure:"extend_lock_by" default:"5s"`
SyncBackoffInterval time.Duration `mapstructure:"sync_backoff_interval" default:"5s"`
MaxRetries int `mapstructure:"max_retries" default:"5"`
WorkerModules []workerModule `mapstructure:"worker_modules"`
}

type workerModule struct {
Name string `mapstructure:"name"`
Count int `mapstructure:"count" default:"1"`
Scope map[string][]string `mapstructure:"labels"`
}

type serveConfig struct {
Expand Down
8 changes: 5 additions & 3 deletions cli/serve.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cli

import (
"sync"
"time"

"github.com/newrelic/go-agent/v3/newrelic"
Expand Down Expand Up @@ -60,11 +61,12 @@ func cmdServe() *cobra.Command {
}
}

var wg *sync.WaitGroup
if spawnWorker {
go func() {
if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil {
zap.L().Error("syncer exited with error", zap.Error(err))
}
wg = spawnWorkers(cmd.Context(), resourceService, cfg.Syncer.WorkerModules, cfg.Syncer.SyncInterval)
wg.Wait()
zap.L().Info("all syncer workers exited")
}()
}

Expand Down
73 changes: 73 additions & 0 deletions cli/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package cli

import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/MakeNowJust/heredoc"
"github.com/goto/entropy/core"
"github.com/goto/entropy/core/module"
"github.com/goto/entropy/pkg/logger"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

func cmdWorker() *cobra.Command {
cmd := &cobra.Command{
Use: "worker",
Short: "Start workers",
Example: heredoc.Doc(`
$ entropy worker --count 2 --scope project=example-project,example-project-2
`),
Annotations: map[string]string{
"group:other": "server",
},
}

cmd.RunE = handleErr(func(cmd *cobra.Command, args []string) error {
cfg, err := loadConfig(cmd)
if err != nil {
return err
}

err = logger.Setup(&cfg.Log)
if err != nil {
return err
}

store := setupStorage(cfg.PGConnStr, cfg.Syncer, cfg.Service)
moduleService := module.NewService(setupRegistry(), store)
resourceService := core.New(store, moduleService, time.Now, cfg.Syncer.SyncBackoffInterval, cfg.Syncer.MaxRetries)

var wg *sync.WaitGroup
go func() {
wg = spawnWorkers(cmd.Context(), resourceService, cfg.Syncer.WorkerModules, cfg.Syncer.SyncInterval)
}()

quitChannel := make(chan os.Signal, 1)
signal.Notify(quitChannel, syscall.SIGINT, syscall.SIGTERM)
<-quitChannel

wg.Wait()
zap.L().Info("all syncer workers exited")

return nil
})

return cmd
}

func spawnWorkers(ctx context.Context, resourceService *core.Service, workerModules []workerModule, syncInterval time.Duration) (wg *sync.WaitGroup) {
if len(workerModules) == 0 {
wg = resourceService.RunSyncer(ctx, 1, syncInterval, map[string][]string{})
} else {
for _, module := range workerModules {
wg = resourceService.RunSyncer(ctx, module.Count, syncInterval, module.Scope)
}
}
return
}
21 changes: 11 additions & 10 deletions core/mocks/resource_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Store interface {

Revisions(ctx context.Context, selector RevisionsSelector) ([]Revision, error)

SyncOne(ctx context.Context, syncFn SyncFn) error
SyncOne(ctx context.Context, scope map[string][]string, syncFn SyncFn) error
}

type SyncFn func(ctx context.Context, res Resource) (*Resource, error)
Expand Down
21 changes: 19 additions & 2 deletions core/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package core

import (
"context"
"sync"
"time"

"go.uber.org/zap"
Expand All @@ -12,7 +13,7 @@ import (

// RunSyncer runs the syncer thread that keeps performing resource-sync at
// regular intervals.
func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error {
func (svc *Service) runSyncer(ctx context.Context, _ int, interval time.Duration, scope map[string][]string) error {
tick := time.NewTimer(interval)
defer tick.Stop()

Expand All @@ -24,14 +25,30 @@ func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error
case <-tick.C:
tick.Reset(interval)

err := svc.store.SyncOne(ctx, svc.handleSync)
err := svc.store.SyncOne(ctx, scope, svc.handleSync)
if err != nil {
zap.L().Warn("SyncOne() failed", zap.Error(err))
}
}
}
}

func (svc *Service) RunSyncer(ctx context.Context, workerCount int, interval time.Duration, scope map[string][]string) *sync.WaitGroup {
wg := &sync.WaitGroup{}
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

if err := svc.runSyncer(ctx, id, interval, scope); err != nil {
zap.L().Error("worker-%d failed", zap.Error(err))
}
}(i)
}

return wg
}

func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*resource.Resource, error) {
logEntry := zap.L().With(
zap.String("resource_urn", res.URN),
Expand Down
10 changes: 7 additions & 3 deletions internal/store/postgres/resource_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ func (st *Store) Delete(ctx context.Context, urn string, hooks ...resource.Mutat
return withinTx(ctx, st.db, false, deleteFn)
}

func (st *Store) SyncOne(ctx context.Context, syncFn resource.SyncFn) error {
urn, err := st.fetchResourceForSync(ctx)
func (st *Store) SyncOne(ctx context.Context, scope map[string][]string, syncFn resource.SyncFn) error {
urn, err := st.fetchResourceForSync(ctx, scope)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
// No resource available for sync.
Expand Down Expand Up @@ -298,7 +298,7 @@ func (st *Store) handleDequeued(baseCtx context.Context, res resource.Resource,
return fn(runCtx, res)
}

func (st *Store) fetchResourceForSync(ctx context.Context) (string, error) {
func (st *Store) fetchResourceForSync(ctx context.Context, scope map[string][]string) (string, error) {
var urn string

// find a resource ready for sync, extend it next sync time atomically.
Expand All @@ -310,6 +310,10 @@ func (st *Store) fetchResourceForSync(ctx context.Context) (string, error) {
Where(sq.Expr("state_next_sync <= current_timestamp")).
Suffix("FOR UPDATE SKIP LOCKED")

for key, value := range scope {
builder = builder.Where(sq.Eq{key: value})
}

query, args, err := builder.PlaceholderFormat(sq.Dollar).ToSql()
if err != nil {
return err
Expand Down

0 comments on commit 4895f60

Please sign in to comment.