
Unable to write into existing Zarr archive due to mismatching dask chunks #10034

Open
meteoDaniel opened this issue Feb 7, 2025 · 6 comments
Labels
bug needs triage Issue that has not been reviewed by xarray team member

Comments

@meteoDaniel

What happened?

I have a Zarr archive (v3) that I open this way: xarray.open_zarr(self.parsed_data_path, zarr_format=3, consolidated=False)

<xarray.Dataset> Size: 4GB
Dimensions:                       (dt_calc: 1, dt_fore: 49, lat: 746, lon: 1215)
Coordinates:
    lon                           (lat, lon) float64 7MB dask.array<chunksize=(187, 304), meta=np.ndarray>
    lat                           (lat, lon) float64 7MB dask.array<chunksize=(187, 304), meta=np.ndarray>
  * dt_fore                       (dt_fore) float64 392B 0.0 1.0 ... 47.0 48.0
  * dt_calc                       (dt_calc) datetime64[ns] 8B 2025-01-22
Data variables: (12/41)
    total_cloud_cover             (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    wind_speed_341                (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    wind_direction_149            (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    wind_speed_271                (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    wind_direction_97             (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    specific_humidity_3000        (dt_calc, dt_fore, lat, lon) float16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    ...                            ...
    geopotential_height_500       (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    wind_direction_271            (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    air_temperature_850           (dt_calc, dt_fore, lat, lon) float16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    wind_direction_10             (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    medium_level_clouds           (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>
    geopotential_height_850       (dt_calc, dt_fore, lat, lon) int16 89MB dask.array<chunksize=(1, 49, 10, 10), meta=np.ndarray>

Now I want to write this into another Zarr archive with different chunk sizes, (31, 49, 50, 50). When I do this the same way as before, I receive the following error message:

ValueError: Specified zarr chunks encoding['chunks']=(31, 49, 50, 50) for variable named 'wind_direction_97' would overlap multiple dask chunks ((31,), (49,), (10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 6), (10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 5)) on the region (slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)). Writing this array in parallel with dask could lead to corrupted data. Consider either rechunking using `chunk()`, deleting or modifying `encoding['chunks']`, or specify `safe_chunks=False`.

So I chunk the data as suggested:

self.parsed_data = self.parsed_data.chunk({COLUMN_DT_CALC: 31, COLUMN_DT_FORE: 49, "lat": 50, "lon": 50})

Afterwards I receive another error:

ValueError: Specified zarr chunks encoding['chunks']=(31, 49, 50, 50) for variable named 'wind_direction_97' would overlap multiple dask chunks ((1,), (49,), (50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 46), (50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 15)) on the region (slice(21, 22, None), slice(None, 49, None), slice(None, 746, None), slice(None, 1215, None)). Writing this array in parallel with dask could lead to corrupted data. Consider either rechunking using `chunk()`, deleting or modifying `encoding['chunks']`, or specify `safe_chunks=False`

I have been trying to debug this for several hours now and I can't find any helpful notes, so I hope one of you can help me fix this. I applied drop_encoding() beforehand.
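To illustrate what the two errors are checking (a simplified sketch of the idea, not xarray's actual implementation): a parallel region write is only safe when every dask chunk boundary along an axis falls on a zarr chunk boundary, and when the region itself starts on a zarr chunk boundary. The first error fails the chunk-size condition (dask chunks of 10 vs. zarr chunks of 50); the second fails the offset condition (the region slice(21, 22) starts in the middle of a 31-wide dt_calc zarr chunk).

```python
def dask_chunks_safe(dask_chunks, zarr_chunk, region_start=0):
    """Sketch of the safety condition along one axis: True if writing
    these dask chunk sizes at region_start cannot make two dask tasks
    touch the same zarr chunk of size zarr_chunk."""
    offset = region_start
    for i, size in enumerate(dask_chunks):
        # Each dask chunk must start on a zarr chunk boundary ...
        if offset % zarr_chunk != 0:
            return False
        # ... and every interior dask chunk must cover whole zarr chunks
        # (only the final chunk may be partial).
        if i < len(dask_chunks) - 1 and size % zarr_chunk != 0:
            return False
        offset += size
    return True

# First error: lat axis with dask chunks of 10 vs. zarr chunks of 50
print(dask_chunks_safe([10] * 74 + [6], 50))       # False
# After rechunking lat to 50, the axis itself is fine
print(dask_chunks_safe([50] * 14 + [46], 50))      # True
# Second error: region slice(21, 22) along dt_calc, zarr chunk 31
print(dask_chunks_safe([1], 31, region_start=21))  # False
```

The second case shows why rechunking alone did not help here: with a dt_calc zarr chunk of 31, a one-step region write at index 21 necessarily lands inside an existing zarr chunk.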

The Zarr archive is initialized as follows:

def _initialize_zarr_store(
    store: Union[zarr.storage.FsspecStore, Path],
    dataset: xarray.Dataset,
    encoding: Dict[str, Any],
    full_time_dimension: str,
    write_dim: str,
    zarr_format: int,
):
    """Initialize the Zarr store with an empty template if it does not already exist."""
    template = xarray.full_like(dataset, np.nan)
    try:
        template = (
            template.rename({write_dim: "drop_dim"})
            .isel(drop_dim=0, drop=True)
            .expand_dims(dim={write_dim: full_time_dimension}, axis=0)
        )
        template.to_zarr(
            store,
            mode="w-",
            encoding=encoding,
            compute=False,
            consolidated=(zarr_format < 3),
            zarr_format=zarr_format,
        )

    except (zarr.errors.ContainsGroupError, FileExistsError):
        # Store already exists, no need to create a new template
        pass

Please note that I had to add the line template = xarray.full_like(dataset, np.nan) after updating to the latest version, as compute=False does not work correctly.

And this is how I write the data with to_zarr:

    dataset.to_zarr(
        store,
        region=region,
        mode="r+",
        compute=True,
        consolidated=(zarr_format < 3),
        zarr_format=zarr_format,
    )

Note that there was no issue with zarr==2.17.2 and xarray==2025.1.1.

What did you expect to happen?

That I am able to write a dataset (loaded from a Zarrv3 archive) into a region of another Zarr archive that is in version 2.

Minimal Complete Verifiable Example

MVCE confirmation

  • Minimal example — the example is as focused as reasonably possible to demonstrate the underlying issue in xarray.
  • Complete example — the example is self-contained, including all data and the text of any traceback.
  • Verifiable example — the example copy & pastes into an IPython prompt or Binder notebook, returning the result.
  • New issue — a search of GitHub Issues suggests this is not a duplicate.
  • Recent environment — the issue occurs with the latest version of xarray and its dependencies.

Relevant log output

Anything else we need to know?

No response

Environment

INSTALLED VERSIONS

commit: None
python: 3.12.8 (main, Feb 4 2025, 04:56:13) [GCC 12.2.0]
python-bits: 64
OS: Linux
OS-release: 6.8.0-110052-tuxedo
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: ('C', 'UTF-8')
libhdf5: 1.14.2
libnetcdf: 4.9.4-development

xarray: 2025.1.2
pandas: 2.2.3
numpy: 2.2.0
scipy: 1.14.1
netCDF4: 1.7.2
pydap: None
h5netcdf: 1.4.1
h5py: 3.12.1
zarr: 3.0.1
cftime: 1.6.4.post1
nc_time_axis: None
iris: None
bottleneck: None
dask: 2024.11.2
distributed: 2024.11.2
matplotlib: 3.9.3
cartopy: None
seaborn: None
numbagg: None
fsspec: 2024.12.0
cupy: None
pint: None
sparse: None
flox: None
numpy_groupies: None
setuptools: 75.6.0
pip: 24.3.1
conda: None
pytest: 7.3.1
mypy: None
IPython: 8.11.0
sphinx: None

@meteoDaniel meteoDaniel added bug needs triage Issue that has not been reviewed by xarray team member labels Feb 7, 2025
@meteoDaniel
Author

Dear @rabernat ,

Do you have an idea what's wrong here?

@josephnowak
Contributor

josephnowak commented Feb 11, 2025

Hi, it's very strange to me that it works with zarr==2.17.2 but not with more recent versions, given that the chunk validation is on the Xarray side. I think I can take a deeper look next week, but your issue may be related to #9767 (you will find code there that aligns the Dask and Zarr chunks). That would mean your datasets do not end on a full chunk, so when you append a new array you have to take the shape of that last chunk into account to align the Dask chunks.
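The alignment idea can be sketched like this (a hypothetical helper in the spirit of the suggestion above, not the actual code from #9767): given the axis length, the zarr chunk size, and the offset at which the region starts, compute dask chunk sizes whose boundaries all fall on the zarr chunk grid.

```python
def aligned_dask_chunks(length, zarr_chunk, region_start=0):
    """Dask chunk sizes for an axis of `length` written at `region_start`,
    chosen so every chunk boundary lands on the zarr chunk grid."""
    chunks = []
    pos = region_start
    remaining = length
    while remaining > 0:
        # Distance from the current position to the next zarr chunk boundary
        step = zarr_chunk - (pos % zarr_chunk)
        size = min(step, remaining)
        chunks.append(size)
        pos += size
        remaining -= size
    return tuple(chunks)

# lat axis of the example: 746 points, zarr chunk 50, region starts at 0
print(aligned_dask_chunks(746, 50))    # (50, 50, ..., 50, 46)
# dt_calc axis: one step written at index 21 into zarr chunks of 31
print(aligned_dask_chunks(1, 31, 21))  # (1,)
# A region starting mid-chunk gets a short leading dask chunk
print(aligned_dask_chunks(120, 50, 30))  # (20, 50, 50)
```

The result could then be passed to ds.chunk({"lat": ...}) per dimension; note that the leading chunk shrinks when the region does not start on a zarr chunk boundary.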

@rabernat
Contributor

Thanks for reporting this issue @meteoDaniel!

To obtain support from the Xarray developers, it would be great if you could craft a minimal, complete, verifiable example of this bug, ideally using synthetic data. Without that, we can only speculate about what might be wrong.

@meteoDaniel
Author

Dear @rabernat, thanks for the hint. I know that would be helpful, but at this time I was not able to prepare one. I thought there might be something obviously wrong that would be easy for you to spot.

I will try to find some time to prepare an executable test case.

@meteoDaniel
Author

@josephnowak @rabernat: Here is a fully reproducible example:

import xarray
import zarr
from typing import Union, Dict, Optional, List, Tuple, Any
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
import numcodecs
import numpy as np

def encoding_for_datasets_as_zarr(
    dataset: xarray.Dataset,
    coordinate_dimensions: List[str],
    data_type_mapping: Dict[str, str],
    chunks: Optional[
        Union[Tuple[int, int, int, int], Tuple[int, int, int, int, int]]
    ] = None,
    zarr_format: int = 2,
) -> Dict[str, Dict[str, Any]]:
    """defines encoding for dataset to store as zarr"""
    compressor = numcodecs.Blosc(
        cname="zstd", clevel=6, shuffle=2
    )  # shuffle=2 means bitshuffle

    compressor_naming = "compressor"
    if zarr_format == 3:
        compressor = zarr.codecs.BloscCodec(
            cname="zstd", clevel=3, shuffle=zarr.codecs.BloscShuffle.bitshuffle
        )
        compressor_naming = "compressors"

    encoding = {
        key: {"dtype": data_type_mapping[key], compressor_naming: compressor}
        for key in list(dataset.keys())
        if key not in coordinate_dimensions
    }
    for coord_dimension in coordinate_dimensions:
        if coord_dimension == "station_id":
            encoding.update({coord_dimension: {"dtype": "object"}})
        else:
            encoding.update({coord_dimension: {"dtype": "float64"}})

    if chunks is not None:
        for key in list(dataset.keys()):
            if key not in coordinate_dimensions:
                encoding[key]["chunks"] = chunks

        for key in ["lat", "lon"]:
            encoding.update(
                {key: {"chunks": (chunks[-2], chunks[-1]), "dtype": "float64"}}
            )

    return encoding


def _initialize_zarr_store(
    store: Union[zarr.storage.FsspecStore, Path],
    dataset: xarray.Dataset,
    encoding: Dict[str, Any],
    full_time_dimension: str,
    write_dim: str,
    zarr_format: int,
) -> None:
    """Initialize the Zarr store with an empty template if it does not already exist."""
    template = xarray.full_like(dataset, np.nan)
    try:
        template = (
            template.rename({write_dim: "drop_dim"})
            .isel(drop_dim=0, drop=True)
            .expand_dims(dim={write_dim: full_time_dimension}, axis=0)
        )
        template.to_zarr(
            store,
            mode="w-",
            encoding=encoding,
            compute=False,
            consolidated=(zarr_format < 3),
            zarr_format=zarr_format,
        )

    except (zarr.errors.ContainsGroupError, FileExistsError):
        # Store already exists, no need to create a new template
        pass


def store_xarray_as_zarr_monthly(
    store: Union[zarr.storage.FsspecStore, Path],
    dataset: xarray.Dataset,
    data_type_mapping: Dict[str, str],
    append_dim_timestamp: datetime,
    write_dim: str = "dt_calc",
    time_dimensions: Optional[List[str]] = None,
    interval: str = "1D",
    zarr_format: int = 2,
) -> None:
    """
    Takes xarray.Dataset object and stores it in the given s3 bucket

    Note: Be aware that you have to chunk the x/y or lat/lon dimensions beforehand.
    Only the chunking along append_dim will be changed.

    Args:
        store: Path or zarr.storage.FsspecStore object as location to expand and write the zarr archive
        dataset: xarray.Dataset objects contains the data
        data_type_mapping: is the final mapping to the dtype for the given data
        append_dim_timestamp: name of the month the data will be stored for
        write_dim: dimension to write and expand data along zarr archive
        time_dimensions: all time dimensions will be stored as float64 and has to be encoded additionally
        interval: (or freq) required to define the number of positions along append_dim axis
        zarr_format (int): define which zarr version should be used

    Note:
        Food analogy for optimal chunks:
        🥟 Ravioli chunks: (100, 100, 100): this chunk shape is optimized for isotropic, random access across
             the whole array.
        🥞 Lasagna / Pancake chunks(slices for big areas at one time) : (1, 1000, 1000) - this chunk shape is optimized
             for reading and writing data in contiguous slices. This is appropriate for 2D spatial data stacked in time
             when the goal is making maps.
        🍝 Spaghetti chunks (point slices over time): (1000, 30, 30) - this chunk shape is optimized for extracting
            long series from a few individual points.

    Returns:
        None, stores data in s3
    """
    if time_dimensions is None:
        time_dimensions = ["dt_calc", "dt_fore"]

    append_dim_timestamp = append_dim_timestamp.replace(second=0, microsecond=0)

    start_date_range = append_dim_timestamp.replace(day=1)
    end_date_range = (append_dim_timestamp.replace(day=27) + timedelta(days=6)).replace(
        day=1
    )

    full_time_dimension = (
        pd.to_datetime(
            pd.date_range(
                start_date_range,
                end_date_range,
                inclusive="left",
                freq=interval,
            )
        )
        .to_pydatetime()
        .tolist()
    )
    optimal_chunks = {"dt_calc": 31, "dt_fore": 49, "lat": 50, "lon": 50}

    encoding = encoding_for_datasets_as_zarr(
        dataset,
        time_dimensions,
        data_type_mapping,
        chunks=tuple(optimal_chunks.values()),
        zarr_format=zarr_format,
    )
    _initialize_zarr_store(
        store,
        dataset,
        encoding,
        full_time_dimension,
        write_dim,
        zarr_format,
    )
    idx = full_time_dimension.index(append_dim_timestamp)
    region = {
        dim_name: slice(value)
        for dim_name, value in dataset.sizes.items()
        if dim_name != write_dim
    }
    region.update({write_dim: slice(idx, idx + 1)})
    dataset.to_zarr(
        store,
        region=region,
        mode="r+",
        compute=True,
        consolidated=(zarr_format < 3),
        zarr_format=zarr_format,
    )


dt_calc = pd.Timestamp("2025-01-23")
dt_fore = np.arange(49)  # Forecast lead times
lat = np.linspace(-90, 90, 602)  # Latitude values
lon = np.linspace(-180, 180, 1204)  # Longitude values
chunk_size_intermediate_file = (1, 49, 15, 15)
test_path_intermediate = Path("/app/test_int.zarr")
test_path_archive = Path("/app/test_archive.zarr")
# Create random air temperature values
air_temperature_2m = np.random.rand(len(dt_fore), len(lat), len(lon)) * 30  # Temperatures in Celsius

# Create xarray dataset
ds = xarray.Dataset(
    {
        "air_temperature_2m": (['dt_calc' ,'dt_fore', 'lat', 'lon'], np.expand_dims(air_temperature_2m, axis=0))
    },
    coords={
        "dt_calc": [dt_calc],
        "dt_fore": dt_fore,
        "lat": lat,
        "lon": lon,
    }
)
encoding = encoding_for_datasets_as_zarr(
    ds,
    ["dt_calc", "dt_fore"],
    {"air_temperature_2m": "float"},
    chunks=chunk_size_intermediate_file,
    zarr_format=3,
)
shard_size = (chunk_size_intermediate_file[0], chunk_size_intermediate_file[1], chunk_size_intermediate_file[2] * 5, chunk_size_intermediate_file[3] * 5)
encoding["air_temperature_2m"]["shards"] = shard_size

ds.to_zarr(
    test_path_intermediate,
    mode="w-",
    encoding=encoding,
    compute=True,
)

del ds

chunk_size_archive = {"dt_calc": 31, "dt_fore": 49, "lat": 50, "lon": 50}
ds = xarray.open_zarr(test_path_intermediate, zarr_format=3, consolidated=False).drop_encoding()
ds = ds.chunk(chunk_size_archive)

store_xarray_as_zarr_monthly(
    test_path_archive,
    ds,
    {"air_temperature_2m": "float"},
    dt_calc,
    "dt_calc",
    zarr_format=2
)

@meteoDaniel meteoDaniel changed the title Unable to write into existing Zarr archive due to mismatching dask chunks | compute=False does not work Unable to write into existing Zarr archive due to mismatching dask chunks Feb 14, 2025
@meteoDaniel
Author

I followed the solution suggested by @josephnowak and added the following lines right before to_zarr.

    archive_dataset = xarray.open_zarr(store, zarr_format=zarr_format, consolidated=True if zarr_format < 3 else False)
    dataset = xarray.concat([archive_dataset, dataset], dim=write_dim).chunk({"lat": 50, "lon": 50}).isel(
        dt_calc=[-1]
    )

This does not solve the issue. How do I know when data will be corrupted?
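As a rough answer to that question (my understanding, not an authoritative statement of xarray's guarantees): with safe_chunks=False, corruption is a risk whenever two writes that may run in parallel touch the same zarr chunk, because each writer does a read-modify-write of the whole chunk. A minimal sketch of that collision test along one axis:

```python
def regions_share_zarr_chunk(start_a, stop_a, start_b, stop_b, zarr_chunk):
    """True if two half-open write regions along one axis touch a common
    zarr chunk of size zarr_chunk (i.e. a parallel write could corrupt data)."""
    chunks_a = set(range(start_a // zarr_chunk, (stop_a - 1) // zarr_chunk + 1))
    chunks_b = set(range(start_b // zarr_chunk, (stop_b - 1) // zarr_chunk + 1))
    return bool(chunks_a & chunks_b)

# Two daily writes into the same 31-wide dt_calc zarr chunk collide:
print(regions_share_zarr_chunk(21, 22, 22, 23, 31))  # True
# With a dt_calc zarr chunk of 1, single-step writes never collide:
print(regions_share_zarr_chunk(21, 22, 22, 23, 1))   # False
```

By this reasoning, one pragmatic option for a write-one-init-time-per-call pattern like the one above would be a dt_calc encoding chunk of 1 (or serializing the writes), rather than forcing 31-wide chunks that every daily write lands inside.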
