Skip to content

Commit

Permalink
Add content validation result metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita committed Sep 15, 2023
1 parent 89e5a52 commit ba4e672
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
21 changes: 21 additions & 0 deletions portalnet/src/metrics/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct OverlayMetrics {
message_count: IntCounterVec,
utp_outcome_count: IntCounterVec,
utp_active_count: IntGaugeVec,
validation_count: IntCounterVec,
}

impl OverlayMetrics {
Expand Down Expand Up @@ -58,11 +59,22 @@ impl OverlayMetrics {
utp_active_count_labels,
);

let validation_count_options = opts!(
"trin_validation_total",
"count all content validations successful and failed"
);
let validation_count_labels = &["protocol", "success"];
let validation_count = OverlayMetrics::register_counter_metric(
validation_count_options,
validation_count_labels,
);

Self {
protocol: protocol.into(),
message_count,
utp_outcome_count,
utp_active_count,
validation_count,
}
}

Expand Down Expand Up @@ -131,6 +143,15 @@ impl OverlayMetrics {
self.utp_active_count.with_label_values(&labels).dec();
}

//
// Validations
//
pub fn report_validation(&self, success: bool) {
let success = success.to_string();
let labels: [&str; 2] = [self.protocol.into(), success.as_str()];
self.validation_count.with_label_values(&labels).inc();
}

fn register_counter_metric(options: Opts, labels: &[&str]) -> IntCounterVec {
// Register the metric with the default registry, or if that fails, register with a
// newly-created registry.
Expand Down
12 changes: 12 additions & 0 deletions portalnet/src/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ where
let store = self.store.clone();
let kbuckets = self.kbuckets.clone();
let command_tx = self.command_tx.clone();
let metrics = self.metrics.clone();
tokio::spawn(async move {
Self::process_received_content(
kbuckets,
Expand All @@ -798,6 +799,7 @@ where
callback,
query_info.trace,
nodes_to_poke,
metrics,
)
.await;
});
Expand Down Expand Up @@ -874,6 +876,7 @@ where
);

let trace = query_info.trace;
let metrics = metrics.clone();
Self::process_received_content(
kbuckets,
command_tx,
Expand All @@ -885,6 +888,7 @@ where
callback,
trace,
nodes_to_poke,
metrics,
)
.await;
});
Expand Down Expand Up @@ -1258,6 +1262,7 @@ where
if let Err(err) = Self::process_accept_utp_payload(
validator,
store,
metrics,
kbuckets,
command_tx,
content_keys,
Expand Down Expand Up @@ -1572,6 +1577,7 @@ where
async fn process_accept_utp_payload(
validator: Arc<TValidator>,
store: Arc<RwLock<TStore>>,
metrics: Arc<OverlayMetrics>,
kbuckets: Arc<RwLock<KBucketsTable<NodeId, Node>>>,
command_tx: UnboundedSender<OverlayCommand<TContentKey>>,
content_keys: Vec<TContentKey>,
Expand Down Expand Up @@ -1599,20 +1605,23 @@ where
// - Propagate all validated content
let validator = Arc::clone(&validator);
let store = Arc::clone(&store);
let metrics = Arc::clone(&metrics);
tokio::spawn(async move {
// Validated received content
if let Err(err) = validator
.validate_content(&key, &content_value.to_vec())
.await
{
// Skip storing & propagating content if it's not valid
metrics.report_validation(false);
warn!(
error = %err,
content.key = %key.to_hex(),
"Error validating accepted content"
);
return None;
}
metrics.report_validation(true);

// Check if data should be stored, and store if true.
let key_desired = store.read().is_key_within_radius_and_unavailable(&key);
Expand Down Expand Up @@ -1808,6 +1817,7 @@ where
responder: Option<oneshot::Sender<RecursiveFindContentResult>>,
trace: Option<QueryTrace>,
nodes_to_poke: Vec<NodeId>,
metrics: Arc<OverlayMetrics>,
) {
let mut content = content;
// Operate under assumption that all content in the store is valid
Expand All @@ -1819,6 +1829,7 @@ where
} else {
let content_id = content_key.content_id();
if let Err(err) = validator.validate_content(&content_key, &content).await {
metrics.report_validation(false);
warn!(
error = ?err,
content.id = %hex_encode_compact(content_id),
Expand All @@ -1830,6 +1841,7 @@ where
}
return;
};
metrics.report_validation(true);

// skip storing if the content is already stored
// or if there's an error reading the store
Expand Down

0 comments on commit ba4e672

Please # to comment.