From 4d367d6843063389e44ed92f5ab5415c91c60cb0 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 22 Oct 2024 15:21:04 -0700 Subject: [PATCH 1/2] Added a catch to task_wait() that listens for PERMISSION_DENIED errors from Globus, then kills both the Globus prune task and Prefect prune flow. --- orchestration/globus/transfer.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/orchestration/globus/transfer.py b/orchestration/globus/transfer.py index de07afe..39dfa2d 100644 --- a/orchestration/globus/transfer.py +++ b/orchestration/globus/transfer.py @@ -243,7 +243,12 @@ def task_wait( if task["nice_status"] in ["FILE_NOT_FOUND"]: transfer_client.cancel_task(task_id) - raise TransferError("Received FILE_NOT_FOUND, cancelling task") + raise TransferError(f"Received FILE_NOT_FOUND, cancelling Globus task {task_id}") + + if task["nice_status"] in ["PERMISSION_DENIED"]: + transfer_client.cancel_task(task_id) + raise TransferError(f"Received PERMISSION_DENIED, cancelling Globus task {task_id}") + return True From 59eb8fd5ca29720b7c3b4afc8f8fc664799d57c2 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 22 Oct 2024 17:27:07 -0700 Subject: [PATCH 2/2] Adjusted prune code such that check_endpoint can be set to None rather than a GlobusEndpoint. Updated alcf.py flow to handle updates to pruning code. Updated tiff_to_zarr.py to ensure proper file system permissions are set after all files are created, otherwise there is a Globus permissions error that prevents deletion. --- orchestration/flows/bl832/alcf.py | 5 +++++ orchestration/flows/bl832/prune.py | 17 +++++++++-------- orchestration/globus/transfer.py | 4 ++-- scripts/polaris/tiff_to_zarr.py | 6 +++++- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py index 0f5479f..1ce7bf4 100644 --- a/orchestration/flows/bl832/alcf.py +++ b/orchestration/flows/bl832/alcf.py @@ -643,6 +643,11 @@ def process_new_832_ALCF_flow(folder_name: str, # Step 4: Schedule deletion of files from ALCF, NERSC, and data832 logger.info("Scheduling deletion of files from ALCF, NERSC, and data832") nersc_transfer_success = False + # alcf_transfer_success = True + # alcf_reconstruction_success = True + # alcf_tiff_to_zarr_success = True + # data832_tiff_transfer_success = True + # data832_zarr_transfer_success = True schedule_pruning( alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None, diff --git a/orchestration/flows/bl832/prune.py b/orchestration/flows/bl832/prune.py index de70751..44b87be 100644 --- a/orchestration/flows/bl832/prune.py +++ b/orchestration/flows/bl832/prune.py @@ -1,6 +1,7 @@ import logging from prefect import flow, get_run_logger from prefect.blocks.system import JSON +from typing import Union from orchestration.flows.bl832.config import Config832 from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe @@ -12,7 +13,7 @@ def prune_files( relative_path: str, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint = None, + check_endpoint: Union[GlobusEndpoint, None] = None, config=None ): """ @@ -47,7 +48,7 @@ def prune_files( def prune_spot832( relative_path: str, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint, + check_endpoint: Union[GlobusEndpoint, None] = None, config=None, ): prune_files( @@ -62,7 +63,7 @@ def prune_spot832( def prune_data832( relative_path: str, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint, + check_endpoint: Union[GlobusEndpoint, None] = None, config=None, ): prune_files( @@ -76,7 +77,7 @@ def prune_data832( def prune_data832_raw( relative_path: str, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint, + check_endpoint: Union[GlobusEndpoint, None] = None, config=None, ): prune_files( @@ -90,7 +91,7 @@ def prune_data832_raw( def prune_data832_scratch( relative_path: str, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint, + check_endpoint: Union[GlobusEndpoint, None] = None, config=None, ): prune_files( @@ -104,7 +105,7 @@ def prune_data832_scratch( def prune_alcf832_raw( relative_path: str, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint, + check_endpoint: Union[GlobusEndpoint, None] = None, config=None, ): prune_files( @@ -118,7 +119,7 @@ def prune_alcf832_raw( def prune_alcf832_scratch( relative_path: str, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint, + check_endpoint: Union[GlobusEndpoint, None] = None, config=None, ): prune_files( @@ -132,7 +133,7 @@ def prune_alcf832_scratch( def prune_nersc832_alsdev_scratch( relative_path: str, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint, + check_endpoint: Union[GlobusEndpoint, None] = None, config=None, ): prune_files( diff --git a/orchestration/globus/transfer.py b/orchestration/globus/transfer.py index 39dfa2d..812d83d 100644 --- a/orchestration/globus/transfer.py +++ b/orchestration/globus/transfer.py @@ -5,7 +5,7 @@ import os from pathlib import Path from time import time -from typing import Dict, List +from typing import Dict, List, Union from dotenv import load_dotenv from globus_sdk import ( ClientCredentialsAuthorizer, @@ -257,7 +257,7 @@ def prune_one_safe( if_older_than_days: int, tranfer_client: TransferClient, source_endpoint: GlobusEndpoint, - check_endpoint: GlobusEndpoint, + check_endpoint: Union[GlobusEndpoint, None], max_wait_seconds: int = 120, logger=logger, ): diff --git a/scripts/polaris/tiff_to_zarr.py b/scripts/polaris/tiff_to_zarr.py index 6aad60a..ece85ce 100644 --- a/scripts/polaris/tiff_to_zarr.py +++ b/scripts/polaris/tiff_to_zarr.py @@ -63,7 +63,7 @@ def main(): last_part = os.path.basename(os.path.normpath(tiff_dir)) zarr_dir = os.path.abspath(os.path.join(tiff_dir, '..', last_part + '.zarr')) if not os.path.exists(zarr_dir): - os.makedirs(zarr_dir) + os.makedirs(zarr_dir, mode=0o2775, exist_ok=True) print('Output directory: ' + zarr_dir) @@ -80,6 +80,10 @@ def main(): # Set permissions for the output directory and its contents set_permissions_recursive(zarr_dir) + # Extract and set permissions for the parent directory (folder_name) + parent_dir = os.path.abspath(os.path.join(tiff_dir, '../')) # Extract parent directory + set_permissions_recursive(parent_dir) # Set permissions for parent directory + if __name__ == "__main__": main()