Skip to content

Commit

Permalink
upd edge sync (#206)
Browse files Browse the repository at this point in the history
* upd edge sync

* remove replace

* fix linter issue
  • Loading branch information
gertd authored Dec 8, 2023
1 parent 67e4f36 commit 47ad726
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 30 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/mennanov/fmutils v0.2.1
github.com/mitchellh/mapstructure v1.5.0
github.com/open-policy-agent/opa v0.58.0
github.com/panmari/cuckoofilter v1.0.6
github.com/pkg/errors v0.9.1
github.com/rs/zerolog v1.31.0
github.com/spf13/cobra v1.8.0
Expand Down Expand Up @@ -76,6 +77,7 @@ require (
github.com/containerd/log v0.1.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etly
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E=
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g=
github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
Expand Down Expand Up @@ -791,6 +793,8 @@ github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3I
github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI=
github.com/opencontainers/image-spec v1.1.0-rc5/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/panmari/cuckoofilter v1.0.6 h1:WKb1aSj16h22x0CKVtTCaRkJiCnVGPLEMGbNY8xwXf8=
github.com/panmari/cuckoofilter v1.0.6/go.mod h1:bKADbQPGbN6TxUvo/IbMEIUbKuASnpsOvrLTgpSX0aU=
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
43 changes: 27 additions & 16 deletions plugins/edge/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,7 @@ func (p *Plugin) Start(ctx context.Context) error {

p.manager.UpdatePluginStatus(PluginName, &plugins.Status{State: plugins.StateOK})

// start first run after 15 sec delay.
go p.scheduler(time.NewTicker(
time.Duration(15) * time.Second),
)
go p.scheduler()

return nil
}
Expand All @@ -87,9 +84,7 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) {
if p.config.Enabled != newConfig.Enabled {
p.logger.Info().Str("id", p.manager.ID).Bool("old", p.config.Enabled).Bool("new", newConfig.Enabled).Msg("sync enabled changed")
if newConfig.Enabled {
go p.scheduler(time.NewTicker(
time.Duration(newConfig.SyncInterval) * time.Minute),
)
go p.scheduler()
} else {
p.cancel()
}
Expand All @@ -104,10 +99,19 @@ func (p *Plugin) SyncNow() {
p.syncNow <- true
}

func (p *Plugin) scheduler(interval *time.Ticker) {
const cycles int = 4

func (p *Plugin) scheduler() {
// p.config.SyncInterval 1m-60m
// 1m -> 60s -> 15s interval
// 5m -> 300s -> 75s interval
// 60m -> 3600s -> 900s interval
waitInSec := (p.config.SyncInterval * 60) / cycles

interval := time.NewTicker(15 * time.Second)
defer interval.Stop()

wait := time.Duration(p.config.SyncInterval) * time.Minute
cycle := cycles

for {
select {
Expand All @@ -117,20 +121,27 @@ func (p *Plugin) scheduler(interval *time.Ticker) {
case t := <-interval.C:
p.logger.Info().Time("dispatch", t).Msg(syncScheduler)
interval.Stop()
p.task()
interval.Reset(time.Duration(p.config.SyncInterval) * time.Minute)

p.task(cycle%cycles == 0)
if cycle%cycles == 0 {
cycle = 0
}
cycle++

wait := time.Duration(waitInSec) * time.Second
interval.Reset(wait)
p.logger.Info().Str("interval", wait.String()).Time("next-run", time.Now().Add(wait)).Msg(syncScheduler)

case <-p.syncNow:
p.logger.Info().Msg("run-now")
interval.Stop()
p.task()
interval.Reset(time.Duration(p.config.SyncInterval) * time.Minute)
p.logger.Info().Str("interval", wait.String()).Time("next-run", time.Now().Add(wait)).Msg(syncScheduler)

p.task(true)
}
}
}

func (p *Plugin) task() {
func (p *Plugin) task(fullSync bool) {
p.logger.Info().Str(status, started).Msg(syncTask)

defer func() {
Expand All @@ -144,7 +155,7 @@ func (p *Plugin) task() {
}

sync := NewSyncMgr(p.config, p.topazConfig, p.logger)
sync.Run()
sync.Run(fullSync)

p.logger.Info().Str(status, finished).Msg(syncTask)
}
Loading

0 comments on commit 47ad726

Please sign in to comment.