Merge pull request #62 from rewiringamerica/natalie/retry-missing
Add the option to rerun failed tasks
mfathollahzadeh authored Jan 31, 2024
2 parents cdb1649 + a941448 commit feb1607
Showing 4 changed files with 130 additions and 23 deletions.
77 changes: 61 additions & 16 deletions buildstockbatch/cloud/docker_base.py
@@ -24,6 +24,7 @@
import pandas as pd
import pathlib
import random
import re
import shutil
import subprocess
import tarfile
@@ -105,8 +106,13 @@ class BatchInfo:
CONTAINER_RUNTIME = ContainerRuntime.DOCKER
MAX_JOB_COUNT = 10000

def __init__(self, project_filename):
def __init__(self, project_filename, missing_only=False):
"""
:param missing_only: If true, use asset files from a previous job and only run the simulations
that don't already have successful results from a previous run. Support must be added in subclasses.
"""
super().__init__(project_filename)
self.missing_only = missing_only

self.docker_client = docker.DockerClient.from_env()
try:
@@ -115,6 +121,9 @@ def __init__(self, project_filename):
logger.error("The docker server did not respond, make sure Docker Desktop is started then retry.")
raise RuntimeError("The docker server did not respond, make sure Docker Desktop is started then retry.")

def get_fs(self):
return LocalFileSystem()

@staticmethod
def validate_project(project_file):
super(DockerBatchBase, DockerBatchBase).validate_project(project_file)
Expand All @@ -139,7 +148,7 @@ def copy_files_at_cloud(self, files_to_copy):
:param files_to_copy: a dict where the key is a file on the cloud to copy, and the value is
the filename to copy the source file to. Both are relative to the ``tmppath`` used in
``prep_batches()`` (so the implementation should prepend the bucket name and prefix
``_run_batch_prep()`` (so the implementation should prepend the bucket name and prefix
where they were uploaded to by ``upload_batch_files_to_cloud``).
"""
raise NotImplementedError
@@ -166,12 +175,15 @@ def run_batch(self):
tmppath = pathlib.Path(tmpdir)
epws_to_copy, batch_info = self._run_batch_prep(tmppath)

# Copy all the files to cloud storage
logger.info("Uploading files for batch...")
self.upload_batch_files_to_cloud(tmppath)
# If we're rerunning failed tasks from a previous job, DO NOT overwrite the job files.
# That would assign a new random set of buildings to each task, making the rerun useless.
if not self.missing_only:
# Copy all the files to cloud storage
logger.info("Uploading files for batch...")
self.upload_batch_files_to_cloud(tmppath)

logger.info("Copying duplicate weather files...")
self.copy_files_at_cloud(epws_to_copy)
logger.info("Copying duplicate weather files...")
self.copy_files_at_cloud(epws_to_copy)

self.start_batch_job(batch_info)

@@ -200,18 +212,22 @@ def _run_batch_prep(self, tmppath):
:returns: DockerBatchBase.BatchInfo
"""

# Project configuration
logger.info("Writing project configuration for upload...")
with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
json.dump(self.cfg, f)
if not self.missing_only:
# Project configuration
logger.info("Writing project configuration for upload...")
with open(tmppath / "config.json", "wt", encoding="utf-8") as f:
json.dump(self.cfg, f)

# Collect simulations to queue (along with the EPWs those sims need)
logger.info("Preparing simulation batch jobs...")
batch_info, epws_needed = self._prep_jobs_for_batch(tmppath)

# Weather files
logger.info("Prepping weather files...")
epws_to_copy = self._prep_weather_files_for_batch(tmppath, epws_needed)
if self.missing_only:
epws_to_copy = None
else:
# Weather files
logger.info("Prepping weather files...")
epws_to_copy = self._prep_weather_files_for_batch(tmppath, epws_needed)

return (epws_to_copy, batch_info)

@@ -364,6 +380,9 @@ def _prep_jobs_for_batch(self, tmppath):
)
job_count = i
logger.debug("Job count = {}".format(job_count))
batch_info = DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count)
if self.missing_only:
return batch_info, epws_needed

# Compress job jsons
jobs_dir = tmppath / "jobs"
@@ -394,7 +413,7 @@ def _prep_jobs_for_batch(self, tmppath):
"lib/housing_characteristics",
)

return DockerBatchBase.BatchInfo(n_sims=n_sims, n_sims_per_job=n_sims_per_job, job_count=job_count), epws_needed
return batch_info, epws_needed

def _determine_epws_needed_for_batch(self, buildstock_df):
"""
@@ -506,7 +525,7 @@ def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path):
elif os.path.isfile(item):
os.remove(item)

# Upload simulation outputs tarfile to s3
# Upload simulation outputs tarfile to bucket
fs.put(
str(simulation_output_tar_filename),
f"{output_path}/results/simulation_output/simulations_job{job_id}.tar.gz",
@@ -528,6 +547,32 @@ def run_simulations(cls, cfg, job_id, jobs_d, sim_dir, fs, output_path):
elif os.path.isfile(item):
os.remove(item)

def find_missing_tasks(self, expected):
"""Creates a file with a list of task numbers that are missing results.
This only checks for results_job[ID].json.gz files in the results directory.
:param expected: Number of result files expected.
:returns: The number of files that were missing.
"""
fs = self.get_fs()
done_tasks = set()

for f in fs.ls(f"{self.results_dir}/simulation_output/"):
if m := re.match(".*results_job(\\d*).json.gz$", f):
done_tasks.add(int(m.group(1)))

missing_tasks = []
with fs.open(f"{self.results_dir}/missing_tasks.txt", "w") as f:
for task_id in range(expected):
if task_id not in done_tasks:
f.write(f"{task_id}\n")
missing_tasks.append(str(task_id))

logger.info(f"Found missing tasks: {', '.join(missing_tasks)}")

return len(missing_tasks)

def log_summary(self):
"""
Log a summary of how many simulations succeeded, failed, or ended with other statuses.
43 changes: 36 additions & 7 deletions buildstockbatch/gcp/gcp.py
@@ -161,12 +161,12 @@ class GcpBatch(DockerBatchBase):
DEFAULT_PP_CPUS = 2
DEFAULT_PP_MEMORY_MIB = 4096

def __init__(self, project_filename, job_identifier=None):
def __init__(self, project_filename, job_identifier=None, missing_only=False):
"""
:param project_filename: Path to the project's configuration file.
:param job_identifier: Optional job ID that will override gcp.job_identifier from the project file.
"""
super().__init__(project_filename)
super().__init__(project_filename, missing_only)

if job_identifier:
assert len(job_identifier) <= 48, "Job identifier must be no longer than 48 characters."
@@ -591,6 +591,7 @@ def start_batch_job(self, batch_info):
"JOB_NAME": self.job_identifier,
"GCS_PREFIX": self.gcs_prefix,
"GCS_BUCKET": self.gcs_bucket,
"MISSING_ONLY": str(self.missing_only),
}
bsb_runnable.environment = environment

@@ -622,9 +623,21 @@ def start_batch_job(self, batch_info):
max_run_duration=f"{task_duration_secs}s",
)

if self.missing_only:
# Save a list of task numbers to rerun in a file on GCS. Task i of the new job will read line i of
# this file to find the task of the original job it should rerun.
if job_count := self.find_missing_tasks(batch_info.job_count):
logger.info(f"Found {job_count} out of {batch_info.job_count} tasks to rerun.")
else:
raise ValidationError(
f"There are no tasks to retry. All {batch_info.job_count} results files are present."
)
else:
job_count = batch_info.job_count

# How many of these tasks to run.
group = batch_v1.TaskGroup(
task_count=batch_info.job_count,
task_count=job_count,
parallelism=gcp_cfg.get("parallelism", None),
task_spec=task,
)
@@ -700,7 +713,7 @@ def start_batch_job(self, batch_info):
# Monitor job status while waiting for the job to complete
n_completed_last_time = 0
client = batch_v1.BatchServiceClient()
with tqdm.tqdm(desc="Running Simulations", total=batch_info.job_count, unit="task") as progress_bar:
with tqdm.tqdm(desc="Running Simulations", total=job_count, unit="task") as progress_bar:
job_status = None
while job_status not in ("SUCCEEDED", "FAILED", "DELETION_IN_PROGRESS"):
time.sleep(10)
@@ -742,7 +755,7 @@ def start_batch_job(self, batch_info):
tsv_logger.log_stats(logging.INFO)

@classmethod
def run_task(cls, task_index, job_name, gcs_bucket, gcs_prefix):
def run_task(cls, task_index, job_name, gcs_bucket, gcs_prefix, missing_only):
"""
Run a few simulations inside a container.
@@ -754,13 +767,21 @@ def run_task(cls, task_index, job_name, gcs_bucket, gcs_prefix):
:param job_name: Job identifier
:param gcs_bucket: GCS bucket for input and output files
:param gcs_prefix: Prefix used for GCS files
:param missing_only: If True, rerun a task from a previous job. The provided task_index is used as an index
into the list of tasks that need to be rerun.
"""
# Local directory where we'll write files
sim_dir = pathlib.Path("/var/simdata/openstudio")

client = storage.Client()
bucket = client.get_bucket(gcs_bucket)

if missing_only:
# Find task number of the original task we're retrying.
tasks = bucket.blob(f"{gcs_prefix}/results/missing_tasks.txt").download_as_bytes().decode()
task_index = int(tasks.split()[task_index])
logger.info(f"Rerunning task {task_index}")

logger.info("Extracting assets TAR file")
# Copy assets file to local machine to extract TAR file
assets_file_path = sim_dir.parent / "assets.tar.gz"
@@ -1119,7 +1140,8 @@ def main():
gcs_bucket = os.environ["GCS_BUCKET"]
gcs_prefix = os.environ["GCS_PREFIX"]
job_name = os.environ["JOB_NAME"]
GcpBatch.run_task(task_index, job_name, gcs_bucket, gcs_prefix)
missing_only = os.environ["MISSING_ONLY"] == "True"
GcpBatch.run_task(task_index, job_name, gcs_bucket, gcs_prefix, missing_only)
elif "POSTPROCESS" == os.environ.get("JOB_TYPE", ""):
gcs_bucket = os.environ["GCS_BUCKET"]
gcs_prefix = os.environ["GCS_PREFIX"]
@@ -1154,6 +1176,13 @@ def main():
help="Only do postprocessing, useful for when the simulations are already done",
action="store_true",
)
group.add_argument(
"--missingonly",
action="store_true",
help="Only run batches of simulations that are missing from a previous job, then run post-processing. "
"Assumes that the project file is the same as the previous job, other than the job identifier. "
"Will not rerun individual failed simulations, only full batches that are missing.",
)
parser.add_argument(
"-v",
"--verbose",
@@ -1172,7 +1201,7 @@
if args.validateonly:
return True

batch = GcpBatch(args.project_filename, args.job_identifier)
batch = GcpBatch(args.project_filename, args.job_identifier, missing_only=args.missingonly)
if args.clean:
batch.clean()
return
20 changes: 20 additions & 0 deletions buildstockbatch/test/test_docker_base.py
@@ -172,3 +172,23 @@ def test_run_simulations(basic_residential_project_file):
# Check that files were cleaned up correctly
assert not os.listdir(sim_dir)
os.chdir(old_cwd)


def test_find_missing_tasks(basic_residential_project_file, mocker):
project_filename, results_dir = basic_residential_project_file()
results_dir = os.path.join(results_dir, "results")
mocker.patch.object(DockerBatchBase, "results_dir", results_dir)
dbb = DockerBatchBase(project_filename)

existing_results = [1, 3, 5]
missing_results = [0, 2, 4, 6]
os.makedirs(os.path.join(results_dir, "simulation_output"))
for res in existing_results:
# Create empty results files
with open(os.path.join(results_dir, f"simulation_output/results_job{res}.json.gz"), "w"):
pass

assert dbb.find_missing_tasks(7) == len(missing_results)

with open(os.path.join(results_dir, "missing_tasks.txt"), "r") as f:
assert [int(t) for t in f.readlines()] == missing_results
13 changes: 13 additions & 0 deletions docs/run_sims.rst
@@ -155,6 +155,19 @@ You can optionally override the ``job_identifier`` from the command line
quickly assign a new ID with each run without updating the config file.


Retry failed tasks
..................

Occasionally, especially when using spot instances, tasks will fail for transient reasons, like
the VM being preempted. While preempted tasks are automatically retried a few times, if they continue
to fail, the entire job will fail and postprocessing will not run.

If this happens, you can rerun the same job with the ``--missingonly`` flag. This will rerun only the
tasks that didn't produce output files, then run postprocessing. Note: This flag assumes that your
project config file has not changed since the previous run, other than the job identifier.
If it has changed, the behavior is undefined.
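
For example, a failed job could be retried like this (a minimal sketch; the
``buildstock_gcp`` entry point and the project file name are illustrative
assumptions, not taken from this change)::

    buildstock_gcp --missingonly your_project_file.yml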


Show existing jobs
..................

