/
xarray.py
293 lines (230 loc) · 10.1 KB
/
xarray.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
from typing import List, Literal, Mapping, Optional, Union, overload
from pathlib import Path
import ujson # type: ignore
import xarray as xr
from xarray import register_dataset_accessor
from xarray.backends import BackendArray
from xarray.core.indexes import Index
import virtualizarr.kerchunk as kerchunk
from virtualizarr.kerchunk import KerchunkStoreRefs
from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.zarr import dataset_to_zarr, attrs_from_zarr_group_json, metadata_from_zarr_json
class ManifestBackendArray(ManifestArray, BackendArray):
    """
    A ManifestArray that additionally subclasses xarray's ``BackendArray``.

    Using this prevents xarray from wrapping the ManifestArray in ExplicitIndexingAdapter etc.
    No behaviour is added beyond the two base classes.
    """

    pass
def open_virtual_dataset(
    filepath: str,
    filetype: Optional[str] = None,
    drop_variables: Optional[List[str]] = None,
    indexes: Optional[Mapping[str, Index]] = None,
    virtual_array_class=ManifestArray,
) -> xr.Dataset:
    """
    Open a file or store as an xarray Dataset wrapping virtualized zarr arrays.

    No data variables will be loaded.

    Xarray indexes can optionally be created (the default behaviour). To avoid creating
    any xarray indexes pass indexes={}.

    Parameters
    ----------
    filepath : str
        File path to open as a set of virtualized zarr arrays.
    filetype : str, default None
        Type of file to be opened. Used to determine which kerchunk file format backend to use.
        Can be one of {'netCDF3', 'netCDF4', 'zarr_v3'}.
        If not provided will attempt to automatically infer the correct filetype from the filepath's extension.
    drop_variables : list[str], default is None
        Variables in the file to drop before returning.
    indexes : Mapping[str, Index], default is None
        Indexes to use on the returned xarray Dataset.
        Default is None, which will read any 1D coordinate data to create in-memory Pandas indexes.
        To avoid creating any indexes, pass indexes={}.
        For filetype='zarr_v3' the default (None) is not yet supported.
    virtual_array_class
        Virtual array class to use to represent the references to the chunks in each on-disk array.
        Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that.

    Returns
    -------
    xr.Dataset
        Dataset whose variables wrap virtual (reference-only) arrays.

    Raises
    ------
    NotImplementedError
        If ``virtual_array_class`` is not ManifestArray.
    """
    if drop_variables is None:
        drop_variables = []

    if virtual_array_class is not ManifestArray:
        raise NotImplementedError()

    if filetype == "zarr_v3":
        # TODO is there a neat way of auto-detecting this?
        return open_virtual_dataset_from_v3_store(
            storepath=filepath, drop_variables=drop_variables, indexes=indexes
        )

    # this is the only place we actually always need to use kerchunk directly
    vds_refs = kerchunk.read_kerchunk_references_from_file(
        filepath=filepath,
        filetype=filetype,
    )

    if indexes is None:
        # add default indexes by reading data from file
        # TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
        # TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
        # Forward drop_variables so no index is built for a variable that will not be in the
        # returned dataset, and use a context manager so the file is closed even if reading fails.
        with xr.open_dataset(filepath, drop_variables=drop_variables) as ds:
            indexes = ds.xindexes

    vds = dataset_from_kerchunk_refs(
        vds_refs,
        drop_variables=drop_variables,
        virtual_array_class=virtual_array_class,
        indexes=indexes,
    )

    # TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened

    return vds
def open_virtual_dataset_from_v3_store(
    storepath: str,
    drop_variables: List[str],
    indexes: Optional[Mapping[str, Index]],
) -> xr.Dataset:
    """
    Read a Zarr v3 store and return an xarray Dataset containing virtualized arrays.

    Group-level attributes are read from ``<storepath>/zarr.json``. Each immediate
    sub-directory of the store is treated as one array: its metadata comes from
    ``<dir>/zarr.json`` and its chunk references from ``<dir>/manifest.json``.

    Parameters
    ----------
    storepath : str
        Path to the root directory of the Zarr v3 store.
    drop_variables : list[str]
        Names of arrays (sub-directories) to skip.
    indexes : Mapping[str, Index] or None
        Indexes to attach to the returned Dataset. Building default indexes
        (``indexes=None``) is not yet supported for v3 stores.

    Returns
    -------
    xr.Dataset
        Dataset of ManifestArray-backed variables, with the group attributes as ``attrs``.

    Raises
    ------
    NotImplementedError
        If ``indexes`` is None.
    """
    if indexes is None:
        # Unsupported: fail fast instead of after reading every array's metadata from disk.
        raise NotImplementedError()

    _storepath = Path(storepath)

    ds_attrs = attrs_from_zarr_group_json(_storepath / "zarr.json")

    # TODO recursive glob to create a datatree
    variables = {}
    # Sorted for a deterministic variable order. The is_dir() filter is needed because before
    # Python 3.11 a trailing "/" in a glob pattern does not restrict matches to directories,
    # so the group-level zarr.json would otherwise be treated as an array.
    for array_dir in sorted(_storepath.glob("*/")):
        if not array_dir.is_dir():
            continue

        var_name = array_dir.name
        if var_name in drop_variables:
            # Skip only this array and keep scanning the remaining ones.
            continue

        zarray, dim_names, attrs = metadata_from_zarr_json(array_dir / "zarr.json")
        manifest = ChunkManifest.from_zarr_json(str(array_dir / "manifest.json"))

        marr = ManifestArray(chunkmanifest=manifest, zarray=zarray)
        variables[var_name] = xr.Variable(data=marr, dims=dim_names, attrs=attrs)

    data_vars, coords = separate_coords(variables, indexes)

    ds = xr.Dataset(
        data_vars,
        coords=coords,
        # indexes={},  # TODO should be added in a later version of xarray
        attrs=ds_attrs,
    )

    return ds
def dataset_from_kerchunk_refs(
    refs: KerchunkStoreRefs,
    drop_variables: Optional[List[str]] = None,
    virtual_array_class: type = ManifestArray,
    indexes: Optional[Mapping[str, Index]] = None,
) -> xr.Dataset:
    """
    Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays.

    Parameters
    ----------
    refs : KerchunkStoreRefs
        Store-level kerchunk references; variable names are discovered with
        ``kerchunk.find_var_names`` and dataset attrs are taken from the ``.zattrs`` entry of ``refs["refs"]``.
    drop_variables : list[str], default is None
        Variables in the file to drop before returning.
    virtual_array_class
        Virtual array class to use to represent the references to the chunks in each on-disk array.
        Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that.
    indexes : Mapping[str, Index], default is None
        Indexes forwarded to ``separate_coords`` when building the coordinates.

    Returns
    -------
    xr.Dataset
    """
    if drop_variables is None:
        drop_variables = []
    # set for O(1) membership checks while filtering variable names
    to_drop = set(drop_variables)

    variables = {
        var_name: variable_from_kerchunk_refs(refs, var_name, virtual_array_class)
        for var_name in kerchunk.find_var_names(refs)
        if var_name not in to_drop
    }

    data_vars, coords = separate_coords(variables, indexes)

    ds_attrs = kerchunk.fully_decode_arr_refs(refs["refs"]).get(".zattrs", {})

    ds = xr.Dataset(
        data_vars,
        coords=coords,
        # indexes={},  # TODO should be added in a later version of xarray
        attrs=ds_attrs,
    )

    return ds
def variable_from_kerchunk_refs(
    refs: KerchunkStoreRefs, var_name: str, virtual_array_class
) -> xr.Variable:
    """
    Build a single xarray Variable for ``var_name`` from a kerchunk references dict.

    The dimension names are read from the ``_ARRAY_DIMENSIONS`` entry of the array's
    attributes, and the full attribute dict (that entry included) becomes the
    Variable's ``attrs``. The data is an instance of ``virtual_array_class`` built
    from the array's zarray metadata and a ChunkManifest of its chunk references.
    """
    array_refs = kerchunk.extract_array_refs(refs, var_name)
    chunk_dict, zarray, zattrs = kerchunk.parse_array_refs(array_refs)

    chunk_manifest = ChunkManifest._from_kerchunk_chunk_dict(chunk_dict)
    dim_names = zattrs["_ARRAY_DIMENSIONS"]

    return xr.Variable(
        dims=dim_names,
        data=virtual_array_class(zarray=zarray, chunkmanifest=chunk_manifest),
        attrs=zattrs,
    )
def separate_coords(
    vars: dict[str, xr.Variable],
    indexes={},
) -> tuple[dict[str, xr.Variable], xr.Coordinates]:
    """
    Split variables into data variables and an ``xr.Coordinates`` object.

    A variable is treated as a coordinate when it is a dimension coordinate
    (its dims are exactly ``(name,)``). The goal is to build coordinates that
    won't cause xarray to automatically build a pandas.Index for the 1D coordinates.
    Getting this right required checking out xarray v2023.08.0, the last release
    before xarray PR #8107 was merged.

    Parameters
    ----------
    vars : dict[str, xr.Variable]
        All variables, keyed by name. Insertion order is preserved in both outputs.
    indexes : mapping, default {}
        Passed unchanged to ``xr.Coordinates``. It is not modified here.
        NOTE(review): with the empty default no indexes should be created, while
        None presumably lets xarray build default ones (confirm against the xarray version in use).

    Returns
    -------
    (data_vars, coords)
        The non-coordinate variables, and the coordinates wrapped in ``xr.Coordinates``.
    """
    # this would normally come from CF decoding, let's hope the fact we're skipping that doesn't cause any problems...
    coord_names: List[str] = []

    data_vars = {}
    coord_vars = {}
    for name, var in vars.items():
        is_coordinate = name in coord_names or var.dims == (name,)
        if not is_coordinate:
            data_vars[name] = var
            continue

        if len(var.dims) == 1:
            # Pass a (dim, data) tuple rather than the Variable itself, a workaround to avoid
            # creating IndexVariables described here https://github.com/pydata/xarray/pull/8107#discussion_r1311214263
            (only_dim,) = var.dims
            coord_vars[name] = (only_dim, var.data)
        else:
            coord_vars[name] = var

    return data_vars, xr.Coordinates(coord_vars, indexes=indexes)
@register_dataset_accessor("virtualize")
class VirtualiZarrDatasetAccessor:
    """
    Xarray accessor for writing out virtual datasets to disk.

    Methods on this object are called via `ds.virtualize.{method}`.
    """

    def __init__(self, ds):
        # The dataset this accessor was requested from.
        self.ds = ds

    def to_zarr(self, storepath: str) -> None:
        """
        Serialize all virtualized arrays in this xarray dataset as a Zarr store.

        Currently requires all variables to be backed by ManifestArray objects.

        Not very useful until some implementation of a Zarr reader can actually read these manifest.json files.
        See https://github.com/zarr-developers/zarr-specs/issues/287

        Parameters
        ----------
        storepath : str
            Location to write the store to.
        """
        dataset_to_zarr(self.ds, storepath)

    # The defaults on the "dict" overload let type checkers match the documented
    # no-argument call `ds.virtualize.to_kerchunk()`.
    @overload
    def to_kerchunk(
        self, filepath: None = None, format: Literal["dict"] = "dict"
    ) -> KerchunkStoreRefs:
        ...

    @overload
    def to_kerchunk(self, filepath: str, format: Literal["json"]) -> None:
        ...

    @overload
    def to_kerchunk(self, filepath: str, format: Literal["parquet"]) -> None:
        ...

    def to_kerchunk(
        self,
        filepath: Optional[str] = None,
        format: Union[Literal["dict"], Literal["json"], Literal["parquet"]] = "dict",
    ) -> Union[KerchunkStoreRefs, None]:
        """
        Serialize all virtualized arrays in this xarray dataset into the kerchunk references format.

        Parameters
        ----------
        filepath : str, default: None
            File path to write kerchunk references into. Not required if format is 'dict'.
        format : 'dict', 'json', or 'parquet'
            Format to serialize the kerchunk references as.
            If 'json' or 'parquet' then the 'filepath' argument is required.

        Returns
        -------
        KerchunkStoreRefs or None
            The references dict when format is 'dict', otherwise None.

        Raises
        ------
        ValueError
            If format is 'json' and no filepath is given, or if format is unrecognized.
        NotImplementedError
            If format is 'parquet'.

        References
        ----------
        https://fsspec.github.io/kerchunk/spec.html
        """
        # Arguments are validated in each branch *before* the reference conversion,
        # so invalid calls do not pay for dataset_to_kerchunk_refs.
        if format == "dict":
            return kerchunk.dataset_to_kerchunk_refs(self.ds)
        elif format == "json":
            if filepath is None:
                raise ValueError("Filepath must be provided when format is 'json'")

            refs = kerchunk.dataset_to_kerchunk_refs(self.ds)
            with open(filepath, "w", encoding="utf-8") as json_file:
                ujson.dump(refs, json_file)
            return None
        elif format == "parquet":
            raise NotImplementedError()
        else:
            raise ValueError(f"Unrecognized output format: {format}")