Skip to content

Commit

Permalink
wire up route level body count tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cratelyn committed Nov 8, 2024
1 parent cada6bd commit 4061287
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 31 deletions.
3 changes: 2 additions & 1 deletion linkerd/app/outbound/src/http/logical/policy/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ where
// AND/OR headers
.push(extensions::NewSetExtensions::layer())
.push({
// XXX(kate); now, we have a route label extractor.
let mk = Self::label_extractor;
metrics::layer(&metrics.requests, &metrics.body_data, mk)
metrics::layer(&metrics.requests, mk, &metrics.body_data)
})
.check_new::<Self>()
.check_new_service::<Self, http::Request<http::BoxBody>>()
Expand Down
23 changes: 17 additions & 6 deletions linkerd/app/outbound/src/http/logical/policy/route/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use super::{backend::metrics as backend, retry};
use crate::{ParentRef, RouteRef};
use linkerd_app_core::{
metrics::prom::{self, EncodeLabelSetMut},
svc,
};
use linkerd_http_prom::{
body_data::request::{BodyDataMetrics, NewRecordBodyData, RequestBodyFamilies},
body_data::request::{NewRecordBodyData, RequestBodyFamilies},
record_response::{self, StreamLabel},
};

Expand Down Expand Up @@ -55,6 +54,15 @@ pub type LabelGrpcRouteRsp = LabelGrpcRsp<labels::Route>;
pub type LabelHttpRouteBackendRsp = LabelHttpRsp<labels::RouteBackend>;
pub type LabelGrpcRouteBackendRsp = LabelGrpcRsp<labels::RouteBackend>;

/*
pub type NewRecordBodyData<T, N, X> = body_data::request::NewRecordBodyData<
N,
fn(&T) -> labels::RouteLabelExtract, // XXX(kate) we may be able to default this
labels::RouteLabelExtract,
labels::Route,
>;
*/

pub type NewRecordDuration<T, M, N> =
record_response::NewRecordResponse<T, ExtractRecordDurationParams<M>, M, N>;

Expand All @@ -64,22 +72,25 @@ pub struct ExtractRecordDurationParams<M>(pub M);
#[derive(Clone, Debug)]
pub struct ExtractRecordBodyDataParams(RequestBodyFamilies<labels::Route>);

pub fn layer<T, N>(
pub fn layer<T, N, X>(
metrics: &RequestMetrics<T::StreamLabel>,
mk: X,
body_data: &RequestBodyFamilies<labels::Route>,
mk: &labels::RouteLabelExtract,
) -> impl svc::Layer<
N,
Service = NewRecordBodyData<
ExtractRecordBodyDataParams,
NewRecordDuration<T, RequestMetrics<T::StreamLabel>, N>,
X,
labels::RouteLabelExtract,
labels::Route,
>,
>
where
T: Clone + MkStreamLabel,
X: Clone,
{
let record = NewRecordDuration::layer_via(ExtractRecordDurationParams(metrics.clone()));
let body_data = NewRecordBodyData::layer_via(ExtractRecordBodyDataParams(body_data.clone()));
let body_data = NewRecordBodyData::new(mk, body_data.clone());

svc::layers().push(record).push(body_data)
}
Expand Down
83 changes: 60 additions & 23 deletions linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::http::policy::route::MatchedRoute;

use super::{
super::{Grpc, Http, Route},
labels,
Expand All @@ -12,19 +14,25 @@ use linkerd_app_core::{
Layer, NewService,
},
};
use linkerd_http_prom::body_data::request::RequestBodyFamilies;
use linkerd_proxy_client_policy as policy;

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn http_request_statuses() {
let _trace = linkerd_tracing::test::trace_init();

let metrics = super::HttpRouteMetrics::default().requests;
let super::HttpRouteMetrics {
requests,
body_data,
..
} = super::HttpRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_http_route_metrics(&metrics, &parent_ref, &route_ref);
let (mut svc, mut handle) =
mock_http_route_metrics(&requests, &body_data, &parent_ref, &route_ref);

// Send one request and ensure it's counted.
let ok = metrics.get_statuses(&labels::Rsp(
let ok = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()),
labels::HttpRsp {
status: Some(http::StatusCode::OK),
Expand All @@ -43,7 +51,7 @@ async fn http_request_statuses() {

// Send another request and ensure it's counted with a different response
// status.
let no_content = metrics.get_statuses(&labels::Rsp(
let no_content = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()),
labels::HttpRsp {
status: Some(http::StatusCode::NO_CONTENT),
Expand All @@ -67,7 +75,7 @@ async fn http_request_statuses() {
.await;

// Emit a response with an error and ensure it's counted.
let unknown = metrics.get_statuses(&labels::Rsp(
let unknown = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()),
labels::HttpRsp {
status: None,
Expand All @@ -81,7 +89,7 @@ async fn http_request_statuses() {

// Emit a successful response with a body that fails and ensure that both
// the status and error are recorded.
let mixed = metrics.get_statuses(&labels::Rsp(
let mixed = requests.get_statuses(&labels::Rsp(
labels::Route::new(parent_ref, route_ref, &Uri::default()),
labels::HttpRsp {
status: Some(http::StatusCode::OK),
Expand Down Expand Up @@ -117,13 +125,18 @@ async fn http_request_hostnames() {

let _trace = linkerd_tracing::test::trace_init();

let metrics = super::HttpRouteMetrics::default().requests;
let super::HttpRouteMetrics {
requests,
body_data,
..
} = super::HttpRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_http_route_metrics(&metrics, &parent_ref, &route_ref);
let (mut svc, mut handle) =
mock_http_route_metrics(&requests, &body_data, &parent_ref, &route_ref);

let get_counter = |host: Option<&'static str>, status: Option<http::StatusCode>| {
metrics.get_statuses(&labels::Rsp(
requests.get_statuses(&labels::Rsp(
labels::Route::new_with_name(
parent_ref.clone(),
route_ref.clone(),
Expand Down Expand Up @@ -235,13 +248,18 @@ async fn http_request_hostnames() {
async fn grpc_request_statuses_ok() {
let _trace = linkerd_tracing::test::trace_init();

let metrics = super::GrpcRouteMetrics::default().requests;
let super::GrpcRouteMetrics {
requests,
body_data,
..
} = super::GrpcRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref);
let (mut svc, mut handle) =
mock_grpc_route_metrics(&requests, &body_data, &parent_ref, &route_ref);

// Send one request and ensure it's counted.
let ok = metrics.get_statuses(&labels::Rsp(
let ok = requests.get_statuses(&labels::Rsp(
labels::Route::new(
parent_ref.clone(),
route_ref.clone(),
Expand Down Expand Up @@ -280,14 +298,19 @@ async fn grpc_request_statuses_ok() {
async fn grpc_request_statuses_not_found() {
let _trace = linkerd_tracing::test::trace_init();

let metrics = super::GrpcRouteMetrics::default().requests;
let super::GrpcRouteMetrics {
requests,
body_data,
..
} = super::GrpcRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref);
let (mut svc, mut handle) =
mock_grpc_route_metrics(&requests, &body_data, &parent_ref, &route_ref);

// Send another request and ensure it's counted with a different response
// status.
let not_found = metrics.get_statuses(&labels::Rsp(
let not_found = requests.get_statuses(&labels::Rsp(
labels::Route::new(
parent_ref.clone(),
route_ref.clone(),
Expand Down Expand Up @@ -326,12 +349,17 @@ async fn grpc_request_statuses_not_found() {
async fn grpc_request_statuses_error_response() {
let _trace = linkerd_tracing::test::trace_init();

let metrics = super::GrpcRouteMetrics::default().requests;
let super::GrpcRouteMetrics {
requests,
body_data,
..
} = super::GrpcRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref);
let (mut svc, mut handle) =
mock_grpc_route_metrics(&requests, &body_data, &parent_ref, &route_ref);

let unknown = metrics.get_statuses(&labels::Rsp(
let unknown = requests.get_statuses(&labels::Rsp(
labels::Route::new(
parent_ref.clone(),
route_ref.clone(),
Expand Down Expand Up @@ -360,12 +388,17 @@ async fn grpc_request_statuses_error_response() {
async fn grpc_request_statuses_error_body() {
let _trace = linkerd_tracing::test::trace_init();

let metrics = super::GrpcRouteMetrics::default().requests;
let super::GrpcRouteMetrics {
requests,
body_data,
..
} = super::GrpcRouteMetrics::default();
let parent_ref = crate::ParentRef(policy::Meta::new_default("parent"));
let route_ref = crate::RouteRef(policy::Meta::new_default("route"));
let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref);
let (mut svc, mut handle) =
mock_grpc_route_metrics(&requests, &body_data, &parent_ref, &route_ref);

let unknown = metrics.get_statuses(&labels::Rsp(
let unknown = requests.get_statuses(&labels::Rsp(
labels::Route::new(
parent_ref.clone(),
route_ref.clone(),
Expand Down Expand Up @@ -404,6 +437,7 @@ const MOCK_GRPC_REQ_URI: &str = "http://host/svc/method";

pub fn mock_http_route_metrics(
metrics: &RequestMetrics<LabelHttpRouteRsp>,
body_data: &RequestBodyFamilies<labels::Route>,
parent_ref: &crate::ParentRef,
route_ref: &crate::RouteRef,
) -> (svc::BoxHttp, Handle) {
Expand All @@ -425,8 +459,9 @@ pub fn mock_http_route_metrics(
)
.expect("find default route");

let extract = MatchedRoute::label_extractor;
let (tx, handle) = tower_test::mock::pair::<http::Request<BoxBody>, http::Response<BoxBody>>();
let svc = super::layer(metrics)
let svc = super::layer(metrics, extract, body_data)
.layer(move |_t: Http<()>| tx.clone())
.new_service(Http {
r#match,
Expand All @@ -446,6 +481,7 @@ pub fn mock_http_route_metrics(

pub fn mock_grpc_route_metrics(
metrics: &RequestMetrics<LabelGrpcRouteRsp>,
body_data: &RequestBodyFamilies<labels::Route>,
parent_ref: &crate::ParentRef,
route_ref: &crate::RouteRef,
) -> (svc::BoxHttp, Handle) {
Expand All @@ -471,8 +507,9 @@ pub fn mock_grpc_route_metrics(
)
.expect("find default route");

let extract = MatchedRoute::label_extractor;
let (tx, handle) = tower_test::mock::pair::<http::Request<BoxBody>, http::Response<BoxBody>>();
let svc = super::layer(metrics)
let svc = super::layer(metrics, extract, body_data)
.layer(move |_t: Grpc<()>| tx.clone())
.new_service(Grpc {
r#match,
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http/prom/src/body_data/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ where
///
/// This uses an `X`-typed [`ExtractParam<P, T>`] implementation to extract service parameters
/// from a `T`-typed target.
pub fn layer_via(metrics: RequestBodyFamilies<L>, extract: X) -> impl Layer<N, Service = Self> {
pub fn new(extract: X, metrics: RequestBodyFamilies<L>) -> impl Layer<N, Service = Self> {
svc::layer::mk(move |inner| Self {
inner,
extract: extract.clone(),
Expand Down

0 comments on commit 4061287

Please # to comment.