1313#include " action.hh"
1414#include " assert.hh"
1515#include " environment.hh"
16+ #include " fwd.hh"
17+ #include " logical_time.hh"
1618#include " port.hh"
1719#include " reaction.hh"
1820#include " reactor.hh"
1921#include " time.hh"
22+ #include " time_barrier.hh"
2023
2124namespace reactor {
2225
@@ -28,26 +31,32 @@ private:
2831protected:
2932 Connection (const std::string& name, Reactor* container, bool is_logical, Duration min_delay)
3033 : Action<T>(name, container, is_logical, min_delay) {}
34+ Connection (const std::string& name, Environment* environment, bool is_logical, Duration min_delay)
35+ : Action<T>(name, environment, is_logical, min_delay) {}
3136
3237 [[nodiscard]] auto downstream_ports () -> auto& { return downstream_ports_; }
3338 [[nodiscard]] auto downstream_ports () const -> const auto& { return downstream_ports_; }
39+ [[nodiscard]] auto upstream_port () -> auto* { return upstream_port_; }
40+ [[nodiscard]] auto upstream_port () const -> const auto* { return upstream_port_; }
3441
3542 virtual auto upstream_set_callback () noexcept -> PortCallback = 0;
3643
3744public:
38- void bind_upstream_port (Port<T>* port) {
45+ virtual void bind_upstream_port (Port<T>* port) {
3946 reactor_assert (upstream_port_ == nullptr );
4047 upstream_port_ = port;
4148 port->register_set_callback (upstream_set_callback ());
4249 }
4350
44- void bind_downstream_port (Port<T>* port) { reactor_assert (this ->downstream_ports_ .insert (port).second ); };
51+ virtual void bind_downstream_port (Port<T>* port) { reactor_assert (this ->downstream_ports_ .insert (port).second ); };
4552};
4653
4754template <class T > class BaseDelayedConnection : public Connection <T> {
48- public :
55+ protected :
4956 BaseDelayedConnection (const std::string& name, Reactor* container, bool is_logical, Duration delay)
5057 : Connection<T>(name, container, is_logical, delay) {}
58+ BaseDelayedConnection (const std::string& name, Environment* environment, bool is_logical, Duration delay)
59+ : Connection<T>(name, environment, is_logical, delay) {}
5160
5261 inline auto upstream_set_callback () noexcept -> PortCallback override {
5362 return [this ](const BasePort& port) {
@@ -61,6 +70,7 @@ public:
6170 };
6271 }
6372
73+ public:
6474 void setup () noexcept override {
6575 Action<T>::setup ();
6676
@@ -88,6 +98,144 @@ public:
8898 : BaseDelayedConnection<T>(name, container, false , delay) {}
8999};
90100
101+ template <class T > class EnclaveConnection : public BaseDelayedConnection <T> {
102+ private:
103+ LogicalTimeBarrier logical_time_barrier_;
104+
105+ protected:
106+ log::NamedLogger log_; // NOLINT
107+
108+ EnclaveConnection (const std::string& name, Environment* enclave, const Duration& delay)
109+ : BaseDelayedConnection<T>(name, enclave, false , delay)
110+ , log_{this ->fqn ()} {}
111+
112+ public:
113+ EnclaveConnection (const std::string& name, Environment* enclave)
114+ : BaseDelayedConnection<T>(name, enclave, false , Duration::zero())
115+ , log_{this ->fqn ()} {}
116+
117+ inline auto upstream_set_callback () noexcept -> PortCallback override {
118+ return [this ](const BasePort& port) {
119+ // We know that port must be of type Port<T>
120+ auto & typed_port = reinterpret_cast <const Port<T>&>(port); // NOLINT
121+ const auto * scheduler = port.environment ()->scheduler ();
122+ // This callback will be called from a reaction executing in the context
123+ // of the upstream port. Hence, we can retrieve the current tag directly
124+ // without locking.
125+ auto tag = Tag::from_logical_time (scheduler->logical_time ());
126+ bool result{false };
127+ if constexpr (std::is_same<T, void >::value) {
128+ result = this ->schedule_at (tag);
129+ } else {
130+ result = this ->schedule_at (std::move (typed_port.get ()), tag);
131+ }
132+ reactor_assert (result);
133+ };
134+ }
135+
136+ inline auto acquire_tag (const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
137+ const std::function<bool (void )>& abort_waiting) -> bool override {
138+ log_.debug () << " downstream tries to acquire tag " << tag;
139+
140+ if (this ->upstream_port () == nullptr ) {
141+ return true ;
142+ }
143+
144+ if (logical_time_barrier_.try_acquire_tag (tag)) {
145+ return true ;
146+ }
147+
148+ // Insert an empty event into the upstream event queue. This ensures that we
149+ // will get notified and woken up as soon as the tag becomes safe to process.
150+ // It is important to unlock the mutex here. Otherwise we could enter a deadlock as
151+ // scheduling the upstream event also requires holding the upstream mutex.
152+ lock.unlock ();
153+ bool result = this ->upstream_port ()->environment ()->scheduler ()->schedule_empty_async_at (tag);
154+ lock.lock ();
155+
156+ // If inserting the empty event was not successful, then this is because the upstream
157+ // scheduler already processes a later event. In this case, it is safe to assume that
158+ // the tag is acquired.
159+ if (!result) {
160+ return true ;
161+ }
162+
163+ // Wait until we receive a release_tag message from upstream
164+ return logical_time_barrier_.acquire_tag (tag, lock, cv, abort_waiting);
165+ }
166+
167+ void bind_upstream_port (Port<T>* port) override {
168+ Connection<T>::bind_upstream_port (port);
169+ port->environment ()->scheduler ()->register_release_tag_callback ([this ](const LogicalTime& tag) {
170+ logical_time_barrier_.release_tag (tag);
171+ log_.debug () << " upstream released tag " << tag;
172+ this ->environment ()->scheduler ()->notify ();
173+ });
174+ }
175+ };
176+
177+ template <class T > class DelayedEnclaveConnection : public EnclaveConnection <T> {
178+ public:
179+ DelayedEnclaveConnection (const std::string& name, Environment* enclave, Duration delay)
180+ : EnclaveConnection<T>(name, enclave, delay) {}
181+
182+ inline auto upstream_set_callback () noexcept -> PortCallback override {
183+ return [this ](const BasePort& port) {
184+ // We know that port must be of type Port<T>
185+ auto & typed_port = reinterpret_cast <const Port<T>&>(port); // NOLINT
186+ const auto * scheduler = port.environment ()->scheduler ();
187+ // This callback will be called from a reaction executing in the context
188+ // of the upstream port. Hence, we can retrieve the current tag directly
189+ // without locking.
190+ auto tag = Tag::from_logical_time (scheduler->logical_time ()).delay (this ->min_delay ());
191+ bool result{false };
192+ if constexpr (std::is_same<T, void >::value) {
193+ result = this ->schedule_at (tag);
194+ } else {
195+ result = this ->schedule_at (std::move (typed_port.get ()), tag);
196+ }
197+ reactor_assert (result);
198+ };
199+ }
200+
201+ inline auto acquire_tag (const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
202+ const std::function<bool (void )>& abort_waiting) -> bool override {
203+ // Since this is a delayed connection, we can go back in time and need to
204+ // acquire the latest upstream tag that can create an event at the given
205+ // tag. We also need to consider that given a delay d and a tag g=(t, n),
206+ // for any value of n, g + d = (t, 0). Hence, we need to quire a tag with
207+ // the highest possible microstep value.
208+ auto upstream_tag = tag.subtract (this ->min_delay ());
209+ return EnclaveConnection<T>::acquire_tag (upstream_tag, lock, cv, abort_waiting);
210+ }
211+ };
212+
213+ template <class T > class PhysicalEnclaveConnection : public EnclaveConnection <T> {
214+ public:
215+ PhysicalEnclaveConnection (const std::string& name, Environment* enclave)
216+ : EnclaveConnection<T>(name, enclave) {}
217+
218+ inline auto upstream_set_callback () noexcept -> PortCallback override {
219+ return [this ](const BasePort& port) {
220+ // We know that port must be of type Port<T>
221+ auto & typed_port = reinterpret_cast <const Port<T>&>(port); // NOLINT
222+ if constexpr (std::is_same<T, void >::value) {
223+ this ->schedule ();
224+ } else {
225+ this ->schedule (std::move (typed_port.get ()));
226+ }
227+ };
228+ }
229+
230+ inline auto acquire_tag (const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
231+ const std::function<bool (void )>& abort_waiting) -> bool override {
232+ this ->log_ .debug () << " downstream tries to acquire tag " << tag;
233+ return PhysicalTimeBarrier::acquire_tag (tag, lock, cv, abort_waiting);
234+ }
235+
236+ void bind_upstream_port (Port<T>* port) override { Connection<T>::bind_upstream_port (port); }
237+ };
238+
91239} // namespace reactor
92240
93241#endif // REACTOR_CPP_CONNECTION_HH
0 commit comments