Skip to content

Commit

Permalink
bugfix: ReferenceFileSystem.cat() scrambles and omits reference mappings (#1436)
Browse files Browse the repository at this point in the history
  • Loading branch information
arongergely authored Nov 22, 2023
1 parent 14a7788 commit f7b454e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
16 changes: 9 additions & 7 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,28 +798,30 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs):
out = {}
for proto, paths in proto_dict.items():
fs = self.fss[proto]
urls, starts, ends = [], [], []
urls, starts, ends, valid_paths = [], [], [], []
for p in paths:
# find references or label not-found. Early exit if any not
# found and on_error is "raise"
try:
u, s, e = self._cat_common(p)
urls.append(u)
starts.append(s)
ends.append(e)
except FileNotFoundError as err:
if on_error == "raise":
raise
if on_error != "omit":
out[p] = err
else:
urls.append(u)
starts.append(s)
ends.append(e)
valid_paths.append(p)

# process references into form for merging
urls2 = []
starts2 = []
ends2 = []
paths2 = []
whole_files = set()
for u, s, e, p in zip(urls, starts, ends, paths):
for u, s, e, p in zip(urls, starts, ends, valid_paths):
if isinstance(u, bytes):
# data
out[p] = u
Expand All @@ -831,7 +833,7 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs):
starts2.append(s)
ends2.append(e)
paths2.append(p)
for u, s, e, p in zip(urls, starts, ends, paths):
for u, s, e, p in zip(urls, starts, ends, valid_paths):
# second run to account for files that are to be loaded whole
if s is not None and u not in whole_files:
urls2.append(u)
Expand All @@ -851,7 +853,7 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs):
bytes_out = fs.cat_ranges(new_paths, new_starts, new_ends)

# unbundle from merged bytes - simple approach
for u, s, e, p in zip(urls, starts, ends, paths):
for u, s, e, p in zip(urls, starts, ends, valid_paths):
if p in out:
continue # was bytes, already handled
for np, ns, ne, b in zip(new_paths, new_starts, new_ends, bytes_out):
Expand Down
27 changes: 22 additions & 5 deletions fsspec/implementations/tests/test_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ def test_merging(m):
def test_cat_file_ranges(m):
other = b"other test data"
m.pipe("/b", other)

fs = fsspec.filesystem(
"reference",
fo={
Expand All @@ -467,15 +468,26 @@ def test_cat_file_ranges(m):
assert fs.cat_file("d", 1, -3) == other[4:10][1:-3]


def test_cat_missing(m):
@pytest.mark.parametrize(
"fo",
[
{
"c": ["memory://b"],
"d": ["memory://unknown", 4, 6],
},
{
"c": ["memory://b"],
"d": ["//unknown", 4, 6],
},
],
ids=["memory protocol", "mixed protocols: memory and unspecified"],
)
def test_cat_missing(m, fo):
other = b"other test data"
m.pipe("/b", other)
fs = fsspec.filesystem(
"reference",
fo={
"c": ["memory://b"],
"d": ["memory://unknown", 4, 6],
},
fo=fo,
)
with pytest.raises(FileNotFoundError):
fs.cat("notafile")
Expand Down Expand Up @@ -508,6 +520,11 @@ def test_cat_missing(m):
out = mapper.getitems(["c", "d"], on_error="return")
assert isinstance(out["d"], ReferenceNotReachable)

out = fs.cat(["notone", "c", "d"], on_error="return")
assert isinstance(out["notone"], FileNotFoundError)
assert out["c"] == other
assert isinstance(out["d"], ReferenceNotReachable)

out = mapper.getitems(["c", "d"], on_error="omit")
assert list(out) == ["c"]

Expand Down

0 comments on commit f7b454e

Please sign in to comment.