Optimize IO scheduling for conflict resolution #3057

Open
wjones127 opened this issue Oct 28, 2024 · 0 comments · May be fixed by #3117
wjones127 commented Oct 28, 2024

When we commit a transaction, we load each of the transaction files from concurrent commits to check for conflicts. However, this is currently inefficient in two ways:

  1. We do this completely serially, reading and processing one transaction file at a time.
    • A buffered read would be appropriate here, so that several reads are in flight at once.
    • We might also optimize how we find the transaction file path. Right now, it requires reading a whole manifest file. It would be nice if we could get that path in fewer IOPs.
  2. We don't seem to cache the transaction files, so each commit may have to re-open the same transaction files.
    • A small TTL cache would be appropriate here.
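
To make the first point concrete, here is a minimal sketch of a bounded-concurrency ("buffered") read, written in Python with asyncio to match the repro rather than in the Rust of the actual commit path. The helper `read_all_bounded`, the stand-in `fake_read`, and the `txn-N` keys are all hypothetical names for illustration, not Lance APIs. The sketch only shows the shape of the idea: keep a fixed number of reads in flight and return results in input order, since the conflict check relies on position to recover the version.

```python
import asyncio


async def read_all_bounded(keys, read_one, max_in_flight=8):
    """Read every key with at most `max_in_flight` reads running at once.

    Results are returned in the same order as `keys`.
    """
    gate = asyncio.Semaphore(max_in_flight)

    async def guarded(key):
        # Wait for a free slot before starting the read.
        async with gate:
            return await read_one(key)

    # gather() preserves input order regardless of completion order.
    return await asyncio.gather(*(guarded(k) for k in keys))


# Hypothetical stand-in for an object-store read: 10 ms of simulated latency.
async def fake_read(key):
    await asyncio.sleep(0.01)
    return f"contents of {key}"


keys = [f"txn-{v}" for v in range(1, 21)]
results = asyncio.run(read_all_bounded(keys, fake_read, max_in_flight=5))
print(results[0], "...", results[-1])
```

With 20 keys and 5 reads in flight, the simulated reads finish in roughly 4 round trips instead of 20. Whether the same ratio holds against a real object store is something the trace from the repro would have to confirm.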

The current conflict-checking logic is:

loop {
    version += 1;
    match dataset.checkout_version(version).await {
        Ok(next_dataset) => {
            let other_txn = if let Some(txn_file) = &next_dataset.manifest.transaction_file {
                Some(read_transaction_file(object_store, &next_dataset.base, txn_file).await?)
            } else {
                None
            };
            other_transactions.push(other_txn);
            dataset = next_dataset;
        }
        Err(crate::Error::NotFound { .. }) | Err(crate::Error::DatasetNotFound { .. }) => {
            break;
        }
        Err(e) => {
            return Err(e);
        }
    }
}
let mut target_version = version;
if is_detached_version(target_version) {
    return Err(Error::Internal {
        message: "more than 2^65 versions have been created and so regular version numbers are appearing as 'detached' versions.".into(),
        location: location!(),
    });
}
// If any of them conflict with the transaction, return an error
for (version_offset, other_transaction) in other_transactions.iter().enumerate() {
    let other_version = transaction.read_version + version_offset as u64 + 1;
    check_transaction(transaction, other_version, other_transaction)?;
}
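
The loop above calls `read_transaction_file` once per intervening version on every commit, so repeated commits re-read the same files. A small TTL cache in front of that call could look roughly like the following Python sketch. `TtlCache`, `get_or_load`, and `load_txn` are hypothetical names, not part of Lance, and the sketch assumes a transaction file never changes once its version is committed, so a cached copy stays valid until it expires.

```python
import time


class TtlCache:
    """Tiny time-based cache: entries expire `ttl_seconds` after being loaded."""

    def __init__(self, ttl_seconds, max_entries=128, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._max = max_entries
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._entries = {}   # key -> (expires_at, value)

    def get_or_load(self, key, load):
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]
        value = load(key)
        # Drop expired entries, then evict the oldest one if still full.
        self._entries = {k: v for k, v in self._entries.items() if v[0] > now}
        if len(self._entries) >= self._max:
            self._entries.pop(next(iter(self._entries)))
        self._entries[key] = (now + self._ttl, value)
        return value


# Hypothetical loader standing in for the real transaction file read.
loads = []

def load_txn(version):
    loads.append(version)
    return {"version": version}


fake_now = [0.0]
cache = TtlCache(ttl_seconds=30.0, clock=lambda: fake_now[0])
cache.get_or_load(7, load_txn)  # miss: loads version 7
cache.get_or_load(7, load_txn)  # hit: served from the cache
fake_now[0] = 31.0
cache.get_or_load(7, load_txn)  # expired: loads version 7 again
print(loads)  # [7, 7]
```

A real key would probably need to include the dataset location as well as the version, and the right TTL and size limits would depend on how long concurrent writers typically lag behind the latest version.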

The slowdown is easy to see with the following repro:

import lance
import pyarrow as pa
import numpy as np

from lance.tracing import trace_to_chrome
trace_to_chrome("trace.json", level="debug")

nrows = 10 * 1024
ntransactions = 100

data = pa.table({
    'id': range(nrows),
    'vectors': pa.FixedShapeTensorArray.from_numpy_ndarray(np.random.rand(nrows, 128))
})
data_size = data.get_total_buffer_size()
print(f"Data size: {data_size:,} bytes")

from concurrent.futures import ThreadPoolExecutor, as_completed

current_version = lance.dataset("test").version

with ThreadPoolExecutor(10) as executor:
    # Concurrently write fragments
    futures = [
        executor.submit(lance.fragment.write_fragments, data, "test")
        for _ in range(ntransactions)
    ]
    # Serially commit transactions
    for future in as_completed(futures):
        fragments = future.result()
        operation = lance.LanceOperation.Append(fragments)
        # read_version turns out to be critical for performance
        lance.LanceDataset.commit("test", operation, read_version=current_version)
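
To illustrate why `read_version` matters so much in this repro, here is a rough count of transaction file reads. It is a back-of-the-envelope model based only on the loop shown earlier, assuming one transaction file is read per version between `read_version` and the latest version, and that no commit is retried. The function `transaction_reads` is a hypothetical helper, not a Lance API.

```python
def transaction_reads(n_commits, refresh_read_version):
    """Count transaction file reads across `n_commits` serial commits."""
    total = 0
    latest = 0        # versions committed so far, relative to the starting version
    read_version = 0  # version the committer claims to have read
    for _ in range(n_commits):
        # One transaction file is read per version newer than read_version.
        total += latest - read_version
        latest += 1
        if refresh_read_version:
            read_version = latest
    return total


fixed = transaction_reads(100, refresh_read_version=False)
refreshed = transaction_reads(100, refresh_read_version=True)
print(fixed)      # 4950
print(refreshed)  # 0
```

With a fixed `read_version`, as in the repro, the k-th commit re-reads the k - 1 transaction files written before it, so the total grows quadratically: n(n - 1)/2, or 4950 reads for 100 commits. This model ignores the manifest reads done by `checkout_version`, which follow the same pattern.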