Add metrics to weather_mv ee. (#465)
* Add metrics to weather_mv ee.

* clean up

* removed comments

* removed test data files

* added license, improved doc string

* removed unnecessary imports

* removed expand from ConvertToAsset

* nits

* version bump

* lint fixes

* test fixes

* fixed pytype
aniketsinghrawat authored Jul 23, 2024
1 parent 2166b50 commit a9b8039
Showing 6 changed files with 138 additions and 13 deletions.
3 changes: 2 additions & 1 deletion weather_mv/loader_pipeline/bq.py
@@ -28,6 +28,7 @@
 from apache_beam.io import WriteToBigQuery, BigQueryDisposition
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.transforms import window
+from apache_beam.metrics import metric
 from google.cloud import bigquery
 from xarray.core.utils import ensure_us_time_resolution
@@ -294,7 +295,7 @@ def to_rows(self, coordinates: t.Iterable[t.Dict], ds: xr.Dataset, uri: str) ->
             # 'row' ends up looking like:
             # {'latitude': 88.0, 'longitude': 2.0, 'time': '2015-01-01 06:00:00', 'd': -2.0187, 'cc': 0.007812,
             #  'z': 50049.8, 'data_import_time': '2020-12-05 00:12:02.424573 UTC', ...}
-            beam.metrics.Metrics.counter('Success', 'ExtractRows').inc()
+            metric.Metrics.counter('Success', 'ExtractRows').inc()
             yield row

     def chunks_to_rows(self, _, ds: xr.Dataset) -> t.Iterator[t.Dict]:
18 changes: 12 additions & 6 deletions weather_mv/loader_pipeline/ee.py
@@ -30,6 +30,7 @@
 import ee
 import numpy as np
 from apache_beam.io.filesystems import FileSystems
+from apache_beam.metrics import metric
 from apache_beam.io.gcp.gcsio import WRITE_CHUNK_SIZE
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.utils import retry
@@ -40,6 +41,7 @@

 from .sinks import ToDataSink, open_dataset, open_local, KwargsFactoryMixin, upload
 from .util import make_attrs_ee_compatible, RateLimit, validate_region, get_utc_timestamp
+from .metrics import timeit, AddTimer, AddMetrics

 logger = logging.getLogger(__name__)

@@ -353,10 +355,14 @@ def expand(self, paths):
         if not self.dry_run:
             (
                 paths
+                | 'AddTimer' >> beam.ParDo(AddTimer())
                 | 'FilterFiles' >> FilterFilesTransform.from_kwargs(**vars(self))
                 | 'ReshuffleFiles' >> beam.Reshuffle()
-                | 'ConvertToAsset' >> ConvertToAsset.from_kwargs(band_names_dict=band_names_dict, **vars(self))
+                | 'ConvertToAsset' >> beam.ParDo(
+                    ConvertToAsset.from_kwargs(band_names_dict=band_names_dict, **vars(self))
+                )
                 | 'IngestIntoEE' >> IngestIntoEETransform.from_kwargs(**vars(self))
+                | 'AddMetrics' >> beam.ParDo(AddMetrics())
             )
         else:
             (
@@ -402,6 +408,7 @@ def __init__(self,
         self.ee_asset_type = ee_asset_type
         self.force_overwrite = force

+    @timeit('FilterFileTransform')
     def process(self, uri: str) -> t.Iterator[str]:
         """Yields uri if the asset does not already exist."""
         self.check_setup()
@@ -423,7 +430,7 @@ def process(self, uri: str) -> t.Iterator[str]:

 @dataclasses.dataclass
-class ConvertToAsset(beam.DoFn, beam.PTransform, KwargsFactoryMixin):
+class ConvertToAsset(beam.DoFn, KwargsFactoryMixin):
     """Writes asset after extracting input data and uploads it to GCS.

     Attributes:
@@ -579,6 +586,7 @@ def get_dims_data(index: int) -> t.List[t.Any]:
         self.add_to_queue(queue, asset_data)
         self.add_to_queue(queue, None)  # Indicates end of the subprocess.

+    @timeit('ConvertToAsset')
     def process(self, uri: str) -> t.Iterator[AssetData]:
         """Opens grib files and yields AssetData.
@@ -610,9 +618,6 @@ def process(self, uri: str) -> t.Iterator[AssetData]:

         process.terminate()

-    def expand(self, pcoll):
-        return pcoll | beam.FlatMap(self.process)
-

 class IngestIntoEETransform(SetupEarthEngine, KwargsFactoryMixin):
     """Ingests asset into earth engine and yields asset id.
@@ -757,9 +762,10 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
             ee.data.deleteAsset(asset_name)
             raise

+    @timeit('IngestIntoEE')
     def process(self, asset_data: AssetData) -> t.Iterator[str]:
         """Uploads an asset into the earth engine."""
         asset_id = self.start_ingestion(asset_data)
-        beam.metrics.Metrics.counter('Success', 'IngestIntoEE').inc()
+        metric.Metrics.counter('Success', 'IngestIntoEE').inc()

         yield asset_id
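
For orientation, here is a rough sketch of how an element's shape evolves through the instrumented pipeline above. This is an illustration inferred from the timeit wrapper in metrics.py below, not output from the tool; the timing values t1..t3 are hypothetical, and the RateLimit machinery inside FilterFiles adds its own 'Sharding' entry as well:

    uri                                       # input file path
    'AddTimer'        -> (uri, {})
    'FilterFiles'     -> (uri, {'Sharding': ..., 'FilterFileTransform': t1})
    'ConvertToAsset'  -> (asset_data, {..., 'ConvertToAsset': t2})
    'IngestIntoEE'    -> (asset_id, {..., 'IngestIntoEE': t3})
    'AddMetrics'      -> updates the 'element_processing_time_ms' distribution with the summed times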
8 changes: 4 additions & 4 deletions weather_mv/loader_pipeline/ee_test.py
@@ -75,7 +75,7 @@ def test_convert_to_image_asset(self):
         data_path = f'{self.test_data_folder}/test_data_grib_single_timestep'
         asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_single_timestep.tiff')

-        next(self.convert_to_image_asset.process(data_path))
+        next(self.convert_to_image_asset.process((data_path, {})))

         # The size of tiff is expected to be more than grib.
         self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))
@@ -84,7 +84,7 @@ def test_convert_to_image_asset__with_multiple_grib_edition(self):
         data_path = f'{self.test_data_folder}/test_data_grib_multiple_edition_single_timestep.bz2'
         asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_multiple_edition_single_timestep.tiff')

-        next(self.convert_to_image_asset.process(data_path))
+        next(self.convert_to_image_asset.process((data_path, {})))

         # The size of tiff is expected to be more than grib.
         self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))
@@ -93,7 +93,7 @@ def test_convert_to_table_asset(self):
         data_path = f'{self.test_data_folder}/test_data_grib_single_timestep'
         asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_single_timestep.csv')

-        next(self.convert_to_table_asset.process(data_path))
+        next(self.convert_to_table_asset.process((data_path, {})))

         # The size of tiff is expected to be more than grib.
         self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))
@@ -102,7 +102,7 @@ def test_convert_to_table_asset__with_multiple_grib_edition(self):
         data_path = f'{self.test_data_folder}/test_data_grib_multiple_edition_single_timestep.bz2'
         asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_multiple_edition_single_timestep.csv')

-        next(self.convert_to_table_asset.process(data_path))
+        next(self.convert_to_table_asset.process((data_path, {})))

         # The size of tiff is expected to be more than grib.
         self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))
107 changes: 107 additions & 0 deletions weather_mv/loader_pipeline/metrics.py
@@ -0,0 +1,107 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utilities for adding metrics to a Beam pipeline."""
+
+import time
+import copy
+import inspect
+from functools import wraps
+
+import apache_beam as beam
+from apache_beam.metrics import metric
+
+
+def timeit(func_name: str, keyed_fn: bool = False):
+    """Decorator that records how long a stage takes to process each element.
+
+    Args:
+        func_name: A unique name for the stage.
+        keyed_fn (optional): Pass True if the wrapped stage adds a key to the
+            element. For example, in a stage like
+
+                class Shard(beam.DoFn):
+                    @timeit('Sharding', keyed_fn=True)
+                    def process(self, element):
+                        key = randrange(10)
+                        yield key, element
+
+            we pass `keyed_fn=True` because the stage adds a key to the element.
+            Keys are usually added so that elements can later be grouped by a
+            `GroupByKey` stage.
+    """
+    def decorator(func):
+        @wraps(func)
+        def wrapper(self, *args, **kwargs):
+            start_time = time.time()
+            time_dict = {}
+
+            # Only the first timeit wrapper in the pipeline starts without a
+            # time_dict; every subsequent wrapper extracts the dict from the
+            # element. args[0] is expected to be an (element, time_dict) tuple.
+            if len(args[0]) == 1:
+                raise ValueError('time_dict not found.')
+
+            element, time_dict = args[0]
+            args = (element,) + args[1:]
+
+            if not isinstance(time_dict, dict):
+                raise ValueError('time_dict not found.')
+
+            # Only generator functions are supported: re-attach the timing
+            # dict to each yielded result.
+            if inspect.isgeneratorfunction(func):
+                for result in func(self, *args, **kwargs):
+                    end_time = time.time()
+                    processing_time = end_time - start_time
+                    new_time_dict = copy.deepcopy(time_dict)
+                    new_time_dict[func_name] = processing_time
+                    if keyed_fn:
+                        (key, element) = result
+                        yield key, (element, new_time_dict)
+                    else:
+                        yield result, new_time_dict
+            else:
+                raise ValueError('Function is not a generator.')
+
+        return wrapper
+    return decorator
+
+
+class AddTimer(beam.DoFn):
+    """DoFn that attaches an empty time_dict to every element of a PCollection.
+
+    The dict maps stage names to the time the element spent in that stage."""
+
+    def process(self, element):
+        time_dict = {}
+        yield element, time_dict
+
+
+class AddMetrics(beam.DoFn):
+    """DoFn that reports an element-processing-time metric to Beam.
+
+    Expects every element of the PCollection to carry a time_dict."""
+
+    def __init__(self):
+        super().__init__()
+        self.element_processing_time = metric.Metrics.distribution('Time', 'element_processing_time_ms')
+
+    def process(self, element):
+        if len(element) == 0:
+            raise ValueError('time_dict not found.')
+        _, time_dict = element
+        if not isinstance(time_dict, dict):
+            raise ValueError('time_dict not found.')
+
+        # Report the total time across all stages, in milliseconds.
+        total_time = 0
+        for stage_time in time_dict.values():
+            total_time += stage_time
+
+        self.element_processing_time.update(int(total_time * 1000))
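
As a usage illustration, the utilities above compose like this in a minimal pipeline. This is a sketch, not code from the repository: the Square DoFn is hypothetical, and it assumes metrics.py is importable at the path shown:

    import apache_beam as beam

    from weather_mv.loader_pipeline.metrics import AddMetrics, AddTimer, timeit


    class Square(beam.DoFn):
        # timeit unpacks the (element, time_dict) pair added by AddTimer,
        # calls this generator with the bare element, and re-attaches the
        # dict with a 'Square' timing entry to each yielded result.
        @timeit('Square')
        def process(self, element):
            yield element * element


    with beam.Pipeline() as p:
        (
            p
            | beam.Create([1, 2, 3])
            | 'AddTimer' >> beam.ParDo(AddTimer())      # (n, {})
            | 'Square' >> beam.ParDo(Square())          # (n * n, {'Square': seconds})
            | 'AddMetrics' >> beam.ParDo(AddMetrics())  # reports summed time in ms
        )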
13 changes: 12 additions & 1 deletion weather_mv/loader_pipeline/util.py
@@ -38,6 +38,7 @@
 from xarray.core.utils import ensure_us_time_resolution

 from .sinks import DEFAULT_COORD_KEYS
+from .metrics import timeit

 logger = logging.getLogger(__name__)

@@ -324,6 +325,16 @@ def _shard(elem, num_shards: int):
     return (np.random.randint(0, num_shards), elem)


+class Shard(beam.DoFn):
+    """DoFn to shard elements into groups."""
+
+    def __init__(self, num_shards: int):
+        super().__init__()
+        self.num_shards = num_shards
+
+    @timeit('Sharding', keyed_fn=True)
+    def process(self, element, *args, **kwargs):
+        yield _shard(element, num_shards=self.num_shards)
+
 class RateLimit(beam.PTransform, abc.ABC):
     """PTransform to extend to apply a global rate limit to an operation.

@@ -377,7 +388,7 @@ def process(self, elem: t.Any):

     def expand(self, pcol: beam.PCollection):
         return (pcol
-                | beam.Map(_shard, self._num_shards)
+                | beam.ParDo(Shard(num_shards=self._num_shards))
                 | beam.GroupByKey()
                 | beam.ParDo(
                     _RateLimitDoFn(self.process, self._latency_per_request)))
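
One subtlety worth noting (an observation about the code above, not repository documentation): because Shard's process is wrapped with keyed_fn=True, the timeit wrapper re-nests the timing dict under the shard key, so the downstream GroupByKey still sees ordinary (key, value) pairs. The shard key 7 and the timing value here are illustrative:

    (uri, {})                            # into Shard, as produced by AddTimer
    (7, (uri, {'Sharding': 0.001}))      # out of Shard, ready for GroupByKey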
2 changes: 1 addition & 1 deletion weather_mv/setup.py
@@ -65,7 +65,7 @@
     packages=find_packages(),
     author='Anthromets',
     author_email='[email protected]',
-    version='0.2.24',
+    version='0.2.25',
     url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
     description='A tool to load weather data into BigQuery.',
     install_requires=beam_gcp_requirements + base_requirements,
