Concurrent fetching of resources #967
titanventura
started this conversation in
Ideas
Replies: 2 comments
-
Here is an example of concurrent fetching for linode instance and instance disks package compute
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/linode/linodego"
log "github.com/sirupsen/logrus"
"github.com/tailwarden/komiser/models"
. "github.com/tailwarden/komiser/models"
"github.com/tailwarden/komiser/providers"
)
func LinodeInstancesAndInstanceDisks(ctx context.Context, client providers.ProviderClient) ([]Resource, error) {
linodeInstances, err := client.LinodeClient.ListInstances(ctx, &linodego.ListOptions{
PageOptions: &linodego.PageOptions{},
PageSize: 0,
Filter: "",
})
if err != nil {
return nil, err
}
resources := make([]Resource, 0)
linodeInstanceChan := make(chan *linodego.Instance)
numWorkers := 4
workerChans := make([]<-chan models.Resource, 0)
for i := 0; i < numWorkers; i++ {
workerChans = append(workerChans, linodeInstanceDiskWorker(ctx, client, linodeInstanceChan))
}
for _, instance := range linodeInstances {
tags := make([]Tag, 0)
for _, tag := range instance.Tags {
if strings.Contains(tag, ":") {
parts := strings.Split(tag, ":")
tags = append(tags, models.Tag{
Key: parts[0],
Value: parts[1],
})
} else {
tags = append(tags, models.Tag{
Key: tag,
Value: tag,
})
}
}
resources = append(resources, models.Resource{
Provider: "Linode",
Account: client.Name,
Service: "Linode Instance",
Region: instance.Region,
ResourceId: fmt.Sprintf("%d", instance.ID),
Cost: 0,
Name: instance.Label,
FetchedAt: time.Now(),
CreatedAt: *instance.Created,
Tags: tags,
Link: fmt.Sprintf("https://cloud.linode.com/linodes/%d", instance.ID),
})
linodeInstanceChan <- &instance
}
close(linodeInstanceChan)
for resource := range merge(workerChans...) {
resources = append(resources, resource)
}
log.WithFields(log.Fields{
"provider": "Linode",
"account": client.Name,
"service": "Linode Instance and Instance Disk",
"resources": len(resources),
}).Info("Fetched resources")
return resources, nil
}
func linodeInstanceDiskWorker(ctx context.Context, client providers.ProviderClient, in <-chan *linodego.Instance) <-chan models.Resource {
out := make(chan models.Resource)
go func() {
for instance := range in {
instanceDisks, _ := client.LinodeClient.ListInstanceDisks(ctx, instance.ID, &linodego.ListOptions{})
for _, disk := range instanceDisks {
out <- models.Resource{
Provider: "Linode",
Account: client.Name,
Service: "Linode Instance Disk",
Region: instance.Region,
ResourceId: fmt.Sprintf("%d", disk.ID),
Cost: 0,
Name: disk.Label,
FetchedAt: time.Now(),
CreatedAt: *disk.Created,
Link: fmt.Sprintf("https://cloud.linode.com/linodes/%d/storage", instance.ID),
}
}
}
close(out)
}()
return out
}
func merge(cs ...<-chan models.Resource) <-chan models.Resource {
var wg sync.WaitGroup
out := make(chan models.Resource)
output := func(c <-chan models.Resource) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
} |
Beta Was this translation helpful? Give feedback.
0 replies
-
Hey @mlabouardy @jakepage91 @AvineshTripathi , I have implemented a base version of the concurrent resource fetching and you can find it here https://github.com/titanventura/komiser/blob/concurrent-resource-fetching/internal/internal.go Please let me know your thoughts ! We can iterate on this. Thanks to Komiser ! |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Currently, komiser leverages go's concurrency at provider level, meaning, each provider (aws,gcp,linode etc) have their own goroutines. But it does not implement concurrency inside each provider. This can make nested fetching of resources a bit slower.
references:
this discussion is to brainstorm concurrent fetching of resources across komiser. It is always important to have these in mind while implementing concurrency in go:
Beta Was this translation helpful? Give feedback.
All reactions