From aeaa398edd2276986919e6a5b9094358e86fef76 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Fri, 14 Aug 2020 15:34:10 +0200 Subject: [PATCH 1/7] Add support for sourced metrics. A sourced metric is a metric that obtains its values from an existing source, rather than the values being independently recorded. It thus allows collecting metrics from existing counters or gauges without having to duplicate them in a dedicated prometheus counter or gauge (and hence another atomic value). The first use-case is to feed the bandwidth counters from libp2p directly into prometheus. --- client/network/src/service.rs | 57 ++++++++----- utils/prometheus/src/lib.rs | 3 + utils/prometheus/src/sourced.rs | 138 ++++++++++++++++++++++++++++++++ 3 files changed, 180 insertions(+), 18 deletions(-) create mode 100644 utils/prometheus/src/sourced.rs diff --git a/client/network/src/service.rs b/client/network/src/service.rs index cc3821a455e9f..f2cd8db19acd7 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -53,6 +53,7 @@ use parking_lot::Mutex; use prometheus_endpoint::{ register, Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, + SourcedCounter, MetricSource }; use sc_peerset::PeersetHandle; use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link}; @@ -240,12 +241,6 @@ impl NetworkWorker { local_peer_id_legacy ); - // Initialize the metrics. - let metrics = match ¶ms.metrics_registry { - Some(registry) => Some(Metrics::register(®istry)?), - None => None - }; - let checker = params.on_demand.as_ref() .map(|od| od.checker().clone()) .unwrap_or_else(|| Arc::new(AlwaysBadChecker)); @@ -353,6 +348,17 @@ impl NetworkWorker { (builder.build(), bandwidth) }; + // Initialize the metrics. + let metrics = match ¶ms.metrics_registry { + Some(registry) => { + // External metrics. + Metrics::register_bandwidth(registry, bandwidth.clone())?; + // Other metrics. + Some(Metrics::register(registry)?) + } + None => None + }; + // Listen on multiaddresses. for addr in ¶ms.network_config.listen_addresses { if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { @@ -1148,9 +1154,6 @@ struct Metrics { kbuckets_num_nodes: GaugeVec, listeners_local_addresses: Gauge, listeners_errors_total: Counter, - // Note: `network_bytes_total` is a monotonic gauge obtained by - // sampling an existing counter. - network_bytes_total: GaugeVec, notifications_sizes: HistogramVec, notifications_streams_closed_total: CounterVec, notifications_streams_opened_total: CounterVec, @@ -1164,7 +1167,34 @@ struct Metrics { requests_out_started_total: CounterVec, } +/// The source for bandwidth metrics. +#[derive(Clone)] +struct BandwidthSource(Arc); + +impl MetricSource for BandwidthSource { + type N = u64; + + fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) { + set(&[&"in"], self.0.total_inbound()); + set(&[&"out"], self.0.total_outbound()); + } +} + impl Metrics { + fn register_bandwidth(registry: &Registry, sinks: Arc) + -> Result<(), PrometheusError> + { + register(SourcedCounter::new( + &Opts::new( + "sub_libp2p_network_bytes_total", + "Total bandwidth usage" + ).variable_label("direction"), + BandwidthSource(sinks), + )?, registry)?; + + Ok(()) + } + fn register(registry: &Registry) -> Result { Ok(Self { // This list is ordered alphabetically @@ -1267,13 +1297,6 @@ impl Metrics { "sub_libp2p_listeners_errors_total", "Total number of non-fatal errors reported by a listener" )?, registry)?, - network_bytes_total: register(GaugeVec::new( - Opts::new( - "sub_libp2p_network_bytes_total", - "Total bandwidth usage" - ), - &["direction"] - )?, registry)?, notifications_sizes: register(HistogramVec::new( HistogramOpts { common_opts: Opts::new( @@ -1721,8 +1744,6 @@ impl Future for NetworkWorker { this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); if let Some(metrics) = this.metrics.as_ref() { - metrics.network_bytes_total.with_label_values(&["in"]).set(this.service.bandwidth.total_inbound()); - metrics.network_bytes_total.with_label_values(&["out"]).set(this.service.bandwidth.total_outbound()); metrics.is_major_syncing.set(is_major_syncing as u64); for (proto, num_entries) in this.network_service.num_kbuckets_entries() { let proto = maybe_utf8_bytes_to_string(proto.as_bytes()); diff --git a/utils/prometheus/src/lib.rs b/utils/prometheus/src/lib.rs index 9030704cb746f..be7050a8a0736 100644 --- a/utils/prometheus/src/lib.rs +++ b/utils/prometheus/src/lib.rs @@ -31,6 +31,9 @@ use std::net::SocketAddr; #[cfg(not(target_os = "unknown"))] mod networking; +mod sourced; + +pub use sourced::{SourcedCounter, SourcedGauge, MetricSource}; #[cfg(target_os = "unknown")] pub use unknown_os::init_prometheus; diff --git a/utils/prometheus/src/sourced.rs b/utils/prometheus/src/sourced.rs new file mode 100644 index 0000000000000..174bfbe3dd240 --- /dev/null +++ b/utils/prometheus/src/sourced.rs @@ -0,0 +1,138 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Metrics that are collected from existing sources. + +use prometheus::core::{Collector, Desc, Describer, Number, Opts}; +use prometheus::proto; +use std::{cmp::Ordering, marker::PhantomData}; + +/// A counter whose values are obtained from an existing source. +pub type SourcedCounter = SourcedMetric; + +/// A gauge whose values are obtained from an existing source. +pub type SourcedGauge = SourcedMetric; + +/// The type of a sourced counter. +#[derive(Copy, Clone)] +pub enum Counter {} + +/// The type of a sourced gauge. +#[derive(Copy, Clone)] +pub enum Gauge {} + +/// A metric whose values are obtained from an existing source, +/// instead of being independently recorded. +#[derive(Debug, Clone)] +pub struct SourcedMetric { + source: S, + desc: Desc, + _type: PhantomData, +} + +/// A source of values for a [`SourcedMetric`]. +pub trait MetricSource: Sync + Send + Clone { + /// The type of the collected values. + type N: Number; + /// Collects the current values of the metrics from the source. + fn collect(&self, set: impl FnMut(&[&str], Self::N)); +} + +impl SourcedMetric { + /// Creates a new metric that obtains its values from the given source. + pub fn new(opts: &Opts, source: S) -> prometheus::Result { + let desc = opts.describe()?; + Ok(Self { source, desc, _type: PhantomData }) + } +} + +impl Collector for SourcedMetric { + fn desc(&self) -> Vec<&Desc> { + vec![&self.desc] + } + + fn collect(&self) -> Vec { + let mut counters = Vec::new(); + + self.source.collect(|label_values, value| { + let mut m = proto::Metric::default(); + + match T::proto() { + proto::MetricType::COUNTER => { + let mut c = proto::Counter::default(); + c.set_value(value.into_f64()); + m.set_counter(c); + } + proto::MetricType::GAUGE => { + let mut g = proto::Gauge::default(); + g.set_value(value.into_f64()); + m.set_gauge(g); + } + t => { + log::error!("Unsupported sourced metric type: {:?}", t); + } + } + + match self.desc.variable_labels.len().cmp(&label_values.len()) { + Ordering::Greater => + log::warn!("Missing label values for sourced metric {}", self.desc.fq_name), + Ordering::Less => + log::warn!("Too many label values for sourced metric {}", self.desc.fq_name), + Ordering::Equal => {} + } + + m.set_label(self.desc.variable_labels.iter().zip(label_values) + .map(|(l_name, l_value)| { + let mut l = proto::LabelPair::default(); + l.set_name(l_name.to_string()); + l.set_value(l_value.to_string()); + l + }) + .chain(self.desc.const_label_pairs.iter().cloned()) + .collect::>()); + + counters.push(m); + }); + + let mut m = proto::MetricFamily::default(); + m.set_name(self.desc.fq_name.clone()); + m.set_help(self.desc.help.clone()); + m.set_field_type(T::proto()); + m.set_metric(counters); + + vec![m] + } +} + +/// Types of metrics that can obtain their values from an existing source. +pub trait SourcedType: private::Sealed + Sync + Send { + #[doc(hidden)] + fn proto() -> proto::MetricType; +} + +impl SourcedType for Counter { + fn proto() -> proto::MetricType { proto::MetricType::COUNTER } +} + +impl SourcedType for Gauge { + fn proto() -> proto::MetricType { proto::MetricType::GAUGE } +} + +mod private { + pub trait Sealed {} + impl Sealed for super::Counter {} + impl Sealed for super::Gauge {} +} From b26aee33c9173945699d3829e642715411119ccc Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Fri, 14 Aug 2020 16:00:14 +0200 Subject: [PATCH 2/7] Tabs, not spaces. --- utils/prometheus/src/sourced.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/utils/prometheus/src/sourced.rs b/utils/prometheus/src/sourced.rs index 174bfbe3dd240..3af1e3d40f903 100644 --- a/utils/prometheus/src/sourced.rs +++ b/utils/prometheus/src/sourced.rs @@ -38,8 +38,8 @@ pub enum Gauge {} /// instead of being independently recorded. #[derive(Debug, Clone)] pub struct SourcedMetric { - source: S, - desc: Desc, + source: S, + desc: Desc, _type: PhantomData, } @@ -53,18 +53,18 @@ pub trait MetricSource: Sync + Send + Clone { impl SourcedMetric { /// Creates a new metric that obtains its values from the given source. - pub fn new(opts: &Opts, source: S) -> prometheus::Result { + pub fn new(opts: &Opts, source: S) -> prometheus::Result { let desc = opts.describe()?; - Ok(Self { source, desc, _type: PhantomData }) - } + Ok(Self { source, desc, _type: PhantomData }) + } } impl Collector for SourcedMetric { - fn desc(&self) -> Vec<&Desc> { - vec![&self.desc] - } + fn desc(&self) -> Vec<&Desc> { + vec![&self.desc] + } - fn collect(&self) -> Vec { + fn collect(&self) -> Vec { let mut counters = Vec::new(); self.source.collect(|label_values, value| { @@ -107,14 +107,14 @@ impl Collector for SourcedMetric { counters.push(m); }); - let mut m = proto::MetricFamily::default(); - m.set_name(self.desc.fq_name.clone()); - m.set_help(self.desc.help.clone()); - m.set_field_type(T::proto()); - m.set_metric(counters); + let mut m = proto::MetricFamily::default(); + m.set_name(self.desc.fq_name.clone()); + m.set_help(self.desc.help.clone()); + m.set_field_type(T::proto()); + m.set_metric(counters); - vec![m] - } + vec![m] + } } /// Types of metrics that can obtain their values from an existing source. From 22be2f76fd5d07ae390b9d5ab92e86f0c8e251e2 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 17 Aug 2020 09:42:03 +0200 Subject: [PATCH 3/7] Tweak bandwidth counter registration. --- client/network/src/service.rs | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/client/network/src/service.rs b/client/network/src/service.rs index f2cd8db19acd7..8262b0062f927 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -351,9 +351,9 @@ impl NetworkWorker { // Initialize the metrics. let metrics = match ¶ms.metrics_registry { Some(registry) => { - // External metrics. - Metrics::register_bandwidth(registry, bandwidth.clone())?; - // Other metrics. + // Sourced metrics. + BandwidthCounters::register(registry, bandwidth.clone())?; + // Other (i.e. new) metrics. Some(Metrics::register(registry)?) } None => None @@ -1169,19 +1169,10 @@ struct Metrics { /// The source for bandwidth metrics. #[derive(Clone)] -struct BandwidthSource(Arc); +struct BandwidthCounters(Arc); -impl MetricSource for BandwidthSource { - type N = u64; - - fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) { - set(&[&"in"], self.0.total_inbound()); - set(&[&"out"], self.0.total_outbound()); - } -} - -impl Metrics { - fn register_bandwidth(registry: &Registry, sinks: Arc) +impl BandwidthCounters { + fn register(registry: &Registry, sinks: Arc) -> Result<(), PrometheusError> { register(SourcedCounter::new( @@ -1189,11 +1180,23 @@ impl Metrics { "sub_libp2p_network_bytes_total", "Total bandwidth usage" ).variable_label("direction"), - BandwidthSource(sinks), + BandwidthCounters(sinks), )?, registry)?; Ok(()) } +} + +impl MetricSource for BandwidthCounters { + type N = u64; + + fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) { + set(&[&"in"], self.0.total_inbound()); + set(&[&"out"], self.0.total_outbound()); + } +} + +impl Metrics { fn register(registry: &Registry) -> Result { Ok(Self { From 45b734cce4708839bf290c0d872b4cb1d690fd52 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 17 Aug 2020 09:43:54 +0200 Subject: [PATCH 4/7] Add debug assertion for variable labels and values. --- utils/prometheus/src/sourced.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/prometheus/src/sourced.rs b/utils/prometheus/src/sourced.rs index 3af1e3d40f903..9badcb7478fe9 100644 --- a/utils/prometheus/src/sourced.rs +++ b/utils/prometheus/src/sourced.rs @@ -86,6 +86,7 @@ impl Collector for SourcedMetric { } } + debug_assert_eq!(self.desc.variable_labels.len(), label_values.len()); match self.desc.variable_labels.len().cmp(&label_values.len()) { Ordering::Greater => log::warn!("Missing label values for sourced metric {}", self.desc.fq_name), From d341e21a3f60a163d0012357b9caf589712662e4 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 17 Aug 2020 09:45:41 +0200 Subject: [PATCH 5/7] Document monotonicity requirement for sourced counters. --- utils/prometheus/src/sourced.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/utils/prometheus/src/sourced.rs b/utils/prometheus/src/sourced.rs index 9badcb7478fe9..58f60e4969bb8 100644 --- a/utils/prometheus/src/sourced.rs +++ b/utils/prometheus/src/sourced.rs @@ -21,6 +21,10 @@ use prometheus::proto; use std::{cmp::Ordering, marker::PhantomData}; /// A counter whose values are obtained from an existing source. +/// +/// > **Note*: The counter values provided by the source `S` +/// > must be monotonically increasing. Otherwise use a +/// > [`SourcedGauge`] instead. pub type SourcedCounter = SourcedMetric; /// A gauge whose values are obtained from an existing source. From d08fb2ad10a70bd9f6959356588222add48d6157 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 17 Aug 2020 10:17:59 +0200 Subject: [PATCH 6/7] CI From aa3126a99fc19e22251364f40716e1d10f82e1ad Mon Sep 17 00:00:00 2001 From: Roman Borschel Date: Mon, 17 Aug 2020 22:41:50 +0200 Subject: [PATCH 7/7] Update client/network/src/service.rs Co-authored-by: Max Inden --- client/network/src/service.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 325879fd90efa..713357772d417 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -1201,7 +1201,6 @@ impl MetricSource for BandwidthCounters { } impl Metrics { - fn register(registry: &Registry) -> Result { Ok(Self { // This list is ordered alphabetically