Skip to content

Commit 8ffddae

Browse files
committed
factor out the event queue in its own class
1 parent 1ec6e5b commit 8ffddae

File tree

2 files changed

+84
-52
lines changed

2 files changed

+84
-52
lines changed

include/reactor-cpp/scheduler.hh

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,33 @@ public:
9999
using ActionList = SafeVector<BaseAction*>;
100100
using ActionListPtr = std::unique_ptr<ActionList>;
101101

102+
class EventQueue {
103+
private:
104+
std::shared_mutex mutex_;
105+
std::map<Tag, ActionListPtr> event_queue_;
106+
/// stores the actions triggered at the current tag
107+
ActionListPtr triggered_actions_{nullptr};
108+
109+
std::vector<ActionListPtr> action_list_pool_;
110+
static constexpr std::size_t action_list_pool_increment_{10};
111+
112+
void fill_action_list_pool();
113+
114+
public:
115+
EventQueue() { fill_action_list_pool(); }
116+
117+
[[nodiscard]] auto empty() const -> bool { return event_queue_.empty(); }
118+
[[nodiscard]] auto next_tag() const -> Tag;
119+
120+
auto insert_event_at(const Tag& tag) -> const ActionListPtr&;
121+
122+
// should only be called while holding the scheduler mutex
123+
auto extract_next_event() -> ActionListPtr&&;
124+
125+
// should only be called while holding the scheduler mutex
126+
void return_action_list(ActionListPtr&& action_list);
127+
};
128+
102129
class Scheduler { // NOLINT
103130
private:
104131
const bool using_workers_;
@@ -111,16 +138,10 @@ private:
111138
std::mutex scheduling_mutex_;
112139
std::condition_variable cv_schedule_;
113140

114-
std::shared_mutex mutex_event_queue_;
115-
std::map<Tag, ActionListPtr> event_queue_;
141+
EventQueue event_queue_;
116142
/// stores the actions triggered at the current tag
117143
ActionListPtr triggered_actions_{nullptr};
118144

119-
std::vector<ActionListPtr> action_list_pool_;
120-
static constexpr std::size_t action_list_pool_increment_{10};
121-
void fill_action_list_pool();
122-
auto insert_event_at(const Tag& tag) -> const ActionListPtr&;
123-
124145
std::vector<std::vector<BasePort*>> set_ports_;
125146
std::vector<std::vector<Reaction*>> triggered_reactions_;
126147
std::vector<std::vector<Reaction*>> reaction_queue_;

lib/scheduler.cc

Lines changed: 56 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,49 @@ void ReadyQueue::fill_up(std::vector<Reaction*>& ready_reactions) {
149149
}
150150
}
151151

152+
void EventQueue::fill_action_list_pool() {
153+
for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
154+
action_list_pool_.emplace_back(std::make_unique<ActionList>());
155+
}
156+
}
157+
158+
auto EventQueue::next_tag() const -> Tag {
159+
reactor_assert(!event_queue_.empty());
160+
return event_queue_.begin()->first;
161+
}
162+
163+
auto EventQueue::extract_next_event() -> ActionListPtr&& {
164+
reactor_assert(!event_queue_.empty());
165+
return std::move(event_queue_.extract(event_queue_.begin()).mapped());
166+
}
167+
168+
auto EventQueue::insert_event_at(const Tag& tag) -> const ActionListPtr& {
169+
auto shared_lock = std::shared_lock<std::shared_mutex>(mutex_);
170+
171+
auto event_it = event_queue_.find(tag);
172+
if (event_it == event_queue_.end()) {
173+
shared_lock.unlock();
174+
{
175+
auto unique_lock = std::unique_lock<std::shared_mutex>(mutex_);
176+
if (action_list_pool_.empty()) {
177+
fill_action_list_pool();
178+
}
179+
const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
180+
if (result.second) {
181+
action_list_pool_.pop_back();
182+
}
183+
return result.first->second;
184+
}
185+
} else {
186+
return event_it->second;
187+
}
188+
}
189+
190+
void EventQueue::return_action_list(ActionListPtr&& action_list) {
191+
reactor_assert(action_list != nullptr);
192+
action_list_pool_.emplace_back(std::forward<ActionListPtr>(action_list));
193+
}
194+
152195
void Scheduler::terminate_all_workers() {
153196
log_.debug() << "Send termination signal to all workers";
154197
auto num_workers = environment_->num_workers();
@@ -260,8 +303,9 @@ void Scheduler::next() { // NOLINT
260303

261304
while (triggered_actions_ == nullptr || triggered_actions_->empty()) {
262305
if (triggered_actions_ != nullptr) {
263-
action_list_pool_.emplace_back(std::move(triggered_actions_));
306+
event_queue_.return_action_list(std::move(triggered_actions_));
264307
}
308+
reactor_assert(triggered_actions_ == nullptr);
265309

266310
// shutdown if there are no more events in the queue
267311
if (event_queue_.empty() && !stop_) {
@@ -278,25 +322,23 @@ void Scheduler::next() { // NOLINT
278322
continue_execution_ = false;
279323
log_.debug() << "Shutting down the scheduler";
280324
Tag t_next = Tag::from_logical_time(logical_time_).delay();
281-
if (!event_queue_.empty() && t_next == event_queue_.begin()->first) {
325+
if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
282326
log_.debug() << "Schedule the last round of reactions including all "
283327
"termination reactions";
284-
triggered_actions_ = std::move(event_queue_.begin()->second);
285-
event_queue_.erase(event_queue_.begin());
328+
triggered_actions_ = std::move(event_queue_.extract_next_event());
286329
log_.debug() << "advance logical time to tag " << t_next;
287330
logical_time_.advance_to(t_next);
288331
} else {
289332
return;
290333
}
291334
} else {
292-
// find the next tag
293-
auto t_next = event_queue_.begin()->first;
335+
auto t_next = event_queue_.next_tag();
294336
log_.debug() << "try to advance logical time to tag " << t_next;
295337

296338
// synchronize with physical time if not in fast forward mode
297339
if (!environment_->fast_fwd_execution()) {
298340
bool result = PhysicalTimeBarrier::acquire_tag(
299-
t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.begin()->first; });
341+
t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.next_tag(); });
300342
// If acquire tag returns false, then a new event was inserted into the queue and we need to start over
301343
if (!result) {
302344
continue;
@@ -307,10 +349,10 @@ void Scheduler::next() { // NOLINT
307349
bool result{true};
308350
for (auto* action : environment_->input_actions_) {
309351
bool inner_result = action->acquire_tag(t_next, lock, cv_schedule_,
310-
[&t_next, this]() { return t_next != event_queue_.begin()->first; });
352+
[&t_next, this]() { return t_next != event_queue_.next_tag(); });
311353
// If the wait was aborted or if the next tag changed in the meantime,
312354
// we need to break from the loop and continue with the main loop.
313-
if (!inner_result || t_next != event_queue_.begin()->first) {
355+
if (!inner_result || t_next != event_queue_.next_tag()) {
314356
result = false;
315357
break;
316358
}
@@ -320,12 +362,11 @@ void Scheduler::next() { // NOLINT
320362
continue;
321363
}
322364

323-
// Retrieve all events with tag equal to current logical time from the
324-
// queue.
365+
// Retrieve all triggered actions at the next tag.
325366
// We do not need to lock mutex_event_queue_ here, as the lock on
326367
// scheduling_mutex_ already ensures that no one can write to the event
327368
// queue.
328-
triggered_actions_ = std::move(event_queue_.extract(event_queue_.begin()).mapped());
369+
triggered_actions_ = std::move(event_queue_.extract_next_event());
329370

330371
// advance logical time
331372
log_.debug() << "advance logical time to tag " << t_next;
@@ -359,47 +400,17 @@ Scheduler::Scheduler(Environment* env)
359400
: using_workers_(env->num_workers() > 1)
360401
, environment_(env)
361402
, log_("Scheduler " + env->name())
362-
, ready_queue_(log_, env->num_workers()) {
363-
fill_action_list_pool();
364-
}
403+
, ready_queue_(log_, env->num_workers()) {}
365404

366405
Scheduler::~Scheduler() = default;
367406

368-
void Scheduler::fill_action_list_pool() {
369-
for (std::size_t i{0}; i < action_list_pool_increment_; i++) {
370-
action_list_pool_.emplace_back(std::make_unique<ActionList>());
371-
}
372-
}
373-
374-
auto Scheduler::insert_event_at(const Tag& tag) -> const ActionListPtr& {
375-
auto shared_lock = std::shared_lock<std::shared_mutex>(mutex_event_queue_);
376-
377-
auto event_it = event_queue_.find(tag);
378-
if (event_it == event_queue_.end()) {
379-
shared_lock.unlock();
380-
{
381-
auto unique_lock = std::unique_lock<std::shared_mutex>(mutex_event_queue_);
382-
if (action_list_pool_.empty()) {
383-
fill_action_list_pool();
384-
}
385-
const auto& result = event_queue_.try_emplace(tag, std::move(action_list_pool_.back()));
386-
if (result.second) {
387-
action_list_pool_.pop_back();
388-
}
389-
return result.first->second;
390-
}
391-
} else {
392-
return event_it->second;
393-
}
394-
}
395-
396407
void Scheduler::schedule_sync(BaseAction* action, const Tag& tag) {
397408
log_.debug() << "Schedule action " << action->fqn() << (action->is_logical() ? " synchronously " : " asynchronously ")
398409
<< " with tag " << tag;
399410
reactor_assert(logical_time_ < tag);
400411
tracepoint(reactor_cpp, schedule_action, action->container()->fqn(), action->name(), tag);
401412

402-
const auto& action_list = insert_event_at(tag);
413+
const auto& action_list = event_queue_.insert_event_at(tag);
403414
action_list->push_back(action);
404415
}
405416

@@ -436,7 +447,7 @@ auto Scheduler::schedule_empty_async_at(const Tag& tag) -> bool {
436447
// processed.
437448
return tag == logical_time_;
438449
}
439-
insert_event_at(tag);
450+
event_queue_.insert_event_at(tag);
440451
}
441452
notify();
442453
return true;

0 commit comments

Comments
 (0)