diff --git a/weather_mv/loader_pipeline/sinks.py b/weather_mv/loader_pipeline/sinks.py index 37752a24..e6331dfd 100644 --- a/weather_mv/loader_pipeline/sinks.py +++ b/weather_mv/loader_pipeline/sinks.py @@ -33,6 +33,7 @@ import rioxarray import xarray as xr from apache_beam.io.filesystem import CompressionTypes, FileSystem, CompressedFile, DEFAULT_READ_BUFFER_SIZE +from apache_beam.io.filesystems import FileSystems from pyproj import Transformer TIF_TRANSFORM_CRS_TO = "EPSG:4326" @@ -381,22 +382,22 @@ def upload(src: str, dst: str) -> None: subprocess.run(f'gsutil -m cp {src} {dst}'.split(), check=True, capture_output=True, text=True, input="n/n") def path_exists(path: str, force_regrid: bool = False) -> bool: - """Check if path exists at a certain location. - First Check in Google Cloud Storage then in Local file system.""" + """Check if path exists at a certain location. Pass force_regrid to skip checking.""" if force_regrid: return False - - for command in [ - f'gsutil ls {path}', - f'ls {path}' - ]: - try: - subprocess.run(command.split(), check=True, capture_output=True, text=True) - logger.info(f"{path} exists.") + # Try on gcs file system. + try: + matches = FileSystems().match([path]) + assert len(matches) == 1 + if len(matches[0].metadata_list) > 0: return True - except subprocess.CalledProcessError as _: - pass - logger.info(f"{path} does not exists.") + except Exception as _: + pass + # Try on local file system. + try: + return os.path.exists(path) + except Exception as _: + pass return False def copy(src: str, dst: str) -> None: