-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathnetcdf.py
99 lines (89 loc) · 4.17 KB
/
netcdf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
import fsspec
from distutils.version import LooseVersion
from intake.source.base import PatternMixin
from intake.source.utils import reverse_format
from .base import DataSourceMixin
class NetCDFSource(DataSourceMixin, PatternMixin):
    """Open a NetCDF source (one file or many) with xarray.

    Parameters
    ----------
    urlpath : str, List[str]
        Path to source file. May include glob "*" characters, format
        pattern strings, or list.
        Some examples:
            - ``{{ CATALOG_DIR }}/data/air.nc``
            - ``{{ CATALOG_DIR }}/data/*.nc``
            - ``{{ CATALOG_DIR }}/data/air_{year}.nc``
    chunks : int or dict, optional
        Chunks is used to load the new dataset into dask
        arrays. ``chunks={}`` loads the dataset with dask using a single
        chunk for all arrays.
    combine : ({'by_coords', 'nested'}, optional)
        Which function is used to concatenate all the files when urlpath
        has a wildcard. It is recommended to set this argument in all
        your catalogs because the default has changed and is going to change.
        It was "nested", and is now the default of xarray.open_mfdataset
        which is "auto_combine", and is planned to change from "auto" to
        "by_coords" in the near future.
    concat_dim : str, optional
        Name of dimension along which to concatenate the files. Can
        be new or pre-existing if combine is "nested". Must be None or new if
        combine is "by_coords".
    path_as_pattern : bool or str, optional
        Whether to treat the path as a pattern (ie. ``data_{field}.nc``)
        and create new coordinates in the output corresponding to pattern
        fields. If str, is treated as pattern to match on. Default is True.
    xarray_kwargs: dict
        Additional xarray kwargs for xr.open_dataset() or xr.open_mfdataset().
    storage_options: dict
        If using a remote fs (whether caching locally or not), these are
        the kwargs to pass to that FS.
    """
    name = 'netcdf'

    def __init__(self, urlpath, chunks=None, combine=None, concat_dim=None,
                 xarray_kwargs=None, metadata=None,
                 path_as_pattern=True, storage_options=None, **kwargs):
        """Store the source configuration; no data is opened here.

        See the class docstring for the meaning of each parameter.
        ``metadata`` and any extra ``kwargs`` are forwarded to the parent
        initializer.
        """
        self.path_as_pattern = path_as_pattern
        self.urlpath = urlpath
        self.chunks = chunks
        self.concat_dim = concat_dim
        self.combine = combine
        self.storage_options = storage_options or {}
        self.xarray_kwargs = xarray_kwargs or {}
        self._ds = None
        # For a list of paths, the first entry alone decides whether the
        # whole collection is treated as locally openable.
        if isinstance(self.urlpath, list):
            self._can_be_local = fsspec.utils.can_be_local(self.urlpath[0])
        else:
            self._can_be_local = fsspec.utils.can_be_local(self.urlpath)
        super(NetCDFSource, self).__init__(metadata=metadata, **kwargs)

    def _open_dataset(self):
        """Open the dataset with xarray and store it on ``self._ds``.

        A list of paths, or a path containing ``"*"``, is opened with
        ``xr.open_mfdataset``; anything else with ``xr.open_dataset``.

        Raises
        ------
        ValueError
            If ``combine`` or ``concat_dim`` is given both as a constructor
            argument and inside ``xarray_kwargs``.
        """
        import xarray as xr

        url = self.urlpath
        # Work on a copy so the stored ``xarray_kwargs`` is never mutated.
        # Updating it in place would leave 'combine'/'concat_dim' behind and
        # make a second call fail the duplicate-argument checks below.
        kwargs = dict(self.xarray_kwargs)

        if isinstance(url, list) or "*" in url:
            opener = xr.open_mfdataset
            if self.pattern:
                # Attach the pattern fields parsed from each file's path.
                kwargs.update(preprocess=self._add_path_to_ds)
            if self.combine is not None:
                if 'combine' in kwargs:
                    raise ValueError("Setting 'combine' argument twice in the catalog is invalid")
                kwargs.update(combine=self.combine)
            if self.concat_dim is not None:
                if 'concat_dim' in kwargs:
                    raise ValueError("Setting 'concat_dim' argument twice in the catalog is invalid")
                kwargs.update(concat_dim=self.concat_dim)
        else:
            opener = xr.open_dataset

        if self._can_be_local:
            url = fsspec.open_local(self.urlpath, **self.storage_options)
        else:
            # https://github.com/intake/filesystem_spec/issues/476#issuecomment-732372918
            url = fsspec.open(self.urlpath, **self.storage_options).open()

        self._ds = opener(url, chunks=self.chunks, **kwargs)

    def _add_path_to_ds(self, ds):
        """Adding path info to a coord for a particular file

        The source path is read from the ``encoding['source']`` entry of the
        first variable in ``ds`` and matched against ``self.pattern``; the
        resulting fields are assigned as coordinates on the returned dataset.
        """
        first_var = next(iter(ds))
        new_coords = reverse_format(self.pattern, ds[first_var].encoding['source'])
        return ds.assign_coords(**new_coords)