Skip to content

Commit

Permalink
Fit cached FSs better with async backends (#1429)
Browse files Browse the repository at this point in the history
* Fit cached FSs better with async backends

* Add test
  • Loading branch information
martindurant authored Nov 15, 2023
1 parent 2942485 commit 1637a97
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 1 deletion.
12 changes: 12 additions & 0 deletions fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ def __getattribute__(self, item):
"open",
"cat",
"cat_file",
"cat_ranges",
"get",
"read_block",
"tail",
Expand Down Expand Up @@ -715,6 +716,17 @@ def save_cache(self):
def load_cache(self):
    """No-op: performs no loading and returns ``None``."""
    return None

def cat_ranges(
    self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
):
    """Pre-fetch paths that fail the cache check, then defer to the parent.

    Each entry of ``paths`` is looked up with ``self._check_file``. Entries
    whose lookup result is exactly ``False`` are handed to ``self.fs.get``
    in a single call, after which the parent class's ``cat_ranges`` is
    invoked with the original, unmodified arguments.

    Parameters
    ----------
    paths, starts, ends, max_gap, on_error, **kwargs
        Forwarded unchanged to ``super().cat_ranges``.

    Returns
    -------
    Whatever ``super().cat_ranges`` returns.
    """
    # presumably ``_check_file`` yields ``False`` for a path that is not yet
    # cached -- confirm against its definition (not visible here).
    lookups = [self._check_file(path) for path in paths]

    to_fetch = []
    destinations = []
    for hit, remote in zip(lookups, paths):
        if hit is False:
            to_fetch.append(remote)
            # NOTE(review): ``hit`` is always ``False`` on this branch, so
            # ``destinations`` ends up as a list of ``False`` values rather
            # than local file names. Verify this is what ``self.fs.get``
            # should receive as its second argument.
            destinations.append(hit)

    # Called even when nothing needs fetching (both lists empty).
    self.fs.get(to_fetch, destinations)

    return super().cat_ranges(
        paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
    )

def _open(self, path, mode="rb", **kwargs):
path = self._strip_protocol(path)

Expand Down
2 changes: 1 addition & 1 deletion fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ def __init__(
**(ref_storage_args or target_options or {}), protocol=target_protocol
)
ref_fs, fo2 = fsspec.core.url_to_fs(fo, **dic)
if ref_fs.isfile(fo):
if ref_fs.isfile(fo2):
# text JSON
with fsspec.open(fo, "rb", **dic) as f:
logger.info("Read reference from URL %s", fo)
Expand Down
27 changes: 27 additions & 0 deletions fsspec/implementations/tests/test_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,3 +621,30 @@ def test_mapping_getitems(m):
fs = fsspec.filesystem("reference", fo=refs, fs=h)
mapping = fs.get_mapper("")
assert mapping.getitems(["b", "a"]) == {"a": b"A", "b": b"B"}


def test_cached(m, tmpdir):
    """Reference JSON opened via ``simplecache`` is read from the cached copy.

    The reference file is written to the ``m`` filesystem, opened once through
    ``simplecache::memory://ref.json`` with ``tmpdir`` as cache storage, and
    then emptied at the source. A second, non-instance-cached filesystem must
    still resolve key ``"a"``.
    """

    def open_reference_fs(**extra):
        # Build the options dict afresh on every call so the two
        # constructions stay independent of each other.
        return fsspec.filesystem(
            "reference",
            fo="simplecache::memory://ref.json",
            fs=m,
            target_options={"cache_storage": str(tmpdir), "same_names": True},
            **extra,
        )

    # Data files first, then the reference set that points at them.
    m.pipe({"a": b"A", "b": b"B"})
    m.pipe("ref.json", b"""{"a": ["a"], "b": ["b"]}""")

    ref_fs = open_reference_fs()
    assert ref_fs.cat("a") == b"A"

    # With ``same_names`` the cached copy keeps the original file name.
    cached_copy = f"{tmpdir}/ref.json"
    assert os.path.exists(cached_copy)

    # Empty the source file: a successful read below can only have come from
    # the cached version.
    m.pipe("ref.json", b"")
    ref_fs = open_reference_fs(skip_instance_cache=True)
    assert ref_fs.cat("a") == b"A"

0 comments on commit 1637a97

Please sign in to comment.