From 407f9498b61e45d224a61441402ed8ba4eb74628 Mon Sep 17 00:00:00 2001
From: Ivan Kalinin
Date: Fri, 22 Nov 2024 15:49:09 +0100
Subject: [PATCH] feat(storage): split cells table into data and refs

---
 storage/src/db/kv_db/mod.rs                   |   4 +-
 storage/src/db/kv_db/refcount.rs              |  87 ----
 storage/src/db/kv_db/tables.rs                |  45 +-
 storage/src/lib.rs                            |   2 +
 .../persistent_state/shard_state/writer.rs    |  19 +-
 storage/src/store/shard_state/cell_storage.rs | 486 +++++++++---------
 storage/src/store/shard_state/mod.rs          |  51 +-
 .../src/store/shard_state/store_state_raw.rs  |  12 +-
 8 files changed, 325 insertions(+), 381 deletions(-)
 delete mode 100644 storage/src/db/kv_db/refcount.rs

diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs
index 465e1e436..f9c22c024 100644
--- a/storage/src/db/kv_db/mod.rs
+++ b/storage/src/db/kv_db/mod.rs
@@ -4,7 +4,6 @@ use weedb::{
     Caches, MigrationError, Semver, Tables, VersionProvider, WeeDb, WeeDbBuilder, WeeDbRaw,
 };
 
-pub mod refcount;
 pub mod tables;
 
 pub trait WeeDbExt: Sized {
@@ -60,7 +59,8 @@ weedb::tables! {
         pub package_entries: tables::PackageEntries,
         pub block_data_entries: tables::BlockDataEntries,
         pub shard_states: tables::ShardStates,
-        pub cells: tables::Cells,
+        pub cell_data: tables::CellData,
+        pub cell_refs: tables::CellRefs,
         pub temp_cells: tables::TempCells,
         pub block_connections: tables::BlockConnections,
         pub shards_internal_messages: tables::ShardsInternalMessages,
diff --git a/storage/src/db/kv_db/refcount.rs b/storage/src/db/kv_db/refcount.rs
deleted file mode 100644
index eb42c0be7..000000000
--- a/storage/src/db/kv_db/refcount.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-use std::cmp::Ordering;
-use std::convert::TryInto;
-
-use weedb::rocksdb;
-use weedb::rocksdb::compaction_filter::Decision;
-
-pub fn merge_operator(
-    _key: &[u8],
-    existing: Option<&[u8]>,
-    operands: &rocksdb::MergeOperands,
-) -> Option<Vec<u8>> {
-    let (mut rc, mut payload) = existing.map_or((0, None), decode_value_with_rc);
-    for (delta, new_payload) in operands.into_iter().map(decode_value_with_rc) {
-        if payload.is_none() && delta > 0 {
-            payload = new_payload;
-        }
-        rc += delta;
-    }
-
-    Some(match rc.cmp(&0) {
-        Ordering::Less => rc.to_le_bytes().to_vec(),
-        Ordering::Equal => Vec::new(),
-        Ordering::Greater => {
-            let payload = payload.unwrap_or(&[]);
-            let mut result = Vec::with_capacity(RC_BYTES + payload.len());
-            result.extend_from_slice(&rc.to_le_bytes());
-            result.extend_from_slice(payload);
-            result
-        }
-    })
-}
-
-pub fn compaction_filter(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
-    if value.is_empty() {
-        metrics::counter!("tycho_compaction_removes").increment(1);
-        Decision::Remove
-    } else {
-        metrics::counter!("tycho_compaction_keeps").increment(1);
-        Decision::Keep
-    }
-}
-
-pub fn decode_value_with_rc(bytes: &[u8]) -> (RcType, Option<&[u8]>) {
-    let without_payload = match bytes.len().cmp(&RC_BYTES) {
-        Ordering::Greater => false,
-        Ordering::Equal => true,
-        Ordering::Less => return (0, None),
-    };
-
-    let rc = RcType::from_le_bytes(bytes[..RC_BYTES].try_into().unwrap());
-    if rc <= 0 || without_payload {
-        (rc, None)
-    } else {
-        (rc, Some(&bytes[RC_BYTES..]))
-    }
-}
-
-// will be used in persistent storage writer
-pub fn strip_refcount(bytes: &[u8]) -> Option<&[u8]> {
-    if bytes.len() < RC_BYTES {
-        return None;
-    }
-    if RcType::from_le_bytes(bytes[..RC_BYTES].try_into().unwrap()) > 0 {
-        Some(&bytes[RC_BYTES..])
-    } else {
-        None
-    }
-}
-
-pub fn add_positive_refount(rc: u32, data: Option<&[u8]>, target: &mut Vec<u8>) {
-    target.extend_from_slice(&RcType::from(rc).to_le_bytes());
-    if let Some(data) = data {
-        target.extend_from_slice(data);
-    }
-}
-
-pub fn encode_positive_refcount(rc: u32) -> [u8; RC_BYTES] {
-    RcType::from(rc).to_le_bytes()
-}
-
-pub fn encode_negative_refcount(rc: u32) -> [u8; RC_BYTES] {
-    (-RcType::from(rc)).to_le_bytes()
-}
-
-type RcType = i64;
-
-const RC_BYTES: usize = std::mem::size_of::<RcType>();
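The deleted module above captures the old design in miniature: a single `cells` column family stored each value as an `i64` little-endian refcount immediately followed by the cell payload, kept consistent by a RocksDB merge operator and garbage-collected by a compaction filter. The rest of the patch replaces this with two column families, `cell_data` (payload only) and `cell_refs` (a bare `u64` little-endian counter). A minimal sketch of the two value layouts, for orientation only (the decoder names are hypothetical, not from this patch):

```rust
/// Old `cells` CF value: i64 LE refcount prefix, then the serialized cell.
/// An empty value marks a deleted entry awaiting the compaction filter.
fn decode_old(value: &[u8]) -> Option<(i64, &[u8])> {
    let rc = i64::from_le_bytes(value.get(..8)?.try_into().ok()?);
    (rc > 0).then(|| (rc, &value[8..]))
}

/// New layout: `cell_data` holds the serialized cell as-is, while the
/// refcount lives in `cell_refs` as a bare u64 LE value.
fn decode_new_rc(refs_value: &[u8]) -> Option<u64> {
    Some(u64::from_le_bytes(refs_value.try_into().ok()?))
}
```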
diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs
index 2a36459fe..c341b11a5 100644
--- a/storage/src/db/kv_db/tables.rs
+++ b/storage/src/db/kv_db/tables.rs
@@ -5,8 +5,6 @@ use weedb::rocksdb::{
 };
 use weedb::{rocksdb, Caches, ColumnFamily, ColumnFamilyOptions};
 
-use super::refcount;
-
 // took from
 // https://github.com/tikv/tikv/blob/d60c7fb6f3657dc5f3c83b0e3fc6ac75636e1a48/src/config/mod.rs#L170
 // todo: need to benchmark and update if it's not optimal
@@ -198,21 +196,50 @@ impl ColumnFamilyOptions<Caches> for ShardStates {
     }
 }
 
-/// Stores cells data
+/// Stores cell data
 /// - Key: `[u8; 32]` (cell repr hash)
 /// - Value: `StorageCell`
-pub struct Cells;
+pub struct CellData;
 
-impl ColumnFamily for Cells {
-    const NAME: &'static str = "cells";
+impl ColumnFamily for CellData {
+    const NAME: &'static str = "cell_data";
 }
 
-impl ColumnFamilyOptions<Caches> for Cells {
+impl ColumnFamilyOptions<Caches> for CellData {
     fn options(opts: &mut Options, caches: &mut Caches) {
         opts.set_level_compaction_dynamic_level_bytes(true);
 
-        opts.set_merge_operator_associative("cell_merge", refcount::merge_operator);
-        opts.set_compaction_filter("cell_compaction", refcount::compaction_filter);
+        optimize_for_level_compaction(opts, ByteSize::gib(1u64));
+
+        let mut block_factory = BlockBasedOptions::default();
+        block_factory.set_block_cache(&caches.block_cache);
+        block_factory.set_data_block_index_type(DataBlockIndexType::BinaryAndHash);
+        block_factory.set_whole_key_filtering(true);
+        block_factory.set_checksum_type(rocksdb::ChecksumType::NoChecksum);
+
+        block_factory.set_bloom_filter(10.0, false);
+        block_factory.set_block_size(16 * 1024);
+        block_factory.set_format_version(5);
+
+        opts.set_block_based_table_factory(&block_factory);
+        opts.set_optimize_filters_for_hits(true);
+        // compression is configured per column family
+        opts.set_compression_type(DBCompressionType::Lz4);
+    }
+}
+
+/// Stores cell refs
+/// - Key: `[u8; 32]` (cell repr hash)
+/// - Value: `u64` (LE refcount)
+pub struct CellRefs;
+
+impl ColumnFamily for CellRefs {
+    const NAME: &'static str = "cell_refs";
+}
+
+impl ColumnFamilyOptions<Caches> for CellRefs {
+    fn options(opts: &mut rocksdb::Options, caches: &mut Caches) {
+        opts.set_level_compaction_dynamic_level_bytes(true);
         optimize_for_level_compaction(opts, ByteSize::gib(1u64));
diff --git a/storage/src/lib.rs b/storage/src/lib.rs
index 9c6d9e329..a2f5fce48 100644
--- a/storage/src/lib.rs
+++ b/storage/src/lib.rs
@@ -137,6 +137,8 @@ impl StorageBuilder {
             temp_file_storage.clone(),
             self.config.cells_cache_size.as_u64(),
         )?;
+        shard_state_storage.preload_cell_refs().await?;
+
         let persistent_state_storage = PersistentStateStorage::new(
             base_db.clone(),
             &file_db,
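Both new column families are plain point-lookup tables, so every write path in the patch reduces to `put`/`delete` operations in a `WriteBatch`; no merge operator is registered anymore. A self-contained sketch of the resulting on-disk protocol using the `rocksdb` crate directly (the patch itself goes through `weedb` wrappers; only the CF names and value encodings below come from the patch, everything else is illustrative):

```rust
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch, DB};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.create_missing_column_families(true);

    let cfs = vec![
        ColumnFamilyDescriptor::new("cell_data", Options::default()),
        ColumnFamilyDescriptor::new("cell_refs", Options::default()),
    ];
    let db = DB::open_cf_descriptors(&opts, "/tmp/cells-split-example", cfs)?;
    let data_cf = db.cf_handle("cell_data").unwrap();
    let refs_cf = db.cf_handle("cell_refs").unwrap();

    // One atomic batch touches both column families.
    let hash = [0u8; 32]; // cell repr hash
    let mut batch = WriteBatch::default();
    batch.put_cf(data_cf, hash, b"serialized cell"); // payload, written once
    batch.put_cf(refs_cf, hash, 1u64.to_le_bytes()); // absolute refcount, no merge
    db.write(batch)?;

    let rc = u64::from_le_bytes(db.get_cf(refs_cf, hash)?.unwrap().as_slice().try_into()?);
    assert_eq!(rc, 1);
    Ok(())
}
```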
diff --git a/storage/src/store/persistent_state/shard_state/writer.rs b/storage/src/store/persistent_state/shard_state/writer.rs
index 313bb4419..b1f233b24 100644
--- a/storage/src/store/persistent_state/shard_state/writer.rs
+++ b/storage/src/store/persistent_state/shard_state/writer.rs
@@ -223,8 +223,8 @@ impl<'a> ShardStateWriter<'a> {
         let mut file = self.states_dir.unnamed_file().open()?;
 
         let raw = self.db.rocksdb().as_ref();
-        let read_options = self.db.cells.read_config();
-        let cf = self.db.cells.cf();
+        let read_options = self.db.cell_data.read_config();
+        let cf = self.db.cell_data.cf();
 
         let mut references_buffer = SmallVec::<[[u8; 32]; 4]>::with_capacity(4);
 
@@ -256,18 +256,9 @@ impl<'a> ShardStateWriter<'a> {
                     .get_pinned_cf_opt(&cf, hash, read_options)?
                     .ok_or(CellWriterError::CellNotFound)?;
 
-                let value = match crate::refcount::strip_refcount(value.as_ref()) {
-                    Some(bytes) => bytes,
-                    None => {
-                        return Err(CellWriterError::CellNotFound.into());
-                    }
-                };
-                if value.is_empty() {
-                    return Err(CellWriterError::InvalidCell.into());
-                }
-
-                let (descriptor, data) = deserialize_cell(value, &mut references_buffer)
-                    .ok_or(CellWriterError::InvalidCell)?;
+                let (descriptor, data) =
+                    deserialize_cell(value.as_ref(), &mut references_buffer)
+                        .ok_or(CellWriterError::InvalidCell)?;
 
                 let mut reference_indices = SmallVec::with_capacity(references_buffer.len());
 
diff --git a/storage/src/store/shard_state/cell_storage.rs b/storage/src/store/shard_state/cell_storage.rs
index cf812549f..6d68c45c2 100644
--- a/storage/src/store/shard_state/cell_storage.rs
+++ b/storage/src/store/shard_state/cell_storage.rs
@@ -1,16 +1,16 @@
 use std::cell::UnsafeCell;
 use std::collections::hash_map;
 use std::mem::{ManuallyDrop, MaybeUninit};
-use std::sync::atomic::{AtomicI64, AtomicU8, Ordering};
+use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::{Arc, Weak};
 
 use anyhow::{Context, Result};
 use bumpalo::Bump;
 use everscale_types::cell::*;
-use parking_lot::Mutex;
 use quick_cache::sync::{Cache, DefaultLifecycle};
-use triomphe::ThinArc;
 use tycho_util::metrics::HistogramGuard;
+use tycho_util::progress_bar::ProgressBar;
+use tycho_util::sync::CancellationFlag;
 use tycho_util::{FastDashMap, FastHashMap, FastHasherState};
 use weedb::{rocksdb, BoundedCfHandle};
 
@@ -20,7 +20,6 @@ pub struct CellStorage {
     db: BaseDb,
     cells_cache: Arc<CellsCache>,
     raw_cells_cache: RawCellsCache,
-    pending: PendingOperations,
 }
 
 impl CellStorage {
@@ -32,10 +31,49 @@ impl CellStorage {
             db,
             cells_cache,
             raw_cells_cache,
-            pending: PendingOperations::default(),
         })
     }
 
+    pub fn preload_cell_refs(&self, cancelled: CancellationFlag) -> Result<usize> {
+        let mut iter = self.db.cell_refs.raw_iterator();
+        iter.seek_to_first();
+
+        let mut pg =
+            ProgressBar::builder().build(|msg| tracing::info!("preloading cell refs... {msg}"));
+
+        pg.set_total(u16::MAX as u64 + 1);
+
+        let mut total_cells = 0;
+        let mut cancelled = cancelled.debounce(10000);
+        loop {
+            let (key, value) = match iter.item() {
+                Some(item) if !cancelled.check() => item,
+                Some(_) => anyhow::bail!("preload cancelled"),
+                None => match iter.status() {
+                    Ok(()) => break,
+                    Err(e) => return Err(e.into()),
+                },
+            };
+
+            let key = HashBytes::from_slice(key);
+
+            if total_cells % 10000 == 0 {
+                // Interpret the highest two bytes as progress
+                pg.set_progress(u16::from_be_bytes([key[0], key[1]]));
+            }
+
+            self.raw_cells_cache
+                .refs_shard(&key)
+                .insert(key, u64::from_le_bytes(value.try_into().unwrap()));
+            total_cells += 1;
+
+            iter.next();
+        }
+
+        pg.complete();
+        Ok(total_cells)
+    }
+
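The progress estimate in `preload_cell_refs` works because the keys are repr hashes and therefore uniformly distributed: when iterating in key order, the big-endian value of the first two key bytes approximates the fraction of the keyspace already scanned (out of `u16::MAX + 1` buckets). A tiny standalone illustration (hypothetical helper, not from the patch):

```rust
/// Fraction of a uniformly distributed 32-byte keyspace scanned so far,
/// estimated from the two high bytes of the current key.
fn scan_progress(key: &[u8; 32]) -> f64 {
    u16::from_be_bytes([key[0], key[1]]) as f64 / (u16::MAX as f64 + 1.0)
}

fn main() {
    let mut key = [0u8; 32];
    key[0] = 0x80; // halfway through the keyspace
    assert_eq!(scan_progress(&key), 0.5);
}
```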
{msg}")); + + pg.set_total(u16::MAX as u64 + 1); + + let mut total_cells = 0; + let mut cancelled = cancelled.debounce(10000); + loop { + let (key, value) = match iter.item() { + Some(item) if !cancelled.check() => item, + Some(_) => anyhow::bail!("preload cancelled"), + None => match iter.status() { + Ok(()) => break, + Err(e) => return Err(e.into()), + }, + }; + + let key = HashBytes::from_slice(key); + + if total_cells % 10000 == 0 { + // Interpret highest two bytes as progress + pg.set_progress(u16::from_be_bytes([key[0], key[1]])); + } + + self.raw_cells_cache + .refs_shard(&key) + .insert(key, u64::from_le_bytes(value.try_into().unwrap())); + total_cells += 1; + + iter.next(); + } + + pg.complete(); + Ok(total_cells) + } + pub fn apply_temp_cell(&self, root: &HashBytes) -> Result<()> { const MAX_NEW_CELLS_BATCH_SIZE: usize = 10000; @@ -75,11 +113,11 @@ impl CellStorage { } struct Context<'a> { - cells_cf: BoundedCfHandle<'a>, + cell_data_cf: BoundedCfHandle<'a>, + cell_refs_cf: BoundedCfHandle<'a>, db: &'a BaseDb, - buffer: Vec, transaction: FastHashMap, - new_cells_batch: rocksdb::WriteBatch, + cell_data_batch: rocksdb::WriteBatch, new_cell_count: usize, raw_cache: &'a RawCellsCache, } @@ -87,11 +125,11 @@ impl CellStorage { impl<'a> Context<'a> { fn new(db: &'a BaseDb, raw_cache: &'a RawCellsCache) -> Self { Self { - cells_cf: db.cells.cf(), + cell_data_cf: db.cell_data.cf(), + cell_refs_cf: db.cell_refs.cf(), db, - buffer: Vec::with_capacity(512), transaction: Default::default(), - new_cells_batch: rocksdb::WriteBatch::default(), + cell_data_batch: rocksdb::WriteBatch::default(), new_cell_count: 0, raw_cache, } @@ -137,62 +175,47 @@ impl CellStorage { InsertedCell::Existing } hash_map::Entry::Vacant(entry) => { - if let Some(value) = self.db.cells.get(key)? 
@@ -224,8 +247,8 @@ impl CellStorage {
             // Clear big chunks of data before finalization
             drop(stack);
 
-            ctx.flush_new_cells()?;
-            ctx.flush_existing_cells()?;
+            ctx.flush_cell_data()?;
+            ctx.flush_cell_refs()?;
 
             Ok(())
         }
@@ -234,9 +257,9 @@ impl CellStorage {
         &self,
         batch: &mut rocksdb::WriteBatch,
         root: Cell,
-    ) -> Result<(PendingOperation<'_>, usize), CellStorageError> {
+    ) -> Result<usize, CellStorageError> {
         struct CellWithRefs<'a> {
-            rc: u32,
+            inserts: u32,
             data: Option<&'a [u8]>,
         }
 
@@ -249,47 +272,16 @@ impl CellStorage {
         }
 
         impl Context<'_> {
-            fn insert_cell(
-                &mut self,
-                key: &HashBytes,
-                cell: &DynCell,
-                depth: usize,
-            ) -> Result<bool, CellStorageError> {
+            fn insert_cell(&mut self, cell: &DynCell) -> Result<bool, CellStorageError> {
+                let key = cell.repr_hash();
+
                 Ok(match self.transaction.entry(*key) {
                     hash_map::Entry::Occupied(mut value) => {
-                        value.get_mut().rc += 1;
+                        value.get_mut().inserts += 1;
                         false
                     }
                     hash_map::Entry::Vacant(entry) => {
-                        // A constant which tells since which depth we should start to use cache.
-                        // This method is used mostly for inserting new states, so we can assume
-                        // that first N levels will mostly be new.
-                        //
-                        // This value was chosen empirically.
-                        const NEW_CELLS_DEPTH_THRESHOLD: usize = 4;
-
-                        let (old_rc, has_value) = 'value: {
-                            if depth >= NEW_CELLS_DEPTH_THRESHOLD {
-                                // NOTE: `get` here is used to affect a "hotness" of the value, because
-                                // there is a big chance that we will need it soon during state processing
-                                if let Some(entry) = self.raw_cache.shard(key).get(key) {
-                                    let rc = entry.header.header.load(Ordering::Acquire);
-                                    break 'value (rc, rc > 0);
-                                }
-                            }
-
-                            match self.db.cells.get(key).map_err(CellStorageError::Internal)? {
-                                Some(value) => {
-                                    let (rc, value) =
-                                        refcount::decode_value_with_rc(value.as_ref());
-                                    (rc, value.is_some())
-                                }
-                                None => (0, false),
-                            }
-                        };
-
-                        // TODO: lower to `debug_assert` when sure
-                        assert!(has_value && old_rc > 0 || !has_value && old_rc == 0);
+                        let has_value = self.raw_cache.refs_shard(key).contains_key(key);
 
                         let data = if !has_value {
                             self.buffer.clear();
@@ -300,31 +292,32 @@ impl CellStorage {
                         } else {
                             None
                         };
-                        entry.insert(CellWithRefs { rc: 1, data });
+                        entry.insert(CellWithRefs { inserts: 1, data });
+
+                        !has_value
                     }
                 })
             }
 
-            fn finalize(mut self, batch: &mut rocksdb::WriteBatch) -> usize {
+            fn finalize(self, batch: &mut rocksdb::WriteBatch) -> usize {
                 let total = self.transaction.len();
-                let cells_cf = &self.db.cells.cf();
-                for (key, CellWithRefs { rc, data }) in self.transaction {
-                    self.buffer.clear();
-                    refcount::add_positive_refount(rc, data, &mut self.buffer);
+
+                let cell_data_cf = &self.db.cell_data.cf();
+                let cell_refs_cf = &self.db.cell_refs.cf();
+
+                for (key, CellWithRefs { inserts, data }) in self.transaction {
                     if let Some(data) = data {
-                        self.raw_cache.insert(&key, rc, data);
-                    } else {
-                        self.raw_cache.add_refs(&key, rc);
+                        self.raw_cache.insert_value(&key, data);
+                        batch.put_cf(cell_data_cf, key, data);
                     }
-                    batch.merge_cf(cells_cf, key.as_slice(), &self.buffer);
+
+                    let rc = self.raw_cache.add_refs(&key, inserts);
+                    batch.put_cf(cell_refs_cf, key, rc.to_le_bytes());
                 }
                 total
             }
         }
 
-        let pending_op = self.pending.begin();
-
         // Prepare context and handles
         let alloc = Bump::new();
@@ -337,12 +330,9 @@ impl CellStorage {
         };
 
         // Check root cell
-        {
-            let key = root.repr_hash();
-
-            if !ctx.insert_cell(key, root.as_ref(), 0)? {
-                return Ok((pending_op, 0));
-            }
+        // TODO: Increase cell count even for the root cell
+        if !ctx.insert_cell(root.as_ref())? {
+            return Ok(0);
         }
 
         let mut stack = Vec::with_capacity(16);
@@ -350,15 +340,12 @@ impl CellStorage {
 
         // Check other cells
         'outer: loop {
-            let depth = stack.len();
             let Some(iter) = stack.last_mut() else {
                 break;
             };
 
-            for child in &mut *iter {
-                let key = child.repr_hash();
-
-                if ctx.insert_cell(key, child, depth)? {
+            for child in iter.by_ref() {
+                if ctx.insert_cell(child)? {
                     stack.push(child.references());
                     continue 'outer;
                 }
             }
 
@@ -371,7 +358,7 @@ impl CellStorage {
         drop(stack);
 
         // Write transaction to the `WriteBatch`
-        Ok((pending_op, ctx.finalize(batch)))
+        Ok(ctx.finalize(batch))
     }
 
     pub fn load_cell(
         &self,
@@ -384,19 +371,12 @@ impl CellStorage {
             return Ok(cell);
         }
 
-        let cell = match self.raw_cells_cache.get_raw(&self.db, hash, &self.pending) {
-            Ok(value) => 'cell: {
-                if let Some(value) = value {
-                    let rc = &value.header.header;
-                    if rc.load(Ordering::Acquire) > 0 {
-                        match StorageCell::deserialize(self.clone(), &value.slice) {
-                            Some(cell) => break 'cell Arc::new(cell),
-                            None => return Err(CellStorageError::InvalidCell),
-                        }
-                    }
-                }
-                return Err(CellStorageError::CellNotFound);
-            }
+        let cell = match self.raw_cells_cache.get_raw(&self.db, hash) {
+            Ok(Some(value)) => match StorageCell::deserialize(self.clone(), &value) {
+                Some(cell) => Arc::new(cell),
+                None => return Err(CellStorageError::InvalidCell),
+            },
+            Ok(None) => return Err(CellStorageError::CellNotFound),
             Err(e) => return Err(CellStorageError::Internal(e)),
         };
 
@@ -412,10 +392,10 @@ impl CellStorage {
         batch: &mut rocksdb::WriteBatch,
         alloc: &Bump,
         hash: &HashBytes,
-    ) -> Result<(PendingOperation<'_>, usize), CellStorageError> {
+    ) -> Result<usize, CellStorageError> {
         #[derive(Clone, Copy)]
         struct CellState<'a> {
-            rc: i64,
+            rc: u64,
             removes: u32,
             refs: &'a [HashBytes],
         }
 
         impl<'a> CellState<'a> {
             fn remove(&mut self) -> Result<Option<&'a [HashBytes]>, CellStorageError> {
                 self.removes += 1;
-                if self.removes as i64 <= self.rc {
+                if self.removes as u64 <= self.rc {
                     Ok(self.next_refs())
                 } else {
                     Err(CellStorageError::CounterMismatch)
                 }
             }
 
             fn next_refs(&self) -> Option<&'a [HashBytes]> {
-                if self.rc > self.removes as i64 {
+                if self.rc > self.removes as u64 {
                     None
                 } else {
                     Some(self.refs)
                 }
             }
         }
 
@@ -439,60 +419,70 @@ impl CellStorage {
-        let pending_op = self.pending.begin();
-
-        let cells = &self.db.cells;
-        let cells_cf = &cells.cf();
-
         let mut transaction: FastHashMap<&HashBytes, CellState<'_>> =
             FastHashMap::with_capacity_and_hasher(128, Default::default());
         let mut buffer = Vec::with_capacity(4);
 
         let mut stack = Vec::with_capacity(16);
-        stack.push(hash);
+        stack.push(std::slice::from_ref(hash).iter());
 
         // While some cells left
-        while let Some(cell_id) = stack.pop() {
-            let refs = match transaction.entry(cell_id) {
-                hash_map::Entry::Occupied(mut v) => v.get_mut().remove()?,
-                hash_map::Entry::Vacant(v) => {
-                    let rc =
-                        self.raw_cells_cache
-                            .get_raw_for_delete(&self.db, cell_id, &mut buffer)?;
-                    debug_assert!(rc > 0);
-
-                    v.insert(CellState {
-                        rc,
-                        removes: 1,
-                        refs: alloc.alloc_slice_copy(buffer.as_slice()),
-                    })
-                    .next_refs()
-                }
+        'outer: loop {
+            let Some(iter) = stack.last_mut() else {
+                break;
             };
 
-            if let Some(refs) = refs {
-                // Add all children
-                for cell_id in refs {
-                    // Unknown cell, push to the stack to process it
-                    stack.push(cell_id);
+            for cell_id in iter.by_ref() {
+                // Process the current cell
+                let refs = match transaction.entry(cell_id) {
+                    hash_map::Entry::Occupied(mut v) => v.get_mut().remove()?,
+                    hash_map::Entry::Vacant(v) => {
+                        let rc = self.raw_cells_cache.get_raw_for_delete(
+                            &self.db,
+                            cell_id,
+                            &mut buffer,
+                        )?;
+                        debug_assert!(rc > 0);
+
+                        v.insert(CellState {
+                            rc,
+                            removes: 1,
+                            refs: alloc.alloc_slice_copy(buffer.as_slice()),
+                        })
+                        .next_refs()
+                    }
+                };
+
+                if let Some(refs) = refs {
+                    // And proceed to its refs if any
+                    stack.push(refs.iter());
+                    continue 'outer;
                 }
             }
+
+            stack.pop();
         }
 
         // Clear big chunks of data before finalization
         drop(stack);
 
         // Write transaction to the `WriteBatch`
+        let cell_data_cf = &self.db.cell_data.cf();
+        let cell_refs_cf = &self.db.cell_refs.cf();
+
         let total = transaction.len();
         for (key, CellState { removes, .. }) in transaction {
-            self.raw_cells_cache.remove_refs(key, removes);
-            batch.merge_cf(
-                cells_cf,
-                key.as_slice(),
-                refcount::encode_negative_refcount(removes),
-            );
+            let rc = self.raw_cells_cache.remove_refs(key, removes);
+
+            if rc == 0 {
+                batch.delete_cf(cell_data_cf, key);
+                batch.delete_cf(cell_refs_cf, key);
+            } else {
+                batch.put_cf(cell_refs_cf, key, rc.to_le_bytes());
+            }
         }
-        Ok((pending_op, total))
+
+        Ok(total)
     }
 
     pub fn drop_cell(&self, hash: &HashBytes) {
@@ -895,21 +885,24 @@ impl CellsCache {
 
 type CellsCacheShard = FastDashMap<HashBytes, Weak<StorageCell>>;
 
 struct RawCellsCache {
-    shards: [RawCellsCacheShard; CELL_SHARDS],
+    data_shards: [CellDataCacheShard; CELL_SHARDS],
+    refs_shards: [CellRefsCacheShard; CELL_SHARDS],
 }
 
-type RawCellsCacheShard = Cache<HashBytes, RawCellsCacheItem, CellSizeEstimator, FastHasherState>;
-type RawCellsCacheItem = ThinArc<AtomicI64, u8>;
+type CellDataCacheShard = Cache<HashBytes, CellDataCacheItem, CellSizeEstimator, FastHasherState>;
+type CellDataCacheItem = Arc<[u8]>;
+
+type CellRefsCacheShard = FastDashMap<HashBytes, u64>;
 
 #[derive(Clone, Copy)]
 pub struct CellSizeEstimator;
 
-impl quick_cache::Weighter<HashBytes, RawCellsCacheItem> for CellSizeEstimator {
-    fn weight(&self, key: &HashBytes, val: &RawCellsCacheItem) -> u64 {
-        const STATIC_SIZE: usize = std::mem::size_of::<RawCellsCacheItem>()
-            + std::mem::size_of::<AtomicI64>()
-            + std::mem::size_of::<usize>() * 2; // ArcInner refs + HeaderWithLength length
+impl quick_cache::Weighter<HashBytes, CellDataCacheItem> for CellSizeEstimator {
+    fn weight(&self, _: &HashBytes, val: &CellDataCacheItem) -> u64 {
+        const STATIC_SIZE: usize = std::mem::size_of::<HashBytes>()
+            + std::mem::size_of::<CellDataCacheItem>()
+            + std::mem::size_of::<usize>() * 2; // strong + weak refs
 
-        let len = key.0.len() + val.slice.len() + STATIC_SIZE;
+        let len = val.len() + STATIC_SIZE;
         len as u64
     }
 }
@@ -951,7 +944,7 @@ impl RawCellsCache {
             cell_shard_size = %bytesize::ByteSize(shard_size_in_bytes),
         );
 
-        let shards = [(); CELL_SHARDS].map(|_| {
+        let data_shards = [(); CELL_SHARDS].map(|_| {
             Cache::with(
                 estimated_cell_cache_capacity as usize,
                 shard_size_in_bytes,
@@ -961,42 +954,50 @@ impl RawCellsCache {
             )
         });
 
-        Self { shards }
+        let refs_shards = [(); CELL_SHARDS].map(|_| Default::default());
+
+        Self {
+            data_shards,
+            refs_shards,
+        }
     }
 
     #[inline(always)]
-    fn shard(&self, key: &HashBytes) -> &RawCellsCacheShard {
-        &self.shards[key[0] as usize]
+    fn shard(&self, key: &HashBytes) -> (&CellDataCacheShard, &CellRefsCacheShard) {
+        (
+            &self.data_shards[key[0] as usize],
+            &self.refs_shards[key[0] as usize],
+        )
+    }
+
+    #[inline(always)]
+    fn data_shard(&self, key: &HashBytes) -> &CellDataCacheShard {
+        &self.data_shards[key[0] as usize]
+    }
+
+    #[inline(always)]
+    fn refs_shard(&self, key: &HashBytes) -> &CellRefsCacheShard {
+        &self.refs_shards[key[0] as usize]
     }
 
     fn get_raw(
         &self,
         db: &BaseDb,
         key: &HashBytes,
-        pending: &PendingOperations,
-    ) -> Result<Option<RawCellsCacheItem>, rocksdb::Error> {
+    ) -> Result<Option<CellDataCacheItem>, rocksdb::Error> {
         use quick_cache::sync::GuardResult;
 
-        match self.shard(key).get_value_or_guard(key, None) {
+        match self.data_shard(key).get_value_or_guard(key, None) {
             GuardResult::Value(value) => Ok(Some(value)),
             GuardResult::Guard(g) => Ok(
                 if let Some(value) = {
                     let _histogram =
                         HistogramGuard::begin("tycho_storage_get_cell_from_rocksdb_time");
-                    db.cells.get(key.as_slice())?
+                    db.cell_data.get(key.as_slice())?
                 } {
-                    let (rc, data) = refcount::decode_value_with_rc(value.as_ref());
-                    data.map(|value| {
-                        let value =
-                            RawCellsCacheItem::from_header_and_slice(AtomicI64::new(rc), value);
-
-                        pending.run_if_none(|| {
-                            // Insert value to the cache only if there are no pending operations
-                            _ = g.insert(value.clone());
-                        });
-
-                        value
-                    })
+                    let value = CellDataCacheItem::from(value.as_ref());
+                    _ = g.insert(value.clone());
+                    Some(value)
                 } else {
                     None
                 },
             ),
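`get_raw` above keeps the cache-stampede protection from the old code: `get_value_or_guard` hands exactly one caller a placeholder guard for a missing key, so concurrent misses for the same hash do not all hit RocksDB; with the pending-operations machinery gone, the loaded value is now inserted unconditionally. A minimal standalone sketch of that pattern with `quick_cache` (the string load stands in for the RocksDB read):

```rust
use quick_cache::sync::{Cache, GuardResult};

fn get_or_load(cache: &Cache<u64, String>, key: u64) -> Option<String> {
    match cache.get_value_or_guard(&key, None) {
        // Fast path: another caller already cached the value.
        GuardResult::Value(value) => Some(value),
        // Miss: this caller owns the placeholder and performs the load.
        GuardResult::Guard(g) => {
            let value = format!("loaded-{key}"); // stand-in for a DB read
            let _ = g.insert(value.clone());
            Some(value)
        }
        // Only reachable when a timeout is passed instead of `None`.
        GuardResult::Timeout => None,
    }
}

fn main() {
    let cache = Cache::new(1024);
    assert_eq!(get_or_load(&cache, 7).as_deref(), Some("loaded-7"));
}
```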
@@ -1010,88 +1011,71 @@ impl RawCellsCache {
         db: &BaseDb,
         key: &HashBytes,
         refs_buffer: &mut Vec<HashBytes>,
-    ) -> Result<i64, CellStorageError> {
+    ) -> Result<u64, CellStorageError> {
         refs_buffer.clear();
 
-        // NOTE: `peek` here is used to avoid affecting a "hotness" of the value
-        if let Some(value) = self.shard(key).peek(key) {
-            let rc = value.header.header.load(Ordering::Acquire);
-            if rc <= 0 {
-                return Err(CellStorageError::CellNotFound);
-            }
+        let (data_shard, refs_shard) = self.shard(key);
 
-            StorageCell::deserialize_references(&value.slice, refs_buffer)
+        let Some(rc) = refs_shard.get(key).map(|v| *v) else {
+            return Err(CellStorageError::CellNotFound);
+        };
+
+        // NOTE: `peek` here is used to avoid affecting a "hotness" of the value
+        if let Some(value) = data_shard.peek(key) {
+            StorageCell::deserialize_references(&value, refs_buffer)
                 .then_some(rc)
                 .ok_or(CellStorageError::InvalidCell)
         } else {
-            match db.cells.get(key.as_slice()) {
-                Ok(value) => {
-                    if let Some(value) = value {
-                        if let (rc, Some(value)) = refcount::decode_value_with_rc(&value) {
-                            return StorageCell::deserialize_references(value, refs_buffer)
-                                .then_some(rc)
-                                .ok_or(CellStorageError::InvalidCell);
-                        }
-                    }
-
-                    Err(CellStorageError::CellNotFound)
+            match db.cell_data.get(key.as_slice()) {
+                Ok(Some(value)) => {
+                    return StorageCell::deserialize_references(&value, refs_buffer)
+                        .then_some(rc)
+                        .ok_or(CellStorageError::InvalidCell);
                 }
+                Ok(None) => Err(CellStorageError::CellNotFound),
                 Err(e) => Err(CellStorageError::Internal(e)),
             }
         }
     }
 
-    fn insert(&self, key: &HashBytes, refs: u32, value: &[u8]) {
-        let value = RawCellsCacheItem::from_header_and_slice(AtomicI64::new(refs as _), value);
-        self.shard(key).insert(*key, value);
+    fn insert_value(&self, key: &HashBytes, value: &[u8]) {
+        self.data_shard(key)
+            .insert(*key, CellDataCacheItem::from(value));
     }
 
-    fn add_refs(&self, key: &HashBytes, refs: u32) {
-        // NOTE: `peek` here is used to avoid affecting a "hotness" of the value
-        if let Some(v) = self.shard(key).peek(key) {
-            v.header.header.fetch_add(refs as i64, Ordering::Release);
-        }
-    }
+    /// Adds the specified amount of refs and returns the new value
+    fn add_refs(&self, key: &HashBytes, inserts: u32) -> u64 {
+        use dashmap::mapref::entry::Entry;
 
-    fn remove_refs(&self, key: &HashBytes, refs: u32) {
-        // NOTE: `peek` here is used to avoid affecting a "hotness" of the value
-        if let Some(v) = self.shard(key).peek(key) {
-            let old_refs = v.header.header.fetch_sub(refs as i64, Ordering::Release);
-            debug_assert!(old_refs >= refs as i64);
+        match self.refs_shard(key).entry(*key) {
+            Entry::Vacant(entry) => {
+                entry.insert(inserts as _);
+                inserts as _
+            }
+            Entry::Occupied(mut entry) => {
+                *entry.get_mut() += inserts as u64;
+                *entry.get()
+            }
         }
     }
-}
 
-#[derive(Default)]
-struct PendingOperations {
-    // TODO: Replace with two atomic counters for inserts and pending operations
-    pending_count: Mutex<usize>,
-}
 
-impl PendingOperations {
-    fn begin(&self) -> PendingOperation<'_> {
-        *self.pending_count.lock() += 1;
-        PendingOperation { operations: self }
-    }
 
-    #[inline]
-    fn run_if_none<F: FnOnce()>(&self, f: F) {
-        let guard = self.pending_count.lock();
-        if *guard == 0 {
-            f();
+    /// Removes the specified amount of refs and returns the new value
+    fn remove_refs(&self, key: &HashBytes, removes: u32) -> u64 {
+        let (data_shard, refs_shard) = self.shard(key);
+
+        let mut new_refs = 0;
+        let remove_data = refs_shard
+            .remove_if_mut(key, |_, current_refs| {
+                *current_refs = current_refs.saturating_sub(removes as _); // TODO: Panic on overflow?
+                new_refs = *current_refs;
+                *current_refs == 0
+            })
+            .is_some();
+
+        if remove_data {
+            data_shard.remove(key);
         }
-        // NOTE: Make sure to drop the lock only after the operation is executed
-        drop(guard);
-    }
-}
-
-pub struct PendingOperation<'a> {
-    operations: &'a PendingOperations,
-}
 
-impl Drop for PendingOperation<'_> {
-    fn drop(&mut self) {
-        *self.operations.pending_count.lock() -= 1;
+        new_refs
     }
 }
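`remove_refs` is the one place where the in-memory counters and the data cache must agree: the decrement happens under the DashMap shard lock, and the entry (plus its cached payload) is dropped atomically when the counter reaches zero. A condensed standalone model of that logic (plain `DashMap` with array keys instead of the patch's sharded `FastDashMap<HashBytes, u64>`):

```rust
use dashmap::DashMap;

/// Decrement a refcount under the map's shard lock; remove the entry when it
/// hits zero and report the new value. Mirrors `RawCellsCache::remove_refs`.
fn remove_refs(refs: &DashMap<[u8; 32], u64>, key: &[u8; 32], removes: u64) -> u64 {
    let mut new_rc = 0;
    let removed = refs
        .remove_if_mut(key, |_, rc| {
            *rc = rc.saturating_sub(removes);
            new_rc = *rc;
            *rc == 0 // returning true removes the entry
        })
        .is_some();
    if removed {
        // the real code also evicts the corresponding `cell_data` cache entry
    }
    new_rc
}

fn main() {
    let refs = DashMap::new();
    refs.insert([0u8; 32], 2u64);
    assert_eq!(remove_refs(&refs, &[0u8; 32], 1), 1);
    assert_eq!(remove_refs(&refs, &[0u8; 32], 1), 0);
    assert!(refs.get(&[0u8; 32]).is_none());
}
```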
+ } + pub fn metrics(&self) -> ShardStateStorageMetrics { ShardStateStorageMetrics { max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel), @@ -108,7 +146,7 @@ impl ShardStateStorage { let mut batch = rocksdb::WriteBatch::default(); - let (pending_op, new_cell_count) = cell_storage.store_cell(&mut batch, root_cell)?; + let new_cell_count = cell_storage.store_cell(&mut batch, root_cell)?; metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64); batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice()); @@ -116,9 +154,6 @@ impl ShardStateStorage { let hist = HistogramGuard::begin("tycho_storage_state_update_time"); raw_db.write(batch)?; - // Ensure that pending operation guard is dropped after the batch is written - drop(pending_op); - hist.finish(); let updated = handle.meta().add_flags(BlockFlags::HAS_STATE); @@ -223,16 +258,12 @@ impl ShardStateStorage { let mut batch = rocksdb::WriteBatch::default(); let (total, inner_alloc) = tokio::task::spawn_blocking(move || { - let (pending_op, stats) = - cell_storage.remove_cell(&mut batch, &alloc, &root_hash)?; + let stats = cell_storage.remove_cell(&mut batch, &alloc, &root_hash)?; batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key); db.raw() .rocksdb() - .write_opt(batch, db.cells.write_config())?; - - // Ensure that pending operation guard is dropped after the batch is written - drop(pending_op); + .write_opt(batch, db.cell_data.write_config())?; Ok::<_, anyhow::Error>((stats, alloc)) }) diff --git a/storage/src/store/shard_state/store_state_raw.rs b/storage/src/store/shard_state/store_state_raw.rs index dc990b8a3..ed91b269b 100644 --- a/storage/src/store/shard_state/store_state_raw.rs +++ b/storage/src/store/shard_state/store_state_raw.rs @@ -119,7 +119,7 @@ impl StoreStateContext { .open_as_mapped_mut()?; let raw = self.db.rocksdb().as_ref(); - let write_options = self.db.cells.new_write_config(); + let write_options = self.db.temp_cells.new_write_config(); let mut ctx = FinalizationContext::new(&self.db); ctx.clear_temp_cells(&self.db)?; @@ -627,14 +627,10 @@ mod test { let cell = cell_storage.load_cell(&HashBytes::from_slice(value.as_ref()))?; let mut batch = WriteBatch::default(); - let (pending_op, _) = - cell_storage.remove_cell(&mut batch, &bump, cell.hash(LevelMask::MAX_LEVEL))?; + cell_storage.remove_cell(&mut batch, &bump, cell.hash(LevelMask::MAX_LEVEL))?; // execute batch - db.rocksdb().write_opt(batch, db.cells.write_config())?; - - // Ensure that pending operation guard is dropped after the batch is written - drop(pending_op); + db.rocksdb().write_opt(batch, db.cell_data.write_config())?; tracing::info!("State deleted. Progress: {}/{total_states}", deleted + 1); } @@ -643,7 +639,7 @@ mod test { db.trigger_compaction().await; db.trigger_compaction().await; - let cells_left = db.cells.iterator(IteratorMode::Start).count(); + let cells_left = db.cell_data.iterator(IteratorMode::Start).count(); tracing::info!("States GC finished. Cells left: {cells_left}"); assert_eq!(cells_left, 0, "Gc is broken. Press F to pay respect");