Skip to content

Commit

Permalink
Merge pull request #405 from redboltz/remove_broker_session_lock
Browse files Browse the repository at this point in the history
Removed sessions serarch code broker the broker.
  • Loading branch information
redboltz authored Mar 1, 2025
2 parents 77c19ef + 68a85dd commit 7b9b067
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 100 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Refined CI. #401
* Refined log level. #399, #402
* Refined documentr. Added coding rule to navigation bar. #396
* Refined broker. #395, #398
* Refined broker. #395, #398, #405
* Changed the broker auth JSON file comment syntax. #394
** Now supports C/C++-style line and block comments.
** The `#` line comment syntax is no longer supported.
Expand Down
118 changes: 21 additions & 97 deletions tool/include/broker/broker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,7 @@ class broker {
<< ASYNC_MQTT_ADD_VALUE(address, epsp.get_address())
<< "cid:" << client_id
<< " new connection inserted.";
it = idx.emplace_hint(
it,
auto ss =
session_state<epsp_type>::create(
mtx_subs_map_,
subs_map_,
Expand All @@ -419,7 +418,11 @@ class broker {
clean_start,
force_move(will_expiry_interval),
force_move(session_expiry_interval)
)
);
epsp.set_session_state(*ss);
it = idx.emplace_hint(
it,
ss
);
if (response_topic_requested) {
// set_response_topic never modify key part
Expand Down Expand Up @@ -483,7 +486,7 @@ class broker {
<< "cid:" << client_id
<< "online connection exists, discard old one due to session_expiry and renew";
bool inserted;
std::tie(it, inserted) = idx.emplace(
auto ss =
session_state<epsp_type>::create(
mtx_subs_map_,
subs_map_,
Expand All @@ -499,7 +502,10 @@ class broker {
clean_start,
force_move(will_expiry_interval),
force_move(session_expiry_interval)
)
);
epsp.set_session_state(*ss);
std::tie(it, inserted) = idx.emplace(
ss
);
BOOST_ASSERT(inserted);
if (response_topic_requested) {
Expand Down Expand Up @@ -565,6 +571,7 @@ class broker {
will_expiry_interval,
session_expiry_interval
);
epsp.set_session_state(*e);
},
[](auto&) { BOOST_ASSERT(false); }
);
Expand Down Expand Up @@ -618,6 +625,7 @@ class broker {
will_expiry_interval,
force_move(session_expiry_interval)
);
epsp.set_session_state(*e);
},
[](auto&) { BOOST_ASSERT(false); }
);
Expand Down Expand Up @@ -885,16 +893,7 @@ class broker {
}
);

std::shared_lock<mutex> g(mtx_sessions_);
auto& idx = sessions_.template get<tag_con>();
auto it = idx.find(epsp);

// broker uses async_* APIs
// If broker erase a connection, then async_force_disconnect()
// and/or async_force_disconnect () is called.
// During async operation, spep is valid but it has already been
// erased from sessions_
if (it == idx.end()) return;
auto& ss = *epsp.get_session_state();

auto send_pubres =
[&] (bool authorized, bool matched) {
Expand Down Expand Up @@ -1071,7 +1070,7 @@ class broker {
// See if this session is authorized to publish this topic
if ([&] {
std::shared_lock<mutex> g_sec{mtx_security_};
return security_.auth_pub(topic, (*it)->get_username()) != security::authorization::type::allow;
return security_.auth_pub(topic, ss.get_username()) != security::authorization::type::allow;
} ()
) {
// Publish not authorized
Expand Down Expand Up @@ -1103,7 +1102,7 @@ class broker {
}

bool matched = do_publish(
**it,
ss,
force_move(topic),
force_move(payload),
opts.get_qos() | opts.get_retain(), // remove dup flag
Expand Down Expand Up @@ -1294,20 +1293,7 @@ class broker {
}
);

std::shared_lock<mutex> g(mtx_sessions_);
auto& idx = sessions_.template get<tag_con>();
auto it = idx.find(epsp);

// broker uses async_* APIs
// If broker erase a connection, then async_force_disconnect()
// and/or async_force_disconnect () is called.
// During async operation, spep is valid but it has already been
// erased from sessions_
if (it == idx.end()) return;

// const_cast is appropriate here
// See https://github.com/boostorg/multi_index/issues/50
auto& ss = const_cast<session_state<epsp_type>&>(**it);
auto& ss = *epsp.get_session_state();
ss.erase_inflight_message_by_packet_id(packet_id);
ss.send_offline_messages_by_packet_id_release();
}
Expand All @@ -1324,20 +1310,7 @@ class broker {
}
);

std::shared_lock<mutex> g(mtx_sessions_);
auto& idx = sessions_.template get<tag_con>();
auto it = idx.find(epsp);

// broker uses async_* APIs
// If broker erase a connection, then async_force_disconnect()
// and/or async_force_disconnect () is called.
// During async operation, spep is valid but it has already been
// erased from sessions_
if (it == idx.end()) return;

// const_cast is appropriate here
// See https://github.com/boostorg/multi_index/issues/50
auto& ss = const_cast<session_state<epsp_type>&>(**it);
auto& ss = *epsp.get_session_state();

if (make_error_code(reason_code)) return;
auto rc =
Expand Down Expand Up @@ -1430,17 +1403,6 @@ class broker {
}
);

std::shared_lock<mutex> g(mtx_sessions_);
auto& idx = sessions_.template get<tag_con>();
auto it = idx.find(epsp);

// broker uses async_* APIs
// If broker erase a connection, then async_force_disconnect()
// and/or async_force_disconnect () is called.
// During async operation, spep is valid but it has already been
// erased from sessions_
if (it == idx.end()) return;

switch (epsp.get_protocol_version()) {
case protocol_version::v3_1_1:
epsp.async_send(
Expand Down Expand Up @@ -1518,20 +1480,8 @@ class broker {
}
);

std::shared_lock<mutex> g(mtx_sessions_);
auto& idx = sessions_.template get<tag_con>();
auto it = idx.find(epsp);

// broker uses async_* APIs
// If broker erase a connection, then async_force_disconnect()
// and/or async_force_disconnect () is called.
// During async operation, spep is valid but it has already been
// erased from sessions_
if (it == idx.end()) return;

// const_cast is appropriate here
// See https://github.com/boostorg/multi_index/issues/50
auto& ss = const_cast<session_state<epsp_type>&>(**it);
auto& ss = *epsp.get_session_state();
ss.erase_inflight_message_by_packet_id(packet_id);
ss.send_offline_messages_by_packet_id_release();
}
Expand All @@ -1547,25 +1497,13 @@ class broker {
async_read_packet(force_move(epsp));
}
);
std::shared_lock<mutex> g(mtx_sessions_);
auto& idx = sessions_.template get<tag_con>();
auto it = idx.find(epsp);

// broker uses async_* APIs
// If broker erase a connection, then async_force_disconnect()
// and/or async_force_disconnect () is called.
// During async operation, spep is valid but it has already been
// erased from sessions_
if (it == idx.end()) return;

// The element of sessions_ must have longer lifetime
// than corresponding subscription.
// Because the subscription store the reference of the element.
std::optional<session_state_ref<epsp_type>> ssr_opt;

// const_cast is appropriate here
// See https://github.com/boostorg/multi_index/issues/50
auto& ss = const_cast<session_state<epsp_type>&>(**it);
auto& ss = *epsp.get_session_state();
ssr_opt.emplace(ss);

BOOST_ASSERT(ssr_opt);
Expand Down Expand Up @@ -1761,26 +1699,12 @@ class broker {
}
);


std::shared_lock<mutex> g(mtx_sessions_);
auto& idx = sessions_.template get<tag_con>();
auto it = idx.find(epsp);

// broker uses async_* APIs
// If broker erase a connection, then async_force_disconnect()
// and/or async_force_disconnect () is called.
// During async operation, spep is valid but it has already been
// erased from sessions_
if (it == idx.end()) return;

// The element of sessions_ must have longer lifetime
// than corresponding subscription.
// Because the subscription store the reference of the element.
std::optional<session_state_ref<epsp_type>> ssr_opt;

// const_cast is appropriate here
// See https://github.com/boostorg/multi_index/issues/50
auto& ss = const_cast<session_state<epsp_type>&>(**it);
auto& ss = *epsp.get_session_state();
ssr_opt.emplace(ss);

BOOST_ASSERT(ssr_opt);
Expand Down
10 changes: 10 additions & 0 deletions tool/include/broker/endpoint_variant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <memory>
#include <async_mqtt/asio_bind/endpoint.hpp>
#include <async_mqtt/protocol/packet/packet_id_type.hpp>
#include <broker/session_state_fwd.hpp>

namespace async_mqtt {

Expand Down Expand Up @@ -398,11 +399,20 @@ class epsp_wrap {
return epsp_;
}

session_state<this_type>* get_session_state() const {
return session_state_;
}

void set_session_state(session_state<this_type>& ss) {
session_state_ = &ss;
}

private:
epsp_type epsp_;
std::string client_id_;
std::optional<std::string> preauthed_user_name_;
mutable std::optional<protocol_version> protocol_version_;
session_state<this_type>* session_state_ = nullptr;
};

} // namespace async_mqtt
Expand Down
9 changes: 7 additions & 2 deletions tool/include/broker/session_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ struct session_state : std::enable_shared_from_this<session_state<Sp>> {
}
};

std::lock_guard<mutex> g(mtx_offline_messages_);
if (offline_messages_.empty()) {
if (offline_messages_empty_) {
auto qos_value = pubopts.get_qos();
if (qos_value == qos::at_least_once ||
qos_value == qos::exactly_once) {
Expand All @@ -306,6 +305,7 @@ struct session_state : std::enable_shared_from_this<session_state<Sp>> {
}

// offline_messages_ is not empty or packet_id_exhausted
std::lock_guard<mutex> g(mtx_offline_messages_);
offline_messages_.push_back(
exe_,
force_move(pub_topic),
Expand Down Expand Up @@ -339,6 +339,7 @@ struct session_state : std::enable_shared_from_this<session_state<Sp>> {
pubopts,
force_move(props)
);
offline_messages_empty_ = false;
}
}

Expand All @@ -360,6 +361,7 @@ struct session_state : std::enable_shared_from_this<session_state<Sp>> {
{
std::lock_guard<mutex> g(mtx_offline_messages_);
offline_messages_.clear();
offline_messages_empty_ = true;
}
unsubscribe_all();
shared_targets_.erase(*this);
Expand Down Expand Up @@ -553,13 +555,15 @@ struct session_state : std::enable_shared_from_this<session_state<Sp>> {
if (auto epsp = lock()) {
std::lock_guard<mutex> g(mtx_offline_messages_);
offline_messages_.send_until_fail(epsp, get_protocol_version());
offline_messages_empty_ = offline_messages_.empty();
}
}

void send_offline_messages_by_packet_id_release() {
if (auto epsp = lock()) {
std::lock_guard<mutex> g(mtx_offline_messages_);
offline_messages_.send_until_fail(epsp, get_protocol_version());
offline_messages_empty_ = offline_messages_.empty();
}
}

Expand Down Expand Up @@ -753,6 +757,7 @@ struct session_state : std::enable_shared_from_this<session_state<Sp>> {

mutable mutex mtx_offline_messages_;
offline_messages offline_messages_;
std::atomic<bool> offline_messages_empty_ = true;

using elem_type = typename sub_con_map<epsp_type>::handle;
std::set<elem_type> handles_; // to efficient remove
Expand Down

0 comments on commit 7b9b067

Please # to comment.