Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Stream files into the local store while capturing them #12563

Merged
merged 4 commits into from
Aug 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,11 @@ async fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {
.ok_or_else(|| format!("Tried to save file {:?} but it did not exist", path))?;
match file {
fs::Stat::File(f) => {
let digest = store::OneOffStoreFileByDigest::new(store.clone(), Arc::new(posix_fs))
.store_by_digest(f)
.await
.unwrap();
let digest =
store::OneOffStoreFileByDigest::new(store.clone(), Arc::new(posix_fs), false)
.store_by_digest(f)
.await
.unwrap();

let report = ensure_uploaded_to_remote(&store, store_has_remote, digest)
.await
Expand Down Expand Up @@ -458,7 +459,7 @@ async fn execute(top_match: &clap::ArgMatches<'_>) -> Result<(), ExitError> {

let snapshot = Snapshot::from_path_stats(
store_copy.clone(),
store::OneOffStoreFileByDigest::new(store_copy, posix_fs),
store::OneOffStoreFileByDigest::new(store_copy, posix_fs, false),
paths,
)
.await?;
Expand Down
29 changes: 3 additions & 26 deletions src/rust/engine/fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use crate::glob_matching::{
};

use std::cmp::min;
use std::io::{self, Read};
use std::io;
use std::ops::Deref;
use std::os::unix::fs::PermissionsExt;
use std::path::{Component, Path, PathBuf};
Expand Down Expand Up @@ -522,31 +522,8 @@ impl PosixFS {
self.ignore.is_ignored(stat)
}

pub async fn read_file(&self, file: &File) -> Result<FileContent, io::Error> {
let path = file.path.clone();
let path_abs = self.root.0.join(&file.path);
self
.executor
.spawn_blocking(move || {
let is_executable = path_abs.metadata()?.permissions().mode() & 0o100 == 0o100;
std::fs::File::open(&path_abs)
.and_then(|mut f| {
let mut content = Vec::new();
f.read_to_end(&mut content)?;
Ok(FileContent {
path: path,
content: Bytes::from(content),
is_executable,
})
})
.map_err(|e| {
io::Error::new(
e.kind(),
format!("Failed to read file {:?}: {}", path_abs, e),
)
})
})
.await
pub fn file_path(&self, file: &File) -> PathBuf {
self.root.0.join(&file.path)
}

pub async fn read_link(&self, link: &Link) -> Result<PathBuf, io::Error> {
Expand Down
35 changes: 7 additions & 28 deletions src/rust/engine/fs/src/posixfs_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,16 @@ async fn is_executable_true() {
}

#[tokio::test]
async fn read_file() {
async fn file_path() {
let dir = tempfile::TempDir::new().unwrap();
let path = PathBuf::from("marmosets");
let content = "cute".as_bytes().to_vec();
make_file(
&std::fs::canonicalize(dir.path()).unwrap().join(&path),
&content,
0o600,
);
let fs = new_posixfs(&dir.path());
let file_content = fs
.read_file(&File {
path: path.clone(),
is_executable: false,
})
.await
.unwrap();
assert_eq!(file_content.path, path);
assert_eq!(file_content.content, content);
}

#[tokio::test]
async fn read_file_missing() {
let dir = tempfile::TempDir::new().unwrap();
new_posixfs(&dir.path())
.read_file(&File {
path: PathBuf::from("marmosets"),
is_executable: false,
})
.await
.expect_err("Expected error");
let expected_path = std::fs::canonicalize(dir.path()).unwrap().join(&path);
let actual_path = fs.file_path(&File {
path: path.clone(),
is_executable: false,
});
assert_eq!(actual_path, expected_path);
}

#[tokio::test]
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/fs/store/benches/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub fn criterion_benchmark_snapshot_capture(c: &mut Criterion) {
(20, 10_000_000, true, 10),
(1, 200_000_000, true, 10),
] {
let (count, size, _immutable, captures) = params;
let (count, size, immutable, captures) = params;
let storedir = TempDir::new().unwrap();
let store = Store::local_only(executor.clone(), storedir.path()).unwrap();
let (tempdir, path_stats) = tempdir_containing(count, size);
Expand All @@ -114,7 +114,7 @@ pub fn criterion_benchmark_snapshot_capture(c: &mut Criterion) {
let _ = executor
.block_on(Snapshot::digest_from_path_stats(
store.clone(),
OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone()),
OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone(), immutable),
path_stats.clone(),
))
.unwrap();
Expand Down Expand Up @@ -337,7 +337,7 @@ fn snapshot(
.unwrap();
Snapshot::digest_from_path_stats(
store2.clone(),
OneOffStoreFileByDigest::new(store2, Arc::new(posix_fs)),
OneOffStoreFileByDigest::new(store2, Arc::new(posix_fs), true),
path_stats,
)
.await
Expand Down
32 changes: 30 additions & 2 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ mod snapshot_tests;
pub use crate::snapshot_ops::{SnapshotOps, SnapshotOpsError, StoreWrapper, SubsetParams};

use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::fs::OpenOptions;
use std::io::Write;
use std::io::{self, Read, Write};
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
Expand Down Expand Up @@ -359,7 +360,10 @@ impl Store {
}

///
/// Store a file locally.
/// A convenience method for storing a file.
///
/// NB: This method should not be used for large blobs: prefer to stream them from their source
/// using `store_file`.
///
pub async fn store_file_bytes(
&self,
Expand All @@ -372,6 +376,30 @@ impl Store {
.await
}

///
/// Store a file locally by streaming its contents.
///
pub async fn store_file<F, R>(
&self,
initial_lease: bool,
data_is_immutable: bool,
data_provider: F,
) -> Result<Digest, String>
where
R: Read + Debug,
F: Fn() -> Result<R, io::Error> + Send + 'static,
{
self
.local
.store(
EntryType::File,
initial_lease,
data_is_immutable,
data_provider,
)
.await
}

/// Store a digest under a given file path, returning a Snapshot
pub async fn snapshot_of_one_file(
&self,
Expand Down
33 changes: 24 additions & 9 deletions src/rust/engine/fs/store/src/local.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use super::{EntryType, ShrinkBehavior};

use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::io::{self, Read};
use std::path::Path;
use std::sync::Arc;
use std::time::{self, Duration};

use bytes::Bytes;
use bytes::{Buf, Bytes};
use futures::future;
use hashing::{Digest, Fingerprint, EMPTY_DIGEST};
use lmdb::Error::NotFound;
Expand Down Expand Up @@ -250,18 +252,31 @@ impl ByteStore {
bytes: Bytes,
initial_lease: bool,
) -> Result<Digest, String> {
self
.store(entry_type, initial_lease, true, move || {
Ok(bytes.clone().reader())
})
.await
}

pub async fn store<F, R>(
&self,
entry_type: EntryType,
initial_lease: bool,
data_is_immutable: bool,
data_provider: F,
) -> Result<Digest, String>
where
R: Read + Debug,
F: Fn() -> Result<R, io::Error> + Send + 'static,
{
let dbs = match entry_type {
EntryType::Directory => self.inner.directory_dbs.clone(),
EntryType::File => self.inner.file_dbs.clone(),
};
let bytes2 = bytes.clone();
let digest = self
.inner
.executor
.spawn_blocking(move || Digest::of_bytes(&bytes))
.await;
dbs?.store_bytes(digest.hash, bytes2, initial_lease).await?;
Ok(digest)
dbs?
.store(initial_lease, data_is_immutable, data_provider)
.await
}

///
Expand Down
40 changes: 14 additions & 26 deletions src/rust/engine/fs/store/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright 2017 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::collections::HashMap;
use std::ffi::OsString;
use std::fmt;
use std::iter::Iterator;
Expand Down Expand Up @@ -270,7 +269,7 @@ impl Snapshot {
.map_err(|err| format!("Error expanding globs: {}", err))?;
Snapshot::from_path_stats(
store.clone(),
OneOffStoreFileByDigest::new(store, posix_fs),
OneOffStoreFileByDigest::new(store, posix_fs, true),
path_stats,
)
.await
Expand Down Expand Up @@ -355,48 +354,37 @@ pub trait StoreFileByDigest<Error> {
}

///
/// A StoreFileByDigest which reads with a PosixFS and writes to a Store, with no caching.
/// A StoreFileByDigest which reads immutable files with a PosixFS and writes to a Store, with no
/// caching.
///
#[derive(Clone)]
pub struct OneOffStoreFileByDigest {
store: Store,
posix_fs: Arc<PosixFS>,
immutable: bool,
}

impl OneOffStoreFileByDigest {
pub fn new(store: Store, posix_fs: Arc<PosixFS>) -> OneOffStoreFileByDigest {
OneOffStoreFileByDigest { store, posix_fs }
pub fn new(store: Store, posix_fs: Arc<PosixFS>, immutable: bool) -> OneOffStoreFileByDigest {
OneOffStoreFileByDigest {
store,
posix_fs,
immutable,
}
}
}

impl StoreFileByDigest<String> for OneOffStoreFileByDigest {
fn store_by_digest(&self, file: File) -> future::BoxFuture<'static, Result<Digest, String>> {
let store = self.store.clone();
let posix_fs = self.posix_fs.clone();
let immutable = self.immutable;
let res = async move {
let content = posix_fs
.read_file(&file)
let path = posix_fs.file_path(&file);
store
.store_file(true, immutable, move || std::fs::File::open(&path))
.await
.map_err(move |err| format!("Error reading file {:?}: {:?}", file, err))?;
store.store_file_bytes(content.content, true).await
};
res.boxed()
}
}

#[derive(Clone)]
pub struct StoreManyFileDigests {
pub hash: HashMap<PathBuf, Digest>,
}

impl StoreFileByDigest<String> for StoreManyFileDigests {
fn store_by_digest(&self, file: File) -> future::BoxFuture<'static, Result<Digest, String>> {
future::ready(self.hash.get(&file.path).copied().ok_or_else(|| {
format!(
"Could not find file {} when storing file by digest",
file.path.display()
)
}))
.boxed()
}
}
2 changes: 1 addition & 1 deletion src/rust/engine/fs/store/src/snapshot_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn setup() -> (
let dir = tempfile::Builder::new().prefix("root").tempdir().unwrap();
let ignorer = GitignoreStyleExcludes::create(vec![]).unwrap();
let posix_fs = Arc::new(PosixFS::new(dir.path(), ignorer, executor).unwrap());
let file_saver = OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone());
let file_saver = OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone(), true);
(store, dir, posix_fs, file_saver)
}

Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl CommandRunner {
.await?;
Snapshot::from_path_stats(
store.clone(),
OneOffStoreFileByDigest::new(store, posix_fs),
OneOffStoreFileByDigest::new(store, posix_fs, true),
path_stats,
)
.await
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/sharded_lmdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ task_executor = { path = "../task_executor" }
tempfile = "3"

[dev-dependencies]
parking_lot = "0.11"
tokio = { version = "1.4", features = ["macros"] }
Loading