From 2f6f88f42803935472a3ff3ae8284f038fa0e20b Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Fri, 25 Feb 2022 15:47:43 +0100 Subject: [PATCH 1/2] feat: add shouldExclude parameter fix concurrency in entry_io Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- entry/entry_io.go | 256 ++++------------------------------------- entry/fetcher.go | 282 ++++++++++++++++++++++++++++++++++++++++++++++ entry/queue.go | 69 ++++++++++++ go.mod | 1 + iface/iface.go | 11 +- log.go | 24 ++-- log_io.go | 39 ++++--- 7 files changed, 412 insertions(+), 270 deletions(-) create mode 100644 entry/fetcher.go create mode 100644 entry/queue.go diff --git a/entry/entry_io.go b/entry/entry_io.go index 1d60a2a..4aa0485 100644 --- a/entry/entry_io.go +++ b/entry/entry_io.go @@ -2,261 +2,43 @@ package entry // import "berty.tech/go-ipfs-log/entry" import ( "context" - "sync" - "time" "github.com/ipfs/go-cid" core_iface "github.com/ipfs/interface-go-ipfs-core" "berty.tech/go-ipfs-log/iface" - "berty.tech/go-ipfs-log/io/cbor" ) type FetchOptions = iface.FetchOptions // FetchParallel retrieves IPFS log entries. func FetchParallel(ctx context.Context, ipfs core_iface.CoreAPI, hashes []cid.Cid, options *FetchOptions) []iface.IPFSLogEntry { - var ( - entries = []iface.IPFSLogEntry(nil) - fetchedEntries = make([][]iface.IPFSLogEntry, len(hashes)) - wg = sync.WaitGroup{} - ) + fetcher := NewFetcher(ipfs, options) + return fetcher.Fetch(ctx, hashes) - if options.IO == nil { - return nil - } + // wg := sync.WaitGroup{} + // wg.Add(len(hashes)) - wg.Add(len(hashes)) + // fetchedEntries := make([][]iface.IPFSLogEntry, len(hashes)) + // for i, h := range hashes { + // go func(h cid.Cid, i int) { + // fetchedEntries[i] = fetcher.Fetch(ctx, []cid.Cid{h}) + // wg.Done() + // }(h, i) + // } - for i, h := range hashes { - go func(h cid.Cid, i int) { - defer wg.Done() + // wg.Wait() - fetchedEntries[i] = FetchAll(ctx, ipfs, []cid.Cid{h}, options) - }(h, i) - } + // entries := []iface.IPFSLogEntry(nil) + // for i := range hashes { + // entries = append(entries, fetchedEntries[i]...) + // } - wg.Wait() - - for i := range hashes { - entries = append(entries, fetchedEntries[i]...) - } - - return entries + // return entries } // FetchAll gets entries from their CIDs. func FetchAll(ctx context.Context, ipfs core_iface.CoreAPI, hashes []cid.Cid, options *FetchOptions) []iface.IPFSLogEntry { - if options.IO == nil { - io, err := cbor.IO(&Entry{}, &LamportClock{}) - if err != nil { - return nil - } - - options.IO = io - } - - var ( - lock = sync.Mutex{} - result = []iface.IPFSLogEntry(nil) - cache = map[cid.Cid]bool{} - loadingCache = map[cid.Cid]bool{} - loadingQueue = map[int][]cid.Cid{0: hashes} - running = 0 // keep track of how many entries are being fetched at any time - maxClock = 0 // keep track of the latest clock time during load - minClock = 0 // keep track of the minimum clock time during load - concurrency = 1 - length = -1 - ) - - if options.Length != nil { - length = *options.Length - } - - if options.Concurrency > concurrency { - concurrency = options.Concurrency - } - - ctx, cancel := context.WithCancel(ctx) - if options.Timeout > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Second*options.Timeout) - } - - defer cancel() - - // Add a multihash to the loading queue - addToLoadingQueue := func(e cid.Cid, idx int) { - lock.Lock() - defer lock.Unlock() - - if _, ok := loadingCache[e]; ok { - return - } - - loadingCache[e] = true - - for _, otherE := range loadingQueue[idx] { - if otherE.Equals(e) { - return - } - } - - loadingQueue[idx] = append(loadingQueue[idx], e) - } - - // Get the next items to process from the loading queue - getNextFromQueue := func(length int) []cid.Cid { - lock.Lock() - defer lock.Unlock() - - if length == 0 { - length = 1 - } - - res := []cid.Cid(nil) - - for key := range loadingQueue { - nextItems := loadingQueue[key] - for len(nextItems) > 0 && len(res) < length { - h := nextItems[0] - nextItems = nextItems[1:] - - res = append(res, h) - } - - loadingQueue[key] = nextItems - - if len(nextItems) == 0 { - delete(loadingQueue, key) - } - } - - return res - } - - // Fetch one entry and add it to the results - fetchEntry := func(hash cid.Cid) { - if !hash.Defined() { - return - } - - if _, ok := cache[hash]; ok { - return - } - - addToResults := func(entry iface.IPFSLogEntry) { - if !entry.IsValid() { - return - } - - ts := entry.GetClock().GetTime() - - // Update min/max clocks - if maxClock < ts { - maxClock = ts - } - - if len(result) > 0 { - if ts := result[len(result)-1].GetClock().GetTime(); ts < minClock { - minClock = ts - } - } else { - minClock = maxClock - } - - isLater := len(result) >= length && ts >= minClock - // const calculateIndex = (idx) => maxClock - ts + ((idx + 1) * idx) - - // Add the entry to the results if - // 1) we're fetching all entries - // 2) results is not filled yet - // the clock of the entry is later than current known minimum clock time - if length < 0 || len(result) < length || isLater { - result = append(result, entry) - cache[hash] = true - - if options.ProgressChan != nil { - options.ProgressChan <- entry - } - - } - - if length < 0 { - // If we're fetching all entries (length === -1), adds nexts and refs to the queue - for i, h := range entry.GetNext() { - addToLoadingQueue(h, i) - } - - for i, h := range entry.GetRefs() { - addToLoadingQueue(h, i) - } - } else { - // If we're fetching entries up to certain length, - // fetch the next if result is filled up, to make sure we "check" - // the next entry if its clock is later than what we have in the result - if _, ok := cache[entry.GetHash()]; len(result) < length || ts > minClock || ts == minClock && !ok { - for _, h := range entry.GetNext() { - addToLoadingQueue(h, maxClock-ts) - } - } - if len(result)+len(entry.GetRefs()) <= length { - for i, h := range entry.GetRefs() { - addToLoadingQueue(h, maxClock-ts+((i+1)*i)) - } - } - } - } - - // Load the entry - entry, err := FromMultihashWithIO(ctx, ipfs, hash, options.Provider, options.IO) - if err != nil { - // TODO: log - return - } - - // Add it to the results - addToResults(entry) - } - - // Add entries to exclude from processing to the cache before we start - // Add entries that we don't need to fetch to the "cache" - for _, e := range options.Exclude { - cache[e.GetHash()] = true - } - - loadingQueueHasItems := func() bool { - for _, s := range loadingQueue { - if len(s) > 0 { - return true - } - } - - return false - } - - go func() { - // Does the loading queue have more to process? - for loadingQueueHasItems() { - if ctx.Err() != nil { - break - } - - if running < concurrency { - nexts := getNextFromQueue(concurrency) - running += len(nexts) - for _, n := range nexts { - fetchEntry(n) - } - - running -= len(nexts) - } - } - cancel() - }() - - // Resolve the promise after a timeout (if given) in order to - // not get stuck loading a block that is unreachable - <-ctx.Done() - - return result + fetcher := NewFetcher(ipfs, options) + return fetcher.Fetch(ctx, hashes) } diff --git a/entry/fetcher.go b/entry/fetcher.go new file mode 100644 index 0000000..96c222e --- /dev/null +++ b/entry/fetcher.go @@ -0,0 +1,282 @@ +package entry + +import ( + "context" + "fmt" + "sync" + "time" + + "berty.tech/go-ipfs-log/identityprovider" + "berty.tech/go-ipfs-log/iface" + "berty.tech/go-ipfs-log/io/cbor" + "github.com/ipfs/go-cid" + core_iface "github.com/ipfs/interface-go-ipfs-core" + "golang.org/x/sync/semaphore" +) + +type taskKind int + +const ( + taskKindAdded = iota + taskKindInProgress + taskKindDone +) + +func noopShouldExclude(hash cid.Cid) bool { + return false +} + +type Fetcher struct { + length int + maxClock int // keep track of the latest clock time during load + minClock int // keep track of the minimum clock time during load + muClock sync.Mutex + + timeout time.Duration + io iface.IO + provider identityprovider.Interface + shouldExclude iface.ExcludeFunc + tasksCache map[cid.Cid]taskKind + cnext chan cid.Cid + condProcess *sync.Cond + muProcess *sync.RWMutex + sem *semaphore.Weighted + ipfs core_iface.CoreAPI + progressChan chan iface.IPFSLogEntry +} + +func NewFetcher(ipfs core_iface.CoreAPI, options *FetchOptions) *Fetcher { + // set default + length := -1 + if options.Length != nil { + length = *options.Length + } + + if options.Concurrency <= 0 { + options.Concurrency = 32 + } + + if options.IO == nil { + io, err := cbor.IO(&Entry{}, &LamportClock{}) + if err != nil { + return nil + } + + options.IO = io + } + + if options.ShouldExclude == nil { + options.ShouldExclude = noopShouldExclude + } + + muProcess := sync.RWMutex{} + + // create Fetcher + return &Fetcher{ + io: options.IO, + length: length, + timeout: options.Timeout, + shouldExclude: options.ShouldExclude, + sem: semaphore.NewWeighted(int64(options.Concurrency)), + ipfs: ipfs, + progressChan: options.ProgressChan, + muProcess: &muProcess, + condProcess: sync.NewCond(&muProcess), + maxClock: 0, + minClock: 0, + tasksCache: make(map[cid.Cid]taskKind), + } +} + +func (f *Fetcher) Fetch(ctx context.Context, hashes []cid.Cid) []iface.IPFSLogEntry { + if f.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, f.timeout) + defer cancel() + } + + return f.processQueue(ctx, hashes) +} + +func (f *Fetcher) processQueue(ctx context.Context, hashes []cid.Cid) []iface.IPFSLogEntry { + results := []iface.IPFSLogEntry{} + queue := newProcessQueue() + + f.muProcess.Lock() + f.addHashesToQueue(queue, hashes...) + taskInProgress := 0 + for queue.Len() > 0 { + // acquire a process slot limited by concurrency limit + if err := f.acquireProcessSlot(ctx); err != nil { + fmt.Printf("error while process next: %s\n", err.Error()) + break + } + + // get next hash + hash := queue.Next() + f.tasksCache[hash] = taskKindInProgress + + // run process + go func(hash cid.Cid) { + entry, err := f.fetchEntry(ctx, hash) + if err != nil { + fmt.Printf("unable to fetch entry: %s\n", err.Error()) + } + + // free process slot + f.processDone() + + f.muProcess.Lock() + + if entry != nil { + entryHash := entry.GetHash() + var lastEntry iface.IPFSLogEntry + if len(results) > 0 { + lastEntry = results[len(results)-1] + } + + // update clock + f.updateClock(ctx, entry, lastEntry) + + // if we don't know this hash yet, add it to result + cache := f.tasksCache[entryHash] + if cache == taskKindAdded || cache == taskKindInProgress { + ts := entry.GetClock().GetTime() + isLater := len(results) >= f.length && ts >= f.minClock + if f.length < 0 || len(results) < f.length || isLater { + results = append(results, entry) + // signal progress + if f.progressChan != nil { + f.progressChan <- entry + } + } + + f.tasksCache[entryHash] = taskKindDone + + // add next elems to queue + f.addNextEntry(ctx, queue, entry, results) + } + } + + // mark this process as done + taskInProgress-- + + // signal that a slot is available + f.condProcess.Signal() + + f.muProcess.Unlock() + }(hash) + + // increase in progress task counter + taskInProgress++ + + // wait until a task is added or that no running task is in progress + for queue.Len() == 0 && taskInProgress > 0 { + f.condProcess.Wait() + } + } + + // wait until all process are done/canceled + for taskInProgress > 0 { + f.condProcess.Wait() + } + + f.muProcess.Unlock() + + return results +} + +func (f *Fetcher) updateClock(ctx context.Context, entry, lastEntry iface.IPFSLogEntry) { + f.muClock.Lock() + + ts := entry.GetClock().GetTime() + + // Update min/max clocks + if f.maxClock < ts { + f.maxClock = ts + } + + if lastEntry != nil { + if ts := lastEntry.GetClock().GetTime(); ts < f.minClock { + f.minClock = ts + } + } else { + f.minClock = f.maxClock + } + + f.muClock.Unlock() +} + +func (f *Fetcher) exclude(hash cid.Cid) (yes bool) { + if yes = !hash.Defined(); yes { + return + } + + // do we have it in the internal cache ? + if _, yes = f.tasksCache[hash]; yes { + return + } + + // should the caller want it ? + yes = f.shouldExclude(hash) + return +} + +func (f *Fetcher) addNextEntry(ctx context.Context, queue processQueue, entry iface.IPFSLogEntry, results []iface.IPFSLogEntry) { + ts := entry.GetClock().GetTime() + + if f.length < 0 { + // If we're fetching all entries (length === -1), adds nexts and refs to the queue + f.addHashesToQueue(queue, entry.GetNext()...) + f.addHashesToQueue(queue, entry.GetRefs()...) + return + } + + // If we're fetching entries up to certain length, + // fetch the next if result is filled up, to make sure we "check" + // the next entry if its clock is later than what we have in the result + if len(results) < f.length || ts > f.minClock || ts == f.minClock { + for _, h := range entry.GetNext() { + f.addHashToQueue(queue, f.maxClock-ts, h) + } + } + if len(results)+len(entry.GetRefs()) <= f.length { + for i, h := range entry.GetRefs() { + f.addHashToQueue(queue, f.maxClock-ts+((i+1)*i), h) + } + } +} + +func (f *Fetcher) fetchEntry(ctx context.Context, hash cid.Cid) (entry iface.IPFSLogEntry, err error) { + // Load the entry + return FromMultihashWithIO(ctx, f.ipfs, hash, f.provider, f.io) +} + +func (f *Fetcher) addHashesToQueue(queue processQueue, hashes ...cid.Cid) (added int) { + for i, h := range hashes { + added += f.addHashToQueue(queue, i, h) + } + + return +} + +func (f *Fetcher) addHashToQueue(queue processQueue, index int, hash cid.Cid) (added int) { + if f.exclude(hash) { + return + } + + queue.Add(index, hash) + f.tasksCache[hash] = taskKindAdded + + return 1 + +} + +func (f *Fetcher) acquireProcessSlot(ctx context.Context) error { + return f.sem.Acquire(ctx, 1) +} + +func (f *Fetcher) processDone() { + // signal that a process slot is available + f.sem.Release(1) +} diff --git a/entry/queue.go b/entry/queue.go new file mode 100644 index 0000000..01d4a0f --- /dev/null +++ b/entry/queue.go @@ -0,0 +1,69 @@ +package entry + +import ( + "container/heap" + + cid "github.com/ipfs/go-cid" +) + +type processQueue interface { + Add(index int, hash cid.Cid) + Next() cid.Cid + Len() int +} + +func newProcessQueue() processQueue { + var items priorityQueue = []*item{} + heap.Init(&items) + return &items +} + +type item struct { + hash cid.Cid + index int +} + +// processHashQueue is not thread safe +type priorityQueue []*item + +func (pq *priorityQueue) Add(index int, hash cid.Cid) { + heap.Push(pq, &item{ + hash: hash, + index: index, + }) + // *pq = append(*pq, hash...) +} + +func (pq *priorityQueue) Next() (hash cid.Cid) { + item := heap.Pop(pq).(*item) + return item.hash +} + +func (pq *priorityQueue) Push(x interface{}) { + item := x.(*item) + *pq = append(*pq, item) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + *pq = old[0 : n-1] + return item +} + +func (pq priorityQueue) Len() int { return len(pq) } + +func (pq priorityQueue) Less(i, j int) bool { + // We want Pop to give us the highest, not lowest, priority so we use greater than here. + return pq[i].index < pq[j].index +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] +} + +// func (pq processHashQueue) GetQueue() []processItem { +// return pq +// } diff --git a/go.mod b/go.mod index 6b1d7f7..55625dd 100644 --- a/go.mod +++ b/go.mod @@ -20,4 +20,5 @@ require ( github.com/stretchr/testify v1.7.0 go.uber.org/goleak v1.1.11 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) diff --git a/iface/iface.go b/iface/iface.go index 1e5c59b..8cf2ab4 100644 --- a/iface/iface.go +++ b/iface/iface.go @@ -20,12 +20,15 @@ type WriteOpts struct { EncryptedLinks string EncryptedLinksNonce string } +type ExcludeFunc func(hash cid.Cid) bool type FetchOptions struct { - Length *int - Exclude []IPFSLogEntry - Concurrency int - Timeout time.Duration + Length *int + ShouldExclude ExcludeFunc + Exclude []IPFSLogEntry + Concurrency int + Timeout time.Duration + // @FIXME(gfanton): progress chan is close automatically by IpfsLog ProgressChan chan IPFSLogEntry Provider identityprovider.Interface IO IO diff --git a/log.go b/log.go index abb0045..2afab18 100644 --- a/log.go +++ b/log.go @@ -760,12 +760,13 @@ func NewFromMultihash(ctx context.Context, services core_iface.CoreAPI, identity } data, err := fromMultihash(ctx, services, hash, &FetchOptions{ - Length: fetchOptions.Length, - Exclude: fetchOptions.Exclude, - ProgressChan: fetchOptions.ProgressChan, - Timeout: fetchOptions.Timeout, - Concurrency: fetchOptions.Concurrency, - SortFn: fetchOptions.SortFn, + Length: fetchOptions.Length, + Exclude: fetchOptions.Exclude, + ShouldExclude: fetchOptions.ShouldExclude, + ProgressChan: fetchOptions.ProgressChan, + Timeout: fetchOptions.Timeout, + Concurrency: fetchOptions.Concurrency, + SortFn: fetchOptions.SortFn, }, logOptions.IO) if err != nil { @@ -818,11 +819,12 @@ func NewFromEntryHash(ctx context.Context, services core_iface.CoreAPI, identity // TODO: need to verify the entries with 'key' entries, err := fromEntryHash(ctx, services, []cid.Cid{hash}, &FetchOptions{ - Length: fetchOptions.Length, - Exclude: fetchOptions.Exclude, - ProgressChan: fetchOptions.ProgressChan, - Timeout: fetchOptions.Timeout, - Concurrency: fetchOptions.Concurrency, + Length: fetchOptions.Length, + Exclude: fetchOptions.Exclude, + ShouldExclude: fetchOptions.ShouldExclude, + ProgressChan: fetchOptions.ProgressChan, + Timeout: fetchOptions.Timeout, + Concurrency: fetchOptions.Concurrency, }, logOptions.IO) if err != nil { return nil, errmsg.ErrLogFromEntryHash.Wrap(err) diff --git a/log_io.go b/log_io.go index c5dd5a9..f4bae3a 100644 --- a/log_io.go +++ b/log_io.go @@ -19,12 +19,13 @@ import ( ) type FetchOptions struct { - Length *int - Exclude []iface.IPFSLogEntry - ProgressChan chan iface.IPFSLogEntry - Timeout time.Duration - Concurrency int - SortFn iface.EntrySortFn + Length *int + Exclude []iface.IPFSLogEntry + ShouldExclude iface.ExcludeFunc + ProgressChan chan iface.IPFSLogEntry + Timeout time.Duration + Concurrency int + SortFn iface.EntrySortFn } func toMultihash(ctx context.Context, services core_iface.CoreAPI, log *IPFSLog) (cid.Cid, error) { @@ -53,12 +54,13 @@ func fromMultihash(ctx context.Context, services core_iface.CoreAPI, hash cid.Ci } entries := entry.FetchAll(ctx, services, logHeads.Heads, &iface.FetchOptions{ - Length: options.Length, - Exclude: options.Exclude, - Concurrency: options.Concurrency, - Timeout: options.Timeout, - ProgressChan: options.ProgressChan, - IO: io, + Length: options.Length, + ShouldExclude: options.ShouldExclude, + Exclude: options.Exclude, + Concurrency: options.Concurrency, + Timeout: options.Timeout, + ProgressChan: options.ProgressChan, + IO: io, }) if options.Length != nil && *options.Length > -1 { @@ -99,12 +101,13 @@ func fromEntryHash(ctx context.Context, services core_iface.CoreAPI, hashes []ci } all := entry.FetchParallel(ctx, services, hashes, &iface.FetchOptions{ - Length: options.Length, - Exclude: options.Exclude, - ProgressChan: options.ProgressChan, - Timeout: options.Timeout, - Concurrency: options.Concurrency, - IO: io, + Length: options.Length, + Exclude: options.Exclude, + ShouldExclude: options.ShouldExclude, + ProgressChan: options.ProgressChan, + Timeout: options.Timeout, + Concurrency: options.Concurrency, + IO: io, }) sortFn := sorting.NoZeroes(sorting.LastWriteWins) From 1cbc1c42a415de5dde3e3293069f11284002cd49 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Fri, 4 Mar 2022 14:29:15 +0100 Subject: [PATCH 2/2] chore: cleanup Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- entry/entry_io.go | 23 ++--------------------- entry/fetcher.go | 10 +++++----- 2 files changed, 7 insertions(+), 26 deletions(-) diff --git a/entry/entry_io.go b/entry/entry_io.go index 4aa0485..0f279f0 100644 --- a/entry/entry_io.go +++ b/entry/entry_io.go @@ -11,30 +11,11 @@ import ( type FetchOptions = iface.FetchOptions -// FetchParallel retrieves IPFS log entries. +// FetchParallel has the same comportement than FetchAll, we keep it for retrop +// compatibility purpose func FetchParallel(ctx context.Context, ipfs core_iface.CoreAPI, hashes []cid.Cid, options *FetchOptions) []iface.IPFSLogEntry { fetcher := NewFetcher(ipfs, options) return fetcher.Fetch(ctx, hashes) - - // wg := sync.WaitGroup{} - // wg.Add(len(hashes)) - - // fetchedEntries := make([][]iface.IPFSLogEntry, len(hashes)) - // for i, h := range hashes { - // go func(h cid.Cid, i int) { - // fetchedEntries[i] = fetcher.Fetch(ctx, []cid.Cid{h}) - // wg.Done() - // }(h, i) - // } - - // wg.Wait() - - // entries := []iface.IPFSLogEntry(nil) - // for i := range hashes { - // entries = append(entries, fetchedEntries[i]...) - // } - - // return entries } // FetchAll gets entries from their CIDs. diff --git a/entry/fetcher.go b/entry/fetcher.go index 96c222e..0ccacf3 100644 --- a/entry/fetcher.go +++ b/entry/fetcher.go @@ -2,7 +2,6 @@ package entry import ( "context" - "fmt" "sync" "time" @@ -37,7 +36,6 @@ type Fetcher struct { provider identityprovider.Interface shouldExclude iface.ExcludeFunc tasksCache map[cid.Cid]taskKind - cnext chan cid.Cid condProcess *sync.Cond muProcess *sync.RWMutex sem *semaphore.Weighted @@ -108,7 +106,8 @@ func (f *Fetcher) processQueue(ctx context.Context, hashes []cid.Cid) []iface.IP for queue.Len() > 0 { // acquire a process slot limited by concurrency limit if err := f.acquireProcessSlot(ctx); err != nil { - fmt.Printf("error while process next: %s\n", err.Error()) + // @FIXME(gfanton): log this + // fmt.Printf("error while process next: %s\n", err.Error()) break } @@ -119,8 +118,9 @@ func (f *Fetcher) processQueue(ctx context.Context, hashes []cid.Cid) []iface.IP // run process go func(hash cid.Cid) { entry, err := f.fetchEntry(ctx, hash) - if err != nil { - fmt.Printf("unable to fetch entry: %s\n", err.Error()) + if err != nil { // nolint:staticcheck + // @FIXME(gfanton): log this + // fmt.Printf("unable to fetch entry: %s\n", err.Error()) } // free process slot