diff --git a/deps/rabbit/src/rabbit_feature_flags.erl b/deps/rabbit/src/rabbit_feature_flags.erl index c81f7573ed6e..9ccd299080d0 100644 --- a/deps/rabbit/src/rabbit_feature_flags.erl +++ b/deps/rabbit/src/rabbit_feature_flags.erl @@ -126,9 +126,6 @@ get_overriden_running_nodes/0]). -endif. -%% Default timeout for operations on remote nodes. --define(TIMEOUT, 60000). - -type feature_flag_modattr() :: {feature_name(), feature_props()}. %% The value of a `-rabbitmq_feature_flag()' module attribute used to diff --git a/deps/rabbit/src/rabbit_ff_controller.erl b/deps/rabbit/src/rabbit_ff_controller.erl index 1c8ff2a3055e..eaebd4448aea 100644 --- a/deps/rabbit/src/rabbit_ff_controller.erl +++ b/deps/rabbit/src/rabbit_ff_controller.erl @@ -1186,9 +1186,32 @@ this_node_first(Nodes) -> Ret :: term() | {error, term()}. rpc_call(Node, Module, Function, Args, Timeout) -> + SleepBetweenRetries = 5000, + T0 = erlang:monotonic_time(), try erpc:call(Node, Module, Function, Args, Timeout) catch + %% In case of `noconnection' with `Timeout'=infinity, we don't retry + %% at all. This is because the infinity "timeout" is used to run + %% callbacks on remote node and they can last an indefinite amount of + %% time, for instance, if there is a lot of data to migrate. + error:{erpc, noconnection} = Reason + when is_integer(Timeout) andalso Timeout > SleepBetweenRetries -> + ?LOG_ERROR( + "Feature flags: no connection to node `~ts`; " + "retrying in ~b milliseconds", + [Node, SleepBetweenRetries], + #{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}), + timer:sleep(SleepBetweenRetries), + T1 = erlang:monotonic_time(), + TDiff = erlang:convert_time_unit(T1 - T0, native, millisecond), + Remaining = Timeout - TDiff, + Timeout1 = erlang:max(Remaining, 0), + case Timeout1 of + 0 -> {error, Reason}; + _ -> rpc_call(Node, Module, Function, Args, Timeout1) + end; + Class:Reason:Stacktrace -> Message0 = erl_error:format_exception(Class, Reason, Stacktrace), Message1 = lists:flatten(Message0), diff --git a/deps/rabbit/test/feature_flags_v2_SUITE.erl b/deps/rabbit/test/feature_flags_v2_SUITE.erl index 25788c58edf2..a9c019fe9ab0 100644 --- a/deps/rabbit/test/feature_flags_v2_SUITE.erl +++ b/deps/rabbit/test/feature_flags_v2_SUITE.erl @@ -28,6 +28,7 @@ mf_wait_and_count_runs_v2_post_enable/1, mf_crash_on_joining_node/1, + rpc_calls/1, enable_unknown_feature_flag_on_a_single_node/1, enable_supported_feature_flag_on_a_single_node/1, enable_unknown_feature_flag_in_a_3node_cluster/1, @@ -36,6 +37,7 @@ enable_unsupported_feature_flag_in_a_3node_cluster/1, enable_feature_flag_in_cluster_and_add_member_after/1, enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2/1, + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/0, enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2/1, enable_feature_flag_with_post_enable/1, have_required_feature_flag_in_cluster_and_add_member_with_it_disabled/1, @@ -54,6 +56,10 @@ all() -> groups() -> Groups = [ + {direct, [parallel], + [ + rpc_calls + ]}, {cluster_size_1, [parallel], [ enable_unknown_feature_flag_on_a_single_node, @@ -94,6 +100,8 @@ end_per_suite(Config) -> init_per_group(feature_flags_v2, Config) -> rabbit_ct_helpers:set_config(Config, {enable_feature_flags_v2, true}); +init_per_group(direct, Config) -> + Config; init_per_group(cluster_size_1, Config) -> rabbit_ct_helpers:set_config(Config, {nodes_count, 1}); init_per_group(cluster_size_3, Config) -> @@ -104,11 +112,15 @@ init_per_group(_Group, Config) -> end_per_group(_Group, Config) -> Config. +init_per_testcase(rpc_calls, Config) -> + Config; init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:run_steps( Config, [fun(Cfg) -> start_slave_nodes(Cfg, Testcase) end]). +end_per_testcase(rpc_calls, Config) -> + Config; end_per_testcase(_Testcase, Config) -> rabbit_ct_helpers:run_steps( Config, @@ -328,6 +340,16 @@ get_peer_proc() -> %% Testcases. %% ------------------------------------------------------------------- +rpc_calls(_Config) -> + List = [1, 2, 3], + ?assertEqual( + lists:sum(List), + rabbit_ff_controller:rpc_call(node(), lists, sum, [List], 11000)), + ?assertEqual( + {error, {erpc, noconnection}}, + rabbit_ff_controller:rpc_call( + nonode@non_existing_host, lists, sum, [List], 11000)). + enable_unknown_feature_flag_on_a_single_node(Config) -> [Node] = ?config(nodes, Config), ok = run_on_node( @@ -865,6 +887,9 @@ enable_feature_flag_in_cluster_and_add_member_concurrently_mfv2(Config) -> || Node <- AllNodes], ok. +enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2() -> + [{timetrap, {minutes, 3}}]. + enable_feature_flag_in_cluster_and_remove_member_concurrently_mfv2(Config) -> AllNodes = [LeavingNode | [FirstNode | _] = Nodes] = ?config( nodes, Config),