Skip to content

Commit

Permalink
feat: support stable row ids in Dataset::take_rows() (#2447)
Browse files Browse the repository at this point in the history
Part of #2307

* `Dataset::take_rows()` now takes `row_ids`, which may be stable row
ids. These are translated into row addresses internally, and then the
existing logic is used.
* `Fragment::take_rows()` now optionally returns the row address column,
if requested, instead of the row id.
  • Loading branch information
wjones127 authored Jun 21, 2024
1 parent 9703e50 commit 0608cfd
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 152 deletions.
40 changes: 21 additions & 19 deletions rust/lance-table/src/utils/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,30 +196,32 @@ pub fn apply_row_id_and_deletes(
batch.num_columns() > 0 || config.with_row_id || config.with_row_addr || has_deletions
);

let should_fetch_row_id = config.with_row_id || has_deletions;
// If row id sequence is None, then row id IS row address.
let should_fetch_row_addr = config.with_row_addr
|| (config.with_row_id && config.row_id_sequence.is_none())
|| has_deletions;

let num_rows = batch.num_rows() as u32;

let row_addrs =
if config.with_row_addr || (should_fetch_row_id && config.row_id_sequence.is_none()) {
let ids_in_batch = config
.params
.slice(batch_offset as usize, num_rows as usize)
.unwrap()
.to_offsets()
.unwrap();
let row_addrs: UInt64Array = ids_in_batch
.values()
.iter()
.map(|row_id| u64::from(RowAddress::new_from_parts(fragment_id, *row_id)))
.collect();
let row_addrs = if should_fetch_row_addr {
let ids_in_batch = config
.params
.slice(batch_offset as usize, num_rows as usize)
.unwrap()
.to_offsets()
.unwrap();
let row_addrs: UInt64Array = ids_in_batch
.values()
.iter()
.map(|row_id| u64::from(RowAddress::new_from_parts(fragment_id, *row_id)))
.collect();

Some(Arc::new(row_addrs))
} else {
None
};
Some(Arc::new(row_addrs))
} else {
None
};

let row_ids = if should_fetch_row_id {
let row_ids = if config.with_row_id {
if let Some(row_id_sequence) = &config.row_id_sequence {
let row_ids = row_id_sequence
.slice(batch_offset as usize, num_rows as usize)
Expand Down
27 changes: 14 additions & 13 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -944,30 +944,31 @@ impl FileFragment {
}
}

/// Take rows based on internal local row ids
/// Take rows based on internal local row offsets
///
/// If the row ids are out-of-bounds, this will return an error. But if the
/// row id is marked deleted, it will be ignored. Thus, the number of rows
/// returned may be less than the number of row ids provided.
/// If the row offsets are out-of-bounds, this will return an error. But if the
/// row offset is marked deleted, it will be ignored. Thus, the number of rows
/// returned may be less than the number of row offsets provided.
///
/// To recover the original row ids from the returned RecordBatch, set the
/// `with_row_id` parameter to true. This will add a column named `_row_id`
/// To recover the original row addresses from the returned RecordBatch, set the
/// `with_row_address` parameter to true. This will add a column named `_rowaddr`
/// to the RecordBatch at the end.
pub(crate) async fn take_rows(
&self,
row_ids: &[u32],
row_offsets: &[u32],
projection: &Schema,
with_row_id: bool,
with_row_address: bool,
) -> Result<RecordBatch> {
// TODO: support taking row addresses
let reader = self.open(projection, with_row_id, false).await?;
let reader = self.open(projection, false, with_row_address).await?;

if row_ids.len() > 1 && Self::row_ids_contiguous(row_ids) {
let range = (row_ids[0] as usize)..(row_ids[row_ids.len() - 1] as usize + 1);
if row_offsets.len() > 1 && Self::row_ids_contiguous(row_offsets) {
let range =
(row_offsets[0] as usize)..(row_offsets[row_offsets.len() - 1] as usize + 1);
reader.legacy_read_range_as_batch(range).await
} else {
// FIXME, change this method to streams
reader.take_as_batch(row_ids).await
reader.take_as_batch(row_offsets).await
}
}

Expand Down Expand Up @@ -2008,7 +2009,7 @@ mod tests {
&Int32Array::from(vec![121, 125, 128])
);
assert_eq!(
batch.column_by_name(ROW_ID).unwrap().as_ref(),
batch.column_by_name(ROW_ADDR).unwrap().as_ref(),
&UInt64Array::from(vec![(3 << 32) + 1, (3 << 32) + 5, (3 << 32) + 8])
);
}
Expand Down
Loading

0 comments on commit 0608cfd

Please sign in to comment.