From 326f7dc1a7d4e5d26ab684a4cbccfcd7ae1e65d4 Mon Sep 17 00:00:00 2001 From: takt Date: Tue, 21 Mar 2023 09:10:42 +0100 Subject: [PATCH] Implement concurrent data retrieval (#128) Co-authored-by: Oliver Geiselhardt-Herms --- Makefile | 2 +- cmd/octorpki/octorpki.go | 297 +++++++++++++++++++--------------- cmd/octorpki/rrdp_fetcher.go | 57 +++++++ cmd/octorpki/rsync_fetcher.go | 49 ++++++ sync/lib/rsync.go | 2 +- 5 files changed, 274 insertions(+), 133 deletions(-) create mode 100644 cmd/octorpki/rrdp_fetcher.go create mode 100644 cmd/octorpki/rsync_fetcher.go diff --git a/Makefile b/Makefile index 6550a0c..44fc226 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ clean: .PHONY: build-octorpki build-octorpki: prepare - go build -ldflags $(LDFLAGS) -o $(OUTPUT_OCTORPKI) cmd/octorpki/octorpki.go + go build -ldflags $(LDFLAGS) -o $(OUTPUT_OCTORPKI) cmd/octorpki/*.go .PHONY: docker-octorpki docker-octorpki: diff --git a/cmd/octorpki/octorpki.go b/cmd/octorpki/octorpki.go index 7729868..8357b36 100644 --- a/cmd/octorpki/octorpki.go +++ b/cmd/octorpki/octorpki.go @@ -13,6 +13,7 @@ import ( "fmt" "io/ioutil" "net/http" + "net/http/pprof" "os" "os/exec" "path/filepath" @@ -22,23 +23,17 @@ import ( "sync/atomic" "time" - syncpki "github.com/cloudflare/cfrpki/sync/lib" - librpki "github.com/cloudflare/cfrpki/validator/lib" "github.com/cloudflare/cfrpki/validator/pki" - - "github.com/rs/cors" - + "github.com/cloudflare/gortr/prefixfile" + "github.com/getsentry/sentry-go" + "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/rs/cors" - "github.com/cloudflare/gortr/prefixfile" + syncpki "github.com/cloudflare/cfrpki/sync/lib" + librpki "github.com/cloudflare/cfrpki/validator/lib" log "github.com/sirupsen/logrus" - - // Debugging - "net/http/pprof" - - "github.com/getsentry/sentry-go" - "github.com/opentracing/opentracing-go" jcfg "github.com/uber/jaeger-client-go/config" ) @@ -95,6 +90,8 @@ var ( Tracer = flag.Bool("tracer", false, "Enable tracer") SentryDSN = flag.String("sentry.dsn", "", "Send errors to Sentry") + MaxConcurrentRetrievals = flag.Uint("max_concurrent_retrievals", 100, "Maximum amount of concurrent retrievals (rsync + RRDP)") + Version = flag.Bool("version", false, "Print version") CertRepository = asn1.ObjectIdentifier{1, 3, 6, 1, 5, 5, 7, 48, 5} @@ -202,28 +199,10 @@ func ReadKey(key []byte, isPem bool) (*ecdsa.PrivateKey, error) { return k, nil } -type Stats struct { - URI string `json:"uri"` - Count int `json:"file-count"` - Iteration int `json:"iteration"` - Errors int `json:"errors"` - Duration float64 `json:"duration"` - - LastFetch int `json:"last-fetch"` - LastFetchError int `json:"last-fetch-error,omitempty"` - - RRDPSerial int64 `json:"rrdp-serial,omitempty"` - RRDPSessionID string `json:"rrdp-sessionid,omitempty"` - RRDPLastFile string `json:"rrdp-last-file,omitempty"` - - LastError string `json:"last-error,omitempty"` -} - type OctoRPKI struct { - Tals []*pki.PKIFile - TalsFetch map[string]*librpki.RPKITAL - TalNames []string - + Tals []*pki.PKIFile + TalsFetch map[string]*librpki.RPKITAL + TalNames []string LastComputed time.Time Key *ecdsa.PrivateKey @@ -235,12 +214,15 @@ type OctoRPKI struct { PrevRepos map[string]time.Time CurrentRepos map[string]time.Time - RRDPFetch map[string]string // maps from RRDP Url to rsync URL - RRDPFetchDomain map[string]string + rrdpFetch map[string]string // maps from RRDP Url to rsync URL + rrdpFetchMu sync.RWMutex + rrdpFetchDomain map[string]string + rrdpFetchDomainMu sync.RWMutex - RsyncFetchJobs map[string]string + rsyncFetchJobManager *rsyncFetchJobManager - RRDPInfo map[string]RRDPInfo + RRDPInfo map[string]RRDPInfo + RRDPInfoMu sync.RWMutex ROAList *prefixfile.ROAList ROAListMu sync.RWMutex @@ -252,18 +234,48 @@ type OctoRPKI struct { tracer opentracing.Tracer } +func (s *OctoRPKI) getRRDPFetch() map[string]string { + s.rrdpFetchMu.RLock() + defer s.rrdpFetchMu.RUnlock() + + ret := make(map[string]string, len(s.rrdpFetch)) + for k, v := range s.rrdpFetch { + ret[k] = v + } + + return ret +} + +func (s *OctoRPKI) setRRDPFetch(key, value string) { + s.rrdpFetchMu.Lock() + defer s.rrdpFetchMu.Unlock() + + s.rrdpFetch[key] = value +} + +func (s *OctoRPKI) getRRDPDomain(path string) (string, bool) { + s.rrdpFetchDomainMu.RLock() + defer s.rrdpFetchDomainMu.RUnlock() + + domain, exists := s.rrdpFetchDomain[path] + return domain, exists +} + +func (s *OctoRPKI) setRRDPDomain(path string, domain string) { + s.rrdpFetchDomainMu.Lock() + defer s.rrdpFetchDomainMu.Unlock() + + s.rrdpFetchDomain[path] = domain +} + type octoRPKIStats struct { - RRDPStats map[string]*Stats - RsyncStats map[string]*Stats ValidationDuration time.Duration - Iteration int + iterations atomic.Uint64 ROAsTALsCount []ROAsTAL } func newOctoRPKIStats() *octoRPKIStats { return &octoRPKIStats{ - RsyncStats: make(map[string]*Stats), - RRDPStats: make(map[string]*Stats), ROAsTALsCount: make([]ROAsTAL, 0), } } @@ -285,7 +297,6 @@ func (s *OctoRPKI) MainReduce() bool { } // Init deletion of folder if missing from current - s.Fetcher.SetRepositories(s.CurrentRepos) if len(s.PrevRepos) != len(s.CurrentRepos) { @@ -353,8 +364,6 @@ func (s *OctoRPKI) ReceiveRRDPFileCallback(main string, url string, path string, } MetricSIACounts.With(prometheus.Labels{"address": main, "type": "rrdp"}).Inc() - s.stats.RRDPStats[main].Count++ - s.stats.RRDPStats[main].RRDPLastFile = url return nil } @@ -364,6 +373,9 @@ func (s *OctoRPKI) LoadRRDPInfo(file string) error { return fmt.Errorf("Unable to read file %q: %v", file, err) } + s.RRDPInfoMu.Lock() + defer s.RRDPInfoMu.Unlock() + s.RRDPInfo = make(map[string]RRDPInfo) err = json.Unmarshal(fc, &s.RRDPInfo) if err != nil { @@ -374,7 +386,7 @@ func (s *OctoRPKI) LoadRRDPInfo(file string) error { } func (s *OctoRPKI) saveRRDPInfo(file string) error { - fc, err := json.Marshal(s.RRDPInfo) + fc, err := json.Marshal(s.getRRDPInfo()) if err != nil { return fmt.Errorf("JSON marshal failed: %v", err) } @@ -387,13 +399,29 @@ func (s *OctoRPKI) saveRRDPInfo(file string) error { return nil } +func (s *OctoRPKI) getRRDPInfo() map[string]RRDPInfo { + s.RRDPInfoMu.RLock() + defer s.RRDPInfoMu.RUnlock() + + ret := make(map[string]RRDPInfo, len(s.RRDPInfo)) + for k, v := range s.RRDPInfo { + ret[k] = v + } + + return ret +} + func (s *OctoRPKI) mainRRDP(pSpan opentracing.Span) { span := s.tracer.StartSpan("rrdp", opentracing.ChildOf(pSpan.Context())) defer span.Finish() - for path, rsync := range s.RRDPFetch { - s.fetchRRDP(path, rsync, span) + fetcher := newRRDPFetcher(s, int(*MaxConcurrentRetrievals), span) + for path, rsync := range s.getRRDPFetch() { + fetcher.fetch(path, rsync) } + + fetcher.done() + fetcher.wait() } func (s *OctoRPKI) fetchRRDP(path string, rsyncURL string, span opentracing.Span) { @@ -407,56 +435,45 @@ func (s *OctoRPKI) fetchRRDP(path string, rsyncURL string, span opentracing.Span MetricSIACounts.With(prometheus.Labels{"address": path, "type": "rrdp"}).Set(0) - if _, exists := s.stats.RRDPStats[path]; !exists { - s.stats.RRDPStats[path] = &Stats{} - } - - s.stats.RRDPStats[path].URI = path - s.stats.RRDPStats[path].Iteration++ - s.stats.RRDPStats[path].Count = 0 - - t1 := time.Now() - - rrdp := s.getRRDPSystem(path, rsyncURL) + rrdpSystem := s.newRRDPSystem(path, rsyncURL) - err := rrdp.FetchRRDP(s.RRDPFetchDomain[path]) - t2 := time.Now() + domain, _ := s.getRRDPDomain(path) + err := rrdpSystem.FetchRRDP(domain) if err != nil { - s.rrdpError(rsyncURL, path, err, t1, t2, rSpan, rrdp) + s.rrdpError(rsyncURL, path, err, rSpan, rrdpSystem) return } log.Debugf("Success fetching %s, removing rsync %s", path, rsyncURL) - delete(s.RsyncFetchJobs, rsyncURL) + s.rsyncFetchJobManager.delete(rsyncURL) rSpan.LogKV("event", "rrdp", "type", "success", "message", "rrdp successfully fetched") sentry.WithScope(func(scope *sentry.Scope) { scope.SetLevel(sentry.LevelInfo) scope.SetTag("Rsync", rsyncURL) scope.SetTag("RRDP", path) - rrdp.SetSentryScope(scope) + rrdpSystem.SetSentryScope(scope) sentry.CaptureMessage("fetched rrdp successfully") }) - MetricRRDPSerial.With(prometheus.Labels{"address": path}).Set(float64(rrdp.Serial)) - - lastFetch := time.Now().Unix() - MetricLastFetch.With(prometheus.Labels{"address": path, "type": "rrdp"}).Set(float64(lastFetch)) + MetricRRDPSerial.With(prometheus.Labels{"address": path}).Set(float64(rrdpSystem.Serial)) + MetricLastFetch.With(prometheus.Labels{"address": path, "type": "rrdp"}).Set(float64(time.Now().Unix())) - s.stats.RRDPStats[path].LastFetch = int(lastFetch) - s.stats.RRDPStats[path].RRDPSerial = rrdp.Serial - s.stats.RRDPStats[path].RRDPSessionID = rrdp.SessionID - s.stats.RRDPStats[path].Duration = t2.Sub(t1).Seconds() + s.RRDPInfoMu.Lock() + defer s.RRDPInfoMu.Unlock() s.RRDPInfo[rsyncURL] = RRDPInfo{ RsyncURL: rsyncURL, Path: path, - SessionID: rrdp.SessionID, - Serial: rrdp.Serial, + SessionID: rrdpSystem.SessionID, + Serial: rrdpSystem.Serial, } } -func (s *OctoRPKI) getRRDPSystem(path string, rsync string) *syncpki.RRDPSystem { +func (s *OctoRPKI) newRRDPSystem(path string, rsync string) *syncpki.RRDPSystem { + s.RRDPInfoMu.RLock() + defer s.RRDPInfoMu.RUnlock() + return &syncpki.RRDPSystem{ Callback: s.ReceiveRRDPFileCallback, Path: path, @@ -467,7 +484,7 @@ func (s *OctoRPKI) getRRDPSystem(path string, rsync string) *syncpki.RRDPSystem } } -func (s *OctoRPKI) rrdpError(rsyncURL string, path string, err error, t1 time.Time, t2 time.Time, rSpan opentracing.Span, rrdp *syncpki.RRDPSystem) { +func (s *OctoRPKI) rrdpError(rsyncURL string, path string, err error, rSpan opentracing.Span, rrdp *syncpki.RRDPSystem) { rSpan.SetTag("error", true) sentry.WithScope(func(scope *sentry.Scope) { if errC, ok := err.(interface{ SetURL(string, string) }); ok { @@ -489,14 +506,10 @@ func (s *OctoRPKI) rrdpError(rsyncURL string, path string, err error, t1 time.Ti } else { log.Errorf("Error when processing %v (for %v): %v.Skipping failover to rsync.", path, rsyncURL, err) rSpan.LogKV("event", "rrdp failure", "type", "skipping failover to rsync", "message", err) - delete(s.RsyncFetchJobs, rsyncURL) + s.rsyncFetchJobManager.delete(rsyncURL) } MetricRRDPErrors.With(prometheus.Labels{"address": path}).Inc() - s.stats.RRDPStats[path].Errors++ - s.stats.RRDPStats[path].LastFetchError = int(time.Now().Unix()) - s.stats.RRDPStats[path].LastError = err.Error() - s.stats.RRDPStats[path].Duration = t2.Sub(t1).Seconds() } func (s *OctoRPKI) mainRsync(pSpan opentracing.Span) { @@ -504,10 +517,14 @@ func (s *OctoRPKI) mainRsync(pSpan opentracing.Span) { span := s.tracer.StartSpan("rsync", opentracing.ChildOf(pSpan.Context())) defer span.Finish() - for uri := range s.RsyncFetchJobs { - s.fetchRsync(uri, span) + fetcher := newRsyncFetcher(s, int(*MaxConcurrentRetrievals), span) + for rsyncURL := range s.rsyncFetchJobManager.get() { + fetcher.fetch(rsyncURL) } + fetcher.done() + fetcher.wait() + t2 := time.Now() MetricOperationTime.With(prometheus.Labels{"type": "rsync"}).Observe(float64(t2.Sub(t1).Seconds())) } @@ -530,21 +547,13 @@ func (s *OctoRPKI) fetchRsync(uri string, span opentracing.Span) { log.Infof("Rsync sync %v", uri) downloadPath := mustExtractFoldersPathFromRsyncURL(uri) - if _, exists := s.stats.RsyncStats[uri]; !exists { - s.stats.RsyncStats[uri] = &Stats{} - } - - s.stats.RsyncStats[uri].URI = uri - s.stats.RsyncStats[uri].Iteration++ - s.stats.RsyncStats[uri].Count = 0 - path := filepath.Join(*Basepath, downloadPath) ctxRsync, cancelRsync := context.WithTimeout(context.Background(), *RsyncTimeout) - t1 := time.Now() + defer cancelRsync() + files, err := syncpki.RunRsync(ctxRsync, uri, *RsyncBin, path) - t2 := time.Now() if err != nil { - s.rsyncError(uri, path, err, t1, t2, rSpan) + s.rsyncError(uri, path, err, rSpan) } else { rSpan.LogKV("event", "rsync", "type", "success", "message", "rsync successfully fetched") sentry.WithScope(func(scope *sentry.Scope) { @@ -553,19 +562,12 @@ func (s *OctoRPKI) fetchRsync(uri string, span opentracing.Span) { sentry.CaptureMessage("fetched rsync successfully") }) } - cancelRsync() MetricSIACounts.With(prometheus.Labels{"address": uri, "type": "rsync"}).Set(float64(len(files))) - lastFetch := time.Now().Unix() - MetricLastFetch.With(prometheus.Labels{"address": uri, "type": "rsync"}).Set(float64(lastFetch)) - - s.stats.RsyncStats[uri].LastFetch = int(lastFetch) - s.stats.RsyncStats[uri].Count = len(files) - s.stats.RsyncStats[uri].Duration = t2.Sub(t1).Seconds() - + MetricLastFetch.With(prometheus.Labels{"address": uri, "type": "rsync"}).Set(float64(time.Now().Unix())) } -func (s *OctoRPKI) rsyncError(uri string, path string, err error, t1 time.Time, t2 time.Time, rSpan opentracing.Span) { +func (s *OctoRPKI) rsyncError(uri string, path string, err error, rSpan opentracing.Span) { rSpan.SetTag("error", true) rSpan.LogKV("event", "rsync failure", "message", err) log.Errorf("Error when processing %v: %v. Will add to rsync.", path, err) @@ -581,10 +583,6 @@ func (s *OctoRPKI) rsyncError(uri string, path string, err error, t1 time.Time, }) MetricRsyncErrors.With(prometheus.Labels{"address": uri}).Inc() - - s.stats.RsyncStats[uri].Errors++ - s.stats.RsyncStats[uri].LastFetchError = int(time.Now().Unix()) - s.stats.RsyncStats[uri].LastError = err.Error() } func filterDuplicates(roalist []prefixfile.ROAJson) []prefixfile.ROAJson { @@ -634,7 +632,7 @@ func (s *OctoRPKI) fetchTAL(path string, tal *librpki.RPKITAL, span opentracing. if *RRDPFailover && tal.HasRsync() { rsync := tal.GetRsyncURI() log.Infof("Root certificate for %s will be downloaded using rsync: %s", path, rsync) - s.RsyncFetchJobs[rsync] = "" + s.rsyncFetchJobManager.set(rsync, "") tSpan.SetTag("failover-rsync", true) return } @@ -885,15 +883,15 @@ func (s *OctoRPKI) mainValidation(pSpan opentracing.Span) { } if cer.HasRRDP() { - prev, ok := s.RRDPFetchDomain[rrdpGeneralName] + prev, ok := s.getRRDPDomain(rrdpGeneralName) if ok && prev != gnExtractedDomain { log.Errorf("rrdp %s tries to override %s with %s", rrdpGeneralName, prev, gnExtractedDomain) continue } - s.RRDPFetchDomain[rrdpGeneralName] = gnExtractedDomain - s.RRDPFetch[rrdpGeneralName] = gnExtracted + s.setRRDPDomain(rrdpGeneralName, gnExtractedDomain) + s.setRRDPFetch(rrdpGeneralName, gnExtracted) } - s.RsyncFetchJobs[gnExtracted] = rrdpGeneralName + s.rsyncFetchJobManager.set(gnExtracted, rrdpGeneralName) s.CurrentRepos[gnExtracted] = time.Now() count++ @@ -1050,7 +1048,7 @@ func (s *OctoRPKI) ServeInfo(w http.ResponseWriter, r *http.Request) { Stable: s.Stable.Load(), LastValidation: int(s.LastComputed.Unix()), ValidationDuration: s.stats.ValidationDuration.Seconds(), - Iteration: s.stats.Iteration, + Iteration: int(s.stats.iterations.Load()), } enc := json.NewEncoder(w) enc.Encode(ir) @@ -1195,22 +1193,59 @@ func main() { func NewOctoRPKI(tals []*pki.PKIFile, talNames []string) *OctoRPKI { return &OctoRPKI{ - TalsFetch: make(map[string]*librpki.RPKITAL), - Tals: tals, - TalNames: talNames, - RRDPInfo: make(map[string]RRDPInfo), - PrevRepos: make(map[string]time.Time), - CurrentRepos: make(map[string]time.Time), - RsyncFetchJobs: make(map[string]string), - RRDPFetch: make(map[string]string), - RRDPFetchDomain: make(map[string]string), - Fetcher: syncpki.NewLocalFetch(*Basepath), - HTTPFetcher: syncpki.NewHTTPFetcher(*UserAgent), - ROAList: newROAList(), - stats: newOctoRPKIStats(), - InfoAuthorities: make([][]SIA, 0), - tracer: opentracing.GlobalTracer(), + TalsFetch: make(map[string]*librpki.RPKITAL), + Tals: tals, + TalNames: talNames, + RRDPInfo: make(map[string]RRDPInfo), + PrevRepos: make(map[string]time.Time), + CurrentRepos: make(map[string]time.Time), + rsyncFetchJobManager: newRsyncFetchJobManager(), + rrdpFetch: make(map[string]string), + rrdpFetchDomain: make(map[string]string), + Fetcher: syncpki.NewLocalFetch(*Basepath), + HTTPFetcher: syncpki.NewHTTPFetcher(*UserAgent), + ROAList: newROAList(), + stats: newOctoRPKIStats(), + InfoAuthorities: make([][]SIA, 0), + tracer: opentracing.GlobalTracer(), + } +} + +type rsyncFetchJobManager struct { + rsyncFetchJobs map[string]string + rsyncFetchJobsMu sync.RWMutex +} + +func newRsyncFetchJobManager() *rsyncFetchJobManager { + return &rsyncFetchJobManager{ + rsyncFetchJobs: make(map[string]string), + } +} + +func (r *rsyncFetchJobManager) delete(job string) { + r.rsyncFetchJobsMu.Lock() + defer r.rsyncFetchJobsMu.Unlock() + + delete(r.rsyncFetchJobs, job) +} + +func (r *rsyncFetchJobManager) get() map[string]string { + r.rsyncFetchJobsMu.RLock() + defer r.rsyncFetchJobsMu.RUnlock() + + ret := make(map[string]string, len(r.rsyncFetchJobs)) + for k, v := range r.rsyncFetchJobs { + ret[k] = v } + + return ret +} + +func (r *rsyncFetchJobManager) set(key, value string) { + r.rsyncFetchJobsMu.Lock() + defer r.rsyncFetchJobsMu.Unlock() + + r.rsyncFetchJobs[key] = value } func newROAList() *prefixfile.ROAList { @@ -1232,9 +1267,9 @@ func (s *OctoRPKI) validationLoop() { span := s.tracer.StartSpan("operation", opentracing.ChildOf(pSpan.Context())) - s.stats.Iteration++ + s.stats.iterations.Add(1) iterationsUntilStable++ - span.SetTag("iteration", s.stats.Iteration) + span.SetTag("iteration", s.stats.iterations.Load()) if *RRDP { s.doRRDP(span) @@ -1250,7 +1285,7 @@ func (s *OctoRPKI) validationLoop() { // Reduce changed := s.MainReduce() - s.Stable.Store(!changed && s.stats.Iteration > 1) + s.Stable.Store(!changed && s.stats.iterations.Load() > 1) s.HasPreviousStable.Store(s.Stable.Load()) if *Mode == "oneoff" && (s.Stable.Load() || !*WaitStable) { diff --git a/cmd/octorpki/rrdp_fetcher.go b/cmd/octorpki/rrdp_fetcher.go new file mode 100644 index 0000000..53d7484 --- /dev/null +++ b/cmd/octorpki/rrdp_fetcher.go @@ -0,0 +1,57 @@ +package main + +import ( + "sync" + + "github.com/opentracing/opentracing-go" +) + +type rrdpFetchJob struct { + path string + rsync string +} + +type rrdpFetcher struct { + octoRPKI *OctoRPKI + jobsCh chan rrdpFetchJob + wg sync.WaitGroup + span opentracing.Span +} + +func newRRDPFetcher(octoRPKI *OctoRPKI, workers int, span opentracing.Span) *rrdpFetcher { + rf := &rrdpFetcher{ + octoRPKI: octoRPKI, + jobsCh: make(chan rrdpFetchJob), + span: span, + } + + for i := 0; i < workers; i++ { + rf.wg.Add(1) + go rf.worker() + } + + return rf +} + +func (r *rrdpFetcher) worker() { + defer r.wg.Done() + + for job := range r.jobsCh { + r.octoRPKI.fetchRRDP(job.path, job.rsync, r.span) + } +} + +func (r *rrdpFetcher) done() { + close(r.jobsCh) +} + +func (r *rrdpFetcher) wait() { + r.wg.Wait() +} + +func (r *rrdpFetcher) fetch(path string, rsync string) { + r.jobsCh <- rrdpFetchJob{ + path: path, + rsync: rsync, + } +} diff --git a/cmd/octorpki/rsync_fetcher.go b/cmd/octorpki/rsync_fetcher.go new file mode 100644 index 0000000..789a950 --- /dev/null +++ b/cmd/octorpki/rsync_fetcher.go @@ -0,0 +1,49 @@ +package main + +import ( + "sync" + + "github.com/opentracing/opentracing-go" +) + +type rsyncFetcher struct { + octoRPKI *OctoRPKI + jobsCh chan string + wg sync.WaitGroup + span opentracing.Span +} + +func newRsyncFetcher(octoRPKI *OctoRPKI, workers int, span opentracing.Span) *rsyncFetcher { + rf := &rsyncFetcher{ + octoRPKI: octoRPKI, + jobsCh: make(chan string), + span: span, + } + + for i := 0; i < workers; i++ { + rf.wg.Add(1) + go rf.worker() + } + + return rf +} + +func (r *rsyncFetcher) worker() { + defer r.wg.Done() + + for rsyncURL := range r.jobsCh { + r.octoRPKI.fetchRsync(rsyncURL, r.span) + } +} + +func (r *rsyncFetcher) done() { + close(r.jobsCh) +} + +func (r *rsyncFetcher) wait() { + r.wg.Wait() +} + +func (r *rsyncFetcher) fetch(rsync string) { + r.jobsCh <- rsync +} diff --git a/sync/lib/rsync.go b/sync/lib/rsync.go index 0a42f7a..030e595 100644 --- a/sync/lib/rsync.go +++ b/sync/lib/rsync.go @@ -91,7 +91,7 @@ func RunRsync(ctx context.Context, uri string, bin string, dirPath string) ([]*F errorStr := scanner.Text() log.Error(errorStr) - err = scanner.Err() + err := scanner.Err() if err != nil { log.Errorf("%v: %v", uri, err) return