Skip to content

Commit

Permalink
Emit cancellation event only when stream consumer is cancelled
Browse files Browse the repository at this point in the history
Not when the channel or the connection is closed.

References #13085, #9356
  • Loading branch information
acogoluegnes committed Jan 17, 2025
1 parent a8d848f commit 69d0382
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 18 deletions.
6 changes: 1 addition & 5 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -976,11 +976,7 @@ close(#stream_client{readers = Readers,
name = QName}) ->
maps:foreach(fun (CTag, #stream{log = Log}) ->
close_log(Log),
rabbit_core_metrics:consumer_deleted(self(), CTag, QName),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, CTag},
{channel, self()},
{queue, QName}])
rabbit_core_metrics:consumer_deleted(self(), CTag, QName)
end, Readers).

update(Q, State)
Expand Down
14 changes: 9 additions & 5 deletions deps/rabbitmq_stream/src/rabbit_stream_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-export([init/0]).
-export([consumer_created/9,
consumer_updated/9,
consumer_cancelled/3]).
consumer_cancelled/4]).
-export([publisher_created/4,
publisher_updated/7,
publisher_deleted/3]).
Expand Down Expand Up @@ -104,16 +104,20 @@ consumer_updated(Connection,

ok.

consumer_cancelled(Connection, StreamResource, SubscriptionId) ->
consumer_cancelled(Connection, StreamResource, SubscriptionId, Notify) ->
ets:delete(?TABLE_CONSUMER,
{StreamResource, Connection, SubscriptionId}),
rabbit_global_counters:consumer_deleted(stream),
rabbit_core_metrics:consumer_deleted(Connection,
consumer_tag(SubscriptionId),
StreamResource),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, consumer_tag(SubscriptionId)},
{channel, self()}, {queue, StreamResource}]),
case Notify of
true ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, consumer_tag(SubscriptionId)},
{channel, self()}, {queue, StreamResource}]);
_ -> ok
end,
ok.

publisher_created(Connection,
Expand Down
21 changes: 13 additions & 8 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
%% Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
%% Copyright (c) 2020-2025 Broadcom. All Rights Reserved.
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

Expand Down Expand Up @@ -2249,7 +2249,7 @@ handle_frame_post_auth(Transport,
{Connection, State};
true ->
{Connection1, State1} =
remove_subscription(SubscriptionId, Connection, State),
remove_subscription(SubscriptionId, Connection, State, true),
response_ok(Transport, Connection, unsubscribe, CorrelationId),
{Connection1, State1}
end;
Expand Down Expand Up @@ -3081,7 +3081,7 @@ evaluate_state_after_secret_update(Transport,
_ ->
{C1, S1} =
lists:foldl(fun(SubId, {Conn, St}) ->
remove_subscription(SubId, Conn, St)
remove_subscription(SubId, Conn, St, false)
end, {C0, S0}, Subs),
{Acc#{Str => ok}, C1, S1}
end
Expand Down Expand Up @@ -3216,7 +3216,8 @@ notify_connection_closed(#statem_data{connection =
ConnectionState}) ->
rabbit_core_metrics:connection_closed(self()),
[rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(S, Connection), SubId)
stream_r(S, Connection),
SubId, false)
|| #consumer{configuration =
#consumer_configuration{stream = S,
subscription_id = SubId}}
Expand Down Expand Up @@ -3304,7 +3305,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream,
C0),
SubId),
SubId,
false),
maybe_unregister_consumer(
VirtualHost, Consumer,
single_active_consumer(Consumer),
Expand All @@ -3314,7 +3316,8 @@ clean_state_after_stream_deletion_or_failure(MemberPid, Stream,
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream,
C0),
SubId),
SubId,
false),
maybe_unregister_consumer(
VirtualHost, Consumer,
single_active_consumer(Consumer),
Expand Down Expand Up @@ -3431,7 +3434,8 @@ remove_subscription(SubscriptionId,
stream_subscriptions =
StreamSubscriptions} =
Connection,
#stream_connection_state{consumers = Consumers} = State) ->
#stream_connection_state{consumers = Consumers} = State,
Notify) ->
#{SubscriptionId := Consumer} = Consumers,
#consumer{log = Log,
configuration = #consumer_configuration{stream = Stream, member_pid = MemberPid}} =
Expand All @@ -3457,7 +3461,8 @@ remove_subscription(SubscriptionId,
Connection2 = maybe_clean_connection_from_stream(MemberPid, Stream, Connection1),
rabbit_stream_metrics:consumer_cancelled(self(),
stream_r(Stream, Connection2),
SubscriptionId),
SubscriptionId,
Notify),

Requests1 = maybe_unregister_consumer(
VirtualHost, Consumer,
Expand Down

0 comments on commit 69d0382

Please # to comment.