Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
experimental_async.H
Go to the documentation of this file.
1/*
2 Aleph_w
3
4 Data structures & Algorithms
5 version 2.0.0b
6 https://github.com/lrleon/Aleph-w
7
8 This file is part of Aleph-w library
9
10 Copyright (c) 2002-2026 Leandro Rabindranath Leon
11
12 Permission is hereby granted, free of charge, to any person obtaining a copy
13 of this software and associated documentation files (the "Software"), to deal
14 in the Software without restriction, including without limitation the rights
15 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 copies of the Software, and to permit persons to whom the Software is
17 furnished to do so, subject to the following conditions:
18
19 The above copyright notice and this permission notice shall be included in all
20 copies or substantial portions of the Software.
21
22 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 SOFTWARE.
29*/
30
87#ifndef ALEPH_EXPERIMENTAL_ASYNC_H
88#define ALEPH_EXPERIMENTAL_ASYNC_H
89
90#include <ah-errors.H>
91#include <thread_pool.H>
92
93#ifndef ALEPH_ENABLE_EXPERIMENTAL_ASYNC
94# define ALEPH_ENABLE_EXPERIMENTAL_ASYNC 0
95#endif
96
97#if ALEPH_ENABLE_EXPERIMENTAL_ASYNC && __cplusplus >= 202002L
98# if defined(__has_include)
99# if __has_include(<coroutine>)
100# include <coroutine>
101# define ALEPH_DETAIL_HAS_COROUTINE_HEADER 1
102# else
103# define ALEPH_DETAIL_HAS_COROUTINE_HEADER 0
104# endif
105# else
106# define ALEPH_DETAIL_HAS_COROUTINE_HEADER 0
107# endif
108#else
109# define ALEPH_DETAIL_HAS_COROUTINE_HEADER 0
110#endif
111
112#if ALEPH_ENABLE_EXPERIMENTAL_ASYNC && \
113 __cplusplus >= 202002L && \
114 ALEPH_DETAIL_HAS_COROUTINE_HEADER && \
115 defined(__cpp_impl_coroutine) && \
116 __cpp_impl_coroutine >= 201902L
117# define ALEPH_HAS_EXPERIMENTAL_ASYNC 1
118#else
119# define ALEPH_HAS_EXPERIMENTAL_ASYNC 0
120#endif
121
122#if ALEPH_HAS_EXPERIMENTAL_ASYNC
123
124#include <condition_variable>
125#include <exception>
126#include <functional>
127#include <memory>
128#include <mutex>
129#include <optional>
130#include <system_error>
131#include <tuple>
132#include <type_traits>
133#include <utility>
134
135namespace Aleph
136{
137 namespace experimental
138 {
139 namespace detail
140 {
141 [[noreturn]] inline void throw_no_state()
142 {
143 ah_runtime_error() << "scheduled_operation has no state";
144 std::terminate();
145 }
146
147 class ContinuationRegistration
148 {
149 mutable std::mutex mutex_;
150 std::coroutine_handle<> handle_{};
151 bool active_ = true;
152
153 public:
154 explicit ContinuationRegistration(std::coroutine_handle<> handle) noexcept
155 : handle_(handle) {}
156
157 [[nodiscard]] bool is_active() const noexcept
158 {
159 std::lock_guard<std::mutex> lock(mutex_);
160 return active_ and static_cast<bool>(handle_);
161 }
162
163 void unregister() noexcept
164 {
165 std::lock_guard<std::mutex> lock(mutex_);
166 active_ = false;
167 handle_ = std::coroutine_handle<>{};
168 }
169
170 void resume_if_active()
171 {
172 std::coroutine_handle<> handle;
173 {
174 std::lock_guard<std::mutex> lock(mutex_);
175 if (not active_ or not handle_)
176 return;
177 active_ = false;
178 handle = std::exchange(handle_, std::coroutine_handle<>{});
179 }
180 handle.resume();
181 }
182 };
183
/// Bind a callable and its arguments into a nullary closure.
///
/// The callable is captured by value and the arguments are packed with
/// std::make_tuple, so std::reference_wrapper arguments are stored as
/// references. Calling the closure moves the callable and the stored
/// arguments into std::invoke, so it is meant to be called once.
///
/// @param f    callable to capture
/// @param args arguments to capture alongside it
/// @return a mutable lambda taking no parameters that returns whatever the
///         bound call returns
template <typename F, typename... Args>
auto make_invocable(F && f, Args && ... args)
{
  return [callable = std::forward<F>(f),
          stored = std::make_tuple(std::forward<Args>(args)...)]() mutable
  {
    // Unpacks the tuple elements and forwards them, together with the moved
    // callable, to std::invoke.
    auto call_once = [&callable](auto && ... unpacked)
    {
      return std::invoke(std::move(callable),
                         std::forward<decltype(unpacked)>(unpacked)...);
    };
    return std::apply(std::move(call_once), std::move(stored));
  };
}
197
// Completion state for one operation producing a T.
//
// Holds a ready flag, a consumed flag, and either a value or a stored
// exception, plus a weak reference to at most one ContinuationRegistration
// (a waiting coroutine). All fields are accessed under mutex_; blocking
// waiters sleep on ready_cv_.
198 template <typename T>
199 class OperationState
200 {
201 mutable std::mutex mutex_;
202 mutable std::condition_variable ready_cv_;
203 bool ready_ = false;
204 bool consumed_ = false;
205 std::exception_ptr exception_;
206 std::optional<T> value_;
207 std::weak_ptr<ContinuationRegistration> continuation_;
208
// Common completion path for set_value() and set_exception().
// `fill` stores the outcome and runs while mutex_ is held. Only the first
// completion has any effect: if ready_ is already set the call returns
// without running `fill`.
209 template <typename F>
210 void complete(F && fill)
211 {
212 std::weak_ptr<ContinuationRegistration> continuation;
213 {
214 std::lock_guard<std::mutex> lock(mutex_);
215 if (ready_)
216 return;
217 fill();
218 ready_ = true;
// Take the registration out of the state so it is resumed at most once.
219 continuation = std::exchange(continuation_, {});
220 }
// Both the wake-up of blocked waiters and the coroutine resume happen after
// mutex_ has been released. The resume runs on the calling thread.
221 ready_cv_.notify_all();
222 if (auto registration = continuation.lock(); registration != nullptr)
223 registration->resume_if_active();
224 }
225
226 public:
227 OperationState() = default;
228
// Returns whether a value or an exception has been stored.
229 [[nodiscard]] bool is_ready() const
230 {
231 std::lock_guard<std::mutex> lock(mutex_);
232 return ready_;
233 }
234
// Blocks the calling thread until the state becomes ready.
235 void wait() const
236 {
237 std::unique_lock<std::mutex> lock(mutex_);
238 ready_cv_.wait(lock, [this] { return ready_; });
239 }
240
// Completes the state with a value constructed in place from `value`.
// Has no effect if the state is already ready.
241 template <typename U>
242 void set_value(U && value)
243 {
244 complete([this, &value]
245 {
246 value_.emplace(std::forward<U>(value));
247 });
248 }
249
// Completes the state with an exception to be rethrown by take().
// Has no effect if the state is already ready.
250 void set_exception(std::exception_ptr exception)
251 {
252 complete([this, &exception]
253 {
254 exception_ = exception;
255 });
256 }
257
// Registers `continuation` as the coroutine to resume on completion.
// Returns false when the state is already ready (nothing is stored, so the
// caller should not suspend) and true once the registration is stored.
258 [[nodiscard]] bool attach_continuation(
259 const std::shared_ptr<ContinuationRegistration> & continuation)
260 {
261 std::lock_guard<std::mutex> lock(mutex_);
262 if (ready_)
263 return false;
264 if (auto registered = continuation_.lock();
265 registered != nullptr and registered->is_active())
// NOTE(review): the listing jumps from line 265 to 267 here, so the head of
// this statement is missing. Presumably it is an error-raising macro
// (something like ah_logic_error_if) rejecting a second active waiter.
// Confirm against the original header.
267 << "scheduled_operation already has a waiting coroutine";
268 continuation_ = continuation;
269 return true;
270 }
271
// Forgets the stored registration, but only if it is the very one passed in.
272 void detach_continuation(
273 const std::shared_ptr<ContinuationRegistration> & continuation) noexcept
274 {
275 std::lock_guard<std::mutex> lock(mutex_);
276 if (auto registered = continuation_.lock(); registered == continuation)
277 continuation_.reset();
278 }
279
// Blocks until ready, then hands out the result exactly once.
// A second call reports the error through throw_no_state(). A stored
// exception is rethrown; otherwise the stored value is moved out.
280 T take()
281 {
282 std::unique_lock<std::mutex> lock(mutex_);
283 ready_cv_.wait(lock, [this] { return ready_; });
284 if (consumed_)
285 throw_no_state();
286 consumed_ = true;
287 if (exception_ != nullptr)
288 std::rethrow_exception(exception_);
289 return std::move(*value_);
290 }
291 };
292
// Completion state for an operation that produces no value.
//
// Same layout as the primary template minus the value slot: a ready flag, a
// consumed flag, an optional stored exception and a weak reference to at most
// one ContinuationRegistration. All fields are accessed under mutex_;
// blocking waiters sleep on ready_cv_.
293 template <>
294 class OperationState<void>
295 {
296 mutable std::mutex mutex_;
297 mutable std::condition_variable ready_cv_;
298 bool ready_ = false;
299 bool consumed_ = false;
300 std::exception_ptr exception_;
301 std::weak_ptr<ContinuationRegistration> continuation_;
302
// Common completion path for set_value() and set_exception().
// `fill` runs while mutex_ is held. Only the first completion has any
// effect: if ready_ is already set the call returns without running `fill`.
303 template <typename F>
304 void complete(F && fill)
305 {
306 std::weak_ptr<ContinuationRegistration> continuation;
307 {
308 std::lock_guard<std::mutex> lock(mutex_);
309 if (ready_)
310 return;
311 fill();
312 ready_ = true;
// Take the registration out of the state so it is resumed at most once.
313 continuation = std::exchange(continuation_, {});
314 }
// Both the wake-up of blocked waiters and the coroutine resume happen after
// mutex_ has been released. The resume runs on the calling thread.
315 ready_cv_.notify_all();
316 if (auto registration = continuation.lock(); registration != nullptr)
317 registration->resume_if_active();
318 }
319
320 public:
321 OperationState() = default;
322
// Returns whether the operation has completed (normally or with an exception).
323 [[nodiscard]] bool is_ready() const
324 {
325 std::lock_guard<std::mutex> lock(mutex_);
326 return ready_;
327 }
328
// Blocks the calling thread until the state becomes ready.
329 void wait() const
330 {
331 std::unique_lock<std::mutex> lock(mutex_);
332 ready_cv_.wait(lock, [this] { return ready_; });
333 }
334
// Marks the operation as completed successfully; there is nothing to store.
// Has no effect if the state is already ready.
335 void set_value()
336 {
337 complete([] {});
338 }
339
// Completes the state with an exception to be rethrown by take().
// Has no effect if the state is already ready.
340 void set_exception(std::exception_ptr exception)
341 {
342 complete([this, &exception]
343 {
344 exception_ = exception;
345 });
346 }
347
// Registers `continuation` as the coroutine to resume on completion.
// Returns false when the state is already ready (nothing is stored, so the
// caller should not suspend) and true once the registration is stored.
348 [[nodiscard]] bool attach_continuation(
349 const std::shared_ptr<ContinuationRegistration> & continuation)
350 {
351 std::lock_guard<std::mutex> lock(mutex_);
352 if (ready_)
353 return false;
354 if (auto registered = continuation_.lock();
355 registered != nullptr and registered->is_active())
// NOTE(review): the listing jumps from line 355 to 357 here, so the head of
// this statement is missing. Presumably it is an error-raising macro
// (something like ah_logic_error_if) rejecting a second active waiter.
// Confirm against the original header.
357 << "scheduled_operation already has a waiting coroutine";
358 continuation_ = continuation;
359 return true;
360 }
361
// Forgets the stored registration, but only if it is the very one passed in.
362 void detach_continuation(
363 const std::shared_ptr<ContinuationRegistration> & continuation) noexcept
364 {
365 std::lock_guard<std::mutex> lock(mutex_);
366 if (auto registered = continuation_.lock(); registered == continuation)
367 continuation_.reset();
368 }
369
// Blocks until ready, then consumes the completion exactly once.
// A second call reports the error through throw_no_state(); a stored
// exception is rethrown.
370 void take()
371 {
372 std::unique_lock<std::mutex> lock(mutex_);
373 ready_cv_.wait(lock, [this] { return ready_; });
374 if (consumed_)
375 throw_no_state();
376 consumed_ = true;
377 if (exception_ != nullptr)
378 std::rethrow_exception(exception_);
379 }
380 };
381 } // namespace detail
382
// Move-only handle to the result of a scheduled task producing a T.
//
// The result can be obtained either by blocking (wait()/get()) or by
// co_await-ing the handle (await_ready/await_suspend/await_resume). Fetching
// the result releases the shared state, after which valid() is false.
//
// NOTE(review): the listing jumps from line 409 to 411, so the class-head
// line is missing here. Presumably it reads `class scheduled_operation`
// (matching the constructor name below). Confirm against the original header.
409 template <typename T>
411 {
// Completion state for the task; null for a default-constructed, moved-from
// or already consumed handle.
412 std::shared_ptr<detail::OperationState<T>> state_;
// Registration of the coroutine currently awaiting this handle, if any.
413 std::shared_ptr<detail::ContinuationRegistration> continuation_registration_;
414
// Private: handles with a state are created through the schedule() friend.
415 explicit scheduled_operation(std::shared_ptr<detail::OperationState<T>> state)
416 : state_(std::move(state)) {}
417
// Cancels any pending coroutine registration: removes it from the shared
// state, deactivates it so a later completion cannot resume the handle, and
// drops this object's reference to it.
418 void unregister_continuation() noexcept
419 {
420 if (state_ != nullptr and continuation_registration_ != nullptr)
421 state_->detach_continuation(continuation_registration_);
422 if (continuation_registration_ != nullptr)
423 continuation_registration_->unregister();
424 continuation_registration_.reset();
425 }
426
// Gives up ownership of the shared state (leaving this handle invalid) and
// returns it; reports an error through throw_no_state() if there is none.
427 std::shared_ptr<detail::OperationState<T>> consume_state()
428 {
429 unregister_continuation();
430 auto state = std::exchange(state_, nullptr);
431 if (state == nullptr)
432 detail::throw_no_state();
433 return state;
434 }
435
// Checked access to the shared state; errors out when the handle is invalid.
436 detail::OperationState<T> & state()
437 {
438 if (state_ == nullptr)
439 detail::throw_no_state();
440 return *state_;
441 }
442
443 const detail::OperationState<T> & state() const
444 {
445 if (state_ == nullptr)
446 detail::throw_no_state();
447 return *state_;
448 }
449
// schedule() needs the private constructor above.
450 template <typename F, typename... Args>
451 friend auto schedule(ThreadPool & pool, F && f, Args && ... args)
452 -> scheduled_operation<std::invoke_result_t<F, Args...>>;
453
454 public:
// Default construction yields an invalid handle (no state). Copying is
// disabled; moving transfers the state and any registration.
459 scheduled_operation() = default;
460 scheduled_operation(const scheduled_operation &) = delete;
461 scheduled_operation & operator = (const scheduled_operation &) = delete;
462 scheduled_operation(scheduled_operation &&) noexcept = default;
463 scheduled_operation & operator = (scheduled_operation && other) noexcept
464 {
465 if (this != &other)
466 {
// Cancel this handle's own pending registration before taking over other's.
467 unregister_continuation();
468 state_ = std::move(other.state_);
469 continuation_registration_ = std::move(other.continuation_registration_);
470 }
471 return *this;
472 }
// Destroying the handle cancels a pending coroutine registration, so the
// completion will not resume a coroutine through a destroyed handle.
478 ~scheduled_operation() { unregister_continuation(); }
479
// True while the handle still refers to a shared state.
487 [[nodiscard]] bool valid() const noexcept { return state_ != nullptr; }
488
// Non-blocking readiness check; errors out if the handle is invalid.
495 [[nodiscard]] bool is_ready() const
496 {
497 return state().is_ready();
498 }
499
// Blocks until the result is available without consuming it; errors out if
// the handle is invalid.
506 void wait() const
507 {
508 state().wait();
509 }
510
// Releases the state from this handle and returns the result obtained from
// its take(); errors out if the handle is invalid.
521 T get()
522 {
523 return consume_state()->take();
524 }
525
// Awaitable protocol: skip suspension when the result is already available.
532 [[nodiscard]] bool await_ready() const
533 {
534 return state().is_ready();
535 }
536
// Awaitable protocol: registers `continuation` to be resumed on completion.
// Returns true when the registration was attached (the coroutine stays
// suspended) and false when the state was already ready (the coroutine
// continues immediately). If attaching fails with an exception, the
// previous registration is restored before rethrowing.
548 bool await_suspend(std::coroutine_handle<> continuation)
549 {
550 auto registration =
551 std::make_shared<detail::ContinuationRegistration>(continuation);
552 auto previous = std::move(continuation_registration_);
553 continuation_registration_ = registration;
554 try
555 {
556 if (state().attach_continuation(registration))
557 {
// From here on only the local `previous` is touched, not members of *this.
558 if (previous)
559 previous->unregister();
560 return true;
561 }
562 }
563 catch (...)
564 {
565 continuation_registration_ = std::move(previous);
566 throw;
567 }
// Already ready: discard the new registration and do not suspend.
568 continuation_registration_ = std::move(previous);
569 return false;
570 }
571
// Awaitable protocol: yields the result of the co_await expression by
// consuming the state, exactly like get().
582 T await_resume()
583 {
584 return consume_state()->take();
585 }
586 };
587
// Move-only handle to a scheduled task that produces no value.
//
// Completion can be observed either by blocking (wait()/get()) or by
// co_await-ing the handle (await_ready/await_suspend/await_resume). get() and
// await_resume() release the shared state, after which valid() is false.
//
// NOTE(review): the listing jumps from line 610 to 612, so the class-head
// line is missing here. Presumably it reads `class scheduled_operation<void>`
// (matching the constructor name and state type below). Confirm against the
// original header.
610 template <>
612 {
// Completion state for the task; null for a default-constructed, moved-from
// or already consumed handle.
613 std::shared_ptr<detail::OperationState<void>> state_;
// Registration of the coroutine currently awaiting this handle, if any.
614 std::shared_ptr<detail::ContinuationRegistration> continuation_registration_;
615
// Private: handles with a state are created through the schedule() friend.
616 explicit scheduled_operation(std::shared_ptr<detail::OperationState<void>> state)
617 : state_(std::move(state)) {}
618
// Cancels any pending coroutine registration: removes it from the shared
// state, deactivates it so a later completion cannot resume the handle, and
// drops this object's reference to it.
619 void unregister_continuation() noexcept
620 {
621 if (state_ != nullptr and continuation_registration_ != nullptr)
622 state_->detach_continuation(continuation_registration_);
623 if (continuation_registration_ != nullptr)
624 continuation_registration_->unregister();
625 continuation_registration_.reset();
626 }
627
// Gives up ownership of the shared state (leaving this handle invalid) and
// returns it; reports an error through throw_no_state() if there is none.
628 std::shared_ptr<detail::OperationState<void>> consume_state()
629 {
630 unregister_continuation();
631 auto state = std::exchange(state_, nullptr);
632 if (state == nullptr)
633 detail::throw_no_state();
634 return state;
635 }
636
// Checked access to the shared state; errors out when the handle is invalid.
637 detail::OperationState<void> & state()
638 {
639 if (state_ == nullptr)
640 detail::throw_no_state();
641 return *state_;
642 }
643
644 const detail::OperationState<void> & state() const
645 {
646 if (state_ == nullptr)
647 detail::throw_no_state();
648 return *state_;
649 }
650
// schedule() needs the private constructor above.
651 template <typename F, typename... Args>
652 friend auto schedule(ThreadPool & pool, F && f, Args && ... args)
653 -> scheduled_operation<std::invoke_result_t<F, Args...>>;
654
655 public:
// Default construction yields an invalid handle (no state). Copying is
// disabled; moving transfers the state and any registration.
660 scheduled_operation() = default;
661 scheduled_operation(const scheduled_operation &) = delete;
662 scheduled_operation & operator = (const scheduled_operation &) = delete;
663 scheduled_operation(scheduled_operation &&) noexcept = default;
664 scheduled_operation & operator = (scheduled_operation && other) noexcept
665 {
666 if (this != &other)
667 {
// Cancel this handle's own pending registration before taking over other's.
668 unregister_continuation();
669 state_ = std::move(other.state_);
670 continuation_registration_ = std::move(other.continuation_registration_);
671 }
672 return *this;
673 }
// Destroying the handle cancels a pending coroutine registration, so the
// completion will not resume a coroutine through a destroyed handle.
679 ~scheduled_operation() { unregister_continuation(); }
680
// True while the handle still refers to a shared state.
688 [[nodiscard]] bool valid() const noexcept { return state_ != nullptr; }
689
// Non-blocking readiness check; errors out if the handle is invalid.
696 [[nodiscard]] bool is_ready() const
697 {
698 return state().is_ready();
699 }
700
// Blocks until the task has completed without consuming the state; errors
// out if the handle is invalid.
707 void wait() const
708 {
709 state().wait();
710 }
711
// Releases the state from this handle and calls its take(), which reports the
// task's outcome; errors out if the handle is invalid.
721 void get()
722 {
723 consume_state()->take();
724 }
725
// Awaitable protocol: skip suspension when the task has already completed.
732 [[nodiscard]] bool await_ready() const
733 {
734 return state().is_ready();
735 }
736
// Awaitable protocol: registers `continuation` to be resumed on completion.
// Returns true when the registration was attached (the coroutine stays
// suspended) and false when the state was already ready (the coroutine
// continues immediately). If attaching fails with an exception, the
// previous registration is restored before rethrowing.
748 bool await_suspend(std::coroutine_handle<> continuation)
749 {
750 auto registration =
751 std::make_shared<detail::ContinuationRegistration>(continuation);
752 auto previous = std::move(continuation_registration_);
753 continuation_registration_ = registration;
754 try
755 {
756 if (state().attach_continuation(registration))
757 {
// From here on only the local `previous` is touched, not members of *this.
758 if (previous)
759 previous->unregister();
760 return true;
761 }
762 }
763 catch (...)
764 {
765 continuation_registration_ = std::move(previous);
766 throw;
767 }
// Already ready: discard the new registration and do not suspend.
768 continuation_registration_ = std::move(previous);
769 return false;
770 }
771
// Awaitable protocol: finishes the co_await expression by consuming the
// state, exactly like get().
782 void await_resume()
783 {
784 consume_state()->take();
785 }
786 };
787
809 template <typename F, typename... Args>
810 [[nodiscard]] auto schedule(ThreadPool & pool, F && f, Args && ... args)
811 -> scheduled_operation<std::invoke_result_t<F, Args...>>
812 {
813 using return_type = std::invoke_result_t<F, Args...>;
814 static_assert(not std::is_reference_v<return_type>,
815 "experimental::schedule() does not support reference returns");
816
817 auto state = std::make_shared<detail::OperationState<return_type>>();
818 auto invocable = detail::make_invocable(std::forward<F>(f),
819 std::forward<Args>(args)...);
820
821 pool.enqueue_detached([state, invocable = std::move(invocable)]() mutable
822 {
823 try
824 {
825 if constexpr (std::is_void_v<return_type>)
826 {
827 invocable();
828 state->set_value();
829 }
830 else
831 {
832 state->set_value(invocable());
833 }
834 }
835 catch (...)
836 {
837 state->set_exception(std::current_exception());
838 }
839 });
840
841 return scheduled_operation<return_type>(std::move(state));
842 }
843
859 template <typename F, typename... Args>
860 [[nodiscard]] auto schedule(F && f, Args && ... args)
861 -> scheduled_operation<std::invoke_result_t<F, Args...>>
862 {
863 return schedule(default_pool(),
864 std::forward<F>(f),
865 std::forward<Args>(args)...);
866 }
867 } // namespace experimental
868} // namespace Aleph
869
870#endif
871
872#endif
Exception handling system with formatted messages for Aleph-w.
#define ah_runtime_error()
Throws std::runtime_error unconditionally.
Definition ah-errors.H:282
#define ah_logic_error_if(C)
Throws std::logic_error if condition holds.
Definition ah-errors.H:325
void fill(size_t n)
Main namespace for Aleph-w library functions.
Definition ah-arena.H:89
ThreadPool & default_pool()
Return the default shared thread pool instance.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
A modern, efficient thread pool for parallel task execution.