
Add --continue_upload flag
rajeee committed Aug 14, 2024
1 parent fe1ba37 commit d5287df
Showing 5 changed files with 48 additions and 14 deletions.
6 changes: 4 additions & 2 deletions buildstockbatch/base.py
@@ -936,11 +936,12 @@ def get_fs(self):
     def upload_results(self, *args, **kwargs):
         return postprocessing.upload_results(*args, **kwargs)

-    def process_results(self, skip_combine=False, use_dask_cluster=True):
+    def process_results(self, skip_combine=False, use_dask_cluster=True, continue_upload=False):
         if use_dask_cluster:
             self.get_dask_client()  # noqa F841

         try:
+            do_timeseries = False
             wfg_args = self.cfg["workflow_generator"].get("args", {})
             if self.cfg["workflow_generator"]["type"] == "residential_hpxml":
                 if "simulation_output_report" in wfg_args.keys():
@@ -956,7 +957,8 @@ def process_results(self, skip_combine=False, use_dask_cluster=True):
         aws_conf = self.cfg.get("postprocessing", {}).get("aws", {})
         if "s3" in aws_conf or "aws" in self.cfg:
             s3_bucket, s3_prefix = self.upload_results(
-                aws_conf, self.output_dir, self.results_dir, self.sampler.csv_path
+                aws_conf, self.output_dir, self.results_dir, self.sampler.csv_path,
+                continue_upload=continue_upload
             )
             if "athena" in aws_conf:
                 postprocessing.create_athena_tables(
19 changes: 15 additions & 4 deletions buildstockbatch/hpc.py
@@ -567,7 +567,7 @@ def queue_jobs(self, array_ids=None, hipri=False):
         job_id = m.group(1)
         return [job_id]

-    def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False):
+    def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False, continue_upload=False):
         # Configuration values
         hpc_cfg = self.cfg[self.HPC_NAME]
         account = hpc_cfg["account"]
@@ -605,6 +605,7 @@ def queue_post_processing(self, after_jobids=[], upload_only=False, hipri=False)
             "PROJECTFILE": self.project_filename,
             "OUT_DIR": self.output_dir,
             "UPLOADONLY": str(upload_only),
+            "CONTINUE_UPLOAD": str(continue_upload),
             "MEMORY": str(memory),
             "NPROCS": str(n_procs),
         }
@@ -868,7 +869,14 @@ def user_cli(Batch: SlurmBatch, argv: list):
     )
     group.add_argument(
         "--uploadonly",
-        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml",
+        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml."
+        " Errors out if files already exist in s3",
         action="store_true",
     )
+    group.add_argument(
+        "--continue_upload",
+        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml."
+        " Continues with the remaining files if files already exist in s3",
+        action="store_true",
+    )
     group.add_argument(
@@ -893,9 +901,9 @@ def user_cli(Batch: SlurmBatch, argv: list):
         return

    # if the project has already been run, simply queue the correct post-processing step
-    if args.postprocessonly or args.uploadonly:
+    if args.postprocessonly or args.uploadonly or args.continue_upload:
         batch = Batch(project_filename)
-        batch.queue_post_processing(upload_only=args.uploadonly, hipri=args.hipri)
+        batch.queue_post_processing(upload_only=args.uploadonly, hipri=args.hipri, continue_upload=args.continue_upload)
         return

    if args.rerun_failed:
@@ -943,6 +951,7 @@ def main():
     job_array_number = int(os.environ.get("SLURM_ARRAY_TASK_ID", 0))
     post_process = get_bool_env_var("POSTPROCESS")
     upload_only = get_bool_env_var("UPLOADONLY")
+    continue_upload = get_bool_env_var("CONTINUE_UPLOAD")
     measures_only = get_bool_env_var("MEASURESONLY")
     sampling_only = get_bool_env_var("SAMPLINGONLY")
     if job_array_number:
@@ -958,6 +967,8 @@ def main():
         assert not sampling_only
         if upload_only:
             batch.process_results(skip_combine=True)
+        elif continue_upload:
+            batch.process_results(skip_combine=True, continue_upload=True)
         else:
             batch.process_results()
     else:
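On the HPC side the flag has to survive a trip through the scheduler: queue_post_processing serializes it into the job environment as the string "True" or "False", and main() reads it back with get_bool_env_var before deciding how to call process_results. Below is a minimal sketch of that round trip; the get_bool_env_var shown here is an assumed stand-in that does a simple truthy-string comparison, and the real helper in buildstockbatch may behave differently.

    import os

    def get_bool_env_var(varname: str) -> bool:
        # Assumed behavior: "True"/"true"/"1"/"yes" count as truthy; unset or anything else is False.
        return os.environ.get(varname, "False").lower() in ("true", "1", "yes")

    # Submitting side: queue_post_processing puts str(continue_upload) -- "True" or "False" -- into the job env.
    os.environ["CONTINUE_UPLOAD"] = str(True)

    # Job side: main() recovers the boolean before calling process_results.
    continue_upload = get_bool_env_var("CONTINUE_UPLOAD")
    assert continue_upload is True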
11 changes: 10 additions & 1 deletion buildstockbatch/local.py
@@ -393,7 +393,14 @@ def main():
     )
     group.add_argument(
         "--uploadonly",
-        help="Only upload to S3, useful when postprocessing is already done. Ignores the " "upload flag in yaml",
+        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml."
+        " Errors out if files already exist in s3",
         action="store_true",
     )
+    group.add_argument(
+        "--continue_upload",
+        help="Only upload to S3, useful when postprocessing is already done. Ignores the upload flag in yaml."
+        " Continues with the remaining files if files already exist in s3",
+        action="store_true",
+    )
     group.add_argument(
@@ -421,6 +428,8 @@ def main():
         return
     if args.uploadonly:
         batch.process_results(skip_combine=True)
+    elif args.continue_upload:
+        batch.process_results(skip_combine=True, continue_upload=True)
     else:
         batch.process_results()
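Both the local and HPC front ends expose the same switches and dispatch on them identically. A condensed, hypothetical argparse sketch of that wiring follows; the real parsers define many more options, the prog name "buildstock_local" and the mutually exclusive grouping are assumptions, and build_parser/dispatch are illustrative helper names rather than functions from the codebase.

    import argparse

    def build_parser() -> argparse.ArgumentParser:
        parser = argparse.ArgumentParser(prog="buildstock_local")  # entry-point name assumed
        group = parser.add_mutually_exclusive_group()
        group.add_argument("--postprocessonly", action="store_true")
        group.add_argument("--uploadonly", action="store_true")
        group.add_argument("--continue_upload", action="store_true")
        return parser

    def dispatch(args, batch):
        # Mirrors the branching added in this commit: --uploadonly fails fast when the S3
        # prefix is non-empty, while --continue_upload resumes the partial upload instead.
        if args.uploadonly:
            batch.process_results(skip_combine=True)
        elif args.continue_upload:
            batch.process_results(skip_combine=True, continue_upload=True)
        else:
            batch.process_results()

    # Usage: args = build_parser().parse_args(["--continue_upload"]); dispatch(args, batch)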
20 changes: 13 additions & 7 deletions buildstockbatch/postprocessing.py
@@ -612,7 +612,7 @@ def remove_intermediate_files(fs, results_dir, keep_individual_timeseries=False)
         fs.rm(ts_in_dir, recursive=True)


-def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename):
+def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename, continue_upload=False):
     logger.info("Uploading the parquet files to s3")

     output_folder_name = Path(output_dir).name
@@ -627,7 +627,7 @@ def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename):
         all_files.append(file.relative_to(parquet_dir))
     for file in [*ts_dir.glob("_common_metadata"), *ts_dir.glob("_metadata")]:
         all_files.append(file.relative_to(parquet_dir))
-
+    logger.info(f"{len(all_files)} parquet files will be uploaded.")
     s3_prefix = aws_conf.get("s3", {}).get("prefix", "").rstrip("/")
     s3_bucket = aws_conf.get("s3", {}).get("bucket", None)
     if not (s3_prefix and s3_bucket):
@@ -637,10 +637,14 @@ def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename):

     s3 = boto3.resource("s3")
     bucket = s3.Bucket(s3_bucket)
-    n_existing_files = len(list(bucket.objects.filter(Prefix=s3_prefix_output)))
-    if n_existing_files > 0:
-        logger.error(f"There are already {n_existing_files} files in the s3 folder {s3_bucket}/{s3_prefix_output}.")
-        raise FileExistsError(f"s3://{s3_bucket}/{s3_prefix_output}")
+    existing_files = {f.key.removeprefix(s3_prefix_output) for f in bucket.objects.filter(Prefix=s3_prefix_output)}
+
+    if len(existing_files) > 0:
+        logger.info(f"There are already {len(existing_files)} files in the s3 folder {s3_bucket}/{s3_prefix_output}.")
+        if not continue_upload:
+            raise FileExistsError("Either use --continue_upload or delete files from s3")
+        all_files = [file for file in all_files if str(file) not in existing_files]
+        logger.info(f"Only uploading the rest of the {len(all_files)} files")

     def upload_file(filepath, s3key=None):
         full_path = filepath if filepath.is_absolute() else parquet_dir.joinpath(filepath)
@@ -653,7 +657,9 @@ def upload_file(filepath, s3key=None):
     tasks = list(map(dask.delayed(upload_file), all_files))
     if buildstock_csv_filename is not None:
         buildstock_csv_filepath = Path(buildstock_csv_filename)
-        if buildstock_csv_filepath.exists():
+        if f"buildstock_csv/{buildstock_csv_filepath.name}" in existing_files:
+            logger.info("Buildstock CSV already exists in s3.")
+        elif buildstock_csv_filepath.exists():
             tasks.append(
                 dask.delayed(upload_file)(
                     buildstock_csv_filepath,
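The substance of the change is in upload_results: rather than refusing to run when the destination prefix already has objects, it lists what is there, strips the prefix so the keys compare against local relative paths, and uploads only the difference. The following standalone sketch of that skip logic uses hypothetical function and argument names (upload_missing, parquet_dir, s3_prefix_output) and a plain loop in place of the dask.delayed task list the real function builds.

    from pathlib import Path

    import boto3

    def upload_missing(parquet_dir: Path, s3_bucket: str, s3_prefix_output: str, continue_upload: bool = False):
        # s3_prefix_output is assumed to end with "/", matching how upload_results builds its prefix.
        all_files = [p.relative_to(parquet_dir) for p in parquet_dir.rglob("*.parquet")]

        bucket = boto3.resource("s3").Bucket(s3_bucket)
        # Object keys come back fully prefixed; strip the prefix so they match the relative paths.
        existing = {obj.key.removeprefix(s3_prefix_output) for obj in bucket.objects.filter(Prefix=s3_prefix_output)}

        if existing and not continue_upload:
            raise FileExistsError("Either use --continue_upload or delete files from s3")
        remaining = [f for f in all_files if str(f) not in existing]

        for relpath in remaining:
            bucket.upload_file(str(parquet_dir / relpath), s3_prefix_output + str(relpath))
        return len(remaining)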
6 changes: 6 additions & 0 deletions docs/changelog/changelog_dev.rst
@@ -109,3 +109,9 @@ Development Changelog

         Stop creating dask _metadata files for the timeseries parquet files since it crashes the
         postprocessing.
+
+    .. change::
+        :tags: general, bugfix
+        :pullreq: 465
+
+        Upload only the missing files to s3
