Xarray open_mfdataset with engine Zarr #4187
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
Changes from all commits:
```diff
@@ -7,6 +7,7 @@
 from ..core.pycompat import integer_types
 from ..core.utils import FrozenDict, HiddenKeyDict
 from ..core.variable import Variable
+from .api import open_dataset
 from .common import AbstractWritableDataStore, BackendArray, _encode_variable_name

 # need some special secret attributes to tell us the dimensions
```
```diff
@@ -361,6 +362,51 @@ def encode_variable(self, variable):
     def encode_attribute(self, a):
         return encode_zarr_attr_value(a)

+    def get_chunk(self, name, var, chunks):
+        chunk_spec = dict(zip(var.dims, var.encoding.get("chunks")))
+
+        # Coordinate labels aren't chunked
+        if var.ndim == 1 and var.dims[0] == name:
+            return chunk_spec
+
+        if chunks == "auto":
+            return chunk_spec
+
+        for dim in var.dims:
+            if dim in chunks:
+                spec = chunks[dim]
+                if isinstance(spec, int):
+                    spec = (spec,)
+                if isinstance(spec, (tuple, list)) and chunk_spec[dim]:
+                    if any(s % chunk_spec[dim] for s in spec):
+                        warnings.warn(
+                            "Specified Dask chunks %r would "
+                            "separate Zarr chunk shape %r for "
+                            "dimension %r. This significantly "
+                            "degrades performance. Consider "
+                            "rechunking after loading instead."
+                            % (chunks[dim], chunk_spec[dim], dim),
+                            stacklevel=2,
+                        )
+                chunk_spec[dim] = chunks[dim]
+        return chunk_spec
+
+    def maybe_chunk(self, name, var, chunks, overwrite_encoded_chunks):
+        chunk_spec = self.get_chunk(name, var, chunks)
+
+        if (var.ndim > 0) and (chunk_spec is not None):
+            from dask.base import tokenize
+
+            # does this cause any data to be read?
+            token2 = tokenize(name, var._data, chunks)
+            name2 = f"xarray-{name}-{token2}"
+            var = var.chunk(chunk_spec, name=name2, lock=None)
+            if overwrite_encoded_chunks and var.chunks is not None:
+                var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
+            return var
+        else:
+            return var
+
     def store(
         self,
         variables,
```
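For intuition, here is a minimal standalone sketch of the divisibility check in `get_chunk` above (the values are assumed for illustration and are not part of the PR): a requested Dask chunk size that is not a multiple of the underlying Zarr chunk forces each Zarr chunk to be read by more than one Dask task.

```python
# Standalone sketch of the check above; values are assumed for illustration.
zarr_chunk = 10    # chunk length recorded in the Zarr metadata for dim "x"
requested = (15,)  # Dask chunks requested by the user along "x"

# 15 % 10 != 0: the boundaries don't line up, so a Dask chunk covering
# elements 10-24 straddles two Zarr chunks, and the Zarr chunk covering
# elements 10-19 is read by two different Dask tasks.
if any(size % zarr_chunk for size in requested):
    print("requested chunks would separate Zarr chunks -> degraded performance")
```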
```diff
@@ -601,130 +647,33 @@ def open_zarr(
     ----------
     http://zarr.readthedocs.io/
     """
     if "auto_chunk" in kwargs:
         auto_chunk = kwargs.pop("auto_chunk")
         if auto_chunk:
             chunks = "auto"  # maintain backwards compatibility
         else:
             chunks = None

         warnings.warn(
             "auto_chunk is deprecated. Use chunks='auto' instead.",
             FutureWarning,
             stacklevel=2,
         )

     if kwargs:
         raise TypeError(
             "open_zarr() got unexpected keyword arguments " + ",".join(kwargs.keys())
         )

-    if not isinstance(chunks, (int, dict)):
-        if chunks != "auto" and chunks is not None:
-            raise ValueError(
-                "chunks must be an int, dict, 'auto', or None. "
-                "Instead found %s. " % chunks
-            )
-
-    if chunks == "auto":
-        try:
-            import dask.array  # noqa
-        except ImportError:
-            chunks = None
-
-    if not decode_cf:
-        mask_and_scale = False
-        decode_times = False
-        concat_characters = False
-        decode_coords = False
-        decode_timedelta = False
-
-    def maybe_decode_store(store, lock=False):
-        ds = conventions.decode_cf(
-            store,
-            mask_and_scale=mask_and_scale,
-            decode_times=decode_times,
-            concat_characters=concat_characters,
-            decode_coords=decode_coords,
-            drop_variables=drop_variables,
-            decode_timedelta=decode_timedelta,
-            use_cftime=use_cftime,
-        )
+    backend_kwargs = {
```
Inline review comment on the `backend_kwargs` line: We'll need to update …

Reply: Ok, I've updated the io.rst documentation to use …
```diff
+        "synchronizer": synchronizer,
+        "consolidated": consolidated,
+        "overwrite_encoded_chunks": overwrite_encoded_chunks,
+        "chunk_store": chunk_store,
+    }

-        # TODO: this is where we would apply caching
-
-        return ds
-
-    # Zarr supports a wide range of access modes, but for now xarray either
-    # reads or writes from a store, never both. For open_zarr, we only read
-    mode = "r"
-    zarr_store = ZarrStore.open_group(
-        store,
-        mode=mode,
-        synchronizer=synchronizer,
+    ds = open_dataset(
+        filename_or_obj=store,
         group=group,
-        consolidated=consolidated,
-        chunk_store=chunk_store,
+        decode_cf=decode_cf,
         mask_and_scale=mask_and_scale,
         decode_times=decode_times,
         concat_characters=concat_characters,
         decode_coords=decode_coords,
+        engine="zarr",
+        chunks=chunks,
         drop_variables=drop_variables,
+        backend_kwargs=backend_kwargs,
+        decode_timedelta=decode_timedelta,
+        use_cftime=use_cftime,
     )
-    ds = maybe_decode_store(zarr_store)
-
-    # auto chunking needs to be here and not in ZarrStore because variable
-    # chunks do not survive decode_cf
-    # return trivial case
-    if not chunks:
-        return ds
-
-    # adapted from Dataset.Chunk()
-    if isinstance(chunks, int):
-        chunks = dict.fromkeys(ds.dims, chunks)
-
-    if isinstance(chunks, tuple) and len(chunks) == len(ds.dims):
-        chunks = dict(zip(ds.dims, chunks))
-
-    def get_chunk(name, var, chunks):
-        chunk_spec = dict(zip(var.dims, var.encoding.get("chunks")))
-
-        # Coordinate labels aren't chunked
-        if var.ndim == 1 and var.dims[0] == name:
-            return chunk_spec
-
-        if chunks == "auto":
-            return chunk_spec
-
-        for dim in var.dims:
-            if dim in chunks:
-                spec = chunks[dim]
-                if isinstance(spec, int):
-                    spec = (spec,)
-                if isinstance(spec, (tuple, list)) and chunk_spec[dim]:
-                    if any(s % chunk_spec[dim] for s in spec):
-                        warnings.warn(
-                            "Specified Dask chunks %r would "
-                            "separate Zarr chunk shape %r for "
-                            "dimension %r. This significantly "
-                            "degrades performance. Consider "
-                            "rechunking after loading instead."
-                            % (chunks[dim], chunk_spec[dim], dim),
-                            stacklevel=2,
-                        )
-                chunk_spec[dim] = chunks[dim]
-        return chunk_spec
-
-    def maybe_chunk(name, var, chunks):
-        from dask.base import tokenize
-
-        chunk_spec = get_chunk(name, var, chunks)
-
-        if (var.ndim > 0) and (chunk_spec is not None):
-            # does this cause any data to be read?
-            token2 = tokenize(name, var._data)
-            name2 = "zarr-%s" % token2
-            var = var.chunk(chunk_spec, name=name2, lock=None)
-            if overwrite_encoded_chunks and var.chunks is not None:
-                var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
-            return var
-        else:
-            return var
-
-    variables = {k: maybe_chunk(k, v, chunks) for k, v in ds.variables.items()}
-    return ds._replace_vars_and_dims(variables)
+    return ds
```
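In user-facing terms, the net effect of the diff is that `open_zarr` becomes a thin wrapper around `open_dataset(..., engine="zarr")`, so Zarr stores can also flow through `open_mfdataset`. A hedged usage sketch of what this enables (the store paths are hypothetical):

```python
import xarray as xr

# Single store through the generic entry point that open_zarr now delegates to.
ds = xr.open_dataset("store1.zarr", engine="zarr", chunks="auto")

# The headline feature of this PR: combining multiple Zarr stores with
# open_mfdataset, just like netCDF files.
ds_all = xr.open_mfdataset(
    ["store1.zarr", "store2.zarr"],
    engine="zarr",
    combine="by_coords",
)
```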
Review comment:
My main concern with this code is that introducing an entirely separate code path inside `open_dataset()` for chunking Zarr in particular feels strange and a little unexpected. Any time we use totally separate code branches for some logic, the odds of introducing inconsistencies and bugs increase greatly.

I wonder if we could consolidate this logic somehow to avoid adding a separate branch here. For example, we could put a `get_chunk` method on all xarray backend classes, even if it currently only returns a filler value and/or raises an error for `chunks='auto'`. Chunking is not unique to Zarr; netCDF4 files also have chunks, for example, although the default "auto" chunking logic should probably be different.

I would be OK holding this off for a later clean-up, but this really would be worth doing eventually. CC @alexamici RE: the backends refactor.
Reply:
Thanks for pointing this out! I agree completely that the `open_dataset()` function is overdue for a refactor. It was a nightmare to go through all the if-then branches, but the comprehensive test suite helped catch most of the bugs, and I've tested this on my own real-world dataset, so the logic should be OK for now 🤞.

Personally I would prefer to hold this off, since this `open_mfdataset` PR (and the previous one at #4003) has been sitting around for months, and I've had to resolve quite a few merge conflicts to keep up. No point in contaminating this complex PR by refactoring the netCDF logic either.