Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Handle transient queue deletion in Khepri minority (backport #11979) (backport #11990) #11991

Merged
merged 4 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
-export([queue/1, queue_names/1]).

-export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
-export([delete_transient_queues_on_node/1]).

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
Expand Down Expand Up @@ -2055,13 +2056,39 @@ maybe_clear_recoverable_node(Node) ->
-spec on_node_down(node()) -> 'ok'.

on_node_down(Node) ->
case delete_transient_queues_on_node(Node) of
ok ->
ok;
{error, timeout} ->
%% This case is possible when running Khepri. The node going down
%% could leave the cluster in a minority so the command to delete
%% the transient queue records would fail. Also see
%% `rabbit_khepri:init/0': we also try this deletion when the node
%% restarts - a time that the cluster is very likely to have a
%% majority - to ensure these records are deleted.
rabbit_log:warning("transient queues for node '~ts' could not be "
"deleted because of a timeout. These queues "
"will be removed when node '~ts' restarts or "
"is removed from the cluster.", [Node, Node]),
ok
end.

-spec delete_transient_queues_on_node(Node) -> Ret when
Node :: node(),
Ret :: ok | rabbit_khepri:timeout_error().

delete_transient_queues_on_node(Node) ->
{Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
case Ret of
ok -> ok;
{QueueNames, Deletions} ->
ok ->
ok;
{error, timeout} = Err ->
Err;
{QueueNames, Deletions} when is_list(QueueNames) ->
case length(QueueNames) of
0 -> ok;
N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs",
N -> rabbit_log:info("~b transient queues from node '~ts' "
"deleted in ~fs",
[N, Node, Time / 1_000_000])
end,
notify_queue_binding_deletions(Deletions),
Expand Down
13 changes: 4 additions & 9 deletions deps/rabbit/src/rabbit_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,10 @@ init_using_mnesia() ->
rabbit_sup:start_child(mnesia_sync).

init_using_khepri() ->
case rabbit_khepri:members() of
[] ->
timer:sleep(1000),
init_using_khepri();
Members ->
?LOG_WARNING(
"Found the following metadata store members: ~p", [Members],
#{domain => ?RMQLOG_DOMAIN_DB})
end.
?LOG_DEBUG(
"DB: initialize Khepri",
#{domain => ?RMQLOG_DOMAIN_DB}),
rabbit_khepri:init().

init_finished() ->
%% Used during initialisation by rabbit_logger_exchange_h.erl
Expand Down
79 changes: 55 additions & 24 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,8 @@ set_many_in_khepri(Qs) ->
Queue :: amqqueue:amqqueue(),
FilterFun :: fun((Queue) -> boolean()),
QName :: rabbit_amqqueue:name(),
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}.
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}
| rabbit_khepri:timeout_error().
%% @doc Deletes all transient queues that match `FilterFun'.
%%
%% @private
Expand Down Expand Up @@ -1088,26 +1089,59 @@ delete_transient_in_khepri(FilterFun) ->
%% process might call itself. Instead we can fetch all of the transient
%% queues with `get_many' and then filter and fold the results outside of
%% Khepri's Ra server process.
case rabbit_khepri:get_many(PathPattern) of
{ok, Qs} ->
Items = maps:fold(
fun(Path, Queue, Acc) when ?is_amqqueue(Queue) ->
case FilterFun(Queue) of
true ->
QueueName = khepri_queue_path_to_name(
Path),
case delete_in_khepri(QueueName, false) of
ok ->
Acc;
Deletions ->
[{QueueName, Deletions} | Acc]
end;
false ->
Acc
end
end, [], Qs),
{QueueNames, Deletions} = lists:unzip(Items),
{QueueNames, lists:flatten(Deletions)};
case rabbit_khepri:adv_get_many(PathPattern) of
{ok, Props} ->
Qs = maps:fold(
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
when ?is_amqqueue(Q) ->
case FilterFun(Q) of
true ->
Path = khepri_path:combine_with_conditions(
Path0,
[#if_payload_version{version = Vsn}]),
QName = amqqueue:get_name(Q),
[{Path, QName} | Acc];
false ->
Acc
end
end, [], Props),
do_delete_transient_queues_in_khepri(Qs, FilterFun);
{error, _} = Error ->
Error
end.

do_delete_transient_queues_in_khepri([], _FilterFun) ->
%% If there are no changes to make, avoid performing a transaction. When
%% Khepri is in a minority this avoids a long timeout waiting for the
%% transaction command to be processed. Otherwise it avoids appending a
%% somewhat large transaction command to Khepri's log.
{[], []};
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Res = rabbit_khepri:transaction(
fun() ->
rabbit_misc:fold_while_ok(
fun({Path, QName}, Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
{ok, [{QName, Deletions} | Acc]};
{ok, _} ->
{ok, Acc};
{error, _} = Error ->
Error
end
end, [], Qs)
end),
case Res of
{ok, Items} ->
{QNames, Deletions} = lists:unzip(Items),
{QNames, lists:flatten(Deletions)};
{error, {khepri, mismatching_node, _}} ->
%% One of the queues changed while attempting to update all
%% queues. Retry the operation.
delete_transient_in_khepri(FilterFun);
{error, _} = Error ->
Error
end.
Expand Down Expand Up @@ -1382,6 +1416,3 @@ khepri_queues_path() ->

khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
[?MODULE, queues, VHost, Name].

khepri_queue_path_to_name([?MODULE, queues, VHost, Name]) ->
rabbit_misc:r(VHost, queue, Name).
25 changes: 25 additions & 0 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@

-export([setup/0,
setup/1,
init/0,
can_join_cluster/1,
add_member/2,
remove_member/1,
Expand Down Expand Up @@ -323,6 +324,30 @@ wait_for_register_projections(Timeout, Retries) ->

%% @private

-spec init() -> Ret when
Ret :: ok | timeout_error().

init() ->
case members() of
[] ->
timer:sleep(1000),
init();
Members ->
?LOG_NOTICE(
"Found the following metadata store members: ~p", [Members],
#{domain => ?RMQLOG_DOMAIN_DB}),
%% Delete transient queues on init.
%% Note that we also do this in the
%% `rabbit_amqqueue:on_node_down/1' callback. We must try this
%% deletion during init because the cluster may have been in a
%% minority when this node went down. We wait for a majority while
%% booting (via `rabbit_khepri:setup/0') though so this deletion is
%% likely to succeed.
rabbit_amqqueue:delete_transient_queues_on_node(node())
end.

%% @private

can_join_cluster(DiscoveryNode) when is_atom(DiscoveryNode) ->
ThisNode = node(),
try
Expand Down
44 changes: 44 additions & 0 deletions deps/rabbit_common/src/rabbit_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
maps_put_falsy/3
]).
-export([remote_sup_child/2]).
-export([for_each_while_ok/2, fold_while_ok/3]).

%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
Expand Down Expand Up @@ -1621,3 +1622,46 @@ remote_sup_child(Node, Sup) ->
[] -> {error, no_child};
{badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup}
end.

-spec for_each_while_ok(ForEachFun, List) -> Ret when
ForEachFun :: fun((Element) -> ok | {error, ErrReason}),
ErrReason :: any(),
Element :: any(),
List :: [Element],
Ret :: ok | {error, ErrReason}.
%% @doc Calls the given `ForEachFun' for each element in the given `List',
%% short-circuiting if the function returns `{error,_}'.
%%
%% @returns the first `{error,_}' returned by `ForEachFun' or `ok' if
%% `ForEachFun' never returns an error tuple.

for_each_while_ok(Fun, [Elem | Rest]) ->
case Fun(Elem) of
ok ->
for_each_while_ok(Fun, Rest);
{error, _} = Error ->
Error
end;
for_each_while_ok(_, []) ->
ok.

-spec fold_while_ok(FoldFun, Acc, List) -> Ret when
FoldFun :: fun((Element, Acc) -> {ok, Acc} | {error, ErrReason}),
Element :: any(),
List :: Element,
Ret :: {ok, Acc} | {error, ErrReason}.
%% @doc Calls the given `FoldFun' on each element of the given `List' and the
%% accumulator value, short-circuiting if the function returns `{error,_}'.
%%
%% @returns the first `{error,_}' returned by `FoldFun' or `{ok,Acc}' if
%% `FoldFun' never returns an error tuple.

fold_while_ok(Fun, Acc0, [Elem | Rest]) ->
case Fun(Elem, Acc0) of
{ok, Acc} ->
fold_while_ok(Fun, Acc, Rest);
{error, _} = Error ->
Error
end;
fold_while_ok(_Fun, Acc, []) ->
{ok, Acc}.
Loading