Skip to content

Commit

Permalink
Merge pull request #36 from lf-lang/timeout
Browse files Browse the repository at this point in the history
Add native timeout implementation, make sync_shutdown threadsafe and  add is_present for timers
  • Loading branch information
cmnrd authored Dec 21, 2022
2 parents e133bbd + 82e07c3 commit ea4b3e6
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 55 deletions.
20 changes: 3 additions & 17 deletions examples/hello/main.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <iostream>

#include "reactor-cpp/action.hh"
#include "reactor-cpp/reactor-cpp.hh"

using namespace reactor;
Expand All @@ -9,7 +10,7 @@ class Hello : public Reactor {
private:
// actions
Timer timer{"timer", this, 1s, 2s};
ShutdownAction sa{"terminate", this};
ShutdownTrigger sa{"terminate", this};

// reactions
Reaction r_hello{"r_hello", 1, this, [this]() { hello(); }};
Expand All @@ -29,25 +30,10 @@ class Hello : public Reactor {
static void terminate() { std::cout << "Good Bye!" << std::endl; }
};

class Timeout : public Reactor {
private:
Timer timer;

Reaction r_timer{"r_timer", 1, this, [this]() { environment()->sync_shutdown(); }};

public:
Timeout(Environment* env, Duration timeout)
: Reactor("Timeout", env)
, timer{"timer", this, Duration::zero(), timeout} {}

void assemble() override { r_timer.declare_trigger(&timer); }
};

auto main() -> int {
Environment env{4};
Environment env{4, false, false, 5s};

Hello hello{&env};
Timeout timeout{&env, 5s};
env.assemble();

auto thread = env.startup();
Expand Down
39 changes: 14 additions & 25 deletions include/reactor-cpp/action.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ private:
std::set<Reaction*> schedulers_{};
const Duration min_delay_{};
const bool logical_{true};
bool present_{false};

protected:
void register_trigger(Reaction* reaction);
void register_scheduler(Reaction* reaction);

virtual void setup() noexcept = 0;
virtual void cleanup() noexcept = 0;
virtual void setup() noexcept { present_ = true; }
virtual void cleanup() noexcept { present_ = false; }

BaseAction(const std::string& name, Reactor* container, bool logical, Duration min_delay)
: ReactorElement(name, ReactorElement::Type::Action, container)
Expand All @@ -48,6 +49,8 @@ public:

[[nodiscard]] auto inline min_delay() const noexcept -> Duration { return min_delay_; }

[[nodiscard]] auto inline is_present() const noexcept -> bool { return present_; }

friend class Reaction;
friend class Scheduler;
};
Expand All @@ -60,7 +63,7 @@ private:
std::mutex mutex_events_;

void setup() noexcept final;
void cleanup() noexcept final { value_ptr_ = nullptr; }
void cleanup() noexcept final;

protected:
Action(const std::string& name, Reactor* container, bool logical, Duration min_delay)
Expand Down Expand Up @@ -88,28 +91,18 @@ public:
template <class Dur = Duration> void schedule(std::nullptr_t, Dur) = delete;

[[nodiscard]] auto get() const noexcept -> const ImmutableValuePtr<T>& { return value_ptr_; }

[[nodiscard]] auto is_present() const noexcept -> bool { return value_ptr_ != nullptr; }
};

template <> class Action<void> : public BaseAction {
private:
bool present_{false};

void cleanup() noexcept final { present_ = false; }
void setup() noexcept final { present_ = true; }

protected:
Action(const std::string& name, Reactor* container, bool logical, Duration min_delay)
: BaseAction(name, container, logical, min_delay) {}

public:
void startup() final {}
void shutdown() final {}

template <class Dur = Duration> void schedule(Dur delay = Dur::zero());

[[nodiscard]] auto is_present() const noexcept -> bool { return present_; }
void startup() final {}
void shutdown() final {}
};

template <class T> class PhysicalAction : public Action<T> {
Expand Down Expand Up @@ -139,28 +132,24 @@ public:
, period_(period) {}

void startup() final;
void shutdown() final {}
void setup() noexcept final {}
void shutdown() override {}

[[nodiscard]] auto offset() const noexcept -> const Duration& { return offset_; }

[[nodiscard]] auto period() const noexcept -> const Duration& { return period_; }
};

class StartupAction : public Timer {
class StartupTrigger : public Timer {
public:
StartupAction(const std::string& name, Reactor* container)
StartupTrigger(const std::string& name, Reactor* container)
: Timer(name, container) {}
};

class ShutdownAction : public BaseAction {
class ShutdownTrigger : public Timer {
public:
ShutdownAction(const std::string& name, Reactor* container)
: BaseAction(name, container, true, Duration::zero()) {}
ShutdownTrigger(const std::string& name, Reactor* container);

void setup() noexcept final {}
void cleanup() noexcept final {}
void startup() final {}
void setup() noexcept final;
void shutdown() final;
};

Expand Down
11 changes: 9 additions & 2 deletions include/reactor-cpp/environment.hh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <string>
#include <vector>

#include "reactor-cpp/time.hh"
#include "reactor.hh"
#include "scheduler.hh"

Expand Down Expand Up @@ -42,16 +43,21 @@ private:
Phase phase_{Phase::Construction};
TimePoint start_time_{};

const Duration timeout_{};

void build_dependency_graph(Reactor* reactor);
void calculate_indexes();

std::mutex shutdown_mutex_{};

public:
explicit Environment(unsigned int num_workers, bool run_forever = default_run_forever,
bool fast_fwd_execution = default_fast_fwd_execution)
bool fast_fwd_execution = default_fast_fwd_execution, const Duration& timeout = Duration::max())
: num_workers_(num_workers)
, run_forever_(run_forever)
, fast_fwd_execution_(fast_fwd_execution)
, scheduler_(this) {}
, scheduler_(this)
, timeout_(timeout) {}

void register_reactor(Reactor* reactor);
void assemble();
Expand All @@ -76,6 +82,7 @@ public:

[[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); }
[[nodiscard]] auto start_time() const noexcept -> const TimePoint& { return start_time_; }
[[nodiscard]] auto timeout() const noexcept -> const Duration& { return timeout_; }

static auto physical_time() noexcept -> TimePoint { return get_physical_time(); }

Expand Down
6 changes: 6 additions & 0 deletions include/reactor-cpp/impl/action_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ template <class Dur> void Action<void>::schedule(Dur delay) {
}

template <class T> void Action<T>::setup() noexcept {
BaseAction::setup();
if (value_ptr_ == nullptr) { // only do this once, even if the action was triggered multiple times
// lock if this is a physical action
std::unique_lock<std::mutex> lock =
Expand All @@ -68,6 +69,11 @@ template <class T> void Action<T>::setup() noexcept {
reactor_assert(value_ptr_ != nullptr);
}

template <class T> void Action<T>::cleanup() noexcept {
BaseAction::cleanup();
value_ptr_ = nullptr;
}

} // namespace reactor

#endif
4 changes: 1 addition & 3 deletions include/reactor-cpp/multiport.hh
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ public:
}

// resets parent multiport
inline void clear() noexcept {
size_.store(0, std::memory_order_relaxed);
}
inline void clear() noexcept { size_.store(0, std::memory_order_relaxed); }
};

template <class T, class A = std::allocator<T>>
Expand Down
23 changes: 20 additions & 3 deletions lib/action.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "reactor-cpp/assert.hh"
#include "reactor-cpp/environment.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/time.hh"

namespace reactor {

Expand All @@ -35,6 +36,11 @@ void BaseAction::register_scheduler(Reaction* reaction) {
}

void Timer::startup() {
// abort if the offset is the maximum duration
if (offset_ == Duration::max()) {
return;
}

Tag tag_zero = Tag::from_physical_time(environment()->start_time());
if (offset_ != Duration::zero()) {
environment()->scheduler()->schedule_sync(this, tag_zero.delay(offset_));
Expand All @@ -44,6 +50,7 @@ void Timer::startup() {
}

void Timer::cleanup() noexcept {
BaseAction::cleanup();
// schedule the timer again
if (period_ != Duration::zero()) {
Tag now = Tag::from_logical_time(environment()->logical_time());
Expand All @@ -52,9 +59,19 @@ void Timer::cleanup() noexcept {
}
}

void ShutdownAction::shutdown() {
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
environment()->scheduler()->schedule_sync(this, tag);
ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container)
: Timer(name, container, Duration::zero(), container->environment()->timeout()) {}

void ShutdownTrigger::setup() noexcept {
BaseAction::setup();
environment()->sync_shutdown();
}

void ShutdownTrigger::shutdown() {
if (!is_present()) {
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
environment()->scheduler()->schedule_sync(this, tag);
}
}

} // namespace reactor
21 changes: 16 additions & 5 deletions lib/environment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "reactor-cpp/logging.hh"
#include "reactor-cpp/port.hh"
#include "reactor-cpp/reaction.hh"
#include "reactor-cpp/time.hh"

namespace reactor {

Expand Down Expand Up @@ -87,10 +88,20 @@ void Environment::build_dependency_graph(Reactor* reactor) { // NOLINT
}

void Environment::sync_shutdown() {
validate(this->phase() == Phase::Execution, "sync_shutdown() may only be called during execution phase!");
phase_ = Phase::Shutdown;
{
std::lock_guard<std::mutex> lock(shutdown_mutex_);

log::Info() << "Terminating the execution";
if (phase_ >= Phase::Shutdown) {
// sync_shutdown() was already called -> abort
return;
}

validate(phase_ == Phase::Execution, "sync_shutdown() may only be called during execution phase!");
phase_ = Phase::Shutdown;
}

// the following will only be executed once
log::Debug() << "Terminating the execution";

for (auto* reactor : top_level_reactors_) {
reactor->shutdown();
Expand Down Expand Up @@ -231,9 +242,9 @@ auto Environment::startup() -> std::thread {

void Environment::dump_trigger_to_yaml(std::ofstream& yaml, const BaseAction& trigger) {
yaml << " - name: " << trigger.name() << std::endl;
if (dynamic_cast<const StartupAction*>(&trigger) != nullptr) {
if (dynamic_cast<const StartupTrigger*>(&trigger) != nullptr) {
yaml << " type: startup" << std::endl;
} else if (dynamic_cast<const ShutdownAction*>(&trigger) != nullptr) {
} else if (dynamic_cast<const ShutdownTrigger*>(&trigger) != nullptr) {
yaml << " type: shutdown" << std::endl;
} else if (dynamic_cast<const Timer*>(&trigger) != nullptr) {
yaml << " type: timer" << std::endl;
Expand Down

0 comments on commit ea4b3e6

Please # to comment.