diff --git a/tools/datafix/reimport_gcs_record.py b/tools/datafix/reimport_gcs_record.py index 25c53c74bea..a90380dad65 100755 --- a/tools/datafix/reimport_gcs_record.py +++ b/tools/datafix/reimport_gcs_record.py @@ -16,6 +16,7 @@ from google.cloud import datastore from google.cloud import storage +from google.cloud.exceptions import NotFound from google.cloud.storage import retry from google.cloud.datastore.query import PropertyFilter @@ -23,7 +24,6 @@ import os import functools -MAX_BATCH_SIZE = 500 MAX_QUERY_SIZE = 30 @@ -168,29 +168,34 @@ def main() -> None: result_to_fix = [r for r in result if r['source_of_truth'] == 2] print(f"There are {len(result_to_fix)} bugs to operate on...") - # Chunk the results to modify in acceptibly sized batches for the API. - for batch in range(0, len(result_to_fix), MAX_BATCH_SIZE): - try: - with ds_client.transaction() as xact: - for bug in result_to_fix[batch:batch + MAX_BATCH_SIZE]: - bug_in_gcs = objname_for_bug(ds_client, bug) - if args.verbose: - print(f"Resetting creation time for {bug_in_gcs['uri']}") - if not args.dryrun: + try: + with ds_client.transaction() as xact: + for bug in result_to_fix: + bug_in_gcs = objname_for_bug(ds_client, bug) + if args.verbose: + print(f"Resetting creation time for {bug_in_gcs['uri']}") + if not args.dryrun: + try: reset_object_creation(bug_in_gcs["bucket"], bug_in_gcs["path"], args.tmpdir) - bug["import_last_modified"] = None - if args.verbose: - print(f"Resetting import_last_modified for {bug['db_id']}") - print(f"Review at {url_base}{bug['db_id']} when reimport completes.") - xact.put(bug) - if args.dryrun: - raise Exception("Dry run mode. Preventing transaction from commiting") # pylint: disable=broad-exception-raised - except Exception as e: - # Don't have the first batch's transaction-aborting exception stop - # subsequent batches from being attempted. 
- if args.dryrun and e.args[0].startswith("Dry run mode"): - pass + except NotFound as e: + if args.verbose: + print(f"Skipping, got {e}\n") + continue + bug["import_last_modified"] = None + if args.verbose: + print(f"Resetting import_last_modified for {bug['db_id']}") + print(f"Review at {url_base}{bug['db_id']} when reimport completes.") + xact.put(bug) + if args.dryrun: + raise Exception("Dry run mode. Preventing transaction from commiting") # pylint: disable=broad-exception-raised + except Exception as e: + # Only the deliberate dry-run abort raised above is swallowed here; any + # other exception is re-raised so real failures are not hidden. + if args.dryrun and e.args[0].startswith("Dry run mode"): + pass + else: + raise if __name__ == "__main__":