From 5dd9bb21e9af761f5e9385e2cab079562c2a0f10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 8 Nov 2024 09:26:24 +0100 Subject: [PATCH] Use infinity timout for RA local query in stream coordinator The 5-second default timeout is too short. (cherry picked from commit 1634adbff37bbf17087f40b9b4707c49edfe66ae) --- deps/rabbit/src/rabbit_stream_coordinator.erl | 12 ++++++++---- deps/rabbit/src/rabbit_stream_sac_coordinator.erl | 10 ++++------ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 6eac47fc781e..1a994e01d819 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -55,7 +55,8 @@ -export([query_local_pid/3, query_writer_pid/2, query_members/2, - query_stream_overview/2]). + query_stream_overview/2, + ra_local_query/1]). -export([log_overview/1, @@ -271,7 +272,7 @@ sac_state(#?MODULE{single_active_consumer = SacState}) -> %% for debugging state() -> - case ra:local_query({?MODULE, node()}, fun(State) -> State end) of + case ra_local_query(fun(State) -> State end) of {ok, {_, Res}, _} -> Res; Any -> @@ -289,7 +290,7 @@ local_pid(StreamId) when is_list(StreamId) -> query_pid(StreamId, MFA). query_pid(StreamId, MFA) when is_list(StreamId) -> - case ra:local_query({?MODULE, node()}, MFA) of + case ra_local_query(MFA) of {ok, {_, {ok, Pid}}, _} -> case erpc:call(node(Pid), erlang, is_process_alive, [Pid]) of true -> @@ -380,7 +381,7 @@ query_writer_pid(StreamId, #?MODULE{streams = Streams}) -> end. do_query(MFA) -> - case ra:local_query({?MODULE, node()}, MFA) of + case ra_local_query(MFA) of {ok, {_, {ok, _} = Result}, _} -> Result; {ok, {_, {error, not_found}}, _} -> @@ -2337,3 +2338,6 @@ key_metrics_rpc(ServerId) -> maps_to_list(M) -> lists:sort(maps:to_list(M)). + +ra_local_query(QueryFun) -> + ra:local_query({?MODULE, node()}, QueryFun, infinity). diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index 9e46085ed9d1..cb0510498566 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -41,6 +41,8 @@ group_consumers/5, overview/1]). +-import(rabbit_stream_coordinator, [ra_local_query/1]). + %% Single Active Consumer API -spec register_consumer(binary(), binary(), @@ -129,9 +131,7 @@ process_command(Cmd) -> {ok, [term()] | {error, atom()}}. consumer_groups(VirtualHost, InfoKeys) -> - case ra:local_query({rabbit_stream_coordinator, - node()}, - fun(State) -> + case ra_local_query(fun(State) -> SacState = rabbit_stream_coordinator:sac_state(State), consumer_groups(VirtualHost, @@ -152,9 +152,7 @@ consumer_groups(VirtualHost, InfoKeys) -> {ok, [term()]} | {error, atom()}. group_consumers(VirtualHost, Stream, Reference, InfoKeys) -> - case ra:local_query({rabbit_stream_coordinator, - node()}, - fun(State) -> + case ra_local_query(fun(State) -> SacState = rabbit_stream_coordinator:sac_state(State), group_consumers(VirtualHost,