From 47ad726d92fedabce0a1e47140f1bf5d854603a0 Mon Sep 17 00:00:00 2001 From: Gert Drapers Date: Thu, 7 Dec 2023 22:48:35 -0800 Subject: [PATCH] upd edge sync (#206) * upd edge sync * remove replace * fix linter issue --- go.mod | 2 + go.sum | 4 + plugins/edge/plugin.go | 43 +++++++---- plugins/edge/sync.go | 165 +++++++++++++++++++++++++++++++++++++---- 4 files changed, 184 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 6f86bdd8..a3afdc7d 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,7 @@ require ( github.com/mennanov/fmutils v0.2.1 github.com/mitchellh/mapstructure v1.5.0 github.com/open-policy-agent/opa v0.58.0 + github.com/panmari/cuckoofilter v1.0.6 github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.31.0 github.com/spf13/cobra v1.8.0 @@ -76,6 +77,7 @@ require ( github.com/containerd/log v0.1.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a // indirect diff --git a/go.sum b/go.sum index 763d6a83..fa53484f 100644 --- a/go.sum +++ b/go.sum @@ -504,6 +504,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etly github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 h1:BS21ZUJ/B5X2UVUbczfmdWH7GapPWAhxcMsDnjJTU1E= +github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48 h1:fRzb/w+pyskVMQ+UbP35JkH8yB7MYb4q/qhBarqZE6g= github.com/dgryski/trifles v0.0.0-20200323201526-dd97f9abfb48/go.mod h1:if7Fbed8SFyPtHLHbg49SI7NAdJiC5WIA09pe59rfAA= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= @@ -791,6 +793,8 @@ github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3I github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= github.com/opencontainers/image-spec v1.1.0-rc5/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/panmari/cuckoofilter v1.0.6 h1:WKb1aSj16h22x0CKVtTCaRkJiCnVGPLEMGbNY8xwXf8= +github.com/panmari/cuckoofilter v1.0.6/go.mod h1:bKADbQPGbN6TxUvo/IbMEIUbKuASnpsOvrLTgpSX0aU= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/plugins/edge/plugin.go b/plugins/edge/plugin.go index c64c63a0..da794d5f 100644 --- a/plugins/edge/plugin.go +++ b/plugins/edge/plugin.go @@ -63,10 +63,7 @@ func (p *Plugin) Start(ctx context.Context) error { p.manager.UpdatePluginStatus(PluginName, &plugins.Status{State: plugins.StateOK}) - // start first run after 15 sec delay. - go p.scheduler(time.NewTicker( - time.Duration(15) * time.Second), - ) + go p.scheduler() return nil } @@ -87,9 +84,7 @@ func (p *Plugin) Reconfigure(ctx context.Context, config interface{}) { if p.config.Enabled != newConfig.Enabled { p.logger.Info().Str("id", p.manager.ID).Bool("old", p.config.Enabled).Bool("new", newConfig.Enabled).Msg("sync enabled changed") if newConfig.Enabled { - go p.scheduler(time.NewTicker( - time.Duration(newConfig.SyncInterval) * time.Minute), - ) + go p.scheduler() } else { p.cancel() } @@ -104,10 +99,19 @@ func (p *Plugin) SyncNow() { p.syncNow <- true } -func (p *Plugin) scheduler(interval *time.Ticker) { +const cycles int = 4 + +func (p *Plugin) scheduler() { + // p.config.SyncInterval 1m-60m + // 1m -> 60s -> 15s interval + // 5m -> 300s -> 75s interval + // 60m -> 3600s -> 900s interval + waitInSec := (p.config.SyncInterval * 60) / cycles + + interval := time.NewTicker(15 * time.Second) defer interval.Stop() - wait := time.Duration(p.config.SyncInterval) * time.Minute + cycle := cycles for { select { @@ -117,20 +121,27 @@ func (p *Plugin) scheduler(interval *time.Ticker) { case t := <-interval.C: p.logger.Info().Time("dispatch", t).Msg(syncScheduler) interval.Stop() - p.task() - interval.Reset(time.Duration(p.config.SyncInterval) * time.Minute) + + p.task(cycle%cycles == 0) + if cycle%cycles == 0 { + cycle = 0 + } + cycle++ + + wait := time.Duration(waitInSec) * time.Second + interval.Reset(wait) p.logger.Info().Str("interval", wait.String()).Time("next-run", time.Now().Add(wait)).Msg(syncScheduler) + case <-p.syncNow: p.logger.Info().Msg("run-now") interval.Stop() - p.task() - interval.Reset(time.Duration(p.config.SyncInterval) * time.Minute) - p.logger.Info().Str("interval", wait.String()).Time("next-run", time.Now().Add(wait)).Msg(syncScheduler) + + p.task(true) } } } -func (p *Plugin) task() { +func (p *Plugin) task(fullSync bool) { p.logger.Info().Str(status, started).Msg(syncTask) defer func() { @@ -144,7 +155,7 @@ func (p *Plugin) task() { } sync := NewSyncMgr(p.config, p.topazConfig, p.logger) - sync.Run() + sync.Run(fullSync) p.logger.Info().Str(status, finished).Msg(syncTask) } diff --git a/plugins/edge/sync.go b/plugins/edge/sync.go index 0b55cde7..5b8f5269 100644 --- a/plugins/edge/sync.go +++ b/plugins/edge/sync.go @@ -10,15 +10,19 @@ import ( "sync/atomic" "time" + dsc3 "github.com/aserto-dev/go-directory/aserto/directory/common/v3" dse3 "github.com/aserto-dev/go-directory/aserto/directory/exporter/v3" dsi3 "github.com/aserto-dev/go-directory/aserto/directory/importer/v3" dsm3 "github.com/aserto-dev/go-directory/aserto/directory/model/v3" dsr3 "github.com/aserto-dev/go-directory/aserto/directory/reader/v3" dsw3 "github.com/aserto-dev/go-directory/aserto/directory/writer/v3" + "github.com/aserto-dev/go-edge-ds/pkg/ds" + "github.com/pkg/errors" "github.com/aserto-dev/go-aserto/client" topaz "github.com/aserto-dev/topaz/pkg/cc/config" + cuckoo "github.com/panmari/cuckoofilter" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -55,6 +59,8 @@ type Sync struct { log *zerolog.Logger exportChan chan *dse3.ExportResponse errChan chan error + filter *cuckoo.Filter + counts *Counter } type Counter struct { @@ -83,9 +89,9 @@ func NewSyncMgr(c *Config, topazConfig *topaz.Config, logger *zerolog.Logger) *S } } -func (s *Sync) Run() { +func (s *Sync) Run(fs bool) { runStartTime := time.Now().UTC() - s.log.Info().Str(status, started).Msg(syncRun) + s.log.Info().Str(status, started).Bool("full-sync", fs).Msg(syncRun) defer func() { close(s.errChan) @@ -98,18 +104,27 @@ func (s *Sync) Run() { } }() + s.counts = &Counter{} + if err := s.syncManifest(); err != nil { s.log.Error().Str("sync-manifest", "").Err(err).Msg(syncRun) } g := new(errgroup.Group) + watermark := s.getWatermark() + + if fs { + watermark = ×tamppb.Timestamp{} + s.filter = cuckoo.NewFilter(1000000) + } + g.Go(func() error { return s.subscriber() }) g.Go(func() error { - return s.producer() + return s.producer(watermark) }) err := g.Wait() @@ -117,11 +132,22 @@ func (s *Sync) Run() { s.log.Error().Err(err).Msg("sync run failed") } + if fs { + if err := s.diff(); err != nil { + s.log.Error().Err(err).Msg("failed to diff") + } + } + runEndTime := time.Now().UTC() s.log.Info().Str(status, finished).Str("duration", runEndTime.Sub(runStartTime).String()).Msg(syncRun) } -func (s *Sync) producer() error { +func (s *Sync) producer(watermark *timestamppb.Timestamp) error { + defer func() { + s.log.Debug().Msg("producer closed export channel") + close(s.exportChan) + }() + dsc, err := s.getRemoteDirectoryClient() if err != nil { s.log.Error().Err(err).Msgf("%s - failed to get directory connection", syncProducer) @@ -131,8 +157,6 @@ func (s *Sync) producer() error { counts := Counter{} s.log.Info().Str(status, started).Msg(syncProducer) - watermark := s.getWatermark() - stream, err := dsc.Exporter.Export(s.ctx, &dse3.ExportRequest{ Options: uint32(dse3.Option_OPTION_DATA), StartFrom: watermark, @@ -152,11 +176,17 @@ func (s *Sync) producer() error { atomic.AddInt32(&counts.Received, 1) - switch msg.Msg.(type) { + switch m := msg.Msg.(type) { case *dse3.ExportResponse_Object: atomic.AddInt32(&counts.Objects, 1) + if fullSync(watermark) { + s.filter.Insert(getObjectKey(m.Object)) + } case *dse3.ExportResponse_Relation: atomic.AddInt32(&counts.Relations, 1) + if fullSync(watermark) { + s.filter.Insert(getRelationKey(m.Relation)) + } default: s.log.Debug().Msg("producer unknown message type") } @@ -164,9 +194,6 @@ func (s *Sync) producer() error { s.exportChan <- msg } - s.log.Debug().Msg("producer closed export channel") - close(s.exportChan) - s.log.Info().Str(status, finished).Int32("received", counts.Received).Int32("objects", counts.Objects). Int32("relations", counts.Relations).Int32("errors", counts.Errors).Msg(syncProducer) @@ -185,7 +212,7 @@ func (s *Sync) subscriber() error { watermark := s.getWatermark() - stream, err := dsc.Importer.Import(s.ctx) + writer, err := dsc.Importer.Import(s.ctx) if err != nil { s.log.Error().Err(err).Msgf("subscriber - failed to setup import stream") return err @@ -202,7 +229,7 @@ func (s *Sync) subscriber() error { switch m := msg.Msg.(type) { case *dse3.ExportResponse_Object: - if err := stream.Send(&dsi3.ImportRequest{ + if err := writer.Send(&dsi3.ImportRequest{ OpCode: dsi3.Opcode_OPCODE_SET, Msg: &dsi3.ImportRequest_Object{Object: m.Object}, }); err == nil { @@ -216,7 +243,7 @@ func (s *Sync) subscriber() error { } case *dse3.ExportResponse_Relation: - if err := stream.Send(&dsi3.ImportRequest{ + if err := writer.Send(&dsi3.ImportRequest{ OpCode: dsi3.Opcode_OPCODE_SET, Msg: &dsi3.ImportRequest_Relation{Relation: m.Relation}, }); err == nil { @@ -234,7 +261,7 @@ func (s *Sync) subscriber() error { } } - if err := stream.CloseSend(); err != nil { + if err := writer.CloseSend(); err != nil { return err } @@ -470,3 +497,113 @@ func (s *Sync) setManifest(mc dsm3.ModelClient, r io.Reader) error { return nil } + +func getObjectKey(obj *dsc3.Object) []byte { + return []byte(fmt.Sprintf("%s:%s", obj.Type, obj.Id)) +} + +func getRelationKey(rel *dsc3.Relation) []byte { + return []byte(fmt.Sprintf("%s:%s#%s@%s:%s%s", + rel.ObjectType, rel.ObjectId, rel.Relation, rel.SubjectType, rel.SubjectId, + ds.Iff(rel.SubjectRelation == "", "", fmt.Sprintf("#%s", rel.SubjectRelation)))) +} + +func fullSync(watermark *timestamppb.Timestamp) bool { + return (watermark.Seconds == 0 && watermark.Nanos == 0) +} + +func (s *Sync) diff() error { + if s.filter == nil { + return errors.New("filter not initialized") + } + + dsc, err := s.getLocalDirectoryClient() + if err != nil { + s.log.Error().Err(err).Msgf("diff - failed to create directory client") + return err + } + + writer, err := dsc.Importer.Import(s.ctx) + if err != nil { + s.log.Error().Err(err).Msgf("subscriber - failed to setup import stream") + return err + } + + objDeleted := 0 + relDeleted := 0 + + { + reader, err := dsc.Exporter.Export(s.ctx, &dse3.ExportRequest{StartFrom: ×tamppb.Timestamp{}, Options: uint32(dse3.Option_OPTION_DATA_OBJECTS)}) + if err != nil { + return err + } + + for { + msg, err := reader.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + obj := msg.Msg.(*dse3.ExportResponse_Object).Object + if !s.filter.Lookup(getObjectKey(obj)) { + s.log.Trace().Str("key", string(getObjectKey(obj))).Msg("delete") + + if err := writer.Send(&dsi3.ImportRequest{ + OpCode: dsi3.Opcode_OPCODE_DELETE, + Msg: &dsi3.ImportRequest_Object{Object: obj}, + }); err == nil { + atomic.AddInt32(&s.counts.Deletes, 1) + objDeleted++ + } else { + s.log.Error().Err(err).Msgf("failed to delete object %v", obj) + s.errChan <- err + atomic.AddInt32(&s.counts.Errors, 1) + } + } + } + } + + { + reader, err := dsc.Exporter.Export(s.ctx, &dse3.ExportRequest{StartFrom: ×tamppb.Timestamp{}, Options: uint32(dse3.Option_OPTION_DATA_RELATIONS)}) + if err != nil { + return err + } + for { + msg, err := reader.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + rel := msg.Msg.(*dse3.ExportResponse_Relation).Relation + if !s.filter.Lookup(getRelationKey(rel)) { + s.log.Trace().Str("key", string(getRelationKey(rel))).Msg("delete") + + if err := writer.Send(&dsi3.ImportRequest{ + OpCode: dsi3.Opcode_OPCODE_DELETE, + Msg: &dsi3.ImportRequest_Relation{Relation: rel}, + }); err == nil { + atomic.AddInt32(&s.counts.Deletes, 1) + relDeleted++ + } else { + s.log.Error().Err(err).Msgf("failed to delete relation %v", rel) + s.errChan <- err + atomic.AddInt32(&s.counts.Errors, 1) + } + } + } + } + + if err := writer.CloseSend(); err != nil { + return err + } + + s.filter = nil + + s.log.Info().Int("obj", objDeleted).Int("rel", relDeleted).Msg("diff") + + return nil +}