diff --git a/docs/usage.md b/docs/usage.md index 53228b4..0eacf17 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -338,6 +338,25 @@ Currently you can only serialize virtual variables backed by `ManifestArray` obj ### Writing as Zarr -TODO: Write out references as a Zarr v3 store following the [Chunk Manifest ZEP](https://github.com/zarr-developers/zarr-specs/issues/287), see [PR #45](https://github.com/TomNicholas/VirtualiZarr/pull/45) +Alternatively, we can write these references out as an actual Zarr store, at least one that is compliant with the [proposed "Chunk Manifest" ZEP](https://github.com/zarr-developers/zarr-specs/issues/287). To do this we simply use the {py:meth}`ds.virtualize.to_zarr ` accessor method. -TODO: Explanation of how this requires changes in zarr upstream to be able to read it +```python +combined_vds.virtualize.to_zarr('combined.zarr') +``` + +The result is a zarr v3 store on disk which contains the chunk manifest information written out as `manifest.json` files, so the store looks like this: + +``` +combined/zarr.json <- group metadata +combined/air/zarr.json <- array metadata +combined/air/manifest.json <- array manifest +... +``` + +The advantage of this format is that any zarr v3 reader that understands the chunk manifest ZEP could read from this store, no matter what language it is written in (e.g. via `zarr-python`, `zarr-js`, or rust). This reading would also not require `fsspec`. + +```{note} +Currently there are not yet any zarr v3 readers which understand the chunk manifest ZEP, so until then this feature cannot be used for data processing. + +This store can however be read by {py:func}`~virtualizarr.xarray.open_virtual_dataset`, by passing `filetype="zarr_v3"`. +``` diff --git a/virtualizarr/manifests/manifest.py b/virtualizarr/manifests/manifest.py index 4778876..b12813e 100644 --- a/virtualizarr/manifests/manifest.py +++ b/virtualizarr/manifests/manifest.py @@ -1,4 +1,5 @@ import itertools +import json import re from typing import Any, Iterable, Iterator, List, Mapping, Tuple, Union, cast @@ -110,14 +111,19 @@ def dict(self) -> dict[str, dict[str, Union[str, int]]]: """Converts the entire manifest to a nested dictionary.""" return {k: dict(entry) for k, entry in self.entries.items()} - @staticmethod - def from_zarr_json(filepath: str) -> "ChunkManifest": + @classmethod + def from_zarr_json(cls, filepath: str) -> "ChunkManifest": """Create a ChunkManifest from a Zarr manifest.json file.""" - raise NotImplementedError() + with open(filepath, "r") as manifest_file: + entries_dict = json.load(manifest_file) + + entries = {cast(ChunkKey, k): ChunkEntry(**entry) for k, entry in entries_dict.items()} + return cls(entries=entries) def to_zarr_json(self, filepath: str) -> None: """Write a ChunkManifest to a Zarr manifest.json file.""" - raise NotImplementedError() + with open(filepath, "w") as json_file: + json.dump(self.dict(), json_file, indent=4, separators=(", ", ": ")) @classmethod def _from_kerchunk_chunk_dict(cls, kerchunk_chunk_dict) -> "ChunkManifest": diff --git a/virtualizarr/tests/test_zarr.py b/virtualizarr/tests/test_zarr.py new file mode 100644 index 0000000..2faf43c --- /dev/null +++ b/virtualizarr/tests/test_zarr.py @@ -0,0 +1,27 @@ +import xarray as xr +import numpy as np +import xarray.testing as xrt +from virtualizarr import open_virtual_dataset, ManifestArray +from virtualizarr.manifests.manifest import ChunkEntry + + +def test_zarr_v3_roundtrip(tmpdir): + arr = ManifestArray( + chunkmanifest={"0.0": ChunkEntry(path="test.nc", offset=6144, length=48)}, + zarray=dict( + shape=(2, 3), + dtype=np.dtype(" + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/virtualizarr/vendor/zarr/__init__.py b/virtualizarr/vendor/zarr/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/virtualizarr/vendor/zarr/utils.py b/virtualizarr/vendor/zarr/utils.py new file mode 100644 index 0000000..918d6b5 --- /dev/null +++ b/virtualizarr/vendor/zarr/utils.py @@ -0,0 +1,22 @@ +import json +import numbers + +from typing import Any + + +class NumberEncoder(json.JSONEncoder): + def default(self, o): + # See json.JSONEncoder.default docstring for explanation + # This is necessary to encode numpy dtype + if isinstance(o, numbers.Integral): + return int(o) + if isinstance(o, numbers.Real): + return float(o) + return json.JSONEncoder.default(self, o) + + +def json_dumps(o: Any) -> bytes: + """Write JSON in a consistent, human-readable way.""" + return json.dumps( + o, indent=4, sort_keys=True, ensure_ascii=True, separators=(",", ": "), cls=NumberEncoder + ).encode("ascii") diff --git a/virtualizarr/xarray.py b/virtualizarr/xarray.py index 5c3c854..1fa8357 100644 --- a/virtualizarr/xarray.py +++ b/virtualizarr/xarray.py @@ -1,3 +1,4 @@ +from pathlib import Path from typing import List, Literal, Mapping, Optional, Union, overload, MutableMapping, Iterable import ujson # type: ignore @@ -10,6 +11,7 @@ import virtualizarr.kerchunk as kerchunk from virtualizarr.kerchunk import KerchunkStoreRefs, FileType from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.zarr import dataset_to_zarr, attrs_from_zarr_group_json, metadata_from_zarr_json class ManifestBackendArray(ManifestArray, BackendArray): @@ -39,7 +41,7 @@ def open_virtual_dataset( File path to open as a set of virtualized zarr arrays. filetype : FileType, default None Type of file to be opened. Used to determine which kerchunk file format backend to use. - Can be one of {'netCDF3', 'netCDF4'}. + Can be one of {'netCDF3', 'netCDF4', 'zarr_v3'}. If not provided will attempt to automatically infer the correct filetype from the the filepath's extension. drop_variables: list[str], default is None Variables in the file to drop before returning. @@ -76,57 +78,116 @@ def open_virtual_dataset( if common: raise ValueError(f"Cannot both load and drop variables {common}") - # this is the only place we actually always need to use kerchunk directly - # TODO avoid even reading byte ranges for variables that will be dropped later anyway? - vds_refs = kerchunk.read_kerchunk_references_from_file( - filepath=filepath, - filetype=filetype, - ) - virtual_vars = virtual_vars_from_kerchunk_refs( - vds_refs, - drop_variables=drop_variables + loadable_variables, - virtual_array_class=virtual_array_class, - ) - ds_attrs = kerchunk.fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {}) - - if indexes is None or len(loadable_variables) > 0: - # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables... - # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references - # TODO really we probably want a dedicated xarray backend that iterates over all variables only once - ds = xr.open_dataset(filepath, drop_variables=drop_variables) - - if indexes is None: - # add default indexes by reading data from file - indexes = {name: index for name, index in ds.xindexes.items()} - elif indexes != {}: - # TODO allow manual specification of index objects - raise NotImplementedError() - else: - indexes = dict(**indexes) # for type hinting: to allow mutation - loadable_vars = {name: var for name, var in ds.variables.items() if name in loadable_variables} + if virtual_array_class is not ManifestArray: + raise NotImplementedError() - # if we only read the indexes we can just close the file right away as nothing is lazy - if loadable_vars == {}: - ds.close() + if filetype == "zarr_v3": + # TODO is there a neat way of auto-detecting this? + return open_virtual_dataset_from_v3_store(storepath=filepath, drop_variables=drop_variables, indexes=indexes) else: - loadable_vars = {} - indexes = {} + # this is the only place we actually always need to use kerchunk directly + # TODO avoid even reading byte ranges for variables that will be dropped later anyway? + vds_refs = kerchunk.read_kerchunk_references_from_file( + filepath=filepath, + filetype=filetype, + ) + virtual_vars = virtual_vars_from_kerchunk_refs( + vds_refs, + drop_variables=drop_variables + loadable_variables, + virtual_array_class=virtual_array_class, + ) + ds_attrs = kerchunk.fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {}) + + if indexes is None or len(loadable_variables) > 0: + # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables... + # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references + # TODO really we probably want a dedicated xarray backend that iterates over all variables only once + ds = xr.open_dataset(filepath, drop_variables=drop_variables) + + if indexes is None: + # add default indexes by reading data from file + indexes = {name: index for name, index in ds.xindexes.items()} + elif indexes != {}: + # TODO allow manual specification of index objects + raise NotImplementedError() + else: + indexes = dict(**indexes) # for type hinting: to allow mutation + + loadable_vars = {name: var for name, var in ds.variables.items() if name in loadable_variables} + + # if we only read the indexes we can just close the file right away as nothing is lazy + if loadable_vars == {}: + ds.close() + else: + loadable_vars = {} + indexes = {} + + vars = {**virtual_vars, **loadable_vars} + + data_vars, coords = separate_coords(vars, indexes) + + vds = xr.Dataset( + data_vars, + coords=coords, + # indexes={}, # TODO should be added in a later version of xarray + attrs=ds_attrs, + ) + + # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened - vars = {**virtual_vars, **loadable_vars} + return vds + + +def open_virtual_dataset_from_v3_store( + storepath: str, + drop_variables: List[str], + indexes: Optional[Mapping[str, Index]], +) -> xr.Dataset: + """ + Read a Zarr v3 store and return an xarray Dataset containing virtualized arrays. + """ + _storepath = Path(storepath) + + ds_attrs = attrs_from_zarr_group_json(_storepath / "zarr.json") + + # TODO recursive glob to create a datatree + # Note: this .is_file() check should not be necessary according to the pathlib docs, but tests fail on github CI without it + # see https://github.com/TomNicholas/VirtualiZarr/pull/45#discussion_r1547833166 + all_paths = _storepath.glob("*/") + directory_paths = [p for p in all_paths if not p.is_file()] + + vars = {} + for array_dir in directory_paths: + var_name = array_dir.name + if var_name in drop_variables: + break + + zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json") + manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json")) + + marr = ManifestArray(chunkmanifest=manifest, zarray=zarray) + var = xr.Variable(data=marr, dims=dim_names, attrs=attrs) + vars[var_name] = var + + if indexes is None: + raise NotImplementedError() + elif indexes != {}: + # TODO allow manual specification of index objects + raise NotImplementedError() + else: + indexes = dict(**indexes) # for type hinting: to allow mutation data_vars, coords = separate_coords(vars, indexes) - vds = xr.Dataset( + ds = xr.Dataset( data_vars, coords=coords, # indexes={}, # TODO should be added in a later version of xarray attrs=ds_attrs, ) - # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened - - return vds + return ds def virtual_vars_from_kerchunk_refs( @@ -161,9 +222,9 @@ def virtual_vars_from_kerchunk_refs( def dataset_from_kerchunk_refs( refs: KerchunkStoreRefs, - drop_variables: Optional[List[str]] = None, - virtual_array_class=ManifestArray, - indexes={}, + drop_variables: List[str] = [], + virtual_array_class: type = ManifestArray, + indexes: Optional[MutableMapping[str, Index]] = None, ) -> xr.Dataset: """ Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays. @@ -177,6 +238,8 @@ def dataset_from_kerchunk_refs( vars = virtual_vars_from_kerchunk_refs(refs, drop_variables, virtual_array_class) + if indexes is None: + indexes = {} data_vars, coords = separate_coords(vars, indexes) ds_attrs = kerchunk.fully_decode_arr_refs(refs["refs"]).get(".zattrs", {}) @@ -261,13 +324,16 @@ def to_zarr(self, storepath: str) -> None: """ Serialize all virtualized arrays in this xarray dataset as a Zarr store. + Currently requires all variables to be backed by ManifestArray objects. + + Not very useful until some implementation of a Zarr reader can actually read these manifest.json files. + See https://github.com/zarr-developers/zarr-specs/issues/287 + Parameters ---------- storepath : str """ - raise NotImplementedError( - "No point in writing out these virtual arrays to Zarr until at least one Zarr reader can actually read them." - ) + dataset_to_zarr(self.ds, storepath) @overload def to_kerchunk(self, filepath: None, format: Literal["dict"]) -> KerchunkStoreRefs: diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py index f94bcd0..5477413 100644 --- a/virtualizarr/zarr.py +++ b/virtualizarr/zarr.py @@ -1,8 +1,16 @@ -from typing import Any, Literal, NewType, Optional, Tuple, Union, List, Dict + +from pathlib import Path +from typing import Any, Literal, NewType, Optional, Tuple, Union, List, Dict, TYPE_CHECKING +import json import numpy as np import ujson # type: ignore +import xarray as xr from pydantic import BaseModel, ConfigDict, field_validator +from virtualizarr.vendor.zarr.utils import json_dumps + +if TYPE_CHECKING: + pass # TODO replace these with classes imported directly from Zarr? (i.e. Zarr Object Models) ZAttrs = NewType( @@ -82,11 +90,18 @@ def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": zarr_format=int(decoded_arr_refs_zarray["zarr_format"]), ) - def to_kerchunk_json(self) -> str: + def dict(self) -> dict[str, Any]: zarray_dict = dict(self) - # TODO not sure if there is a better way to get the ' str: + return ujson.dumps(self.dict()) + + +def encode_dtype(dtype: np.dtype) -> str: + # TODO not sure if there is a better way to get the ' int: @@ -96,3 +111,149 @@ def ceildiv(a: int, b: int) -> int: See https://stackoverflow.com/questions/14822184/is-there-a-ceiling-equivalent-of-operator-in-python """ return -(a // -b) + + +def dataset_to_zarr(ds: xr.Dataset, storepath: str) -> None: + """ + Write an xarray dataset whose variables wrap ManifestArrays to a v3 Zarr store, writing chunk references into manifest.json files. + + Currently requires all variables to be backed by ManifestArray objects. + + Not very useful until some implementation of a Zarr reader can actually read these manifest.json files. + See https://github.com/zarr-developers/zarr-specs/issues/287 + + Parameters + ---------- + ds: xr.Dataset + storepath: str + """ + + from virtualizarr.manifests import ManifestArray + + _storepath = Path(storepath) + Path.mkdir(_storepath, exist_ok=False) + + # should techically loop over groups in a tree but a dataset corresponds to only one group + group_metadata = { + "zarr_format": 3, + "node_type": "group", + "attributes": ds.attrs + } + with open(_storepath / 'zarr.json', "wb") as group_metadata_file: + group_metadata_file.write(json_dumps(group_metadata)) + + for name, var in ds.variables.items(): + array_dir = _storepath / name + marr = var.data + + # TODO move this check outside the writing loop so we don't write an incomplete store on failure? + # TODO at some point this should be generalized to also write in-memory arrays as normal zarr chunks, see GH isse #62. + if not isinstance(marr, ManifestArray): + raise TypeError( + "Only xarray objects wrapping ManifestArrays can be written to zarr using this method, " + f"but variable {name} wraps an array of type {type(marr)}" + ) + + Path.mkdir(array_dir, exist_ok=False) + + # write the chunk references into a manifest.json file + # and the array metadata into a zarr.json file + to_zarr_json(var, array_dir) + + +def to_zarr_json(var: xr.Variable, array_dir: Path) -> None: + """ + Write out both the zarr.json and manifest.json file into the given zarr array directory. + + Follows the Zarr v3 manifest storage transformer ZEP (see https://github.com/zarr-developers/zarr-specs/issues/287). + + Parameters + ---------- + var : xr.Variable + Must be wrapping a ManifestArray + dirpath : str + Zarr store array directory into which to write files. + """ + + marr = var.data + + marr.manifest.to_zarr_json(array_dir / 'manifest.json') + + metadata = zarr_v3_array_metadata(marr.zarray, list(var.dims), var.attrs) + with open(array_dir / 'zarr.json', "wb") as metadata_file: + metadata_file.write(json_dumps(metadata)) + + +def zarr_v3_array_metadata(zarray: ZArray, dim_names: List[str], attrs: dict) -> dict: + """Construct a v3-compliant metadata dict from v2 zarray + information stored on the xarray variable.""" + # TODO it would be nice if we could use the zarr-python metadata.ArrayMetadata classes to do this conversion for us + + metadata = zarray.dict() + + # adjust to match v3 spec + metadata["zarr_format"] = 3 + metadata["node_type"] = "array" + metadata["data_type"] = str(np.dtype(metadata.pop("dtype"))) + metadata["chunk_grid"] = {"name": "regular", "configuration": {"chunk_shape": metadata.pop("chunks")}} + metadata["chunk_key_encoding"] = { + "name": "default", + "configuration": { + "separator": "/" + } + } + metadata["codecs"] = metadata.pop("filters") + metadata.pop("compressor") # TODO this should be entered in codecs somehow + metadata.pop("order") # TODO this should be replaced by a transpose codec + + # indicate that we're using the manifest storage transformer ZEP + metadata["storage_transformers"] = [ + { + "name": "chunk-manifest-json", + "configuration": { + "manifest": "./manifest.json" + } + } + ] + + # add information from xarray object + metadata["dimension_names"] = dim_names + metadata["attributes"] = attrs + + return metadata + + +def attrs_from_zarr_group_json(filepath: Path) -> dict: + with open(filepath, "r") as metadata_file: + attrs = json.load(metadata_file) + return attrs["attributes"] + + +def metadata_from_zarr_json(filepath: Path) -> Tuple[ZArray, List[str], dict]: + with open(filepath, "r") as metadata_file: + metadata = json.load(metadata_file) + + if { + "name": "chunk-manifest-json", + "configuration": { + "manifest": "./manifest.json", + } + } not in metadata.get("storage_transformers", []): + raise ValueError("Can only read byte ranges from Zarr v3 stores which implement the manifest storage transformer ZEP.") + + attrs = metadata.pop("attributes") + dim_names = metadata.pop("dimension_names") + + chunk_shape = metadata["chunk_grid"]["configuration"]["chunk_shape"] + + zarray = ZArray( + chunks=metadata["chunk_grid"]["configuration"]["chunk_shape"], + compressor=metadata["codecs"], + dtype=np.dtype(metadata["data_type"]), + fill_value=metadata["fill_value"], + filters=metadata.get("filters", None), + order="C", + shape=chunk_shape, + zarr_format=3, + ) + + return zarray, dim_names, attrs