Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(rpc): reduce verbosity in subscriber notification logs #2028

Merged
merged 7 commits into from
Feb 26, 2025
111 changes: 88 additions & 23 deletions src/eth/rpc/rpc_subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,33 @@ impl RpcSubscriptions {
};

let interested_subs = subs.pending_txs.read().await;
let interested_subs = interested_subs.values().collect_vec();
let subscribers = interested_subs.values().collect_vec();

tracing::info!(
tx_hash = ?tx_hash,
subscribers = ?interested_subs,
"notifying subscribers about new pending transaction"
);
if !subscribers.is_empty() {
// Group clients by type and count how many of each type
let mut client_type_counts = std::collections::HashMap::new();

Self::notify(interested_subs, tx_hash.to_string());
for sub in &subscribers {
let client_str = sub.client.to_string();
let client_type = Self::get_client_type(&client_str);

*client_type_counts.entry(client_type).or_insert(0) += 1;
}

// Format the log message
let mut client_summary = Vec::new();
for (client_type, count) in client_type_counts.iter() {
client_summary.push(format!("{}: {}", client_type, count));
}

tracing::info!(
tx_hash = ?tx_hash,
clients = ?client_summary.join(", "),
"notifying subscribers about new pending transaction"
);
}

Self::notify(subscribers, tx_hash.to_string());
}
warn_task_rx_closed(TASK_NAME);
Ok(())
Expand All @@ -202,16 +220,34 @@ impl RpcSubscriptions {
};

let interested_subs = subs.new_heads.read().await;
let interested_subs = interested_subs.values().collect_vec();
let subscribers = interested_subs.values().collect_vec();

if !subscribers.is_empty() {
// Group clients by type and count how many of each type
let mut client_type_counts = std::collections::HashMap::new();

for sub in &subscribers {
let client_str = sub.client.to_string();
let client_type = Self::get_client_type(&client_str);

*client_type_counts.entry(client_type).or_insert(0) += 1;
}

// Format the log message
let mut client_summary = Vec::new();
for (client_type, count) in client_type_counts.iter() {
client_summary.push(format!("{}: {}", client_type, count));
}

tracing::info!(
block_number = ?block_header.number,
block_hash = ?block_header.hash,
subscribers = ?interested_subs,
"notifying subscribers about new block"
);
tracing::info!(
block_number = ?block_header.number,
block_hash = ?block_header.hash,
clients = ?client_summary.join(", "),
"notifying subscribers about new block"
);
}

Self::notify(interested_subs, block_header);
Self::notify(subscribers, block_header);
}
warn_task_rx_closed(TASK_NAME);
Ok(())
Expand All @@ -234,20 +270,38 @@ impl RpcSubscriptions {
};

let interested_subs = subs.logs.read().await;
let interested_subs = interested_subs
let matching_subscribers = interested_subs
.values()
.flat_map(HashMap::values)
.filter_map(|s| if_else!(s.filter.matches(&log), Some(&s.inner), None))
.collect_vec();

tracing::info!(
log_block_number = ?log.block_number,
log_tx_hash = ?log.transaction_hash,
subscribers = ?interested_subs,
"notifying subscribers about new logs"
);
if !matching_subscribers.is_empty() {
// Group clients by type and count how many of each type
let mut client_type_counts = std::collections::HashMap::new();

for sub in &matching_subscribers {
let client_str = sub.client.to_string();
let client_type = Self::get_client_type(&client_str);

*client_type_counts.entry(client_type).or_insert(0) += 1;
}

// Format the log message
let mut client_summary = Vec::new();
for (client_type, count) in client_type_counts.iter() {
client_summary.push(format!("{}: {}", client_type, count));
}

Self::notify(interested_subs, log);
tracing::info!(
log_block_number = ?log.block_number,
log_tx_hash = ?log.transaction_hash,
clients = ?client_summary.join(", "),
"notifying subscribers about new logs"
);
}

Self::notify(matching_subscribers, log);
}
warn_task_rx_closed(TASK_NAME);
Ok(())
Expand Down Expand Up @@ -293,6 +347,17 @@ impl RpcSubscriptions {
});
}
}

fn get_client_type(client_str: &str) -> String {
// Split the client string by "::" and take the first part
if let Some(index) = client_str.find("::") {
// Extract the type part (before the first "::")
client_str[..index].to_string()
} else {
// If there's no "::", return "other"
"other".to_string()
}
}
}

// -----------------------------------------------------------------------------
Expand Down
Loading