diff --git a/buildstockbatch/gcp/gcp.py b/buildstockbatch/gcp/gcp.py index 800b87f5..39f511bf 100644 --- a/buildstockbatch/gcp/gcp.py +++ b/buildstockbatch/gcp/gcp.py @@ -931,21 +931,23 @@ def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True): # Create the job jobs_client = run_v2.JobsClient() - jobs_client.create_job( - run_v2.CreateJobRequest( + op = jobs_client.create_job( + request=run_v2.CreateJobRequest( parent=f"projects/{self.gcp_project}/locations/{self.region}", job_id=self.postprocessing_job_id, job=job, ) ) + # Wait for the operation to finish + op.result() + logger.info("Job created successfully. Starting the job...") # Start the job! - attempts_remaining = 3 - while True: - try: - jobs_client.run_job(name=self.postprocessing_job_name) - logger.info( - f""" + try: + op = jobs_client.run_job(name=self.postprocessing_job_name) + + logger.info( + f""" ╔══════════════════════════════════════════════════════════════════════════════╗ ║ Post-processing Cloud Run Job started! ║ ║ ║ @@ -958,40 +960,26 @@ def start_combine_results_job_on_cloud(self, results_dir, do_timeseries=True): https://console.cloud.google.com/storage/browser/{self.gcs_bucket}/{self.gcs_prefix}/results/ Run this script with --clean to clean up the GCP environment after post-processing is complete.""" - ) - break - except: - attempts_remaining -= 1 - if attempts_remaining > 0: - # retry after delay - logger.warning( - "Post-processing Cloud Run job failed to start. " - f"{attempts_remaining} attempt(s) remaining. " - "Will retry in 1 second...", - exc_info=logger.isEnabledFor(logging.DEBUG), - ) - time.sleep(1) - continue - - # no attempts remaining - logger.warning( - "Post-processing Cloud Run job failed to start after three attempts. " - "You may want to investigate why and try starting it at the console: " - f"{self.postprocessing_job_console_url}", - exc_info=True, - ) - return + ) + except: + logger.warning( + "Post-processing Cloud Run job failed to start. " + "You may want to investigate why and try starting it at the console: " + f"{self.postprocessing_job_console_url}", + exc_info=True, + ) + return # Monitor job/execution status, starting by polling the Job for an Execution logger.info("Waiting for execution to begin...") job_start_time = datetime.now() job = self.get_existing_postprocessing_job() - while not job.latest_created_execution: + while not job.latest_created_execution or not job.latest_created_execution.name: time.sleep(1) job = self.get_existing_postprocessing_job() execution_start_time = datetime.now() logger.info( - f"Execution has started (after {str(execution_start_time - job_start_time)} " + f"Execution has started (after {(execution_start_time - job_start_time).total_seconds()} " "seconds). Waiting for execution to finish..." )