From 85dd522d9f980b9f1efd233c144e83a053773268 Mon Sep 17 00:00:00 2001 From: deepgabani8 Date: Fri, 27 Sep 2024 09:26:30 +0000 Subject: [PATCH] Yielding asset_name from the last step of the pipeline. --- weather_mv/loader_pipeline/ee.py | 8 ++++---- weather_mv/loader_pipeline/metrics.py | 4 ++-- weather_mv/setup.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/weather_mv/loader_pipeline/ee.py b/weather_mv/loader_pipeline/ee.py index 9f8de0c7..a847db6c 100644 --- a/weather_mv/loader_pipeline/ee.py +++ b/weather_mv/loader_pipeline/ee.py @@ -764,7 +764,7 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]: 'endTime': asset_data.end_time, 'properties': asset_data.properties, }) - return result.get('id') + return asset_name elif self.ee_asset_type == 'TABLE': # ingest a feature collection. self.wait_for_task_queue() task_id = ee.data.newTaskId(1)[0] @@ -777,7 +777,7 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]: 'endTime': asset_data.end_time, 'properties': asset_data.properties }) - return response.get('id') + return asset_name except ee.EEException as e: if "Could not parse a valid CRS from the first overview of the GeoTIFF" in repr(e): logger.info(f"Failed to create asset '{asset_name}' in earth engine: {e}. Moving on...") @@ -794,8 +794,8 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]: @timeit('IngestIntoEE') def process(self, asset_data: AssetData) -> t.Iterator[t.Tuple[str, float]]: """Uploads an asset into the earth engine.""" - asset_id = self.start_ingestion(asset_data) + asset_name = self.start_ingestion(asset_data) metric.Metrics.counter('Success', 'IngestIntoEE').inc() asset_start_time = asset_data.start_time - yield asset_id, asset_start_time + yield asset_name, asset_start_time diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index 0f0f96e5..65dad9e5 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -142,7 +142,7 @@ def process(self, element): try: if len(element) == 0: raise ValueError("time_dict not found.") - (asset_id, asset_start_time), time_dict = element + (asset_name, asset_start_time), time_dict = element if not isinstance(time_dict, OrderedDict): raise ValueError("time_dict not found.") @@ -178,7 +178,7 @@ def process(self, element): in zip(time_dict.items(), list(time_dict.items())[1:]) } logger.info( - f"Step intervals for {uri}:{asset_id} :: {json.dumps(step_intervals, indent=4)}" + f"Step intervals for {uri}:{asset_name} :: {json.dumps(step_intervals, indent=4)}" ) yield ("custom_metrics", (data_latency_ms / 1000, element_processing_time / 1000)) diff --git a/weather_mv/setup.py b/weather_mv/setup.py index dbf80916..37f92038 100644 --- a/weather_mv/setup.py +++ b/weather_mv/setup.py @@ -65,7 +65,7 @@ packages=find_packages(), author='Anthromets', author_email='anthromets-ecmwf@google.com', - version='0.2.31', + version='0.2.32', url='https://weather-tools.readthedocs.io/en/latest/weather_mv/', description='A tool to load weather data into BigQuery.', install_requires=beam_gcp_requirements + base_requirements,