Skip to content

Commit

Permalink
feat(source-sync): overhaul behaviour
Browse files Browse the repository at this point in the history
This commit:

- Adds support for downloading all sources concurrently, each in its own
  thread, rather than sequentially, in the interests of saving time. It
  emits an interim progress report of which sources are still being
  downloaded every time a thread completes.

- Adds support for downloading an arbitrary number of named sources,
  rather than only all of them or exactly one

- Adds support for excluding an arbitrary number of sources when
  downloading all of them

- Fixes a bug where the `--verbose` flag was not being honoured when
  downloading all sources.
  • Loading branch information
andrewpollock committed Nov 15, 2024
1 parent 54560cb commit cdaf7fd
Showing 1 changed file with 36 additions and 17 deletions.
53 changes: 36 additions & 17 deletions tools/source-sync/source_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import argparse
import yaml
import json
import sys
import pprint
import urllib
import concurrent.futures
from urllib3.util.retry import Retry
import requests
from requests.adapters import HTTPAdapter
Expand All @@ -28,7 +28,7 @@ def Sources(source_path: str) -> dict:
def DownloadSource(source: dict, directory: str, verbose: bool = False) -> None:
"""Download the source defined in the dictionary."""
if verbose:
pprint.pp(source)
print(f'Operating on: {pprint.pformat(source)}')
if source['type'] == 0:
DownloadGitSource(source, directory, verbose)
return
Expand All @@ -47,7 +47,7 @@ def DownloadGitSource(source: dict,
"""(Shallow) clone a Git-based source."""
path = os.path.join(directory, source['name'])
if verbose:
print(f'Cloning {source["repo_url"]} into {path}')
print(f'[{source["name"]}]: Cloning {source["repo_url"]} into {path}')
_ = pygit2.clone_repository(
source['repo_url'],
path,
Expand Down Expand Up @@ -81,7 +81,7 @@ def DownloadGCSSource(source: dict,
# Clear the blob's generation for more reliable downloading.
blob = storage.Blob(blob.name, blob.bucket, generation=None)
if verbose:
print(f'Downloading {blob}')
print(f'[{source["name"]}]: Downloading {blob}')
fn = os.path.join(directory, source['name'],
os.path.basename(urllib.parse.unquote(blob.path)))
blob.download_to_filename(fn, retry=retry.DEFAULT_RETRY)
Expand Down Expand Up @@ -125,13 +125,14 @@ def DownloadRESTSource(source: dict,
# The full record was supplied in the initial listing.
with open(fn, mode='w') as record_f:
if verbose:
print(f'Writing {record["id"]} to {record_f.name}')
print(
f'[{source["name"]}]: Writing {record["id"]} to {record_f.name}')
json.dump(record, record_f)


def main() -> None:
parser = argparse.ArgumentParser(
description="Download records from an OSV.dev data source.")
description='Download records from an OSV.dev data source.')
parser.add_argument(
'--verbose',
action=argparse.BooleanOptionalAction,
Expand Down Expand Up @@ -161,12 +162,23 @@ def main() -> None:
action='store',
dest='source',
default='',
nargs='+',
help='The source to download (or "ALL")')
parser.add_argument(
'--source_exclude',
action='store',
dest='source_exclude',
default='',
nargs='+',
help='The sources to exclude from downloading when using "--source ALL")')
args = parser.parse_args()

if not args.list and not args.source:
parser.error('either --list or --source are required')

if args.source_exclude and args.source != ['ALL']:
parser.error('useless use of "--source_exclude" without "--source ALL"')

sources = Sources(args.source_file)

if args.list:
Expand All @@ -175,17 +187,24 @@ def main() -> None:
print(f'\t{source["name"]}')
return

if args.source == 'ALL':
for source in sources:
print(f'Acting on {source["name"]}')
# TODO: parallelize this.
DownloadSource(source, args.directory)
else:
source = [source for source in sources if source['name'] == args.source]
if not source:
print(f'Invalid source {args.source}')
sys.exit(1)
DownloadSource(source[0], args.directory, args.verbose)
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_source = {
executor.submit(DownloadSource, source, args.directory, args.verbose):
source for source in sources if source['name'] in args.source or
(args.source == ['ALL'] and source['name'] not in args.source_exclude)
}
for future in concurrent.futures.as_completed(future_to_source):
inflight = [
source['name']
for (f, source) in future_to_source.items()
if f.running()
]
print(f'Still in flight: {pprint.pformat(inflight, width=1)}')
source = future_to_source[future]
if exc := future.exception():
print(f"source['name'] raised {exc}")
else:
print(f"{source['name']} completed")


if __name__ == '__main__':
Expand Down

0 comments on commit cdaf7fd

Please sign in to comment.