Skip to content

Commit

Permalink
feat: configurable sync worker job timeout (#71)
Browse files Browse the repository at this point in the history
Co-authored-by: Hermawan Wijaya <[email protected]>
  • Loading branch information
batrov and batrov authored Jan 5, 2024
1 parent 4cbcaf2 commit 4c73dc8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
3 changes: 3 additions & 0 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ worker:
username: compass
password: compass_password
job_manager_port: 8085
sync_job_timeout: 15m
index_job_timeout: 5s
delete_job_timeout: 5s

client:
host: localhost:8081
Expand Down
7 changes: 3 additions & 4 deletions internal/workermanager/discovery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/goto/compass/core/asset"
"github.com/goto/compass/pkg/worker"
Expand Down Expand Up @@ -41,7 +40,7 @@ func (m *Manager) indexAssetHandler() worker.JobHandler {
Handle: m.IndexAsset,
JobOpts: worker.JobOptions{
MaxAttempts: 3,
Timeout: 5 * time.Second,
Timeout: m.indexTimeout,
BackoffStrategy: worker.DefaultExponentialBackoff,
},
}
Expand All @@ -52,7 +51,7 @@ func (m *Manager) syncAssetHandler() worker.JobHandler {
Handle: m.SyncAssets,
JobOpts: worker.JobOptions{
MaxAttempts: 1,
Timeout: 5 * time.Minute,
Timeout: m.syncTimeout,
BackoffStrategy: worker.DefaultExponentialBackoff,
},
}
Expand Down Expand Up @@ -143,7 +142,7 @@ func (m *Manager) deleteAssetHandler() worker.JobHandler {
Handle: m.DeleteAsset,
JobOpts: worker.JobOptions{
MaxAttempts: 3,
Timeout: 5 * time.Second,
Timeout: m.deleteTimeout,
BackoffStrategy: worker.DefaultExponentialBackoff,
},
}
Expand Down
9 changes: 9 additions & 0 deletions internal/workermanager/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type Manager struct {
discoveryRepo DiscoveryRepository
assetRepo asset.Repository
logger log.Logger
syncTimeout time.Duration
indexTimeout time.Duration
deleteTimeout time.Duration
}

//go:generate mockery --name=Worker -r --case underscore --with-expecter --structname Worker --filename worker_mock.go --output=./mocks
Expand All @@ -44,6 +47,9 @@ type Config struct {
ActivePollPercent float64 `mapstructure:"active_poll_percent" default:"20"`
PGQ pgq.Config `mapstructure:"pgq"`
JobManagerPort int `mapstructure:"job_manager_port"`
SyncJobTimeout time.Duration `mapstructure:"sync_job_timeout" default:"15m"`
IndexJobTimeout time.Duration `mapstructure:"index_job_timeout" default:"5s"`
DeleteJobTimeout time.Duration `mapstructure:"delete_job_timeout" default:"5s"`
}

type Deps struct {
Expand Down Expand Up @@ -77,6 +83,9 @@ func New(ctx context.Context, deps Deps) (*Manager, error) {
discoveryRepo: deps.DiscoveryRepo,
assetRepo: deps.AssetRepo,
logger: deps.Logger,
syncTimeout: cfg.SyncJobTimeout,
indexTimeout: cfg.IndexJobTimeout,
deleteTimeout: cfg.DeleteJobTimeout,
}, nil
}

Expand Down

0 comments on commit 4c73dc8

Please sign in to comment.