From d5287df8b40426a1dead21866efdccf3773fd9c3 Mon Sep 17 00:00:00 2001
From: Rajendra Adhikari
Date: Wed, 14 Aug 2024 09:14:41 -0600
Subject: [PATCH] Add --continue_upload flag

---
 buildstockbatch/base.py           |  6 ++++--
 buildstockbatch/hpc.py            | 19 +++++++++++++++----
 buildstockbatch/local.py          | 11 ++++++++++-
 buildstockbatch/postprocessing.py | 20 +++++++++++++-------
 docs/changelog/changelog_dev.rst  |  6 ++++++
 5 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/buildstockbatch/base.py b/buildstockbatch/base.py
index 5ae99b47..21521139 100644
--- a/buildstockbatch/base.py
+++ b/buildstockbatch/base.py
@@ -936,11 +936,12 @@ def get_fs(self):
     def upload_results(self, *args, **kwargs):
         return postprocessing.upload_results(*args, **kwargs)
 
-    def process_results(self, skip_combine=False, use_dask_cluster=True):
+    def process_results(self, skip_combine=False, use_dask_cluster=True, continue_upload=False):
         if use_dask_cluster:
             self.get_dask_client()  # noqa F841
 
         try:
+            do_timeseries = False
             wfg_args = self.cfg["workflow_generator"].get("args", {})
             if self.cfg["workflow_generator"]["type"] == "residential_hpxml":
                 if "simulation_output_report" in wfg_args.keys():
@@ -956,7 +957,8 @@ def process_results(self, skip_combine=False, use_dask_cluster=True):
             aws_conf = self.cfg.get("postprocessing", {}).get("aws", {})
             if "s3" in aws_conf or "aws" in self.cfg:
                 s3_bucket, s3_prefix = self.upload_results(
-                    aws_conf, self.output_dir, self.results_dir, self.sampler.csv_path
+                    aws_conf, self.output_dir, self.results_dir, self.sampler.csv_path,
+                    continue_upload=continue_upload
                 )
                 if "athena" in aws_conf:
                     postprocessing.create_athena_tables(
diff --git a/buildstockbatch/hpc.py b/buildstockbatch/hpc.py
index e57e8690..ffe1a253 100644
--- a/buildstockbatch/hpc.py
+++ b/buildstockbatch/hpc.py
@@ -567,7 +567,7 @@ def queue_jobs(self, array_ids=None, hipri=False):
             job_id = m.group(1)
         return [job_id]
 
-    def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False):
+    def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False, continue_upload=False):
         # Configuration values
         hpc_cfg = self.cfg[self.HPC_NAME]
         account = hpc_cfg["account"]
@@ -605,6 +605,7 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
             "PROJECTFILE": self.project_filename,
             "OUT_DIR": self.output_dir,
             "UPLOADONLY": str(upload_only),
+            "CONTINUE_UPLOAD": str(continue_upload),
             "MEMORY": str(memory),
             "NPROCS": str(n_procs),
         }
@@ -868,7 +869,14 @@ def user_cli(Batch: SlurmBatch, argv: list):
     )
     group.add_argument(
         "--uploadonly",
-        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml",
+        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml."\
+        " Errors out if files already exist in S3",
+        action="store_true",
+    )
+    group.add_argument(
+        "--continue_upload",
+        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml."\
+        " Continues with the remaining files if some files already exist in S3",
         action="store_true",
     )
     group.add_argument(
@@ -893,9 +901,9 @@ def user_cli(Batch: SlurmBatch, argv: list):
         return
 
     # if the project has already been run, simply queue the correct post-processing step
-    if args.postprocessonly or args.uploadonly:
+    if args.postprocessonly or args.uploadonly or args.continue_upload:
         batch = Batch(project_filename)
-        batch.queue_post_processing(upload_only=args.uploadonly, hipri=args.hipri)
+        batch.queue_post_processing(upload_only=args.uploadonly, hipri=args.hipri, continue_upload=args.continue_upload)
         return
 
     if args.rerun_failed:
@@ -943,6 +951,7 @@ def main():
     job_array_number = int(os.environ.get("SLURM_ARRAY_TASK_ID", 0))
     post_process = get_bool_env_var("POSTPROCESS")
     upload_only = get_bool_env_var("UPLOADONLY")
+    continue_upload = get_bool_env_var("CONTINUE_UPLOAD")
     measures_only = get_bool_env_var("MEASURESONLY")
     sampling_only = get_bool_env_var("SAMPLINGONLY")
     if job_array_number:
@@ -958,6 +967,8 @@ def main():
             assert not sampling_only
             if upload_only:
                 batch.process_results(skip_combine=True)
+            elif continue_upload:
+                batch.process_results(skip_combine=True, continue_upload=True)
             else:
                 batch.process_results()
         else:
diff --git a/buildstockbatch/local.py b/buildstockbatch/local.py
index 5aa1402b..335e4eec 100644
--- a/buildstockbatch/local.py
+++ b/buildstockbatch/local.py
@@ -393,7 +393,14 @@ def main():
     )
     group.add_argument(
         "--uploadonly",
-        help="Only upload to S3, useful when postprocessing is already done. Ignores the " "upload flag in yaml",
+        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml."\
+        " Errors out if files already exist in S3",
+        action="store_true",
+    )
+    group.add_argument(
+        "--continue_upload",
+        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml."\
+        " Continues with the remaining files if some files already exist in S3",
         action="store_true",
     )
     group.add_argument(
@@ -421,6 +428,8 @@ def main():
         return
     if args.uploadonly:
         batch.process_results(skip_combine=True)
+    elif args.continue_upload:
+        batch.process_results(skip_combine=True, continue_upload=True)
     else:
         batch.process_results()
 
diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py
index 4461cc2e..d379434a 100644
--- a/buildstockbatch/postprocessing.py
+++ b/buildstockbatch/postprocessing.py
@@ -612,7 +612,7 @@ def remove_intermediate_files(fs, results_dir, keep_individual_timeseries=False)
         fs.rm(ts_in_dir, recursive=True)
 
 
-def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename):
+def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename, continue_upload=False):
     logger.info("Uploading the parquet files to s3")
 
     output_folder_name = Path(output_dir).name
@@ -627,7 +627,7 @@ def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename):
         all_files.append(file.relative_to(parquet_dir))
     for file in [*ts_dir.glob("_common_metadata"), *ts_dir.glob("_metadata")]:
         all_files.append(file.relative_to(parquet_dir))
-
+    logger.info(f"{len(all_files)} parquet files will be uploaded.")
     s3_prefix = aws_conf.get("s3", {}).get("prefix", "").rstrip("/")
     s3_bucket = aws_conf.get("s3", {}).get("bucket", None)
     if not (s3_prefix and s3_bucket):
@@ -637,10 +637,14 @@ def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename):
     s3 = boto3.resource("s3")
     bucket = s3.Bucket(s3_bucket)
 
-    n_existing_files = len(list(bucket.objects.filter(Prefix=s3_prefix_output)))
-    if n_existing_files > 0:
-        logger.error(f"There are already {n_existing_files} files in the s3 folder {s3_bucket}/{s3_prefix_output}.")
-        raise FileExistsError(f"s3://{s3_bucket}/{s3_prefix_output}")
+    existing_files = {f.key.removeprefix(s3_prefix_output) for f in bucket.objects.filter(Prefix=s3_prefix_output)}
+
+    if len(existing_files) > 0:
+        logger.info(f"There are already {len(existing_files)} files in the S3 folder {s3_bucket}/{s3_prefix_output}.")
+        if not continue_upload:
+            raise FileExistsError("Either use --continue_upload or delete the existing files from S3")
+        all_files = [file for file in all_files if str(file) not in existing_files]
+        logger.info(f"Uploading the remaining {len(all_files)} files.")
 
     def upload_file(filepath, s3key=None):
         full_path = filepath if filepath.is_absolute() else parquet_dir.joinpath(filepath)
@@ -653,7 +657,9 @@ def upload_file(filepath, s3key=None):
     tasks = list(map(dask.delayed(upload_file), all_files))
     if buildstock_csv_filename is not None:
         buildstock_csv_filepath = Path(buildstock_csv_filename)
-        if buildstock_csv_filepath.exists():
+        if f"buildstock_csv/{buildstock_csv_filepath.name}" in existing_files:
+            logger.info("Buildstock CSV already exists in S3.")
+        elif buildstock_csv_filepath.exists():
             tasks.append(
                 dask.delayed(upload_file)(
                     buildstock_csv_filepath,
diff --git a/docs/changelog/changelog_dev.rst b/docs/changelog/changelog_dev.rst
index 5178193f..b498820a 100644
--- a/docs/changelog/changelog_dev.rst
+++ b/docs/changelog/changelog_dev.rst
@@ -109,3 +109,9 @@ Development Changelog
 
         Stop creating dask _metadata files for the timeseries parquet files
         since it crashes the postprocessing.
+
+    .. change::
+        :tags: general, bugfix
+        :pullreq: 465
+
+        Upload only the missing files to S3 when the new ``--continue_upload`` flag is used.
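
For reference, a minimal standalone sketch of the skip-existing logic that upload_results() applies when
continue_upload is set. The bucket, prefix, and local directory names below are hypothetical (in the real run
they come from the project yaml's postprocessing/aws section); the sketch lists the keys already under the S3
output prefix and uploads only the parquet files that are not there yet.

    import boto3
    from pathlib import Path

    # Hypothetical values for illustration only
    s3_bucket = "my-bucket"
    s3_prefix_output = "buildstock_runs/demo_output/"  # must end with "/"
    parquet_dir = Path("results/parquet")              # local folder holding the parquet files

    bucket = boto3.resource("s3").Bucket(s3_bucket)

    # Keys already uploaded, expressed relative to the output prefix (str.removeprefix needs Python 3.9+)
    existing_files = {
        obj.key.removeprefix(s3_prefix_output)
        for obj in bucket.objects.filter(Prefix=s3_prefix_output)
    }

    all_files = [p.relative_to(parquet_dir) for p in parquet_dir.rglob("*.parquet")]
    if existing_files:
        # This is the filter --continue_upload enables: drop files that are already in S3
        all_files = [f for f in all_files if f.as_posix() not in existing_files]

    for relpath in all_files:
        bucket.upload_file(str(parquet_dir / relpath), f"{s3_prefix_output}{relpath.as_posix()}")

With the flag, buildstockbatch reaches the equivalent code path through
process_results(skip_combine=True, continue_upload=True); without it, any pre-existing file under the prefix
still raises FileExistsError, as before.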