Skip to content

Commit

Permalink
feat: accept file size to skip head request
Browse files Browse the repository at this point in the history
  • Loading branch information
eeroel committed Sep 18, 2023
1 parent 4638fcf commit f4df3a8
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 11 deletions.
2 changes: 1 addition & 1 deletion python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ class DeltaFileSystemHandler:
"""
def normalize_path(self, path: str) -> str:
"""Normalize filesystem path."""
def open_input_file(self, path: str) -> ObjectInputFile:
def open_input_file(self, path: str, size: int | None = None) -> ObjectInputFile:
    """Open an input file for random access reading.

    If ``size`` is given it is used as the file's content length,
    allowing the handler to skip a HEAD request to the object store.
    """
def open_output_stream(
self, path: str, metadata: dict[str, str] | None = None
Expand Down
32 changes: 28 additions & 4 deletions python/deltalake/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,47 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler.
"""

def open_input_file(self, path: str) -> pa.PythonFile:
known_sizes: Dict[str, int] = {}

def __new__(  # type:ignore
    cls,
    table_uri: str,
    storage_options: Optional[Dict[str, str]] = None,
    known_sizes: Optional[Dict[str, int]] = None,
):
    # known_sizes is accepted here only so the signature mirrors __init__;
    # the native base class __new__ takes just the URI and storage options.
    return super().__new__(
        cls, table_uri=table_uri, options=storage_options  # type:ignore
    )

def __init__(
    self,
    table_uri: str,
    storage_options: Optional[Dict[str, str]] = None,
    known_sizes: Optional[Dict[str, int]] = None,
):
    """
    Initialize the storage handler.

    :param table_uri: URI of the Delta table root.
    :param storage_options: options passed through to the underlying store
        (consumed by the native base class in ``__new__``).
    :param known_sizes: optional mapping of file path to file size in bytes,
        used to skip HEAD requests when opening files.
    """
    # Always bind an instance-level dict. Conditionally assigning only when
    # known_sizes is truthy would leave instances sharing (and potentially
    # mutating) the mutable class-level default dict.
    self.known_sizes = known_sizes if known_sizes else {}

def open_input_file(self, path: str, size: Optional[int] = None) -> pa.PythonFile:
    """
    Open an input file for random access reading.

    :param path: The path of the file to open.
    :param size: Optional known file size in bytes; when available (either
        passed here or recorded in ``known_sizes``) the handler can skip a
        HEAD request to the object store.
    :return: NativeFile
    """
    # Prefer an explicitly supplied size; only fall back to the table's
    # recorded sizes when the caller did not provide one.
    if size is None:
        size = self.known_sizes.get(path)
    return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path, size))

def open_input_stream(self, path: str) -> pa.PythonFile:
def open_input_stream(self, path: str, size: Optional[int] = None) -> pa.PythonFile:
    """
    Open an input stream for sequential reading.

    :param path: The path of the file to open.
    :param size: Optional known file size in bytes; when available (either
        passed here or recorded in ``known_sizes``) the handler can skip a
        HEAD request to the object store.
    :return: NativeFile
    """
    # Prefer an explicitly supplied size; only fall back to the table's
    # recorded sizes when the caller did not provide one.
    if size is None:
        size = self.known_sizes.get(path)
    return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path, size))

def open_output_stream(
self, path: str, metadata: Optional[Dict[str, str]] = None
Expand Down
7 changes: 5 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -522,10 +522,13 @@ def to_pyarrow_dataset(
)

if not filesystem:
file_sizes = self.get_add_actions().to_pydict()
file_sizes = {
x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"])
}
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(
self._table.table_uri(),
self._storage_options,
self._table.table_uri(), self._storage_options, file_sizes
)
)

Expand Down
16 changes: 12 additions & 4 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,15 @@ impl DeltaFileSystemHandler {
Ok(())
}

fn open_input_file(&self, path: String) -> PyResult<ObjectInputFile> {
fn open_input_file(&self, path: String, size: Option<i64>) -> PyResult<ObjectInputFile> {
let path = Self::parse_path(&path);
let file = self
.rt
.block_on(ObjectInputFile::try_new(
Arc::clone(&self.rt),
self.inner.clone(),
path,
size,
))
.map_err(PythonError::from)?;
Ok(file)
Expand Down Expand Up @@ -296,11 +297,18 @@ impl ObjectInputFile {
rt: Arc<Runtime>,
store: Arc<DynObjectStore>,
path: Path,
size: Option<i64>,
) -> Result<Self, ObjectStoreError> {
// Issue a HEAD Object to get the content-length and ensure any
// If file size is not given, issue a HEAD Object to get the content-length and ensure any
// errors (e.g. file not found) don't wait until the first read() call.
let meta = store.head(&path).await?;
let content_length = meta.size as i64;
let content_length = match size {
Some(s) => s,
None => {
let meta = store.head(&path).await?;
meta.size as i64
}
};

// TODO make sure content length is valid
// https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/filesystem/s3fs.cc#L1083
Ok(Self {
Expand Down
23 changes: 23 additions & 0 deletions python/tests/test_file_system_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,29 @@ def test_open_input_file(file_systems, table_data):
assert file.read_at(10, 0) == arrow_file.read_at(10, 0)


def test_open_input_file_with_size(tmp_path, table_data):
    """A size injected via ``known_sizes`` is reported instead of the real one."""
    file_path = "table.parquet"
    input_size = 12345  # incorrect file size for testing purposes

    root = str(tmp_path.absolute())
    local_fs = fs.SubTreeFileSystem(root, fs.LocalFileSystem())

    # The injected file size should be stored and reported back verbatim.
    handler_with_size = DeltaStorageHandler(root, known_sizes={file_path: input_size})
    wrapped_fs = fs.PyFileSystem(handler_with_size)
    pq.write_table(table_data, file_path, filesystem=local_fs)
    opened = wrapped_fs.open_input_file(file_path)
    assert opened.size() == input_size

    # Without an injected size, the true (different) size is reported.
    handler_plain = DeltaStorageHandler(root)
    wrapped_fs = fs.PyFileSystem(handler_plain)
    pq.write_table(table_data, file_path, filesystem=local_fs)
    opened = wrapped_fs.open_input_file(file_path)
    assert opened.size() != input_size


def test_read_table(file_systems, table_data):
store, arrow_fs = file_systems
file_path = "table.parquet"
Expand Down
19 changes: 19 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import datetime
from pathlib import Path
from threading import Barrier, Thread
from types import SimpleNamespace
from unittest.mock import Mock

from packaging import version
Expand Down Expand Up @@ -104,6 +105,24 @@ def test_read_simple_table_update_incremental():
assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"id": [5, 7, 9]}


def test_read_simple_table_file_sizes_failure():
    """Corrupting the add-action file sizes must make reading fail.

    Every data file's size is replaced with -1, so the reader's attempt to
    seek within the (fake) file bounds raises an OSError.
    """
    table_path = "../rust/tests/data/simple_table"
    dt = DeltaTable(table_path)
    add_actions = dt.get_add_actions().to_pydict()

    # Replace every size with -1, keeping exactly one entry per file.
    # Iterating over the key string ("size_bytes") instead of the value
    # list would yield a list of the wrong length.
    add_actions_modified = {
        key: [-1 for _ in values] if key == "size_bytes" else values
        for key, values in add_actions.items()
    }
    dt.get_add_actions = lambda: SimpleNamespace(
        to_pydict=lambda: add_actions_modified
    )  # type:ignore

    with pytest.raises(OSError, match="Cannot seek past end of file."):
        dt.to_pyarrow_dataset().to_table().to_pydict()


def test_read_partitioned_table_to_dict():
table_path = "../rust/tests/data/delta-0.8.0-partitioned"
dt = DeltaTable(table_path)
Expand Down

0 comments on commit f4df3a8

Please sign in to comment.