diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index c5b661651e68..8e676225b53a 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -910,7 +910,7 @@ ensure_credential_expiry_timer(User) -> ok; false -> protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, - "Credential expired ~b ms ago", [Time]) + "Credential expired ~b ms ago", [abs(Time)]) end end. diff --git a/deps/rabbitmq_auth_backend_oauth2/BUILD.bazel b/deps/rabbitmq_auth_backend_oauth2/BUILD.bazel index 6529f4a3622b..85de4faebfc7 100644 --- a/deps/rabbitmq_auth_backend_oauth2/BUILD.bazel +++ b/deps/rabbitmq_auth_backend_oauth2/BUILD.bazel @@ -93,7 +93,7 @@ eunit( broker_for_integration_suites( extra_plugins = [ - "//deps/rabbitmq_mqtt:erlang_app", + "//deps/rabbitmq_web_mqtt:erlang_app", ], ) diff --git a/deps/rabbitmq_auth_backend_oauth2/Makefile b/deps/rabbitmq_auth_backend_oauth2/Makefile index 96f8cf6a2970..4bdbabcde617 100644 --- a/deps/rabbitmq_auth_backend_oauth2/Makefile +++ b/deps/rabbitmq_auth_backend_oauth2/Makefile @@ -8,7 +8,7 @@ export BUILD_WITHOUT_QUIC LOCAL_DEPS = inets public_key BUILD_DEPS = rabbit_common DEPS = rabbit cowlib jose base64url oauth2_client -TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_mqtt emqtt +TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_web_mqtt emqtt PLT_APPS += rabbitmqctl diff --git a/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl b/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl index 9e1b8159e345..9f4d7723771e 100644 --- a/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl +++ b/deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl @@ -43,7 +43,10 @@ groups() -> test_failed_connection_with_a_non_token, test_failed_connection_with_a_token_with_insufficient_vhost_permission, test_failed_connection_with_a_token_with_insufficient_resource_permission, - more_than_one_resource_server_id_not_allowed_in_one_token + more_than_one_resource_server_id_not_allowed_in_one_token, + mqtt_expirable_token, + web_mqtt_expirable_token, + mqtt_expired_token ]}, {token_refresh, [], [ @@ -422,15 +425,80 @@ mqtt(Config) -> {ok, Pub} = emqtt:start_link([{clientid, <<"mqtt-publisher">>} | Opts]), {ok, _} = emqtt:connect(Pub), {ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once), - receive - {publish, #{client_pid := Sub, - topic := Topic, - payload := Payload}} -> ok + receive {publish, #{client_pid := Sub, + topic := Topic, + payload := Payload}} -> ok after 1000 -> ct:fail("no publish received") end, ok = emqtt:disconnect(Sub), ok = emqtt:disconnect(Pub). +mqtt_expirable_token(Config) -> + mqtt_expirable_token0(tcp_port_mqtt, + [], + fun emqtt:connect/1, + Config). + +web_mqtt_expirable_token(Config) -> + mqtt_expirable_token0(tcp_port_web_mqtt, + [{ws_path, "/ws"}], + fun emqtt:ws_connect/1, + Config). + +mqtt_expirable_token0(Port, AdditionalOpts, Connect, Config) -> + Topic = <<"test/topic">>, + Payload = <<"mqtt-test-message">>, + + Seconds = 4, + Millis = Seconds * 1000, + {_Algo, Token} = generate_expirable_token(Config, + [<<"rabbitmq.configure:*/*/*">>, + <<"rabbitmq.write:*/*/*">>, + <<"rabbitmq.read:*/*/*">>], + Seconds), + + Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, Port)}, + {proto_ver, v5}, + {username, <<"">>}, + {password, Token}] ++ AdditionalOpts, + {ok, Sub} = emqtt:start_link([{clientid, <<"my subscriber">>} | Opts]), + {ok, _} = Connect(Sub), + {ok, _, [1]} = emqtt:subscribe(Sub, Topic, at_least_once), + {ok, Pub} = emqtt:start_link([{clientid, <<"my publisher">>} | Opts]), + {ok, _} = Connect(Pub), + {ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once), + receive {publish, #{client_pid := Sub, + topic := Topic, + payload := Payload}} -> ok + after 1000 -> ct:fail("no publish received") + end, + + %% reason code "Maximum connect time" defined in + %% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208 + ReasonCode = 16#A0, + true = unlink(Sub), + true = unlink(Pub), + + %% In 4 seconds from now, we expect that RabbitMQ disconnects us because our token expired. + receive {disconnected, ReasonCode, _} -> ok + after Millis * 2 -> ct:fail("missing DISCONNECT packet from server") + end, + receive {disconnected, ReasonCode, _} -> ok + after Millis * 2 -> ct:fail("missing DISCONNECT packet from server") + end. + +mqtt_expired_token(Config) -> + {_Algo, Token} = generate_expired_token(Config), + Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt)}, + {proto_ver, v5}, + {username, <<"">>}, + {password, Token}], + ClientId = atom_to_binary(?FUNCTION_NAME), + {ok, C} = emqtt:start_link([{clientid, ClientId} | Opts]), + true = unlink(C), + ?assertMatch({error, {bad_username_or_password, _}}, + emqtt:connect(C)). + test_successful_connection_with_complex_claim_as_a_map(Config) -> {_Algo, Token} = generate_valid_token_with_extra_fields( Config, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index eeea5b8a8295..d0da340bb711 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -189,6 +189,7 @@ process_connect( ok ?= check_user_connection_limit(Username), {ok, AuthzCtx} ?= check_vhost_access(VHost, User, ClientId, PeerIp), ok ?= check_user_loopback(Username, PeerIp), + ok ?= ensure_credential_expiry_timer(User, PeerIp), rabbit_core_metrics:auth_attempt_succeeded(PeerIp, Username, mqtt), ok = register_client_id(VHost, ClientId, CleanStart, WillProps), {ok, WillMsg} ?= make_will_msg(Packet), @@ -1086,6 +1087,27 @@ check_user_loopback(Username, PeerIp) -> {error, ?RC_NOT_AUTHORIZED} end. + +ensure_credential_expiry_timer(User = #user{username = Username}, PeerIp) -> + case rabbit_access_control:expiry_timestamp(User) of + never -> + ok; + Ts when is_integer(Ts) -> + Time = (Ts - os:system_time(second)) * 1000, + ?LOG_DEBUG("Credential expires in ~b ms frow now " + "(absolute timestamp = ~b seconds since epoch)", + [Time, Ts]), + case Time > 0 of + true -> + _TimerRef = erlang:send_after(Time, self(), credential_expired), + ok; + false -> + auth_attempt_failed(PeerIp, Username), + ?LOG_WARNING("Credential expired ~b ms ago", [abs(Time)]), + {error, ?RC_NOT_AUTHORIZED} + end + end. + get_vhost(UserBin, none, Port) -> get_vhost_no_ssl(UserBin, Port); get_vhost(UserBin, SslLogin, Port) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 6af0d577e44c..c37a6e0ef64e 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -121,7 +121,8 @@ handle_cast({duplicate_id, SendWill}, {stop, {shutdown, duplicate_id}, {SendWill, State}}; handle_cast({close_connection, Reason}, - State = #state{conn_name = ConnName, proc_state = PState}) -> + State = #state{conn_name = ConnName, + proc_state = PState}) -> ?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts', reason: ~ts", [ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]), case Reason of @@ -209,6 +210,14 @@ handle_info({keepalive, Req}, State = #state{proc_state = PState, {stop, Reason, State} end; +handle_info(credential_expired, + State = #state{conn_name = ConnName, + proc_state = PState}) -> + ?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts' because credential expired", + [ConnName, rabbit_mqtt_processor:info(client_id, PState)]), + rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, PState), + {stop, {shutdown, {disconnect, server_initiated}}, State}; + handle_info(login_timeout, State = #state{proc_state = connect_packet_unprocessed, conn_name = ConnName}) -> %% The connection is also closed if the CONNECT packet happens to diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 176a29e86842..67e99400b500 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -176,8 +176,9 @@ websocket_info({'$gen_cast', {duplicate_id, SendWill}}, rabbit_mqtt_processor:send_disconnect(?RC_SESSION_TAKEN_OVER, ProcState), defer_close(?CLOSE_NORMAL, SendWill), {[], State}; -websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{proc_state = ProcState, - conn_name = ConnName}) -> +websocket_info({'$gen_cast', {close_connection, Reason}}, + State = #state{proc_state = ProcState, + conn_name = ConnName}) -> ?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p), reason: ~s", [rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]), case Reason of @@ -215,6 +216,14 @@ websocket_info({keepalive, Req}, State = #state{proc_state = ProcState, [ConnName, Reason]), stop(State) end; +websocket_info(credential_expired, + State = #state{proc_state = ProcState, + conn_name = ConnName}) -> + ?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p) because credential expired", + [rabbit_mqtt_processor:info(client_id, ProcState), ConnName]), + rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, ProcState), + defer_close(?CLOSE_NORMAL), + {[], State}; websocket_info(emit_stats, State) -> {[], emit_stats(State), hibernate}; websocket_info({{'DOWN', _QName}, _MRef, process, _Pid, _Reason} = Evt,