Skip to content

Commit a2fdefd

Browse files
fix: rpc downloader shutdown (#1762)
+ chore: put hard-coded value in importer constant
1 parent 519fc87 commit a2fdefd

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed

src/bin/rpc_downloader.rs

+15-10
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::sync::Arc;
33

44
use anyhow::anyhow;
55
use anyhow::Context;
6+
use futures::stream;
67
use futures::StreamExt;
7-
use futures::TryStreamExt;
88
use itertools::Itertools;
99
use serde::Deserialize;
1010
use stratus::config::RpcDownloaderConfig;
@@ -14,9 +14,9 @@ use stratus::eth::primitives::Hash;
1414
use stratus::eth::storage::ExternalRpcStorage;
1515
use stratus::ext::not;
1616
use stratus::infra::BlockchainClient;
17-
use stratus::log_and_err;
1817
use stratus::utils::DropTimer;
1918
use stratus::GlobalServices;
19+
use stratus::GlobalState;
2020
#[cfg(all(not(target_env = "msvc"), any(feature = "jemalloc", feature = "jeprof")))]
2121
use tikv_jemallocator::Jemalloc;
2222

@@ -83,7 +83,8 @@ async fn download_balances(rpc_storage: Arc<dyn ExternalRpcStorage>, chain: &Blo
8383
}
8484

8585
async fn download_blocks(rpc_storage: Arc<dyn ExternalRpcStorage>, chain: Arc<BlockchainClient>, paralellism: usize, end: BlockNumber) -> anyhow::Result<()> {
86-
let _timer = DropTimer::start("rpc-downloader::download_blocks");
86+
const TASK_NAME: &str = "rpc-downloader::download_blocks";
87+
let _timer = DropTimer::start(TASK_NAME);
8788

8889
// prepare download block tasks
8990
let mut start = BlockNumber::ZERO;
@@ -99,16 +100,20 @@ async fn download_blocks(rpc_storage: Arc<dyn ExternalRpcStorage>, chain: Arc<Bl
99100

100101
// execute download block tasks
101102
tracing::info!(tasks = %tasks.len(), %paralellism, "executing block downloads");
102-
let result = futures::stream::iter(tasks).buffer_unordered(paralellism).try_collect::<Vec<()>>().await;
103-
match result {
104-
Ok(_) => {
105-
tracing::info!("tasks finished");
106-
Ok(())
103+
104+
let mut stream = stream::iter(tasks).buffered(paralellism);
105+
while let Some(result) = stream.next().await {
106+
if let Err(e) = result {
107+
tracing::error!(reason = ?e, "download task failed");
107108
}
108-
Err(e) => {
109-
log_and_err!(reason = e, "tasks failed")
109+
110+
if GlobalState::is_shutdown_warn(TASK_NAME) {
111+
break;
110112
}
111113
}
114+
115+
tracing::info!("download finished");
116+
Ok(())
112117
}
113118

114119
async fn download(

src/eth/follower/importer/importer.rs

+2
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ impl Importer {
150150
// Executor
151151
// -----------------------------------------------------------------------------
152152

153+
pub const TASKS_COUNT: usize = 3;
154+
153155
// Executes external blocks and persist them to storage.
154156
async fn start_block_executor(
155157
executor: Arc<Executor>,

src/globals.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::alias::JsonValue;
1515
use crate::config;
1616
use crate::config::StratusConfig;
1717
use crate::config::WithCommonConfig;
18+
use crate::eth::follower::importer::Importer;
1819
use crate::eth::rpc::RpcContext;
1920
use crate::ext::spawn_signal_handler;
2021
use crate::infra::tracing::warn_task_cancellation;
@@ -104,7 +105,7 @@ pub static STRATUS_SHUTDOWN_SIGNAL: Lazy<CancellationToken> = Lazy::new(Cancella
104105
static IMPORTER_SHUTDOWN: AtomicBool = AtomicBool::new(true);
105106

106107
/// A guard that is taken when importer is running.
107-
pub static IMPORTER_ONLINE_TASKS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(3));
108+
pub static IMPORTER_ONLINE_TASKS_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(Importer::TASKS_COUNT));
108109

109110
/// Transaction should be accepted?
110111
static TRANSACTIONS_ENABLED: AtomicBool = AtomicBool::new(true);
@@ -178,7 +179,7 @@ impl GlobalState {
178179
/// Waits till importer is done.
179180
pub async fn wait_for_importer_to_finish() {
180181
// 3 permits will be available when all 3 tasks are finished
181-
let result = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire_many(3).await;
182+
let result = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire_many(Importer::TASKS_COUNT as u32).await;
182183

183184
if let Err(e) = result {
184185
tracing::error!(reason = ?e, "error waiting for importer to finish");

0 commit comments

Comments
 (0)