-
Notifications
You must be signed in to change notification settings - Fork 900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[C++] LogBufferDescriptor::indexByPosition triggers integer overflow #579
Comments
Code to reproduce:
#include <cassert>
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>

#include <Aeron.h>
// Stream-position advance per published message (the reproducer asserts
// positions grow by this much for every payload_size-byte publish).
constexpr unsigned int message_size{1024u};
// Bytes of user payload per message: message_size minus 32 bytes of per-message overhead.
constexpr unsigned int payload_size{message_size - 32u};
// Number of messages published before the stream is expected to hit its maximum position.
constexpr unsigned int msg_count_until_max_pos{100u};
/// Maximum stream position for a term length of 2^term_buf_len_pow bytes,
/// i.e. term_length * 2^31, computed as 1 << (term_buf_len_pow + 31).
///
/// The shift is performed on an int64_t rather than on `1L`: `long` is only
/// 32 bits wide on LLP64 platforms (e.g. 64-bit Windows), where `1L << 57`
/// would be undefined behaviour.
///
/// @param term_buf_len_pow log2 of the term buffer length.
///        Precondition: term_buf_len_pow <= 31, so the shift stays below 63 bits.
/// @return 2^(term_buf_len_pow + 31).
constexpr int64_t get_max_position(const unsigned int term_buf_len_pow) {
    constexpr unsigned int max_pos_mult_pow = 31;
    return int64_t{1} << (term_buf_len_pow + max_pos_mult_pow);
}
/// Offers one payload_size-byte message whose first bytes hold `value`,
/// spinning while the publication reports a transient failure.
///
/// @param pub   publication to offer on.
/// @param value value written at offset 0 of the payload.
/// @return the new stream position on success, otherwise the first
///         non-transient status returned by offer() (e.g. aeron::MAX_POSITION_EXCEEDED).
auto offer(aeron::ExclusivePublication& pub, const unsigned int value) {
    // Zero-initialise so the bytes after `value` are deterministic instead of
    // uninitialised stack memory being copied into the log buffer.
    uint8_t buf[payload_size]{};
    aeron::concurrent::AtomicBuffer transport_buf{buf, payload_size};
    transport_buf.overlayStruct<std::remove_const_t<decltype(value)>>(0) = value;
    auto result = pub.offer(transport_buf);
    // BACK_PRESSURED and ADMIN_ACTION are both "try again" outcomes.
    while (result == aeron::BACK_PRESSURED || result == aeron::ADMIN_ACTION)
        result = pub.offer(transport_buf);
    return result;
}
/// Claims payload_size bytes with tryClaim(), writes `value` at the start of the
/// claimed region and commits it. Spins while the publication reports a
/// transient failure.
///
/// @param pub   publication to claim on.
/// @param value value written at the start of the claimed region.
/// @return the new stream position on success, otherwise the first
///         non-transient status returned by tryClaim() (e.g. aeron::MAX_POSITION_EXCEEDED).
///         On failure nothing is written and nothing is committed.
auto claim(aeron::ExclusivePublication& pub, const unsigned int value) {
    aeron::concurrent::logbuffer::BufferClaim buffer_claim;
    auto result = pub.tryClaim(payload_size, buffer_claim);
    // BACK_PRESSURED and ADMIN_ACTION are both "try again" outcomes.
    while (result == aeron::BACK_PRESSURED || result == aeron::ADMIN_ACTION)
        result = pub.tryClaim(payload_size, buffer_claim);
    // Aeron status codes (BACK_PRESSURED, MAX_POSITION_EXCEEDED, ...) are negative,
    // so a bare `if (result)` would also accept failures and write through a claim
    // that tryClaim() did not fill in. Only a positive position means success.
    if (result > 0) {
        buffer_claim.buffer().overlayStruct<std::remove_const_t<decltype(value)>>(buffer_claim.offset()) = value;
        buffer_claim.commit();
    }
    return result;
}
/// Publishes msg_count_until_max_pos messages carrying 0, 1, 2, ... starting at
/// `initial_position`.
///
/// Checks that:
/// - every publish returns the position advanced by message_size;
/// - the subscription receives the values in order;
/// - the stream ends exactly at `max_position`;
/// - one further publish fails with aeron::MAX_POSITION_EXCEEDED.
///
/// Any messages not yet received are then drained from `sub`.
///
/// @param pub              publication under test, assumed to sit at initial_position.
/// @param sub              subscription that receives what `pub` sends.
/// @param initial_position stream position before the first publish.
/// @param max_position     position expected after msg_count_until_max_pos messages.
/// @param publish          callable (pub, value) -> position or status, e.g. offer / claim.
template <typename publish_method_t>
void hit_max_position(
    aeron::ExclusivePublication& pub,
    aeron::Subscription& sub,
    const int64_t initial_position,
    const int64_t max_position,
    publish_method_t&& publish) noexcept {
    auto send_value = 0u;
    auto expected_rcv_value = 0u;
    using value_t = decltype(expected_rcv_value);

    // Verifies one received fragment and advances the expected value. The increment
    // is kept outside assert(): with NDEBUG the assert expression is discarded, and
    // the drain loop below would otherwise never terminate.
    const auto on_fragment = [&expected_rcv_value] (
        const aeron::concurrent::AtomicBuffer& buffer,
        aeron::util::index_t offset,
        aeron::util::index_t,
        const aeron::Header&) noexcept {
        const auto received = buffer.template overlayStruct<value_t>(offset);
        assert(received == expected_rcv_value);
        static_cast<void>(received);
        ++expected_rcv_value;
    };

    auto expected_position = initial_position;
    while (send_value != msg_count_until_max_pos) {
        const auto result = publish(pub, send_value++);
        expected_position += message_size;
        assert(result == expected_position);
        static_cast<void>(result);
        sub.poll(on_fragment, 1);
    }
    assert(expected_position == max_position);
    static_cast<void>(max_position);

    const auto overflow_result = publish(pub, send_value);
    assert(overflow_result == aeron::MAX_POSITION_EXCEEDED);
    static_cast<void>(overflow_result);

    while (expected_rcv_value != send_value)
        sub.poll(on_fragment, 1);
}
/// Publishes msg_count_until_max_pos messages carrying 0, 1, 2, ... on a
/// publication expected to start at stream position 0.
///
/// Checks that every publish returns the position advanced by message_size and
/// that the subscription receives the values in order. Any messages not yet
/// received are then drained from `sub`.
///
/// @param pub     publication under test, assumed to start at position 0.
/// @param sub     subscription that receives what `pub` sends.
/// @param publish callable (pub, value) -> position or status, e.g. offer / claim.
template <typename publish_method_t>
void check_mitigation(
    aeron::ExclusivePublication& pub,
    aeron::Subscription& sub,
    publish_method_t&& publish) noexcept {
    auto send_value = 0u;
    auto expected_rcv_value = 0u;
    using value_t = decltype(expected_rcv_value);

    // Verifies one received fragment and advances the expected value. The increment
    // is kept outside assert(): with NDEBUG the assert expression is discarded, and
    // the drain loop below would otherwise never terminate.
    const auto on_fragment = [&expected_rcv_value] (
        const aeron::concurrent::AtomicBuffer& buffer,
        aeron::util::index_t offset,
        aeron::util::index_t,
        const aeron::Header&) noexcept {
        const auto received = buffer.template overlayStruct<value_t>(offset);
        assert(received == expected_rcv_value);
        static_cast<void>(received);
        ++expected_rcv_value;
    };

    int64_t expected_position = 0;
    while (send_value != msg_count_until_max_pos) {
        const auto result = publish(pub, send_value++);
        expected_position += message_size;
        assert(result == expected_position);
        static_cast<void>(result);
        sub.poll(on_fragment, 1);
    }

    while (expected_rcv_value != send_value)
        sub.poll(on_fragment, 1);
}
int main() {
constexpr auto max_position = get_max_position(26);
constexpr int64_t offset = message_size * msg_count_until_max_pos;
constexpr auto initial_position = max_position - offset;
aeron::Context c;
c.aeronDir("/dev/shm/transport_test_max_position_exceeded");
aeron::Aeron client(c);
auto pub_reg_id = client.addExclusivePublication("aeron:ipc?init-term-id=0|term-length=67108864|term-offset=67006464|term-id=2147483647", 0);
auto sub_reg_id = client.addSubscription(
"aeron:ipc",
0,
[] (aeron::Image& i) {std::cout << "up " << i.sessionId() << std::endl; },
[] (aeron::Image& i) {std::cout << "down " << i.sessionId() << std::endl; });
auto pub = client.findExclusivePublication(pub_reg_id);
while (!pub) {
std::this_thread::yield();
pub = client.findExclusivePublication(pub_reg_id);
}
while (!pub->isConnected())
std::this_thread::yield();
auto sub = client.findSubscription(sub_reg_id);
while (!sub) {
std::this_thread::yield();
sub = client.findSubscription(sub_reg_id);
}
hit_max_position(*pub, *sub, initial_position, max_position, offer);
pub_reg_id = client.addExclusivePublication("aeron:ipc", 0);
pub = client.findExclusivePublication(pub_reg_id);
while (!pub) {
std::this_thread::yield();
pub = client.findExclusivePublication(pub_reg_id);
}
while (!pub->isConnected())
std::this_thread::yield();
check_mitigation(*pub, *sub, offer);
} |
For convenience, I added the code above as a system test: https://github.com/denizevrenci/aeron/tree/max_pos_exceeded_test
OK, found the issue. It isn't a race condition, so I will change the issue title.
When I reassign a publication to the same channel by registering a new one and overwriting `pub` with the result of `findExclusivePublication` (as in the second half of the reproducer), the previous publication object managed by the `shared_ptr` is destroyed, and the `shared_ptr` starts managing a new object.

The subscription side receives the new-image event before the unavailable-image event. During this window, if I poll the subscription, the soon-to-be-unavailable image is still accessible through the subscription. However, when `poll` traverses up the stack, it seems to access destroyed memory.

Below is the relevant part of the stack trace (the trace itself was not preserved in this copy). Could the conductor thread be cleaning up the term buffer while the main thread is inside a call to `Subscription::poll`?

The text was updated successfully, but these errors were encountered: