Skip to content

Commit

Permalink
rabbit_db_queue: Transactionally delete transient queues from Khepri
Browse files Browse the repository at this point in the history
The prior code skirted transactions because the filter function might
cause Khepri to call itself. We want to use the same idea as the old
code - get all queues, filter them, then delete them - but we want to
perform the deletion in a transaction and fail the transaction if any
queues changed since we read them.

This fixes a bug - that the call to `delete_in_khepri/2` could return
an error tuple that would be improperly recognized as `Deletions` -
but should also make deleting transient queues atomic and fast.
Each call to `delete_in_khepri/2` needed to wait on Ra to replicate
because the deletion is an individual command sent from one process.
Performing all deletions at once means we only need to wait for one
command to be replicated across the cluster.

We also bubble up any errors to delete now rather than storing them as
deletions. This fixes a crash that occurs on node down when Khepri is
in a minority.

(cherry picked from commit 0dd26f0)
  • Loading branch information
the-mikedavis authored and mergify[bot] committed Aug 13, 2024
1 parent 950f555 commit 0f90906
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 25 deletions.
79 changes: 55 additions & 24 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,8 @@ set_many_in_khepri(Qs) ->
Queue :: amqqueue:amqqueue(),
FilterFun :: fun((Queue) -> boolean()),
QName :: rabbit_amqqueue:name(),
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}.
Ret :: {[QName], [Deletions :: rabbit_binding:deletions()]}
| rabbit_khepri:timeout_error().
%% @doc Deletes all transient queues that match `FilterFun'.
%%
%% @private
Expand Down Expand Up @@ -1073,26 +1074,59 @@ delete_transient_in_khepri(FilterFun) ->
%% process might call itself. Instead we can fetch all of the transient
%% queues with `get_many' and then filter and fold the results outside of
%% Khepri's Ra server process.
case rabbit_khepri:get_many(PathPattern) of
{ok, Qs} ->
Items = maps:fold(
fun(Path, Queue, Acc) when ?is_amqqueue(Queue) ->
case FilterFun(Queue) of
true ->
QueueName = khepri_queue_path_to_name(
Path),
case delete_in_khepri(QueueName, false) of
ok ->
Acc;
Deletions ->
[{QueueName, Deletions} | Acc]
end;
false ->
Acc
end
end, [], Qs),
{QueueNames, Deletions} = lists:unzip(Items),
{QueueNames, lists:flatten(Deletions)};
case rabbit_khepri:adv_get_many(PathPattern) of
{ok, Props} ->
Qs = maps:fold(
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
when ?is_amqqueue(Q) ->
case FilterFun(Q) of
true ->
Path = khepri_path:combine_with_conditions(
Path0,
[#if_payload_version{version = Vsn}]),
QName = amqqueue:get_name(Q),
[{Path, QName} | Acc];
false ->
Acc
end
end, [], Props),
do_delete_transient_queues_in_khepri(Qs, FilterFun);
{error, _} = Error ->
Error
end.

do_delete_transient_queues_in_khepri([], _FilterFun) ->
%% If there are no changes to make, avoid performing a transaction. When
%% Khepri is in a minority this avoids a long timeout waiting for the
%% transaction command to be processed. Otherwise it avoids appending a
%% somewhat large transaction command to Khepri's log.
{[], []};
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Res = rabbit_khepri:transaction(
fun() ->
rabbit_misc:fold_while_ok(
fun({Path, QName}, Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
{ok, [{QName, Deletions} | Acc]};
{ok, _} ->
{ok, Acc};
{error, _} = Error ->
Error
end
end, [], Qs)
end),
case Res of
{ok, Items} ->
{QNames, Deletions} = lists:unzip(Items),
{QNames, lists:flatten(Deletions)};
{error, {khepri, mismatching_node, _}} ->
%% One of the queues changed while attempting to update all
%% queues. Retry the operation.
delete_transient_in_khepri(FilterFun);
{error, _} = Error ->
Error
end.
Expand Down Expand Up @@ -1366,6 +1400,3 @@ khepri_queues_path() ->

khepri_queue_path(#resource{virtual_host = VHost, name = Name}) ->
[?MODULE, queues, VHost, Name].

khepri_queue_path_to_name([?MODULE, queues, VHost, Name]) ->
rabbit_misc:r(VHost, queue, Name).
23 changes: 22 additions & 1 deletion deps/rabbit_common/src/rabbit_misc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
maps_put_falsy/3
]).
-export([remote_sup_child/2]).
-export([for_each_while_ok/2]).
-export([for_each_while_ok/2, fold_while_ok/3]).

%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
Expand Down Expand Up @@ -1655,3 +1655,24 @@ for_each_while_ok(Fun, [Elem | Rest]) ->
end;
for_each_while_ok(_, []) ->
ok.

-spec fold_while_ok(FoldFun, Acc, List) -> Ret when
FoldFun :: fun((Element, Acc) -> {ok, Acc} | {error, ErrReason}),
Element :: any(),
List :: Element,
Ret :: {ok, Acc} | {error, ErrReason}.
%% @doc Calls the given `FoldFun' on each element of the given `List' and the
%% accumulator value, short-circuiting if the function returns `{error,_}'.
%%
%% @returns the first `{error,_}' returned by `FoldFun' or `{ok,Acc}' if
%% `FoldFun' never returns an error tuple.

fold_while_ok(Fun, Acc0, [Elem | Rest]) ->
case Fun(Elem, Acc0) of
{ok, Acc} ->
fold_while_ok(Fun, Acc, Rest);
{error, _} = Error ->
Error
end;
fold_while_ok(_Fun, Acc, []) ->
{ok, Acc}.

0 comments on commit 0f90906

Please # to comment.