Skip to content

Commit

Permalink
Implement foreach_transient()
Browse files Browse the repository at this point in the history
  • Loading branch information
dumbbell committed Sep 29, 2023
1 parent 05f3bb0 commit 4bba307
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
13 changes: 2 additions & 11 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,8 @@
set_ram_duration_target/2, set_maximum_since_use/2,
emit_consumers_local/3, internal_delete/3]).

-include_lib("stdlib/include/assert.hrl").
-include_lib("stdlib/include/qlc.hrl").

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
-include("amqqueue.hrl").

-define(INTEGER_ARG_TYPES, [byte, short, signedint, long,
Expand Down Expand Up @@ -1935,17 +1933,10 @@ has_synchronised_mirrors_online(Q) ->
-spec on_node_up(node()) -> 'ok'.

on_node_up(Node) ->
case rabbit_mirror_queue_misc:are_cmqs_permitted() of
true ->
rabbit_db_queue:foreach_transient(
maybe_clear_recoverable_node(Node));
false ->
ok
end.
rabbit_db_queue:foreach_transient(maybe_clear_recoverable_node(Node)).

maybe_clear_recoverable_node(Node) ->
fun(Q) ->
?assertNot(rabbit_khepri:is_enabled()),
SPids = amqqueue:get_sync_slave_pids(Q),
RSs = amqqueue:get_recoverable_slaves(Q),
case lists:member(Node, RSs) of
Expand Down
22 changes: 20 additions & 2 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,26 @@ foreach_transient_in_mnesia(UpdateFun) ->
ok
end).

foreach_transient_in_khepri(_UpdateFun) ->
throw(not_implemented).
foreach_transient_in_khepri(UpdateFun) ->
PathPattern = khepri_queues_path() ++
[?KHEPRI_WILDCARD_STAR,
#if_data_matches{
pattern = amqqueue:pattern_match_on_durable(false)}],
%% The `UpdateFun' might try to determine if the queue's process is alive.
%% This can cause a `calling_self' exception if we use the `FilterFun'
%% within the function passed to `khepri:fold/5' since the Khepri server
%% process might call itself. Instead we can fetch all of the transient
%% queues with `get_many' and then filter and fold the results outside of
%% Khepri's Ra server process.
case rabbit_khepri:get_many(PathPattern) of
{ok, Qs} ->
maps:foreach(
fun(_Path, Queue) when ?is_amqqueue(Queue) ->
UpdateFun(Queue)
end, [], Qs);
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% foreach_durable().
Expand Down

0 comments on commit 4bba307

Please # to comment.