Skip to content

Commit

Permalink
Updated README file. (#485)
Browse files Browse the repository at this point in the history
  • Loading branch information
j9sh264 authored Oct 16, 2024
1 parent 88fffbd commit 1cb4d28
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
2 changes: 2 additions & 0 deletions weather_mv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ _Command options_:
* `--group_common_hypercubes`: A flag that allows to split up large grib files into multiple level-wise ImageCollections / COGS.
* `--use_deflate`: A flag that allows you to use deflate algorithm for beter compression. Using deflate compression
takes extra time in COG creation. Default:False.
* `--use_metrics`: A flag that allows you to add Beam metrics to the pipeline. Default: False.
* `--use_monitoring_metrics`: A flag that allows you to to add Google Cloud Monitoring metrics to the pipeline. Default: False.

Invoke with `ee -h` or `earthengine --help` to see the full range of options.

Expand Down
4 changes: 4 additions & 0 deletions weather_mv/loader_pipeline/ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ class ToEarthEngine(ToDataSink):
use_metrics: bool
use_monitoring_metrics: bool
topic: str
# Pipeline arguments.
job_name: str
project: str
region: str

@classmethod
def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
Expand Down
7 changes: 5 additions & 2 deletions weather_mv/loader_pipeline/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class AddBeamMetrics(beam.DoFn):
"""DoFn to add Element Processing Time metric to beam. Expects PCollection
to contain a time_dict."""

def __init__(self, asset_start_time_format: str = "%Y-%m-%dT%H:%M:%SZ"):
def __init__(self, asset_start_time_format: str):
super().__init__()
self.element_processing_time = metric.Metrics.distribution(
"Time", "element_processing_time_ms"
Expand Down Expand Up @@ -258,13 +258,16 @@ def process(self, element: t.Any):

@dataclasses.dataclass
class AddMetrics(beam.PTransform, KwargsFactoryMixin):
"""A custom transform to add metrics to the pipeline."""

job_name: str
project: str
region: str
use_monitoring_metrics: bool
asset_start_time_format: str = "%Y-%m-%dT%H:%M:%SZ"

def expand(self, pcoll: beam.PCollection):
metrics = pcoll | "AddBeamMetrics" >> beam.ParDo(AddBeamMetrics())
metrics = pcoll | "AddBeamMetrics" >> beam.ParDo(AddBeamMetrics(self.asset_start_time_format))
if self.use_monitoring_metrics:
(
metrics
Expand Down

0 comments on commit 1cb4d28

Please sign in to comment.