Add Google Cloud Monitoring metrics #480

Closed · wants to merge 12 commits
1 change: 1 addition & 0 deletions ci3.8.yml
@@ -37,4 +37,5 @@ dependencies:
- cython==0.29.34
- earthengine-api==0.1.329
- git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl.
- google-cloud-monitoring
- .[test]
1 change: 1 addition & 0 deletions ci3.9.yml
@@ -37,4 +37,5 @@ dependencies:
- cython==0.29.34
- earthengine-api==0.1.329
- git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl.
- google-cloud-monitoring
- .[test]
1 change: 1 addition & 0 deletions environment.yml
@@ -31,6 +31,7 @@ dependencies:
- firebase-admin==6.0.1
- setuptools==70.3.0
- git+https://github.com/dabhicusp/cdsapi-beta-google-weather-tools.git@master#egg=cdsapi # TODO([#474](https://github.com/google/weather-tools/issues/474)): Compatible cdsapi with weather-dl.
- google-cloud-monitoring
- .
- ./weather_dl
- ./weather_mv
6 changes: 3 additions & 3 deletions weather_mv/loader_pipeline/ee.py
@@ -250,7 +250,7 @@ class ToEarthEngine(ToDataSink):
initialization_time_regex: str
forecast_time_regex: str
ingest_as_virtual_asset: bool
use_deflate:bool
use_deflate: bool
use_metrics: bool
topic: str

@@ -376,8 +376,8 @@ def expand(self, paths):
| 'IngestIntoEE' >> IngestIntoEETransform.from_kwargs(**vars(self))
)

if self.use_metrics:
output | 'AddMetrics' >> beam.ParDo(AddMetrics())
if self.use_metrics and not self.skip_region_validation:
output | 'AddMetrics' >> AddMetrics.from_kwargs(**vars(self))
else:
(
paths
96 changes: 93 additions & 3 deletions weather_mv/loader_pipeline/metrics.py
@@ -22,15 +22,23 @@
import time
import typing as t
from collections import OrderedDict
from functools import wraps

import apache_beam as beam
from apache_beam.metrics import metric
from apache_beam.transforms import window, trigger
from functools import wraps
from google.cloud import monitoring_v3

from .sinks import get_file_time, KwargsFactoryMixin

logger = logging.getLogger(__name__)

# For Metrics API retry logic.
INITIAL_DELAY = 1.0 # Initial delay in seconds.
MAX_DELAY = 600 # Maximum delay before giving up in seconds.
NUM_RETRIES = 10 # Number of tries with exponential backoff.
TASK_QUEUE_WAIT_TIME = 120 # Task queue wait time in seconds.
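
As an aside, these constants imply an exponential-backoff policy around the Metrics API calls below. Here is a minimal sketch of such a wrapper, using the constants above; the decorator name and jitter strategy are assumptions, not part of this diff:

```python
import logging
import random
import time
from functools import wraps

logger = logging.getLogger(__name__)


def with_backoff(func):
    """Retries `func` up to NUM_RETRIES times with capped exponential backoff."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        delay = INITIAL_DELAY
        for attempt in range(NUM_RETRIES):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if attempt == NUM_RETRIES - 1:
                    raise
                # Jitter avoids many workers retrying in lockstep.
                sleep_for = min(delay, MAX_DELAY) * (0.5 + random.random())
                logger.warning(f"{func.__name__} failed ({e}); retrying in {sleep_for:.1f}s.")
                time.sleep(sleep_for)
                delay *= 2
    return wrapper
```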


def timeit(func_name: str, keyed_fn: bool = False):
"""Decorator to add time it takes for an element to be processed by a stage.
@@ -115,7 +123,7 @@ def process(self, element) -> t.Iterator[t.Any]:
yield element, time_dict


class AddMetrics(beam.DoFn):
class AddBeamMetrics(beam.DoFn):
"""DoFn to add Element Processing Time metric to beam. Expects PCollection
to contain a time_dict."""

@@ -170,8 +178,90 @@ def process(self, element):
logger.info(
f"{uri}: Time from {current_step} -> {next_step}: {step_time} seconds."
)

yield ("custom_metrics", (data_latency_ms / 1000, element_processing_time / 1000))
except Exception as e:
logger.warning(f"Some error occurred while adding metrics: {e}")


@dataclasses.dataclass
class CreateTimeSeries(beam.DoFn):
"""DoFn to write metrics TimeSeries data in Google Cloud Monitoring."""
job_name: str
project: str
region: str

def create_time_series(self, metric_name: str, metric_value: float) -> None:
"""Writes data to a Metrics TimeSeries."""
client = monitoring_v3.MetricServiceClient()
series = monitoring_v3.TimeSeries()
series.metric.type = f"custom.googleapis.com/{metric_name}"
series.metric.labels["description"] = metric_name
series.resource.type = "dataflow_job"
series.resource.labels["job_name"] = self.job_name
series.resource.labels["project_id"] = self.project
series.resource.labels["region"] = self.region

now = time.time()
seconds = int(now)
nanos = int((now - seconds) * 10**9)
interval = monitoring_v3.TimeInterval(
{"end_time": {"seconds": seconds, "nanos": nanos}}
)

point = monitoring_v3.Point(
{"interval": interval, "value": {"double_value": metric_value}}
)
series.points = [point]
client.create_time_series(
name=f"projects/{self.project}", time_series=[series]
)
logger.info(
f"Successfully created time series for {metric_name}. Metric value: {metric_value}."
)

def process(self, element: t.Any):
_, metric_values = element
data_latency_times = [x[0] for x in metric_values]
element_processing_times = [x[1] for x in metric_values]

logger.info(f"data_latency_time values: {data_latency_times}")
self.create_time_series("data_latency_time_max", max(data_latency_times))
self.create_time_series(
"data_latency_time_mean", sum(data_latency_times) / len(data_latency_times)
)

logger.info(f"element_processing_time values: {element_processing_times}")
self.create_time_series("element_processing_time_max", max(element_processing_times))
self.create_time_series(
"element_processing_time_mean", sum(element_processing_times) / len(element_processing_times)
)
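
To sanity-check what `create_time_series` writes, the same client can read the points back. A hedged example, not part of this diff, with a placeholder project id and one of the metric names used above:

```python
import time

from google.cloud import monitoring_v3

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
results = client.list_time_series(
    request={
        "name": "projects/my-project",  # Placeholder project id.
        "filter": 'metric.type = "custom.googleapis.com/data_latency_time_max"',
        "interval": monitoring_v3.TimeInterval(
            {"end_time": {"seconds": now}, "start_time": {"seconds": now - 3600}}
        ),
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
for series in results:
    for point in series.points:
        print(point.value.double_value)
```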


@dataclasses.dataclass
class AddMetrics(beam.PTransform, KwargsFactoryMixin):
job_name: str
project: str
region: str

def expand(self, pcoll: beam.PCollection):
return (
pcoll
| "AddBeamMetrics" >> beam.ParDo(AddBeamMetrics())
| "AddTimestamps"
>> beam.Map(
lambda element: window.TimestampedValue(element, time.time())
)
| "Window"
>> beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterProcessingTime(5)),
accumulation_mode=trigger.AccumulationMode.DISCARDING,
)
| "GroupByKeyAndWindow" >> beam.GroupByKey(lambda element: element)
| "CreateTimeSeries"
>> beam.ParDo(
CreateTimeSeries(self.job_name, self.project, self.region)
)
)
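
For orientation, the sketch below shows how this transform attaches to an upstream PCollection, mirroring the `AddMetrics.from_kwargs(**vars(self))` call in ee.py. It is illustrative only; all three field values are placeholders:

```python
import apache_beam as beam


def attach_metrics(processed: beam.PCollection) -> beam.PCollection:
    """Attaches AddMetrics to a PCollection of (element, time_dict) pairs."""
    return processed | "AddMetrics" >> AddMetrics(
        job_name="weather-mv-ee",  # Placeholder Dataflow job name.
        project="my-project",      # Placeholder GCP project id.
        region="us-central1",      # Placeholder region.
    )
```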
16 changes: 15 additions & 1 deletion weather_mv/loader_pipeline/pipeline.py
@@ -47,6 +47,16 @@ def pattern_to_uris(match_pattern: str, is_zarr: bool = False) -> t.Iterable[str]:
yield from [x.path for x in match.metadata_list]


def arguments_to_dict(args: t.List[str]) -> t.Dict[str, str]:
    """Converts a list of CLI arguments (`--key value` or `--key=value`) to a dictionary."""
    result = {}
    i = 0
    while i < len(args):
        key = args[i].lstrip("-")
        if "=" in key:  # --key=value style.
            key, _, value = key.partition("=")
            i += 1
        else:  # --key value style.
            value = args[i + 1]
            i += 2
        result[key] = value
    return result


def pipeline(known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None:
all_uris = list(pattern_to_uris(known_args.uris, known_args.zarr))
if not all_uris:
@@ -75,7 +85,11 @@ def pipeline(known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None
elif known_args.subcommand == 'regrid' or known_args.subcommand == 'rg':
paths | "Regrid" >> Regrid.from_kwargs(**vars(known_args))
elif known_args.subcommand == 'earthengine' or known_args.subcommand == 'ee':
paths | "MoveToEarthEngine" >> ToEarthEngine.from_kwargs(**vars(known_args))
# all_args merges every argument passed to the pipeline: unparsed pipeline_args plus parsed known_args.
all_args = {}
all_args.update(arguments_to_dict(pipeline_args))
all_args.update(**vars(known_args))
paths | "MoveToEarthEngine" >> ToEarthEngine.from_kwargs(**all_args)
else:
raise ValueError('invalid subcommand!')
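
Note the update order above: `known_args` is applied last, so parsed options win over raw pipeline flags on key collisions. A hedged illustration with placeholder values:

```python
import argparse

known_args = argparse.Namespace(subcommand="ee", use_metrics=True)
pipeline_args = ["--project", "my-project", "--region", "us-central1", "--job_name", "wx-mv"]

all_args = {}
all_args.update(arguments_to_dict(pipeline_args))
all_args.update(**vars(known_args))
# all_args now carries job_name, project, and region for AddMetrics,
# alongside the parsed subcommand options:
# {'project': 'my-project', 'region': 'us-central1', 'job_name': 'wx-mv',
#  'subcommand': 'ee', 'use_metrics': True}
```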
