From bb93e718c2b424bdb200aa67c96b37fa2bd19600 Mon Sep 17 00:00:00 2001 From: Lois Soto Lopez Date: Thu, 23 May 2024 10:28:26 +0200 Subject: [PATCH] Prometheus: some per-exchange/per-queue metrics aggregated per-channel Add copies of some per-object metrics that are labeled per-channel aggregated to reduce cardinality. These metrics are valuable and easier to process if exposed on per-exchange and per-queue basis. --- deps/rabbit/src/rabbit_core_metrics_gc.erl | 20 +++- .../include/rabbit_core_metrics.hrl | 8 ++ .../rabbit_common/src/rabbit_core_metrics.erl | 44 ++++++--- ...etheus_rabbitmq_core_metrics_collector.erl | 73 ++++++++++++-- .../test/rabbit_prometheus_http_SUITE.erl | 98 ++++++++++++++++++- 5 files changed, 218 insertions(+), 25 deletions(-) diff --git a/deps/rabbit/src/rabbit_core_metrics_gc.erl b/deps/rabbit/src/rabbit_core_metrics_gc.erl index 0849bd503512..792dcb790ab2 100644 --- a/deps/rabbit/src/rabbit_core_metrics_gc.erl +++ b/deps/rabbit/src/rabbit_core_metrics_gc.erl @@ -92,14 +92,17 @@ gc_leader_data(Id, Table, GbSet) -> gc_global_queues() -> GbSet = gb_sets:from_list(rabbit_amqqueue:list_names()), gc_process_and_entity(channel_queue_metrics, GbSet), + gc_entity(queue_delivery_metrics, GbSet), gc_process_and_entity(consumer_created, GbSet), ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()), - gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet). + gc_process_and_entities(channel_queue_exchange_metrics, GbSet, ExchangeGbSet), + gc_entities(queue_exchange_metrics, GbSet, ExchangeGbSet). gc_exchanges() -> Exchanges = rabbit_exchange:list_names(), GbSet = gb_sets:from_list(Exchanges), - gc_process_and_entity(channel_exchange_metrics, GbSet). + gc_process_and_entity(channel_exchange_metrics, GbSet), + gc_entity(exchange_metrics, GbSet). gc_nodes() -> Nodes = rabbit_nodes:list_members(), @@ -153,6 +156,12 @@ gc_entity(Table, GbSet) -> ({Id = Key, _, _}, none) -> gc_entity(Id, Table, Key, GbSet); ({Id = Key, _, _, _, _}, none) -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _, _}, none) + when Table == exchange_metrics -> + gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _, _, _, _, _, _, _}, none) + when Table == queue_delivery_metrics -> gc_entity(Id, Table, Key, GbSet) end, none, Table). @@ -188,6 +197,13 @@ gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> none end. +gc_entities(Table, QueueGbSet, ExchangeGbSet) -> + ets:foldl(fun({{QueueId, ExchangeId} = Key, _, _}, none) + when Table == queue_exchange_metrics -> + gc_entity(QueueId, Table, Key, QueueGbSet), + gc_entity(ExchangeId, Table, Key, ExchangeGbSet) + end, none, Table). + gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) -> gc_process(Pid, Table, Key), diff --git a/deps/rabbit_common/include/rabbit_core_metrics.hrl b/deps/rabbit_common/include/rabbit_core_metrics.hrl index 59743b4ec7da..d0d189139eb8 100644 --- a/deps/rabbit_common/include/rabbit_core_metrics.hrl +++ b/deps/rabbit_common/include/rabbit_core_metrics.hrl @@ -28,6 +28,14 @@ {auth_attempt_metrics, set}, {auth_attempt_detailed_metrics, set}]). +% `CORE_NON_CHANNEL_TABLES` are tables that store counters representing the +% same info as some of the channel_queue_metrics, channel_exchange_metrics and +% channel_queue_exchange_metrics but without including the channel ID in the +% key. +-define(CORE_NON_CHANNEL_TABLES, [{queue_delivery_metrics, set}, + {exchange_metrics, set}, + {queue_exchange_metrics, set}]). + -define(CONNECTION_CHURN_METRICS, {node(), 0, 0, 0, 0, 0, 0, 0}). %% connection_created :: {connection_id, proplist} diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl index 0c46b41db456..c06b73bc457d 100644 --- a/deps/rabbit_common/src/rabbit_core_metrics.erl +++ b/deps/rabbit_common/src/rabbit_core_metrics.erl @@ -111,13 +111,15 @@ create_table({Table, Type}) -> {read_concurrency, true}]). init() -> - _ = [create_table({Table, Type}) - || {Table, Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], + Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES, + _ = [create_table({Table, Type}) + || {Table, Type} <- Tables], ok. terminate() -> + Tables = ?CORE_TABLES ++ ?CORE_EXTRA_TABLES ++ ?CORE_NON_CHANNEL_TABLES, [ets:delete(Table) - || {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], + || {Table, _Type} <- Tables], ok. connection_created(Pid, Infos) -> @@ -166,53 +168,65 @@ channel_stats(reductions, Id, Value) -> ets:insert(channel_process_metrics, {Id, Value}), ok. -channel_stats(exchange_stats, publish, Id, Value) -> +channel_stats(exchange_stats, publish, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {2, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, confirm, Id, Value) -> +channel_stats(exchange_stats, confirm, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {3, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, return_unroutable, Id, Value) -> +channel_stats(exchange_stats, return_unroutable, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {4, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(exchange_stats, drop_unroutable, Id, Value) -> +channel_stats(exchange_stats, drop_unroutable, {_ChannelPid, XName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_exchange_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0}), + _ = ets:update_counter(exchange_metrics, XName, {5, Value}, {XName, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_exchange_stats, publish, Id, Value) -> +channel_stats(queue_exchange_stats, publish, {_ChannelPid, QueueExchange} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_exchange_metrics, Id, Value, {Id, 0, 0}), + _ = ets:update_counter(queue_exchange_metrics, QueueExchange, Value, {QueueExchange, 0, 0}), ok; -channel_stats(queue_stats, get, Id, Value) -> +channel_stats(queue_stats, get, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {2, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_delivery_metrics, QName, {2, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, get_no_ack, Id, Value) -> +channel_stats(queue_stats, get_no_ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {3, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_delivery_metrics, QName, {3, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, deliver, Id, Value) -> +channel_stats(queue_stats, deliver, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {4, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_delivery_metrics, QName, {4, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, deliver_no_ack, Id, Value) -> +channel_stats(queue_stats, deliver_no_ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {5, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_delivery_metrics, QName, {5, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, redeliver, Id, Value) -> +channel_stats(queue_stats, redeliver, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {6, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_delivery_metrics, QName, {6, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, ack, Id, Value) -> +channel_stats(queue_stats, ack, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {7, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_delivery_metrics, QName, {7, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok; -channel_stats(queue_stats, get_empty, Id, Value) -> +channel_stats(queue_stats, get_empty, {_ChannelPid, QName} = Id, Value) -> %% Includes delete marker _ = ets:update_counter(channel_queue_metrics, Id, {8, Value}, {Id, 0, 0, 0, 0, 0, 0, 0, 0}), + _ = ets:update_counter(queue_delivery_metrics, QName, {8, Value}, {QName, 0, 0, 0, 0, 0, 0, 0, 0}), ok. delete(Table, Key) -> diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index fc6f393f1359..94bdecd52c41 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -160,7 +160,6 @@ {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}, {2, undefined, stream_segments, counter, "Total number of stream segment files", segments} ]}, - %%% Metrics that contain reference to a channel. Some of them also have %%% a queue name, but in this case filtering on it doesn't make any %%% sense, as the queue is not an object of interest here. @@ -209,9 +208,32 @@ ]}, {channel_queue_exchange_metrics, [ - {2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"} - ]} -]). + {2, undefined, queue_messages_published_total, counter, "Total number of messages published into a queue through an exchange on a channel"} + ]}, + +%%% Metrics in the following 3 groups reference a queue and/or exchange. +%%% They each have a corresponding group in the above per-channel +%%% section but here the channel is not an object of interest. + {exchange_metrics, [ + {2, undefined, exchange_messages_published_total, counter, "Total number of messages published into an exchange"}, + {3, undefined, exchange_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed"}, + {4, undefined, exchange_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"}, + {5, undefined, exchange_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"} + ]}, + + {queue_delivery_metrics, [ + {2, undefined, queue_get_ack_total, counter, "Total number of messages fetched from a queue with basic.get in manual acknowledgement mode"}, + {3, undefined, queue_get_total, counter, "Total number of messages fetched from a queue with basic.get in automatic acknowledgement mode"}, + {4, undefined, queue_messages_delivered_ack_total, counter, "Total number of messages delivered from a queue to consumers in manual acknowledgement mode"}, + {5, undefined, queue_messages_delivered_total, counter, "Total number of messages delivered from a queue to consumers in automatic acknowledgement mode"}, + {6, undefined, queue_messages_redelivered_total, counter, "Total number of messages redelivered from a queue to consumers"}, + {7, undefined, queue_messages_acked_total, counter, "Total number of messages acknowledged by consumers on a queue"}, + {8, undefined, queue_get_empty_total, counter, "Total number of times basic.get operations fetched no message on a queue"} + ]}, + + {queue_exchange_metrics, [ + {2, undefined, queue_exchange_messages_published_total, counter, "Total number of messages published into a queue through an exchange"} + ]}]). %% Metrics that can be only requested through `/metrics/detailed` -define(METRICS_CLUSTER,[ @@ -542,8 +564,11 @@ get_data(queue_metrics = Table, false, VHostsFilter) -> {disk_reads, A15}, {disk_writes, A16}, {segments, A17}]}]; get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; Table == queue_coarse_metrics; + Table == queue_delivery_metrics; Table == channel_queue_metrics; Table == connection_coarse_metrics; + Table == exchange_metrics; + Table == queue_exchange_metrics; Table == channel_queue_exchange_metrics; Table == ra_metrics; Table == channel_process_metrics -> @@ -551,6 +576,10 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; %% For queue_coarse_metrics ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; + ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> + Acc; + ({{#resource{kind = queue, virtual_host = VHost}, #resource{kind = exchange}}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> + Acc; ({_, V1}, {T, A1}) -> {T, V1 + A1}; ({_, V1, _}, {T, A1}) -> @@ -577,6 +606,36 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; _ -> [Result] end; +get_data(exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter)-> + ets:foldl(fun + ({#resource{kind = exchange, virtual_host = VHost}, _, _, _, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); +get_data(queue_delivery_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> + ets:foldl(fun + ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _, _, _, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); +get_data(queue_exchange_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> + ets:foldl(fun + ({{ + #resource{kind = queue, virtual_host = VHost}, + #resource{kind = exchange, virtual_host = VHost} + }, _, _} = Row, Acc) when + map_get(VHost, VHostsFilter) + -> + [Row | Acc]; + (_Row, Acc) -> + Acc + end, [], Table); get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> @@ -669,15 +728,15 @@ division(A, B) -> accumulate_count_and_sum(Value, {Count, Sum}) -> {Count + 1, Sum + Value}. -empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> +empty(T) when T == channel_queue_exchange_metrics; T == queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> {T, 0}; empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics -> {T, 0, 0, 0}; -empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> +empty(T) when T == channel_exchange_metrics; T == exchange_metrics; T == queue_coarse_metrics; T == connection_metrics -> {T, 0, 0, 0, 0}; empty(T) when T == ra_metrics -> {T, 0, 0, 0, 0, 0, {0, 0}}; -empty(T) when T == channel_queue_metrics; T == channel_metrics -> +empty(T) when T == channel_queue_metrics; T == queue_delivery_metrics; T == channel_metrics -> {T, 0, 0, 0, 0, 0, 0, 0}; empty(queue_metrics = T) -> {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}. diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index 033723507a8f..ed09bfd43616 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -34,7 +34,7 @@ groups() -> {config_path, [], generic_tests()}, {global_labels, [], generic_tests()}, {aggregated_metrics, [], [ - aggregated_metrics_test, + aggregated_metrics_test, specific_erlang_metrics_present_test, global_metrics_present_test, global_metrics_single_metric_family_test @@ -57,6 +57,9 @@ groups() -> queue_consumer_count_single_vhost_per_object_test, queue_consumer_count_all_vhosts_per_object_test, queue_coarse_metrics_per_object_test, + queue_delivery_metrics_per_object_test, + exchange_metrics_per_object_test, + queue_exchange_metrics_per_object_test, queue_metrics_per_object_test, queue_consumer_count_and_queue_metrics_mutually_exclusive_test, vhost_status_metric, @@ -367,12 +370,15 @@ aggregated_metrics_test(Config) -> %% Check the first metric value from each ETS table owned by rabbitmq_metrics ?assertEqual(match, re:run(Body, "^rabbitmq_channel_consumers ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_channel_messages_published_total ", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_exchange_messages_published_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_channel_process_reductions_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_channel_get_ack_total ", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_queue_get_ack_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_connections_opened_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_connection_incoming_bytes_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_connection_incoming_packets_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_queue_messages_published_total ", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_queue_exchange_messages_published_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_process_open_fds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_process_max_fds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_io_read_ops_total ", [{capture, none}, multiline])), @@ -403,12 +409,15 @@ per_object_metrics_test(Config, Path) -> %% Check the first metric value from each ETS table owned by rabbitmq_metrics ?assertEqual(match, re:run(Body, "^rabbitmq_channel_consumers{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_channel_messages_published_total{", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_exchange_messages_published_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_channel_process_reductions_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_channel_get_ack_total{", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_queue_get_ack_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_connections_opened_total ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_connection_incoming_bytes_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_connection_incoming_packets_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_queue_messages_published_total{", [{capture, none}, multiline])), + ?assertEqual(match, re:run(Body, "^rabbitmq_queue_exchange_messages_published_total{", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_process_open_fds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_process_max_fds ", [{capture, none}, multiline])), ?assertEqual(match, re:run(Body, "^rabbitmq_io_read_ops_total ", [{capture, none}, multiline])), @@ -523,6 +532,93 @@ queue_coarse_metrics_per_object_test(Config) -> map_get(rabbitmq_detailed_queue_messages, parse_response(Body3))), ok. +queue_delivery_metrics_per_object_test(Config) -> + Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7]}, + + {_, Body1} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=queue_delivery_metrics", + [], 200), + ?assertEqual( + Expected1, + map_get( + rabbitmq_detailed_queue_messages_delivered_ack_total, + parse_response(Body1))), + + {_, Body2} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-2&family=queue_delivery_metrics", + [], 200), + Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11]}, + + ?assertEqual( + Expected2, + map_get( + rabbitmq_detailed_queue_messages_delivered_ack_total, + parse_response(Body2))), + ok. + +exchange_metrics_per_object_test(Config) -> + Expected1 = #{#{exchange => "", vhost => "vhost-1"} => [14]}, + + {_, Body} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=exchange_metrics", + [], 200), + ?assertEqual( + Expected1, + map_get( + rabbitmq_detailed_exchange_messages_published_total, + parse_response(Body))), + ok. + +queue_exchange_metrics_per_object_test(Config) -> + Expected1 = #{ + #{ + queue => "vhost-1-queue-with-messages", + vhost => "vhost-1", + exchange => "" + } => [7], + #{ + exchange => "", + queue => "vhost-1-queue-with-consumer", + vhost => "vhost-1" + } => [7] + }, + + {_, Body1} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-1&family=queue_exchange_metrics", + [], 200), + ?assertEqual( + Expected1, + map_get( + rabbitmq_detailed_queue_exchange_messages_published_total, + parse_response(Body1))), + + + {_, Body2} = http_get_with_pal(Config, + "/metrics/detailed?vhost=vhost-2&family=queue_exchange_metrics", + [], 200), + + + Expected2 = #{ + #{ + queue => "vhost-2-queue-with-messages", + vhost => "vhost-2", + exchange => "" + } => [11], + #{ + exchange => "", + queue => "vhost-2-queue-with-consumer", + vhost => "vhost-2" + } => [11] + }, + + ?assertEqual( + Expected2, + map_get( + rabbitmq_detailed_queue_exchange_messages_published_total, + parse_response(Body2))), + + ok. + queue_metrics_per_object_test(Config) -> Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7], #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [1]},