Skip to content

Commit

Permalink
Metrics update return type and Error handling (#470)
Browse files Browse the repository at this point in the history
* Added a return type to avoid typehint errors

* version bump

* error handling in metrics AddMetrics

* lint fixes

* empty commit
  • Loading branch information
aniketsinghrawat authored Sep 3, 2024
1 parent f0ead39 commit 39c4925
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
57 changes: 33 additions & 24 deletions weather_mv/loader_pipeline/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
import copy
import datetime
import inspect
import logging
from functools import wraps
import typing as t
import apache_beam as beam
from apache_beam.metrics import metric

logger = logging.getLogger(__name__)


def timeit(func_name: str, keyed_fn: bool = False):
"""Decorator to add time it takes for an element to be processed by a stage.
Expand Down Expand Up @@ -92,39 +96,44 @@ def wrapper(self, *args, **kwargs):
class AddTimer(beam.DoFn):
"""DoFn to add a empty time_dict per element in PCollection. This dict will stage_names as keys
and the time it took for that element in that stage."""
def process(self, element):
def process(self, element) -> t.Iterator[t.Any]:
time_dict = {}
yield element, time_dict


class AddMetrics(beam.DoFn):
"""DoFn to add Element Processing Time metric to beam. Expects PCollection to contain a time_dict."""

def __init__(self):
def __init__(self, asset_start_time_format: str = '%Y-%m-%dT%H:%M:%SZ'):
super().__init__()
self.element_processing_time = metric.Metrics.distribution('Time', 'element_processing_time_ms')
self.data_latency_time = metric.Metrics.distribution('Time', 'data_latency_time_ms')
self.asset_start_time_format = asset_start_time_format

def process(self, element):
if len(element) == 0:
raise ValueError("time_dict not found.")
(_, asset_start_time), time_dict = element
if not isinstance(time_dict, dict):
raise ValueError("time_dict not found.")

# Adding element processing time.
total_time = 0
for stage_time in time_dict.values():
total_time += stage_time

# Converting seconds to ms.
self.element_processing_time.update(int(total_time * 1000))

# Adding data latency.
if asset_start_time:
current_time = time.time()
asset_start_time = datetime.datetime.strptime(asset_start_time, '%Y-%m-%dT%H:%M:%SZ').timestamp()

# Converting seconds to ms.
data_latency_ms = (current_time - asset_start_time) * 1000
self.data_latency_time.update(int(data_latency_ms))
try:
if len(element) == 0:
raise ValueError("time_dict not found.")
(_, asset_start_time), time_dict = element
if not isinstance(time_dict, dict):
raise ValueError("time_dict not found.")

# Adding element processing time.
total_time = 0
for stage_time in time_dict.values():
total_time += stage_time

# Converting seconds to milli seconds.
self.element_processing_time.update(int(total_time * 1000))

# Adding data latency.
if asset_start_time:
current_time = time.time()
asset_start_time = datetime.datetime.strptime(
asset_start_time, self.asset_start_time_format).timestamp()

# Converting seconds to milli seconds.
data_latency_ms = (current_time - asset_start_time) * 1000
self.data_latency_time.update(int(data_latency_ms))
except Exception as e:
logger.warning(f"Some error occured while adding metrics. Error {e}")
2 changes: 1 addition & 1 deletion weather_mv/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
packages=find_packages(),
author='Anthromets',
author_email='[email protected]',
version='0.2.26',
version='0.2.27',
url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
description='A tool to load weather data into BigQuery.',
install_requires=beam_gcp_requirements + base_requirements,
Expand Down

0 comments on commit 39c4925

Please sign in to comment.