diff --git a/.github/workflows/ship.yaml b/.github/workflows/ship.yaml index 467e9220..66cf29af 100644 --- a/.github/workflows/ship.yaml +++ b/.github/workflows/ship.yaml @@ -162,9 +162,9 @@ jobs: sudo apt-get update sudo apt-get install -y ${{ matrix.linux-packages }} - # - uses: Swatinem/rust-cache@v2 - # with: - # shared-key: "build-${matrix.runner}" # share the cache across jobs + - uses: Swatinem/rust-cache@v2 + with: + shared-key: "build-${{ matrix.runner }}" # share the cache across jobs - name: build the CLI run: | diff --git a/Cargo.lock b/Cargo.lock index 3cac1d71..4c824528 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1341,8 +1341,8 @@ dependencies = [ "bb8", "bb8-tiberius", "prometheus", - "query-engine-execution", "query-engine-metadata", + "query-engine-metrics", "schemars", "serde", "serde_json", @@ -1739,6 +1739,7 @@ dependencies = [ "bb8-tiberius", "bytes", "prometheus", + "query-engine-metrics", "query-engine-sql", "query-engine-translation", "serde_json", @@ -1757,6 +1758,22 @@ dependencies = [ "serde", ] +[[package]] +name = "query-engine-metrics" +version = "0.1.1" +dependencies = [ + "bb8", + "bb8-tiberius", + "bytes", + "prometheus", + "serde_json", + "sqlformat", + "thiserror", + "tiberius", + "tokio-stream", + "tracing", +] + [[package]] name = "query-engine-sql" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index fbb2ee89..b4e95544 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "crates/query-engine/metadata", "crates/query-engine/sql", "crates/query-engine/translation", + "crates/query-engine/metrics", "crates/cli", ] diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 9d748f0d..accd1115 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -8,7 +8,7 @@ workspace = true [dependencies] query-engine-metadata = { path = "../query-engine/metadata" } -query-engine-execution = { path = "../query-engine/execution" } +query-engine-metrics = { path = 
"../query-engine/metrics" } schemars = { version = "0.8.16", features = ["smol_str", "preserve_order"] } diff --git a/crates/configuration/src/version1.rs b/crates/configuration/src/version1.rs index 4703bac3..844c841d 100644 --- a/crates/configuration/src/version1.rs +++ b/crates/configuration/src/version1.rs @@ -2,9 +2,9 @@ use super::introspection; use crate::error::Error; -use query_engine_execution::metrics; use query_engine_metadata::metadata; use query_engine_metadata::metadata::{database, Nullable}; +use query_engine_metrics::metrics; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -74,7 +74,7 @@ pub struct Configuration { #[derive(Debug)] pub struct State { pub mssql_pool: bb8::Pool, - pub metrics: query_engine_execution::metrics::Metrics, + pub metrics: query_engine_metrics::metrics::Metrics, } /// Validate the user configuration. @@ -99,7 +99,7 @@ pub async fn create_state( let mssql_pool = create_mssql_pool(&configuration.config) .await .map_err(InitializationError::UnableToCreateMSSQLPool)?; - let metrics = query_engine_execution::metrics::Metrics::initialize(metrics_registry) + let metrics = query_engine_metrics::metrics::Metrics::initialize(metrics_registry) .map_err(InitializationError::MetricsError)?; Ok(State { mssql_pool, diff --git a/crates/query-engine/execution/Cargo.toml b/crates/query-engine/execution/Cargo.toml index b948f673..21077217 100644 --- a/crates/query-engine/execution/Cargo.toml +++ b/crates/query-engine/execution/Cargo.toml @@ -9,6 +9,7 @@ workspace = true [dependencies] query-engine-sql = { path = "../sql" } query-engine-translation = { path = "../translation" } +query-engine-metrics = { path = "../metrics" } tiberius = { version = "0.12.2", default-features = false, features = ["rustls"] } bb8 = "0.8.1" diff --git a/crates/query-engine/execution/src/lib.rs b/crates/query-engine/execution/src/lib.rs index 5e60fbf3..bd36ebd9 100644 --- a/crates/query-engine/execution/src/lib.rs 
+++ b/crates/query-engine/execution/src/lib.rs @@ -3,6 +3,5 @@ pub mod error; pub mod helpers; -pub mod metrics; pub mod mutation; pub mod query; diff --git a/crates/query-engine/execution/src/mutation.rs b/crates/query-engine/execution/src/mutation.rs index 9e21241f..58f469db 100644 --- a/crates/query-engine/execution/src/mutation.rs +++ b/crates/query-engine/execution/src/mutation.rs @@ -1,9 +1,9 @@ use crate::{ helpers::{execute_statement, rollback_on_exception}, - metrics, query::execute_query, }; use bytes::{BufMut, Bytes, BytesMut}; +use query_engine_metrics::metrics; use query_engine_sql::sql::{ self, ast::With, diff --git a/crates/query-engine/execution/src/query.rs b/crates/query-engine/execution/src/query.rs index 9d7232c8..7bc629d2 100644 --- a/crates/query-engine/execution/src/query.rs +++ b/crates/query-engine/execution/src/query.rs @@ -1,7 +1,7 @@ //! Execute a Query execution plan against the database. -use crate::metrics; use bytes::{BufMut, Bytes, BytesMut}; +use query_engine_metrics::metrics; use crate::error::Error; use query_engine_sql::sql; diff --git a/crates/query-engine/metrics/Cargo.toml b/crates/query-engine/metrics/Cargo.toml new file mode 100644 index 00000000..5884f844 --- /dev/null +++ b/crates/query-engine/metrics/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "query-engine-metrics" +version.workspace = true +edition.workspace = true + +[lints] +workspace = true + +[dependencies] +tiberius = { version = "0.12.2", default-features = false, features = ["rustls"] } +bb8 = "0.8.1" +bb8-tiberius = "0.15.0" +bytes = "1.6.0" +prometheus = "0.13.3" +serde_json = "1.0.116" +sqlformat = "0.2.3" +tokio-stream = "0.1.14" +tracing = "0.1.40" +thiserror = "1.0" diff --git a/crates/query-engine/metrics/mod.rs b/crates/query-engine/metrics/mod.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/crates/query-engine/metrics/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/query-engine/metrics/src/lib.rs 
b/crates/query-engine/metrics/src/lib.rs new file mode 100644 index 00000000..e1448832 --- /dev/null +++ b/crates/query-engine/metrics/src/lib.rs @@ -0,0 +1 @@ +pub mod metrics; diff --git a/crates/query-engine/metrics/src/metrics.rs b/crates/query-engine/metrics/src/metrics.rs new file mode 100644 index 00000000..d6f3487e --- /dev/null +++ b/crates/query-engine/metrics/src/metrics.rs @@ -0,0 +1,285 @@ +//! Metrics setup and update for our connector. + +use prometheus::{Histogram, HistogramTimer, IntCounter, Registry}; + +/// The collection of all metrics exposed through the `/metrics` endpoint. +#[derive(Debug, Clone)] +pub struct Metrics { + query_total: IntCounter, + explain_total: IntCounter, + query_total_time: Histogram, + mutation_total_time: Histogram, + query_plan_time: Histogram, + mutation_plan_time: Histogram, + query_execution_time: Histogram, + mutation_execution_time: Histogram, + connection_acquisition_wait_time: Histogram, + pub error_metrics: ErrorMetrics, +} + +impl Metrics { + /// Set up counters and gauges used to produce Prometheus metrics + pub fn initialize(metrics_registry: &mut Registry) -> Result { + let query_total = add_int_counter_metric( + metrics_registry, + "ndc_sqlserver_query_total", + "Total successful queries.", + )?; + + let explain_total = add_int_counter_metric( + metrics_registry, + "ndc_sqlserver_explain_total", + "Total successful explains.", + )?; + + let query_total_time = add_histogram_metric( + metrics_registry, + "ndc_sqlserver_query_total_time", + "Total time taken to plan and execute a query, in seconds", + )?; + + let query_plan_time = add_histogram_metric( + metrics_registry, + "ndc_sqlserver_query_plan_time", + "Time taken to plan a query for execution, in seconds.", + )?; + + let query_execution_time = add_histogram_metric( + metrics_registry, + "ndc_sqlserver_query_execution_time", + "Time taken to execute an already-planned query, in seconds.", + )?; + + let mutation_total_time = add_histogram_metric( + 
metrics_registry, + "ndc_sqlserver_mutation_total_time", + "Total time taken to plan and execute a mutation, in seconds", + )?; + + let mutation_plan_time = add_histogram_metric( + metrics_registry, + "ndc_sqlserver_mutation_plan_time", + "Time taken to plan a mutation for execution, in seconds.", + )?; + + let mutation_execution_time = add_histogram_metric( + metrics_registry, + "ndc_sqlserver_mutation_execution_time", + "Time taken to execute an already-planned mutation, in seconds.", + )?; + + let connection_acquisition_wait_time = add_histogram_metric( + metrics_registry, + "ndc_sqlserver_connection_acquisition_wait_time", + "Time taken to acquire a connection.", + )?; + + let error_metrics = ErrorMetrics::initialize(metrics_registry)?; + + Ok(Self { + query_total, + explain_total, + query_total_time, + query_plan_time, + query_execution_time, + connection_acquisition_wait_time, + error_metrics, + mutation_execution_time, + mutation_plan_time, + mutation_total_time, + }) + } + + pub fn record_successful_query(&self) { + self.query_total.inc() + } + + pub fn record_successful_explain(&self) { + self.explain_total.inc() + } + + pub fn time_query_total(&self) -> Timer { + Timer(self.query_total_time.start_timer()) + } + + pub fn time_mutation_total(&self) -> Timer { + Timer(self.mutation_total_time.start_timer()) + } + + pub fn time_query_plan(&self) -> Timer { + Timer(self.query_plan_time.start_timer()) + } + + pub fn time_mutation_plan(&self) -> Timer { + Timer(self.mutation_plan_time.start_timer()) + } + + pub fn time_query_execution(&self) -> Timer { + Timer(self.query_execution_time.start_timer()) + } + + pub fn time_mutation_execution(&self) -> Timer { + Timer(self.mutation_execution_time.start_timer()) + } + + pub fn time_connection_acquisition_wait(&self) -> Timer { + Timer(self.connection_acquisition_wait_time.start_timer()) + } +} + +/// Create a new int counter metric and register it with the provided Prometheus Registry +fn add_int_counter_metric( + 
metrics_registry: &mut Registry, + metric_name: &str, + metric_description: &str, +) -> Result { + let int_counter = IntCounter::with_opts(prometheus::Opts::new(metric_name, metric_description)) + .map_err(Error)?; + register_collector(metrics_registry, int_counter) +} + +/// Create a new histogram metric using the default buckets, and register it with the provided +/// Prometheus Registry. +fn add_histogram_metric( + metrics_registry: &mut prometheus::Registry, + metric_name: &str, + metric_description: &str, +) -> Result { + let histogram = Histogram::with_opts(prometheus::HistogramOpts::new( + metric_name, + metric_description, + )) + .map_err(Error)?; + register_collector(metrics_registry, histogram) +} + +/// Register a new collector with the registry, and returns it for later use. +fn register_collector( + metrics_registry: &mut Registry, + collector: Collector, +) -> Result { + metrics_registry + .register(Box::new(collector.clone())) + .map_err(Error)?; + Ok(collector) +} + +/// A wrapper around the Prometheus [HistogramTimer] that can make a decision +/// on whether to record or not based on a result. +pub struct Timer(HistogramTimer); + +impl Timer { + /// Stops the timer, recording if the result is `Ok`, and discarding it if + /// the result is an `Err`. It returns its input for convenience. + pub fn complete_with(self, result: Result) -> Result { + match result { + Ok(_) => { + self.0.stop_and_record(); + } + Err(_) => { + self.0.stop_and_discard(); + } + }; + result + } +} + +/// A wrapper around the internal Prometheus error type to avoid exposing more +/// than we need. +#[derive(Debug)] +pub struct Error(prometheus::Error); + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::error::Error for Error {} + +/// A collection of metrics indicating errors. +#[derive(Debug, Clone)] +pub struct ErrorMetrics { + /// the connector received an invalid request. 
+ invalid_request_total: IntCounter, + /// the connector received a request using capabilities it does not support. + unsupported_capability_total: IntCounter, + /// the connector could not fulfill a request because it does not support + /// certain features (which are not described as capabilities). + unsupported_feature_total: IntCounter, + /// the connector had an internal error. + connector_error_total: IntCounter, + /// the database emitted an error. + database_error_total: IntCounter, + /// we failed to acquire a database connection from the pool + connection_acquisition_error_total: IntCounter, +} + +impl ErrorMetrics { + /// Set up counters and gauges used to produce Prometheus metrics + pub fn initialize(metrics_registry: &mut prometheus::Registry) -> Result { + let invalid_request_total = add_int_counter_metric( + metrics_registry, + "ndc_sqlserver_error_invalid_request_total_count", + "Total number of invalid requests encountered.", + )?; + + let unsupported_capability_total = add_int_counter_metric( + metrics_registry, + "ndc_sqlserver_error_unsupported_capability_total_count", + "Total number of invalid requests with unsupported capabilities encountered.", + )?; + + let unsupported_feature_total = add_int_counter_metric( + metrics_registry, + "ndc_sqlserver_error_unsupported_feature_total_count", + "Total number of invalid requests with unsupported features encountered.", + )?; + + let connector_error_total = add_int_counter_metric( + metrics_registry, + "ndc_sqlserver_error_connector_error_total_count", + "Total number of requests failed due to an internal connector error.", + )?; + + let database_error_total = add_int_counter_metric( + metrics_registry, + "ndc_sqlserver_error_database_error_total_count", + "Total number of requests failed due to a database error.", + )?; + + let connection_acquisition_error_total = add_int_counter_metric( + metrics_registry, + "ndc_sqlserver_error_connection_acquisition_error_total_count", + "Total number of 
failures to acquire a database connection.", + )?; + + Ok(ErrorMetrics { + invalid_request_total, + unsupported_capability_total, + unsupported_feature_total, + connector_error_total, + database_error_total, + connection_acquisition_error_total, + }) + } + + pub fn record_invalid_request(&self) { + self.invalid_request_total.inc(); + } + pub fn record_unsupported_capability(&self) { + self.unsupported_capability_total.inc(); + } + pub fn record_unsupported_feature(&self) { + self.unsupported_feature_total.inc(); + } + pub fn record_connector_error(&self) { + self.connector_error_total.inc(); + } + pub fn record_database_error(&self) { + self.database_error_total.inc(); + } + pub fn record_connection_acquisition_error(&self) { + self.connection_acquisition_error_total.inc() + } +}