
Commit 448c860

Add onboarder base class. (os-climate#144)
* Add onboarder base class with WISC as example of use.

---------

Signed-off-by: Joe Moorhouse <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 7cd4a2f commit 448c860

File tree

4 files changed: +206 -115 lines changed


src/hazard/onboard/wisc_european_winter_storm.py

+81-90
@@ -1,35 +1,54 @@
-import glob
 import logging
 import os
-from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional
+from pathlib import Path, PurePath
+from typing import Iterable, Optional
 import warnings
 import zipfile
 
-from dask.distributed import Client
-from fsspec.implementations.local import LocalFileSystem
 from fsspec.spec import AbstractFileSystem
 import numpy as np
 import xarray as xr
 
-# import cartopy.crs as ccrs
-# import matplotlib.pyplot as plt
-import glob
 from scipy.optimize import curve_fit
 
-from hazard.indicator_model import IndicatorModel
 from hazard.inventory import Colormap, HazardResource, MapInfo, Scenario
-from hazard.protocols import OpenDataset, ReadWriteDataArray
+from hazard.onboarder import Onboarder
+from hazard.protocols import WriteDataArray
 from hazard.sources.osc_zarr import OscZarr
 from hazard.utilities.tiles import create_tiles_for_resource
 from hazard.utilities.xarray_utilities import data_array
 
 logger = logging.getLogger(__name__)
 
 
-class WISCWinterStormEventSource(OpenDataset):
-    def __init__(self, source_dir: str, fs: Optional[AbstractFileSystem] = None):
-        """Source that can create WISC return period wind speed maps from the event set:
+class WISCEuropeanWinterStorm(Onboarder):
+    def __init__(
+        self,
+        source_dir_base: PurePath = PurePath(),
+        fs: Optional[AbstractFileSystem] = None,
+    ):
+        super().__init__(source_dir_base, fs)
+        """
+        Peak 3s gust wind speed for different return periods inferred from the WISC event set.
+
+        METADATA:
+        Link: https://cds.climate.copernicus.eu/datasets/sis-european-wind-storm-synthetic-events?tab=overview
+        Data type: Synthetic European winter storm events.
+        Hazard indicator: Wind
+        Region: Europe
+        Resolution: 2.4 arcmin
+        Scenarios: Historical
+        Time range: Not applicable
+        File type: NetCDF
+
+        DATA DESCRIPTION:
+        The WISC dataset contains a set of synthetic windstorm events consisting of 22,980 individual
+        storm footprints over Europe. These are a physically realistic set of plausible windstorm
+        events based on the modelled climatic conditions, calculated using the Met Office HadGEM3 model
+        (Global Atmosphere 3 and Global Land 3 configurations).
+        Return period maps of peak gust wind speed are inferred from this data.
+
+        Special thanks to Annabel Hall; her work on investigating the fitting of the WISC data set is adapted hereunder.
         https://cds.climate.copernicus.eu/datasets/sis-european-wind-storm-synthetic-events?tab=overview
         https://cds.climate.copernicus.eu/how-to-api
 
@@ -39,8 +58,6 @@ def __init__(self, source_dir: str, fs: Optional[AbstractFileSystem] = None):
             fs (Optional[AbstractFileSystem], optional): AbstractFileSystem instance.
                 If None, a LocalFileSystem is used.
         """
-        self.fs = fs if fs else LocalFileSystem()
-        self.source_dir = source_dir
         self.years = [
             1986,
             1987,
@@ -72,10 +89,41 @@ def __init__(self, source_dir: str, fs: Optional[AbstractFileSystem] = None):
         # synth_sets = [1.2, 2.0, 3.0]
         self.synth_set = 1.2
 
-    def prepare_source_files(self, working_dir: Path):
-        self._download_all(working_dir)
+    def prepare(self, working_dir: Path, force_download: bool = False):
+        # self._download_all(working_dir)
         self._extract_all(working_dir)
 
+    def onboard(self, target: WriteDataArray):
+        logger.info("Creating data set from events")
+        resource = self._resource()
+        for scenario in resource.scenarios:
+            for year in scenario.years:
+                ds = self._peak_annual_gust_speed_fit()
+                # note that the co-ordinates will be written into the parent of resource.path
+                target.write(
+                    resource.path.format(scenario=scenario.id, year=year),
+                    ds["wind_speed"].compute(),
+                    spatial_coords=resource.store_netcdf_coords,
+                )
+
+    def create_maps(self, source: OscZarr, target: OscZarr):
+        """Create map images."""
+        for resource in self.inventory():
+            create_tiles_for_resource(
+                source,
+                target,
+                resource,
+                nodata_as_zero=True,
+                nodata_as_zero_coarsening=True,
+            )
+
+    def inventory(self) -> Iterable[HazardResource]:
+        """Get the inventory item(s)."""
+        return [self._resource()]
+
+    def source_dir_from_base(self, source_dir_base):
+        return source_dir_base / "wisc" / "v1"
+
     def _download_all(self, working_dir: Path):
         try:
             import cdsapi
@@ -109,11 +157,17 @@ def _extract_all(self, working_dir: Path):
             with zipfile.ZipFile(
                 str(working_dir / (set_name + ".zip")), "r"
             ) as zip_ref:
-                zip_ref.extractall(
-                    str(working_dir / str(self.synth_set).replace(".", "_") / str(year))
+                synth_set_year = PurePath(
+                    str(self.synth_set).replace(".", "_"), str(year)
+                )
+                zip_ref.extractall(working_dir / synth_set_year)
+                self.fs.upload(
+                    str(working_dir / synth_set_year),
+                    str(self.source_dir / synth_set_year),
+                    recursive=True,
                 )
 
-    def occurrence_exceedance_count(self):
+    def _occurrence_exceedance_count(self):
         """Calculate occurrence exceedance count (i.e. number of events). Only used for diagnostic purposes.
 
         Returns:
@@ -155,7 +209,7 @@ def occurrence_exceedance_count(self):
                 exceedance_count[i, :, :] += count
         return exceedance_count
 
-    def peak_annual_gust_speed(self):
+    def _peak_annual_gust_speed(self):
         first_file = self._file_list(self.years[0])[0]
         all_files = [f for y in self.years for f in self._file_list(y)]
         first = xr.open_dataset(first_file)
@@ -203,7 +257,7 @@ def _gev_icdf(self, p, mu, xi, sigma):
         x_p = mu - (sigma / xi) * (1 - (-np.log(1 - p)) ** (-xi))
         return x_p
 
-    def fit_gumbel(self, annual_max_gust_speed: xr.DataArray):
+    def _fit_gumbel(self, annual_max_gust_speed: xr.DataArray):
         """See for example "A review of methods to calculate extreme wind speeds", Palutikof et al. (1999).
         https://rmets.onlinelibrary.wiley.com/doi/pdf/10.1017/S1350482799001103
         we fit to a Type I extreme distribution, where $X_T$ is the 1-in-T year 3s peak gust wind speed:
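Note: the relation itself falls outside this hunk's context, but it can be read off the fitting code in the next hunk. As a sketch, taking cum_prob to be the annual non-exceedance probability $1 - 1/T$, the Type I (Gumbel) relation being fitted is

$X_T = \beta + \alpha \left( -\ln \left( -\ln \left( 1 - \tfrac{1}{T} \right) \right) \right)$

with location $\beta$ and scale $\alpha$ obtained from the fit (the loop indices below suggest one fit per grid cell).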
@@ -260,7 +314,7 @@ def fit_gumbel(self, annual_max_gust_speed: xr.DataArray):
                 fitted_speeds[:, j, i] = alpha * -np.log(-np.log(cum_prob)) + beta
         return fitted_speeds
 
-    def fit_gev(self, exceedance_count: xr.DataArray, p_tail: float = 1 / 5):
+    def _fit_gev(self, exceedance_count: xr.DataArray, p_tail: float = 1 / 5):
         """For reference, but fit_gumbel is preferred."""
         return_periods = np.array([5, 10, 20, 50, 100, 200, 500])
         data = np.zeros(
@@ -307,7 +361,7 @@ def fit_gev(self, exceedance_count: xr.DataArray, p_tail: float = 1 / 5):
         return fitted_speeds
 
     def _file_list(self, year: int):
-        return glob.glob(
+        return self.fs.glob(
             str(
                 Path(self.source_dir)
                 / str(self.synth_set).replace(".", "_")
@@ -316,76 +370,13 @@ def _file_list(self, year: int):
             )
         )
 
-    def open_dataset_year(
-        self, gcm: str, scenario: str, quantity: str, year: int, chunks=None
-    ) -> xr.Dataset:
+    def _peak_annual_gust_speed_fit(self) -> xr.Dataset:
         logger.info("Computing peak annual 3s gust wind speeds")
-        peak_annual_gust_speed = self.peak_annual_gust_speed()
+        peak_annual_gust_speed = self._peak_annual_gust_speed()
         # any deferred behaviour undesirable here
         peak_annual_gust_speed = peak_annual_gust_speed.compute()
         logger.info("Fitting peak wind speeds")
-        return self.fit_gumbel(peak_annual_gust_speed).to_dataset(name="wind_speed")
-
-
-class WISCEuropeanWinterStorm(IndicatorModel[str]):
-    def __init__(self):
-        """
-        Peak 3s gust wind speed for different return periods inferred from the WISC event set.
-
-        METADATA:
-        Link: https://cds.climate.copernicus.eu/datasets/sis-european-wind-storm-synthetic-events?tab=overview
-        Data type: Synthetic European winter storm events.
-        Hazard indicator: Wind
-        Region: Europe
-        Resolution: 2.4 arcmin
-        Scenarios: Historical
-        Time range: Not applicable
-        File type: NetCDF
-
-        DATA DESCRIPTION:
-        The WISC dataset contains a set of synthetic windstorm events consisting of 22,980 individual
-        storm footprints over Europe. These are a physically realistic set of plausible windstorm
-        events based on the modelled climatic conditions, calculated using the Met Office HadGEM3 model
-        (Global Atmosphere 3 and Global Land 3 configurations).
-        Return period maps of peak gust wind speed are inferred from this data.
-
-        Special thanks to Annabel Hall; her work on investigating the fitting of the WISC data set is adapted hereunder.
-        """
-
-    def batch_items(self):
-        """Get a list of all batch items."""
-        return ["historical"]
-
-    def run_single(
-        self, item: str, source: Any, target: ReadWriteDataArray, client: Client
-    ):
-        assert isinstance(source, WISCWinterStormEventSource)
-        logger.info("Creating data set from events")
-        resource = self._resource()
-        for scenario in resource.scenarios:
-            for year in scenario.years:
-                ds = source.open_dataset_year("", scenario.id, "", year)
-                # note that the co-ordinates will be written into the parent of resource.path
-                target.write(
-                    resource.path.format(scenario=scenario.id, year=year),
-                    ds["wind_speed"].compute(),
-                    spatial_coords=resource.store_netcdf_coords,
-                )
-
-    def create_maps(self, source: OscZarr, target: OscZarr):
-        """Create map images."""
-        for resource in self.inventory():
-            create_tiles_for_resource(
-                source,
-                target,
-                resource,
-                nodata_as_zero=True,
-                nodata_as_zero_coarsening=True,
-            )
-
-    def inventory(self) -> Iterable[HazardResource]:
-        """Get the inventory item(s)."""
-        return [self._resource()]
+        return self._fit_gumbel(peak_annual_gust_speed).to_dataset(name="wind_speed")
 
     def _resource(self) -> HazardResource:
         """Create resource."""

src/hazard/onboarder.py

+86
@@ -0,0 +1,86 @@
+from abc import ABC, abstractmethod
+import logging
+from pathlib import Path, PurePath
+from fsspec import AbstractFileSystem
+from fsspec.implementations.local import LocalFileSystem
+from typing import Iterable, Optional, Union
+
+from hazard.inventory import HazardResource
+from hazard.protocols import WriteDataArray
+from hazard.sources.osc_zarr import OscZarr
+from hazard.utilities.tiles import create_tiles_for_resource
+
+logger = logging.getLogger(__name__)
+
+
+class Onboarder(ABC):
+    """Onboard a hazard indicator data set into physrisk. Typically this is a pre-existing data set
+    which requires a transformation which might be a simple conversion into physrisk conventions
+    or something more complex.
+    """
+
+    def __init__(
+        self,
+        source_dir_base: Union[str, PurePath] = PurePath(),
+        fs: Optional[AbstractFileSystem] = None,
+    ):
+        """Create Onboarder instance. The original file set is the starting point on the
+        onboarding. This is stored in the (abstract) file system specified.
+
+        Args:
+            source_dir_base (Union[str, PurePath], optional): Base path of the source files.
+            fs (Optional[AbstractFileSystem], optional): File system for storing source files.
+                Defaults to None.
+        """
+        self.source_dir = self.source_dir_from_base(
+            Path(source_dir_base)
+            if isinstance(source_dir_base, str)
+            else source_dir_base
+        )
+        self.fs = fs if fs else LocalFileSystem()
+
+    def create_maps(self, source: OscZarr, target: OscZarr):
+        for resource in self.inventory():
+            create_tiles_for_resource(
+                source,
+                target,
+                resource,
+                nodata_as_zero=True,
+                nodata_as_zero_coarsening=True,
+            )
+
+    @abstractmethod
+    def inventory(self) -> Iterable[HazardResource]:
+        """Return resource(s) to add to Inventory."""
+        ...
+
+    def is_source_dir_populated(self) -> bool:
+        """Return True if source_dir is already populated."""
+        return self.fs.exists(self.source_dir) and any(self.fs.ls(self.source_dir))
+
+    @abstractmethod
+    def onboard(self, target: WriteDataArray):
+        """Onboard the data, reading from the file source and writing to the target provided.
+
+        Args:
+            target (WriteDataArray): Hazard indicators are written to this target.
+        """
+        ...
+
+    @abstractmethod
+    def prepare(self, working_dir: Path, force_download: bool = True):
+        """Create the source files in source_dir_base using abstract file system fs. Typically
+        this might involve downloading, unzipping and rearranging the input files. The intent is
+        to retain the data lineage.
+
+        Args:
+            working_dir (Path): Path to local working directory for any temporary downloading and unzipping prior to
+                copy to source directory.
+            force_download (bool, optional): If False and feature is supported, files will not be re-downloaded. Defaults to True.
+        """
+        ...
+
+    @abstractmethod
+    def source_dir_from_base(self, source_dir_base: PurePath) -> PurePath:
+        """Return source_dir relative to source_dir_base."""
+        ...
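To illustrate the contract the new base class imposes, a hypothetical subclass might look as follows; the dataset name, directory layout and empty inventory are placeholders, not taken from the repository.

from pathlib import Path, PurePath
from typing import Iterable

from hazard.inventory import HazardResource
from hazard.onboarder import Onboarder
from hazard.protocols import WriteDataArray


class MyHazardOnboarder(Onboarder):
    """Hypothetical onboarder showing the four methods a subclass must provide."""

    def source_dir_from_base(self, source_dir_base: PurePath) -> PurePath:
        # Keep source files under a dataset-specific, versioned sub-directory.
        return source_dir_base / "my_hazard" / "v1"

    def prepare(self, working_dir: Path, force_download: bool = True):
        # Download/unzip raw files into working_dir, then copy them to
        # self.source_dir via self.fs so that data lineage is retained.
        ...

    def inventory(self) -> Iterable[HazardResource]:
        # Return the HazardResource entries describing what onboard() writes.
        return []

    def onboard(self, target: WriteDataArray):
        # Read from self.source_dir (via self.fs), transform, and write arrays to target.
        ...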

src/hazard/protocols.py

-18
@@ -1,4 +1,3 @@
-import typing
 from typing import Iterable, List, Optional, Protocol
 
 import xarray as xr
@@ -40,20 +39,3 @@ def write( # noqa:E704
 
 
 class ReadWriteDataArray(ReadDataArray, WriteDataArray): ... # noqa: E701
-
-
-class WriteDataset(Protocol):
-    """Write DataArray."""
-
-    def write(self, path: str, dataset: xr.Dataset): ... # noqa:E704
-
-
-T = typing.TypeVar("T")
-
-
-class PTransform(Protocol):
-    def batch_items(self) -> Iterable[T]: ... # noqa:E704
-
-    def process_item(self, item: T) -> xr.DataArray: ... # noqa:E704
-
-    def item_path(self, item: T) -> str: ... # noqa:E704
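With WriteDataset and PTransform removed, an onboarding target only needs to satisfy the remaining WriteDataArray protocol. As a rough sketch for testing (the write signature is assumed from its usage in the WISC onboarder above, hence the permissive **kwargs):

from typing import Dict

import xarray as xr


class InMemoryTarget:
    """Hypothetical test double that structurally satisfies WriteDataArray."""

    def __init__(self):
        self.arrays: Dict[str, xr.DataArray] = {}

    def write(self, path: str, data_array: xr.DataArray, **kwargs):
        # Keep each array keyed by its path; real targets (e.g. OscZarr) persist to Zarr.
        self.arrays[path] = data_array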
