From 417149ada64503c2abfb8329c22bd0da3a6b6646 Mon Sep 17 00:00:00 2001 From: Pablo Hoch Date: Wed, 2 Mar 2022 13:33:46 +0100 Subject: [PATCH] RI Basis AMQP Receiver Buffering Fix (#216) --- .pkg | 2 +- .pkg.lock | 6 +++--- modules/ris/src/ris.cc | 12 +++++------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/.pkg b/.pkg index 32c799ad4..9afd2276d 100644 --- a/.pkg +++ b/.pkg @@ -93,4 +93,4 @@ [rabbitmq-c] url=git@github.com:motis-project/rabbitmq-c.git branch=master - commit=087417f9a05212a61eb73c1691316898ceebfb46 + commit=5e6efe17f52a66da36994d1c6ae28e1280e0bc4f diff --git a/.pkg.lock b/.pkg.lock index 9ee4ca8bc..07cb9cf3a 100644 --- a/.pkg.lock +++ b/.pkg.lock @@ -1,4 +1,4 @@ -1454394902606524866 +3330282991838792458 cista 32e52a0082e89986bb86afad513654fa6a6c1412 zlib 1e1dfdedddb54a2e2cb8fec3b67f925233c495aa boost c90d53bdcd7ff741a416ae122b33c9c2a96e8be7 @@ -24,7 +24,7 @@ lmdb 9bd01f14f549d8202413c4cd5f49b066b0a22b66 mimalloc 076f815cece59e0a0ee08237c8fbba75465452b6 miniz 1edbdece9d71dc65c6ff405572ee37cbdcef7af4 libressl 502ef24e637b026290d641528c4fa82267033d36 -net 9ea612ff034accbd01825959496ee03a3fece1ef +net 126869fa3a2ea5c3024c44c180daf4031f802853 lua 7bb93325b26f84c7e8b51fcbd857361ce7605a1d luabind 9223568bbcf818ecfb1001d49f567627ee10852a tbb 5ce3e80d2b8262bf853c20a8fbaadeee3119eef5 @@ -34,7 +34,7 @@ rapidjson e4a599d2b5dec065b1ea2af5d8dac52baa9df5f5 ppr 3cf8bc12ca2d1f74f61fb7cd56606c8fe4d4da77 protobuf 297171ade5e9bd01d823ffe8b203a5443ec03f15 pugixml 60175e80e2f5e97e027ac78f7e14c5acc009ce50 -rabbitmq-c 087417f9a05212a61eb73c1691316898ceebfb46 +rabbitmq-c 5e6efe17f52a66da36994d1c6ae28e1280e0bc4f zstd bce26304a57dd504ae7ae51f384bf9292d7e3acb tar 3a08b6575eb6a04e6b3d0977e6da3b61a91d62f2 clipper 904f0e6644c7f01c176443613be8f7788d59c658 diff --git a/modules/ris/src/ris.cc b/modules/ris/src/ris.cc index 5ad730a31..a996fb051 100644 --- a/modules/ris/src/ris.cc +++ b/modules/ris/src/ris.cc @@ -230,15 +230,15 @@ struct ris::impl { &config_.rabbitmq_, [](std::string const& log_msg) { LOG(info) << "rabbitmq: " << log_msg; }); - ribasis_receiver_->run([this, d, sched, last = now(), - buffer = std::vector{}]( + ribasis_receiver_->run([this, d, sched, buffer = std::vector{}]( amqp::msg const& m) mutable { buffer.emplace_back(m); - if (auto const n = now(); (n - last) > config_.update_interval_) { + if (auto const n = now(); + (n - ribasis_receiver_last_update_) < config_.update_interval_) { return; } else { - last = n; + ribasis_receiver_last_update_ = n; auto msgs_copy = buffer; buffer.clear(); @@ -955,6 +955,7 @@ struct ris::impl { } std::unique_ptr ribasis_receiver_; + unixtime ribasis_receiver_last_update_{now()}; db::env env_; std::mutex min_max_mutex_; @@ -985,9 +986,6 @@ ris::ris() : module("RIS", "ris") { param(config_.rabbitmq_.user_, "rabbitmq.username", "RabbitMQ username"); param(config_.rabbitmq_.pw_, "rabbitmq.password", "RabbitMQ password"); param(config_.rabbitmq_.vhost_, "rabbitmq.vhost", "RabbitMQ vhost"); - param(config_.rabbitmq_.exchange_, "rabbitmq.exchange", "RabbitMQ exchange"); - param(config_.rabbitmq_.routing_key_, "rabbitmq.routing_key", - "RabbitMQ routing key"); param(config_.rabbitmq_.queue_, "rabbitmq.queue", "RabbitMQ queue name"); param(config_.rabbitmq_.ca_, "rabbitmq.ca", "RabbitMQ path to CA file"); param(config_.rabbitmq_.cert_, "rabbitmq.cert",