diff --git a/weather_mv/loader_pipeline/bq.py b/weather_mv/loader_pipeline/bq.py
index bc71da01..2f1bbb9b 100644
--- a/weather_mv/loader_pipeline/bq.py
+++ b/weather_mv/loader_pipeline/bq.py
@@ -28,6 +28,7 @@
 from apache_beam.io import WriteToBigQuery, BigQueryDisposition
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.transforms import window
+from apache_beam.metrics import metric
 from google.cloud import bigquery
 from xarray.core.utils import ensure_us_time_resolution
@@ -294,7 +295,7 @@ def to_rows(self, coordinates: t.Iterable[t.Dict], ds: xr.Dataset, uri: str) ->
             # 'row' ends up looking like:
             # {'latitude': 88.0, 'longitude': 2.0, 'time': '2015-01-01 06:00:00', 'd': -2.0187, 'cc': 0.007812,
             #  'z': 50049.8, 'data_import_time': '2020-12-05 00:12:02.424573 UTC', ...}
-            beam.metrics.Metrics.counter('Success', 'ExtractRows').inc()
+            metric.Metrics.counter('Success', 'ExtractRows').inc()
             yield row

     def chunks_to_rows(self, _, ds: xr.Dataset) -> t.Iterator[t.Dict]:
diff --git a/weather_mv/loader_pipeline/ee.py b/weather_mv/loader_pipeline/ee.py
index cd2f5dcc..2bef5eb7 100644
--- a/weather_mv/loader_pipeline/ee.py
+++ b/weather_mv/loader_pipeline/ee.py
@@ -30,6 +30,7 @@
 import ee
 import numpy as np
 from apache_beam.io.filesystems import FileSystems
+from apache_beam.metrics import metric
 from apache_beam.io.gcp.gcsio import WRITE_CHUNK_SIZE
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import retry
@@ -40,6 +41,7 @@
 from .sinks import ToDataSink, open_dataset, open_local, KwargsFactoryMixin, upload
 from .util import make_attrs_ee_compatible, RateLimit, validate_region, get_utc_timestamp
+from .metrics import timeit, AddTimer, AddMetrics

 logger = logging.getLogger(__name__)
@@ -353,10 +355,14 @@ def expand(self, paths):
         if not self.dry_run:
             (
                 paths
+                | 'AddTimer' >> beam.ParDo(AddTimer())
                 | 'FilterFiles' >> FilterFilesTransform.from_kwargs(**vars(self))
                 | 'ReshuffleFiles' >> beam.Reshuffle()
-                | 'ConvertToAsset' >> ConvertToAsset.from_kwargs(band_names_dict=band_names_dict, **vars(self))
+                | 'ConvertToAsset' >> beam.ParDo(
+                    ConvertToAsset.from_kwargs(band_names_dict=band_names_dict, **vars(self))
+                )
                 | 'IngestIntoEE' >> IngestIntoEETransform.from_kwargs(**vars(self))
+                | 'AddMetrics' >> beam.ParDo(AddMetrics())
             )
         else:
             (
@@ -402,6 +408,7 @@ def __init__(self,
         self.ee_asset_type = ee_asset_type
         self.force_overwrite = force

+    @timeit('FilterFileTransform')
     def process(self, uri: str) -> t.Iterator[str]:
         """Yields uri if the asset does not already exist."""
         self.check_setup()
@@ -423,7 +430,7 @@ def process(self, uri: str) -> t.Iterator[str]:


 @dataclasses.dataclass
-class ConvertToAsset(beam.DoFn, beam.PTransform, KwargsFactoryMixin):
+class ConvertToAsset(beam.DoFn, KwargsFactoryMixin):
     """Writes asset after extracting input data and uploads it to GCS.

     Attributes:
@@ -579,6 +586,7 @@ def get_dims_data(index: int) -> t.List[t.Any]:
             self.add_to_queue(queue, asset_data)
         self.add_to_queue(queue, None)  # Indicates end of the subprocess.

+    @timeit('ConvertToAsset')
     def process(self, uri: str) -> t.Iterator[AssetData]:
         """Opens grib files and yields AssetData.

@@ -610,9 +618,6 @@ def process(self, uri: str) -> t.Iterator[AssetData]:
                     process.terminate()

-    def expand(self, pcoll):
-        return pcoll | beam.FlatMap(self.process)
-

 class IngestIntoEETransform(SetupEarthEngine, KwargsFactoryMixin):
     """Ingests asset into earth engine and yields asset id.
@@ -757,9 +762,10 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
                 ee.data.deleteAsset(asset_name)
             raise

+    @timeit('IngestIntoEE')
     def process(self, asset_data: AssetData) -> t.Iterator[str]:
         """Uploads an asset into the earth engine."""
         asset_id = self.start_ingestion(asset_data)
-        beam.metrics.Metrics.counter('Success', 'IngestIntoEE').inc()
+        metric.Metrics.counter('Success', 'IngestIntoEE').inc()
         yield asset_id
diff --git a/weather_mv/loader_pipeline/ee_test.py b/weather_mv/loader_pipeline/ee_test.py
index 7627152e..9ac78826 100644
--- a/weather_mv/loader_pipeline/ee_test.py
+++ b/weather_mv/loader_pipeline/ee_test.py
@@ -75,7 +75,7 @@ def test_convert_to_image_asset(self):
         data_path = f'{self.test_data_folder}/test_data_grib_single_timestep'
         asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_single_timestep.tiff')

-        next(self.convert_to_image_asset.process(data_path))
+        next(self.convert_to_image_asset.process((data_path, {})))

         # The size of tiff is expected to be more than grib.
         self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))
@@ -84,7 +84,7 @@ def test_convert_to_image_asset__with_multiple_grib_edition(self):
         data_path = f'{self.test_data_folder}/test_data_grib_multiple_edition_single_timestep.bz2'
         asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_multiple_edition_single_timestep.tiff')

-        next(self.convert_to_image_asset.process(data_path))
+        next(self.convert_to_image_asset.process((data_path, {})))

         # The size of tiff is expected to be more than grib.
         self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))
@@ -93,7 +93,7 @@ def test_convert_to_table_asset(self):
         data_path = f'{self.test_data_folder}/test_data_grib_single_timestep'
         asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_single_timestep.csv')

-        next(self.convert_to_table_asset.process(data_path))
+        next(self.convert_to_table_asset.process((data_path, {})))

         # The size of tiff is expected to be more than grib.
         self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))
@@ -102,7 +102,7 @@ def test_convert_to_table_asset__with_multiple_grib_edition(self):
         data_path = f'{self.test_data_folder}/test_data_grib_multiple_edition_single_timestep.bz2'
         asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_multiple_edition_single_timestep.csv')

-        next(self.convert_to_table_asset.process(data_path))
+        next(self.convert_to_table_asset.process((data_path, {})))

         # The size of tiff is expected to be more than grib.
         self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))
diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py
new file mode 100644
index 00000000..5a0a289c
--- /dev/null
+++ b/weather_mv/loader_pipeline/metrics.py
@@ -0,0 +1,107 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utilities for adding metrics to a Beam pipeline."""
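+
+# How the pieces in this module fit together (an illustrative overview; the
+# real wiring lives in ee.py's expand()):
+#   1. AddTimer seeds every element with an empty time_dict.
+#   2. Each DoFn whose process() is decorated with @timeit('StageName')
+#      receives an (element, time_dict) tuple, times itself, and passes the
+#      updated dict downstream.
+#   3. AddMetrics sums the per-stage times and reports the total as a Beam
+#      distribution metric.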
+ +"""Utilities for adding metrics to beam pipeline.""" + +import time +import copy +import inspect +from functools import wraps +import apache_beam as beam +from apache_beam.metrics import metric + + +def timeit(func_name: str, keyed_fn: bool = False): + """Decorator to add time it takes for an element to be processed by a stage. + + Args: + func_name: A unique name of the stage. + keyed_fn (optional): This has to be passed true if the input is adding keys to the element. + + For example a stage like + + class Shard(beam.DoFn): + @timeit('Sharding', keyed_fn=True) + def process(self,element): + key = randrange(10) + yield key, element + + We are passing `keyed_fn=True` as we are adding a key to our element. Usually keys are added + to later group the element by a `GroupBy` stage. + """ + def decorator(func): + @wraps(func) + def wrapper(self, *args, **kwargs): + start_time = time.time() + time_dict = {} + + # Only the first timer wrapper will have no time_dict. + # All subsequent wrappers can extract out the dict. + # args 0 would be a tuple. + if len(args[0]) == 1: + raise ValueError('time_dict not found.') + + element, time_dict = args[0] + args = (element,) + args[1:] + + if not isinstance(time_dict, dict): + raise ValueError('time_dict not found.') + + # If the function is a generator, yield the output + # othewise return it. + if inspect.isgeneratorfunction(func): + for result in func(self, *args, **kwargs): + end_time = time.time() + processing_time = end_time - start_time + new_time_dict = copy.deepcopy(time_dict) + new_time_dict[func_name] = processing_time + if keyed_fn: + (key, element) = result + yield key, (element, new_time_dict) + else: + yield result, new_time_dict + else: + raise ValueError("Function is not a generator.") + + return wrapper + return decorator + + +class AddTimer(beam.DoFn): + """DoFn to add a empty time_dict per element in PCollection. This dict will stage_names as keys + and the time it took for that element in that stage.""" + def process(self, element): + time_dict = {} + yield element, time_dict + + +class AddMetrics(beam.DoFn): + """DoFn to add Element Processing Time metric to beam. 
+
+
+class AddMetrics(beam.DoFn):
+    """DoFn to report an element-processing-time metric to Beam.
+
+    Expects each element of the input PCollection to carry a time_dict."""
+
+    def __init__(self):
+        super().__init__()
+        self.element_processing_time = metric.Metrics.distribution('Time', 'element_processing_time_ms')
+
+    def process(self, element):
+        if len(element) == 0:
+            raise ValueError('time_dict not found.')
+        _, time_dict = element
+        if not isinstance(time_dict, dict):
+            raise ValueError('time_dict not found.')
+
+        # Total processing time is the sum of the per-stage times,
+        # reported in milliseconds.
+        total_time = 0
+        for stage_time in time_dict.values():
+            total_time += stage_time
+
+        self.element_processing_time.update(int(total_time * 1000))
diff --git a/weather_mv/loader_pipeline/util.py b/weather_mv/loader_pipeline/util.py
index 079b86de..a17150ce 100644
--- a/weather_mv/loader_pipeline/util.py
+++ b/weather_mv/loader_pipeline/util.py
@@ -38,6 +38,7 @@
 from xarray.core.utils import ensure_us_time_resolution

 from .sinks import DEFAULT_COORD_KEYS
+from .metrics import timeit

 logger = logging.getLogger(__name__)
@@ -324,6 +325,16 @@ def _shard(elem, num_shards: int):
     return (np.random.randint(0, num_shards), elem)


+class Shard(beam.DoFn):
+    """DoFn to shard elements into groups by assigning each a random key."""
+    def __init__(self, num_shards: int):
+        super().__init__()
+        self.num_shards = num_shards
+
+    @timeit('Sharding', keyed_fn=True)
+    def process(self, element, *args, **kwargs):
+        yield _shard(element, num_shards=self.num_shards)
+

 class RateLimit(beam.PTransform, abc.ABC):
     """PTransform to extend to apply a global rate limit to an operation.
@@ -377,7 +388,7 @@ def process(self, elem: t.Any):

     def expand(self, pcol: beam.PCollection):
         return (pcol
-                | beam.Map(_shard, self._num_shards)
+                | beam.ParDo(Shard(num_shards=self._num_shards))
                 | beam.GroupByKey()
                 | beam.ParDo(
                     _RateLimitDoFn(self.process, self._latency_per_request)))
diff --git a/weather_mv/setup.py b/weather_mv/setup.py
index c4c86b58..88e0cd5a 100644
--- a/weather_mv/setup.py
+++ b/weather_mv/setup.py
@@ -65,7 +65,7 @@
     packages=find_packages(),
     author='Anthromets',
     author_email='anthromets-ecmwf@google.com',
-    version='0.2.24',
+    version='0.2.25',
     url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
     description='A tool to load weather data into BigQuery.',
     install_requires=beam_gcp_requirements + base_requirements,