
Commit 10a8055

Merge pull request #45 from lf-lang/enclave-connections
Factored event queue into its own class and fixed race condition between multiple starting enclaves
2 parents 84f527e + 1d17d9c

File tree

3 files changed: +125 additions, -77 deletions
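
The race-condition fix is easiest to see in the `Scheduler::start()` hunk below: the old code held the scheduling mutex from construction via a `startup_lock_` member and released it in `start()`, while the new code takes a scoped lock only around the startup-critical section and then discards stale empty events left behind by enclaves that started earlier. A minimal standalone sketch of the new pattern (the helper names are placeholders, not the library's API):

```cpp
#include <mutex>

// Sketch of the locking change in Scheduler::start() (names simplified).
// Before: a std::unique_lock member acquired the scheduling mutex at
// construction, and start() released it after initialization. After: start()
// takes a scoped lock only around the critical initialization and discards
// any empty events that other enclaves enqueued before the start tag.
struct SchedulerSketch {
  std::mutex scheduling_mutex_;

  void start() {
    {
      // Other schedulers (enclaves/federates) may touch our logical time and
      // event queue concurrently, so initialization happens under the mutex.
      std::lock_guard<std::mutex> lock_guard(scheduling_mutex_);
      init_logical_time();
      discard_stale_events(); // drop empty events earlier than the start tag
    } // mutex released here; asynchronous schedulers may now interact safely
    start_workers();
  }

  // hypothetical helpers standing in for the real initialization steps
  void init_logical_time() {}
  void discard_stale_events() {}
  void start_workers() {}
};
```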

include/reactor-cpp/assert.hh

Lines changed: 12 additions & 12 deletions

```diff
@@ -33,6 +33,18 @@ constexpr bool runtime_assertion = true;
 #include <unistd.h>
 #endif
 
+// assert macro that avoids unused variable warnings
+#ifdef NDEBUG
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define reactor_assert(x) \
+  do { \
+    (void)sizeof(x); \
+  } while (0)
+#else
+// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
+#define reactor_assert(x) assert(x)
+#endif
+
 namespace reactor {
 using EnvPhase = Environment::Phase;
 
@@ -67,18 +79,6 @@ constexpr inline void validate([[maybe_unused]] bool condition, [[maybe_unused]]
   }
 }
 
-// assert macro that avoids unused variable warnings
-#ifdef NDEBUG
-// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
-#define reactor_assert(x) \
-  do { \
-    (void)sizeof(x); \
-  } while (0)
-#else
-// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
-#define reactor_assert(x) assert(x)
-#endif
-
 template <typename E> constexpr auto extract_value(E enum_value) -> typename std::underlying_type<E>::type {
   return static_cast<typename std::underlying_type<E>::type>(enum_value);
 }
```
include/reactor-cpp/scheduler.hh

Lines changed: 31 additions & 11 deletions

```diff
@@ -99,6 +99,36 @@ public:
 using ActionList = SafeVector<BaseAction*>;
 using ActionListPtr = std::unique_ptr<ActionList>;
 
+class EventQueue {
+private:
+  std::shared_mutex mutex_;
+  std::map<Tag, ActionListPtr> event_queue_;
+  /// stores the actions triggered at the current tag
+  ActionListPtr triggered_actions_{nullptr};
+
+  std::vector<ActionListPtr> action_list_pool_;
+  static constexpr std::size_t action_list_pool_increment_{10};
+
+  void fill_action_list_pool();
+
+public:
+  EventQueue() { fill_action_list_pool(); }
+
+  [[nodiscard]] auto empty() const -> bool { return event_queue_.empty(); }
+  [[nodiscard]] auto next_tag() const -> Tag;
+
+  auto insert_event_at(const Tag& tag) -> const ActionListPtr&;
+
+  // should only be called while holding the scheduler mutex
+  auto extract_next_event() -> ActionListPtr;
+
+  // should only be called while holding the scheduler mutex
+  void return_action_list(ActionListPtr&& action_list);
+
+  // should only be called while holding the scheduler mutex
+  void discard_events_until_tag(const Tag& tag);
+};
+
 class Scheduler { // NOLINT
 private:
   const bool using_workers_;
@@ -110,21 +140,11 @@ private:
 
   std::mutex scheduling_mutex_;
   std::condition_variable cv_schedule_;
-  // lock used to protect the scheduler from asynchronous requests (i.e. from
-  // enclaves or federates). We hold the mutex from construction and release in
-  // start().
-  std::unique_lock<std::mutex> startup_lock_{scheduling_mutex_};
 
-  std::shared_mutex mutex_event_queue_;
-  std::map<Tag, ActionListPtr> event_queue_;
+  EventQueue event_queue_;
   /// stores the actions triggered at the current tag
   ActionListPtr triggered_actions_{nullptr};
 
-  std::vector<ActionListPtr> action_list_pool_;
-  static constexpr std::size_t action_list_pool_increment_{10};
-  void fill_action_list_pool();
-  auto insert_event_at(const Tag& tag) -> const ActionListPtr&;
-
   std::vector<std::vector<BasePort*>> set_ports_;
   std::vector<std::vector<Reaction*>> triggered_reactions_;
   std::vector<std::vector<Reaction*>> reaction_queue_;
```
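
The new `EventQueue` owns both the tag-ordered map and the pool of recycled action lists that previously lived in `Scheduler`. Below is a simplified standalone sketch of that recycling scheme (`int` tags and plain vectors instead of `Tag` and `SafeVector<BaseAction*>`; not the library's actual interface): lists move out of the pool when a tag is first inserted, out of the map when the tag is processed, and back into the pool afterwards, so steady-state operation performs no allocations.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <vector>

// Simplified sketch of the pooled event-queue scheme: each tag maps to a
// heap-allocated action list, and drained lists are recycled through a pool
// instead of being freed.
class PooledEventQueue {
  using List = std::vector<int>;
  using ListPtr = std::unique_ptr<List>;

  std::map<int, ListPtr> queue_;
  std::vector<ListPtr> pool_;
  static constexpr std::size_t pool_increment_{10};

  void fill_pool() {
    for (std::size_t i{0}; i < pool_increment_; i++) {
      pool_.emplace_back(std::make_unique<List>());
    }
  }

public:
  PooledEventQueue() { fill_pool(); }

  // Take a list from the pool for a new tag (or reuse the existing entry).
  List& at_tag(int tag) {
    auto [it, inserted] = queue_.try_emplace(tag, nullptr);
    if (inserted) {
      if (pool_.empty()) {
        fill_pool();
      }
      it->second = std::move(pool_.back());
      pool_.pop_back();
    }
    return *it->second;
  }

  // Move the earliest list out; precondition: queue not empty (the real
  // code asserts this). The caller later returns the list via recycle().
  ListPtr extract_next() { return std::move(queue_.extract(queue_.begin()).mapped()); }

  void recycle(ListPtr&& list) {
    list->clear(); // reuse the allocation, drop the stale contents
    pool_.emplace_back(std::move(list));
  }
};
```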

lib/scheduler.cc

Lines changed: 82 additions & 54 deletions

```diff
@@ -149,6 +149,56 @@ void ReadyQueue::fill_up(std::vector<Reaction*>& ready_reactions) {
   }
 }
 
+void EventQueue::fill_action_list_pool() {
+  for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
+    action_list_pool_.emplace_back(std::make_unique<ActionList>());
+  }
+}
+
+auto EventQueue::next_tag() const -> Tag {
+  reactor_assert(!event_queue_.empty());
+  return event_queue_.begin()->first;
+}
+
+auto EventQueue::extract_next_event() -> ActionListPtr {
+  reactor_assert(!event_queue_.empty());
+  return std::move(event_queue_.extract(event_queue_.begin()).mapped());
+}
+
+auto EventQueue::insert_event_at(const Tag& tag) -> const ActionListPtr& {
+  auto shared_lock = std::shared_lock<std::shared_mutex>(mutex_);
+
+  auto event_it = event_queue_.find(tag);
+  if (event_it == event_queue_.end()) {
+    shared_lock.unlock();
+    {
+      auto unique_lock = std::unique_lock<std::shared_mutex>(mutex_);
+      if (action_list_pool_.empty()) {
+        fill_action_list_pool();
+      }
+      const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
+      if (result.second) {
+        action_list_pool_.pop_back();
+      }
+      return result.first->second;
+    }
+  } else {
+    return event_it->second;
+  }
+}
+
+void EventQueue::return_action_list(ActionListPtr&& action_list) {
+  reactor_assert(action_list != nullptr);
+  action_list_pool_.emplace_back(std::forward<ActionListPtr>(action_list));
+}
+
+void EventQueue::discard_events_until_tag(const Tag& tag) {
+  while (!empty() && next_tag() <= tag) {
+    auto actions = extract_next_event();
+    return_action_list(std::move(actions));
+  }
+}
+
 void Scheduler::terminate_all_workers() {
   log_.debug() << "Send termination signal to all workers";
   auto num_workers = environment_->num_workers();
@@ -202,10 +252,25 @@ auto Scheduler::schedule_ready_reactions() -> bool {
 void Scheduler::start() {
   log_.debug() << "Starting the scheduler...";
 
-  // Initialize our logical time to the value right before the start tag. This
-  // is important for usage with enclaves/federates, to indicate, that no events
-  // before the start tag ca be generated.
-  logical_time_.advance_to(environment_->start_tag().decrement());
+  {
+    // Other schedulers (enclaves or federates) could try to access our logical
+    // time and our event queue. Thus, we need to lock the main scheduling mutex
+    // in order to avoid data races.
+    std::lock_guard<std::mutex> lock_guard(scheduling_mutex_);
+
+    // Initialize our logical time to the value right before the start tag. This
+    // is important for usage with enclaves/federates, to indicate that no events
+    // before the start tag can be generated.
+    logical_time_.advance_to(environment_->start_tag().decrement());
+
+    // It could happen that another scheduler (enclave or federate) already
+    // tried to acquire a tag before our start tag. In that case, we will have
+    // empty events on the queue that are earlier than our startup event. We
+    // resolve this simply by deleting all such events. Once we have processed
+    // the startup reactions, the start tag will be released, and consequently
+    // also all earlier tags are released.
+    event_queue_.discard_events_until_tag(Tag::from_logical_time(logical_time_));
+  }
 
   auto num_workers = environment_->num_workers();
   // initialize the reaction queue, set ports vector, and triggered reactions
@@ -214,11 +279,6 @@ void Scheduler::start() {
   set_ports_.resize(num_workers);
   triggered_reactions_.resize(num_workers);
 
-  // release the scheduling mutex, allowing other asynchronous processes (i.e.
-  // enclaves or federates) to access the event queue and the current logical
-  // time.
-  startup_lock_.unlock();
-
   // Initialize and start the workers. By resizing the workers vector first,
   // we make sure that there is sufficient space for all the workers and none of
   // them needs to be moved. This is important because a running worker may not
@@ -265,8 +325,9 @@ void Scheduler::next() { // NOLINT
 
   while (triggered_actions_ == nullptr || triggered_actions_->empty()) {
     if (triggered_actions_ != nullptr) {
-      action_list_pool_.emplace_back(std::move(triggered_actions_));
+      event_queue_.return_action_list(std::move(triggered_actions_));
     }
+    reactor_assert(triggered_actions_ == nullptr);
 
     // shutdown if there are no more events in the queue
     if (event_queue_.empty() && !stop_) {
@@ -283,25 +344,23 @@ void Scheduler::next() { // NOLINT
       continue_execution_ = false;
       log_.debug() << "Shutting down the scheduler";
       Tag t_next = Tag::from_logical_time(logical_time_).delay();
-      if (!event_queue_.empty() && t_next == event_queue_.begin()->first) {
+      if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
         log_.debug() << "Schedule the last round of reactions including all "
                         "termination reactions";
-        triggered_actions_ = std::move(event_queue_.begin()->second);
-        event_queue_.erase(event_queue_.begin());
+        triggered_actions_ = event_queue_.extract_next_event();
         log_.debug() << "advance logical time to tag " << t_next;
         logical_time_.advance_to(t_next);
       } else {
         return;
       }
     } else {
-      // find the next tag
-      auto t_next = event_queue_.begin()->first;
+      auto t_next = event_queue_.next_tag();
       log_.debug() << "try to advance logical time to tag " << t_next;
 
       // synchronize with physical time if not in fast forward mode
       if (!environment_->fast_fwd_execution()) {
         bool result = PhysicalTimeBarrier::acquire_tag(
-            t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.begin()->first; });
+            t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.next_tag(); });
         // If acquire tag returns false, then a new event was inserted into the queue and we need to start over
         if (!result) {
           continue;
@@ -312,10 +371,10 @@ void Scheduler::next() { // NOLINT
       bool result{true};
       for (auto* action : environment_->input_actions_) {
         bool inner_result = action->acquire_tag(t_next, lock, cv_schedule_,
-                                                [&t_next, this]() { return t_next != event_queue_.begin()->first; });
+                                                [&t_next, this]() { return t_next != event_queue_.next_tag(); });
         // If the wait was aborted or if the next tag changed in the meantime,
         // we need to break from the loop and continue with the main loop.
-        if (!inner_result || t_next != event_queue_.begin()->first) {
+        if (!inner_result || t_next != event_queue_.next_tag()) {
           result = false;
           break;
         }
@@ -325,12 +384,11 @@ void Scheduler::next() { // NOLINT
         continue;
       }
 
-      // Retrieve all events with tag equal to current logical time from the
-      // queue.
+      // Retrieve all triggered actions at the next tag.
       // We do not need to lock mutex_event_queue_ here, as the lock on
       // scheduling_mutex_ already ensures that no one can write to the event
       // queue.
-      triggered_actions_ = std::move(event_queue_.extract(event_queue_.begin()).mapped());
+      triggered_actions_ = event_queue_.extract_next_event();
 
       // advance logical time
       log_.debug() << "advance logical time to tag " << t_next;
@@ -364,47 +422,17 @@ Scheduler::Scheduler(Environment* env)
     : using_workers_(env->num_workers() > 1)
     , environment_(env)
     , log_("Scheduler " + env->name())
-    , ready_queue_(log_, env->num_workers()) {
-  fill_action_list_pool();
-}
+    , ready_queue_(log_, env->num_workers()) {}
 
 Scheduler::~Scheduler() = default;
 
-void Scheduler::fill_action_list_pool() {
-  for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
-    action_list_pool_.emplace_back(std::make_unique<ActionList>());
-  }
-}
-
-auto Scheduler::insert_event_at(const Tag& tag) -> const ActionListPtr& {
-  auto shared_lock = std::shared_lock<std::shared_mutex>(mutex_event_queue_);
-
-  auto event_it = event_queue_.find(tag);
-  if (event_it == event_queue_.end()) {
-    shared_lock.unlock();
-    {
-      auto unique_lock = std::unique_lock<std::shared_mutex>(mutex_event_queue_);
-      if (action_list_pool_.empty()) {
-        fill_action_list_pool();
-      }
-      const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
-      if (result.second) {
-        action_list_pool_.pop_back();
-      }
-      return result.first->second;
-    }
-  } else {
-    return event_it->second;
-  }
-}
-
 void Scheduler::schedule_sync(BaseAction* action, const Tag& tag) {
   log_.debug() << "Schedule action " << action->fqn() << (action->is_logical() ? " synchronously " : " asynchronously ")
                << " with tag " << tag;
   reactor_assert(logical_time_ < tag);
   tracepoint(reactor_cpp, schedule_action, action->container()->fqn(), action->name(), tag);
 
-  const auto& action_list = insert_event_at(tag);
+  const auto& action_list = event_queue_.insert_event_at(tag);
   action_list->push_back(action);
 }
 
@@ -441,7 +469,7 @@ auto Scheduler::schedule_empty_async_at(const Tag& tag) -> bool {
       // processed.
       return tag == logical_time_;
     }
-    insert_event_at(tag);
+    event_queue_.insert_event_at(tag);
   }
   notify();
   return true;
```
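
`EventQueue::insert_event_at` keeps the read-mostly fast path cheap with a shared lock and upgrades by releasing and reacquiring the mutex exclusively when the tag is absent. Because another thread can insert the same tag in the unlocked gap, the exclusive section re-checks via `try_emplace`, which inserts only if the key is still missing. A standalone sketch of this pattern, assuming (as in the scheduler) that entries are only removed under a separate outer mutex, so the returned reference stays valid:

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Standalone sketch of the lock-upgrade pattern used by
// EventQueue::insert_event_at: readers share the lock on the fast path; a
// writer releases it, reacquires exclusively, and re-checks via try_emplace,
// because another thread may have inserted the same key in the unlocked gap.
class TagMap {
  std::shared_mutex mutex_;
  std::map<int, std::unique_ptr<std::vector<int>>> map_;

public:
  auto list_for(int tag) -> std::vector<int>& {
    {
      std::shared_lock<std::shared_mutex> shared_lock(mutex_);
      auto it = map_.find(tag);
      if (it != map_.end()) {
        return *it->second; // fast path: tag already present
      }
    } // shared lock released here; the map may change before we relock

    std::unique_lock<std::shared_mutex> unique_lock(mutex_);
    // try_emplace is a no-op if another thread won the race, so the
    // re-check and the insertion form a single atomic step under the lock.
    auto [it, inserted] = map_.try_emplace(tag, nullptr);
    if (inserted) {
      it->second = std::make_unique<std::vector<int>>();
    }
    return *it->second;
  }
};
```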
