Skip to content

Yonker yk master #148

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
May 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/libipc/circ/elem_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class conn_head<P, true> : public conn_head_base {
return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id;
}

bool connected(cc_t cc_id) const noexcept {
return (this->connections() & cc_id) != 0;
}

std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
cc_t cur = this->cc_.load(order);
cc_t cnt; // accumulates the total bits set in cc
Expand All @@ -100,6 +104,11 @@ class conn_head<P, false> : public conn_head_base {
}
}

bool connected(cc_t cc_id) const noexcept {
// In non-broadcast mode, connection tags are only used for counting.
return (this->connections() != 0) && (cc_id != 0);
}

std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return this->connections(order);
}
Expand Down
5 changes: 4 additions & 1 deletion src/libipc/ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
for (;;) {
// pop a new message
typename queue_t::value_t msg {};
if (!wait_for(inf->rd_waiter_, [que, &msg] {
if (!wait_for(inf->rd_waiter_, [que, &msg, &h] {
if (!que->connected()) {
reconnect(&h, true);
}
return !que->pop(msg);
}, tm)) {
// pop failed, just return.
Expand Down
15 changes: 10 additions & 5 deletions src/libipc/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ class queue_conn {
shm::handle::clear_storage(name);
}

bool connected() const noexcept {
return connected_ != 0;
template <typename Elems>
bool connected(Elems* elems) const noexcept {
return elems->connected(connected_);
}

circ::cc_t connected_id() const noexcept {
Expand All @@ -77,16 +78,16 @@ class queue_conn {
-> std::tuple<bool, bool, decltype(std::declval<Elems>().cursor())> {
if (elems == nullptr) return {};
// if it's already connected, just return
if (connected()) return {connected(), false, 0};
if (connected(elems)) return {connected(elems), false, 0};
connected_ = elems->connect_receiver();
return {connected(), true, elems->cursor()};
return {connected(elems), true, elems->cursor()};
}

template <typename Elems>
bool disconnect(Elems* elems) noexcept {
if (elems == nullptr) return false;
// if it's already disconnected, just return false
if (!connected()) return false;
if (!connected(elems)) return false;
elems->disconnect_receiver(std::exchange(connected_, 0));
return true;
}
Expand Down Expand Up @@ -150,6 +151,10 @@ class queue_base : public queue_conn {
elems_->disconnect_sender();
}

bool connected() const noexcept {
return base_t::connected(elems_);
}

bool connect() noexcept {
auto tp = base_t::connect(elems_);
if (std::get<0>(tp) && std::get<1>(tp)) {
Expand Down