Skip to content

Commit 45614c8

Browse files
authored
Merge pull request #38 from lf-lang/network-interfaces
Native runtime implementation of delayed and physical connections
2 parents fdd0376 + 7841b40 commit 45614c8

File tree

15 files changed

+293
-111
lines changed

15 files changed

+293
-111
lines changed

include/reactor-cpp/action.hh

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,10 @@ protected:
4040

4141
public:
4242
[[nodiscard]] auto inline triggers() const noexcept -> const auto& { return triggers_; }
43-
4443
[[nodiscard]] auto inline schedulers() const noexcept -> const auto& { return schedulers_; }
45-
4644
[[nodiscard]] auto inline is_logical() const noexcept -> bool { return logical_; }
47-
4845
[[nodiscard]] auto inline is_physical() const noexcept -> bool { return !logical_; }
49-
5046
[[nodiscard]] auto inline min_delay() const noexcept -> Duration { return min_delay_; }
51-
5247
[[nodiscard]] auto inline is_present() const noexcept -> bool { return present_; }
5348

5449
friend class Reaction;
@@ -62,14 +57,29 @@ private:
6257
std::map<Tag, ImmutableValuePtr<T>> events_;
6358
std::mutex mutex_events_;
6459

65-
void setup() noexcept final;
60+
protected:
61+
void setup() noexcept override;
6662
void cleanup() noexcept final;
6763

68-
protected:
6964
Action(const std::string& name, Reactor* container, bool logical, Duration min_delay)
7065
: BaseAction(name, container, logical, min_delay) {}
7166

7267
public:
68+
// Normally, we should lock the mutex while moving to make this
69+
// fully thread safe. However, we rely assembly happening before
70+
// execution and hence can ignore the mutex.
71+
Action(Action&& action) noexcept
72+
: BaseAction(std::move(action)) {}
73+
auto operator=(Action&& action) noexcept -> Action& {
74+
BaseAction::operator=(std::move(action));
75+
return *this;
76+
}
77+
78+
Action(const Action& action) = delete;
79+
auto operator=(const Action& action) -> Action& = delete;
80+
81+
~Action() override = default;
82+
7383
void startup() final {}
7484
void shutdown() final {}
7585

include/reactor-cpp/connection.hh

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (C) 2023 TU Dresden
3+
* All rights reserved.
4+
*
5+
* Authors:
6+
* Tassilo Tanneberger
7+
* Christian Menard
8+
*/
9+
10+
#ifndef REACTOR_CPP_CONNECTION_HH
11+
#define REACTOR_CPP_CONNECTION_HH
12+
13+
#include "action.hh"
14+
#include "assert.hh"
15+
#include "environment.hh"
16+
#include "port.hh"
17+
#include "reaction.hh"
18+
#include "reactor.hh"
19+
#include "time.hh"
20+
21+
namespace reactor {
22+
23+
template <class T> class Connection : public Action<T> {
24+
private:
25+
Port<T>* upstream_port_{nullptr};
26+
std::set<Port<T>*> downstream_ports_{};
27+
28+
protected:
29+
Connection(const std::string& name, Reactor* container, bool is_logical, Duration min_delay)
30+
: Action<T>(name, container, is_logical, min_delay) {}
31+
32+
[[nodiscard]] auto downstream_ports() -> auto& { return downstream_ports_; }
33+
[[nodiscard]] auto downstream_ports() const -> const auto& { return downstream_ports_; }
34+
35+
virtual auto upstream_set_callback() noexcept -> PortCallback = 0;
36+
37+
public:
38+
void bind_upstream_port(Port<T>* port) {
39+
reactor_assert(upstream_port_ == nullptr);
40+
upstream_port_ = port;
41+
port->register_set_callback(upstream_set_callback());
42+
}
43+
44+
void bind_downstream_port(Port<T>* port) { reactor_assert(this->downstream_ports_.insert(port).second); };
45+
};
46+
47+
template <class T> class BaseDelayedConnection : public Connection<T> {
48+
public:
49+
BaseDelayedConnection(const std::string& name, Reactor* container, bool is_physical, Duration delay)
50+
: Connection<T>(name, container, is_physical, delay) {}
51+
52+
inline auto upstream_set_callback() noexcept -> PortCallback override {
53+
return [this](const BasePort& port) {
54+
// We know that port must be of type Port<T>*
55+
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
56+
if constexpr (std::is_same<T, void>::value) {
57+
this->schedule();
58+
} else {
59+
this->schedule(std::move(typed_port.get()));
60+
}
61+
};
62+
}
63+
64+
void setup() noexcept override {
65+
Action<T>::setup();
66+
67+
if constexpr (std::is_same<T, void>::value) {
68+
for (auto port : this->downstream_ports()) {
69+
port->set();
70+
}
71+
} else {
72+
for (auto port : this->downstream_ports()) {
73+
port->set(std::move(this->get()));
74+
}
75+
}
76+
}
77+
};
78+
79+
template <class T> class DelayedConnection : public BaseDelayedConnection<T> {
80+
public:
81+
DelayedConnection(const std::string& name, Reactor* container, Duration delay)
82+
: BaseDelayedConnection<T>(name, container, true, delay) {}
83+
};
84+
85+
template <class T> class PhysicalConnection : public BaseDelayedConnection<T> {
86+
public:
87+
PhysicalConnection(const std::string& name, Reactor* container, Duration delay)
88+
: BaseDelayedConnection<T>(name, container, false, delay) {}
89+
};
90+
91+
} // namespace reactor
92+
93+
#endif // REACTOR_CPP_CONNECTION_HH

include/reactor-cpp/fwd.hh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
#ifndef REACTOR_CPP_FWD_HH
1010
#define REACTOR_CPP_FWD_HH
1111

12+
#include <functional>
13+
1214
namespace reactor {
1315

1416
class BaseAction;
@@ -22,6 +24,8 @@ class Tag;
2224
template <class T> class Action;
2325
template <class T> class Port;
2426

27+
using PortCallback = std::function<void(const BasePort&)>;
28+
2529
} // namespace reactor
2630

2731
#endif // REACTOR_CPP_FWD_HH

include/reactor-cpp/impl/port_impl.hh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ template <class T> void Port<T>::set(const ImmutableValuePtr<T>& value_ptr) {
3232
reactor::validate(!has_inward_binding(), "set() may only be called on ports that do not have an inward "
3333
"binding!");
3434
reactor::validate(value_ptr != nullptr, "Ports may not be set to nullptr!");
35+
3536
auto scheduler = environment()->scheduler();
3637
this->value_ptr_ = std::move(value_ptr);
3738
scheduler->set_port(this);

include/reactor-cpp/multiport.hh

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -11,39 +11,49 @@
1111

1212
#include <algorithm>
1313
#include <atomic>
14+
#include <cstddef>
1415
#include <iostream>
1516
#include <type_traits>
1617
#include <vector>
1718

19+
#include "assert.hh"
20+
#include "fwd.hh"
21+
1822
namespace reactor {
1923

2024
class BaseMultiport { // NOLINT cppcoreguidelines-special-member-functions,-warnings-as-errors
21-
protected:
22-
std::atomic<std::size_t> size_{0}; // NOLINT
23-
std::vector<std::size_t> present_ports_{}; // NOLINT
25+
private:
26+
std::atomic<std::size_t> size_{0};
27+
std::vector<std::size_t> present_ports_{};
2428

25-
public:
26-
BaseMultiport() = default;
27-
~BaseMultiport() = default;
29+
// record that the port with the given index has been set
30+
inline void set_present(std::size_t index);
2831

29-
// tells the parent multiport that this port has been set.
30-
[[nodiscard]] inline auto set_present(std::size_t index) -> bool {
31-
auto calculated_index = size_.fetch_add(1, std::memory_order_relaxed);
32+
// reset the list of set port indexes
33+
inline void reset() noexcept { size_.store(0, std::memory_order_relaxed); }
3234

33-
reactor_assert(calculated_index < present_ports_.size());
35+
[[nodiscard]] auto get_set_callback(std::size_t index) noexcept -> PortCallback;
36+
const PortCallback clean_callback_{[this]([[maybe_unused]] const BasePort& port) { this->reset(); }};
3437

35-
present_ports_[calculated_index] = index;
36-
return true;
37-
}
38+
[[nodiscard]] auto get_clean_callback() const noexcept -> const PortCallback& { return clean_callback_; }
3839

39-
// resets parent multiport
40-
inline void clear() noexcept { size_.store(0, std::memory_order_relaxed); }
40+
protected:
41+
[[nodiscard]] inline auto present_ports() const -> const auto& { return present_ports_; }
42+
[[nodiscard]] inline auto present_ports_size() const -> auto { return size_.load(); }
43+
44+
inline void present_ports_reserve(size_t n) { present_ports_.reserve(n); }
45+
46+
void register_port(BasePort& port, size_t idx);
47+
48+
public:
49+
BaseMultiport() = default;
50+
~BaseMultiport() = default;
4151
};
4252

4353
template <class T, class A = std::allocator<T>>
4454
class Multiport : public BaseMultiport { // NOLINT cppcoreguidelines-special-member-functions
4555
protected:
46-
std::vector<T> data_{}; // NOLINT cppcoreguidelines-non-private-member-variables-in-classes
56+
std::vector<T> ports_{}; // NOLINT cppcoreguidelines-non-private-member-variables-in-classes
4757

4858
public:
4959
using value_type = typename A::value_type;
@@ -56,29 +66,30 @@ public:
5666
~Multiport() noexcept = default;
5767

5868
auto operator==(const Multiport& other) const noexcept -> bool {
59-
return std::equal(std::begin(data_), std::end(data_), std::begin(other.data_), std::end(other.data_));
69+
return std::equal(std::begin(ports_), std::end(ports_), std::begin(other.ports_), std::end(other.ports_));
6070
}
6171
auto operator!=(const Multiport& other) const noexcept -> bool { return !(*this == other); };
62-
inline auto operator[](std::size_t index) noexcept -> T& { return data_[index]; }
63-
inline auto operator[](std::size_t index) const noexcept -> const T& { return data_[index]; }
72+
inline auto operator[](std::size_t index) noexcept -> T& { return ports_[index]; }
73+
inline auto operator[](std::size_t index) const noexcept -> const T& { return ports_[index]; }
6474

65-
inline auto begin() noexcept -> iterator { return data_.begin(); };
66-
inline auto begin() const noexcept -> const_iterator { return data_.begin(); };
67-
inline auto cbegin() const noexcept -> const_iterator { return data_.cbegin(); };
68-
inline auto end() noexcept -> iterator { return data_.end(); };
69-
inline auto end() const noexcept -> const_iterator { return data_.end(); };
70-
inline auto cend() const noexcept -> const_iterator { return data_.cend(); };
75+
inline auto begin() noexcept -> iterator { return ports_.begin(); };
76+
inline auto begin() const noexcept -> const_iterator { return ports_.begin(); };
77+
inline auto cbegin() const noexcept -> const_iterator { return ports_.cbegin(); };
78+
inline auto end() noexcept -> iterator { return ports_.end(); };
79+
inline auto end() const noexcept -> const_iterator { return ports_.end(); };
80+
inline auto cend() const noexcept -> const_iterator { return ports_.cend(); };
7181

72-
inline auto size() const noexcept -> size_type { return data_.size(); };
73-
[[nodiscard]] inline auto empty() const noexcept -> bool { return data_.empty(); };
82+
inline auto size() const noexcept -> size_type { return ports_.size(); };
83+
[[nodiscard]] inline auto empty() const noexcept -> bool { return ports_.empty(); };
7484

7585
[[nodiscard]] inline auto present_indices_unsorted() const noexcept -> std::vector<std::size_t> {
76-
return std::vector<std::size_t>(std::begin(present_ports_), std::begin(present_ports_) + size_.load());
86+
return std::vector<std::size_t>(std::begin(present_ports()), std::begin(present_ports()) + present_ports_size());
7787
}
7888

7989
[[nodiscard]] inline auto present_indices_sorted() const noexcept -> std::vector<std::size_t> {
80-
std::sort(std::begin(present_ports_), std::begin(present_ports_) + size_.load());
81-
return std::vector<std::size_t>(std::begin(present_ports_), std::begin(present_ports_) + size_.load());
90+
std::vector<std::size_t> indices(std::begin(present_ports()), std::begin(present_ports()) + present_ports_size());
91+
std::sort(std::begin(indices), std::end(indices));
92+
return indices;
8293
}
8394
};
8495

@@ -89,18 +100,18 @@ public:
89100
~ModifableMultiport() = default;
90101

91102
inline void reserve(std::size_t size) noexcept {
92-
this->data_.reserve(size);
93-
this->present_ports_.reserve(size);
103+
this->ports_.reserve(size);
104+
this->present_ports_reserve(size);
94105
}
95106

96107
inline void push_back(const T& elem) noexcept {
97-
this->data_.push_back(elem);
98-
this->present_ports_.emplace_back(0);
108+
this->ports_.push_back(elem);
109+
this->register_port(this->ports_.back(), this->ports_.size() - 1);
99110
}
100111

101112
template <class... Args> inline void emplace_back(Args&&... args) noexcept {
102-
this->data_.emplace_back(args...);
103-
this->present_ports_.emplace_back(0);
113+
this->ports_.emplace_back(args...);
114+
this->register_port(this->ports_.back(), this->ports_.size() - 1);
104115
}
105116
};
106117
} // namespace reactor

0 commit comments

Comments
 (0)