From 59dd4274279a3d51e87bb3b7a0ff8a1917305926 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Fri, 6 Sep 2024 12:20:40 +0000 Subject: [PATCH 01/11] Added monitoring metrics for the EE pipeline. --- weather_mv/loader_pipeline/ee.py | 4 +- weather_mv/loader_pipeline/metrics.py | 64 +++++++++++++++++++++----- weather_mv/loader_pipeline/pipeline.py | 16 ++++++- 3 files changed, 70 insertions(+), 14 deletions(-) diff --git a/weather_mv/loader_pipeline/ee.py b/weather_mv/loader_pipeline/ee.py index 081fae17..b8636df0 100644 --- a/weather_mv/loader_pipeline/ee.py +++ b/weather_mv/loader_pipeline/ee.py @@ -375,8 +375,8 @@ def expand(self, paths): | 'IngestIntoEE' >> IngestIntoEETransform.from_kwargs(**vars(self)) ) - if self.use_metrics: - output | 'AddMetrics' >> beam.ParDo(AddMetrics()) + if self.use_metrics and not self.skip_region_validation: + output | 'AddMetrics' >> beam.ParDo(AddMetrics.from_kwargs(**vars(self))) else: ( paths diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index 2e2e219a..29278802 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -14,15 +14,19 @@ """Utilities for adding metrics to beam pipeline.""" -import time import copy import datetime import inspect import logging -from functools import wraps +import time import typing as t + import apache_beam as beam from apache_beam.metrics import metric +from functools import wraps +from google.cloud import monitoring_v3 + +from .sinks import KwargsFactoryMixin logger = logging.getLogger(__name__) @@ -45,13 +49,13 @@ def process(self,element): We are passing `keyed_fn=True` as we are adding a key to our element. Usually keys are added to later group the element by a `GroupBy` stage. """ + def decorator(func): @wraps(func) def wrapper(self, *args, **kwargs): # If metrics are turned off, don't do anything. 
- if ( - not hasattr(self, 'use_metrics') or - (hasattr(self, 'use_metrics') and not self.use_metrics) + if not hasattr(self, "use_metrics") or ( + hasattr(self, "use_metrics") and not self.use_metrics ): for result in func(self, *args, **kwargs): yield result @@ -65,13 +69,13 @@ def wrapper(self, *args, **kwargs): # All subsequent wrappers can extract out the dict. # args 0 would be a tuple. if len(args[0]) == 1: - raise ValueError('time_dict not found.') + raise ValueError("time_dict not found.") element, time_dict = args[0] args = (element,) + args[1:] if not isinstance(time_dict, dict): - raise ValueError('time_dict not found.') + raise ValueError("time_dict not found.") # If the function is a generator, yield the output # othewise return it. @@ -90,26 +94,62 @@ def wrapper(self, *args, **kwargs): raise ValueError("Function is not a generator.") return wrapper + return decorator class AddTimer(beam.DoFn): """DoFn to add a empty time_dict per element in PCollection. This dict will stage_names as keys and the time it took for that element in that stage.""" + def process(self, element) -> t.Iterator[t.Any]: time_dict = {} yield element, time_dict -class AddMetrics(beam.DoFn): +class AddMetrics(beam.DoFn, KwargsFactoryMixin): """DoFn to add Element Processing Time metric to beam. Expects PCollection to contain a time_dict.""" - def __init__(self, asset_start_time_format: str = '%Y-%m-%dT%H:%M:%SZ'): + def __init__( + self, + job_name: str, + project: str, + region: str, + asset_start_time_format: str = "%Y-%m-%dT%H:%M:%SZ", + ): super().__init__() - self.element_processing_time = metric.Metrics.distribution('Time', 'element_processing_time_ms') - self.data_latency_time = metric.Metrics.distribution('Time', 'data_latency_time_ms') + self.project = project + self.region = region + self.job_name = job_name + # These are the Apache Beam metrics. 
+ self.element_processing_time = metric.Metrics.distribution("Time", "element_processing_time_ms") + self.data_latency_time = metric.Metrics.distribution("Time", "data_latency_time_ms") self.asset_start_time_format = asset_start_time_format + def create_time_series(self, metric_name: str, metric_value: float) -> None: + """Creates or adds data to a TimeSeries.""" + client = monitoring_v3.MetricServiceClient() + series = monitoring_v3.TimeSeries() + series.metric.type = f"custom.googleapis.com/{metric_name}" + series.metric.labels["description"] = metric_name + series.resource.type = "dataflow_job" + series.resource.labels["job_name"] = self.job_name + series.resource.labels["region"] = self.region + + now = time.time() + seconds = int(now) + nanos = int((now - seconds) * 10**9) + interval = monitoring_v3.TimeInterval( + {"end_time": {"seconds": seconds, "nanos": nanos}} + ) + + point = monitoring_v3.Point( + {"interval": interval, "value": {"double_value": metric_value}} + ) + series.points = [point] + client.create_time_series(name=self.project, time_series=[series]) + logger.info(f"Successfully created time series for {metric_name} at {now}.") + def process(self, element): try: if len(element) == 0: @@ -125,6 +165,7 @@ def process(self, element): # Converting seconds to milli seconds. self.element_processing_time.update(int(total_time * 1000)) + self.create_time_series("element_processing_time_ms", int(total_time * 1000)) # Adding data latency. if asset_start_time: @@ -135,5 +176,6 @@ def process(self, element): # Converting seconds to milli seconds. data_latency_ms = (current_time - asset_start_time) * 1000 self.data_latency_time.update(int(data_latency_ms)) + self.create_time_series("data_latency_time_ms", int(data_latency_ms)) except Exception as e: logger.warning(f"Some error occured while adding metrics. 
Error {e}") diff --git a/weather_mv/loader_pipeline/pipeline.py b/weather_mv/loader_pipeline/pipeline.py index ef685473..d0a135ae 100644 --- a/weather_mv/loader_pipeline/pipeline.py +++ b/weather_mv/loader_pipeline/pipeline.py @@ -47,6 +47,16 @@ def pattern_to_uris(match_pattern: str, is_zarr: bool = False) -> t.Iterable[str yield from [x.path for x in match.metadata_list] +def arguments_to_dict(args: t.List[str]) -> t.Dict[str, str]: + """Converts a list of arguments to a dictionary.""" + result = {} + for i in range(0, len(args), 2): + key = args[i].lstrip("-") + value = args[i + 1] + result[key] = value + return result + + def pipeline(known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None: all_uris = list(pattern_to_uris(known_args.uris, known_args.zarr)) if not all_uris: @@ -75,7 +85,11 @@ def pipeline(known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None elif known_args.subcommand == 'regrid' or known_args.subcommand == 'rg': paths | "Regrid" >> Regrid.from_kwargs(**vars(known_args)) elif known_args.subcommand == 'earthengine' or known_args.subcommand == 'ee': - paths | "MoveToEarthEngine" >> ToEarthEngine.from_kwargs(**vars(known_args)) + # all_args will contain all the arguments passed to the pipeline. + all_args = {} + all_args.update(arguments_to_dict(pipeline_args)) + all_args.update(**vars(known_args)) + paths | "MoveToEarthEngine" >> ToEarthEngine.from_kwargs(**all_args) else: raise ValueError('invalid subcommand!') From 73bff19115774f4565dc0a4c50ee6a915b91c583 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Wed, 11 Sep 2024 13:14:42 +0000 Subject: [PATCH 02/11] Converted metrics units from ms to seconds. 
--- weather_mv/loader_pipeline/metrics.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index 29278802..868d13c6 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -134,6 +134,7 @@ def create_time_series(self, metric_name: str, metric_value: float) -> None: series.metric.labels["description"] = metric_name series.resource.type = "dataflow_job" series.resource.labels["job_name"] = self.job_name + series.resource.labels["project_id"] = self.project series.resource.labels["region"] = self.region now = time.time() @@ -147,8 +148,8 @@ def create_time_series(self, metric_name: str, metric_value: float) -> None: {"interval": interval, "value": {"double_value": metric_value}} ) series.points = [point] - client.create_time_series(name=self.project, time_series=[series]) - logger.info(f"Successfully created time series for {metric_name} at {now}.") + client.create_time_series(name=f"projects/{self.project}", time_series=[series]) + logger.info(f"Successfully created time series for {metric_name} at {now}. Metric value: {metric_value}.") def process(self, element): try: @@ -165,7 +166,7 @@ def process(self, element): # Converting seconds to milli seconds. self.element_processing_time.update(int(total_time * 1000)) - self.create_time_series("element_processing_time_ms", int(total_time * 1000)) + self.create_time_series("element_processing_time", int(total_time)) # Adding data latency. if asset_start_time: @@ -176,6 +177,6 @@ def process(self, element): # Converting seconds to milli seconds. 
data_latency_ms = (current_time - asset_start_time) * 1000 self.data_latency_time.update(int(data_latency_ms)) - self.create_time_series("data_latency_time_ms", int(data_latency_ms)) + self.create_time_series("data_latency_time", int(data_latency_ms / 1000)) except Exception as e: - logger.warning(f"Some error occured while adding metrics. Error {e}") + logger.warning(f"Some error occured while adding metrics. Error: {e}") From 7eb872ba4662c2843394c4ee197be3e4353bf634 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Fri, 13 Sep 2024 06:29:54 +0000 Subject: [PATCH 03/11] Added retry logic in case of API failures. --- weather_mv/loader_pipeline/metrics.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index 868d13c6..8f95fb69 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -23,6 +23,7 @@ import apache_beam as beam from apache_beam.metrics import metric +from apache_beam.utils import retry from functools import wraps from google.cloud import monitoring_v3 @@ -30,6 +31,12 @@ logger = logging.getLogger(__name__) +# For Metrics API retry logic. +INITIAL_DELAY = 1.0 # Initial delay in seconds. +MAX_DELAY = 600 # Maximum delay before giving up in seconds. +NUM_RETRIES = 10 # Number of tries with exponential backoff. +TASK_QUEUE_WAIT_TIME = 120 # Task queue wait time in seconds. + def timeit(func_name: str, keyed_fn: bool = False): """Decorator to add time it takes for an element to be processed by a stage. 
@@ -126,7 +133,14 @@ def __init__( self.data_latency_time = metric.Metrics.distribution("Time", "data_latency_time_ms") self.asset_start_time_format = asset_start_time_format + @retry.with_exponential_backoff( + num_retries=NUM_RETRIES, + logger=logger.warning, + initial_delay_secs=INITIAL_DELAY, + max_delay_secs=MAX_DELAY + ) def create_time_series(self, metric_name: str, metric_value: float) -> None: + logger.info("Using the retry logic.") """Creates or adds data to a TimeSeries.""" client = monitoring_v3.MetricServiceClient() series = monitoring_v3.TimeSeries() @@ -149,7 +163,7 @@ def create_time_series(self, metric_name: str, metric_value: float) -> None: ) series.points = [point] client.create_time_series(name=f"projects/{self.project}", time_series=[series]) - logger.info(f"Successfully created time series for {metric_name} at {now}. Metric value: {metric_value}.") + logger.info(f"Successfully created time series for {metric_name}. Metric value: {metric_value}.") def process(self, element): try: From 79d4566ad267c52612a3c8a3ee822ce99d6e53a6 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Sun, 15 Sep 2024 19:42:20 +0000 Subject: [PATCH 04/11] Added a window of 5 secs for the metrics. 
--- weather_mv/loader_pipeline/ee.py | 11 +++- weather_mv/loader_pipeline/metrics.py | 91 +++++++++++++++------------ 2 files changed, 59 insertions(+), 43 deletions(-) diff --git a/weather_mv/loader_pipeline/ee.py b/weather_mv/loader_pipeline/ee.py index b8636df0..0254c807 100644 --- a/weather_mv/loader_pipeline/ee.py +++ b/weather_mv/loader_pipeline/ee.py @@ -33,6 +33,7 @@ from apache_beam.metrics import metric from apache_beam.io.gcp.gcsio import WRITE_CHUNK_SIZE from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.transforms import window from apache_beam.utils import retry from google.auth import compute_engine, default, credentials from google.auth.transport import requests @@ -41,7 +42,7 @@ from .sinks import ToDataSink, open_dataset, open_local, KwargsFactoryMixin, upload from .util import make_attrs_ee_compatible, RateLimit, validate_region, get_utc_timestamp -from .metrics import timeit, AddTimer, AddMetrics +from .metrics import timeit, AddTimer, AddMetrics, Add5SecMetrics logger = logging.getLogger(__name__) @@ -376,7 +377,13 @@ def expand(self, paths): ) if self.use_metrics and not self.skip_region_validation: - output | 'AddMetrics' >> beam.ParDo(AddMetrics.from_kwargs(**vars(self))) + ( + output + | 'AddMetrics' >> beam.ParDo(AddMetrics.from_kwargs(**vars(self))) + | '5SecWindows' >> beam.WindowInto(window.FixedWindows(5)) + | 'Group' >> beam.GroupByKey(lambda x: x) + | 'Add5SecMetrics' >> beam.ParDo(Add5SecMetrics.from_kwargs(**vars(self))) + ) else: ( paths diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index 8f95fb69..a8192f8e 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -117,30 +117,57 @@ def process(self, element) -> t.Iterator[t.Any]: class AddMetrics(beam.DoFn, KwargsFactoryMixin): """DoFn to add Element Processing Time metric to beam. 
Expects PCollection to contain a time_dict.""" + def __init__(self, asset_start_time_format: str = '%Y-%m-%dT%H:%M:%SZ'): + super().__init__() + self.element_processing_time = metric.Metrics.distribution('Time', 'element_processing_time_ms') + self.data_latency_time = metric.Metrics.distribution('Time', 'data_latency_time_ms') + self.asset_start_time_format = asset_start_time_format + + def process(self, element): + try: + if len(element) == 0: + raise ValueError("time_dict not found.") + (_, asset_start_time), time_dict = element + if not isinstance(time_dict, dict): + raise ValueError("time_dict not found.") + + # Adding element processing time. + total_time = 0 + for stage_time in time_dict.values(): + total_time += stage_time + + # Converting seconds to milli seconds. + self.element_processing_time.update(int(total_time * 1000)) + + # Adding data latency. + if asset_start_time: + current_time = time.time() + asset_start_time = datetime.datetime.strptime( + asset_start_time, self.asset_start_time_format).timestamp() + + # Converting seconds to milli seconds. + data_latency_ms = (current_time - asset_start_time) * 1000 + self.data_latency_time.update(int(data_latency_ms)) + + yield beam.window.TimestampedValue(('data_latency_time', data_latency_ms / 1000), int(current_time)) + yield beam.window.TimestampedValue(('element_processing_time', total_time), int(current_time)) + except Exception as e: + logger.warning(f"Some error occured while adding metrics. Error {e}") + + +class Add5SecMetrics(beam.DoFn, KwargsFactoryMixin): def __init__( self, job_name: str, project: str, region: str, - asset_start_time_format: str = "%Y-%m-%dT%H:%M:%SZ", ): super().__init__() self.project = project self.region = region self.job_name = job_name - # These are the Apache Beam metrics. 
- self.element_processing_time = metric.Metrics.distribution("Time", "element_processing_time_ms") - self.data_latency_time = metric.Metrics.distribution("Time", "data_latency_time_ms") - self.asset_start_time_format = asset_start_time_format - @retry.with_exponential_backoff( - num_retries=NUM_RETRIES, - logger=logger.warning, - initial_delay_secs=INITIAL_DELAY, - max_delay_secs=MAX_DELAY - ) def create_time_series(self, metric_name: str, metric_value: float) -> None: - logger.info("Using the retry logic.") """Creates or adds data to a TimeSeries.""" client = monitoring_v3.MetricServiceClient() series = monitoring_v3.TimeSeries() @@ -165,32 +192,14 @@ def create_time_series(self, metric_name: str, metric_value: float) -> None: client.create_time_series(name=f"projects/{self.project}", time_series=[series]) logger.info(f"Successfully created time series for {metric_name}. Metric value: {metric_value}.") - def process(self, element): - try: - if len(element) == 0: - raise ValueError("time_dict not found.") - (_, asset_start_time), time_dict = element - if not isinstance(time_dict, dict): - raise ValueError("time_dict not found.") - - # Adding element processing time. - total_time = 0 - for stage_time in time_dict.values(): - total_time += stage_time - - # Converting seconds to milli seconds. - self.element_processing_time.update(int(total_time * 1000)) - self.create_time_series("element_processing_time", int(total_time)) - - # Adding data latency. - if asset_start_time: - current_time = time.time() - asset_start_time = datetime.datetime.strptime( - asset_start_time, self.asset_start_time_format).timestamp() - - # Converting seconds to milli seconds. - data_latency_ms = (current_time - asset_start_time) * 1000 - self.data_latency_time.update(int(data_latency_ms)) - self.create_time_series("data_latency_time", int(data_latency_ms / 1000)) - except Exception as e: - logger.warning(f"Some error occured while adding metrics. 
Error: {e}") + def process( + self, + element: t.Any, + timestamp=beam.DoFn.TimestampParam, + window=beam.DoFn.WindowParam, + ): + logger.info(f'Window ends at {window.max_timestamp().to_utc_datetime()}') + logger.info(f'Timestamp: {timestamp}') + logger.info(f"{element[0]} values: {element[1]}") + self.create_time_series(f"{element[0]}_max", max(element[1])) + self.create_time_series(f"{element[0]}_mean", sum(element[1]) / len(element[1])) From 5941252f24118e31058b3414c51f19000fad038e Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Mon, 16 Sep 2024 05:10:04 +0000 Subject: [PATCH 05/11] Assigned elements to global window. --- weather_mv/loader_pipeline/metrics.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index a8192f8e..79332d82 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -198,8 +198,6 @@ def process( timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam, ): - logger.info(f'Window ends at {window.max_timestamp().to_utc_datetime()}') - logger.info(f'Timestamp: {timestamp}') logger.info(f"{element[0]} values: {element[1]}") self.create_time_series(f"{element[0]}_max", max(element[1])) self.create_time_series(f"{element[0]}_mean", sum(element[1]) / len(element[1])) From 2977c9702d8b4bad6430d9224f930fff9ddc87c3 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Wed, 18 Sep 2024 15:58:35 +0000 Subject: [PATCH 06/11] Combined the metric transforms. 
--- weather_mv/loader_pipeline/ee.py | 8 +--- weather_mv/loader_pipeline/metrics.py | 54 ++++++++++++++++----------- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/weather_mv/loader_pipeline/ee.py b/weather_mv/loader_pipeline/ee.py index 0254c807..92dc531f 100644 --- a/weather_mv/loader_pipeline/ee.py +++ b/weather_mv/loader_pipeline/ee.py @@ -377,13 +377,7 @@ def expand(self, paths): ) if self.use_metrics and not self.skip_region_validation: - ( - output - | 'AddMetrics' >> beam.ParDo(AddMetrics.from_kwargs(**vars(self))) - | '5SecWindows' >> beam.WindowInto(window.FixedWindows(5)) - | 'Group' >> beam.GroupByKey(lambda x: x) - | 'Add5SecMetrics' >> beam.ParDo(Add5SecMetrics.from_kwargs(**vars(self))) - ) + output | 'AddMetrics' >> AddMetrics.from_kwargs(**vars(self)) else: ( paths diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index 79332d82..1b9b9599 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -15,6 +15,7 @@ """Utilities for adding metrics to beam pipeline.""" import copy +import dataclasses import datetime import inspect import logging @@ -23,7 +24,7 @@ import apache_beam as beam from apache_beam.metrics import metric -from apache_beam.utils import retry +from apache_beam.transforms import window from functools import wraps from google.cloud import monitoring_v3 @@ -114,7 +115,7 @@ def process(self, element) -> t.Iterator[t.Any]: yield element, time_dict -class AddMetrics(beam.DoFn, KwargsFactoryMixin): +class AddBeamMetrics(beam.DoFn): """DoFn to add Element Processing Time metric to beam. 
Expects PCollection to contain a time_dict.""" def __init__(self, asset_start_time_format: str = '%Y-%m-%dT%H:%M:%SZ'): @@ -149,23 +150,17 @@ def process(self, element): data_latency_ms = (current_time - asset_start_time) * 1000 self.data_latency_time.update(int(data_latency_ms)) - yield beam.window.TimestampedValue(('data_latency_time', data_latency_ms / 1000), int(current_time)) - yield beam.window.TimestampedValue(('element_processing_time', total_time), int(current_time)) + yield ('data_latency_time', data_latency_ms / 1000) + yield ('element_processing_time', total_time) except Exception as e: logger.warning(f"Some error occured while adding metrics. Error {e}") -class Add5SecMetrics(beam.DoFn, KwargsFactoryMixin): - def __init__( - self, - job_name: str, - project: str, - region: str, - ): - super().__init__() - self.project = project - self.region = region - self.job_name = job_name +@dataclasses.dataclass +class CreateTimeSeries(beam.DoFn): + job_name: str + project: str + region: str def create_time_series(self, metric_name: str, metric_value: float) -> None: """Creates or adds data to a TimeSeries.""" @@ -194,10 +189,27 @@ def create_time_series(self, metric_name: str, metric_value: float) -> None: def process( self, - element: t.Any, - timestamp=beam.DoFn.TimestampParam, - window=beam.DoFn.WindowParam, + element: t.Any ): - logger.info(f"{element[0]} values: {element[1]}") - self.create_time_series(f"{element[0]}_max", max(element[1])) - self.create_time_series(f"{element[0]}_mean", sum(element[1]) / len(element[1])) + metric_name, metric_values = element + logger.info(f"{metric_name} values: {metric_values}") + self.create_time_series(f"{metric_name}_max", max(metric_values)) + self.create_time_series(f"{metric_name}_mean", sum(metric_values) / len(metric_values)) + + +@dataclasses.dataclass +class AddMetrics(beam.PTransform, KwargsFactoryMixin): + job_name: str + project: str + region: str + + def expand(self, pcol: beam.PCollection): + return ( + pcol 
+ | 'AddBeamMetrics' >> beam.ParDo(AddBeamMetrics()) + | 'AddTimestamps' >> beam.Map(lambda element: beam.window.TimestampedValue(element, time.time())) + | 'Window' >> beam.WindowInto(window.FixedWindows(5)) + | 'GroupByKeyAndWindow' >> beam.GroupByKey(lambda element: element) + | 'CreateTimeSeries' >> beam.ParDo(CreateTimeSeries( + self.job_name, self.project, self.region)) + ) From f65c55c4282509ab28b07a33f2cbd8ee2d1139b0 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Wed, 18 Sep 2024 16:19:35 +0000 Subject: [PATCH 07/11] Removed unnecessary import. --- weather_mv/loader_pipeline/ee.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weather_mv/loader_pipeline/ee.py b/weather_mv/loader_pipeline/ee.py index 92dc531f..d64a8623 100644 --- a/weather_mv/loader_pipeline/ee.py +++ b/weather_mv/loader_pipeline/ee.py @@ -42,7 +42,7 @@ from .sinks import ToDataSink, open_dataset, open_local, KwargsFactoryMixin, upload from .util import make_attrs_ee_compatible, RateLimit, validate_region, get_utc_timestamp -from .metrics import timeit, AddTimer, AddMetrics, Add5SecMetrics +from .metrics import timeit, AddTimer, AddMetrics logger = logging.getLogger(__name__) From 8b3405ac4ab0e2c08fbaf4cf027dc0ea92c3aa7c Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Thu, 19 Sep 2024 07:10:12 +0000 Subject: [PATCH 08/11] Added a trigger to repeatedly dump all the elements in the window every 5 seconds. 
--- weather_mv/loader_pipeline/metrics.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index 1b9b9599..9816a350 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -24,7 +24,7 @@ import apache_beam as beam from apache_beam.metrics import metric -from apache_beam.transforms import window +from apache_beam.transforms import window, trigger from functools import wraps from google.cloud import monitoring_v3 @@ -207,9 +207,15 @@ def expand(self, pcol: beam.PCollection): return ( pcol | 'AddBeamMetrics' >> beam.ParDo(AddBeamMetrics()) - | 'AddTimestamps' >> beam.Map(lambda element: beam.window.TimestampedValue(element, time.time())) - | 'Window' >> beam.WindowInto(window.FixedWindows(5)) + | 'AddTimestamps' >> beam.Map( + lambda element: window.TimestampedValue(element, time.time())) + | 'Window' >> beam.WindowInto( + window.GlobalWindows(), + trigger=trigger.Repeatedly( + trigger.AfterProcessingTime(5) + ), + accumulation_mode=trigger.AccumulationMode.DISCARDING) | 'GroupByKeyAndWindow' >> beam.GroupByKey(lambda element: element) - | 'CreateTimeSeries' >> beam.ParDo(CreateTimeSeries( - self.job_name, self.project, self.region)) + | 'CreateTimeSeries' >> beam.ParDo( + CreateTimeSeries(self.job_name, self.project, self.region)) ) From b050eb426464dc260839e4ea0fb01061edd32f03 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Thu, 19 Sep 2024 12:35:35 +0000 Subject: [PATCH 09/11] Yielded the metrics in a single call. 
--- weather_mv/loader_pipeline/metrics.py | 82 +++++++++++++++++---------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index 9816a350..0cc6c709 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -118,10 +118,14 @@ def process(self, element) -> t.Iterator[t.Any]: class AddBeamMetrics(beam.DoFn): """DoFn to add Element Processing Time metric to beam. Expects PCollection to contain a time_dict.""" - def __init__(self, asset_start_time_format: str = '%Y-%m-%dT%H:%M:%SZ'): + def __init__(self, asset_start_time_format: str = "%Y-%m-%dT%H:%M:%SZ"): super().__init__() - self.element_processing_time = metric.Metrics.distribution('Time', 'element_processing_time_ms') - self.data_latency_time = metric.Metrics.distribution('Time', 'data_latency_time_ms') + self.element_processing_time = metric.Metrics.distribution( + "Time", "element_processing_time_ms" + ) + self.data_latency_time = metric.Metrics.distribution( + "Time", "data_latency_time_ms" + ) self.asset_start_time_format = asset_start_time_format def process(self, element): @@ -144,26 +148,29 @@ def process(self, element): if asset_start_time: current_time = time.time() asset_start_time = datetime.datetime.strptime( - asset_start_time, self.asset_start_time_format).timestamp() + asset_start_time, self.asset_start_time_format + ).timestamp() # Converting seconds to milli seconds. data_latency_ms = (current_time - asset_start_time) * 1000 self.data_latency_time.update(int(data_latency_ms)) - yield ('data_latency_time', data_latency_ms / 1000) - yield ('element_processing_time', total_time) + yield ("custom_metrics", (data_latency_ms / 1000, total_time)) except Exception as e: - logger.warning(f"Some error occured while adding metrics. Error {e}") + logger.warning( + f"Some error occured while adding metrics. 
Error {e}" + ) @dataclasses.dataclass class CreateTimeSeries(beam.DoFn): + """DoFn to write metrics TimeSeries data in Google Cloud Monitoring.""" job_name: str project: str region: str def create_time_series(self, metric_name: str, metric_value: float) -> None: - """Creates or adds data to a TimeSeries.""" + """Writes data to a Metrics TimeSeries.""" client = monitoring_v3.MetricServiceClient() series = monitoring_v3.TimeSeries() series.metric.type = f"custom.googleapis.com/{metric_name}" @@ -184,17 +191,26 @@ def create_time_series(self, metric_name: str, metric_value: float) -> None: {"interval": interval, "value": {"double_value": metric_value}} ) series.points = [point] - client.create_time_series(name=f"projects/{self.project}", time_series=[series]) - logger.info(f"Successfully created time series for {metric_name}. Metric value: {metric_value}.") + client.create_time_series( + name=f"projects/{self.project}", time_series=[series] + ) + logger.info( + f"Successfully created time series for {metric_name}. Metric value: {metric_value}." 
+ ) - def process( - self, - element: t.Any - ): - metric_name, metric_values = element - logger.info(f"{metric_name} values: {metric_values}") - self.create_time_series(f"{metric_name}_max", max(metric_values)) - self.create_time_series(f"{metric_name}_mean", sum(metric_values) / len(metric_values)) + def process(self, element: t.Any): + _, (data_latency_times, element_processing_times) = element + logger.info(f"data_latency_time values: {data_latency_times}") + self.create_time_series(f"data_latency_time_max", max(data_latency_times)) + self.create_time_series( + f"data_latency_time_mean", sum(data_latency_times) / len(data_latency_times) + ) + + logger.info(f"element_processing_time values: {element_processing_times}") + self.create_time_series(f"element_processing_time_max", max(element_processing_times)) + self.create_time_series( + f"element_processing_time_mean", sum(element_processing_times) / len(element_processing_times) + ) @dataclasses.dataclass @@ -203,19 +219,23 @@ class AddMetrics(beam.PTransform, KwargsFactoryMixin): project: str region: str - def expand(self, pcol: beam.PCollection): + def expand(self, pcoll: beam.PCollection): return ( - pcol - | 'AddBeamMetrics' >> beam.ParDo(AddBeamMetrics()) - | 'AddTimestamps' >> beam.Map( - lambda element: window.TimestampedValue(element, time.time())) - | 'Window' >> beam.WindowInto( + pcoll + | "AddBeamMetrics" >> beam.ParDo(AddBeamMetrics()) + | "AddTimestamps" + >> beam.Map( + lambda element: window.TimestampedValue(element, time.time()) + ) + | "Window" + >> beam.WindowInto( window.GlobalWindows(), - trigger=trigger.Repeatedly( - trigger.AfterProcessingTime(5) - ), - accumulation_mode=trigger.AccumulationMode.DISCARDING) - | 'GroupByKeyAndWindow' >> beam.GroupByKey(lambda element: element) - | 'CreateTimeSeries' >> beam.ParDo( - CreateTimeSeries(self.job_name, self.project, self.region)) + trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)), + accumulation_mode=trigger.AccumulationMode.DISCARDING, 
+ ) + | "GroupByKeyAndWindow" >> beam.GroupByKey(lambda element: element) + | "CreateTimeSeries" + >> beam.ParDo( + CreateTimeSeries(self.job_name, self.project, self.region) + ) ) From babf1d8f352dbf30e6b04c43c05ad59f54a2fa04 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Fri, 20 Sep 2024 07:50:26 +0000 Subject: [PATCH 10/11] Unpacked metric values correctly. --- weather_mv/loader_pipeline/metrics.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index c8be804a..a5f53b05 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -225,7 +225,10 @@ def create_time_series(self, metric_name: str, metric_value: float) -> None: ) def process(self, element: t.Any): - _, (data_latency_times, element_processing_times) = element + _, metric_values = element + data_latency_times = [x[0] for x in metric_values] + element_processing_times = [x[1] for x in metric_values] + logger.info(f"data_latency_time values: {data_latency_times}") self.create_time_series(f"data_latency_time_max", max(data_latency_times)) self.create_time_series( From 84ed6bfcc048b584b423b59dae67ff369e81ff81 Mon Sep 17 00:00:00 2001 From: Jash Rana Date: Fri, 20 Sep 2024 09:41:08 +0000 Subject: [PATCH 11/11] Changes for linting and building correctly. --- ci3.8.yml | 1 + ci3.9.yml | 1 + environment.yml | 1 + weather_mv/loader_pipeline/ee.py | 3 +-- weather_mv/loader_pipeline/metrics.py | 11 ++++------- 5 files changed, 8 insertions(+), 9 deletions(-) diff --git a/ci3.8.yml b/ci3.8.yml index ab040765..c8dab6a0 100644 --- a/ci3.8.yml +++ b/ci3.8.yml @@ -37,4 +37,5 @@ dependencies: - cython==0.29.34 - earthengine-api==0.1.329 - git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl. 
+ - google-cloud-monitoring - .[test] diff --git a/ci3.9.yml b/ci3.9.yml index 85cb9cd4..c6257a19 100644 --- a/ci3.9.yml +++ b/ci3.9.yml @@ -37,4 +37,5 @@ dependencies: - cython==0.29.34 - earthengine-api==0.1.329 - git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl. + - google-cloud-monitoring - .[test] diff --git a/environment.yml b/environment.yml index 7325a2f8..2331c428 100644 --- a/environment.yml +++ b/environment.yml @@ -31,6 +31,7 @@ dependencies: - firebase-admin==6.0.1 - setuptools==70.3.0 - git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl. + - google-cloud-monitoring - . - ./weather_dl - ./weather_mv diff --git a/weather_mv/loader_pipeline/ee.py b/weather_mv/loader_pipeline/ee.py index 019436bc..50f4c26c 100644 --- a/weather_mv/loader_pipeline/ee.py +++ b/weather_mv/loader_pipeline/ee.py @@ -33,7 +33,6 @@ from apache_beam.metrics import metric from apache_beam.io.gcp.gcsio import WRITE_CHUNK_SIZE from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.transforms import window from apache_beam.utils import retry from google.auth import compute_engine, default, credentials from google.auth.transport import requests @@ -251,7 +250,7 @@ class ToEarthEngine(ToDataSink): initialization_time_regex: str forecast_time_regex: str ingest_as_virtual_asset: bool - use_deflate:bool + use_deflate: bool use_metrics: bool topic: str diff --git a/weather_mv/loader_pipeline/metrics.py b/weather_mv/loader_pipeline/metrics.py index a5f53b05..25258173 100644 --- a/weather_mv/loader_pipeline/metrics.py +++ b/weather_mv/loader_pipeline/metrics.py @@ -22,7 +22,6 @@ import time import typing as t from collections import OrderedDict -from functools import wraps import 
apache_beam as beam from apache_beam.metrics import metric @@ -30,8 +29,6 @@ from functools import wraps from google.cloud import monitoring_v3 -from .sinks import KwargsFactoryMixin - from .sinks import get_file_time, KwargsFactoryMixin logger = logging.getLogger(__name__) @@ -230,15 +227,15 @@ def process(self, element: t.Any): element_processing_times = [x[1] for x in metric_values] logger.info(f"data_latency_time values: {data_latency_times}") - self.create_time_series(f"data_latency_time_max", max(data_latency_times)) + self.create_time_series("data_latency_time_max", max(data_latency_times)) self.create_time_series( - f"data_latency_time_mean", sum(data_latency_times) / len(data_latency_times) + "data_latency_time_mean", sum(data_latency_times) / len(data_latency_times) ) logger.info(f"element_processing_time values: {element_processing_times}") - self.create_time_series(f"element_processing_time_max", max(element_processing_times)) + self.create_time_series("element_processing_time_max", max(element_processing_times)) self.create_time_series( - f"element_processing_time_mean", sum(element_processing_times) / len(element_processing_times) + "element_processing_time_mean", sum(element_processing_times) / len(element_processing_times) )