Skip to content

Commit

Permalink
Fix download channel architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
milesmcc committed Mar 16, 2024
1 parent dd17f35 commit 72b3e32
Showing 1 changed file with 60 additions and 51 deletions.
111 changes: 60 additions & 51 deletions pkg/repodump/repodump.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,66 +42,68 @@ type carPullRequest struct {
did string
}

func (s *RepoDump) startRepoDownloader(ctx context.Context, carChan chan *carPullRequest, wg *sync.WaitGroup) {
func (s *RepoDump) startRepoDownloader(ctx context.Context, carChan chan chan *carPullRequest, wg *sync.WaitGroup) {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
for downloadRequest := range carChan {
// Download the car
if s.SkipDids(downloadRequest.did) {
log.Infof("Skipping car: %s from %s (likely already downloaded)", downloadRequest.did, downloadRequest.pdsEndpoint)
continue
}
log.Infof("Downloading car: %s from %s", downloadRequest.did, downloadRequest.pdsEndpoint)

// Pull the bytes
repoBytes, err := s.Hydrator.GetRepoBytes(downloadRequest.did, downloadRequest.pdsEndpoint)
if err != nil {
log.Errorf("Failed to download car %s from %s: %v", downloadRequest.did, downloadRequest.pdsEndpoint, err)
continue
}
for carDownloadChannel := range carChan {
wg.Add(1)
for downloadRequest := range carDownloadChannel {
// Download the car
if s.SkipDids(downloadRequest.did) {
log.Infof("Skipping car: %s from %s (likely already downloaded)", downloadRequest.did, downloadRequest.pdsEndpoint)
continue
}
log.Infof("Downloading car: %s from %s", downloadRequest.did, downloadRequest.pdsEndpoint)

// Parse the repo so that we can pull all the identities in the repo
repo, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repoBytes))
if err != nil {
log.Errorf("Unable to read repo %s: %s", downloadRequest.did, err)
}
// Pull the bytes
repoBytes, err := s.Hydrator.GetRepoBytes(downloadRequest.did, downloadRequest.pdsEndpoint)
if err != nil {
log.Errorf("Failed to download car %s from %s: %v", downloadRequest.did, downloadRequest.pdsEndpoint, err)
continue
}

// Pull all the identities
identities, err := s.Hydrator.GetIdentitiesInRepo(repo)
if err != nil {
log.Errorf("Unable to read identities in repo %s: %s", downloadRequest.did, err)
}
// Parse the repo so that we can pull all the identities in the repo
repo, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repoBytes))
if err != nil {
log.Errorf("Unable to read repo %s: %s", downloadRequest.did, err)
}

log.Infof("Found %d identities in repo %s", len(identities), downloadRequest.did)
// Pull all the identities
identities, err := s.Hydrator.GetIdentitiesInRepo(repo)
if err != nil {
log.Errorf("Unable to read identities in repo %s: %s", downloadRequest.did, err)
}

// Find the unique PDSes
didFindNewPds := false
for _, identity := range identities {
if _, ok := s.PdsCompleted[identity.PDSEndpoint()]; !ok {
if _, ok := s.PdsQueue[identity.PDSEndpoint()]; !ok {
log.Infof("Adding PDS to queue: %s", identity.PDSEndpoint())
s.PdsQueue[identity.PDSEndpoint()] = true
didFindNewPds = true
log.Infof("Found %d identities in repo %s", len(identities), downloadRequest.did)

// Find the unique PDSes
didFindNewPds := false
for _, identity := range identities {
if _, ok := s.PdsCompleted[identity.PDSEndpoint()]; !ok {
if _, ok := s.PdsQueue[identity.PDSEndpoint()]; !ok {
log.Infof("Adding PDS to queue: %s", identity.PDSEndpoint())
s.PdsQueue[identity.PDSEndpoint()] = true
didFindNewPds = true
}
}
}
}

if didFindNewPds {
// Save the state to disk
err = s.saveIntermediateStateToDisk()
if err != nil {
log.Errorf("Failed to save intermediate state to disk: %v", err)
if didFindNewPds {
// Save the state to disk
err = s.saveIntermediateStateToDisk()
if err != nil {
log.Errorf("Failed to save intermediate state to disk: %v", err)
}
}
}

// Write the bytes to the output channel
s.Output <- CarOutput{
Did: downloadRequest.did,
Data: repoBytes,
// Write the bytes to the output channel
s.Output <- CarOutput{
Did: downloadRequest.did,
Data: repoBytes,
}
}
wg.Done()
}
wg.Done()
}()
}
}
Expand Down Expand Up @@ -183,9 +185,9 @@ func (s *RepoDump) BeginDownloading(ctx context.Context) error {
}

// Start the downloader
carDownloadChannel := make(chan *carPullRequest)
carDownloadChannels := make(chan chan *carPullRequest) // A channel of channels; each subchannel is a queue of carPullRequests from the same PDS
var wg sync.WaitGroup
go s.startRepoDownloader(ctx, carDownloadChannel, &wg)
go s.startRepoDownloader(ctx, carDownloadChannels, &wg)

err = s.loadIntermediateStateFromDisk()
if err != nil {
Expand Down Expand Up @@ -214,6 +216,10 @@ func (s *RepoDump) BeginDownloading(ctx context.Context) error {
Host: pdsEndpoint,
}

// Create the channel and add it to the downloaders
channel := make(chan *carPullRequest)
carDownloadChannels <- channel

cursor := ""
for {
s.Hydrator.Ratelimit.Take()
Expand All @@ -231,15 +237,15 @@ func (s *RepoDump) BeginDownloading(ctx context.Context) error {
// Go through and pull each repo
for _, r := range out.Repos {
log.Infof("Pulling CAR from %s: %s", pdsEndpoint, r.Did)
carDownloadChannel <- &carPullRequest{
channel <- &carPullRequest{
pdsEndpoint: pdsEndpoint,
did: r.Did,
}
}
}

// Close the channel so that the downloaders know that they are done
close(carDownloadChannel)
close(channel)

// Wait for the downloaders to finish on this PDS before moving on
// to the next one
Expand All @@ -251,6 +257,9 @@ func (s *RepoDump) BeginDownloading(ctx context.Context) error {
s.PdsCompleted[pdsEndpoint] = true
}

// Close the channel for the downloaders
close(carDownloadChannels)

<-ctx.Done()
log.Infof("Shutting down...")

Expand Down

0 comments on commit 72b3e32

Please sign in to comment.