-
Notifications
You must be signed in to change notification settings - Fork 7
/
xarray.py
210 lines (168 loc) · 7.54 KB
/
xarray.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
from typing import List, Literal, Optional, Union, overload
import ujson # type: ignore
import xarray as xr
from xarray import register_dataset_accessor
from xarray.backends import BackendArray
import virtualizarr.kerchunk as kerchunk
from virtualizarr.kerchunk import KerchunkStoreRefs
from virtualizarr.manifests import ChunkManifest, ManifestArray
from virtualizarr.zarr import dataset_to_zarr
class ManifestBackendArray(ManifestArray, BackendArray):
    """
    Marker subclass combining ManifestArray with xarray's BackendArray.

    Inheriting from BackendArray stops xarray from wrapping the array in
    ExplicitIndexingAdapter and similar indexing adapters.
    """

    pass
def open_dataset_via_kerchunk(
    filepath: str,
    filetype: str,
    drop_variables: Optional[List[str]] = None,
    virtual_array_class=ManifestArray,
) -> xr.Dataset:
    """
    Use kerchunk to open a single legacy file as an xarray Dataset wrapping virtualized zarr arrays.

    It's important that we avoid creating any IndexVariables, as our virtualized zarr array objects
    don't actually contain a collection that can be turned into a pandas.Index.

    Parameters
    ----------
    filepath : str, default None
        File path to open as a set of virtualized zarr arrays.
    filetype : str, default None
        Type of file to be opened. Used to determine which kerchunk file format backend to use.
        If not provided will attempt to automatically infer the correct filetype from the filepath's extension.
    drop_variables: list[str], default is None
        Variables in the file to drop before returning.
    virtual_array_class
        Virtual array class to use to represent the references to the chunks in each on-disk array.
        Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that.
    """
    # the only place we always have to go through kerchunk itself
    store_refs = kerchunk.read_kerchunk_references_from_file(
        filepath=filepath,
        filetype=filetype,
    )

    # TODO we should probably also use ds.set_close() to tell xarray how to close the file we opened
    return dataset_from_kerchunk_refs(
        store_refs,
        drop_variables=drop_variables,
        virtual_array_class=virtual_array_class,
    )
def dataset_from_kerchunk_refs(
    refs: KerchunkStoreRefs,
    drop_variables: Optional[List[str]] = None,
    virtual_array_class=ManifestArray,
) -> xr.Dataset:
    """
    Translate a store-level kerchunk reference dict into an xarray Dataset containing virtualized arrays.

    Parameters
    ----------
    refs
        Store-level kerchunk references to translate.
    drop_variables: list[str], default is None
        Variables in the file to drop before returning.
    virtual_array_class
        Virtual array class to use to represent the references to the chunks in each on-disk array.
        Currently can only be ManifestArray, but once VirtualZarrArray is implemented the default should be changed to that.
    """
    var_names = kerchunk.find_var_names(refs)

    if drop_variables is None:
        drop_variables = []
    var_names_to_keep = [
        var_name for var_name in var_names if var_name not in drop_variables
    ]

    # renamed from `vars` to avoid shadowing the `vars()` builtin
    variables = {}
    for var_name in var_names_to_keep:
        # TODO abstract all this parsing into a function/method?
        arr_refs = kerchunk.extract_array_refs(refs, var_name)
        chunk_dict, zarray, zattrs = kerchunk.parse_array_refs(arr_refs)
        manifest = ChunkManifest.from_kerchunk_chunk_dict(chunk_dict)

        # dimension names are stored under the kerchunk/zarr "_ARRAY_DIMENSIONS" convention
        dims = zattrs["_ARRAY_DIMENSIONS"]

        varr = virtual_array_class(zarray=zarray, chunkmanifest=manifest)
        variables[var_name] = xr.Variable(data=varr, dims=dims, attrs=zattrs)

    data_vars, coords = separate_coords(variables)

    # dataset-level attributes live in the store's top-level ".zattrs" key (may be absent)
    ds_attrs = kerchunk.fully_decode_arr_refs(refs["refs"]).get(".zattrs", {})

    ds = xr.Dataset(
        data_vars,
        coords=coords,
        # indexes={}, # this kwarg does exist in later versions of xarray
        attrs=ds_attrs,
    )

    return ds
def separate_coords(
    vars: dict[str, xr.Variable]
) -> tuple[dict[str, xr.Variable], xr.Coordinates]:
    """
    Try to generate a set of coordinates that won't cause xarray to automatically build a pandas.Index for the 1D coordinates.

    Returns a (data_vars, coords) pair suitable for passing to ``xr.Dataset``.

    I thought this should be easy but it was actually really hard - in the end I had to checkout xarray v2023.08.0,
    the last one before #8107 was merged.
    """
    # this would normally come from CF decoding, let's hope the fact we're skipping that doesn't cause any problems...
    # NOTE(review): this list is never populated here, so only dimension coordinates
    # (var.dims == (name,)) are promoted below
    coord_names: List[str] = []

    # split data and coordinate variables (promote dimension coordinates)
    data_vars = {}
    coord_vars = {}
    for name, var in vars.items():
        if name in coord_names or var.dims == (name,):
            # use workaround to avoid creating IndexVariables described here https://github.com/pydata/xarray/pull/8107#discussion_r1311214263
            if len(var.dims) == 1:
                dim1d, *_ = var.dims
                coord_vars[name] = (dim1d, var.data)
            else:
                coord_vars[name] = var
        else:
            data_vars[name] = var

    # this is stolen from https://github.com/pydata/xarray/pull/8051
    # needed otherwise xarray errors whilst trying to turn the KerchunkArrays for the 1D coordinate variables into indexes
    # but it doesn't appear to work with `main` since #8107, which is why the workaround above is needed
    # EDIT: actually even the workaround doesn't work - to avoid creating indexes I had to checkout xarray v2023.08.0, the last one before #8107 was merged
    # (the previous `if set_indexes:` flag was always False, so the dead branch was removed)
    # explicit Coordinates object with no indexes passed
    coords = xr.Coordinates(coord_vars, indexes={})

    return data_vars, coords
@register_dataset_accessor("virtualize")
class VirtualiZarrDatasetAccessor:
    """
    Dataset accessor (``ds.virtualize``) for serializing a dataset of virtualized
    zarr arrays out to zarr or kerchunk reference formats.
    """

    def __init__(self, ds: xr.Dataset):
        # the dataset this accessor is attached to
        self.ds = ds

    def to_zarr(self, storepath: str) -> None:
        """
        Write out all virtualized arrays as a new Zarr store on disk.

        Parameters
        ----------
        storepath : str
            Path on disk at which to create the new Zarr store.
        """
        dataset_to_zarr(self.ds, storepath)

    @overload
    def to_kerchunk(self, filepath: None, format: Literal["dict"]) -> KerchunkStoreRefs:
        ...

    @overload
    def to_kerchunk(self, filepath: str, format: Literal["json"]) -> None:
        ...

    @overload
    def to_kerchunk(self, filepath: str, format: Literal["parquet"]) -> None:
        ...

    def to_kerchunk(
        self,
        filepath: Optional[str] = None,
        format: Union[Literal["dict"], Literal["json"], Literal["parquet"]] = "dict",
    ) -> Union[KerchunkStoreRefs, None]:
        """
        Serialize all virtualized arrays into the kerchunk references format.

        Parameters
        ----------
        filepath : str, default: None
            File path to write kerchunk references into. Not required if format is 'dict'.
        format : 'dict', 'json', or 'parquet'
            Format to serialize the kerchunk references as.
            If 'json' or 'parquet' then the 'filepath' argument is required.

        Raises
        ------
        ValueError
            If ``format`` is 'json' but no ``filepath`` was given, or if ``format``
            is not one of the recognized options.
        NotImplementedError
            If ``format`` is 'parquet' (not yet supported).

        References
        ----------
        https://fsspec.github.io/kerchunk/spec.html
        """
        refs = kerchunk.dataset_to_kerchunk_refs(self.ds)

        if format == "dict":
            return refs
        elif format == "json":
            if filepath is None:
                raise ValueError("Filepath must be provided when format is 'json'")

            with open(filepath, "w") as json_file:
                ujson.dump(refs, json_file)

            return None
        elif format == "parquet":
            raise NotImplementedError()
        else:
            raise ValueError(f"Unrecognized output format: {format}")