Commit 8ef194f
Author: Joe Hamman

WIP: Compute==False for to_zarr and to_netcdf (#1811)
* move backend append logic to the prepare_variable methods
* deprecate variables/dimensions/attrs properties on AbstractWritableDataStore
* warnings instead of errors for backend properties
* use attrs.update when setting zarr attributes
* more performance improvements to attributes in zarr backend
* fix typo
* new set_dimensions method for writable data stores
* more fixes for zarr
* more tests for zarr and remove append logic for zarr
* a few more tweaks to zarr attrs
* add encode methods to writable data stores, fixes for Zarr tests
* fix for InMemoryDataStore
* fix for unlimited dimensions in Scipy DataStores
* another patch for scipy
* whatsnew
* initial commit returning dask futures from to_netcdf and to_zarr methods
* ordereddict
* address some of rabernat's comments; in particular, this commit removes the _DIMENSION_KEY from the zarr_group.attrs
* stop skipping zero-dim zarr tests
* update minimum zarr version for tests
* clean up a bit before adding tests
* temporary check-in
* clean up implementation of compute=False for the to_foo functions; still needs additional tests
* docs and more tests; failing tests on the h5netcdf backend only
* skip h5netcdf/netcdf4 tests in certain places
* remove spurious returns
* finalize stores when compute=False
* more docs; skip h5netcdf/netcdf tests; raise informative error for h5netcdf and scipy
* clean up whats-new
* reorganize dask task graph when using compute=False and save_mfdataset
* move compute=False tests to the DaskTests class
* small doc/style fixes
* save api.py
Parent: 9f58d50

File tree

10 files changed: +147 additions, -39 deletions

doc/dask.rst

Lines changed: 15 additions & 0 deletions
@@ -100,6 +100,21 @@ Once you've manipulated a dask array, you can still write a dataset too big to
 fit into memory back to disk by using :py:meth:`~xarray.Dataset.to_netcdf` in the
 usual way.
 
+.. ipython:: python
+
+    ds.to_netcdf('manipulated-example-data.nc')
+
+By setting the ``compute`` argument to ``False``, :py:meth:`~xarray.Dataset.to_netcdf`
+will return a dask delayed object that can be computed later.
+
+.. ipython:: python
+
+    from dask.diagnostics import ProgressBar
+    # or distributed.progress when using the distributed scheduler
+    delayed_obj = ds.to_netcdf('manipulated-example-data.nc', compute=False)
+    with ProgressBar():
+        results = delayed_obj.compute()
+
 .. note::
 
     When using dask's distributed scheduler to write NETCDF4 files,
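The snippet above uses ``ProgressBar`` from dask's local diagnostics; the inline comment points at ``distributed.progress`` for the distributed scheduler. A hedged sketch of that variant (the ``Client`` setup and file name are illustrative, not part of this commit, and per the note above, writing NETCDF4 files under the distributed scheduler comes with caveats):

# Hypothetical sketch: monitoring a deferred netCDF write on dask.distributed.
# Assumes ``ds`` is a dask-backed xarray.Dataset, as in the docs above.
from dask.distributed import Client, progress

client = Client()  # local cluster; real deployments pass a scheduler address
delayed_obj = ds.to_netcdf('manipulated-example-data.nc', compute=False)
future = client.compute(delayed_obj)  # start the write asynchronously
progress(future)                      # live progress bar for the write
future.result()                       # block until the file is complete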

doc/whats-new.rst

Lines changed: 13 additions & 6 deletions
@@ -69,6 +69,19 @@ Enhancements
 - ``plot.line()`` learned new kwargs: ``xincrease``, ``yincrease`` that change the direction of the respective axes.
   By `Deepak Cherian <https://github.com/dcherian>`_.
 
+- Added the ``parallel`` option to :py:func:`open_mfdataset`. This option uses
+  ``dask.delayed`` to parallelize the open and preprocessing steps within
+  ``open_mfdataset``. This is expected to provide performance improvements when
+  opening many files, particularly when used in conjunction with dask's
+  multiprocessing or distributed schedulers (:issue:`1981`).
+  By `Joe Hamman <https://github.com/jhamman>`_.
+
+- New ``compute`` option in :py:meth:`~xarray.Dataset.to_netcdf`,
+  :py:meth:`~xarray.Dataset.to_zarr`, and :py:func:`~xarray.save_mfdataset` to
+  allow for the lazy computation of netCDF and zarr stores. This feature is
+  currently only supported by the netCDF4 and zarr backends (:issue:`1784`).
+  By `Joe Hamman <https://github.com/jhamman>`_.
+
 Bug fixes
 ~~~~~~~~~
 
@@ -104,12 +117,6 @@ The minor release includes a number of bug-fixes and backwards compatible enhancements
 Enhancements
 ~~~~~~~~~~~~
 
-- Added the ``parallel`` option to :py:func:`open_mfdataset`. This option uses
-  ``dask.delayed`` to parallelize the open and preprocessing steps within
-  ``open_mfdataset``. This is expected to provide performance improvements when
-  opening many files, particularly when used in conjunction with dask's
-  multiprocessing or distributed schedulers (:issue:`1981`).
-  By `Joe Hamman <https://github.com/jhamman>`_.
 - :py:meth:`~xarray.DataArray.isin` and :py:meth:`~xarray.Dataset.isin` methods,
   which test each value in the array for whether it is contained in the
   supplied list, returning a bool array. See :ref:`selecting values with isin`

xarray/backends/api.py

Lines changed: 33 additions & 9 deletions
@@ -144,6 +144,13 @@ def _get_lock(engine, scheduler, format, path_or_file):
     return lock
 
 
+def _finalize_store(write, store):
+    """ Finalize this store by explicitly syncing and closing"""
+    del write  # ensure writing is done first
+    store.sync()
+    store.close()
+
+
 def open_dataset(filename_or_obj, group=None, decode_cf=True,
                  mask_and_scale=True, decode_times=True, autoclose=False,
                  concat_characters=True, decode_coords=True, engine=None,
@@ -620,7 +627,8 @@ def open_mfdataset(paths, chunks=None, concat_dim=_CONCAT_DIM_DEFAULT,
 
 
 def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
-              engine=None, writer=None, encoding=None, unlimited_dims=None):
+              engine=None, writer=None, encoding=None, unlimited_dims=None,
+              compute=True):
     """This function creates an appropriate datastore for writing a dataset to
     disk as a netCDF file
 
@@ -680,19 +688,22 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
         unlimited_dims = dataset.encoding.get('unlimited_dims', None)
     try:
         dataset.dump_to_store(store, sync=sync, encoding=encoding,
-                              unlimited_dims=unlimited_dims)
+                              unlimited_dims=unlimited_dims, compute=compute)
         if path_or_file is None:
             return target.getvalue()
     finally:
         if sync and isinstance(path_or_file, basestring):
             store.close()
 
+    if not compute:
+        import dask
+        return dask.delayed(_finalize_store)(store.delayed_store, store)
+
     if not sync:
         return store
 
-
 def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
-                   engine=None):
+                   engine=None, compute=True):
     """Write multiple datasets to disk as netCDF files simultaneously.
 
     This function is intended for use with datasets consisting of dask.array
@@ -742,6 +753,9 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
         default engine is chosen based on available dependencies, with a
         preference for 'netcdf4' if writing to a file on disk.
         See `Dataset.to_netcdf` for additional information.
+    compute: boolean
+        If True, compute immediately, otherwise return a
+        ``dask.delayed.Delayed`` object that can be computed later.
 
     Examples
     --------
@@ -769,11 +783,17 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
                          'datasets, paths and groups arguments to '
                          'save_mfdataset')
 
-    writer = ArrayWriter()
-    stores = [to_netcdf(ds, path, mode, format, group, engine, writer)
+    writer = ArrayWriter() if compute else None
+    stores = [to_netcdf(ds, path, mode, format, group, engine, writer,
+                        compute=compute)
               for ds, path, group in zip(datasets, paths, groups)]
+
+    if not compute:
+        import dask
+        return dask.delayed(stores)
+
     try:
-        writer.sync()
+        delayed = writer.sync(compute=compute)
         for store in stores:
             store.sync()
     finally:
@@ -782,7 +802,7 @@ def save_mfdataset(datasets, paths, mode='w', format=None, groups=None,
 
 
 def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
-            encoding=None):
+            encoding=None, compute=True):
     """This function creates an appropriate datastore for writing a dataset to
     a zarr store
 
@@ -803,5 +823,9 @@ def to_zarr(dataset, store=None, mode='w-', synchronizer=None, group=None,
 
     # I think zarr stores should always be sync'd immediately
     # TODO: figure out how to properly handle unlimited_dims
-    dataset.dump_to_store(store, sync=True, encoding=encoding)
+    dataset.dump_to_store(store, sync=True, encoding=encoding, compute=compute)
+
+    if not compute:
+        import dask
+        return dask.delayed(_finalize_store)(store.delayed_store, store)
     return store
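The easy-to-miss detail in ``_finalize_store`` is that ``del write`` is not dead code: passing the delayed write result into the finalizer makes it a dependency in the dask graph, so syncing and closing can only run after the store step completes. A standalone sketch of that ordering trick, with toy functions standing in for the xarray internals:

import dask

@dask.delayed
def write_data():
    # stands in for the Delayed returned by da.store(..., compute=False)
    print('writing')

@dask.delayed
def finalize(write, name):
    del write  # value unused; only the graph dependency matters
    print('closing', name)

finalize(write_data(), 'store').compute()
# always prints 'writing' before 'closing store'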

xarray/backends/common.py

Lines changed: 8 additions & 4 deletions
@@ -264,19 +264,23 @@ def add(self, source, target):
         else:
             target[...] = source
 
-    def sync(self):
+    def sync(self, compute=True):
         if self.sources:
             import dask.array as da
-            da.store(self.sources, self.targets, lock=self.lock)
+            delayed_store = da.store(self.sources, self.targets,
+                                     lock=self.lock, compute=compute,
+                                     flush=True)
             self.sources = []
             self.targets = []
+            return delayed_store
 
 
 class AbstractWritableDataStore(AbstractDataStore):
     def __init__(self, writer=None, lock=HDF5_LOCK):
         if writer is None:
             writer = ArrayWriter(lock=lock)
         self.writer = writer
+        self.delayed_store = None
 
     def encode(self, variables, attributes):
         """
@@ -318,11 +322,11 @@ def set_attribute(self, k, v):  # pragma: no cover
     def set_variable(self, k, v):  # pragma: no cover
         raise NotImplementedError
 
-    def sync(self):
+    def sync(self, compute=True):
         if self._isopen and self._autoclose:
             # datastore will be reopened during write
             self.close()
-        self.writer.sync()
+        self.delayed_store = self.writer.sync(compute=compute)
 
     def store_dataset(self, dataset):
         """

xarray/backends/h5netcdf_.py

Lines changed: 5 additions & 2 deletions
@@ -212,9 +212,12 @@ def prepare_variable(self, name, variable, check_encoding=False,
 
         return target, variable.data
 
-    def sync(self):
+    def sync(self, compute=True):
+        if not compute:
+            raise NotImplementedError(
+                'compute=False is not supported for the h5netcdf backend yet')
         with self.ensure_open(autoclose=True):
-            super(H5NetCDFStore, self).sync()
+            super(H5NetCDFStore, self).sync(compute=compute)
             self.ds.sync()
 
     def close(self):

xarray/backends/netCDF4_.py

Lines changed: 2 additions & 2 deletions
@@ -439,9 +439,9 @@ def prepare_variable(self, name, variable, check_encoding=False,
 
         return target, variable.data
 
-    def sync(self):
+    def sync(self, compute=True):
         with self.ensure_open(autoclose=True):
-            super(NetCDF4DataStore, self).sync()
+            super(NetCDF4DataStore, self).sync(compute=compute)
             self.ds.sync()
 
     def close(self):

xarray/backends/scipy_.py

Lines changed: 5 additions & 2 deletions
@@ -219,9 +219,12 @@ def prepare_variable(self, name, variable, check_encoding=False,
 
         return target, data
 
-    def sync(self):
+    def sync(self, compute=True):
+        if not compute:
+            raise NotImplementedError(
+                'compute=False is not supported for the scipy backend yet')
         with self.ensure_open(autoclose=True):
-            super(ScipyDataStore, self).sync()
+            super(ScipyDataStore, self).sync(compute=compute)
             self.ds.flush()
 
     def close(self):
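Because the scipy store (like the h5netcdf one above) raises ``NotImplementedError`` for ``compute=False``, callers that cannot choose their backend may want to guard the call. A hedged sketch, with the dataset and path invented for illustration:

try:
    delayed = ds.to_netcdf('out.nc', engine='scipy', compute=False)
except NotImplementedError:
    # this backend only supports eager writes for now
    ds.to_netcdf('out.nc', engine='scipy')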

xarray/backends/zarr.py

Lines changed: 2 additions & 2 deletions
@@ -342,8 +342,8 @@ def store(self, variables, attributes, *args, **kwargs):
         AbstractWritableDataStore.store(self, variables, attributes,
                                         *args, **kwargs)
 
-    def sync(self):
-        self.writer.sync()
+    def sync(self, compute=True):
+        self.delayed_store = self.writer.sync(compute=compute)
 
 
 def open_zarr(store, group=None, synchronizer=None, auto_chunk=True,
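With ``ZarrStore.sync`` stashing the writer's delayed store, ``to_zarr`` supports the same deferred pattern as ``to_netcdf``. A small usage sketch, assuming a chunked dataset and a local directory store (both invented for illustration):

import xarray as xr

ds = xr.Dataset({'x': ('t', [1.0, 2.0, 3.0])}).chunk({'t': 1})
delayed = ds.to_zarr('example.zarr', compute=False)
delayed.compute()  # array writes plus the final sync/close happen here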

xarray/core/dataset.py

Lines changed: 14 additions & 6 deletions
@@ -1055,7 +1055,7 @@ def reset_coords(self, names=None, drop=False, inplace=False):
         return obj
 
     def dump_to_store(self, store, encoder=None, sync=True, encoding=None,
-                      unlimited_dims=None):
+                      unlimited_dims=None, compute=True):
         """Store dataset contents to a backends.*DataStore object."""
         if encoding is None:
             encoding = {}
@@ -1074,10 +1074,11 @@ def dump_to_store(self, store, encoder=None, sync=True, encoding=None,
         store.store(variables, attrs, check_encoding,
                     unlimited_dims=unlimited_dims)
         if sync:
-            store.sync()
+            store.sync(compute=compute)
 
     def to_netcdf(self, path=None, mode='w', format=None, group=None,
-                  engine=None, encoding=None, unlimited_dims=None):
+                  engine=None, encoding=None, unlimited_dims=None,
+                  compute=True):
         """Write dataset contents to a netCDF file.
 
         Parameters
@@ -1136,16 +1137,20 @@ def to_netcdf(self, path=None, mode='w', format=None, group=None,
             By default, no dimensions are treated as unlimited dimensions.
             Note that unlimited_dims may also be set via
            ``dataset.encoding['unlimited_dims']``.
+        compute: boolean
+            If True, compute immediately, otherwise return a
+            ``dask.delayed.Delayed`` object that can be computed later.
         """
         if encoding is None:
             encoding = {}
         from ..backends.api import to_netcdf
         return to_netcdf(self, path, mode, format=format, group=group,
                          engine=engine, encoding=encoding,
-                         unlimited_dims=unlimited_dims)
+                         unlimited_dims=unlimited_dims,
+                         compute=compute)
 
     def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
-                encoding=None):
+                encoding=None, compute=True):
         """Write dataset contents to a zarr group.
 
         .. note:: Experimental
@@ -1167,6 +1172,9 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
             Nested dictionary with variable names as keys and dictionaries of
             variable specific encodings as values, e.g.,
             ``{'my_variable': {'dtype': 'int16', 'scale_factor': 0.1,}, ...}``
+        compute: boolean
+            If True, compute immediately, otherwise return a
+            ``dask.delayed.Delayed`` object that can be computed later.
         """
         if encoding is None:
             encoding = {}
@@ -1176,7 +1184,7 @@ def to_zarr(self, store=None, mode='w-', synchronizer=None, group=None,
                             "and 'w-'.")
         from ..backends.api import to_zarr
         return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
-                       group=group, encoding=encoding)
+                       group=group, encoding=encoding, compute=compute)
 
     def __unicode__(self):
         return formatting.dataset_repr(self)
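Taken together with the ``save_mfdataset`` changes in api.py, this makes multi-file writes deferrable as well: ``compute=False`` returns one delayed object wrapping every per-file store. A hedged sketch, reusing the year-by-year split from the ``save_mfdataset`` docstring (a dask-backed ``ds`` with a time coordinate is assumed):

import xarray as xr
from dask.diagnostics import ProgressBar

years, datasets = zip(*ds.groupby('time.year'))
paths = ['%s.nc' % y for y in years]
delayed = xr.save_mfdataset(datasets, paths, compute=False)
with ProgressBar():
    delayed.compute()  # all files are written through one dask graph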
