Skip to content

Commit af37170

Browse files
committed
applying the comments
1 parent 4d8423d commit af37170

File tree

10 files changed

+66
-59
lines changed

10 files changed

+66
-59
lines changed

include/reactor-cpp/connection.hh

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,21 @@ public:
124124
, log_{this->fqn()} {};
125125

126126
inline auto upstream_set_callback() noexcept -> PortCallback override {
127-
return [&](const BasePort& port) { // NOLINT unused this
127+
return [this](const BasePort& port) {
128+
// We know that port must be of type Port<T>
129+
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
130+
const auto* scheduler = port.environment()->scheduler();
128131
// This callback will be called from a reaction executing in the context
129132
// of the upstream port. Hence, we can retrieve the current tag directly
130133
// without locking.
134+
auto tag = Tag::from_logical_time(scheduler->logical_time());
135+
bool result{false};
131136
if constexpr (std::is_same<T, void>::value) {
132-
this->schedule_at(Tag::from_logical_time(port.environment()->scheduler()->logical_time()));
137+
result = this->schedule_at(tag);
133138
} else {
134-
// We know that port must be of type Port<T>
135-
auto& typed_port = reinterpret_cast<const Port<T>&>(port); // NOLINT
136-
this->schedule_at(std::move(typed_port.get()),
137-
Tag::from_logical_time(port.environment()->scheduler()->logical_time()));
139+
result = this->schedule_at(std::move(typed_port.get()), tag);
138140
}
141+
reactor_assert(result);
139142
};
140143
}
141144

include/reactor-cpp/connection_properties.hh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ struct ConnectionProperties {
1919
ConnectionType type_ = ConnectionType::Normal;
2020
Duration delay_{0};
2121
Environment* enclave_{nullptr};
22+
23+
auto operator<(const ConnectionProperties& elem2) const noexcept -> bool {
24+
return this->type_ < elem2.type_ && this->delay_ < elem2.delay_;
25+
}
2226
};
2327

2428
} // namespace reactor

include/reactor-cpp/environment.hh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ private:
4444
/// Set of actions that act as an input to the reactor program in this environment
4545
std::set<BaseAction*> input_actions_{};
4646
std::set<Reaction*> reactions_{};
47-
std::set<BasePort*> ports_{};
4847
std::vector<Dependency> dependencies_{};
4948

5049
/// The environment containing this environment. nullptr if this is the top environment

include/reactor-cpp/graph.hh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@ private:
2424
// this is required for the Graph::get_edges() method
2525
using map_key = std::pair<E, P>;
2626
struct map_key_compare {
27-
// TODO: check this maybe will cause some very funny TM problems later
2827
auto operator()(const map_key& left_site, const map_key& right_site) const -> bool {
29-
return left_site.first < right_site.first;
28+
return left_site.first < right_site.first && left_site.second < right_site.second;
3029
}
3130
};
3231

include/reactor-cpp/impl/port_impl.hh

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
#define REACTOR_CPP_IMPL_PORT_IMPL_HH
1111

1212
#include "reactor-cpp/assert.hh"
13-
#include "reactor-cpp/connection.hh"
1413
#include "reactor-cpp/environment.hh"
1514
#include "reactor-cpp/port.hh"
15+
#include "reactor-cpp/reactor.hh"
1616

1717
namespace reactor {
1818

@@ -56,8 +56,9 @@ template <class T> auto Port<T>::get() const noexcept -> const ImmutableValuePtr
5656
return value_ptr_;
5757
}
5858
template <class T>
59-
void Port<T>::pull_connection(const ConnectionProperties& properties, const std::vector<BasePort*>& downstream) {
60-
Connection<T>* connection = nullptr;
59+
void Port<T>::instantiate_connection_to(const ConnectionProperties& properties,
60+
const std::vector<BasePort*>& downstream) {
61+
std::unique_ptr<Connection<T>> connection = nullptr;
6162
if (downstream.empty()) {
6263
return;
6364
}
@@ -69,27 +70,31 @@ void Port<T>::pull_connection(const ConnectionProperties& properties, const std:
6970
auto index = this->container()->number_of_connections();
7071

7172
if (properties.type_ == ConnectionType::Delayed) {
72-
connection = new DelayedConnection<T>(this->name() + "_delayed_connection_" + std::to_string(index), // NOLINT
73-
this->container(), // NOLINT
74-
properties.delay_); // NOLINT
73+
connection =
74+
std::make_unique<DelayedConnection<T>>(this->name() + "_delayed_connection_" + std::to_string(index), // NOLINT
75+
this->container(), // NOLINT
76+
properties.delay_); // NOLINT
7577
}
7678
if (properties.type_ == ConnectionType::Physical) {
77-
connection = new PhysicalConnection<T>(this->name() + "_physical_connection_" + std::to_string(index), // NOLINT
78-
this->container(), // NOLINT
79-
properties.delay_); // NOLINT
79+
connection = std::make_unique<PhysicalConnection<T>>(this->name() + "_physical_connection_" +
80+
std::to_string(index), // NOLINT
81+
this->container(), // NOLINT
82+
properties.delay_); // NOLINT
8083
}
8184
if (properties.type_ == ConnectionType::Enclaved) {
82-
connection = // NOLINT
83-
new EnclaveConnection<T>(this->name() + "_enclave_connection_" + std::to_string(index), enclave); // NOLINT
85+
connection = // NOLINT
86+
std::make_unique<EnclaveConnection<T>>(this->name() + "_enclave_connection_" + std::to_string(index),
87+
enclave); // NOLINT
8488
}
8589
if (properties.type_ == ConnectionType::DelayedEnclaved) {
86-
connection = // NOLINT
87-
new DelayedEnclaveConnection<T>(this->name() + "_delayed_enclave_connection_" + std::to_string(index), // NOLINT
88-
enclave, // NOLINT
89-
properties.delay_); // NOLINT
90+
connection = // NOLINT
91+
std::make_unique<DelayedEnclaveConnection<T>>(this->name() + "_delayed_enclave_connection_" +
92+
std::to_string(index), // NOLINT
93+
enclave, // NOLINT
94+
properties.delay_); // NOLINT
9095
}
9196
if (properties.type_ == ConnectionType::PhysicalEnclaved) {
92-
connection = new PhysicalEnclaveConnection<T>( // NOLINT
97+
connection = std::make_unique<PhysicalEnclaveConnection<T>>( // NOLINT
9398
this->name() + "_physical_enclave_connection_" + std::to_string(index), enclave); // NOLINT
9499
}
95100

@@ -98,7 +103,7 @@ void Port<T>::pull_connection(const ConnectionProperties& properties, const std:
98103
connection->bind_downstream_ports(downstream);
99104
connection->bind_upstream_port(this);
100105
this->register_set_callback(connection->upstream_set_callback());
101-
this->container()->register_connection(connection);
106+
this->container()->register_connection(std::move(connection));
102107
}
103108

104109
} // namespace reactor

include/reactor-cpp/port.hh

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ public:
7676
outward_bindings_.insert(port); // NOLINT
7777
}
7878

79-
virtual void pull_connection(const ConnectionProperties& properties, const std::vector<BasePort*>& downstreams) = 0;
79+
virtual void instantiate_connection_to(const ConnectionProperties& properties,
80+
const std::vector<BasePort*>& downstreams) = 0;
8081

8182
[[nodiscard]] inline auto is_input() const noexcept -> bool { return type_ == PortType::Input; }
8283
[[nodiscard]] inline auto is_output() const noexcept -> bool { return type_ == PortType::Output; }
@@ -123,7 +124,8 @@ public:
123124
Port(const std::string& name, PortType type, Reactor* container)
124125
: BasePort(name, type, container) {}
125126

126-
void pull_connection(const ConnectionProperties& properties, const std::vector<BasePort*>& downstream) override;
127+
void instantiate_connection_to(const ConnectionProperties& properties,
128+
const std::vector<BasePort*>& downstream) override;
127129
[[nodiscard]] auto typed_inward_binding() const noexcept -> Port<T>*;
128130
[[nodiscard]] auto typed_outward_bindings() const noexcept -> const std::set<Port<T>*>&;
129131

@@ -152,7 +154,8 @@ public:
152154
Port(const std::string& name, PortType type, Reactor* container)
153155
: BasePort(name, type, container) {}
154156

155-
void pull_connection(const ConnectionProperties& properties, const std::vector<BasePort*>& downstream) override;
157+
void instantiate_connection_to(const ConnectionProperties& properties,
158+
const std::vector<BasePort*>& downstream) override;
156159
[[nodiscard]] auto typed_inward_binding() const noexcept -> Port<void>*;
157160
[[nodiscard]] auto typed_outward_bindings() const noexcept -> const std::set<Port<void>*>&;
158161

include/reactor-cpp/reactor.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public:
3939
Reactor(const std::string& name, Environment* environment);
4040
~Reactor() override = default;
4141

42-
void register_connection(BaseAction* connection);
42+
void register_connection(std::unique_ptr<BaseAction>&& connection);
4343
[[nodiscard]] auto inline actions() const noexcept -> const auto& { return actions_; }
4444
[[nodiscard]] auto inline inputs() const noexcept -> const auto& { return inputs_; }
4545
[[nodiscard]] auto inline outputs() const noexcept -> const auto& { return outputs_; }

lib/environment.cc

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
namespace reactor {
2626

27-
Environment::Environment(unsigned int num_workers, bool fast_fwd_execution, const Duration& timeout) // NOLINT
27+
Environment::Environment(unsigned int num_workers, bool fast_fwd_execution, const Duration& timeout)
2828
: log_("Environment")
2929
, num_workers_(num_workers)
3030
, fast_fwd_execution_(fast_fwd_execution)
@@ -74,15 +74,6 @@ void recursive_assemble(Reactor* container) { // NOLINT
7474
}
7575
}
7676

77-
void Environment::register_port(BasePort* port) noexcept {
78-
if (top_environment_ == nullptr || top_environment_ == this) {
79-
ports_.insert(port);
80-
return;
81-
}
82-
83-
return top_environment_->register_port(port);
84-
}
85-
8677
void Environment::assemble() { // NOLINT
8778
phase_ = Phase::Assembly;
8879

@@ -132,21 +123,21 @@ void Environment::assemble() { // NOLINT
132123
}
133124
}
134125
for (auto& [env, sinks_same_env] : collector) {
135-
source_port->pull_connection(properties, sinks_same_env);
126+
source_port->instantiate_connection_to(properties, sinks_same_env);
136127

137128
log::Debug() << "from: " << source_port->container()->fqn() << " |-> to: " << sinks_same_env.size()
138129
<< " objects";
139130
}
140131
} else {
141-
source_port->pull_connection(properties, sinks);
132+
source_port->instantiate_connection_to(properties, sinks);
142133

143134
log::Debug() << "from: " << source_port->container()->fqn() << " |-> to: " << sinks.size() << " objects";
144135
}
145136
}
146137
}
147138
}
148139

149-
log::Debug() << "Building the Graph";
140+
log::Debug() << "Building the Dependency-Graph";
150141
for (auto* reactor : top_level_reactors_) {
151142
build_dependency_graph(reactor);
152143
}
@@ -222,7 +213,7 @@ void Environment::sync_shutdown() {
222213
}
223214

224215
void Environment::async_shutdown() {
225-
scheduler_.lock();
216+
[[maybe_unused]] auto lock_guard = scheduler_.lock();
226217
sync_shutdown();
227218
}
228219

lib/port.cc

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,9 @@ void Port<void>::set() {
7777
this->present_ = true;
7878
}
7979

80-
void Port<void>::pull_connection(const ConnectionProperties& properties, const std::vector<BasePort*>& downstream) {
81-
Connection<void>* connection = nullptr;
80+
void Port<void>::instantiate_connection_to(const ConnectionProperties& properties,
81+
const std::vector<BasePort*>& downstream) {
82+
std::unique_ptr<Connection<void>> connection = nullptr;
8283

8384
if (downstream.empty()) {
8485
return;
@@ -91,26 +92,28 @@ void Port<void>::pull_connection(const ConnectionProperties& properties, const s
9192
auto index = this->container()->number_of_connections();
9293

9394
if (properties.type_ == ConnectionType::Delayed) {
94-
connection = new DelayedConnection<void>(this->name() + "_delayed_connection_" + std::to_string(index),
95-
this->container(), // NOLINT
96-
properties.delay_); // NOLINT
95+
connection =
96+
std::make_unique<DelayedConnection<void>>(this->name() + "_delayed_connection_" + std::to_string(index),
97+
this->container(), // NOLINT
98+
properties.delay_); // NOLINT
9799
}
98100
if (properties.type_ == ConnectionType::Physical) {
99-
connection = new PhysicalConnection<void>(this->name() + "_physical_connection_" + std::to_string(index),
100-
this->container(), // NOLINT
101-
properties.delay_); // NOLINT
101+
connection =
102+
std::make_unique<PhysicalConnection<void>>(this->name() + "_physical_connection_" + std::to_string(index),
103+
this->container(), // NOLINT
104+
properties.delay_); // NOLINT
102105
}
103106
if (properties.type_ == ConnectionType::Enclaved) {
104-
connection =
105-
new EnclaveConnection<void>(this->name() + "_enclave_connection_" + std::to_string(index), enclave); // NOLINT
107+
connection = std::make_unique<EnclaveConnection<void>>(
108+
this->name() + "_enclave_connection_" + std::to_string(index), enclave); // NOLINT
106109
}
107110
if (properties.type_ == ConnectionType::DelayedEnclaved) {
108-
connection = new DelayedEnclaveConnection<void>(
111+
connection = std::make_unique<DelayedEnclaveConnection<void>>(
109112
this->name() + "_delayed_enclave_connection_" + std::to_string(index), enclave, // NOLINT
110113
properties.delay_); // NOLINT
111114
}
112115
if (properties.type_ == ConnectionType::PhysicalEnclaved) {
113-
connection = new PhysicalEnclaveConnection<void>(
116+
connection = std::make_unique<PhysicalEnclaveConnection<void>>(
114117
this->name() + "_physical_enclave_connection_" + std::to_string(index), enclave); // NOLINT
115118
}
116119

@@ -119,7 +122,7 @@ void Port<void>::pull_connection(const ConnectionProperties& properties, const s
119122
connection->bind_downstream_ports(downstream); // NOLINT Pointer is not null
120123
connection->bind_upstream_port(this);
121124
this->register_set_callback(connection->upstream_set_callback());
122-
this->container()->register_connection(connection);
125+
this->container()->register_connection(std::move(connection));
123126
}
124127

125128
// This function can be used to chain two callbacks. This mechanism is not

lib/reactor.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ void Reactor::register_reactor([[maybe_unused]] Reactor* reactor) {
7171
Statistics::increment_reactor_instances();
7272
}
7373

74-
void Reactor::register_connection([[maybe_unused]] BaseAction* connection) {
74+
void Reactor::register_connection([[maybe_unused]] std::unique_ptr<BaseAction>&& connection) {
7575
reactor_assert(connection != nullptr);
76-
[[maybe_unused]] auto result = connections_.insert(std::unique_ptr<BaseAction>(connection)).second;
76+
[[maybe_unused]] auto result = connections_.insert(std::move(connection)).second;
7777
reactor_assert(result);
7878
}
7979

0 commit comments

Comments
 (0)