include/boost/capy/when_any.hpp

99.2% Lines (131/132) 96.4% Functions (432/448)
include/boost/capy/when_any.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Michael Vandeberg
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ANY_HPP
11 #define BOOST_CAPY_WHEN_ANY_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <coroutine>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/task.hpp>
21
22 #include <array>
23 #include <atomic>
24 #include <exception>
25 #include <optional>
26 #include <ranges>
27 #include <stdexcept>
28 #include <stop_token>
29 #include <tuple>
30 #include <type_traits>
31 #include <utility>
32 #include <variant>
33 #include <vector>
34
35 /*
36 when_any - Race multiple tasks, return first completion
37 ========================================================
38
39 OVERVIEW:
40 ---------
41 when_any launches N tasks concurrently and completes when the FIRST task
42 finishes (success or failure). It then requests stop for all siblings and
43 waits for them to acknowledge before returning.
44
45 ARCHITECTURE:
46 -------------
47 The design mirrors when_all but with inverted completion semantics:
48
49 when_all: complete when remaining_count reaches 0 (all done)
50 when_any: complete when has_winner becomes true (first done)
51 BUT still wait for remaining_count to reach 0 for cleanup
52
53 Key components:
54 - when_any_state: Shared state tracking winner and completion
55 - when_any_runner: Wrapper coroutine for each child task
56 - when_any_launcher: Awaitable that starts all runners concurrently
57
58 CRITICAL INVARIANTS:
59 --------------------
60 1. Exactly one task becomes the winner (via atomic compare_exchange)
61 2. All tasks must complete before parent resumes (cleanup safety)
62 3. Stop is requested immediately when winner is determined
63 4. Only the winner's result/exception is stored
64
65 TYPE DEDUPLICATION:
66 -------------------
67 std::variant requires unique alternative types. Since when_any can race
68 tasks with identical return types (e.g., three task<int>), we must
69 deduplicate types before constructing the variant.
70
71 Example: when_any(task<int>, task<string>, task<int>)
72 - Raw types after void->monostate: int, string, int
73 - Deduplicated variant: std::variant<int, string>
74 - Return: pair<size_t, variant<int, string>>
75
76 The winner_index tells you which task won (0, 1, or 2), while the variant
77 holds the result. Use the index to determine how to interpret the variant.
78
79 VOID HANDLING:
80 --------------
81 void tasks contribute std::monostate to the variant (then deduplicated).
82 All-void tasks result in: pair<size_t, variant<monostate>>
83
84 MEMORY MODEL:
85 -------------
86 Synchronization chain from winner's write to parent's read:
87
1. Winner thread writes result_/winner_exception_ (non-atomic)
2. Winner's runner reaches final_suspend; its awaiter performs
   fetch_sub(acq_rel) on remaining_count_
3. Last runner (may be winner or non-winner) performs the same
   fetch_sub(acq_rel) in its final_suspend awaiter and observes the
   previous value 1, i.e. the count becomes 0
4. Last runner returns caller_env_->executor.dispatch(continuation_)
   via symmetric transfer
5. Parent coroutine resumes and reads result_/winner_exception_
94
95 Synchronization analysis:
96 - All fetch_sub operations on remaining_count_ form a release sequence
97 - Winner's fetch_sub releases; subsequent fetch_sub operations participate
98 in the modification order of remaining_count_
99 - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
100 modification order, establishing happens-before from winner's writes
101 - Executor dispatch() is expected to provide queue-based synchronization
102 (release-on-post, acquire-on-execute) completing the chain to parent
103 - Even inline executors work (same thread = sequenced-before)
104
105 Alternative considered: Adding winner_ready_ atomic (set with release after
106 storing winner data, acquired before reading) would make synchronization
107 self-contained and not rely on executor implementation details. Current
108 approach is correct but requires careful reasoning about release sequences
109 and executor behavior.
110
111 EXCEPTION SEMANTICS:
112 --------------------
113 Unlike when_all (which captures first exception, discards others), when_any
114 treats exceptions as valid completions. If the winning task threw, that
115 exception is rethrown. Exceptions from non-winners are silently discarded.
116 */
117
118 namespace boost {
119 namespace capy {
120
121 namespace detail {
122
/** Map `void` onto `std::monostate`, leaving every other type alone.

    A `std::variant` cannot hold `void`, so tasks that return nothing are
    represented by `std::monostate` in the result variant.

    @tparam T The type to map; any cv-qualified `void` yields `std::monostate`.
*/
template<typename T, bool IsVoid = std::is_void_v<T>>
struct void_to_monostate
{
    using type = T;
};

template<typename T>
struct void_to_monostate<T, true>
{
    using type = std::monostate;
};

template<typename T>
using void_to_monostate_t = typename void_to_monostate<T>::type;
133
// std::variant needs distinct alternatives, so the list of result types is
// folded left-to-right into a variant, skipping any type already present.

/** Append `T` to a variant's alternatives unless it is already one of them. */
template<typename Variant, typename T>
struct variant_append_if_unique;

template<typename... Vs, typename T>
struct variant_append_if_unique<std::variant<Vs...>, T>
{
    // True when T matches one of the existing alternatives (false for none).
    static constexpr bool already_present =
        std::disjunction_v<std::is_same<T, Vs>...>;

    using type = std::conditional_t<
        already_present,
        std::variant<Vs...>,
        std::variant<Vs..., T>>;
};

/** Fold the remaining types into `Accumulated`, one at a time. */
template<typename Accumulated, typename... Remaining>
struct deduplicate_impl;

// Nothing left to fold: the accumulator is the answer.
template<typename Accumulated>
struct deduplicate_impl<Accumulated>
{
    using type = Accumulated;
};

// Fold the head type in, then recurse on the tail; `type` is inherited
// from the recursive step.
template<typename Accumulated, typename T, typename... Rest>
struct deduplicate_impl<Accumulated, T, Rest...>
    : deduplicate_impl<
        typename variant_append_if_unique<Accumulated, T>::type,
        Rest...>
{
    using next = typename variant_append_if_unique<Accumulated, T>::type;
};
163
164 // Deduplicated variant; void types become monostate before deduplication
165 template<typename T0, typename... Ts>
166 using unique_variant_t = typename deduplicate_impl<
167 std::variant<void_to_monostate_t<T0>>,
168 void_to_monostate_t<Ts>...>::type;
169
170 // Result: (winner_index, deduplicated_variant). Use index to disambiguate
171 // when multiple tasks share the same return type.
172 template<typename T0, typename... Ts>
173 using when_any_result_t = std::pair<std::size_t, unique_variant_t<T0, Ts...>>;
174
175 /** Core shared state for when_any operations.
176
177 Contains all members and methods common to both heterogeneous (variadic)
178 and homogeneous (range) when_any implementations. State classes embed
179 this via composition to avoid CRTP destructor ordering issues.
180
181 @par Thread Safety
182 Atomic operations protect winner selection and completion count.
183 */
184 struct when_any_core
185 {
186 std::atomic<std::size_t> remaining_count_;
187 std::size_t winner_index_{0};
188 std::exception_ptr winner_exception_;
189 std::stop_source stop_source_;
190
191 // Bridges parent's stop token to our stop_source
192 struct stop_callback_fn
193 {
194 std::stop_source* source_;
195 9 void operator()() const noexcept { source_->request_stop(); }
196 };
197 using stop_callback_t = std::stop_callback<stop_callback_fn>;
198 std::optional<stop_callback_t> parent_stop_callback_;
199
200 std::coroutine_handle<> continuation_;
201 io_env const* caller_env_ = nullptr;
202
203 // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
204 std::atomic<bool> has_winner_{false};
205
206 65 explicit when_any_core(std::size_t count) noexcept
207 65 : remaining_count_(count)
208 {
209 65 }
210
211 /** Atomically claim winner status; exactly one task succeeds. */
212 190 bool try_win(std::size_t index) noexcept
213 {
214 190 bool expected = false;
215 190 if(has_winner_.compare_exchange_strong(
216 expected, true, std::memory_order_acq_rel))
217 {
218 65 winner_index_ = index;
219 65 stop_source_.request_stop();
220 65 return true;
221 }
222 125 return false;
223 }
224
225 /** @pre try_win() returned true. */
226 8 void set_winner_exception(std::exception_ptr ep) noexcept
227 {
228 8 winner_exception_ = ep;
229 8 }
230
231 // Runners signal completion directly via final_suspend; no member function needed.
232 };
233
234 /** Shared state for heterogeneous when_any operation.
235
236 Coordinates winner selection, result storage, and completion tracking
237 for all child tasks in a when_any operation. Uses composition with
238 when_any_core for shared functionality.
239
240 @par Lifetime
241 Allocated on the parent coroutine's frame, outlives all runners.
242
243 @tparam T0 First task's result type.
244 @tparam Ts Remaining tasks' result types.
245 */
246 template<typename T0, typename... Ts>
247 struct when_any_state
248 {
249 static constexpr std::size_t task_count = 1 + sizeof...(Ts);
250 using variant_type = unique_variant_t<T0, Ts...>;
251
252 when_any_core core_;
253 std::optional<variant_type> result_;
254 std::array<std::coroutine_handle<>, task_count> runner_handles_{};
255
256 43 when_any_state()
257 43 : core_(task_count)
258 {
259 43 }
260
261 // Runners self-destruct in final_suspend. No destruction needed here.
262
263 /** @pre core_.try_win() returned true.
264 @note Uses in_place_type (not index) because variant is deduplicated.
265 */
266 template<typename T>
267 35 void set_winner_result(T value)
268 noexcept(std::is_nothrow_move_constructible_v<T>)
269 {
270 35 result_.emplace(std::in_place_type<T>, std::move(value));
271 35 }
272
273 /** @pre core_.try_win() returned true. */
274 3 void set_winner_void() noexcept
275 {
276 3 result_.emplace(std::in_place_type<std::monostate>, std::monostate{});
277 3 }
278 };
279
280 /** Wrapper coroutine that runs a single child task for when_any.
281
282 Propagates executor/stop_token to the child, attempts to claim winner
283 status on completion, and signals completion for cleanup coordination.
284
285 @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
286 */
287 template<typename StateType>
288 struct when_any_runner
289 {
290 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
291 {
292 StateType* state_ = nullptr;
293 std::size_t index_ = 0;
294 io_env env_;
295
296 190 when_any_runner get_return_object() noexcept
297 {
298 190 return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
299 }
300
301 // Starts suspended; launcher sets up state/ex/token then resumes
302 190 std::suspend_always initial_suspend() noexcept
303 {
304 190 return {};
305 }
306
307 190 auto final_suspend() noexcept
308 {
309 struct awaiter
310 {
311 promise_type* p_;
312 bool await_ready() const noexcept { return false; }
313 auto await_suspend(std::coroutine_handle<> h) noexcept
314 {
315 // Extract everything needed before self-destruction.
316 auto& core = p_->state_->core_;
317 auto* counter = &core.remaining_count_;
318 auto* caller_env = core.caller_env_;
319 auto cont = core.continuation_;
320
321 h.destroy();
322
323 // If last runner, dispatch parent for symmetric transfer.
324 auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
325 if(remaining == 1)
326 return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
327 return detail::symmetric_transfer(std::noop_coroutine());
328 }
329 void await_resume() const noexcept {}
330 };
331 190 return awaiter{this};
332 }
333
334 178 void return_void() noexcept {}
335
336 // Exceptions are valid completions in when_any (unlike when_all)
337 12 void unhandled_exception()
338 {
339 12 if(state_->core_.try_win(index_))
340 8 state_->core_.set_winner_exception(std::current_exception());
341 12 }
342
343 /** Injects executor and stop token into child awaitables. */
344 template<class Awaitable>
345 struct transform_awaiter
346 {
347 std::decay_t<Awaitable> a_;
348 promise_type* p_;
349
350 190 bool await_ready() { return a_.await_ready(); }
351 190 auto await_resume() { return a_.await_resume(); }
352
353 template<class Promise>
354 185 auto await_suspend(std::coroutine_handle<Promise> h)
355 {
356 using R = decltype(a_.await_suspend(h, &p_->env_));
357 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
358 185 return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
359 else
360 return a_.await_suspend(h, &p_->env_);
361 }
362 };
363
364 template<class Awaitable>
365 190 auto await_transform(Awaitable&& a)
366 {
367 using A = std::decay_t<Awaitable>;
368 if constexpr (IoAwaitable<A>)
369 {
370 return transform_awaiter<Awaitable>{
371 380 std::forward<Awaitable>(a), this};
372 }
373 else
374 {
375 static_assert(sizeof(A) == 0, "requires IoAwaitable");
376 }
377 190 }
378 };
379
380 std::coroutine_handle<promise_type> h_;
381
382 190 explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
383 190 : h_(h)
384 {
385 190 }
386
387 // Enable move for all clang versions - some versions need it
388 when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
389
390 // Non-copyable
391 when_any_runner(when_any_runner const&) = delete;
392 when_any_runner& operator=(when_any_runner const&) = delete;
393 when_any_runner& operator=(when_any_runner&&) = delete;
394
395 190 auto release() noexcept
396 {
397 190 return std::exchange(h_, nullptr);
398 }
399 };
400
401 /** Wraps a child awaitable, attempts to claim winner on completion.
402
403 Uses requires-expressions to detect state capabilities:
404 - set_winner_void(): for heterogeneous void tasks (stores monostate)
405 - set_winner_result(): for non-void tasks
406 - Neither: for homogeneous void tasks (no result storage)
407 */
408 template<IoAwaitable Awaitable, typename StateType>
409 when_any_runner<StateType>
410 190 make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
411 {
412 using T = awaitable_result_t<Awaitable>;
413 if constexpr (std::is_void_v<T>)
414 {
415 co_await std::move(inner);
416 if(state->core_.try_win(index))
417 {
418 // Heterogeneous void tasks store monostate in the variant
419 if constexpr (requires { state->set_winner_void(); })
420 state->set_winner_void();
421 // Homogeneous void tasks have no result to store
422 }
423 }
424 else
425 {
426 auto result = co_await std::move(inner);
427 if(state->core_.try_win(index))
428 {
429 // Defensive: move should not throw (already moved once), but we
430 // catch just in case since an uncaught exception would be devastating.
431 try
432 {
433 state->set_winner_result(std::move(result));
434 }
435 catch(...)
436 {
437 state->core_.set_winner_exception(std::current_exception());
438 }
439 }
440 }
441 380 }
442
443 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
444 template<IoAwaitable... Awaitables>
445 class when_any_launcher
446 {
447 using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
448
449 std::tuple<Awaitables...>* tasks_;
450 state_type* state_;
451
452 public:
453 43 when_any_launcher(
454 std::tuple<Awaitables...>* tasks,
455 state_type* state)
456 43 : tasks_(tasks)
457 43 , state_(state)
458 {
459 43 }
460
461 43 bool await_ready() const noexcept
462 {
463 43 return sizeof...(Awaitables) == 0;
464 }
465
466 /** CRITICAL: If the last task finishes synchronously, parent resumes and
467 destroys this object before await_suspend returns. Must not reference
468 `this` after the final launch_one call.
469 */
470 43 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
471 {
472 43 state_->core_.continuation_ = continuation;
473 43 state_->core_.caller_env_ = caller_env;
474
475 43 if(caller_env->stop_token.stop_possible())
476 {
477 18 state_->core_.parent_stop_callback_.emplace(
478 9 caller_env->stop_token,
479 9 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
480
481 9 if(caller_env->stop_token.stop_requested())
482 3 state_->core_.stop_source_.request_stop();
483 }
484
485 43 auto token = state_->core_.stop_source_.get_token();
486 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
487 (..., launch_one<Is>(caller_env->executor, token));
488 43 }(std::index_sequence_for<Awaitables...>{});
489
490 86 return std::noop_coroutine();
491 43 }
492
493 43 void await_resume() const noexcept
494 {
495 43 }
496
497 private:
498 /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
499 template<std::size_t I>
500 105 void launch_one(executor_ref caller_ex, std::stop_token token)
501 {
502 105 auto runner = make_when_any_runner(
503 105 std::move(std::get<I>(*tasks_)), state_, I);
504
505 105 auto h = runner.release();
506 105 h.promise().state_ = state_;
507 105 h.promise().index_ = I;
508 105 h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
509
510 105 std::coroutine_handle<> ch{h};
511 105 state_->runner_handles_[I] = ch;
512 105 caller_ex.post(ch);
513 210 }
514 };
515
516 } // namespace detail
517
518 /** Wait for the first awaitable to complete.
519
520 Races multiple heterogeneous awaitables concurrently and returns when the
521 first one completes. The result includes the winner's index and a
522 deduplicated variant containing the result value.
523
524 @par Suspends
525 The calling coroutine suspends when co_await is invoked. All awaitables
526 are launched concurrently and execute in parallel. The coroutine resumes
527 only after all awaitables have completed, even though the winner is
528 determined by the first to finish.
529
530 @par Completion Conditions
531 @li Winner is determined when the first awaitable completes (success or exception)
532 @li Only one task can claim winner status via atomic compare-exchange
533 @li Once a winner exists, stop is requested for all remaining siblings
534 @li Parent coroutine resumes only after all siblings acknowledge completion
535 @li The winner's result is returned; if the winner threw, the exception is rethrown
536
537 @par Cancellation Semantics
538 Cancellation is supported via stop_token propagated through the
539 IoAwaitable protocol:
540 @li Each child awaitable receives a stop_token derived from a shared stop_source
541 @li When the parent's stop token is activated, the stop is forwarded to all children
542 @li When a winner is determined, stop_source_.request_stop() is called immediately
543 @li Siblings must handle cancellation gracefully and complete before parent resumes
544 @li Stop requests are cooperative; tasks must check and respond to them
545
546 @par Concurrency/Overlap
547 All awaitables are launched concurrently before any can complete.
548 The launcher iterates through the arguments, starting each task on the
549 caller's executor. Tasks may execute in parallel on multi-threaded
550 executors or interleave on single-threaded executors. There is no
551 guaranteed ordering of task completion.
552
553 @par Notable Error Conditions
554 @li Winner exception: if the winning task threw, that exception is rethrown
555 @li Non-winner exceptions: silently discarded (only winner's result matters)
556 @li Cancellation: tasks may complete via cancellation without throwing
557
558 @par Example
559 @code
560 task<void> example() {
561 auto [index, result] = co_await when_any(
562 fetch_from_primary(), // task<Response>
563 fetch_from_backup() // task<Response>
564 );
565 // index is 0 or 1, result holds the winner's Response
566 auto response = std::get<Response>(result);
567 }
568 @endcode
569
570 @par Example with Heterogeneous Types
571 @code
572 task<void> mixed_types() {
573 auto [index, result] = co_await when_any(
574 fetch_int(), // task<int>
575 fetch_string() // task<std::string>
576 );
577 if (index == 0)
578 std::cout << "Got int: " << std::get<int>(result) << "\n";
579 else
580 std::cout << "Got string: " << std::get<std::string>(result) << "\n";
581 }
582 @endcode
583
584 @tparam A0 First awaitable type (must satisfy IoAwaitable).
585 @tparam As Remaining awaitable types (must satisfy IoAwaitable).
586 @param a0 The first awaitable to race.
587 @param as Additional awaitables to race concurrently.
588 @return A task yielding a pair of (winner_index, result_variant).
589
590 @throws Rethrows the winner's exception if the winning task threw an exception.
591
592 @par Remarks
593 Awaitables are moved into the coroutine frame; original objects become
594 empty after the call. When multiple awaitables share the same return type,
595 the variant is deduplicated to contain only unique types. Use the winner
596 index to determine which awaitable completed first. Void awaitables
597 contribute std::monostate to the variant.
598
599 @see when_all, IoAwaitable
600 */
601 template<IoAwaitable A0, IoAwaitable... As>
602 43 [[nodiscard]] auto when_any(A0 a0, As... as)
603 -> task<detail::when_any_result_t<
604 detail::awaitable_result_t<A0>,
605 detail::awaitable_result_t<As>...>>
606 {
607 using result_type = detail::when_any_result_t<
608 detail::awaitable_result_t<A0>,
609 detail::awaitable_result_t<As>...>;
610
611 detail::when_any_state<
612 detail::awaitable_result_t<A0>,
613 detail::awaitable_result_t<As>...> state;
614 std::tuple<A0, As...> awaitable_tuple(std::move(a0), std::move(as)...);
615
616 co_await detail::when_any_launcher<A0, As...>(&awaitable_tuple, &state);
617
618 if(state.core_.winner_exception_)
619 std::rethrow_exception(state.core_.winner_exception_);
620
621 co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
622 86 }
623
624 /** Concept for ranges of full I/O awaitables.
625
626 A range satisfies `IoAwaitableRange` if it is a sized input range
627 whose value type satisfies @ref IoAwaitable. This enables when_any
628 to accept any container or view of awaitables, not just std::vector.
629
630 @tparam R The range type.
631
632 @par Requirements
633 @li `R` must satisfy `std::ranges::input_range`
634 @li `R` must satisfy `std::ranges::sized_range`
635 @li `std::ranges::range_value_t<R>` must satisfy @ref IoAwaitable
636
637 @par Syntactic Requirements
638 Given `r` of type `R`:
639 @li `std::ranges::begin(r)` is valid
640 @li `std::ranges::end(r)` is valid
641 @li `std::ranges::size(r)` returns `std::ranges::range_size_t<R>`
642 @li `*std::ranges::begin(r)` satisfies @ref IoAwaitable
643
644 @par Example
645 @code
646 template<IoAwaitableRange R>
647 task<void> race_all(R&& awaitables) {
648 auto winner = co_await when_any(std::forward<R>(awaitables));
649 // Process winner...
650 }
651 @endcode
652
653 @see when_any, IoAwaitable
654 */
655 template<typename R>
656 concept IoAwaitableRange =
657 std::ranges::input_range<R> &&
658 std::ranges::sized_range<R> &&
659 IoAwaitable<std::ranges::range_value_t<R>>;
660
661 namespace detail {
662
663 /** Shared state for homogeneous when_any (range overload).
664
665 Uses composition with when_any_core for shared functionality.
666 Simpler than heterogeneous: optional<T> instead of variant, vector
667 instead of array for runner handles.
668 */
669 template<typename T>
670 struct when_any_homogeneous_state
671 {
672 when_any_core core_;
673 std::optional<T> result_;
674 std::vector<std::coroutine_handle<>> runner_handles_;
675
676 19 explicit when_any_homogeneous_state(std::size_t count)
677 19 : core_(count)
678 38 , runner_handles_(count)
679 {
680 19 }
681
682 // Runners self-destruct in final_suspend. No destruction needed here.
683
684 /** @pre core_.try_win() returned true. */
685 17 void set_winner_result(T value)
686 noexcept(std::is_nothrow_move_constructible_v<T>)
687 {
688 17 result_.emplace(std::move(value));
689 17 }
690 };
691
692 /** Specialization for void tasks (no result storage needed). */
693 template<>
694 struct when_any_homogeneous_state<void>
695 {
696 when_any_core core_;
697 std::vector<std::coroutine_handle<>> runner_handles_;
698
699 3 explicit when_any_homogeneous_state(std::size_t count)
700 3 : core_(count)
701 6 , runner_handles_(count)
702 {
703 3 }
704
705 // Runners self-destruct in final_suspend. No destruction needed here.
706
707 // No set_winner_result - void tasks have no result to store
708 };
709
710 /** Launches all runners concurrently; see await_suspend for lifetime concerns. */
711 template<IoAwaitableRange Range>
712 class when_any_homogeneous_launcher
713 {
714 using Awaitable = std::ranges::range_value_t<Range>;
715 using T = awaitable_result_t<Awaitable>;
716
717 Range* range_;
718 when_any_homogeneous_state<T>* state_;
719
720 public:
721 22 when_any_homogeneous_launcher(
722 Range* range,
723 when_any_homogeneous_state<T>* state)
724 22 : range_(range)
725 22 , state_(state)
726 {
727 22 }
728
729 22 bool await_ready() const noexcept
730 {
731 22 return std::ranges::empty(*range_);
732 }
733
734 /** CRITICAL: If the last task finishes synchronously, parent resumes and
735 destroys this object before await_suspend returns. Must not reference
736 `this` after dispatching begins.
737
738 Two-phase approach:
739 1. Create all runners (safe - no dispatch yet)
740 2. Dispatch all runners (any may complete synchronously)
741 */
742 22 std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
743 {
744 22 state_->core_.continuation_ = continuation;
745 22 state_->core_.caller_env_ = caller_env;
746
747 22 if(caller_env->stop_token.stop_possible())
748 {
749 14 state_->core_.parent_stop_callback_.emplace(
750 7 caller_env->stop_token,
751 7 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
752
753 7 if(caller_env->stop_token.stop_requested())
754 4 state_->core_.stop_source_.request_stop();
755 }
756
757 22 auto token = state_->core_.stop_source_.get_token();
758
759 // Phase 1: Create all runners without dispatching.
760 // This iterates over *range_ safely because no runners execute yet.
761 22 std::size_t index = 0;
762 107 for(auto&& a : *range_)
763 {
764 85 auto runner = make_when_any_runner(
765 85 std::move(a), state_, index);
766
767 85 auto h = runner.release();
768 85 h.promise().state_ = state_;
769 85 h.promise().index_ = index;
770 85 h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
771
772 85 state_->runner_handles_[index] = std::coroutine_handle<>{h};
773 85 ++index;
774 }
775
776 // Phase 2: Post all runners. Any may complete synchronously.
777 // After last post, state_ and this may be destroyed.
778 // Use raw pointer/count captured before posting.
779 22 std::coroutine_handle<>* handles = state_->runner_handles_.data();
780 22 std::size_t count = state_->runner_handles_.size();
781 107 for(std::size_t i = 0; i < count; ++i)
782 85 caller_env->executor.post(handles[i]);
783
784 44 return std::noop_coroutine();
785 107 }
786
787 22 void await_resume() const noexcept
788 {
789 22 }
790 };
791
792 } // namespace detail
793
794 /** Wait for the first awaitable to complete (range overload).
795
796 Races a range of awaitables with the same result type. Accepts any
797 sized input range of IoAwaitable types, enabling use with arrays,
798 spans, or custom containers.
799
800 @par Suspends
801 The calling coroutine suspends when co_await is invoked. All awaitables
802 in the range are launched concurrently and execute in parallel. The
803 coroutine resumes only after all awaitables have completed, even though
804 the winner is determined by the first to finish.
805
806 @par Completion Conditions
807 @li Winner is determined when the first awaitable completes (success or exception)
808 @li Only one task can claim winner status via atomic compare-exchange
809 @li Once a winner exists, stop is requested for all remaining siblings
810 @li Parent coroutine resumes only after all siblings acknowledge completion
811 @li The winner's index and result are returned; if the winner threw, the exception is rethrown
812
813 @par Cancellation Semantics
814 Cancellation is supported via stop_token propagated through the
815 IoAwaitable protocol:
816 @li Each child awaitable receives a stop_token derived from a shared stop_source
817 @li When the parent's stop token is activated, the stop is forwarded to all children
818 @li When a winner is determined, stop_source_.request_stop() is called immediately
819 @li Siblings must handle cancellation gracefully and complete before parent resumes
820 @li Stop requests are cooperative; tasks must check and respond to them
821
822 @par Concurrency/Overlap
823 All awaitables are launched concurrently before any can complete.
824 The launcher iterates through the range, starting each task on the
825 caller's executor. Tasks may execute in parallel on multi-threaded
826 executors or interleave on single-threaded executors. There is no
827 guaranteed ordering of task completion.
828
829 @par Notable Error Conditions
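@li Empty range: std::invalid_argument is thrown from the coroutine body
before any awaitable is launched. Because when_any is itself a coroutine,
the exception is delivered through the returned task (presumably when the
task is awaited - confirm against task's promise) rather than via co_return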
831 @li Winner exception: if the winning task threw, that exception is rethrown
832 @li Non-winner exceptions: silently discarded (only winner's result matters)
833 @li Cancellation: tasks may complete via cancellation without throwing
834
835 @par Example
836 @code
837 task<void> example() {
838 std::array<task<Response>, 3> requests = {
839 fetch_from_server(0),
840 fetch_from_server(1),
841 fetch_from_server(2)
842 };
843
844 auto [index, response] = co_await when_any(std::move(requests));
845 }
846 @endcode
847
848 @par Example with Vector
849 @code
850 task<Response> fetch_fastest(std::vector<Server> const& servers) {
851 std::vector<task<Response>> requests;
852 for (auto const& server : servers)
853 requests.push_back(fetch_from(server));
854
855 auto [index, response] = co_await when_any(std::move(requests));
856 co_return response;
857 }
858 @endcode
859
860 @tparam R Range type satisfying IoAwaitableRange.
861 @param awaitables Range of awaitables to race concurrently (must not be empty).
862 @return A task yielding a pair of (winner_index, result).
863
@throws std::invalid_argument if range is empty (thrown inside the coroutine
body before any awaitable is launched, and reported through the returned task).
865 @throws Rethrows the winner's exception if the winning task threw an exception.
866
867 @par Remarks
The range is first placed on the coroutine frame to ensure lifetime
safety: an rvalue range is moved there, while an lvalue range is copied
(which requires the range and its awaitables to be copyable) and the
caller's container is left unchanged. The awaitables are then moved out
of the frame-owned range as they are launched. Unlike the variadic
overload, no variant wrapper is needed since all tasks share the same
return type.
873
874 @see when_any, IoAwaitableRange
875 */
876 template<IoAwaitableRange R> // range overload of when_any for awaitables that produce a value
877 requires (!std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>>) // ranges whose awaitables yield void are excluded here
878 21 [[nodiscard]] auto when_any(R&& awaitables)
879 -> task<std::pair<std::size_t, detail::awaitable_result_t<std::ranges::range_value_t<R>>>> // (winner index, winner result)
880 {
881 using Awaitable = std::ranges::range_value_t<R>; // element type of the range
882 using T = detail::awaitable_result_t<Awaitable>; // result type of one awaitable
883 using result_type = std::pair<std::size_t, T>; // what this coroutine co_returns
884 using OwnedRange = std::remove_cvref_t<R>; // R stripped of cv/ref: the by-value type stored below
885
886 auto count = std::ranges::size(awaitables); // measured on the caller's range, before it is forwarded
887 if(count == 0) // an empty range has no possible winner
888 throw std::invalid_argument("when_any requires at least one awaitable"); // NOTE(review): thrown inside the coroutine body; when the caller observes it depends on task's promise - confirm
889
890 // Forward the range into a local so the coroutine frame owns it: moved for rvalue R, copied for lvalue R
891 OwnedRange owned_awaitables = std::forward<R>(awaitables);
892
893 detail::when_any_homogeneous_state<T> state(count); // race state constructed with the awaitable count
894
895 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); // launcher gets pointers to the frame-owned range and state; state is read once this resumes
896
897 if(state.core_.winner_exception_) // non-null: an exception was recorded for the winner
898 std::rethrow_exception(state.core_.winner_exception_); // propagate it to the awaiting caller
899
900 co_return result_type{state.core_.winner_index_, std::move(*state.result_)}; // winner index plus the value moved out of state.result_
901 42 }
902
903 /** Wait for the first awaitable to complete (void range overload).
904
905 Races a range of void-returning awaitables. Since void awaitables have
906 no result value, only the winner's index is returned.
907
908 @par Suspends
909 The calling coroutine suspends when co_await is invoked. All awaitables
910 in the range are launched concurrently; they run in parallel only on a
911 multi-threaded executor and interleave otherwise. The coroutine resumes
912 only after all awaitables have completed, even though the winner is determined by the first to finish.
913
914 @par Completion Conditions
915 @li Winner is determined when the first awaitable completes (success or exception)
916 @li Only one task can claim winner status via atomic compare-exchange
917 @li Once a winner exists, stop is requested for all remaining siblings
918 @li Parent coroutine resumes only after all siblings acknowledge completion
919 @li The winner's index is returned; if the winner threw, the exception is rethrown
920
921 @par Cancellation Semantics
922 Cancellation is supported via stop_token propagated through the
923 IoAwaitable protocol:
924 @li Each child awaitable receives a stop_token derived from a shared stop_source
925 @li When the parent's stop token is activated, the stop is forwarded to all children
926 @li When a winner is determined, stop_source_.request_stop() is called immediately
927 @li Siblings must handle cancellation gracefully and complete before parent resumes
928 @li Stop requests are cooperative; tasks must check and respond to them
929
930 @par Concurrency/Overlap
931 All awaitables are launched concurrently before any can complete.
932 The launcher iterates through the range, starting each task on the
933 caller's executor. Tasks may execute in parallel on multi-threaded
934 executors or interleave on single-threaded executors. There is no
935 guaranteed ordering of task completion.
936
937 @par Notable Error Conditions
938 @li Empty range: throws std::invalid_argument immediately (not via co_return)
939 @li Winner exception: if the winning task threw, that exception is rethrown
940 @li Non-winner exceptions: silently discarded (only winner's result matters)
941 @li Cancellation: tasks may complete via cancellation without throwing
942
943 @par Example
944 @code
945 task<void> example() {
946 std::vector<task<void>> tasks;
947 for (int i = 0; i < 5; ++i)
948 tasks.push_back(background_work(i));
949
950 std::size_t winner = co_await when_any(std::move(tasks));
951 // winner is the index of the first task to complete
952 }
953 @endcode
954
955 @par Example with Timeout
956 @code
957 task<void> with_timeout() {
958 std::vector<task<void>> tasks;
959 tasks.push_back(long_running_operation());
960 tasks.push_back(delay(std::chrono::seconds(5)));
961
962 std::size_t winner = co_await when_any(std::move(tasks));
963 if (winner == 1) {
964 // Timeout occurred
965 }
966 }
967 @endcode
968
969 @tparam R Range type satisfying IoAwaitableRange with void result.
970 @param awaitables Range of void awaitables to race concurrently (must not be empty).
971 @return A task yielding the winner's index (zero-based).
972
973 @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
974 @throws Rethrows the winner's exception if the winning task threw an exception.
975
976 @par Remarks
977 The range is forwarded onto the coroutine frame to ensure lifetime
978 safety: rvalue ranges are moved, lvalue ranges are copied. Elements are
979 then moved out of that frame-owned range, so pass an rvalue when the
980 awaitables are move-only. Unlike the non-void overload, no result
981 storage is needed since void tasks produce no value.
982
983 @see when_any, IoAwaitableRange
984 */
985 template<IoAwaitableRange R> // range overload of when_any for awaitables that produce no value
986 requires std::is_void_v<detail::awaitable_result_t<std::ranges::range_value_t<R>>> // only ranges whose awaitables yield void
987 3 [[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t> // yields the winner's index only
988 {
989 using OwnedRange = std::remove_cvref_t<R>; // R stripped of cv/ref: the by-value type stored below
990
991 auto count = std::ranges::size(awaitables); // measured on the caller's range, before it is forwarded
992 if(count == 0) // an empty range has no possible winner
993 throw std::invalid_argument("when_any requires at least one awaitable"); // NOTE(review): thrown inside the coroutine body; when the caller observes it depends on task's promise - confirm
994
995 // Forward the range into a local so the coroutine frame owns it: moved for rvalue R, copied for lvalue R
996 OwnedRange owned_awaitables = std::forward<R>(awaitables);
997
998 detail::when_any_homogeneous_state<void> state(count); // race state constructed with the awaitable count; void specialization
999
1000 co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state); // launcher gets pointers to the frame-owned range and state; state is read once this resumes
1001
1002 if(state.core_.winner_exception_) // non-null: an exception was recorded for the winner
1003 std::rethrow_exception(state.core_.winner_exception_); // propagate it to the awaiting caller
1004
1005 co_return state.core_.winner_index_; // index of the winning awaitable
1006 6 }
1007
1008 } // namespace capy
1009 } // namespace boost
1010
1011 #endif
1012