Skip to content

Commit

Permalink
RI Basis AMQP Receiver Buffering Fix (#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
pablohoch authored Mar 2, 2022
1 parent 5760584 commit 417149a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .pkg
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,4 @@
[rabbitmq-c]
url=git@github.com:motis-project/rabbitmq-c.git
branch=master
commit=087417f9a05212a61eb73c1691316898ceebfb46
commit=5e6efe17f52a66da36994d1c6ae28e1280e0bc4f
6 changes: 3 additions & 3 deletions .pkg.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
1454394902606524866
3330282991838792458
cista 32e52a0082e89986bb86afad513654fa6a6c1412
zlib 1e1dfdedddb54a2e2cb8fec3b67f925233c495aa
boost c90d53bdcd7ff741a416ae122b33c9c2a96e8be7
Expand All @@ -24,7 +24,7 @@ lmdb 9bd01f14f549d8202413c4cd5f49b066b0a22b66
mimalloc 076f815cece59e0a0ee08237c8fbba75465452b6
miniz 1edbdece9d71dc65c6ff405572ee37cbdcef7af4
libressl 502ef24e637b026290d641528c4fa82267033d36
net 9ea612ff034accbd01825959496ee03a3fece1ef
net 126869fa3a2ea5c3024c44c180daf4031f802853
lua 7bb93325b26f84c7e8b51fcbd857361ce7605a1d
luabind 9223568bbcf818ecfb1001d49f567627ee10852a
tbb 5ce3e80d2b8262bf853c20a8fbaadeee3119eef5
Expand All @@ -34,7 +34,7 @@ rapidjson e4a599d2b5dec065b1ea2af5d8dac52baa9df5f5
ppr 3cf8bc12ca2d1f74f61fb7cd56606c8fe4d4da77
protobuf 297171ade5e9bd01d823ffe8b203a5443ec03f15
pugixml 60175e80e2f5e97e027ac78f7e14c5acc009ce50
rabbitmq-c 087417f9a05212a61eb73c1691316898ceebfb46
rabbitmq-c 5e6efe17f52a66da36994d1c6ae28e1280e0bc4f
zstd bce26304a57dd504ae7ae51f384bf9292d7e3acb
tar 3a08b6575eb6a04e6b3d0977e6da3b61a91d62f2
clipper 904f0e6644c7f01c176443613be8f7788d59c658
Expand Down
12 changes: 5 additions & 7 deletions modules/ris/src/ris.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,15 @@ struct ris::impl {
&config_.rabbitmq_, [](std::string const& log_msg) {
LOG(info) << "rabbitmq: " << log_msg;
});
ribasis_receiver_->run([this, d, sched, last = now(),
buffer = std::vector<amqp::msg>{}](
ribasis_receiver_->run([this, d, sched, buffer = std::vector<amqp::msg>{}](
amqp::msg const& m) mutable {
buffer.emplace_back(m);

if (auto const n = now(); (n - last) > config_.update_interval_) {
if (auto const n = now();
(n - ribasis_receiver_last_update_) < config_.update_interval_) {
return;
} else {
last = n;
ribasis_receiver_last_update_ = n;

auto msgs_copy = buffer;
buffer.clear();
Expand Down Expand Up @@ -955,6 +955,7 @@ struct ris::impl {
}

std::unique_ptr<amqp::ssl_connection> ribasis_receiver_;
unixtime ribasis_receiver_last_update_{now()};

db::env env_;
std::mutex min_max_mutex_;
Expand Down Expand Up @@ -985,9 +986,6 @@ ris::ris() : module("RIS", "ris") {
param(config_.rabbitmq_.user_, "rabbitmq.username", "RabbitMQ username");
param(config_.rabbitmq_.pw_, "rabbitmq.password", "RabbitMQ password");
param(config_.rabbitmq_.vhost_, "rabbitmq.vhost", "RabbitMQ vhost");
param(config_.rabbitmq_.exchange_, "rabbitmq.exchange", "RabbitMQ exchange");
param(config_.rabbitmq_.routing_key_, "rabbitmq.routing_key",
"RabbitMQ routing key");
param(config_.rabbitmq_.queue_, "rabbitmq.queue", "RabbitMQ queue name");
param(config_.rabbitmq_.ca_, "rabbitmq.ca", "RabbitMQ path to CA file");
param(config_.rabbitmq_.cert_, "rabbitmq.cert",
Expand Down

0 comments on commit 417149a

Please # to comment.