From cfe4a25ff07ae74924687d77212f3c4d1c460abf Mon Sep 17 00:00:00 2001 From: Christian Menard Date: Tue, 29 Nov 2022 11:48:30 +0100 Subject: [PATCH 1/7] allow specifying a timeout and store the timeout in the environment --- include/reactor-cpp/environment.hh | 9 +++++++-- lib/environment.cc | 1 + 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/include/reactor-cpp/environment.hh b/include/reactor-cpp/environment.hh index 8679f26f..9acaa30c 100644 --- a/include/reactor-cpp/environment.hh +++ b/include/reactor-cpp/environment.hh @@ -13,6 +13,7 @@ #include #include +#include "reactor-cpp/time.hh" #include "reactor.hh" #include "scheduler.hh" @@ -42,16 +43,19 @@ private: Phase phase_{Phase::Construction}; TimePoint start_time_{}; + const Duration timeout_{}; + void build_dependency_graph(Reactor* reactor); void calculate_indexes(); public: explicit Environment(unsigned int num_workers, bool run_forever = default_run_forever, - bool fast_fwd_execution = default_fast_fwd_execution) + bool fast_fwd_execution = default_fast_fwd_execution, const Duration& timeout = Duration::max()) : num_workers_(num_workers) , run_forever_(run_forever) , fast_fwd_execution_(fast_fwd_execution) - , scheduler_(this) {} + , scheduler_(this) + , timeout_(timeout) {} void register_reactor(Reactor* reactor); void assemble(); @@ -76,6 +80,7 @@ public: [[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); } [[nodiscard]] auto start_time() const noexcept -> const TimePoint& { return start_time_; } + [[nodiscard]] auto timeout() const noexcept -> const Duration& { return timeout_; } static auto physical_time() noexcept -> TimePoint { return get_physical_time(); } diff --git a/lib/environment.cc b/lib/environment.cc index 767acc59..d6589204 100644 --- a/lib/environment.cc +++ b/lib/environment.cc @@ -17,6 +17,7 @@ #include "reactor-cpp/logging.hh" #include "reactor-cpp/port.hh" #include "reactor-cpp/reaction.hh" +#include "reactor-cpp/time.hh" namespace reactor { From 30c96e24bd7954c888aaeb8238fd3c7372d122de Mon Sep 17 00:00:00 2001 From: Christian Menard Date: Wed, 14 Dec 2022 14:46:36 +0100 Subject: [PATCH 2/7] implement timeout natively --- examples/hello/main.cc | 3 ++- include/reactor-cpp/action.hh | 14 +++++--------- lib/action.cc | 11 ++++++++++- lib/environment.cc | 4 ++-- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/examples/hello/main.cc b/examples/hello/main.cc index 478d43c2..e913b2c3 100644 --- a/examples/hello/main.cc +++ b/examples/hello/main.cc @@ -1,5 +1,6 @@ #include +#include "reactor-cpp/action.hh" #include "reactor-cpp/reactor-cpp.hh" using namespace reactor; @@ -9,7 +10,7 @@ class Hello : public Reactor { private: // actions Timer timer{"timer", this, 1s, 2s}; - ShutdownAction sa{"terminate", this}; + ShutdownTrigger sa{"terminate", this}; // reactions Reaction r_hello{"r_hello", 1, this, [this]() { hello(); }}; diff --git a/include/reactor-cpp/action.hh b/include/reactor-cpp/action.hh index 5f86b9d2..d1dec0c1 100644 --- a/include/reactor-cpp/action.hh +++ b/include/reactor-cpp/action.hh @@ -139,7 +139,7 @@ public: , period_(period) {} void startup() final; - void shutdown() final {} + void shutdown() override {} void setup() noexcept final {} [[nodiscard]] auto offset() const noexcept -> const Duration& { return offset_; } @@ -147,20 +147,16 @@ public: [[nodiscard]] auto period() const noexcept -> const Duration& { return period_; } }; -class StartupAction : public Timer { +class StartupTrigger : public Timer { public: - StartupAction(const std::string& name, Reactor* container) + StartupTrigger(const std::string& name, Reactor* container) : Timer(name, container) {} }; -class ShutdownAction : public BaseAction { +class ShutdownTrigger : public Timer { public: - ShutdownAction(const std::string& name, Reactor* container) - : BaseAction(name, container, true, Duration::zero()) {} + ShutdownTrigger(const std::string& name, Reactor* container); - void setup() noexcept final {} - void cleanup() noexcept final {} - void startup() final {} void shutdown() final; }; diff --git a/lib/action.cc b/lib/action.cc index fb6e7a32..680d5432 100644 --- a/lib/action.cc +++ b/lib/action.cc @@ -11,6 +11,7 @@ #include "reactor-cpp/assert.hh" #include "reactor-cpp/environment.hh" #include "reactor-cpp/reaction.hh" +#include "reactor-cpp/time.hh" namespace reactor { @@ -35,6 +36,11 @@ void BaseAction::register_scheduler(Reaction* reaction) { } void Timer::startup() { + // abort if the offset is the maximum duration + if (offset_ == Duration::max()) { + return; + } + Tag tag_zero = Tag::from_physical_time(environment()->start_time()); if (offset_ != Duration::zero()) { environment()->scheduler()->schedule_sync(this, tag_zero.delay(offset_)); @@ -52,7 +58,10 @@ void Timer::cleanup() noexcept { } } -void ShutdownAction::shutdown() { +ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container) + : Timer(name, container, Duration::zero(), container->environment()->timeout()) {} + +void ShutdownTrigger::shutdown() { Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); environment()->scheduler()->schedule_sync(this, tag); } diff --git a/lib/environment.cc b/lib/environment.cc index d6589204..23fa4c7c 100644 --- a/lib/environment.cc +++ b/lib/environment.cc @@ -232,9 +232,9 @@ auto Environment::startup() -> std::thread { void Environment::dump_trigger_to_yaml(std::ofstream& yaml, const BaseAction& trigger) { yaml << " - name: " << trigger.name() << std::endl; - if (dynamic_cast(&trigger) != nullptr) { + if (dynamic_cast(&trigger) != nullptr) { yaml << " type: startup" << std::endl; - } else if (dynamic_cast(&trigger) != nullptr) { + } else if (dynamic_cast(&trigger) != nullptr) { yaml << " type: shutdown" << std::endl; } else if (dynamic_cast(&trigger) != nullptr) { yaml << " type: timer" << std::endl; From 8dfe7fcd454fb63fcd0d330b1a175a825fcfe76d Mon Sep 17 00:00:00 2001 From: Christian Menard Date: Wed, 14 Dec 2022 16:29:15 +0100 Subject: [PATCH 3/7] make sure shutdown is thread safe and can only be called once Fixes https://github.com/lf-lang/reactor-cpp/issues/34 --- include/reactor-cpp/environment.hh | 2 ++ lib/environment.cc | 16 +++++++++++++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/include/reactor-cpp/environment.hh b/include/reactor-cpp/environment.hh index 9acaa30c..f86ad9fb 100644 --- a/include/reactor-cpp/environment.hh +++ b/include/reactor-cpp/environment.hh @@ -48,6 +48,8 @@ private: void build_dependency_graph(Reactor* reactor); void calculate_indexes(); + std::mutex shutdown_mutex_{}; + public: explicit Environment(unsigned int num_workers, bool run_forever = default_run_forever, bool fast_fwd_execution = default_fast_fwd_execution, const Duration& timeout = Duration::max()) diff --git a/lib/environment.cc b/lib/environment.cc index 23fa4c7c..f021f933 100644 --- a/lib/environment.cc +++ b/lib/environment.cc @@ -88,10 +88,20 @@ void Environment::build_dependency_graph(Reactor* reactor) { // NOLINT } void Environment::sync_shutdown() { - validate(this->phase() == Phase::Execution, "sync_shutdown() may only be called during execution phase!"); - phase_ = Phase::Shutdown; + { + std::lock_guard lock(shutdown_mutex_); - log::Info() << "Terminating the execution"; + if (phase_ >= Phase::Shutdown) { + // sync_shutdown() was already called -> abort + return; + } + + validate(phase_ == Phase::Execution, "sync_shutdown() may only be called during execution phase!"); + phase_ = Phase::Shutdown; + } + + // the following will only be executed once + log::Debug() << "Terminating the execution"; for (auto* reactor : top_level_reactors_) { reactor->shutdown(); From c68c904b62e91a09d4f8cdc37f8b16e2466153ab Mon Sep 17 00:00:00 2001 From: Christian Menard Date: Wed, 14 Dec 2022 16:30:16 +0100 Subject: [PATCH 4/7] all actions/triggers should provide is_present() Fixes https://github.com/lf-lang/reactor-cpp/issues/26 --- include/reactor-cpp/action.hh | 25 +++++++++---------------- include/reactor-cpp/impl/action_impl.hh | 6 ++++++ lib/action.cc | 8 +++++++- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/include/reactor-cpp/action.hh b/include/reactor-cpp/action.hh index d1dec0c1..a9ae14ae 100644 --- a/include/reactor-cpp/action.hh +++ b/include/reactor-cpp/action.hh @@ -24,13 +24,14 @@ private: std::set schedulers_{}; const Duration min_delay_{}; const bool logical_{true}; + bool present_{false}; protected: void register_trigger(Reaction* reaction); void register_scheduler(Reaction* reaction); - virtual void setup() noexcept = 0; - virtual void cleanup() noexcept = 0; + virtual void setup() noexcept { present_ = true; } + virtual void cleanup() noexcept { present_ = false; } BaseAction(const std::string& name, Reactor* container, bool logical, Duration min_delay) : ReactorElement(name, ReactorElement::Type::Action, container) @@ -48,6 +49,8 @@ public: [[nodiscard]] auto inline min_delay() const noexcept -> Duration { return min_delay_; } + [[nodiscard]] auto inline is_present() const noexcept -> bool { return present_; } + friend class Reaction; friend class Scheduler; }; @@ -60,7 +63,7 @@ private: std::mutex mutex_events_; void setup() noexcept final; - void cleanup() noexcept final { value_ptr_ = nullptr; } + void cleanup() noexcept final; protected: Action(const std::string& name, Reactor* container, bool logical, Duration min_delay) @@ -88,28 +91,18 @@ public: template void schedule(std::nullptr_t, Dur) = delete; [[nodiscard]] auto get() const noexcept -> const ImmutableValuePtr& { return value_ptr_; } - - [[nodiscard]] auto is_present() const noexcept -> bool { return value_ptr_ != nullptr; } }; template <> class Action : public BaseAction { -private: - bool present_{false}; - - void cleanup() noexcept final { present_ = false; } - void setup() noexcept final { present_ = true; } - protected: Action(const std::string& name, Reactor* container, bool logical, Duration min_delay) : BaseAction(name, container, logical, min_delay) {} public: - void startup() final {} - void shutdown() final {} - template void schedule(Dur delay = Dur::zero()); - [[nodiscard]] auto is_present() const noexcept -> bool { return present_; } + void startup() final {} + void shutdown() final {} }; template class PhysicalAction : public Action { @@ -140,7 +133,6 @@ public: void startup() final; void shutdown() override {} - void setup() noexcept final {} [[nodiscard]] auto offset() const noexcept -> const Duration& { return offset_; } @@ -157,6 +149,7 @@ class ShutdownTrigger : public Timer { public: ShutdownTrigger(const std::string& name, Reactor* container); + void setup() noexcept final; void shutdown() final; }; diff --git a/include/reactor-cpp/impl/action_impl.hh b/include/reactor-cpp/impl/action_impl.hh index 3f14a450..b100c4b3 100644 --- a/include/reactor-cpp/impl/action_impl.hh +++ b/include/reactor-cpp/impl/action_impl.hh @@ -56,6 +56,7 @@ template void Action::schedule(Dur delay) { } template void Action::setup() noexcept { + BaseAction::setup(); if (value_ptr_ == nullptr) { // only do this once, even if the action was triggered multiple times // lock if this is a physical action std::unique_lock lock = @@ -68,6 +69,11 @@ template void Action::setup() noexcept { reactor_assert(value_ptr_ != nullptr); } +template void Action::cleanup() noexcept { + BaseAction::cleanup(); + value_ptr_ = nullptr; +} + } // namespace reactor #endif diff --git a/lib/action.cc b/lib/action.cc index 680d5432..c1c1f466 100644 --- a/lib/action.cc +++ b/lib/action.cc @@ -50,6 +50,7 @@ void Timer::startup() { } void Timer::cleanup() noexcept { + BaseAction::cleanup(); // schedule the timer again if (period_ != Duration::zero()) { Tag now = Tag::from_logical_time(environment()->logical_time()); @@ -59,7 +60,12 @@ void Timer::cleanup() noexcept { } ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container) - : Timer(name, container, Duration::zero(), container->environment()->timeout()) {} + : Timer(name, container, Duration::zero(), container->environment()->timeout()) {} + +void ShutdownTrigger::setup() noexcept { + BaseAction::setup(); + environment()->sync_shutdown(); +} void ShutdownTrigger::shutdown() { Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); From 1eb61b4ff06685d0c34d4ffbcc3fb01e33516c40 Mon Sep 17 00:00:00 2001 From: Christian Menard Date: Wed, 14 Dec 2022 16:31:11 +0100 Subject: [PATCH 5/7] schedule shutdown only (again) if it is currently not present --- lib/action.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/action.cc b/lib/action.cc index c1c1f466..58b1dbeb 100644 --- a/lib/action.cc +++ b/lib/action.cc @@ -68,8 +68,10 @@ void ShutdownTrigger::setup() noexcept { } void ShutdownTrigger::shutdown() { - Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); - environment()->scheduler()->schedule_sync(this, tag); + if (!is_present()) { + Tag tag = Tag::from_logical_time(environment()->logical_time()).delay(); + environment()->scheduler()->schedule_sync(this, tag); + } } } // namespace reactor From ceaa393aef4d5a1eccf80e8e3eefb79a72ada279 Mon Sep 17 00:00:00 2001 From: Christian Menard Date: Wed, 14 Dec 2022 16:31:52 +0100 Subject: [PATCH 6/7] update example --- examples/hello/main.cc | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/examples/hello/main.cc b/examples/hello/main.cc index e913b2c3..cbea895e 100644 --- a/examples/hello/main.cc +++ b/examples/hello/main.cc @@ -30,25 +30,10 @@ class Hello : public Reactor { static void terminate() { std::cout << "Good Bye!" << std::endl; } }; -class Timeout : public Reactor { -private: - Timer timer; - - Reaction r_timer{"r_timer", 1, this, [this]() { environment()->sync_shutdown(); }}; - -public: - Timeout(Environment* env, Duration timeout) - : Reactor("Timeout", env) - , timer{"timer", this, Duration::zero(), timeout} {} - - void assemble() override { r_timer.declare_trigger(&timer); } -}; - auto main() -> int { - Environment env{4}; + Environment env{4, false, false, 5s}; Hello hello{&env}; - Timeout timeout{&env, 5s}; env.assemble(); auto thread = env.startup(); From 82e07c3f7a9c31fa92c1bb0fe2eb66064e031dbb Mon Sep 17 00:00:00 2001 From: Christian Menard Date: Fri, 16 Dec 2022 14:47:31 +0100 Subject: [PATCH 7/7] apply clang format --- include/reactor-cpp/multiport.hh | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/include/reactor-cpp/multiport.hh b/include/reactor-cpp/multiport.hh index d9922cc8..e0123b35 100644 --- a/include/reactor-cpp/multiport.hh +++ b/include/reactor-cpp/multiport.hh @@ -37,9 +37,7 @@ public: } // resets parent multiport - inline void clear() noexcept { - size_.store(0, std::memory_order_relaxed); - } + inline void clear() noexcept { size_.store(0, std::memory_order_relaxed); } }; template >