Skip to content

Commit a0c7725

Browse files
authored
Merge pull request #148 from mutouyun/yonker-yk-master
Yonker yk master
2 parents 120d85a + a1cdc9a commit a0c7725

File tree

3 files changed

+23
-6
lines changed

3 files changed

+23
-6
lines changed

src/libipc/circ/elem_def.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ class conn_head<P, true> : public conn_head_base {
7474
return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id;
7575
}
7676

77+
bool connected(cc_t cc_id) const noexcept {
78+
return (this->connections() & cc_id) != 0;
79+
}
80+
7781
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
7882
cc_t cur = this->cc_.load(order);
7983
cc_t cnt; // accumulates the total bits set in cc
@@ -100,6 +104,11 @@ class conn_head<P, false> : public conn_head_base {
100104
}
101105
}
102106

107+
bool connected(cc_t cc_id) const noexcept {
108+
// In non-broadcast mode, connection tags are only used for counting.
109+
return (this->connections() != 0) && (cc_id != 0);
110+
}
111+
103112
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
104113
return this->connections(order);
105114
}

src/libipc/ipc.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::uint64_t tm) {
627627
for (;;) {
628628
// pop a new message
629629
typename queue_t::value_t msg {};
630-
if (!wait_for(inf->rd_waiter_, [que, &msg] {
630+
if (!wait_for(inf->rd_waiter_, [que, &msg, &h] {
631+
if (!que->connected()) {
632+
reconnect(&h, true);
633+
}
631634
return !que->pop(msg);
632635
}, tm)) {
633636
// pop failed, just return.

src/libipc/queue.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ class queue_conn {
6363
shm::handle::clear_storage(name);
6464
}
6565

66-
bool connected() const noexcept {
67-
return connected_ != 0;
66+
template <typename Elems>
67+
bool connected(Elems* elems) const noexcept {
68+
return elems->connected(connected_);
6869
}
6970

7071
circ::cc_t connected_id() const noexcept {
@@ -77,16 +78,16 @@ class queue_conn {
7778
-> std::tuple<bool, bool, decltype(std::declval<Elems>().cursor())> {
7879
if (elems == nullptr) return {};
7980
// if it's already connected, just return
80-
if (connected()) return {connected(), false, 0};
81+
if (connected(elems)) return {connected(elems), false, 0};
8182
connected_ = elems->connect_receiver();
82-
return {connected(), true, elems->cursor()};
83+
return {connected(elems), true, elems->cursor()};
8384
}
8485

8586
template <typename Elems>
8687
bool disconnect(Elems* elems) noexcept {
8788
if (elems == nullptr) return false;
8889
// if it's already disconnected, just return false
89-
if (!connected()) return false;
90+
if (!connected(elems)) return false;
9091
elems->disconnect_receiver(std::exchange(connected_, 0));
9192
return true;
9293
}
@@ -150,6 +151,10 @@ class queue_base : public queue_conn {
150151
elems_->disconnect_sender();
151152
}
152153

154+
bool connected() const noexcept {
155+
return base_t::connected(elems_);
156+
}
157+
153158
bool connect() noexcept {
154159
auto tp = base_t::connect(elems_);
155160
if (std::get<0>(tp) && std::get<1>(tp)) {

0 commit comments

Comments
 (0)