Skip to content

Commit

Permalink
Add test for stream filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed May 25, 2023
1 parent d75d8e7 commit c1e1353
Showing 1 changed file with 47 additions and 1 deletion.
48 changes: 47 additions & 1 deletion deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ groups() ->
[{single_node, [],
[test_stream,
test_stream_tls,
test_publish_v2,
test_gc_consumers,
test_gc_publishers,
unauthenticated_client_rejected_tcp_connected,
Expand Down Expand Up @@ -172,6 +173,36 @@ test_stream_tls(Config) ->
test_server(ssl, Stream, Config),
ok.

test_publish_v2(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
Transport = gen_tcp,
Port = get_stream_port(Config),
Opts = [{active, false}, {mode, binary}],
{ok, S} =
Transport:connect("localhost", Port, Opts),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
C3 = test_create_stream(Transport, S, Stream, C2),
PublisherId = 42,
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
Body = <<"hello">>,
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, C4),
C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, C5),
SubscriptionId = 42,
C7 = test_subscribe(Transport, S, SubscriptionId, Stream,
#{<<"filter.0">> => <<"foo">>},
C6),
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),

C9 = test_unsubscribe(Transport, S, SubscriptionId, C8b),

C10 = test_delete_stream(Transport, S, Stream, C9),
_C11 = test_close(Transport, S, C10),
closed = wait_for_socket_close(Transport, S, 10),
ok.

test_metadata(Config) ->
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
Transport = gen_tcp,
Expand Down Expand Up @@ -618,10 +649,25 @@ test_declare_publisher(Transport, S, PublisherId, Stream, C0) ->
C.

test_publish_confirm(Transport, S, PublisherId, Body, C0) ->
test_publish_confirm(Transport, S, publish, PublisherId, Body, C0).

test_publish_confirm(Transport, S, publish = PublishCmd, PublisherId, Body, C0) ->
BodySize = byte_size(Body),
Messages = [<<1:64, 0:1, BodySize:31, Body:BodySize/binary>>],
PublishFrame =
rabbit_stream_core:frame({publish, PublisherId, 1, Messages}),
rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}),
ok = Transport:send(S, PublishFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({publish_confirm, PublisherId, [1]}, Cmd),
C;
test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId, Body, C0) ->
BodySize = byte_size(Body),
FilterValue = <<"foo">>,
FilterValueSize = byte_size(FilterValue),
Messages = [<<1:64, FilterValueSize:16, FilterValue:FilterValueSize/binary,
0:1, BodySize:31, Body:BodySize/binary>>],
PublishFrame =
rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}),
ok = Transport:send(S, PublishFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({publish_confirm, PublisherId, [1]}, Cmd),
Expand Down

0 comments on commit c1e1353

Please # to comment.