Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add content validation result metrics #796

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion portalnet/src/metrics/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct OverlayMetrics {
message_count: IntCounterVec,
utp_outcome_count: IntCounterVec,
utp_active_count: IntGaugeVec,
validation_count: IntCounterVec,
}

impl OverlayMetrics {
Expand Down Expand Up @@ -58,11 +59,22 @@ impl OverlayMetrics {
utp_active_count_labels,
);

let validation_count_options = opts!(
"trin_validation_total",
"count all content validations successful and failed"
);
let validation_count_labels = &["protocol", "success"];
let validation_count = OverlayMetrics::register_counter_metric(
validation_count_options,
validation_count_labels,
);

Self {
protocol: protocol.into(),
message_count,
utp_outcome_count,
utp_active_count,
validation_count,
}
}

Expand Down Expand Up @@ -131,6 +143,22 @@ impl OverlayMetrics {
self.utp_active_count.with_label_values(&labels).dec();
}

//
// Validations
//
/// Returns the value of the given metric with the specified labels.
pub fn validation_count_by_outcome(&self, outcome: bool) -> u64 {
let outcome = outcome.to_string();
let labels = [self.protocol.into(), outcome.as_str()];
self.validation_count.with_label_values(&labels).get()
}

pub fn report_validation(&self, success: bool) {
let success = success.to_string();
let labels: [&str; 2] = [self.protocol.into(), success.as_str()];
self.validation_count.with_label_values(&labels).inc();
}

fn register_counter_metric(options: Opts, labels: &[&str]) -> IntCounterVec {
// Register the metric with the default registry, or if that fails, register with a
// newly-created registry.
Expand Down Expand Up @@ -216,12 +244,16 @@ impl OverlayMetrics {
pub fn get_message_summary(&self) -> String {
// for every offer you made, how many accepts did you receive
// for every offer you received, how many accepts did you make
let successful_validations = self.validation_count_by_outcome(true);
let failed_validations = self.validation_count_by_outcome(false);
format!(
"offers={}/{}, accepts={}/{}",
"offers={}/{}, accepts={}/{}, validations={}/{}",
self.message_count_by_labels(MessageDirectionLabel::Received, MessageLabel::Accept),
self.message_count_by_labels(MessageDirectionLabel::Sent, MessageLabel::Offer),
self.message_count_by_labels(MessageDirectionLabel::Sent, MessageLabel::Accept),
self.message_count_by_labels(MessageDirectionLabel::Received, MessageLabel::Offer),
successful_validations,
successful_validations + failed_validations,
)
}
}
33 changes: 23 additions & 10 deletions portalnet/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,7 @@ where
Ok(Response::Content(found_content)) => {
match found_content {
Content::Content(content) => {
match self
.validator
.validate_content(&content_key, &content)
.await
{
match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), false)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {msg:?}",
Expand All @@ -464,11 +460,7 @@ where
Content::ConnectionId(conn_id) => {
let conn_id = u16::from_be(conn_id);
let content = self.init_find_content_stream(enr, conn_id).await?;
match self
.validator
.validate_content(&content_key, &content)
.await
{
match self.validate_content(&content_key, &content).await {
Ok(_) => Ok((Content::Content(content), true)),
Err(msg) => Err(OverlayRequestError::FailedValidation(format!(
"Network: {:?}, Reason: {msg:?}",
Expand All @@ -483,6 +475,27 @@ where
}
}

async fn validate_content(
&self,
content_key: &TContentKey,
content: &[u8],
) -> anyhow::Result<()> {
match self.validator.validate_content(content_key, content).await {
Ok(_) => {
self.metrics.report_validation(true);
Ok(())
}
Err(msg) => {
self.metrics.report_validation(false);
Err(anyhow!(
"Content validation failed for content key {:?} with error: {:?}",
content_key,
msg
))
}
}
}

/// Initialize FindContent uTP stream with remote node
async fn init_find_content_stream(
&self,
Expand Down
12 changes: 12 additions & 0 deletions portalnet/src/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ where
let store = self.store.clone();
let kbuckets = self.kbuckets.clone();
let command_tx = self.command_tx.clone();
let metrics = self.metrics.clone();
tokio::spawn(async move {
Self::process_received_content(
kbuckets,
Expand All @@ -798,6 +799,7 @@ where
callback,
query_info.trace,
nodes_to_poke,
metrics,
)
.await;
});
Expand Down Expand Up @@ -874,6 +876,7 @@ where
);

let trace = query_info.trace;
let metrics = metrics.clone();
Self::process_received_content(
kbuckets,
command_tx,
Expand All @@ -885,6 +888,7 @@ where
callback,
trace,
nodes_to_poke,
metrics,
)
.await;
});
Expand Down Expand Up @@ -1258,6 +1262,7 @@ where
if let Err(err) = Self::process_accept_utp_payload(
validator,
store,
metrics,
kbuckets,
command_tx,
content_keys,
Expand Down Expand Up @@ -1572,6 +1577,7 @@ where
async fn process_accept_utp_payload(
validator: Arc<TValidator>,
store: Arc<RwLock<TStore>>,
metrics: Arc<OverlayMetrics>,
kbuckets: Arc<RwLock<KBucketsTable<NodeId, Node>>>,
command_tx: UnboundedSender<OverlayCommand<TContentKey>>,
content_keys: Vec<TContentKey>,
Expand Down Expand Up @@ -1599,20 +1605,23 @@ where
// - Propagate all validated content
let validator = Arc::clone(&validator);
let store = Arc::clone(&store);
let metrics = Arc::clone(&metrics);
tokio::spawn(async move {
// Validated received content
if let Err(err) = validator
.validate_content(&key, &content_value.to_vec())
.await
{
// Skip storing & propagating content if it's not valid
metrics.report_validation(false);
warn!(
error = %err,
content.key = %key.to_hex(),
"Error validating accepted content"
);
return None;
}
metrics.report_validation(true);

// Check if data should be stored, and store if true.
let key_desired = store.read().is_key_within_radius_and_unavailable(&key);
Expand Down Expand Up @@ -1808,6 +1817,7 @@ where
responder: Option<oneshot::Sender<RecursiveFindContentResult>>,
trace: Option<QueryTrace>,
nodes_to_poke: Vec<NodeId>,
metrics: Arc<OverlayMetrics>,
) {
let mut content = content;
// Operate under assumption that all content in the store is valid
Expand All @@ -1819,6 +1829,7 @@ where
} else {
let content_id = content_key.content_id();
if let Err(err) = validator.validate_content(&content_key, &content).await {
metrics.report_validation(false);
warn!(
error = ?err,
content.id = %hex_encode_compact(content_id),
Expand All @@ -1830,6 +1841,7 @@ where
}
return;
};
metrics.report_validation(true);

// skip storing if the content is already stored
// or if there's an error reading the store
Expand Down