Skip to content

Commit

Permalink
Yielding asset_name from the last step of the pipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
deepgabani8 committed Sep 27, 2024
1 parent 571e4c0 commit 85dd522
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
8 changes: 4 additions & 4 deletions weather_mv/loader_pipeline/ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
'endTime': asset_data.end_time,
'properties': asset_data.properties,
})
return result.get('id')
return asset_name
elif self.ee_asset_type == 'TABLE': # ingest a feature collection.
self.wait_for_task_queue()
task_id = ee.data.newTaskId(1)[0]
Expand All @@ -777,7 +777,7 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
'endTime': asset_data.end_time,
'properties': asset_data.properties
})
return response.get('id')
return asset_name
except ee.EEException as e:
if "Could not parse a valid CRS from the first overview of the GeoTIFF" in repr(e):
logger.info(f"Failed to create asset '{asset_name}' in earth engine: {e}. Moving on...")
Expand All @@ -794,8 +794,8 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
@timeit('IngestIntoEE')
def process(self, asset_data: AssetData) -> t.Iterator[t.Tuple[str, float]]:
"""Uploads an asset into the earth engine."""
asset_id = self.start_ingestion(asset_data)
asset_name = self.start_ingestion(asset_data)
metric.Metrics.counter('Success', 'IngestIntoEE').inc()

asset_start_time = asset_data.start_time
yield asset_id, asset_start_time
yield asset_name, asset_start_time
4 changes: 2 additions & 2 deletions weather_mv/loader_pipeline/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def process(self, element):
try:
if len(element) == 0:
raise ValueError("time_dict not found.")
(asset_id, asset_start_time), time_dict = element
(asset_name, asset_start_time), time_dict = element
if not isinstance(time_dict, OrderedDict):
raise ValueError("time_dict not found.")

Expand Down Expand Up @@ -178,7 +178,7 @@ def process(self, element):
in zip(time_dict.items(), list(time_dict.items())[1:])
}
logger.info(
f"Step intervals for {uri}:{asset_id} :: {json.dumps(step_intervals, indent=4)}"
f"Step intervals for {uri}:{asset_name} :: {json.dumps(step_intervals, indent=4)}"
)

yield ("custom_metrics", (data_latency_ms / 1000, element_processing_time / 1000))
Expand Down
2 changes: 1 addition & 1 deletion weather_mv/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
packages=find_packages(),
author='Anthromets',
author_email='[email protected]',
version='0.2.31',
version='0.2.32',
url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
description='A tool to load weather data into BigQuery.',
install_requires=beam_gcp_requirements + base_requirements,
Expand Down

0 comments on commit 85dd522

Please sign in to comment.