Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Introduce Notification block pinning limit #2935

Merged
merged 10 commits into from
Feb 26, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 20 additions & 7 deletions substrate/client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl fmt::Display for UsageInfo {
pub struct UnpinHandleInner<Block: BlockT> {
/// Hash of the block pinned by this handle
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
}

impl<Block: BlockT> Debug for UnpinHandleInner<Block> {
Expand All @@ -291,20 +291,33 @@ impl<Block: BlockT> UnpinHandleInner<Block> {
/// Create a new [`UnpinHandleInner`]
pub fn new(
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> Self {
Self { hash, unpin_worker_sender }
}
}

impl<Block: BlockT> Drop for UnpinHandleInner<Block> {
fn drop(&mut self) {
if let Err(err) = self.unpin_worker_sender.unbounded_send(self.hash) {
if let Err(err) =
self.unpin_worker_sender.unbounded_send(UnpinWorkerMessage::Unpin(self.hash))
{
log::debug!(target: "db", "Unable to unpin block with hash: {}, error: {:?}", self.hash, err);
};
}
}

/// Message that signals notification-based pinning actions to the pinning-worker.
///
/// When the notification is dropped, an `Unpin` message should be sent to the worker.
#[derive(Debug)]
pub enum UnpinWorkerMessage<Block: BlockT> {
/// Should be sent when a import or finality notification is created.
AnnouncePin(Block::Hash),
/// Should be sent when a import or finality notification is dropped.
Unpin(Block::Hash),
}

/// Keeps a specific block pinned while the handle is alive.
/// Once the last handle instance for a given block is dropped, the
/// block is unpinned in the [`Backend`](crate::backend::Backend::unpin_block).
Expand All @@ -315,7 +328,7 @@ impl<Block: BlockT> UnpinHandle<Block> {
/// Create a new [`UnpinHandle`]
pub fn new(
hash: Block::Hash,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> UnpinHandle<Block> {
UnpinHandle(Arc::new(UnpinHandleInner::new(hash, unpin_worker_sender)))
}
Expand Down Expand Up @@ -353,7 +366,7 @@ impl<Block: BlockT> BlockImportNotification<Block> {
header: Block::Header,
is_new_best: bool,
tree_route: Option<Arc<sp_blockchain::TreeRoute<Block>>>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> Self {
Self {
hash,
Expand Down Expand Up @@ -412,7 +425,7 @@ impl<Block: BlockT> FinalityNotification<Block> {
/// Create finality notification from finality summary.
pub fn from_summary(
mut summary: FinalizeSummary<Block>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> FinalityNotification<Block> {
let hash = summary.finalized.pop().unwrap_or_default();
FinalityNotification {
Expand All @@ -436,7 +449,7 @@ impl<Block: BlockT> BlockImportNotification<Block> {
/// Create finality notification from finality summary.
pub fn from_summary(
summary: ImportSummary<Block>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
) -> BlockImportNotification<Block> {
let hash = summary.hash;
BlockImportNotification {
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2524,7 +2524,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
self.storage.state_db.pin(&hash, number.saturated_into::<u64>(), hint).map_err(
|_| {
sp_blockchain::Error::UnknownBlock(format!(
"State already discarded for `{:?}`",
"Unable to pin: state already discarded for `{:?}`",
hash
))
},
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/db/src/pinned_blocks_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use schnellru::{Limiter, LruMap};
use sp_runtime::{traits::Block as BlockT, Justifications};

const LOG_TARGET: &str = "db::pin";
const PINNING_CACHE_SIZE: usize = 1024;
const PINNING_CACHE_SIZE: usize = 2048;

/// Entry for pinned blocks cache.
struct PinnedBlockCacheEntry<Block: BlockT> {
Expand Down
1 change: 1 addition & 0 deletions substrate/client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ tokio = { version = "1.22.0", features = ["parking_lot", "rt-multi-thread", "tim
tempfile = "3.1.0"
directories = "5.0.1"
static_init = "1.0.3"
schnellru = "0.2.1"

[dev-dependencies]
substrate-test-runtime-client = { path = "../../test-utils/runtime/client" }
Expand Down
54 changes: 30 additions & 24 deletions substrate/client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
//! Substrate Client

use super::block_rules::{BlockRules, LookupResult as BlockLookupResult};
use futures::{FutureExt, StreamExt};
use log::{error, info, trace, warn};
use log::{debug, info, trace, warn};
use parking_lot::{Mutex, RwLock};
use prometheus_endpoint::Registry;
use rand::Rng;
Expand All @@ -38,7 +37,7 @@ use sc_client_api::{
execution_extensions::ExecutionExtensions,
notifications::{StorageEventStream, StorageNotifications},
CallExecutor, ExecutorProvider, KeysIter, OnFinalityAction, OnImportAction, PairsIter,
ProofProvider, UsageProvider,
ProofProvider, UnpinWorkerMessage, UsageProvider,
};
use sc_consensus::{
BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction,
Expand Down Expand Up @@ -114,7 +113,7 @@ where
block_rules: BlockRules<Block>,
config: ClientConfig<Block>,
telemetry: Option<TelemetryHandle>,
unpin_worker_sender: TracingUnboundedSender<Block::Hash>,
unpin_worker_sender: TracingUnboundedSender<UnpinWorkerMessage<Block>>,
_phantom: PhantomData<RA>,
}

Expand Down Expand Up @@ -326,19 +325,35 @@ where
// dropped, the block will be unpinned automatically.
if let Some(ref notification) = finality_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
error!(
debug!(
"Unable to pin block for finality notification. hash: {}, Error: {}",
notification.hash, err
);
};
} else {
let _ = self
.unpin_worker_sender
.unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash))
.map_err(|e| {
log::debug!(
"Unable to send AnnouncePin worker message for finality: {e}"
)
});
}
}

if let Some(ref notification) = import_notification {
if let Err(err) = self.backend.pin_block(notification.hash) {
error!(
debug!(
"Unable to pin block for import notification. hash: {}, Error: {}",
notification.hash, err
);
} else {
let _ = self
.unpin_worker_sender
.unbounded_send(UnpinWorkerMessage::AnnouncePin(notification.hash))
.map_err(|e| {
log::debug!("Unable to send AnnouncePin worker message for import: {e}")
});
};
}

Expand Down Expand Up @@ -416,25 +431,16 @@ where
backend.commit_operation(op)?;
}

let (unpin_worker_sender, mut rx) =
tracing_unbounded::<Block::Hash>("unpin-worker-channel", 10_000);
let (unpin_worker_sender, rx) = tracing_unbounded::<UnpinWorkerMessage<Block>>(
"notification-pinning-worker-channel",
10_000,
);
let task_backend = Arc::downgrade(&backend);
spawn_handle.spawn(
"unpin-worker",
None,
async move {
while let Some(message) = rx.next().await {
if let Some(backend) = task_backend.upgrade() {
backend.unpin_block(message);
} else {
log::debug!("Terminating unpin-worker, backend reference was dropped.");
return
}
}
log::debug!("Terminating unpin-worker, stream terminated.")
}
.boxed(),
let unpin_worker = crate::client::notification_pinning::NotificationPinningWorker::new(
rx,
task_backend.clone(),
);
spawn_handle.spawn("notification-pinning-worker", None, Box::pin(unpin_worker.work()));

Ok(Client {
backend,
Expand Down
1 change: 1 addition & 0 deletions substrate/client/service/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
mod block_rules;
mod call_executor;
mod client;
mod notification_pinning;
mod wasm_override;
mod wasm_substitutes;

Expand Down
Loading
Loading