Skip to content

Commit fc4f38c

Browse files
authored
Merge pull request #37 from lf-lang/affiliated
Enable multiple environments and schedulers to run at the same time
2 parents 37f5d94 + 40f9339 commit fc4f38c

File tree

5 files changed

+175
-83
lines changed

5 files changed

+175
-83
lines changed

include/reactor-cpp/environment.hh

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <string>
1414
#include <vector>
1515

16+
#include "reactor-cpp/logging.hh"
1617
#include "reactor-cpp/time.hh"
1718
#include "reactor.hh"
1819
#include "scheduler.hh"
@@ -30,6 +31,9 @@ public:
3031

3132
private:
3233
using Dependency = std::pair<Reaction*, Reaction*>;
34+
35+
const std::string name_{};
36+
const log::NamedLogger log_;
3337
const unsigned int num_workers_{default_number_worker};
3438
unsigned int max_reaction_index_{default_max_reaction_index};
3539
const bool run_forever_{default_run_forever};
@@ -39,6 +43,13 @@ private:
3943
std::set<Reaction*> reactions_{};
4044
std::vector<Dependency> dependencies_{};
4145

46+
/// The environment containing this environment. nullptr if this is the top environment
47+
Environment* containing_environment_{nullptr};
48+
/// Set of all environments contained by this environment
49+
std::set<Environment*> contained_environments_{};
50+
/// Pointer to the top level environment
51+
Environment* top_environment_{nullptr};
52+
4253
Scheduler scheduler_;
4354
Phase phase_{Phase::Construction};
4455
TimePoint start_time_{};
@@ -50,14 +61,14 @@ private:
5061

5162
std::mutex shutdown_mutex_{};
5263

64+
auto startup(const TimePoint& start_time) -> std::thread;
65+
5366
public:
5467
explicit Environment(unsigned int num_workers, bool run_forever = default_run_forever,
55-
bool fast_fwd_execution = default_fast_fwd_execution, const Duration& timeout = Duration::max())
56-
: num_workers_(num_workers)
57-
, run_forever_(run_forever)
58-
, fast_fwd_execution_(fast_fwd_execution)
59-
, scheduler_(this)
60-
, timeout_(timeout) {}
68+
bool fast_fwd_execution = default_fast_fwd_execution, const Duration& timeout = Duration::max());
69+
explicit Environment(const std::string& name, Environment* containing_environment);
70+
71+
auto name() -> const std::string& { return name_; }
6172

6273
void register_reactor(Reactor* reactor);
6374
void assemble();

include/reactor-cpp/logging.hh

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
#include <iostream>
1616
#include <memory>
1717
#include <mutex>
18+
#include <string>
19+
#include <utility>
1820

1921
namespace reactor::log {
2022

2123
template <bool enabled> class BaseLogger {};
2224

23-
template <> class BaseLogger<true> { // NOLINT
25+
template <> class BaseLogger<true> {
2426
private:
2527
using Lock = std::unique_lock<std::mutex>;
2628

@@ -34,6 +36,10 @@ public:
3436
, lock_(mutex_) {
3537
std::cerr << log_prefix;
3638
}
39+
BaseLogger(const BaseLogger&) = delete;
40+
auto operator=(const BaseLogger&) -> BaseLogger& = delete;
41+
BaseLogger(BaseLogger&&) = delete;
42+
auto operator=(BaseLogger&&) -> BaseLogger& = delete;
3743

3844
template <class T> auto operator<<(const T& msg) -> BaseLogger& {
3945
std::cerr << msg; // NOLINT
@@ -43,36 +49,61 @@ public:
4349
~BaseLogger() { std::cerr << std::endl; }
4450
};
4551

46-
template <> class BaseLogger<false> { // NOLINT
52+
template <> class BaseLogger<false> {
4753
public:
4854
explicit BaseLogger([[maybe_unused]] const std::string& /*unused*/) {}
55+
BaseLogger(const BaseLogger&) = delete;
56+
auto operator=(const BaseLogger&) -> BaseLogger& = delete;
57+
BaseLogger(BaseLogger&&) = delete;
58+
auto operator=(BaseLogger&&) -> BaseLogger& = delete;
4959

50-
template <class T> auto operator<<(const T& /*unused*/) const -> const BaseLogger& { return *this; }
60+
template <class T> auto operator<<([[maybe_unused]] const T& /*unused*/) const -> const BaseLogger& { return *this; }
5161

5262
~BaseLogger() = default;
5363
};
5464

5565
constexpr bool debug_enabled = 4 <= REACTOR_CPP_LOG_LEVEL;
5666
constexpr bool info_enabled = 3 <= REACTOR_CPP_LOG_LEVEL;
57-
constexpr bool warning_enabled = 2 <= REACTOR_CPP_LOG_LEVEL;
67+
constexpr bool warn_enabled = 2 <= REACTOR_CPP_LOG_LEVEL;
5868
constexpr bool error_enabled = 1 <= REACTOR_CPP_LOG_LEVEL;
5969

6070
struct Debug : BaseLogger<debug_enabled> {
6171
Debug()
62-
: BaseLogger<debug_enabled>("[DEBUG] "){}; // NOLINT Update C++20
72+
: BaseLogger<debug_enabled>("[DEBUG] "){};
6373
};
6474
struct Info : BaseLogger<info_enabled> {
6575
Info()
66-
: BaseLogger<info_enabled>("[INFO] "){}; // NOLINT Update C++20
76+
: BaseLogger<info_enabled>("[INFO] "){};
6777
};
68-
struct Warn : BaseLogger<warning_enabled> {
78+
struct Warn : BaseLogger<warn_enabled> {
6979
Warn()
70-
: BaseLogger<warning_enabled>("[WARN] "){}; // NOLINT Update C++20
80+
: BaseLogger<warn_enabled>("[WARN] "){};
7181
};
7282
struct Error : BaseLogger<error_enabled> {
7383
Error()
74-
: BaseLogger<error_enabled>("[ERROR] "){}; // NOLINT Update C++20
84+
: BaseLogger<error_enabled>("[ERROR] "){};
7585
};
86+
87+
class NamedLogger {
88+
private:
89+
const std::string debug_prefix_{};
90+
const std::string info_prefix_{};
91+
const std::string warn_prefix_{};
92+
const std::string error_prefix_{};
93+
94+
public:
95+
NamedLogger(const std::string& name)
96+
: debug_prefix_("[DEBUG] (" + name + ") ")
97+
, info_prefix_("[INFO] (" + name + ") ")
98+
, warn_prefix_("[WARN] (" + name + ") ")
99+
, error_prefix_("[ERROR] (" + name + ") ") {}
100+
101+
[[nodiscard]] auto debug() const -> BaseLogger<debug_enabled> { return BaseLogger<debug_enabled>(debug_prefix_); }
102+
[[nodiscard]] auto info() const -> BaseLogger<info_enabled> { return BaseLogger<info_enabled>(info_prefix_); }
103+
[[nodiscard]] auto warn() const -> BaseLogger<warn_enabled> { return BaseLogger<warn_enabled>(warn_prefix_); }
104+
[[nodiscard]] auto error() const -> BaseLogger<error_enabled> { return BaseLogger<error_enabled>(error_prefix_); }
105+
};
106+
76107
} // namespace reactor::log
77108

78109
#endif // REACTOR_CPP_LOGGING_HH

include/reactor-cpp/scheduler.hh

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include "fwd.hh"
2323
#include "logical_time.hh"
24+
#include "reactor-cpp/logging.hh"
2425
#include "reactor-cpp/time.hh"
2526
#include "safe_vector.hh"
2627
#include "semaphore.hh"
@@ -32,20 +33,23 @@ class Scheduler;
3233
class Worker;
3334

3435
class Worker { // NOLINT
35-
public:
36+
private:
3637
Scheduler& scheduler_;
3738
const unsigned int identity_{0};
3839
std::thread thread_{};
40+
log::NamedLogger log_;
3941

4042
// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
4143
static thread_local const Worker* current_worker;
4244

4345
void work() const;
4446
void execute_reaction(Reaction* reaction) const;
4547

46-
Worker(Scheduler& scheduler, unsigned int identity)
48+
public:
49+
Worker(Scheduler& scheduler, unsigned int identity, const std::string& name)
4750
: scheduler_{scheduler}
48-
, identity_{identity} {}
51+
, identity_{identity}
52+
, log_(name) {}
4953
Worker(Worker&& worker); // NOLINT(performance-noexcept-move-constructor)
5054
Worker(const Worker& worker) = delete;
5155

@@ -62,10 +66,12 @@ private:
6266
Semaphore sem_{0};
6367
std::ptrdiff_t waiting_workers_{0};
6468
const unsigned int num_workers_;
69+
log::NamedLogger& log_;
6570

6671
public:
67-
explicit ReadyQueue(unsigned num_workers)
68-
: num_workers_(num_workers) {}
72+
explicit ReadyQueue(log::NamedLogger& log, unsigned num_workers)
73+
: num_workers_(num_workers)
74+
, log_(log) {}
6975

7076
/**
7177
* Retrieve a ready reaction from the queue.
@@ -95,15 +101,19 @@ class Scheduler { // NOLINT
95101
private:
96102
const bool using_workers_;
97103
LogicalTime logical_time_{};
104+
TimePoint last_observed_physical_time_{TimePoint::min()};
98105

99106
Environment* environment_;
100107
std::vector<Worker> workers_{};
108+
log::NamedLogger log_;
101109

102110
std::mutex scheduling_mutex_;
103111
std::condition_variable cv_schedule_;
104112

105113
std::shared_mutex mutex_event_queue_;
106114
std::map<Tag, ActionListPtr> event_queue_;
115+
/// stores the actions triggered at the current tag
116+
ActionListPtr triggered_actions_{nullptr};
107117

108118
std::vector<ActionListPtr> action_list_pool_;
109119
static constexpr std::size_t action_list_pool_increment_{10};

lib/environment.cc

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#include <algorithm>
1212
#include <fstream>
1313
#include <map>
14+
#include <thread>
15+
#include <vector>
1416

1517
#include "reactor-cpp/action.hh"
1618
#include "reactor-cpp/assert.hh"
@@ -21,6 +23,28 @@
2123

2224
namespace reactor {
2325

26+
Environment::Environment(unsigned int num_workers, bool run_forever, bool fast_fwd_execution, const Duration& timeout)
27+
: log_("Environment")
28+
, num_workers_(num_workers)
29+
, run_forever_(run_forever)
30+
, fast_fwd_execution_(fast_fwd_execution)
31+
, top_environment_(this)
32+
, scheduler_(this)
33+
, timeout_(timeout) {}
34+
35+
Environment::Environment(const std::string& name, Environment* containing_environment)
36+
: name_(name)
37+
, log_("Environment " + name)
38+
, num_workers_(containing_environment->num_workers_)
39+
, run_forever_(containing_environment->run_forever_)
40+
, fast_fwd_execution_(containing_environment->fast_fwd_execution_)
41+
, containing_environment_(containing_environment)
42+
, top_environment_(containing_environment_->top_environment_)
43+
, scheduler_(this)
44+
, timeout_(containing_environment->timeout()) {
45+
reactor_assert(containing_environment->contained_environments_.insert(this).second);
46+
}
47+
2448
void Environment::register_reactor(Reactor* reactor) {
2549
reactor_assert(reactor != nullptr);
2650
validate(this->phase() == Phase::Construction, "Reactors may only be registered during construction phase!");
@@ -36,6 +60,7 @@ void recursive_assemble(Reactor* container) { // NOLINT
3660
}
3761

3862
void Environment::assemble() {
63+
log_.debug() << "Assemble";
3964
validate(this->phase() == Phase::Construction, "assemble() may only be called during construction phase!");
4065
phase_ = Phase::Assembly;
4166
for (auto* reactor : top_level_reactors_) {
@@ -47,6 +72,11 @@ void Environment::assemble() {
4772
build_dependency_graph(reactor);
4873
}
4974
calculate_indexes();
75+
76+
// assemble all contained environments
77+
for (auto* env : contained_environments_) {
78+
env->assemble();
79+
}
5080
}
5181

5282
void Environment::build_dependency_graph(Reactor* reactor) { // NOLINT
@@ -89,7 +119,7 @@ void Environment::build_dependency_graph(Reactor* reactor) { // NOLINT
89119

90120
void Environment::sync_shutdown() {
91121
{
92-
std::lock_guard<std::mutex> lock(shutdown_mutex_);
122+
std::lock_guard<std::mutex> lock{shutdown_mutex_};
93123

94124
if (phase_ >= Phase::Shutdown) {
95125
// sync_shutdown() was already called -> abort
@@ -101,7 +131,7 @@ void Environment::sync_shutdown() {
101131
}
102132

103133
// the following will only be executed once
104-
log::Debug() << "Terminating the execution";
134+
log_.debug() << "Terminating the execution";
105135

106136
for (auto* reactor : top_level_reactors_) {
107137
reactor->shutdown();
@@ -164,7 +194,7 @@ void Environment::export_dependency_graph(const std::string& path) {
164194

165195
dot.close();
166196

167-
log::Info() << "Reaction graph was written to " << path;
197+
log_.info() << "Reaction graph was written to " << path;
168198
}
169199

170200
void Environment::calculate_indexes() {
@@ -177,7 +207,7 @@ void Environment::calculate_indexes() {
177207
graph[dependencies.first].insert(dependencies.second);
178208
}
179209

180-
log::Debug() << "Reactions sorted by index:";
210+
log_.debug() << "Reactions sorted by index:";
181211
unsigned int index = 0;
182212
while (!graph.empty()) {
183213
// find nodes with degree zero and assign index
@@ -195,7 +225,7 @@ void Environment::calculate_indexes() {
195225
"/tmp/reactor_dependency_graph.dot");
196226
}
197227

198-
log::Debug dbg;
228+
auto dbg = log_.debug();
199229
dbg << index << ": ";
200230
for (auto* reaction : degree_zero) {
201231
dbg << reaction->fqn() << ", ";
@@ -218,26 +248,39 @@ void Environment::calculate_indexes() {
218248
}
219249

220250
auto Environment::startup() -> std::thread {
221-
validate(this->phase() == Phase::Assembly, "startup() may only be called during assembly phase!");
251+
validate(this == top_environment_, "startup() may only be called on the top environment");
252+
auto start_time = get_physical_time();
253+
return startup(start_time);
254+
}
222255

223-
// build the dependency graph
224-
for (auto* reactor : top_level_reactors_) {
225-
build_dependency_graph(reactor);
226-
}
227-
calculate_indexes();
256+
auto Environment::startup(const TimePoint& start_time) -> std::thread {
257+
validate(this->phase() == Phase::Assembly, "startup() may only be called during assembly phase!");
228258

229-
log::Info() << "Starting the execution";
259+
log_.debug() << "Starting the execution";
230260
phase_ = Phase::Startup;
231261

232-
start_time_ = get_physical_time();
262+
start_time_ = start_time;
233263
// start up initialize all reactors
234264
for (auto* reactor : top_level_reactors_) {
235265
reactor->startup();
236266
}
237267

238268
// start processing events
239269
phase_ = Phase::Execution;
240-
return std::thread([this]() { this->scheduler_.start(); });
270+
271+
return std::thread([this]() {
272+
std::vector<std::thread> threads;
273+
// startup all contained environments recursively
274+
for (auto* env : contained_environments_) {
275+
threads.emplace_back(env->startup(start_time_));
276+
}
277+
// start the local scheduler and wait until it returns
278+
this->scheduler_.start();
279+
// then join all the created threads
280+
for (auto& thread : threads) {
281+
thread.join();
282+
}
283+
});
241284
}
242285

243286
void Environment::dump_trigger_to_yaml(std::ofstream& yaml, const BaseAction& trigger) {
@@ -367,7 +410,7 @@ void Environment::dump_to_yaml(const std::string& path) {
367410
yaml << " - to: " << iterator.second->fqn() << std::endl;
368411
}
369412

370-
log::Info() << "Program structure was dumped to " << path;
413+
log_.info() << "Program structure was dumped to " << path;
371414
}
372415

373416
} // namespace reactor

0 commit comments

Comments
 (0)