From c706b967de7c20c3596f75b0eba962a99cd550fb Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 29 May 2023 12:07:21 +0100 Subject: [PATCH 1/9] WIP: super stream exchange type --- deps/rabbitmq_stream/Makefile | 3 +- .../src/rabbit_exchange_type_super_stream.erl | 86 +++++++++++++++++++ 2 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl diff --git a/deps/rabbitmq_stream/Makefile b/deps/rabbitmq_stream/Makefile index e63a7ab94733..7d0312b62d85 100644 --- a/deps/rabbitmq_stream/Makefile +++ b/deps/rabbitmq_stream/Makefile @@ -22,9 +22,10 @@ endef LOCAL_DEPS = ssl -DEPS = rabbit rabbitmq_stream_common osiris ranch +DEPS = rabbit rabbitmq_stream_common osiris ranch murmerl3 TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client +dep_murmerl3 = git https://github.com/rabbitmq/murmerl3 master DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk diff --git a/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl new file mode 100644 index 000000000000..bb21c7c6a075 --- /dev/null +++ b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl @@ -0,0 +1,86 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2023 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_exchange_type_super_stream). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, + serialise_events/0, + route/2, + info/1, + info/2]). +-export([validate/1, + validate_binding/2, + create/2, + delete/2, + policy_changed/2, + add_binding/3, + remove_bindings/3, + assert_args_equivalence/2]). + +-rabbit_boot_step( + {rabbit_exchange_type_super_stream_registry, + [{description, "exchange type x-super-stream: registry"}, + {mfa, {rabbit_registry, register, + [exchange, <<"x-super-stream">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [exchange, <<"x-super-stream">>]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +-define(MNESIA_TABLE, rabbit_route). +-define(SEED, 104729). + +description() -> + [{description, <<"Super stream exchange type using murmur3 hashing">>}]. + +serialise_events() -> false. + +route(#exchange{name = Name}, + #delivery{message = #basic_message{routing_keys = [RKey | _]}}) -> + %% get all bindings for the exchange and use murmur3 to generate + %% the binding key to match on + MatchHead = #route{binding = #binding{source = Name, _ = '_'}}, + Routes = ets:select(?MNESIA_TABLE, [{MatchHead, [], [['$_']]}]), + N = integer_to_binary(hash_mod(RKey, length(Routes))), + case lists:search( + fun(#route{binding = #binding{key = Key}}) -> + Key =:= N + end, Routes) of + {value, #route{binding = #binding{destination = Dest}}} -> + [Dest]; + false -> + [] + end. + +info(_) -> []. +info(_, _) -> []. +validate(_X) -> ok. + +validate_binding(_X, #binding{key = K}) -> + try + %% just check the Key is an integer + _ = binary_to_integer(K), + ok + catch error:badarg -> + {error, + {binding_invalid, "The binding key must be an integer: ~tp", [K]}} + end. + +create(_Serial, _X) -> ok. +delete(_Serial, _X) -> ok. +policy_changed(_X1, _X2) -> ok. +add_binding(_Serial, _X, _B) -> ok. +remove_bindings(_Serial, _X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). + +hash_mod([RKey | _], N) -> + murmerl3:hash_32(RKey, ?SEED) rem N. From d8c6fd5beb14f2b963d409dc7b1a002041ff7c4c Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 6 Jun 2023 17:28:36 +0100 Subject: [PATCH 2/9] wip --- ...CLI.Ctl.Commands.AddSuperStreamCommand.erl | 79 ++++---- .../src/rabbit_exchange_type_super_stream.erl | 47 ++--- .../src/rabbit_stream_manager.erl | 179 +++++++++++++----- .../src/rabbit_stream_utils.erl | 14 ++ .../test/rabbit_stream_manager_SUITE.erl | 104 ++++++---- 5 files changed, 274 insertions(+), 149 deletions(-) diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl index 92a80458f64c..20c6b6a08118 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -41,6 +41,7 @@ description() -> switches() -> [{partitions, integer}, {routing_keys, string}, + {exchange_type, string}, {max_length_bytes, string}, {max_age, string}, {stream_max_segment_size_bytes, string}, @@ -55,6 +56,9 @@ validate([], _Opts) -> validate([_Name], #{partitions := _, routing_keys := _}) -> {validation_failure, "Specify --partitions or routing-keys, not both."}; +validate([_Name], #{exchange_type := <<"x-super-stream">>, routing_keys := _}) -> + {validation_failure, + "Exchange type x-super-stream cannot be used with routing-keys."}; validate([_Name], #{partitions := Partitions}) when Partitions < 1 -> {validation_failure, "The partition number must be greater than 0"}; validate([_Name], Opts) -> @@ -125,6 +129,17 @@ validate_stream_arguments(#{initial_cluster_size := Value} = Opts) -> "Invalid value for --initial-cluster-size, the " "value must be a positive integer."} end; +validate_stream_arguments(#{exchange_type := Type} = Opts) -> + case Type of + <<"direct">> -> + validate_stream_arguments(maps:remove(exchange_type, Opts)); + <<"x-super-stream">> -> + validate_stream_arguments(maps:remove(exchange_type, Opts)); + _ -> + {validation_failure, + "Invalid value for --exchange_type, must be one of:" + "'direct' or 'x-super-stream'"} + end; validate_stream_arguments(_) -> ok. @@ -162,48 +177,30 @@ usage_doc_guides() -> run([SuperStream], #{node := NodeName, - vhost := VHost, timeout := Timeout, partitions := Partitions} = Opts) -> - Streams = - [list_to_binary(binary_to_list(SuperStream) - ++ "-" - ++ integer_to_list(K)) - || K <- lists:seq(0, Partitions - 1)], - RoutingKeys = - [integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)], - create_super_stream(NodeName, - Timeout, - VHost, - SuperStream, - Streams, - stream_arguments(Opts), - RoutingKeys); + Spec0 = maps:with([vhost, + exchange_type], Opts), + Spec = Spec0#{username => cli_acting_user(), + name => SuperStream, + partitions_source => {partition_count, Partitions}, + arguments => stream_arguments(Opts)}, + create_super_stream(NodeName, Timeout, Spec); run([SuperStream], #{node := NodeName, - vhost := VHost, timeout := Timeout, routing_keys := RoutingKeysStr} = Opts) -> + Spec0 = maps:with([vhost, + exchange_type], Opts), RoutingKeys = - [rabbit_data_coercion:to_binary( - string:strip(K)) - || K - <- string:tokens( - rabbit_data_coercion:to_list(RoutingKeysStr), ",")], - Streams = - [list_to_binary(binary_to_list(SuperStream) - ++ "-" - ++ binary_to_list(K)) - || K <- RoutingKeys], - create_super_stream(NodeName, - Timeout, - VHost, - SuperStream, - Streams, - stream_arguments(Opts), - RoutingKeys). + [K || K <- string:lexemes(RoutingKeysStr, ", ")], + Spec = Spec0#{username => cli_acting_user(), + name => SuperStream, + partitions_source => {routing_keys, RoutingKeys}, + arguments => stream_arguments(Opts)}, + create_super_stream(NodeName, Timeout, Spec). stream_arguments(Opts) -> stream_arguments(#{}, Opts). @@ -250,26 +247,16 @@ duration_to_seconds([{sign, _}, create_super_stream(NodeName, Timeout, - VHost, - SuperStream, - Streams, - Arguments, - RoutingKeys) -> + Spec) -> case rabbit_misc:rpc_call(NodeName, rabbit_stream_manager, create_super_stream, - [VHost, - SuperStream, - Streams, - Arguments, - RoutingKeys, - cli_acting_user()], - Timeout) + [Spec], Timeout) of ok -> {ok, rabbit_misc:format("Super stream ~ts has been created", - [SuperStream])}; + [maps:get(name, Spec)])}; Error -> Error end. diff --git a/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl index bb21c7c6a075..97f45aa81b5e 100644 --- a/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl @@ -25,17 +25,15 @@ remove_bindings/3, assert_args_equivalence/2]). --rabbit_boot_step( - {rabbit_exchange_type_super_stream_registry, - [{description, "exchange type x-super-stream: registry"}, - {mfa, {rabbit_registry, register, - [exchange, <<"x-super-stream">>, ?MODULE]}}, - {cleanup, {rabbit_registry, unregister, - [exchange, <<"x-super-stream">>]}}, - {requires, rabbit_registry}, - {enables, kernel_ready}]}). +-rabbit_boot_step({rabbit_exchange_type_super_stream_registry, + [{description, "exchange type x-super-stream: registry"}, + {mfa, {rabbit_registry, register, + [exchange, <<"x-super-stream">>, ?MODULE]}}, + {cleanup, {rabbit_registry, unregister, + [exchange, <<"x-super-stream">>]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). --define(MNESIA_TABLE, rabbit_route). -define(SEED, 104729). description() -> @@ -47,17 +45,22 @@ route(#exchange{name = Name}, #delivery{message = #basic_message{routing_keys = [RKey | _]}}) -> %% get all bindings for the exchange and use murmur3 to generate %% the binding key to match on - MatchHead = #route{binding = #binding{source = Name, _ = '_'}}, - Routes = ets:select(?MNESIA_TABLE, [{MatchHead, [], [['$_']]}]), - N = integer_to_binary(hash_mod(RKey, length(Routes))), - case lists:search( - fun(#route{binding = #binding{key = Key}}) -> - Key =:= N - end, Routes) of - {value, #route{binding = #binding{destination = Dest}}} -> - [Dest]; - false -> - [] + % MatchHead = #route{binding = #binding{source = Name, _ = '_'}}, + % Routes = ets:select(?MNESIA_TABLE, [{MatchHead, [], ['$_']}]), + case rabbit_binding:list_for_source(Name) of + [] -> + []; + Bindings -> + N = integer_to_binary(hash_mod(RKey, length(Bindings))), + % rabbit_log:debug("searching for ~p in ~p", [N, Routes]), + case lists:search(fun(#binding{key = Key}) -> + Key =:= N + end, Bindings) of + {value, #binding{destination = Dest}} -> + [Dest]; + false -> + [] + end end. info(_) -> []. @@ -82,5 +85,5 @@ remove_bindings(_Serial, _X, _Bs) -> ok. assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). -hash_mod([RKey | _], N) -> +hash_mod(RKey, N) -> murmerl3:hash_32(RKey, ?SEED) rem N. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index f496dfce98e1..44a67db39787 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -29,6 +29,10 @@ -export([start_link/1, create/4, delete/3, + create_super_stream/1, + %% obsolete use create_super_stream/1 + % create_super_stream/3, + %% obsolete use create_super_stream/1 create_super_stream/6, delete_super_stream/3, lookup_leader/2, @@ -41,6 +45,16 @@ -record(state, {configuration}). +-type super_stream_spec() :: +#{name := binary(), + vhost := binary(), + username := binary(), + partitions_source := {partition_count, pos_integer()} | + {routing_keys, [binary()]}, + arguments => map(), + exchange_type => binary() %<<"direct">> | <<"x-super-stream">>, + }. + start_link(Conf) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []). @@ -74,10 +88,63 @@ create_super_stream(VirtualHost, Arguments, RoutingKeys, Username) -> + Options = #{partitions => Partitions, + args => Arguments, + routing_keys => RoutingKeys, + username => Username}, + create_super_stream(VirtualHost, Name, Options). + + + +-spec create_super_stream(binary(), + binary(), + map()) -> ok | {error, term()}. +create_super_stream(VirtualHost, + Name, + #{username := Username} = Options) -> + Type = maps:get(exchange_type, Options, <<"direct">>), + Partitions = maps:get(partitions, Options, []), + Arguments = maps:get(args, Options, #{}), + RoutingKeys = maps:get(routing_keys, Options, []), gen_server:call(?MODULE, {create_super_stream, VirtualHost, Name, + Type, + Partitions, + Arguments, + RoutingKeys, + Username}). + + +-spec create_super_stream(super_stream_spec()) -> + ok | {error, term()}. +create_super_stream(#{name := Name, + vhost := VHost, + username := Username, + partitions_source := PartitionSource} = Spec) -> + Type = maps:get(exchange_type, Spec, <<"direct">>), + Arguments = maps:get(arguments, Spec, #{}), + {Partitions, RoutingKeys} = + case PartitionSource of + {partitions, Count} -> + Streams = [rabbit_stream_utils:partition_name(Name, K) + || K <- lists:seq(0, Count - 1)], + Keys = [integer_to_binary(K) || + K <- lists:seq(0, Count - 1)], + {Streams, Keys}; + {routing_keys, Keys} -> + Streams = + [rabbit_stream_utils:partition_name(Name, K) + || K <- Keys], + {Streams, Keys} + end, + + gen_server:call(?MODULE, + {create_super_stream, + VHost, + Name, + Type, Partitions, Arguments, RoutingKeys, @@ -212,6 +279,7 @@ handle_call({delete, VirtualHost, Reference, Username}, _From, handle_call({create_super_stream, VirtualHost, Name, + Type, Partitions, Arguments, RoutingKeys, @@ -221,7 +289,8 @@ handle_call({create_super_stream, {error, Reason} -> {reply, {error, Reason}, State}; ok -> - case declare_super_stream_exchange(VirtualHost, Name, Username) of + case declare_super_stream_exchange(VirtualHost, + Name, Type, Username) of ok -> RollbackOperations = [fun() -> @@ -260,6 +329,7 @@ handle_call({create_super_stream, BindingsResult = add_super_stream_bindings(VirtualHost, Name, + Type, Partitions, RoutingKeys, Username), @@ -447,46 +517,8 @@ handle_call({partition_index, VirtualHost, SuperStream, Stream}, "super stream ~tp (virtual host ~tp)", [Stream, SuperStream, VirtualHost]), Res = try - _ = rabbit_exchange:lookup_or_die(ExchangeName), - UnorderedBindings = - _ = [Binding - || Binding = #binding{destination = #resource{name = Q} = D} - <- rabbit_binding:list_for_source(ExchangeName), - is_resource_stream_queue(D), Q == Stream], - OrderedBindings = - rabbit_stream_utils:sort_partitions(UnorderedBindings), - rabbit_log:debug("Bindings: ~tp", [OrderedBindings]), - case OrderedBindings of - [] -> - {error, stream_not_found}; - Bindings -> - Binding = lists:nth(1, Bindings), - #binding{args = Args} = Binding, - case rabbit_misc:table_lookup(Args, - <<"x-stream-partition-order">>) - of - {_, Order} -> - Index = rabbit_data_coercion:to_integer(Order), - {ok, Index}; - _ -> - Pattern = <<"-">>, - Size = byte_size(Pattern), - case string:find(Stream, Pattern, trailing) of - nomatch -> - {ok, -1}; - <> -> - try - Index = binary_to_integer(Rest), - {ok, Index} - catch - error:_ -> - {ok, -1} - end; - _ -> - {ok, -1} - end - end - end + partition_index(rabbit_exchange:lookup_or_die(ExchangeName), + Stream) catch exit:Error -> rabbit_log:error("Error while looking up exchange ~tp, ~tp", @@ -626,6 +658,7 @@ super_stream_partitions(VirtualHost, SuperStream) -> is_resource_stream_queue(D)], OrderedBindings = rabbit_stream_utils:sort_partitions(UnorderedBindings), + rabbit_log:debug("OrderedBindings ~p", [OrderedBindings]), {ok, lists:foldl(fun (#binding{destination = #resource{kind = queue, name = Q}}, @@ -700,7 +733,7 @@ check_already_existing_queue0(VirtualHost, [Q | T], _Error) -> rabbit_misc:format("~ts is not a correct name for a queue", [Q])}} end. -declare_super_stream_exchange(VirtualHost, Name, Username) -> +declare_super_stream_exchange(VirtualHost, Name, Type, Username) -> case rabbit_stream_utils:enforce_correct_name(Name) of {ok, CorrectName} -> Args = @@ -708,7 +741,8 @@ declare_super_stream_exchange(VirtualHost, Name, Username) -> <<"x-super-stream">>, bool, true), - CheckedType = rabbit_exchange:check_type(<<"direct">>), + CheckedType = rabbit_exchange:check_type(Type), + rabbit_log:debug("CheckedType ~p", [CheckedType]), ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> @@ -744,6 +778,7 @@ declare_super_stream_exchange(VirtualHost, Name, Username) -> add_super_stream_bindings(VirtualHost, Name, + Type, Partitions, RoutingKeys, Username) -> @@ -752,6 +787,7 @@ add_super_stream_bindings(VirtualHost, lists:foldl(fun ({Partition, RoutingKey}, {ok, Order}) -> case add_super_stream_binding(VirtualHost, Name, + Type, Partition, RoutingKey, Order, @@ -775,6 +811,7 @@ add_super_stream_bindings(VirtualHost, add_super_stream_binding(VirtualHost, SuperStream, + ExchangeType, Partition, RoutingKey, Order, @@ -786,11 +823,15 @@ add_super_stream_binding(VirtualHost, ExchangeName = rabbit_misc:r(VirtualHost, exchange, ExchangeNameBin), QueueName = rabbit_misc:r(VirtualHost, queue, QueueNameBin), Pid = self(), - Arguments = - rabbit_misc:set_table_value([], - <<"x-stream-partition-order">>, - long, - Order), + Arguments = case ExchangeType of + <<"direct">> -> + rabbit_misc:set_table_value([], + <<"x-stream-partition-order">>, + long, + Order); + _ -> + [] + end, case rabbit_binding:add(#binding{source = ExchangeName, destination = QueueName, key = RoutingKey, @@ -908,3 +949,45 @@ is_resource_stream_queue(#resource{kind = queue} = Resource) -> end; is_resource_stream_queue(_) -> false. + +partition_index(#exchange{name = ExchangeName, + type = ExchangeType}, Stream) -> + UnorderedBindings = + [Binding + || Binding = #binding{destination = #resource{name = Q} = D} + <- rabbit_binding:list_for_source(ExchangeName), + is_resource_stream_queue(D), Q == Stream], + case UnorderedBindings of + [] -> + {error, stream_not_found}; + _ when ExchangeType =:= direct -> + Bindings = rabbit_stream_utils:sort_partitions(UnorderedBindings), + Binding = lists:nth(1, Bindings), + #binding{args = Args} = Binding, + case rabbit_misc:table_lookup(Args, + <<"x-stream-partition-order">>) + of + {_, Order} -> + Index = rabbit_data_coercion:to_integer(Order), + {ok, Index}; + _ -> + Pattern = <<"-">>, + Size = byte_size(Pattern), + case string:find(Stream, Pattern, trailing) of + nomatch -> + {ok, -1}; + <> -> + try + Index = binary_to_integer(Rest), + {ok, Index} + catch + error:_ -> + {ok, -1} + end; + _ -> + {ok, -1} + end + end; + [#binding{key = Key}] when ExchangeType =:= 'x-super-stream' -> + {ok, binary_to_integer(Key)} + end. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 5ba9b17e465f..2dc214fe919c 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -28,6 +28,7 @@ extract_stream_list/2, sort_partitions/1, strip_cr_lf/1, + partition_name/2, consumer_activity_status/2, command_versions/0]). @@ -241,9 +242,22 @@ sort_partitions(Partitions) -> end, Partitions). +table_lookup(Key, Args, Default) -> + case rabbit_misc:table_lookup(Args, Key) of + undefined -> + Default; + {_T, V} -> + V + end. + strip_cr_lf(NameBin) -> binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]). +partition_name(SuperStream, Suffix0) + when is_binary(SuperStream) -> + Suffix = rabbit_data_coercion:to_binary(Suffix0), + <>. + consumer_activity_status(Active, Properties) -> case {rabbit_stream_reader:single_active_consumer(Properties), Active} of diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index 7ccc2deb4685..190ec279e634 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -8,9 +8,10 @@ -module(rabbit_stream_manager_SUITE). -include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). +% -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-compile(nowarn_export_all). -compile(export_all). all() -> @@ -18,10 +19,13 @@ all() -> groups() -> [{non_parallel_tests, [], - [manage_super_stream, + [manage_super_stream_exchange_type_direct, + manage_super_stream_exchange_type_x_super_stream, + route_direct_super_stream, lookup_leader, lookup_member, - partition_index]}]. + partition_index, + partition_index_x_super_stream]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -98,32 +102,30 @@ lookup_member(Config) -> ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). -manage_super_stream(Config) -> +manage_super_stream_exchange_type_direct(Config) -> + manage_super_stream(Config, <<"direct">>). + +manage_super_stream_exchange_type_x_super_stream(Config) -> + manage_super_stream(Config, <<"x-super-stream">>). + +manage_super_stream(Config, Type) -> % create super stream ?assertEqual(ok, create_super_stream(Config, - <<"invoices">>, - [<<"invoices-0">>, <<"invoices-1">>, - <<"invoices-2">>], - [<<"0">>, <<"1">>, <<"2">>])), + #{name => <<"invoices">>, + exchange_type => Type, + partitions_source => {partitions, 3}})), % get the correct partitions ?assertEqual({ok, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, partitions(Config, <<"invoices">>)), - [?assertEqual({ok, [Partition]}, - route(Config, RoutingKey, <<"invoices">>)) - || {Partition, RoutingKey} - <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>}, - {<<"invoices-2">>, <<"2">>}]], - % get an error if trying to re-create it ?assertMatch({error, _}, create_super_stream(Config, - <<"invoices">>, - [<<"invoices-0">>, <<"invoices-1">>, - <<"invoices-2">>], - [<<"0">>, <<"1">>, <<"2">>])), + #{name => <<"invoices">>, + exchange_type => Type, + partitions_source => {partitions, 3}})), % can delete it ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), @@ -134,22 +136,40 @@ manage_super_stream(Config) -> % cannot create the super stream because a partition already exists ?assertMatch({error, _}, create_super_stream(Config, - <<"invoices">>, - [<<"invoices-0">>, <<"invoices-1">>, - <<"invoices-2">>], - [<<"0">>, <<"1">>, <<"2">>])), + #{name => <<"invoices">>, + exchange_type => Type, + partitions_source => {partitions, 3}})), ?assertMatch({ok, _}, delete_stream(Config, <<"invoices-1">>)), ok. +route_direct_super_stream(Config) -> + % create super stream + ?assertEqual(ok, + create_super_stream(Config, + #{name => <<"invoices">>, + exchange_type => <<"direct">>, + partitions_source => {partitions, 3}})), + % get the correct partitions + ?assertEqual({ok, + [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, + partitions(Config, <<"invoices">>)), + + [?assertEqual({ok, [Partition]}, + route(Config, RoutingKey, <<"invoices">>)) + || {Partition, RoutingKey} + <- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>}, + {<<"invoices-2">>, <<"2">>}]], + ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), + ok. + partition_index(Config) -> % create super stream ?assertEqual(ok, create_super_stream(Config, - <<"invoices">>, - [<<"invoices-0">>, <<"invoices-1">>, - <<"invoices-2">>], - [<<"0">>, <<"1">>, <<"2">>])), + #{name => <<"invoices">>, + exchange_type => <<"direct">>, + partitions_source => {partitions, 3}})), [?assertEqual({ok, Index}, partition_index(Config, <<"invoices">>, Stream)) || {Index, Stream} @@ -189,17 +209,35 @@ partition_index(Config) -> amqp_connection:close(C), ok. -create_super_stream(Config, Name, Partitions, RKs) -> +partition_index_x_super_stream(Config) -> + % create super stream + ?assertEqual(ok, + create_super_stream(Config, + #{name => <<"invoices">>, + exchange_type => <<"x-super-stream">>, + partitions_source => {partitions, 3}})), + [?assertEqual({ok, Index}, + partition_index(Config, <<"invoices">>, Stream)) + || {Index, Stream} + <- [{0, <<"invoices-0">>}, {1, <<"invoices-1">>}, + {2, <<"invoices-2">>}]], + + ?assertEqual({error, stream_not_found}, + partition_index(Config, <<"invoices">>, + <<"bananas-gorilla">>)), + + ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), + + ok. + +create_super_stream(Config, Spec0) -> + Spec = Spec0#{vhost => <<"/">>, + username => <<"guest">>}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_manager, create_super_stream, - [<<"/">>, - Name, - Partitions, - #{}, - RKs, - <<"guest">>]). + [Spec]). delete_super_stream(Config, Name) -> rabbit_ct_broker_helpers:rpc(Config, From a9eed09fd12c8716da9a7babae246e7d4575d8de Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 7 Jun 2023 11:08:16 +0100 Subject: [PATCH 3/9] tests --- .../src/rabbit_exchange_type_super_stream.erl | 3 -- .../src/rabbit_stream_manager.erl | 14 +++++---- .../src/rabbit_stream_utils.erl | 21 ++++++------- deps/rabbitmq_stream/test/commands_SUITE.erl | 31 +++++++++++++++++++ .../test/rabbit_stream_manager_SUITE.erl | 31 +++++++++++++++---- 5 files changed, 73 insertions(+), 27 deletions(-) diff --git a/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl index 97f45aa81b5e..6865ec64e44d 100644 --- a/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl @@ -45,14 +45,11 @@ route(#exchange{name = Name}, #delivery{message = #basic_message{routing_keys = [RKey | _]}}) -> %% get all bindings for the exchange and use murmur3 to generate %% the binding key to match on - % MatchHead = #route{binding = #binding{source = Name, _ = '_'}}, - % Routes = ets:select(?MNESIA_TABLE, [{MatchHead, [], ['$_']}]), case rabbit_binding:list_for_source(Name) of [] -> []; Bindings -> N = integer_to_binary(hash_mod(RKey, length(Bindings))), - % rabbit_log:debug("searching for ~p in ~p", [N, Routes]), case lists:search(fun(#binding{key = Key}) -> Key =:= N end, Bindings) of diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 44a67db39787..60f0b278f67b 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -119,6 +119,9 @@ create_super_stream(VirtualHost, -spec create_super_stream(super_stream_spec()) -> ok | {error, term()}. +create_super_stream(#{exchange_type := <<"x-super-stream">>, + partitions_source := {routing_keys, _}}) -> + {error, unsupported_specification}; create_super_stream(#{name := Name, vhost := VHost, username := Username, @@ -127,7 +130,7 @@ create_super_stream(#{name := Name, Arguments = maps:get(arguments, Spec, #{}), {Partitions, RoutingKeys} = case PartitionSource of - {partitions, Count} -> + {partition_count, Count} -> Streams = [rabbit_stream_utils:partition_name(Name, K) || K <- lists:seq(0, Count - 1)], Keys = [integer_to_binary(K) || @@ -650,15 +653,14 @@ delete_stream(VirtualHost, Reference, Username) -> super_stream_partitions(VirtualHost, SuperStream) -> ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), try - _ = rabbit_exchange:lookup_or_die(ExchangeName), + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), UnorderedBindings = [Binding || Binding = #binding{destination = D} <- rabbit_binding:list_for_source(ExchangeName), is_resource_stream_queue(D)], OrderedBindings = - rabbit_stream_utils:sort_partitions(UnorderedBindings), - rabbit_log:debug("OrderedBindings ~p", [OrderedBindings]), + rabbit_stream_utils:sort_partitions(Exchange, UnorderedBindings), {ok, lists:foldl(fun (#binding{destination = #resource{kind = queue, name = Q}}, @@ -951,7 +953,7 @@ is_resource_stream_queue(_) -> false. partition_index(#exchange{name = ExchangeName, - type = ExchangeType}, Stream) -> + type = ExchangeType} = Exchange, Stream) -> UnorderedBindings = [Binding || Binding = #binding{destination = #resource{name = Q} = D} @@ -961,7 +963,7 @@ partition_index(#exchange{name = ExchangeName, [] -> {error, stream_not_found}; _ when ExchangeType =:= direct -> - Bindings = rabbit_stream_utils:sort_partitions(UnorderedBindings), + Bindings = rabbit_stream_utils:sort_partitions(Exchange, UnorderedBindings), Binding = lists:nth(1, Bindings), #binding{args = Args} = Binding, case rabbit_misc:table_lookup(Args, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 2dc214fe919c..2606618eb8a1 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -26,7 +26,7 @@ check_write_permitted/3, check_read_permitted/3, extract_stream_list/2, - sort_partitions/1, + sort_partitions/2, strip_cr_lf/1, partition_name/2, consumer_activity_status/2, @@ -222,8 +222,8 @@ extract_stream_list(<>, Streams) -> extract_stream_list(Rest, [Stream | Streams]). --spec sort_partitions([#binding{}]) -> [#binding{}]. -sort_partitions(Partitions) -> +-spec sort_partitions(#exchange{}, [#binding{}]) -> [#binding{}]. +sort_partitions(#exchange{type = direct}, Partitions) -> lists:sort(fun(#binding{args = Args1}, #binding{args = Args2}) -> Arg1 = rabbit_misc:table_lookup(Args1, @@ -240,15 +240,12 @@ sort_partitions(Partitions) -> _ -> true end end, - Partitions). - -table_lookup(Key, Args, Default) -> - case rabbit_misc:table_lookup(Args, Key) of - undefined -> - Default; - {_T, V} -> - V - end. + Partitions); +sort_partitions(#exchange{type = 'x-super-stream'}, Partitions) -> + lists:sort(fun(#binding{key = Key1}, #binding{key = Key2}) -> + binary_to_integer(Key1) + =< binary_to_integer(Key2) + end, Partitions). strip_cr_lf(NameBin) -> binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]). diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index b718568e31e6..e1975dfbf037 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -55,6 +55,7 @@ groups() -> list_group_consumers_run]}, {super_streams, [], [add_super_stream_merge_defaults, + add_delete_super_stream_x_super_stram_run, add_super_stream_validate, delete_super_stream_merge_defaults, delete_super_stream_validate, @@ -617,6 +618,7 @@ add_delete_super_stream_run(Config) -> Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Opts = #{node => Node, + exchange_type => <<"direct">>, timeout => 10000, vhost => <<"/">>}, @@ -682,6 +684,35 @@ add_delete_super_stream_run(Config) -> ok. +add_delete_super_stream_x_super_stram_run(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = + #{node => Node, + exchange_type => <<"x-super-stream">>, + timeout => 10000, + vhost => <<"/">>}, + + % with number of partitions + ?assertMatch({ok, _}, + ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], + maps:merge(#{partitions => 3}, + Opts))), + ?assertEqual({ok, + [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, + partitions(Config, <<"invoices">>)), + ?assertMatch({ok, _}, + ?COMMAND_DELETE_SUPER_STREAM:run([<<"invoices">>], Opts)), + ?assertEqual({error, stream_not_found}, + partitions(Config, <<"invoices">>)), + + % with routing keys + ?assertMatch({error, _}, + ?COMMAND_ADD_SUPER_STREAM:run([<<"invoices">>], + maps:merge(#{routing_keys => + <<" amer,emea , apac">>}, + Opts))), + ok. + partitions(Config, Name) -> rabbit_ct_broker_helpers:rpc(Config, 0, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index 190ec279e634..da0ce3654f99 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -21,6 +21,7 @@ groups() -> [{non_parallel_tests, [], [manage_super_stream_exchange_type_direct, manage_super_stream_exchange_type_x_super_stream, + create_super_stream_with_routing_keys, route_direct_super_stream, lookup_leader, lookup_member, @@ -114,7 +115,7 @@ manage_super_stream(Config, Type) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => Type, - partitions_source => {partitions, 3}})), + partitions_source => {partition_count, 3}})), % get the correct partitions ?assertEqual({ok, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, @@ -125,7 +126,7 @@ manage_super_stream(Config, Type) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => Type, - partitions_source => {partitions, 3}})), + partitions_source => {partition_count, 3}})), % can delete it ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), @@ -138,18 +139,36 @@ manage_super_stream(Config, Type) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => Type, - partitions_source => {partitions, 3}})), + partitions_source => {partition_count, 3}})), ?assertMatch({ok, _}, delete_stream(Config, <<"invoices-1">>)), ok. +create_super_stream_with_routing_keys(Config) -> + RKs = [<<"1">>, <<"2">>, <<"3">>], + % create super stream + ?assertEqual(ok, + create_super_stream(Config, + #{name => <<"invoices">>, + partitions_source => {routing_keys, RKs}})), + + ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), + + % should fail when exchange_type is x-super-stream + ?assertMatch({error, _}, + create_super_stream(Config, + #{name => <<"invoices">>, + exchange_type => <<"x-super-stream">>, + partitions_source => {routing_keys, RKs}})), + ok. + route_direct_super_stream(Config) -> % create super stream ?assertEqual(ok, create_super_stream(Config, #{name => <<"invoices">>, exchange_type => <<"direct">>, - partitions_source => {partitions, 3}})), + partitions_source => {partition_count, 3}})), % get the correct partitions ?assertEqual({ok, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, @@ -169,7 +188,7 @@ partition_index(Config) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => <<"direct">>, - partitions_source => {partitions, 3}})), + partitions_source => {partition_count, 3}})), [?assertEqual({ok, Index}, partition_index(Config, <<"invoices">>, Stream)) || {Index, Stream} @@ -215,7 +234,7 @@ partition_index_x_super_stream(Config) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => <<"x-super-stream">>, - partitions_source => {partitions, 3}})), + partitions_source => {partition_count, 3}})), [?assertEqual({ok, Index}, partition_index(Config, <<"invoices">>, Stream)) || {Index, Stream} From b595d7ff0d637085141a8d8d89af693eddb31c30 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 7 Jun 2023 12:48:52 +0100 Subject: [PATCH 4/9] "formatting" --- ...CLI.Ctl.Commands.AddSuperStreamCommand.erl | 41 +- .../src/rabbit_exchange_type_super_stream.erl | 34 +- deps/rabbitmq_stream/src/rabbit_stream.erl | 9 +- .../src/rabbit_stream_manager.erl | 93 ++--- .../src/rabbit_stream_reader.erl | 371 ++++++++++-------- .../src/rabbit_stream_utils.erl | 18 +- deps/rabbitmq_stream/test/commands_SUITE.erl | 11 +- .../test/rabbit_stream_SUITE.erl | 118 +++--- .../test/rabbit_stream_manager_SUITE.erl | 27 +- 9 files changed, 408 insertions(+), 314 deletions(-) diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl index 20c6b6a08118..4b8f68049018 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -56,7 +56,8 @@ validate([], _Opts) -> validate([_Name], #{partitions := _, routing_keys := _}) -> {validation_failure, "Specify --partitions or routing-keys, not both."}; -validate([_Name], #{exchange_type := <<"x-super-stream">>, routing_keys := _}) -> +validate([_Name], + #{exchange_type := <<"x-super-stream">>, routing_keys := _}) -> {validation_failure, "Exchange type x-super-stream cannot be used with routing-keys."}; validate([_Name], #{partitions := Partitions}) when Partitions < 1 -> @@ -153,8 +154,7 @@ usage() -> "s ] [--routing-keys ]">>. usage_additional() -> - [[<<"">>, - <<"The name of the super stream.">>], + [[<<"">>, <<"The name of the super stream.">>], [<<"--vhost ">>, <<"The virtual host the super stream is added to.">>], [<<"--partitions ">>, @@ -180,26 +180,25 @@ run([SuperStream], timeout := Timeout, partitions := Partitions} = Opts) -> - Spec0 = maps:with([vhost, - exchange_type], Opts), - Spec = Spec0#{username => cli_acting_user(), - name => SuperStream, - partitions_source => {partition_count, Partitions}, - arguments => stream_arguments(Opts)}, + Spec0 = maps:with([vhost, exchange_type], Opts), + Spec = + Spec0#{username => cli_acting_user(), + name => SuperStream, + partitions_source => {partition_count, Partitions}, + arguments => stream_arguments(Opts)}, create_super_stream(NodeName, Timeout, Spec); run([SuperStream], #{node := NodeName, timeout := Timeout, routing_keys := RoutingKeysStr} = Opts) -> - Spec0 = maps:with([vhost, - exchange_type], Opts), - RoutingKeys = - [K || K <- string:lexemes(RoutingKeysStr, ", ")], - Spec = Spec0#{username => cli_acting_user(), - name => SuperStream, - partitions_source => {routing_keys, RoutingKeys}, - arguments => stream_arguments(Opts)}, + Spec0 = maps:with([vhost, exchange_type], Opts), + RoutingKeys = [K || K <- string:lexemes(RoutingKeysStr, ", ")], + Spec = + Spec0#{username => cli_acting_user(), + name => SuperStream, + partitions_source => {routing_keys, RoutingKeys}, + arguments => stream_arguments(Opts)}, create_super_stream(NodeName, Timeout, Spec). stream_arguments(Opts) -> @@ -207,6 +206,7 @@ stream_arguments(Opts) -> %% Something strange, dialyzer infers that map_size/1 returns positive_integer() -dialyzer({no_match, stream_arguments/2}). + stream_arguments(Acc, Arguments) when map_size(Arguments) =:= 0 -> Acc; stream_arguments(Acc, #{max_length_bytes := Value} = Arguments) -> @@ -245,13 +245,12 @@ duration_to_seconds([{sign, _}, {seconds, S}]) -> Y * 365 * 86400 + M * 30 * 86400 + D * 86400 + H * 3600 + Mn * 60 + S. -create_super_stream(NodeName, - Timeout, - Spec) -> +create_super_stream(NodeName, Timeout, Spec) -> case rabbit_misc:rpc_call(NodeName, rabbit_stream_manager, create_super_stream, - [Spec], Timeout) + [Spec], + Timeout) of ok -> {ok, diff --git a/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl index 6865ec64e44d..a498369d7286 100644 --- a/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_exchange_type_super_stream.erl @@ -60,25 +60,41 @@ route(#exchange{name = Name}, end end. -info(_) -> []. -info(_, _) -> []. -validate(_X) -> ok. +info(_) -> + []. + +info(_, _) -> + []. + +validate(_X) -> + ok. validate_binding(_X, #binding{key = K}) -> try %% just check the Key is an integer _ = binary_to_integer(K), ok - catch error:badarg -> + catch + error:badarg -> {error, {binding_invalid, "The binding key must be an integer: ~tp", [K]}} end. -create(_Serial, _X) -> ok. -delete(_Serial, _X) -> ok. -policy_changed(_X1, _X2) -> ok. -add_binding(_Serial, _X, _B) -> ok. -remove_bindings(_Serial, _X, _Bs) -> ok. +create(_Serial, _X) -> + ok. + +delete(_Serial, _X) -> + ok. + +policy_changed(_X1, _X2) -> + ok. + +add_binding(_Serial, _X, _B) -> + ok. + +remove_bindings(_Serial, _X, _Bs) -> + ok. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index 40c612bc8d83..5d48f8990e1a 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -40,8 +40,7 @@ start(_Type, _Args) -> rabbit_stream_metrics:init(), - rabbit_global_counters:init([{protocol, stream}], - ?PROTOCOL_COUNTERS), + rabbit_global_counters:init([{protocol, stream}], ?PROTOCOL_COUNTERS), rabbit_global_counters:init([{protocol, stream}, {queue_type, ?STREAM_QUEUE_TYPE}]), rabbit_stream_sup:start_link(). @@ -130,8 +129,10 @@ kill_connection(ConnectionName) -> {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} -> exit(ConnectionPid, kill); - {ConnectionPid, _ClientProperties} -> ok - after 1000 -> ok + {ConnectionPid, _ClientProperties} -> + ok + after 1000 -> + ok end end, pg_local:get_members(rabbit_stream_connections)). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 60f0b278f67b..24c89902aced 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -30,9 +30,6 @@ create/4, delete/3, create_super_stream/1, - %% obsolete use create_super_stream/1 - % create_super_stream/3, - %% obsolete use create_super_stream/1 create_super_stream/6, delete_super_stream/3, lookup_leader/2, @@ -43,17 +40,22 @@ partitions/2, partition_index/3]). + %% obsolete use create_super_stream/1 + % create_super_stream/3, + %% obsolete use create_super_stream/1 + -record(state, {configuration}). -type super_stream_spec() :: -#{name := binary(), - vhost := binary(), - username := binary(), - partitions_source := {partition_count, pos_integer()} | - {routing_keys, [binary()]}, - arguments => map(), - exchange_type => binary() %<<"direct">> | <<"x-super-stream">>, - }. + #{name := binary(), + vhost := binary(), + username := binary(), + partitions_source := + {partition_count, pos_integer()} | {routing_keys, [binary()]}, + arguments => map(), + exchange_type => binary()}. + + %<<"direct">> | <<"x-super-stream">>, start_link(Conf) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []). @@ -88,19 +90,16 @@ create_super_stream(VirtualHost, Arguments, RoutingKeys, Username) -> - Options = #{partitions => Partitions, - args => Arguments, - routing_keys => RoutingKeys, - username => Username}, + Options = + #{partitions => Partitions, + args => Arguments, + routing_keys => RoutingKeys, + username => Username}, create_super_stream(VirtualHost, Name, Options). - - --spec create_super_stream(binary(), - binary(), - map()) -> ok | {error, term()}. -create_super_stream(VirtualHost, - Name, +-spec create_super_stream(binary(), binary(), map()) -> + ok | {error, term()}. +create_super_stream(VirtualHost, Name, #{username := Username} = Options) -> Type = maps:get(exchange_type, Options, <<"direct">>), Partitions = maps:get(partitions, Options, []), @@ -116,30 +115,29 @@ create_super_stream(VirtualHost, RoutingKeys, Username}). - -spec create_super_stream(super_stream_spec()) -> - ok | {error, term()}. + ok | {error, term()}. create_super_stream(#{exchange_type := <<"x-super-stream">>, partitions_source := {routing_keys, _}}) -> {error, unsupported_specification}; create_super_stream(#{name := Name, vhost := VHost, username := Username, - partitions_source := PartitionSource} = Spec) -> + partitions_source := PartitionSource} = + Spec) -> Type = maps:get(exchange_type, Spec, <<"direct">>), Arguments = maps:get(arguments, Spec, #{}), {Partitions, RoutingKeys} = case PartitionSource of {partition_count, Count} -> - Streams = [rabbit_stream_utils:partition_name(Name, K) - || K <- lists:seq(0, Count - 1)], - Keys = [integer_to_binary(K) || - K <- lists:seq(0, Count - 1)], + Streams = + [rabbit_stream_utils:partition_name(Name, K) + || K <- lists:seq(0, Count - 1)], + Keys = [integer_to_binary(K) || K <- lists:seq(0, Count - 1)], {Streams, Keys}; {routing_keys, Keys} -> Streams = - [rabbit_stream_utils:partition_name(Name, K) - || K <- Keys], + [rabbit_stream_utils:partition_name(Name, K) || K <- Keys], {Streams, Keys} end, @@ -825,15 +823,16 @@ add_super_stream_binding(VirtualHost, ExchangeName = rabbit_misc:r(VirtualHost, exchange, ExchangeNameBin), QueueName = rabbit_misc:r(VirtualHost, queue, QueueNameBin), Pid = self(), - Arguments = case ExchangeType of - <<"direct">> -> - rabbit_misc:set_table_value([], - <<"x-stream-partition-order">>, - long, - Order); - _ -> - [] - end, + Arguments = + case ExchangeType of + <<"direct">> -> + rabbit_misc:set_table_value([], + <<"x-stream-partition-order">>, + long, + Order); + _ -> + [] + end, case rabbit_binding:add(#binding{source = ExchangeName, destination = QueueName, key = RoutingKey, @@ -952,22 +951,24 @@ is_resource_stream_queue(#resource{kind = queue} = Resource) -> is_resource_stream_queue(_) -> false. -partition_index(#exchange{name = ExchangeName, - type = ExchangeType} = Exchange, Stream) -> +partition_index(#exchange{name = ExchangeName, type = ExchangeType} = + Exchange, + Stream) -> UnorderedBindings = [Binding || Binding = #binding{destination = #resource{name = Q} = D} - <- rabbit_binding:list_for_source(ExchangeName), + <- rabbit_binding:list_for_source(ExchangeName), is_resource_stream_queue(D), Q == Stream], case UnorderedBindings of [] -> {error, stream_not_found}; _ when ExchangeType =:= direct -> - Bindings = rabbit_stream_utils:sort_partitions(Exchange, UnorderedBindings), + Bindings = + rabbit_stream_utils:sort_partitions(Exchange, + UnorderedBindings), Binding = lists:nth(1, Bindings), #binding{args = Args} = Binding, - case rabbit_misc:table_lookup(Args, - <<"x-stream-partition-order">>) + case rabbit_misc:table_lookup(Args, <<"x-stream-partition-order">>) of {_, Order} -> Index = rabbit_data_coercion:to_integer(Order), diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index eb5e35c7b89a..6af6f8ae1578 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -47,9 +47,7 @@ send_limit :: non_neg_integer(), log :: undefined | osiris_log:state(), last_listener_offset = undefined :: undefined | osiris:offset()}). --record(request, - {start :: integer(), - content :: term()}). +-record(request, {start :: integer(), content :: term()}). -record(stream_connection_state, {data :: rabbit_stream_core:state(), blocked :: boolean(), consumers :: #{subscription_id() => #consumer{}}}). @@ -228,8 +226,8 @@ init([KeepaliveSup, socket_op(Sock, fun(S) -> rabbit_net:socket_ends(S, inbound) end), DeliverVersion = ?VERSION_1, - RequestTimeout = application:get_env(rabbitmq_stream, - request_timeout, 60_000), + RequestTimeout = + application:get_env(rabbitmq_stream, request_timeout, 60_000), Connection = #stream_connection{name = rabbit_data_coercion:to_binary(ConnStr), @@ -558,9 +556,9 @@ invalid_transition(Transport, Socket, From, To) -> close_immediately(Transport, Socket), stop. --spec resource_alarm(pid(), - rabbit_alarm:resource_alarm_source(), - rabbit_alarm:resource_alert()) -> ok. +-spec resource_alarm(pid(), rabbit_alarm:resource_alarm_source(), + rabbit_alarm:resource_alert()) -> + ok. resource_alarm(ConnectionPid, disk, {_, Conserve, _}) -> ConnectionPid ! {resource_alarm, Conserve}, ok; @@ -662,12 +660,14 @@ augment_infos_with_user_provided_connection_name(Infos, end. close(Transport, - #stream_connection{socket = S, virtual_host = VirtualHost, + #stream_connection{socket = S, + virtual_host = VirtualHost, outstanding_requests = Requests}, #stream_connection_state{consumers = Consumers}) -> [begin %% we discard the result (updated requests) because they are no longer used - _ = maybe_unregister_consumer(VirtualHost, Consumer, + _ = maybe_unregister_consumer(VirtualHost, + Consumer, single_active_consumer(Properties), Requests), case Log of @@ -795,32 +795,32 @@ open(info, {OK, S, Data}, connection_state = State2}} end; open(info, - {sac, {{subscription_id, SubId}, - {active, Active}, {extra, Extra}}}, + {sac, {{subscription_id, SubId}, {active, Active}, {extra, Extra}}}, State) -> - Msg0 = #{subscription_id => SubId, - active => Active}, - Msg1 = case Extra of - [{stepping_down, true}] -> - Msg0#{stepping_down => true}; - _ -> - Msg0 - end, + Msg0 = #{subscription_id => SubId, active => Active}, + Msg1 = + case Extra of + [{stepping_down, true}] -> + Msg0#{stepping_down => true}; + _ -> + Msg0 + end, open(info, {sac, Msg1}, State); -open(info, - {sac, #{subscription_id := SubId, - active := Active} = Msg}, +open(info, {sac, #{subscription_id := SubId, active := Active} = Msg}, #statem_data{transport = Transport, - connection = #stream_connection{virtual_host = VirtualHost} = Connection0, + connection = + #stream_connection{virtual_host = VirtualHost} = + Connection0, connection_state = ConnState0} = State) -> #stream_connection_state{consumers = Consumers0} = ConnState0, - Stream = case Msg of - #{stream := S} -> - S; - _ -> - stream_from_consumers(SubId, Consumers0) - end, + Stream = + case Msg of + #{stream := S} -> + S; + _ -> + stream_from_consumers(SubId, Consumers0) + end, rabbit_log:debug("Subscription ~tp on ~tp instructed to become active: " "~tp", @@ -878,12 +878,15 @@ open(info, [SubId, Stream]), rabbit_log:debug("Active ~tp, message ~tp", [Active, Msg]), case {Active, Msg} of - {false, #{stepping_down := true, - stream := St, - consumer_name := ConsumerName}} -> - rabbit_log:debug("Former active consumer gone, activating consumer " ++ - "on stream ~tp, group ~tp", [St, ConsumerName]), - _ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, + {false, + #{stepping_down := true, + stream := St, + consumer_name := ConsumerName}} -> + rabbit_log:debug("Former active consumer gone, activating consumer " + ++ "on stream ~tp, group ~tp", + [St, ConsumerName]), + _ = + rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, St, ConsumerName); _ -> @@ -971,24 +974,29 @@ open(info, emit_stats, Connection1 = emit_stats(Connection, State), {keep_state, StatemData#statem_data{connection = Connection1}}; open(info, check_outstanding_requests, - #statem_data{connection = #stream_connection{outstanding_requests = Requests, - request_timeout = Timeout} = Connection0} = + #statem_data{connection = + #stream_connection{outstanding_requests = Requests, + request_timeout = Timeout} = + Connection0} = StatemData) -> Time = erlang:monotonic_time(millisecond), - rabbit_log:debug("Checking outstanding requests at ~tp: ~tp", [Time, Requests]), - HasTimedOut = maps:fold(fun(_, #request{}, true) -> - true; - (K, #request{content = Ctnt, start = Start}, false) -> - case (Time - Start) > Timeout of - true -> - rabbit_log:debug("Request ~tp with content ~tp has timed out", - [K, Ctnt]), - - true; - false -> - false - end - end, false, Requests), + rabbit_log:debug("Checking outstanding requests at ~tp: ~tp", + [Time, Requests]), + HasTimedOut = + maps:fold(fun (_, #request{}, true) -> + true; + (K, #request{content = Ctnt, start = Start}, false) -> + case Time - Start > Timeout of + true -> + rabbit_log:debug("Request ~tp with content ~tp has timed out", + [K, Ctnt]), + + true; + false -> + false + end + end, + false, Requests), case HasTimedOut of true -> rabbit_log_connection:info("Forcing stream connection ~tp closing: request to client timed out", @@ -996,9 +1004,10 @@ open(info, check_outstanding_requests, _ = demonitor_all_streams(Connection0), {stop, {request_timeout, <<"Request timeout">>}}; false -> - Connection1 = ensure_outstanding_requests_timer( - Connection0#stream_connection{outstanding_requests_timer = undefined} - ), + Connection1 = + ensure_outstanding_requests_timer(Connection0#stream_connection{outstanding_requests_timer + = + undefined}), {keep_state, StatemData#statem_data{connection = Connection1}} end; open(info, {shutdown, Explanation} = Reason, @@ -1051,7 +1060,8 @@ open(cast, Ids -> Acc#{PublisherId => [PublishingId | Ids]} end; - false -> Acc + false -> + Acc end end, #{}, CorrelationList), @@ -1175,7 +1185,8 @@ open(cast, case {Credit, Log} of {_, undefined} -> Consumer; %% SAC not active - {0, _} -> Consumer; + {0, _} -> + Consumer; {_, _} -> case send_chunks(DeliverVersion, Transport, @@ -1192,7 +1203,8 @@ open(cast, [Reason]), %% likely a connection problem Consumer; - {ok, Csmr} -> Csmr + {ok, Csmr} -> + Csmr end end, ConsumersAcc#{SubscriptionId => Consumer1} @@ -1412,8 +1424,7 @@ handle_frame_pre_auth(Transport, {sasl_authenticate, ?RESPONSE_AUTHENTICATION_FAILURE, <<>>}}; {protocol_error, Msg, Args} -> - rabbit_core_metrics:auth_attempt_failed(Host, - <<>>, + rabbit_core_metrics:auth_attempt_failed(Host, <<>>, stream), notify_auth_result(none, user_authentication_failure, @@ -1955,8 +1966,7 @@ handle_frame_post_auth(Transport, Properties]), Sac = single_active_consumer(Properties), ConsumerName = consumer_name(Properties), - case {Sac, ConsumerName} - of + case {Sac, ConsumerName} of {true, undefined} -> rabbit_log:warning("Cannot create subcription ~tp, a single active " "consumer must have a name", @@ -2095,7 +2105,8 @@ handle_frame_post_auth(Transport, 1), {Connection, State#stream_connection_state{consumers = - Consumers#{SubscriptionId => Consumer1}}}; + Consumers#{SubscriptionId => + Consumer1}}}; #{SubscriptionId := Consumer} -> #consumer{credit = AvailableCredit, last_listener_offset = LLO} = Consumer, @@ -2426,7 +2437,8 @@ handle_frame_post_auth(Transport, NodesAcc) end, Acc1, ReplicaNodes); - {error, _} -> Acc + {error, _} -> + Acc end end, #{}, Streams), @@ -2445,13 +2457,16 @@ handle_frame_post_auth(Transport, lists:foldr(fun(Node, Acc) -> PortFunction = case TransportLayer of - tcp -> port; - ssl -> tls_port + tcp -> + port; + ssl -> + tls_port end, Host = rpc:call(Node, rabbit_stream, host, []), Port = rpc:call(Node, rabbit_stream, PortFunction, []), case {is_binary(Host), is_integer(Port)} of - {true, true} -> Acc#{Node => {Host, Port}}; + {true, true} -> + Acc#{Node => {Host, Port}}; _ -> rabbit_log:warning("Error when retrieving broker metadata: ~tp ~tp", [Host, Port]), @@ -2463,21 +2478,25 @@ handle_frame_post_auth(Transport, Metadata = lists:foldl(fun(Stream, Acc) -> case maps:get(Stream, Topology) of - {error, Err} -> Acc#{Stream => Err}; + {error, Err} -> + Acc#{Stream => Err}; {ok, #{leader_node := LeaderNode, replica_nodes := Replicas}} -> LeaderInfo = case NodeEndpoints of - #{LeaderNode := Info} -> Info; - _ -> undefined + #{LeaderNode := Info} -> + Info; + _ -> + undefined end, ReplicaInfos = lists:foldr(fun(Replica, A) -> case NodeEndpoints of #{Replica := I} -> [I | A]; - _ -> A + _ -> + A end end, [], Replicas), @@ -2561,7 +2580,8 @@ handle_frame_post_auth(Transport, [RC]) end, case maps:take(CorrelationId, Requests0) of - {#request{content = #{subscription_id := SubscriptionId} = Msg}, Rs} -> + {#request{content = #{subscription_id := SubscriptionId} = Msg}, + Rs} -> Stream = stream_from_consumers(SubscriptionId, Consumers), rabbit_log:debug("Received consumer update response for subscription " "~tp on stream ~tp, correlation ID ~tp", @@ -2610,7 +2630,10 @@ handle_frame_post_auth(Transport, Consumer1 = Consumer#consumer{log = Segment}, #consumer{credit = Crdt, send_limit = SndLmt, - configuration = #consumer_configuration{counters = ConsumerCounters}} = Consumer1, + configuration = + #consumer_configuration{counters = + ConsumerCounters}} = + Consumer1, rabbit_log:debug("Dispatching to subscription ~tp (stream ~tp), " "credit(s) ~tp, send limit ~tp", @@ -2619,7 +2642,8 @@ handle_frame_post_auth(Transport, Crdt, SndLmt]), - ConsumedMessagesBefore = messages_consumed(ConsumerCounters), + ConsumedMessagesBefore = + messages_consumed(ConsumerCounters), Consumer2 = case send_chunks(DeliverVersion, @@ -2643,13 +2667,15 @@ handle_frame_post_auth(Transport, #consumer{log = Log2} = Consumer2, ConsumerOffset = osiris_log:next_offset(Log2), - ConsumedMessagesAfter = messages_consumed(ConsumerCounters), + ConsumedMessagesAfter = + messages_consumed(ConsumerCounters), rabbit_log:debug("Subscription ~tp (stream ~tp) is now at offset ~tp with ~tp " "message(s) distributed after subscription", [SubscriptionId, Stream, ConsumerOffset, - ConsumedMessagesAfter - ConsumedMessagesBefore]), + ConsumedMessagesAfter + - ConsumedMessagesBefore]), Consumers#{SubscriptionId => Consumer2}; #{SubscriptionId := @@ -2663,10 +2689,12 @@ handle_frame_post_auth(Transport, case Msg of #{stepping_down := true} -> ConsumerName = consumer_name(Properties), - rabbit_log:debug("Subscription ~tp on stream ~tp, group ~tp " ++ - "has stepped down, activating consumer", - [SubscriptionId, Stream, ConsumerName]), - _ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, + rabbit_log:debug("Subscription ~tp on stream ~tp, group ~tp " + ++ "has stepped down, activating consumer", + [SubscriptionId, Stream, + ConsumerName]), + _ = + rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, Stream, ConsumerName), ok; @@ -2815,9 +2843,9 @@ init_reader(ConnectionTransport, [SubscriptionId, osiris_log:next_offset(Segment)]), Segment. - single_active_consumer(#consumer{configuration = - #consumer_configuration{properties = Properties}}) -> + #consumer_configuration{properties = + Properties}}) -> single_active_consumer(Properties); single_active_consumer(#{<<"single-active-consumer">> := <<"true">>}) -> @@ -2868,7 +2896,9 @@ maybe_dispatch_on_subscription(Transport, rabbit_log:debug("Subscription ~tp on ~tp is now at offset ~tp with ~tp " "message(s) distributed after subscription", - [SubscriptionId, Stream, ConsumerOffset, + [SubscriptionId, + Stream, + ConsumerOffset, messages_consumed(ConsumerCounters1)]), rabbit_stream_metrics:consumer_created(self(), @@ -2934,16 +2964,19 @@ maybe_register_consumer(VirtualHost, Active. maybe_send_consumer_update(Transport, - Connection = #stream_connection{ - socket = S, - correlation_id_sequence = CorrIdSeq}, + Connection = + #stream_connection{socket = S, + correlation_id_sequence = + CorrIdSeq}, Consumer, Active, Msg) -> #consumer{configuration = - #consumer_configuration{subscription_id = SubscriptionId}} = Consumer, - Frame = rabbit_stream_core:frame({request, CorrIdSeq, - {consumer_update, SubscriptionId, Active}}), + #consumer_configuration{subscription_id = SubscriptionId}} = + Consumer, + Frame = + rabbit_stream_core:frame({request, CorrIdSeq, + {consumer_update, SubscriptionId, Active}}), Connection1 = register_request(Connection, Msg), @@ -2951,37 +2984,46 @@ maybe_send_consumer_update(Transport, Connection1. register_request(#stream_connection{outstanding_requests = Requests0, - correlation_id_sequence = CorrIdSeq} = C, + correlation_id_sequence = CorrIdSeq} = + C, RequestContent) -> rabbit_log:debug("Registering RPC request ~tp with correlation ID ~tp", [RequestContent, CorrIdSeq]), Requests1 = maps:put(CorrIdSeq, request(RequestContent), Requests0), - ensure_outstanding_requests_timer( - C#stream_connection{correlation_id_sequence = CorrIdSeq + 1, - outstanding_requests = Requests1}). + ensure_outstanding_requests_timer(C#stream_connection{correlation_id_sequence + = CorrIdSeq + 1, + outstanding_requests = + Requests1}). request(Content) -> #request{start = erlang:monotonic_time(millisecond), content = Content}. -ensure_outstanding_requests_timer(#stream_connection{ - outstanding_requests = Requests, - outstanding_requests_timer = undefined - } = C) when map_size(Requests) =:= 0 -> +ensure_outstanding_requests_timer(#stream_connection{outstanding_requests + = Requests, + outstanding_requests_timer + = undefined} = + C) + when map_size(Requests) =:= 0 -> C; -ensure_outstanding_requests_timer(#stream_connection{ - outstanding_requests = Requests, - outstanding_requests_timer = TRef - } = C) when map_size(Requests) =:= 0 -> +ensure_outstanding_requests_timer(#stream_connection{outstanding_requests + = Requests, + outstanding_requests_timer + = TRef} = + C) + when map_size(Requests) =:= 0 -> _ = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), C#stream_connection{outstanding_requests_timer = undefined}; -ensure_outstanding_requests_timer(#stream_connection{ - outstanding_requests = Requests, - outstanding_requests_timer = undefined, - request_timeout = Timeout - } = C) when map_size(Requests) > 0 -> +ensure_outstanding_requests_timer(#stream_connection{outstanding_requests + = Requests, + outstanding_requests_timer + = undefined, + request_timeout = + Timeout} = + C) + when map_size(Requests) > 0 -> TRef = erlang:send_after(Timeout, self(), check_outstanding_requests), C#stream_connection{outstanding_requests_timer = TRef}; ensure_outstanding_requests_timer(C) -> @@ -3001,21 +3043,26 @@ maybe_unregister_consumer(VirtualHost, Requests) -> ConsumerName = consumer_name(Properties), - Requests1 = maps:fold( - fun(_, #request{content = - #{active := false, - subscription_id := SubId, - stepping_down := true}}, Acc) when SubId =:= SubscriptionId -> - _ = rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, + Requests1 = + maps:fold(fun (_, + #request{content = + #{active := false, + subscription_id := SubId, + stepping_down := true}}, + Acc) + when SubId =:= SubscriptionId -> + _ = + rabbit_stream_sac_coordinator:activate_consumer(VirtualHost, Stream, ConsumerName), - rabbit_log:debug("Outstanding SAC activation request for stream '~tp', " ++ - "group '~tp', sending activation.", + rabbit_log:debug("Outstanding SAC activation request for stream '~tp', " + ++ "group '~tp', sending activation.", [Stream, ConsumerName]), Acc; - (K, V, Acc) -> + (K, V, Acc) -> Acc#{K => V} - end, maps:new(), Requests), + end, + maps:new(), Requests), _ = rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost, Stream, @@ -3101,7 +3148,9 @@ clean_state_after_stream_deletion_or_failure(Stream, PublisherToIds, stream_leaders = Leaders, - outstanding_requests = Requests0} = + outstanding_requests + = + Requests0} = C0, #stream_connection_state{consumers = @@ -3111,19 +3160,21 @@ clean_state_after_stream_deletion_or_failure(Stream, case stream_has_subscriptions(Stream, C0) of true -> #{Stream := SubscriptionIds} = StreamSubscriptions, - Requests1 = lists:foldl( - fun(SubId, Rqsts0) -> - rabbit_stream_metrics:consumer_cancelled(self(), - stream_r(Stream, - C0), - SubId), - #{SubId := Consumer} = Consumers, - Rqsts1 = maybe_unregister_consumer( - VirtualHost, Consumer, - single_active_consumer(Consumer), - Rqsts0), - Rqsts1 - end, Requests0, SubscriptionIds), + Requests1 = + lists:foldl(fun(SubId, Rqsts0) -> + rabbit_stream_metrics:consumer_cancelled(self(), + stream_r(Stream, + C0), + SubId), + #{SubId := Consumer} = Consumers, + Rqsts1 = + maybe_unregister_consumer(VirtualHost, + Consumer, + single_active_consumer(Consumer), + Rqsts0), + Rqsts1 + end, + Requests0, SubscriptionIds), {true, C0#stream_connection{stream_subscriptions = maps:remove(Stream, @@ -3150,7 +3201,8 @@ clean_state_after_stream_deletion_or_failure(Stream, PubId), {maps:remove(PubId, Pubs), maps:remove({Stream, Ref}, PubToIds)}; - _ -> {Pubs, PubToIds} + _ -> + {Pubs, PubToIds} end end, {Publishers, PublisherToIds}, Publishers), @@ -3237,11 +3289,11 @@ remove_subscription(SubscriptionId, stream_r(Stream, Connection2), SubscriptionId), - Requests1 = maybe_unregister_consumer( - VirtualHost, Consumer, - single_active_consumer( - Consumer#consumer.configuration#consumer_configuration.properties), - Requests0), + Requests1 = + maybe_unregister_consumer(VirtualHost, + Consumer, + single_active_consumer(Consumer#consumer.configuration#consumer_configuration.properties), + Requests0), {Connection2#stream_connection{outstanding_requests = Requests1}, State#stream_connection_state{consumers = Consumers1}}. @@ -3281,7 +3333,8 @@ demonitor_stream(Stream, Stream -> demonitor(MonitorRef, [flush]), Acc; - _ -> maps:put(MonitorRef, Strm, Acc) + _ -> + maps:put(MonitorRef, Strm, Acc) end end, #{}, Monitors0), @@ -3302,8 +3355,10 @@ stream_has_publishers(Stream, #stream_connection{publishers = Publishers}) -> lists:any(fun(#publisher{stream = S}) -> case S of - Stream -> true; - _ -> false + Stream -> + true; + _ -> + false end end, maps:values(Publishers)). @@ -3399,17 +3454,20 @@ send_chunks(_DeliverVersion, #consumer{send_limit = SendLimit} = Consumer, Credit, LastLstOffset, - _Counter) when Credit =< SendLimit -> + _Counter) + when Credit =< SendLimit -> %% there are fewer credits than the credit limit so we won't enter %% the send_chunks loop until we have more than the limit available. %% Once we have that we are able to consume all credits all the way down %% to zero {ok, - Consumer#consumer{credit = Credit, last_listener_offset = LastLstOffset}}; + Consumer#consumer{credit = Credit, + last_listener_offset = LastLstOffset}}; send_chunks(DeliverVersion, Transport, #consumer{configuration = #consumer_configuration{socket = Socket}, - log = Log} = Consumer, + log = Log} = + Consumer, Credit, LastLstOffset, Counter) -> @@ -3425,8 +3483,8 @@ send_chunks(DeliverVersion, send_chunks(_DeliverVersion, Transport, - #consumer{ - configuration = #consumer_configuration{socket = Socket}} = + #consumer{configuration = + #consumer_configuration{socket = Socket}} = Consumer, Log, 0, @@ -3441,7 +3499,8 @@ send_chunks(_DeliverVersion, last_listener_offset = LastLstOffset}}; send_chunks(DeliverVersion, Transport, - #consumer{configuration = #consumer_configuration{socket = Socket}} = + #consumer{configuration = + #consumer_configuration{socket = Socket}} = Consumer, Log, Credit, @@ -3701,16 +3760,16 @@ i(host, #stream_connection{host = Host}, _) -> i(peer_host, #stream_connection{peer_host = PeerHost}, _) -> PeerHost; i(SSL, #stream_connection{socket = Sock, proxy_socket = ProxySock}, _) - when SSL =:= ssl; - SSL =:= ssl_protocol; - SSL =:= ssl_key_exchange; - SSL =:= ssl_cipher; - SSL =:= ssl_hash -> + when SSL =:= ssl; + SSL =:= ssl_protocol; + SSL =:= ssl_key_exchange; + SSL =:= ssl_cipher; + SSL =:= ssl_hash -> rabbit_ssl:info(SSL, {Sock, ProxySock}); -i(Cert, #stream_connection{socket = Sock},_) - when Cert =:= peer_cert_issuer; - Cert =:= peer_cert_subject; - Cert =:= peer_cert_validity -> +i(Cert, #stream_connection{socket = Sock}, _) + when Cert =:= peer_cert_issuer; + Cert =:= peer_cert_subject; + Cert =:= peer_cert_validity -> rabbit_ssl:cert_info(Cert, Sock); i(channels, _, _) -> 0; @@ -3767,9 +3826,9 @@ setopts(Transport, Sock, Opts) -> stream_from_consumers(SubId, Consumers) -> case Consumers of - #{SubId := #consumer{configuration = #consumer_configuration{stream = S}}} -> + #{SubId := + #consumer{configuration = #consumer_configuration{stream = S}}} -> S; _ -> undefined end. - diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 2606618eb8a1..f54061078f42 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -235,23 +235,25 @@ sort_partitions(#exchange{type = direct}, Partitions) -> {{_, Order1}, {_, Order2}} -> rabbit_data_coercion:to_integer(Order1) =< rabbit_data_coercion:to_integer(Order2); - {undefined, {_, _Order2}} -> false; - {{_, _Order1}, undefined} -> true; - _ -> true + {undefined, {_, _Order2}} -> + false; + {{_, _Order1}, undefined} -> + true; + _ -> + true end end, Partitions); sort_partitions(#exchange{type = 'x-super-stream'}, Partitions) -> lists:sort(fun(#binding{key = Key1}, #binding{key = Key2}) -> - binary_to_integer(Key1) - =< binary_to_integer(Key2) - end, Partitions). + binary_to_integer(Key1) =< binary_to_integer(Key2) + end, + Partitions). strip_cr_lf(NameBin) -> binary:replace(NameBin, [<<"\n">>, <<"\r">>], <<"">>, [global]). -partition_name(SuperStream, Suffix0) - when is_binary(SuperStream) -> +partition_name(SuperStream, Suffix0) when is_binary(SuperStream) -> Suffix = rabbit_data_coercion:to_binary(Suffix0), <>. diff --git a/deps/rabbitmq_stream/test/commands_SUITE.erl b/deps/rabbitmq_stream/test/commands_SUITE.erl index e1975dfbf037..4a66e5ec112e 100644 --- a/deps/rabbitmq_stream/test/commands_SUITE.erl +++ b/deps/rabbitmq_stream/test/commands_SUITE.erl @@ -800,10 +800,13 @@ start_stream_tls_connection(Port) -> start_stream_connection(ssl, Port). start_stream_connection(Transport, Port) -> - TlsOpts = case Transport of - ssl -> [{verify, verify_none}]; - _ -> [] - end, + TlsOpts = + case Transport of + ssl -> + [{verify, verify_none}]; + _ -> + [] + end, {ok, S} = Transport:connect("localhost", Port, [{active, false}, {mode, binary}] ++ TlsOpts), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 4ca32d0366e5..65c3f2c9a743 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -64,44 +64,42 @@ end_per_suite(Config) -> Config. init_per_group(Group, Config) - when Group == single_node orelse Group == single_node_1 -> - Config1 = rabbit_ct_helpers:set_config( - Config, [{rmq_nodes_clustered, false}, - {rabbitmq_ct_tls_verify, verify_none}, - {rabbitmq_stream, verify_none} - ]), - rabbit_ct_helpers:run_setup_steps( - Config1, - [fun(StepConfig) -> - rabbit_ct_helpers:merge_app_env(StepConfig, - {rabbit, - [{core_metrics_gc_interval, - 1000}]}) - end, - fun(StepConfig) -> - rabbit_ct_helpers:merge_app_env(StepConfig, - {rabbitmq_stream, - [{connection_negotiation_step_timeout, - 500}]}) - end] - ++ rabbit_ct_broker_helpers:setup_steps()); + when Group == single_node orelse Group == single_node_1 -> + Config1 = + rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_clustered, false}, + {rabbitmq_ct_tls_verify, verify_none}, + {rabbitmq_stream, verify_none}]), + rabbit_ct_helpers:run_setup_steps(Config1, + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbit, + [{core_metrics_gc_interval, + 1000}]}) + end, + fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbitmq_stream, + [{connection_negotiation_step_timeout, + 500}]}) + end] + ++ rabbit_ct_broker_helpers:setup_steps()); init_per_group(cluster = Group, Config) -> - Config1 = rabbit_ct_helpers:set_config( - Config, [{rmq_nodes_clustered, true}, - {rmq_nodes_count, 3}, - {rmq_nodename_suffix, Group}, - {tcp_ports_base}, - {rabbitmq_ct_tls_verify, verify_none} - ]), - rabbit_ct_helpers:run_setup_steps( - Config1, - [fun(StepConfig) -> - rabbit_ct_helpers:merge_app_env(StepConfig, - {aten, - [{poll_interval, - 1000}]}) - end] - ++ rabbit_ct_broker_helpers:setup_steps()); + Config1 = + rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_clustered, true}, + {rmq_nodes_count, 3}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}, + {rabbitmq_ct_tls_verify, verify_none}]), + rabbit_ct_helpers:run_setup_steps(Config1, + [fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {aten, + [{poll_interval, + 1000}]}) + end] + ++ rabbit_ct_broker_helpers:setup_steps()); init_per_group(_, Config) -> rabbit_ct_helpers:run_setup_steps(Config). @@ -111,22 +109,28 @@ end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). -init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, - 0, - application, - set_env, - [rabbitmq_stream, request_timeout, 2000]), +init_per_testcase(close_connection_on_consumer_update_timeout = + TestCase, + Config) -> + ok = + rabbit_ct_broker_helpers:rpc(Config, + 0, + application, + set_env, + [rabbitmq_stream, request_timeout, 2000]), rabbit_ct_helpers:testcase_started(Config, TestCase); init_per_testcase(TestCase, Config) -> rabbit_ct_helpers:testcase_started(Config, TestCase). -end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, - 0, - application, - set_env, - [rabbitmq_stream, request_timeout, 60000]), +end_per_testcase(close_connection_on_consumer_update_timeout = + TestCase, + Config) -> + ok = + rabbit_ct_broker_helpers:rpc(Config, + 0, + application, + set_env, + [rabbitmq_stream, request_timeout, 60000]), rabbit_ct_helpers:testcase_finished(Config, TestCase); end_per_testcase(TestCase, Config) -> rabbit_ct_helpers:testcase_finished(Config, TestCase). @@ -375,8 +379,8 @@ close_connection_on_consumer_update_timeout(Config) -> Transport = gen_tcp, Port = get_stream_port(Config), {ok, S} = - Transport:connect("localhost", Port, - [{active, false}, {mode, binary}]), + Transport:connect("localhost", Port, + [{active, false}, {mode, binary}]), C0 = rabbit_stream_core:init(0), C1 = test_peer_properties(Transport, S, C0), C2 = test_authenticate(Transport, S, C1), @@ -384,7 +388,10 @@ close_connection_on_consumer_update_timeout(Config) -> C3 = test_create_stream(Transport, S, Stream, C2), SubId = 42, - C4 = test_subscribe(Transport, S, SubId, Stream, + C4 = test_subscribe(Transport, + S, + SubId, + Stream, #{<<"single-active-consumer">> => <<"true">>, <<"name">> => <<"foo">>}, C3), @@ -392,8 +399,8 @@ close_connection_on_consumer_update_timeout(Config) -> ?assertMatch({request, _, {consumer_update, SubId, true}}, Cmd), closed = wait_for_socket_close(Transport, S, 10), {ok, Sb} = - Transport:connect("localhost", Port, - [{active, false}, {mode, binary}]), + Transport:connect("localhost", Port, + [{active, false}, {mode, binary}]), Cb0 = rabbit_stream_core:init(0), Cb1 = test_peer_properties(Transport, Sb, Cb0), Cb2 = test_authenticate(Transport, Sb, Cb1), @@ -475,8 +482,7 @@ test_server(Transport, Stream, Config) -> ssl -> [{active, false}, {mode, binary}, {verify, verify_none}] end, - {ok, S} = - Transport:connect("localhost", Port, Opts), + {ok, S} = Transport:connect("localhost", Port, Opts), C0 = rabbit_stream_core:init(0), C1 = test_peer_properties(Transport, S, C0), C2 = test_authenticate(Transport, S, C1), diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index da0ce3654f99..e388dbc1f2ff 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -115,7 +115,8 @@ manage_super_stream(Config, Type) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => Type, - partitions_source => {partition_count, 3}})), + partitions_source => + {partition_count, 3}})), % get the correct partitions ?assertEqual({ok, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, @@ -126,7 +127,8 @@ manage_super_stream(Config, Type) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => Type, - partitions_source => {partition_count, 3}})), + partitions_source => + {partition_count, 3}})), % can delete it ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), @@ -139,7 +141,8 @@ manage_super_stream(Config, Type) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => Type, - partitions_source => {partition_count, 3}})), + partitions_source => + {partition_count, 3}})), ?assertMatch({ok, _}, delete_stream(Config, <<"invoices-1">>)), ok. @@ -150,7 +153,8 @@ create_super_stream_with_routing_keys(Config) -> ?assertEqual(ok, create_super_stream(Config, #{name => <<"invoices">>, - partitions_source => {routing_keys, RKs}})), + partitions_source => + {routing_keys, RKs}})), ?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)), @@ -159,7 +163,8 @@ create_super_stream_with_routing_keys(Config) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => <<"x-super-stream">>, - partitions_source => {routing_keys, RKs}})), + partitions_source => + {routing_keys, RKs}})), ok. route_direct_super_stream(Config) -> @@ -168,7 +173,8 @@ route_direct_super_stream(Config) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => <<"direct">>, - partitions_source => {partition_count, 3}})), + partitions_source => + {partition_count, 3}})), % get the correct partitions ?assertEqual({ok, [<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]}, @@ -188,7 +194,8 @@ partition_index(Config) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => <<"direct">>, - partitions_source => {partition_count, 3}})), + partitions_source => + {partition_count, 3}})), [?assertEqual({ok, Index}, partition_index(Config, <<"invoices">>, Stream)) || {Index, Stream} @@ -234,7 +241,8 @@ partition_index_x_super_stream(Config) -> create_super_stream(Config, #{name => <<"invoices">>, exchange_type => <<"x-super-stream">>, - partitions_source => {partition_count, 3}})), + partitions_source => + {partition_count, 3}})), [?assertEqual({ok, Index}, partition_index(Config, <<"invoices">>, Stream)) || {Index, Stream} @@ -250,8 +258,7 @@ partition_index_x_super_stream(Config) -> ok. create_super_stream(Config, Spec0) -> - Spec = Spec0#{vhost => <<"/">>, - username => <<"guest">>}, + Spec = Spec0#{vhost => <<"/">>, username => <<"guest">>}, rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_stream_manager, From 234fe7c23a53a993fcc87627c9a8ead609a97864 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 7 Jun 2023 14:44:35 +0100 Subject: [PATCH 5/9] fixes --- ...r.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl | 2 +- deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl index 4b8f68049018..ea8618ff98d9 100644 --- a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.AddSuperStreamCommand.erl @@ -193,7 +193,7 @@ run([SuperStream], routing_keys := RoutingKeysStr} = Opts) -> Spec0 = maps:with([vhost, exchange_type], Opts), - RoutingKeys = [K || K <- string:lexemes(RoutingKeysStr, ", ")], + RoutingKeys = [string:trim(K) || K <- string:lexemes(RoutingKeysStr, ",")], Spec = Spec0#{username => cli_acting_user(), name => SuperStream, diff --git a/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl index 6008918ffa99..64abcbfb9e6e 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl @@ -45,11 +45,13 @@ end_per_testcase(_TestCase, _Config) -> %%%=================================================================== sort_partitions(_Config) -> - [] = rabbit_stream_utils:sort_partitions([]), + Exchange = #exchange{type = direct}, + [] = rabbit_stream_utils:sort_partitions(Exchange, []), ?assertEqual([<<"a">>, <<"b">>, <<"c">>], [S || #binding{destination = #resource{name = S}} - <- rabbit_stream_utils:sort_partitions([binding(<<"c">>, + <- rabbit_stream_utils:sort_partitions(Exchange, + [binding(<<"c">>, 2), binding(<<"b">>, 1), @@ -58,7 +60,8 @@ sort_partitions(_Config) -> ?assertEqual([<<"a">>, <<"c">>, <<"no-order-field">>], [S || #binding{destination = #resource{name = S}} - <- rabbit_stream_utils:sort_partitions([binding(<<"c">>, + <- rabbit_stream_utils:sort_partitions(Exchange, + [binding(<<"c">>, 10), binding(<<"no-order-field">>), binding(<<"a">>, From eee81b02940a78e73fb173b64e2b7f24b2889c3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 7 Jun 2023 17:05:14 +0200 Subject: [PATCH 6/9] Add test for super stream exchange --- .../test/rabbit_stream_SUITE.erl | 5 + .../test/rabbit_stream_SUITE_data/Makefile | 1 + .../test/rabbit_stream_SUITE_data/pom.xml | 20 ++- .../stream/SuperStreamExchangeTest.java | 148 ++++++++++++++++++ .../java/com/rabbitmq/stream/TestUtils.java | 79 +++++++++- 5 files changed, 243 insertions(+), 10 deletions(-) create mode 100644 deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/SuperStreamExchangeTest.java diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 65c3f2c9a743..330015874214 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -424,6 +424,7 @@ java(Config) -> StreamPortNode2 = get_stream_port(Config, 1), StreamPortTlsNode1 = get_stream_port_tls(Config, 0), StreamPortTlsNode2 = get_stream_port_tls(Config, 1), + AmqpPortNode1 = get_amqp_port(Config), Node1Name = get_node_name(Config, 0), Node2Name = get_node_name(Config, 1), RabbitMqCtl = get_rabbitmqctl(Config), @@ -439,6 +440,7 @@ java(Config) -> {"NODE2_STREAM_PORT=~b", [StreamPortNode2]}, {"NODE2_STREAM_PORT_TLS=~b", [StreamPortTlsNode2]}, + {"NODE1_AMQP_PORT=~b", [AmqpPortNode1]}, {"RABBITMQCTL=~tp", [RabbitMqCtl]}]), {ok, _} = MakeResult. @@ -459,6 +461,9 @@ get_stream_port_tls(Config, Node) -> rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream_tls). +get_amqp_port(Config) -> + rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp). + get_node_name(Config) -> get_node_name(Config, 0). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile index 0b3647e9e9d8..4909c9935898 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/Makefile @@ -7,6 +7,7 @@ MVN_FLAGS += -Dhostname=$(HOSTNAME) \ -Dnode2.name=$(NODE2_NAME) \ -Dnode2.stream.port=$(NODE2_STREAM_PORT) \ -Dnode2.stream.port.tls=$(NODE2_STREAM_PORT_TLS) \ + -Dnode1.amqp.port=$(NODE1_AMQP_PORT) \ -Drabbitmqctl.bin=$(RABBITMQCTL) .PHONY: tests clean diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml index f238dc168cd3..ace38706493b 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml @@ -27,13 +27,14 @@ [0.7.0-SNAPSHOT,) - 5.9.0 - 3.23.1 + 5.17.0 + 5.9.3 + 3.24.2 1.2.11 - 3.10.1 - 2.22.2 - 2.24.0 - 1.15.0 + 3.11.0 + 3.1.0 + 2.37.0 + 1.17.0 UTF-8 @@ -45,6 +46,13 @@ ${stream-client.version} + + com.rabbitmq + amqp-client + ${amqp-client.version} + test + + org.junit.jupiter junit-jupiter-engine diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/SuperStreamExchangeTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/SuperStreamExchangeTest.java new file mode 100644 index 000000000000..d31c1cc0109b --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/SuperStreamExchangeTest.java @@ -0,0 +1,148 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream; + +import static com.rabbitmq.stream.TestUtils.*; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.stream.TestUtils.CallableConsumer; +import io.netty.channel.EventLoopGroup; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.function.BiConsumer; +import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class SuperStreamExchangeTest { + + EventLoopGroup eventLoopGroup; + + Environment environment; + + Connection connection; + int partitions = 3; + int messageCount = 10_000; + String superStream; + + @BeforeEach + void init(TestInfo info) throws Exception { + EnvironmentBuilder environmentBuilder = + Environment.builder() + .port(TestUtils.streamPortNode1()) + .netty() + .eventLoopGroup(eventLoopGroup) + .environmentBuilder(); + environment = environmentBuilder.build(); + ConnectionFactory cf = new ConnectionFactory(); + cf.setPort(TestUtils.amqpPortNode1()); + connection = cf.newConnection(); + superStream = TestUtils.streamName(info); + } + + @AfterEach + void tearDown() throws Exception { + environment.close(); + deleteSuperStreamTopology(connection, superStream, partitions); + connection.close(); + } + + @Test + void publish() throws Exception { + declareSuperStreamTopology(connection, superStream, partitions); + List routingKeys = new ArrayList<>(messageCount); + IntStream.range(0, messageCount) + .forEach(ignored -> routingKeys.add(UUID.randomUUID().toString())); + + CountDownLatch publishLatch = new CountDownLatch(messageCount); + try (Producer producer = + environment + .producerBuilder() + .superStream(superStream) + .routing(msg -> msg.getProperties().getMessageIdAsString()) + .producerBuilder() + .build()) { + ConfirmationHandler confirmationHandler = status -> publishLatch.countDown(); + routingKeys.forEach( + rk -> + producer.send( + producer.messageBuilder().properties().messageId(rk).messageBuilder().build(), + confirmationHandler)); + + assertThat(publishLatch).is(completed()); + } + + CallableConsumer>> consumeMessages = + receivedMessages -> { + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + try (Consumer ignored = + environment + .consumerBuilder() + .superStream(superStream) + .offset(OffsetSpecification.first()) + .messageHandler( + (ctx, msg) -> { + receivedMessages + .computeIfAbsent(ctx.stream(), k -> ConcurrentHashMap.newKeySet()) + .add(msg.getProperties().getMessageIdAsString()); + consumeLatch.countDown(); + }) + .build()) { + + assertThat(consumeLatch).is(completed()); + assertThat(receivedMessages.values().stream().mapToInt(Set::size).sum()) + .isEqualTo(messageCount); + } + }; + + Map> streamProducerMessages = new ConcurrentHashMap<>(partitions); + consumeMessages.accept(streamProducerMessages); + + deleteSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(connection, superStream, partitions); + + try (Channel channel = connection.createChannel()) { + channel.confirmSelect(); + for (String rk : routingKeys) { + channel.basicPublish( + superStream, rk, new AMQP.BasicProperties.Builder().messageId(rk).build(), null); + } + channel.waitForConfirmsOrDie(); + } + + Map> amqpProducerMessages = new ConcurrentHashMap<>(partitions); + consumeMessages.accept(amqpProducerMessages); + assertThat(amqpProducerMessages) + .hasSameSizeAs(streamProducerMessages) + .containsKeys(streamProducerMessages.keySet().toArray(new String[] {})); + + BiConsumer, Set> compareSets = + (s1, s2) -> { + assertThat(s1).hasSameSizeAs(s2); + s1.forEach(rk -> assertThat(s2).contains(rk)); + }; + + amqpProducerMessages.forEach( + (key, value) -> compareSets.accept(value, streamProducerMessages.get(key))); + } +} diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index 3ddb695f4b65..9daa8085e942 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -21,6 +21,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; import com.rabbitmq.stream.impl.Client; import com.rabbitmq.stream.impl.Client.Response; import io.netty.channel.EventLoopGroup; @@ -28,10 +30,11 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.function.BooleanSupplier; +import java.util.stream.IntStream; import org.assertj.core.api.Condition; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.*; @@ -40,12 +43,17 @@ public class TestUtils { static int streamPortNode1() { String port = System.getProperty("node1.stream.port", "5552"); - return Integer.valueOf(port); + return Integer.parseInt(port); } static int streamPortNode2() { String port = System.getProperty("node2.stream.port", "5552"); - return Integer.valueOf(port); + return Integer.parseInt(port); + } + + static int amqpPortNode1() { + String port = System.getProperty("node1.amqp.port", "5672"); + return Integer.parseInt(port); } static void waitUntil(BooleanSupplier condition) throws InterruptedException { @@ -218,4 +226,67 @@ static Condition responseCode(short expectedResponse) { expectedResponse); } } + + static void deleteSuperStreamTopology(Connection connection, String superStream, int partitions) + throws Exception { + String[] routingKeys = + IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new); + try (Channel ch = connection.createChannel()) { + ch.exchangeDelete(superStream); + for (String routingKey : routingKeys) { + String partitionName = superStream + "-" + routingKey; + ch.queueDelete(partitionName); + } + } + } + + static void declareSuperStreamTopology(Connection connection, String superStream, int partitions) + throws Exception { + String[] rks = IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new); + try (Channel ch = connection.createChannel()) { + ch.exchangeDeclare( + superStream, + "x-super-stream", + true, + false, + Collections.singletonMap("x-super-stream", true)); + + List bindings = new ArrayList<>(rks.length); + for (int i = 0; i < rks.length; i++) { + bindings.add(new Object[] {rks[i], i}); + } + // shuffle the order to make sure we get in the correct order from the server + Collections.shuffle(bindings); + + for (Object[] binding : bindings) { + String routingKey = (String) binding[0]; + String partitionName = superStream + "-" + routingKey; + ch.queueDeclare( + partitionName, true, false, false, Collections.singletonMap("x-queue-type", "stream")); + ch.queueBind( + partitionName, + superStream, + routingKey, + Collections.singletonMap("x-stream-partition-order", binding[1])); + } + } + } + + static Condition completed() { + return new Condition<>( + countDownLatch -> { + try { + return countDownLatch.await(10, SECONDS); + } catch (InterruptedException e) { + Thread.interrupted(); + throw new RuntimeException(e); + } + }, + "not completed after 10 seconds"); + } + + interface CallableConsumer { + + void accept(T o) throws Exception; + } } From c11388cb0881724a9cd4a70f9f46ff6a8ddd8e01 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 7 Jun 2023 16:50:22 +0100 Subject: [PATCH 7/9] bazel --- deps/rabbitmq_stream/BUILD.bazel | 5 ++++- deps/rabbitmq_stream/app.bzl | 3 +++ moduleindex.yaml | 18 ++++++++++++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel index 2018bef39ceb..2fb7d8736bd6 100644 --- a/deps/rabbitmq_stream/BUILD.bazel +++ b/deps/rabbitmq_stream/BUILD.bazel @@ -60,7 +60,10 @@ rabbitmq_app( app_module = APP_MODULE, app_name = APP_NAME, beam_files = [":beam_files"], - extra_apps = ["ssl"], + extra_apps = [ + "ssl", + "murmerl3", + ], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/deps/rabbitmq_stream/app.bzl b/deps/rabbitmq_stream/app.bzl index fa4c8e801491..27510a5d41c3 100644 --- a/deps/rabbitmq_stream/app.bzl +++ b/deps/rabbitmq_stream/app.bzl @@ -16,6 +16,7 @@ def all_beam_files(name = "all_beam_files"): "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl", + "src/rabbit_exchange_type_super_stream.erl", "src/rabbit_stream.erl", "src/rabbit_stream_connection_sup.erl", "src/rabbit_stream_manager.erl", @@ -55,6 +56,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl", + "src/rabbit_exchange_type_super_stream.erl", "src/rabbit_stream.erl", "src/rabbit_stream_connection_sup.erl", "src/rabbit_stream_manager.erl", @@ -104,6 +106,7 @@ def all_srcs(name = "all_srcs"): "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl", "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl", + "src/rabbit_exchange_type_super_stream.erl", "src/rabbit_stream.erl", "src/rabbit_stream_connection_sup.erl", "src/rabbit_stream_manager.erl", diff --git a/moduleindex.yaml b/moduleindex.yaml index 650f89584793..36ca0a492601 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -111,6 +111,8 @@ credentials_obfuscation: ct_helper: - ct_helper - ct_helper_error_h +cth_styledout: +- cth_styledout cuttlefish: - conf_parse - cuttlefish @@ -314,6 +316,10 @@ jose: - jose_jws_alg_rsa_pkcs1_v1_5 - jose_jws_alg_rsa_pss - jose_jwt +katana_code: +- ktn_code +- ktn_dodger +- ktn_io_string meck: - meck - meck_args_matcher @@ -326,6 +332,8 @@ meck: - meck_proc - meck_ret_spec - meck_util +murmerl3: +- murmerl3 my_plugin: - my_plugin observer_cli: @@ -1083,6 +1091,7 @@ rabbitmq_stream: - Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand - Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand - Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand +- rabbit_exchange_type_super_stream - rabbit_stream - rabbit_stream_connection_sup - rabbit_stream_manager @@ -1179,6 +1188,15 @@ ranch: - ranch_sup - ranch_tcp - ranch_transport +rebar3_format: +- default_formatter +- erlfmt_formatter +- otp_formatter +- rebar3_ast_formatter +- rebar3_format +- rebar3_format_prv +- rebar3_formatter +- sr_formatter recon: - recon - recon_alloc From 9c2581451b140fa1fef0148e04389c2e94b84fd0 Mon Sep 17 00:00:00 2001 From: Rin Kuryloski Date: Thu, 8 Jun 2023 12:28:12 +0200 Subject: [PATCH 8/9] Update bazel build `# gazelle:erlang_app_dep murmerl3` will not be necessary after an update is made to rules_erlang --- MODULE.bazel | 7 +++ bazel/BUILD.murmerl3 | 85 ++++++++++++++++++++++++++++++++ deps/rabbitmq_stream/BUILD.bazel | 8 +-- 3 files changed, 96 insertions(+), 4 deletions(-) create mode 100644 bazel/BUILD.murmerl3 diff --git a/MODULE.bazel b/MODULE.bazel index c03d7dde27c6..a358c76d1f06 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -246,6 +246,12 @@ erlang_package.hex_package( version = "1.4.1", ) +erlang_package.git_package( + branch = "master", + build_file = "@//:bazel/BUILD.murmerl3", + repository = "rabbitmq/murmerl3", +) + erlang_package.hex_package( name = "thoas", build_file = "@rabbitmq-server//bazel:BUILD.thoas", @@ -357,6 +363,7 @@ use_repo( "gun", "jose", "json", + "murmerl3", "observer_cli", "osiris", "prometheus", diff --git a/bazel/BUILD.murmerl3 b/bazel/BUILD.murmerl3 new file mode 100644 index 000000000000..1eb31e1ea38b --- /dev/null +++ b/bazel/BUILD.murmerl3 @@ -0,0 +1,85 @@ +load("@rules_erlang//:erlang_bytecode2.bzl", "erlang_bytecode", "erlc_opts") +load("@rules_erlang//:erlang_app.bzl", "erlang_app") + +erlc_opts( + name = "erlc_opts", + values = select({ + "@rules_erlang//:debug_build": [ + "+debug_info", + ], + "//conditions:default": [ + "+debug_info", + "+deterministic", + ], + }), + visibility = [":__subpackages__"], +) + +erlang_bytecode( + name = "other_beam", + srcs = [ + "src/murmerl3.erl", + ], + hdrs = [":public_and_private_hdrs"], + app_name = "murmerl3", + dest = "ebin", + erlc_opts = "//:erlc_opts", +) + +filegroup( + name = "beam_files", + srcs = [":other_beam"], +) + +filegroup( + name = "srcs", + srcs = [ + "src/murmerl3.app.src", + "src/murmerl3.erl", + ], +) + +filegroup(name = "private_hdrs") + +filegroup(name = "public_hdrs") + +filegroup(name = "priv") + +filegroup( + name = "license_files", + srcs = [ + "LICENSE", + ], +) + +filegroup( + name = "public_and_private_hdrs", + srcs = [ + ":private_hdrs", + ":public_hdrs", + ], +) + +filegroup( + name = "all_srcs", + srcs = [ + ":public_and_private_hdrs", + ":srcs", + ], +) + +erlang_app( + name = "erlang_app", + srcs = [":all_srcs"], + hdrs = [":public_hdrs"], + app_name = "murmerl3", + beam_files = [":beam_files"], + license_files = [":license_files"], + priv = [":priv"], +) + +alias( + name = "murmerl3", + actual = ":erlang_app", + visibility = ["//visibility:public"], +) diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel index 2fb7d8736bd6..dfae94638ddd 100644 --- a/deps/rabbitmq_stream/BUILD.bazel +++ b/deps/rabbitmq_stream/BUILD.bazel @@ -49,6 +49,8 @@ test_suite_beam_files(name = "test_suite_beam_files") # gazelle:erlang_app_extra_app ssl +# gazelle:erlang_app_dep murmerl3 + # gazelle:erlang_app_dep_exclude rabbit_common rabbitmq_app( @@ -60,15 +62,13 @@ rabbitmq_app( app_module = APP_MODULE, app_name = APP_NAME, beam_files = [":beam_files"], - extra_apps = [ - "ssl", - "murmerl3", - ], + extra_apps = ["ssl"], license_files = [":license_files"], priv = [":priv"], deps = [ "//deps/rabbit:erlang_app", "//deps/rabbitmq_stream_common:erlang_app", + "@murmerl3//:erlang_app", "@osiris//:erlang_app", "@ranch//:erlang_app", ], From a288ac308c8f83f106a27bf9afe40b9341654b60 Mon Sep 17 00:00:00 2001 From: Rin Kuryloski Date: Thu, 8 Jun 2023 14:20:17 +0200 Subject: [PATCH 9/9] Use the latest rules_erlang rules_erlang 3.10.7 contains a fix for the discovery of the dependency on murmerl3 by rabbitmq_stream, so the explicit gazelle directive for the dep can be removed --- MODULE.bazel | 2 +- deps/rabbitmq_stream/BUILD.bazel | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/MODULE.bazel b/MODULE.bazel index a358c76d1f06..5def34ebff89 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -31,7 +31,7 @@ bazel_dep( bazel_dep( name = "rules_erlang", - version = "3.10.5", + version = "3.10.7", ) bazel_dep( diff --git a/deps/rabbitmq_stream/BUILD.bazel b/deps/rabbitmq_stream/BUILD.bazel index dfae94638ddd..a66f3eda6e21 100644 --- a/deps/rabbitmq_stream/BUILD.bazel +++ b/deps/rabbitmq_stream/BUILD.bazel @@ -49,8 +49,6 @@ test_suite_beam_files(name = "test_suite_beam_files") # gazelle:erlang_app_extra_app ssl -# gazelle:erlang_app_dep murmerl3 - # gazelle:erlang_app_dep_exclude rabbit_common rabbitmq_app(