Skip to content

Commit

Permalink
refactor clear cache, more explicit
Browse files Browse the repository at this point in the history
  • Loading branch information
bertiqwerty committed Dec 27, 2024
1 parent b70b43b commit ed4bb6c
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 177 deletions.
2 changes: 1 addition & 1 deletion rvimage/src/rvlib/cache/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ pub trait Cache<A> {
&mut self,
selected_file_idx: usize,
abs_file_paths: &[&str],
reload: bool,
) -> AsyncResultImage;
fn ls(&self, abs_folder_path: &str) -> RvResult<Vec<String>>;
fn new(args: A) -> RvResult<Self>
where
Self: Sized;
fn size_in_mb(&self) -> f64;
fn clear(&mut self) -> RvResult<()>;
}
186 changes: 121 additions & 65 deletions rvimage/src/rvlib/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,92 @@ use std::{collections::HashMap, fmt::Debug, fs, marker::PhantomData, path::Path}
use crate::{
cache::core::Cache,
file_util, image_util,
result::trace_ok_err,
threadpool::ThreadPoolQueued,
types::{AsyncResultImage, ImageInfoPair, ResultImage},
types::{AsyncResultImage, ImageInfoPair},
};
use rvimage_domain::{rverr, to_rv, RvError, RvResult};
use rvimage_domain::{to_rv, RvError, RvResult};

use serde::{Deserialize, Serialize};

use super::ReadImageToCache;

fn copy<F>(path_or_url: &str, reader: F, target: &str) -> RvResult<()>
where
F: Fn(&str) -> ResultImage,
{
let im = reader(path_or_url)?;
im.save(target)
.map_err(|e| rverr!("could not save image to {:?}. {}", target, e.to_string()))?;
Ok(())
}
mod detail {
use std::{
collections::HashMap,
fs,
hash::{DefaultHasher, Hash, Hasher},
path::{Path, PathBuf},
str::FromStr,
};

fn preload<'a, I, RTC, A>(
files: I,
tp: &mut ThreadPoolQueued<RvResult<String>>,
reader: &RTC,
tmpdir: &str,
) -> RvResult<HashMap<String, ThreadResult>>
where
I: Iterator<Item = (usize, &'a str)>,
RTC: ReadImageToCache<A> + Clone + Send + 'static,
{
let delay_ms = 10;
fs::create_dir_all(Path::new(tmpdir)).map_err(to_rv)?;
files
.map(|(prio, file)| {
let dst_file = file_util::filename_in_tmpdir(file, tmpdir)?;
let file_for_thread = file.replace('\\', "/");
let reader_for_thread = reader.clone();
let job = Box::new(move || {
match copy(&file_for_thread, |p| reader_for_thread.read(p), &dst_file) {
Ok(_) => Ok(dst_file),
Err(e) => Err(e),
}
});
Ok((
file.to_string(),
ThreadResult::Running(tp.apply(job, prio, delay_ms)?),
))
})
.collect::<RvResult<HashMap<_, _>>>()
}
use rvimage_domain::{rverr, to_rv, RvResult};

use crate::{
cache::ReadImageToCache, file_util, threadpool::ThreadPoolQueued, types::ResultImage,
};

use super::ThreadResult;

pub(super) fn copy<F>(path_or_url: &str, reader: F, target: &str) -> RvResult<()>
where
F: Fn(&str) -> ResultImage,
{
let im = reader(path_or_url)?;
im.save(target)
.map_err(|e| rverr!("could not save image to {:?}. {}", target, e.to_string()))?;
Ok(())
}

pub(super) fn calculate_hash<T: Hash>(t: &T) -> u64 {
let mut s = DefaultHasher::new();
t.hash(&mut s);
s.finish()
}
pub(super) fn filename_in_tmpdir(path: &str, tmpdir: &str) -> RvResult<String> {
let path_hash = calculate_hash(&path);
let path = PathBuf::from_str(path).unwrap();
let fname = format!(
"{path_hash}_{}",
file_util::osstr_to_str(path.file_name()).map_err(to_rv)?
);
Path::new(tmpdir)
.join(&fname)
.to_str()
.map(|s| s.replace('\\', "/"))
.ok_or_else(|| rverr!("filename_in_tmpdir could not transform {:?} to &str", fname))
}
pub(super) fn preload<'a, I, RTC, A>(
files: I,
tp: &mut ThreadPoolQueued<RvResult<String>>,
reader: &RTC,
tmpdir: &str,
) -> RvResult<HashMap<String, ThreadResult>>
where
I: Iterator<Item = (usize, &'a str)>,
RTC: ReadImageToCache<A> + Clone + Send + 'static,
{
let delay_ms = 10;
fs::create_dir_all(Path::new(tmpdir)).map_err(to_rv)?;
files
.map(|(prio, file)| {
let dst_file = filename_in_tmpdir(file, tmpdir)?;
let file_for_thread = file.replace('\\', "/");
let reader_for_thread = reader.clone();
let job = Box::new(move || {
match copy(&file_for_thread, |p| reader_for_thread.read(p), &dst_file) {
Ok(_) => Ok(dst_file),
Err(e) => Err(e),
}
});
Ok((
file.to_string(),
ThreadResult::Running(tp.apply(job, prio, delay_ms)?),
))
})
.collect::<RvResult<HashMap<_, _>>>()
}
}
#[derive(Debug)]
struct LocalImagePathInfoPair {
path: String,
Expand All @@ -70,6 +106,18 @@ pub struct FileCacheCfgArgs {
pub n_prev_images: usize,
pub n_next_images: usize,
pub n_threads: usize,
#[serde(default)]
pub clear_on_close: bool,
}
impl Default for FileCacheCfgArgs {
fn default() -> Self {
Self {
n_prev_images: 4,
n_next_images: 8,
n_threads: 2,
clear_on_close: true,
}
}
}
#[derive(Clone)]
pub struct FileCacheArgs<RA> {
Expand All @@ -80,11 +128,12 @@ pub struct FileCacheArgs<RA> {

pub struct FileCache<RTC, RA>
where
RTC: ReadImageToCache<RA>,
RTC: ReadImageToCache<RA> + Send + Clone + 'static,
{
cached_paths: HashMap<String, ThreadResult>,
n_prev_images: usize,
n_next_images: usize,
clear_on_close: bool,
tpq: ThreadPoolQueued<RvResult<String>>,
tmpdir: String,
reader: RTC,
Expand All @@ -97,22 +146,10 @@ where
fn ls(&self, folder_path: &str) -> RvResult<Vec<String>> {
self.reader.ls(folder_path)
}
fn load_from_cache(
&mut self,
selected_file_idx: usize,
files: &[&str],
reload: bool,
) -> AsyncResultImage {
fn load_from_cache(&mut self, selected_file_idx: usize, files: &[&str]) -> AsyncResultImage {
if files.is_empty() {
return Err(RvError::new("no files to read from"));
}
if reload {
self.cached_paths.clear();
let tmp_path = Path::new(&self.tmpdir);
if tmp_path.exists() {
fs::remove_dir_all(tmp_path).map_err(to_rv)?;
}
}
let start_idx = if selected_file_idx <= self.n_prev_images {
0
} else {
Expand Down Expand Up @@ -145,7 +182,7 @@ where
let files_not_in_cache = prio_file_pairs
.filter(|(_, file)| !self.cached_paths.contains_key(**file))
.map(|(i, file)| (i, *file));
let cache = preload(
let cache = detail::preload(
files_not_in_cache,
&mut self.tpq,
&self.reader,
Expand Down Expand Up @@ -195,18 +232,29 @@ where
n_prev_images,
n_next_images,
n_threads,
clear_on_close,
} = args.cfg_args;
let tp = ThreadPoolQueued::new(n_threads);
Ok(Self {
cached_paths: HashMap::new(),
n_prev_images,
n_next_images,
clear_on_close,
tpq: tp,
tmpdir: args.tmpdir,
reader: RTC::new(args.reader_args)?,
reader_args_phantom: PhantomData {},
})
}
fn clear(&mut self) -> RvResult<()> {
tracing::info!("clearing cache");
self.cached_paths.clear();
let tmp_path = Path::new(&self.tmpdir);
if tmp_path.exists() {
fs::remove_dir_all(tmp_path).map_err(to_rv)?;
}
Ok(())
}
fn size_in_mb(&self) -> f64 {
let n_bytes: u64 = self
.cached_paths
Expand All @@ -216,8 +264,18 @@ where
_ => None,
})
.sum();
let mb_denominator: f64 = 1024f64.powi(2);
n_bytes as f64 / mb_denominator
const MB_DENOMINATOR: f64 = 1024.0 * 1024.0;
n_bytes as f64 / MB_DENOMINATOR
}
}
impl<RTC, RA> Drop for FileCache<RTC, RA>
where
RTC: ReadImageToCache<RA> + Send + Clone + 'static,
{
fn drop(&mut self) {
if self.clear_on_close {
trace_ok_err(self.clear());
}
}
}

Expand Down Expand Up @@ -261,6 +319,7 @@ fn test_file_cache() -> RvResult<()> {
n_prev_images: 2,
n_next_images: 8,
n_threads: 2,
clear_on_close: true,
},
reader_args: (),
tmpdir: tmpdir_str.to_string(),
Expand All @@ -276,8 +335,7 @@ fn test_file_cache() -> RvResult<()> {
} else {
selected + cache.n_next_images
};
let reload = false;
cache.load_from_cache(selected, files, reload)?;
cache.load_from_cache(selected, files)?;
let n_millis = (max_i - min_i) * 100;
println!("waiting {} millis", n_millis);
thread::sleep(Duration::from_millis(n_millis as u64));
Expand All @@ -289,9 +347,9 @@ fn test_file_cache() -> RvResult<()> {
{
println!(
"filename in tmpdir {:?}",
Path::new(file_util::filename_in_tmpdir(file, tmpdir_str)?.as_str())
Path::new(detail::filename_in_tmpdir(file, tmpdir_str)?.as_str())
);
assert!(Path::new(file_util::filename_in_tmpdir(file, tmpdir_str)?.as_str()).exists());
assert!(Path::new(detail::filename_in_tmpdir(file, tmpdir_str)?.as_str()).exists());
}
Ok(())
};
Expand All @@ -306,9 +364,7 @@ fn test_file_cache() -> RvResult<()> {
test(&files_str, 36)?;
for i in (14..25).chain(34..45) {
let f = format!("{}.png", i);
assert!(
Path::new(file_util::filename_in_tmpdir(f.as_str(), tmpdir_str)?.as_str()).exists()
);
assert!(Path::new(detail::filename_in_tmpdir(f.as_str(), tmpdir_str)?.as_str()).exists());
}
Ok(())
}
10 changes: 4 additions & 6 deletions rvimage/src/rvlib/cache/no_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ where
reader_args_phantom: PhantomData<RA>,
}
impl<RTC: ReadImageToCache<RA>, RA> Cache<RA> for NoCache<RTC, RA> {
fn load_from_cache(
&mut self,
selected_file_idx: usize,
files: &[&str],
_reload: bool,
) -> AsyncResultImage {
fn load_from_cache(&mut self, selected_file_idx: usize, files: &[&str]) -> AsyncResultImage {
let path = &files[selected_file_idx];
self.reader.read(path).map(|im| {
Some(ImageInfoPair {
Expand All @@ -42,4 +37,7 @@ impl<RTC: ReadImageToCache<RA>, RA> Cache<RA> for NoCache<RTC, RA> {
fn size_in_mb(&self) -> f64 {
0.0
}
fn clear(&mut self) -> RvResult<()> {
Ok(())
}
}
Loading

0 comments on commit ed4bb6c

Please # to comment.