Skip to content

Commit d53483c

Browse files
committed
refactor the exec::repeat_n algorithm
1 parent 8e1d10d commit d53483c

File tree

5 files changed

+246
-100
lines changed

5 files changed

+246
-100
lines changed

examples/nvexec/maxwell/snr.cuh

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,42 +18,27 @@
1818

1919
#pragma once
2020

21-
#include "common.cuh"
22-
#include "stdexec/execution.hpp" // IWYU pragma: export
21+
#include <stdexec/execution.hpp> // IWYU pragma: export
2322

24-
namespace ex = stdexec;
23+
#include <exec/inline_scheduler.hpp>
24+
#include <exec/repeat_n.hpp>
25+
26+
#include "./common.cuh"
2527

2628
#if STDEXEC_CUDA_COMPILATION()
2729
# include <nvexec/stream_context.cuh> // IWYU pragma: export
2830
# include <nvexec/multi_gpu_context.cuh> // IWYU pragma: export
2931
#else
3032
namespace nvexec {
31-
struct stream_receiver_base {
32-
using receiver_concept = ex::receiver_t;
33-
};
34-
35-
struct stream_sender_base {
36-
using sender_concept = ex::sender_t;
37-
};
38-
39-
namespace detail {
40-
struct stream_op_state_base { };
41-
} // namespace detail
42-
43-
inline auto is_on_gpu() -> bool {
33+
inline constexpr bool is_on_gpu() noexcept {
4434
return false;
4535
}
4636
} // namespace nvexec
4737
#endif
4838

49-
#include <optional>
50-
#include <exec/inline_scheduler.hpp>
51-
52-
STDEXEC_PRAGMA_PUSH()
53-
STDEXEC_PRAGMA_IGNORE_GNU("-Wmissing-braces")
54-
5539
namespace ex = stdexec;
5640

41+
<<<<<<< HEAD
5742
namespace repeat_n_detail {
5843
template <class SenderId, class Closure>
5944
struct repeat_n_sender_t;
@@ -454,6 +439,8 @@ namespace nvexec::_strm {
454439
455440
#endif // STDEXEC_CUDA_COMPILATION()
456441
442+
=======
443+
>>>>>>> f6ba2af5 (refactor the `exec::repeat_n` algorithm)
457444
template <class SchedulerT>
458445
[[nodiscard]]
459446
auto is_gpu_scheduler([[maybe_unused]] SchedulerT&& scheduler) -> bool {
@@ -469,13 +456,12 @@ auto maxwell_eqs_snr(
469456
std::size_t n_iterations,
470457
fields_accessor accessor,
471458
ex::scheduler auto&& computer) {
472-
return ex::just()
473-
| ex::on(
459+
return ex::on(
474460
computer,
475-
repeat_n(
476-
n_iterations,
477-
ex::bulk(ex::par, accessor.cells, update_h(accessor))
478-
| ex::bulk(ex::par, accessor.cells, update_e(time, dt, accessor))))
461+
ex::just() //
462+
| ex::bulk(ex::par, accessor.cells, update_h(accessor))
463+
| ex::bulk(ex::par, accessor.cells, update_e(time, dt, accessor))
464+
| exec::repeat_n(n_iterations))
479465
| ex::then(dump_vtk(write_results, accessor));
480466
}
481467
@@ -489,8 +475,8 @@ void run_snr(
489475
time_storage_t time{is_gpu_scheduler(computer)};
490476
fields_accessor accessor = grid.accessor();
491477
492-
auto init = ex::just()
493-
| ex::on(computer, ex::bulk(ex::par, grid.cells, grid_initializer(dt, accessor)));
478+
auto init =
479+
ex::on(computer, ex::just() | ex::bulk(ex::par, grid.cells, grid_initializer(dt, accessor)));
494480
ex::sync_wait(init);
495481
496482
auto snd = maxwell_eqs_snr(dt, time.get(), write_vtk, n_iterations, accessor, computer);
@@ -499,5 +485,3 @@ void run_snr(
499485
ex::sync_wait(std::move(snd));
500486
});
501487
}
502-
503-
STDEXEC_PRAGMA_POP()

include/exec/repeat_n.hpp

Lines changed: 39 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,19 @@
1919
#include "../stdexec/execution.hpp"
2020
#include "../stdexec/__detail/__meta.hpp"
2121
#include "../stdexec/__detail/__basic_sender.hpp"
22-
#include "../stdexec/__detail/__manual_lifetime.hpp"
22+
#include "../stdexec/__detail/__optional.hpp"
2323

2424
#include "trampoline_scheduler.hpp"
2525
#include "sequence.hpp"
2626

27-
#include "../stdexec/__detail/__atomic.hpp"
2827
#include <cstddef>
2928
#include <exception>
3029
#include <type_traits>
3130

31+
STDEXEC_PRAGMA_PUSH()
32+
STDEXEC_PRAGMA_IGNORE_EDG(expr_has_no_effect)
33+
STDEXEC_PRAGMA_IGNORE_GNU("-Wunused-value")
34+
3235
namespace exec {
3336
namespace __repeat_n {
3437
using namespace stdexec;
@@ -46,28 +49,13 @@ namespace exec {
4649
using receiver_concept = stdexec::receiver_t;
4750
__repeat_n_state<_Sender, _Receiver> *__state_;
4851

49-
template <class... _Args>
50-
void set_value(_Args &&...__args) noexcept {
51-
STDEXEC_TRY {
52-
__state_->__complete(set_value_t(), static_cast<_Args &&>(__args)...);
53-
}
54-
STDEXEC_CATCH_ALL {
55-
if constexpr (!__nothrow_decay_copyable<_Args...>) {
56-
__state_->__complete(set_error_t(), std::current_exception());
57-
}
58-
}
52+
void set_value() noexcept {
53+
__state_->__complete(set_value_t());
5954
}
6055

6156
template <class _Error>
6257
void set_error(_Error &&__err) noexcept {
63-
STDEXEC_TRY {
64-
__state_->__complete(set_error_t(), static_cast<_Error &&>(__err));
65-
}
66-
STDEXEC_CATCH_ALL {
67-
if constexpr (!__nothrow_decay_copyable<_Error>) {
68-
__state_->__complete(set_error_t(), std::current_exception());
69-
}
70-
}
58+
__state_->__complete(set_error_t(), static_cast<_Error &&>(__err));
7159
}
7260

7361
void set_stopped() noexcept {
@@ -89,10 +77,6 @@ namespace exec {
8977
template <class _Child>
9078
__child_count_pair(_Child, std::size_t) -> __child_count_pair<_Child>;
9179

92-
STDEXEC_PRAGMA_PUSH()
93-
94-
STDEXEC_PRAGMA_IGNORE_GNU("-Wtsan")
95-
9680
template <class _Sender, class _Receiver>
9781
struct __repeat_n_state
9882
: stdexec::__enable_receiver_from_this<
@@ -107,73 +91,60 @@ namespace exec {
10791
__result_of<exec::sequence, schedule_result_t<trampoline_scheduler &>, __child_t &>;
10892
using __child_op_t = stdexec::connect_result_t<__child_on_sched_sender_t, __receiver_t>;
10993

110-
__child_count_pair<__child_t> __pair_;
111-
__std::atomic_flag __started_{};
112-
stdexec::__manual_lifetime<__child_op_t> __child_op_;
113-
trampoline_scheduler __sched_;
114-
11594
__repeat_n_state(_Sender &&__sndr, _Receiver &)
11695
: __pair_(__sexpr_apply(static_cast<_Sender &&>(__sndr), stdexec::__detail::__get_data())) {
11796
// Q: should we skip __connect() if __count_ == 0?
11897
__connect();
11998
}
12099

121-
~__repeat_n_state() {
122-
if (!__started_.test(__std::memory_order_acquire)) {
123-
__std::atomic_thread_fence(__std::memory_order_release);
124-
// TSan does not support __std::atomic_thread_fence, so we
125-
// need to use the TSan-specific __tsan_release instead:
126-
STDEXEC_WHEN(STDEXEC_TSAN(), __tsan_release(&__started_));
127-
__child_op_.__destroy();
128-
}
129-
}
130-
131-
void __connect() {
132-
__child_op_.__construct_from([this] {
133-
return stdexec::connect(
134-
exec::sequence(stdexec::schedule(__sched_), __pair_.__child_), __receiver_t{this});
135-
});
100+
auto __connect() -> __child_op_t & {
101+
return __child_op_.__emplace_from(
102+
stdexec::connect,
103+
exec::sequence(stdexec::schedule(__sched_), __pair_.__child_),
104+
__receiver_t{this});
136105
}
137106

138107
void __start() noexcept {
139108
if (__pair_.__count_ == 0) {
140109
stdexec::set_value(static_cast<_Receiver &&>(this->__receiver()));
141110
} else {
142-
[[maybe_unused]]
143-
const bool __already_started = __started_.test_and_set(__std::memory_order_relaxed);
144-
STDEXEC_ASSERT(!__already_started);
145-
stdexec::start(__child_op_.__get());
111+
stdexec::start(*__child_op_);
146112
}
147113
}
148114

149115
template <class _Tag, class... _Args>
150-
void __complete(_Tag, _Args... __args) noexcept { // Intentionally by value...
116+
void __complete(_Tag, _Args &&...__args) noexcept { // Intentionally by value...
117+
static_assert(sizeof...(_Args) <= 1);
118+
static_assert(sizeof...(_Args) == 0 || std::is_same_v<_Tag, stdexec::set_error_t>);
151119
STDEXEC_ASSERT(__pair_.__count_ > 0);
152-
__child_op_.__destroy(); // ... because this could potentially invalidate them.
153-
if constexpr (same_as<_Tag, set_value_t>) {
154-
STDEXEC_TRY {
120+
121+
STDEXEC_TRY {
122+
auto __arg_copy = (0, ..., static_cast<_Args &&>(__args)); // copy any arg...
123+
__child_op_.reset(); // ... because this could potentially invalidate it.
124+
125+
if constexpr (same_as<_Tag, set_value_t>) {
155126
if (--__pair_.__count_ == 0) {
156-
stdexec::set_value(static_cast<_Receiver &&>(this->__receiver()));
127+
stdexec::set_value(std::move(this->__receiver()));
157128
} else {
158-
__connect();
159-
stdexec::start(__child_op_.__get());
129+
stdexec::start(__connect());
160130
}
131+
} else {
132+
_Tag()(std::move(this->__receiver()), static_cast<__decay_t<_Args> &&>(__arg_copy)...);
161133
}
162-
STDEXEC_CATCH_ALL {
163-
stdexec::set_error(
164-
static_cast<_Receiver &&>(this->__receiver()), std::current_exception());
165-
}
166-
} else {
167-
_Tag()(static_cast<_Receiver &&>(this->__receiver()), static_cast<_Args &&>(__args)...);
134+
}
135+
STDEXEC_CATCH_ALL {
136+
stdexec::set_error(std::move(this->__receiver()), std::current_exception());
168137
}
169138
}
139+
140+
__child_count_pair<__child_t> __pair_;
141+
stdexec::__optional<__child_op_t> __child_op_;
142+
trampoline_scheduler __sched_;
170143
};
171144

172145
template <class _Sender, class _Receiver>
173146
__repeat_n_state(_Sender &&, _Receiver &) -> __repeat_n_state<_Sender, _Receiver>;
174147

175-
STDEXEC_PRAGMA_POP()
176-
177148
struct repeat_n_t;
178149
struct _REPEAT_N_EXPECTS_A_SENDER_OF_VOID_;
179150

@@ -236,11 +207,11 @@ namespace exec {
236207
return {{__count}, {}, {}};
237208
}
238209

239-
template <class _Sender>
240-
auto transform_sender(set_value_t, _Sender &&__sndr, __ignore) {
210+
template <class _Sender, bool _NoThrow = __nothrow_decay_copyable<_Sender>>
211+
auto transform_sender(set_value_t, _Sender &&__sndr, __ignore) noexcept(_NoThrow) {
241212
return __sexpr_apply(
242213
static_cast<_Sender &&>(__sndr),
243-
[]<class _Child>(__ignore, std::size_t __count, _Child __child) {
214+
[]<class _Child>(__ignore, std::size_t __count, _Child __child) noexcept(_NoThrow) {
244215
return __make_sexpr<__repeat_n_tag>(__child_count_pair{std::move(__child), __count});
245216
});
246217
}
@@ -263,3 +234,5 @@ namespace stdexec {
263234
};
264235
};
265236
} // namespace stdexec
237+
238+
STDEXEC_PRAGMA_POP()

0 commit comments

Comments
 (0)