Skip to content

Commit

Permalink
implement usage of message containers for shovels
Browse files Browse the repository at this point in the history
- remove old code converinting message properties to map and any
  related conversion function
- use mc to hold the message
- implement some helper functions to convert amqp10 to mc
- use some reasonable default for AMQP1.0 exchange and routing keys
  as they are mandatory for mc to work
- the differences in behaviour are the following compared to 3.12.x
  - default priority is undefined instead of 4
  - some long tagged amqp values are now signedint / unsignedint
  • Loading branch information
luos committed Dec 14, 2023
1 parent 4bdba10 commit 2695dd9
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 216 deletions.
5 changes: 5 additions & 0 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
new/3,
set_handle/2,
set_settled/2,
set_delivery_tag/2,
set_message_format/2,
set_headers/2,
set_properties/2,
Expand Down Expand Up @@ -298,6 +299,10 @@ set_handle(Handle, #amqp10_msg{transfer = T} = Msg) ->
set_settled(Settled, #amqp10_msg{transfer = T} = Msg) ->
Msg#amqp10_msg{transfer = T#'v1_0.transfer'{settled = Settled}}.

-spec set_delivery_tag(binary(), amqp10_msg()) -> amqp10_msg().
set_delivery_tag(Tag, #amqp10_msg{transfer = T} = Msg) when is_binary(Tag) ->
Msg#amqp10_msg{transfer = T#'v1_0.transfer'{delivery_tag = {binary, Tag}}}.

%% @doc Set amqp message headers.
-spec set_headers(#{atom() => any()}, amqp10_msg()) -> amqp10_msg().
set_headers(Headers, #amqp10_msg{header = undefined} = Msg) ->
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@
init(Sections) when is_list(Sections) ->
Msg = decode(Sections, #msg{}),
init(Msg);
init(Amqp10Msg) when element(1, Amqp10Msg) =:= amqp10_msg ->
[#'v1_0.transfer'{}| AmqpRecords] = amqp10_msg:to_amqp_records(Amqp10Msg),
Msg = decode(AmqpRecords, #msg{}),
init(Msg);
init(#msg{} = Msg) ->
%% TODO: as the essential annotations, durable, priority, ttl and delivery_count
%% is all we are interested in it isn't necessary to keep hold of the
Expand Down
89 changes: 25 additions & 64 deletions deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_shovel.hrl").


-export([
parse/2,
source_uri/1,
Expand All @@ -31,7 +32,7 @@
ack/3,
nack/3,
status/1,
forward/4
forward/3
]).

%% Function references should not be stored on the metadata store.
Expand Down Expand Up @@ -167,8 +168,8 @@ forward_pending(State) ->
case pop_pending(State) of
empty ->
State;
{{Tag, Props, Payload}, S} ->
S2 = do_forward(Tag, Props, Payload, S),
{{Tag, Msg}, S} ->
S2 = do_forward(Tag, Msg, S),
S3 = control_throttle(S2),
case is_blocked(S3) of
true ->
Expand All @@ -181,91 +182,51 @@ forward_pending(State) ->
end
end.

forward(IncomingTag, Props, Payload, State) ->
forward(IncomingTag, Msg, State) ->
case is_blocked(State) of
true ->
%% We are blocked by client-side flow-control and/or
%% `connection.blocked` message from the destination
%% broker. Simply cache the forward.
PendingEntry = {IncomingTag, Props, Payload},
PendingEntry = {IncomingTag, Msg},
add_pending(PendingEntry, State);
false ->
State1 = do_forward(IncomingTag, Props, Payload, State),
State1 = do_forward(IncomingTag, Msg, State),
control_throttle(State1)
end.

do_forward(IncomingTag, Props, Payload,
do_forward(IncomingTag, InMsg,
State0 = #{dest := #{props_fun := {M, F, Args},
current := {_, _, DstUri},
fields_fun := {Mf, Ff, Argsf}}}) ->
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
% do publish
Exchange = maps:get(exchange, Props, undefined),
RoutingKey = maps:get(routing_key, Props, undefined),
Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},

Mc = mc:convert(mc_amqpl, InMsg),

Exchange = mc:get_annotation(exchange, Mc),
RoutingKey = case mc:get_annotation(routing_keys, Mc) of
[Rk | _] -> Rk;
_ -> undefined
end,
{Props, Payload} = rabbit_basic:from_content(mc:protocol_state(Mc)),

Method = #'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
Method1 = apply(Mf, Ff, Argsf ++ [SrcUri, DstUri, Method]),
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, props_from_map(Props)]),
Msg1 = #amqp_msg{props = apply(M, F, Args ++ [SrcUri, DstUri, Props]),
payload = Payload},
publish(IncomingTag, Method1, Msg1, State0).

props_from_map(Map) ->
#'P_basic'{content_type = maps:get(content_type, Map, undefined),
content_encoding = maps:get(content_encoding, Map, undefined),
headers = maps:get(headers, Map, undefined),
delivery_mode = maps:get(delivery_mode, Map, undefined),
priority = maps:get(priority, Map, undefined),
correlation_id = maps:get(correlation_id, Map, undefined),
reply_to = maps:get(reply_to, Map, undefined),
expiration = maps:get(expiration, Map, undefined),
message_id = maps:get(message_id, Map, undefined),
timestamp = maps:get(timestamp, Map, undefined),
type = maps:get(type, Map, undefined),
user_id = maps:get(user_id, Map, undefined),
app_id = maps:get(app_id, Map, undefined),
cluster_id = maps:get(cluster_id, Map, undefined)}.

map_from_props(#'P_basic'{content_type = Content_type,
content_encoding = Content_encoding,
headers = Headers,
delivery_mode = Delivery_mode,
priority = Priority,
correlation_id = Correlation_id,
reply_to = Reply_to,
expiration = Expiration,
message_id = Message_id,
timestamp = Timestamp,
type = Type,
user_id = User_id,
app_id = App_id,
cluster_id = Cluster_id}) ->
lists:foldl(fun({_K, undefined}, Acc) -> Acc;
({K, V}, Acc) -> Acc#{K => V}
end, #{}, [{content_type, Content_type},
{content_encoding, Content_encoding},
{headers, Headers},
{delivery_mode, Delivery_mode},
{priority, Priority},
{correlation_id, Correlation_id},
{reply_to, Reply_to},
{expiration, Expiration},
{message_id, Message_id},
{timestamp, Timestamp},
{type, Type},
{user_id, User_id},
{app_id, App_id},
{cluster_id, Cluster_id}
]).

handle_source(#'basic.consume_ok'{}, State) ->
State;
handle_source({#'basic.deliver'{delivery_tag = Tag,
exchange = Exchange,
routing_key = RoutingKey},
#amqp_msg{props = Props0, payload = Payload}}, State) ->
Props = (map_from_props(Props0))#{exchange => Exchange,
routing_key => RoutingKey},
% forward to destination
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
Anns = #{exchange => Exchange, routing_keys => [RoutingKey]},
Content = rabbit_basic:build_content(Props0, [Payload]),
Mc = mc:init(mc_amqpl, Content, Anns),
rabbit_shovel_behaviour:forward(Tag, Mc, State);

handle_source({'EXIT', Conn, Reason},
#{source := #{current := {Conn, _, _}}}) ->
Expand Down
Loading

0 comments on commit 2695dd9

Please # to comment.