Skip to content

Commit

Permalink
updated path_exists
Browse files Browse the repository at this point in the history
  • Loading branch information
aniketsinghrawat committed Jan 2, 2024
1 parent fdab988 commit 427038e
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions weather_mv/loader_pipeline/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import rioxarray
import xarray as xr
from apache_beam.io.filesystem import CompressionTypes, FileSystem, CompressedFile, DEFAULT_READ_BUFFER_SIZE
from apache_beam.io.filesystems import FileSystems
from pyproj import Transformer

TIF_TRANSFORM_CRS_TO = "EPSG:4326"
Expand Down Expand Up @@ -381,22 +382,22 @@ def upload(src: str, dst: str) -> None:
subprocess.run(f'gsutil -m cp {src} {dst}'.split(), check=True, capture_output=True, text=True, input="n/n")

def path_exists(path: str, force_regrid: bool = False) -> bool:
"""Check if path exists at a certain location.
First Check in Google Cloud Storage then in Local file system."""
"""Check if path exists at a certain location. Pass force_regrid to skip checking."""
if force_regrid:
return False

for command in [
f'gsutil ls {path}',
f'ls {path}'
]:
try:
subprocess.run(command.split(), check=True, capture_output=True, text=True)
logger.info(f"{path} exists.")
# Try on gcs file system.
try:
matches = FileSystems().match([path])
assert len(matches) == 1
if len(matches[0].metadata_list) > 0:
return True
except subprocess.CalledProcessError as _:
pass
logger.info(f"{path} does not exists.")
except Exception as _:
pass
# Try on local file system.
try:
return os.path.exists(path)
except Exception as _:
pass
return False

def copy(src: str, dst: str) -> None:
Expand Down

0 comments on commit 427038e

Please sign in to comment.