Skip to content

Commit

Permalink
updated check for assets already present in EE
Browse files Browse the repository at this point in the history
  • Loading branch information
Piyush-Ingale committed Jul 12, 2023
1 parent ffd7613 commit 41c9fda
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 5 deletions.
6 changes: 4 additions & 2 deletions weather_mv/loader_pipeline/ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,10 @@ def process(self, uri: str) -> t.Iterator[str]:

# Checks if the asset is already present in the GCS bucket or not.
target_path = os.path.join(
self.asset_location, f'{asset_name}{ASSET_TYPE_TO_EXTENSION_MAPPING[self.ee_asset_type]}')
if not self.force_overwrite and FileSystems.exists(target_path):
self.asset_location, f'{asset_name}*{ASSET_TYPE_TO_EXTENSION_MAPPING[self.ee_asset_type]}')
files = FileSystems.match([target_path])

if not self.force_overwrite and files[0].metadata_list:
logger.info(f'Asset file {target_path} already exists in GCS bucket. Skipping...')
return

Expand Down
7 changes: 4 additions & 3 deletions weather_mv/loader_pipeline/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def create_partition(ds:xr.Dataset, group:t.List[dict], flat_dims:t.Set[str], co
channel_name += f'{attrs.get("GRIB_stepType")}_{var}'
else:
channel_name += var
channel_name = re.sub(r'[^a-zA-Z0-9_]+', r'_', channel_name)
da_units['unit_'+channel_name] = None
if 'units' in attrs:
da_units['unit_'+channel_name] = attrs['units']
Expand Down Expand Up @@ -554,9 +555,9 @@ def __open_dataset_file(filename: str,
ds = xr.open_dataset(filename)
return _add_is_normalized_attr(__partition_dataset(ds, tiff_config), False)
except ValueError as e:
e_str = str(e)
if not ("Consider explicitly selecting one of the installed engines" in e_str and "cfgrib" in e_str):
raise
e_str = str(e)
if not ("Consider explicitly selecting one of the installed engines" in e_str and "cfgrib" in e_str):
raise

if not disable_grib_schema_normalization:
logger.warning("Assuming grib.")
Expand Down

0 comments on commit 41c9fda

Please sign in to comment.