87#ifndef ALEPH_EXPERIMENTAL_ASYNC_H
88#define ALEPH_EXPERIMENTAL_ASYNC_H
93#ifndef ALEPH_ENABLE_EXPERIMENTAL_ASYNC
94# define ALEPH_ENABLE_EXPERIMENTAL_ASYNC 0
97#if ALEPH_ENABLE_EXPERIMENTAL_ASYNC && __cplusplus >= 202002L
98# if defined(__has_include)
99# if __has_include(<coroutine>)
101# define ALEPH_DETAIL_HAS_COROUTINE_HEADER 1
103# define ALEPH_DETAIL_HAS_COROUTINE_HEADER 0
106# define ALEPH_DETAIL_HAS_COROUTINE_HEADER 0
109# define ALEPH_DETAIL_HAS_COROUTINE_HEADER 0
112#if ALEPH_ENABLE_EXPERIMENTAL_ASYNC && \
113 __cplusplus >= 202002L && \
114 ALEPH_DETAIL_HAS_COROUTINE_HEADER && \
115 defined(__cpp_impl_coroutine) && \
116 __cpp_impl_coroutine >= 201902L
117# define ALEPH_HAS_EXPERIMENTAL_ASYNC 1
119# define ALEPH_HAS_EXPERIMENTAL_ASYNC 0
122#if ALEPH_HAS_EXPERIMENTAL_ASYNC
124#include <condition_variable>
130#include <system_error>
132#include <type_traits>
// Helper declared [[noreturn]]: control never returns to the caller.
// The call sites visible in this listing invoke it when a
// scheduled_operation holds no state (state_ == nullptr).
// NOTE(review): the function body is not visible in this listing;
// presumably it throws -- confirm the exception type against the real file.
141 [[noreturn]]
inline void throw_no_state()
// Mutex-guarded holder for one std::coroutine_handle<>. It lets a waiting
// coroutine be cleared (unregister) or have its handle taken out
// (resume_if_active).
// NOTE(review): this listing omits several lines of the class (braces, the
// declaration of active_, the constructor body, and some statements), so
// the comments below describe only what is visible.
147 class ContinuationRegistration
// mutable so that the const member is_active() can lock it.
149 mutable std::mutex mutex_;
// Defaults to an empty handle.
150 std::coroutine_handle<> handle_{};
// Wraps the handle of the coroutine to track. The member initializers are
// not visible in this listing.
154 explicit ContinuationRegistration(std::coroutine_handle<> handle) noexcept
// Returns true only when active_ is set AND handle_ is non-empty; both are
// read under mutex_.
157 [[nodiscard]]
bool is_active()
const noexcept
159 std::lock_guard<std::mutex> lock(mutex_);
160 return active_ and
static_cast<bool>(handle_);
// Under mutex_, replaces handle_ with an empty handle.
// NOTE(review): a line between the lock and this assignment is missing
// from the listing; presumably it clears active_ -- confirm.
163 void unregister()
noexcept
165 std::lock_guard<std::mutex> lock(mutex_);
167 handle_ = std::coroutine_handle<>{};
// Takes the stored handle out (leaving an empty one behind) when the
// registration is still active and holds a handle.
170 void resume_if_active()
// Declared before the lock is taken; presumably so that the resume call can
// run after the lock is released -- the resume call itself is not visible.
172 std::coroutine_handle<> handle;
174 std::lock_guard<std::mutex> lock(mutex_);
// Guard for an inactive or empty registration; the statement it controls
// (presumably an early return) is not visible in this listing.
175 if (not active_ or not handle_)
178 handle = std::exchange(handle_, std::coroutine_handle<>{});
/// Bundle a callable with its arguments into a nullary closure.
///
/// The callable is stored by value (copied or moved from `f`). The arguments
/// are stored through std::make_tuple, so they are decayed and any
/// std::reference_wrapper argument is unwrapped into a plain reference.
///
/// Calling the closure moves both the stored callable and the stored
/// arguments into std::invoke, so the closure should be treated as
/// single-use: after the first call its captured state is moved-from.
///
/// @param f    Callable to invoke later.
/// @param args Arguments handed to `f` when the closure is called.
/// @return A mutable lambda taking no parameters whose result is the
///         (decayed) result of invoking `f` with the stored arguments.
template <typename F, typename... Args>
auto make_invocable(F && f, Args && ... args)
{
  return [func = std::forward<F>(f),
          args_tuple = std::make_tuple(std::forward<Args>(args)...)]() mutable
  {
    // Apply over an rvalue tuple so value elements arrive as rvalues and
    // unwrapped reference elements arrive as lvalue references.
    return std::apply([&func](auto && ... a)
    {
      return std::invoke(std::move(func),
                         std::forward<decltype(a)>(a)...);
    }, std::move(args_tuple));
  };
}
// Shared completion state for an operation producing a T: stores either a
// value or an exception, lets threads block until ready_, and keeps a weak
// reference to at most one waiting coroutine registration.
// NOTE(review): the class header line, the declaration of ready_, all braces
// and several statements are missing from this listing; comments describe
// only the visible lines.
198 template <
typename T>
// mutable so that const members (is_ready) can lock / wait.
201 mutable std::mutex mutex_;
202 mutable std::condition_variable ready_cv_;
// Not referenced by any line visible in this listing.
204 bool consumed_ =
false;
// Non-null when the operation finished with an exception (see take lines).
205 std::exception_ptr exception_;
// Holds the result once set_value has run.
206 std::optional<T> value_;
// Weak reference: the state does not keep the registration alive.
207 std::weak_ptr<ContinuationRegistration> continuation_;
// Common completion path used by set_value / set_exception, which pass the
// storing step as the callable fill.
// NOTE(review): the lines that invoke fill and set ready_ are not visible
// here (listing lines between the lock and the exchange are missing).
209 template <
typename F>
210 void complete(F &&
fill)
212 std::weak_ptr<ContinuationRegistration> continuation;
214 std::lock_guard<std::mutex> lock(mutex_);
// Take the pending continuation out of the state, leaving it empty.
219 continuation = std::exchange(continuation_, {});
// Wake every thread blocked in a ready_cv_.wait on this state.
221 ready_cv_.notify_all();
// Resume the waiting coroutine only if its registration is still alive.
222 if (
auto registration = continuation.lock(); registration !=
nullptr)
223 registration->resume_if_active();
227 OperationState() =
default;
// Takes mutex_; the return statement is not visible in this listing
// (presumably returns ready_ -- confirm).
229 [[nodiscard]]
bool is_ready()
const
231 std::lock_guard<std::mutex> lock(mutex_);
// Body lines of a member whose header is not visible in this listing:
// blocks on ready_cv_ until ready_ is true.
237 std::unique_lock<std::mutex> lock(mutex_);
238 ready_cv_.wait(lock, [
this] {
return ready_; });
// Completes the operation with a value: the lambda handed to complete()
// emplaces the forwarded value into value_.
241 template <
typename U>
242 void set_value(U && value)
244 complete([
this, &value]
246 value_.emplace(std::forward<U>(value));
// Completes the operation with an exception: the lambda handed to
// complete() stores the exception pointer in exception_.
250 void set_exception(std::exception_ptr exception)
252 complete([
this, &exception]
254 exception_ = exception;
// Registers the coroutine registration to resume on completion.
// NOTE(review): the return statements are not visible; lines between the
// lock and the check are also missing from this listing.
258 [[nodiscard]]
bool attach_continuation(
259 const std::shared_ptr<ContinuationRegistration> & continuation)
261 std::lock_guard<std::mutex> lock(mutex_);
// Only one live, active waiter is accepted. is_active() locks the
// registration mutex while this state mutex is still held.
264 if (
auto registered = continuation_.lock();
265 registered !=
nullptr and registered->is_active())
// Message streamed into an error-reporting expression whose left-hand side
// is not visible in this listing.
267 <<
"scheduled_operation already has a waiting coroutine";
268 continuation_ = continuation;
// Forgets the stored registration, but only when it is the same object as
// the one passed in (shared_ptr equality compares the stored pointers).
272 void detach_continuation(
273 const std::shared_ptr<ContinuationRegistration> & continuation)
noexcept
275 std::lock_guard<std::mutex> lock(mutex_);
276 if (
auto registered = continuation_.lock(); registered == continuation)
277 continuation_.reset();
// Body lines of a member whose header is not visible in this listing
// (called as take() by scheduled_operation): blocks until ready_, rethrows
// a stored exception, otherwise moves the stored value out.
// NOTE(review): lines between the wait and the exception check are missing.
282 std::unique_lock<std::mutex> lock(mutex_);
283 ready_cv_.wait(lock, [
this] {
return ready_; });
287 if (exception_ !=
nullptr)
288 std::rethrow_exception(exception_);
289 return std::move(*value_);
// Specialization of the shared completion state for operations that return
// nothing: it has no value storage, only an exception slot, the ready
// condition variable and the weak continuation reference.
// NOTE(review): braces, the declaration of ready_ and several statements are
// missing from this listing; comments describe only the visible lines.
294 class OperationState<void>
// mutable so that const members (is_ready) can lock / wait.
296 mutable std::mutex mutex_;
297 mutable std::condition_variable ready_cv_;
// Not referenced by any line visible in this listing.
299 bool consumed_ =
false;
// Non-null when the operation finished with an exception.
300 std::exception_ptr exception_;
// Weak reference: the state does not keep the registration alive.
301 std::weak_ptr<ContinuationRegistration> continuation_;
// Common completion path; set_exception passes the storing step as fill.
// NOTE(review): the lines that invoke fill and set ready_ are not visible.
303 template <
typename F>
304 void complete(F &&
fill)
306 std::weak_ptr<ContinuationRegistration> continuation;
308 std::lock_guard<std::mutex> lock(mutex_);
// Take the pending continuation out of the state, leaving it empty.
313 continuation = std::exchange(continuation_, {});
// Wake every thread blocked in a ready_cv_.wait on this state.
315 ready_cv_.notify_all();
// Resume the waiting coroutine only if its registration is still alive.
316 if (
auto registration = continuation.lock(); registration !=
nullptr)
317 registration->resume_if_active();
321 OperationState() =
default;
// Takes mutex_; the return statement is not visible in this listing
// (presumably returns ready_ -- confirm).
323 [[nodiscard]]
bool is_ready()
const
325 std::lock_guard<std::mutex> lock(mutex_);
// Body lines of a member whose header is not visible in this listing:
// blocks on ready_cv_ until ready_ is true.
331 std::unique_lock<std::mutex> lock(mutex_);
332 ready_cv_.wait(lock, [
this] {
return ready_; });
// Completes the operation with an exception: the lambda handed to
// complete() stores the exception pointer in exception_.
340 void set_exception(std::exception_ptr exception)
342 complete([
this, &exception]
344 exception_ = exception;
// Registers the coroutine registration to resume on completion.
// NOTE(review): the return statements are not visible in this listing.
348 [[nodiscard]]
bool attach_continuation(
349 const std::shared_ptr<ContinuationRegistration> & continuation)
351 std::lock_guard<std::mutex> lock(mutex_);
// Only one live, active waiter is accepted. is_active() locks the
// registration mutex while this state mutex is still held.
354 if (
auto registered = continuation_.lock();
355 registered !=
nullptr and registered->is_active())
// Message streamed into an error-reporting expression whose left-hand side
// is not visible in this listing.
357 <<
"scheduled_operation already has a waiting coroutine";
358 continuation_ = continuation;
// Forgets the stored registration, but only when it is the same object as
// the one passed in (shared_ptr equality compares the stored pointers).
362 void detach_continuation(
363 const std::shared_ptr<ContinuationRegistration> & continuation)
noexcept
365 std::lock_guard<std::mutex> lock(mutex_);
366 if (
auto registered = continuation_.lock(); registered == continuation)
367 continuation_.reset();
// Body lines of a member whose header is not visible in this listing
// (called as take() by scheduled_operation<void>): blocks until ready_ and
// rethrows a stored exception.
// NOTE(review): lines between the wait and the exception check are missing.
372 std::unique_lock<std::mutex> lock(mutex_);
373 ready_cv_.wait(lock, [
this] {
return ready_; });
377 if (exception_ !=
nullptr)
378 std::rethrow_exception(exception_);
// Move-only handle (copy operations are deleted below) over a shared
// OperationState<T>. It also exposes await_ready / await_suspend, so it can
// be awaited from a coroutine.
// NOTE(review): the class header line, access specifiers, braces and the
// return statements of several members are missing from this listing.
409 template <
typename T>
// Shared completion state; nullptr means this handle is empty (see valid()).
412 std::shared_ptr<detail::OperationState<T>> state_;
// Registration created by await_suspend for the currently waiting coroutine.
413 std::shared_ptr<detail::ContinuationRegistration> continuation_registration_;
// Adopts an existing state. schedule() is declared a friend further down,
// which suggests this constructor is not public -- confirm.
415 explicit scheduled_operation(std::shared_ptr<detail::OperationState<T>> state)
416 : state_(std::move(state)) {}
// Drops the current continuation, if any: detaches it from the state,
// deactivates the registration itself, then releases this handle's
// reference to it.
418 void unregister_continuation()
noexcept
420 if (state_ !=
nullptr and continuation_registration_ !=
nullptr)
421 state_->detach_continuation(continuation_registration_);
422 if (continuation_registration_ !=
nullptr)
423 continuation_registration_->unregister();
424 continuation_registration_.reset();
// Unregisters any continuation and takes the state out of this handle,
// leaving state_ null; reports an empty handle through throw_no_state().
// The return statement is not visible in this listing.
427 std::shared_ptr<detail::OperationState<T>> consume_state()
429 unregister_continuation();
430 auto state = std::exchange(state_,
nullptr);
431 if (state ==
nullptr)
432 detail::throw_no_state();
// Checked access to the state; an empty handle goes to throw_no_state().
// The return statement is not visible in this listing.
436 detail::OperationState<T> & state()
438 if (state_ ==
nullptr)
439 detail::throw_no_state();
// Const overload of the checked accessor above.
443 const detail::OperationState<T> & state()
const
445 if (state_ ==
nullptr)
446 detail::throw_no_state();
// schedule() is granted access to this class.
450 template <
typename F,
typename... Args>
451 friend auto schedule(ThreadPool & pool, F && f, Args && ... args)
452 -> scheduled_operation<std::invoke_result_t<F, Args...>>;
// Default construction yields an empty handle (both members are null).
459 scheduled_operation() =
default;
460 scheduled_operation(
const scheduled_operation &) =
delete;
461 scheduled_operation & operator = (
const scheduled_operation &) =
delete;
462 scheduled_operation(scheduled_operation &&)
noexcept =
default;
// Move assignment first drops any continuation registered through this
// object, then takes over the state and registration of other.
// NOTE(review): lines before the unregister call and the return statement
// are not visible (a self-assignment guard may exist there -- confirm).
463 scheduled_operation & operator = (scheduled_operation && other)
noexcept
467 unregister_continuation();
468 state_ = std::move(other.state_);
469 continuation_registration_ = std::move(other.continuation_registration_);
// Destruction unregisters a still-pending continuation.
478 ~scheduled_operation() { unregister_continuation(); }
// True while this handle still owns a state.
487 [[nodiscard]]
bool valid()
const noexcept {
return state_ !=
nullptr; }
// Delegates to the state; goes through state(), so an empty handle reaches
// throw_no_state().
495 [[nodiscard]]
bool is_ready()
const
497 return state().is_ready();
// Body line of a member whose header is not visible in this listing:
// consumes the state (leaving this handle empty) and returns take().
523 return consume_state()->take();
// Awaiter hook: reports whether the operation is already ready.
532 [[nodiscard]]
bool await_ready()
const
534 return state().is_ready();
// Awaiter hook: wraps the coroutine handle in a new registration, installs
// it as the current one and attaches it to the state.
// NOTE(review): the branch structure is only partly visible. On a
// successful attach the previous registration is unregistered (a null
// check is presumably on a missing line); two other paths restore the
// previous registration. The return statements are not visible.
548 bool await_suspend(std::coroutine_handle<> continuation)
551 std::make_shared<detail::ContinuationRegistration>(continuation);
552 auto previous = std::move(continuation_registration_);
553 continuation_registration_ = registration;
556 if (state().attach_continuation(registration))
559 previous->unregister();
565 continuation_registration_ = std::move(previous);
568 continuation_registration_ = std::move(previous);
// Body line of a second member whose header is not visible (presumably
// await_resume -- confirm): consumes the state and returns take().
584 return consume_state()->take();
// Members of the handle type for operations without a result: a move-only
// handle (copy operations are deleted below) over a shared
// OperationState<void>, also usable as an awaitable.
// NOTE(review): the class header lines, access specifiers, braces and the
// return statements of several members are missing from this listing.
// Shared completion state; nullptr means this handle is empty (see valid()).
613 std::shared_ptr<detail::OperationState<void>> state_;
// Registration created by await_suspend for the currently waiting coroutine.
614 std::shared_ptr<detail::ContinuationRegistration> continuation_registration_;
// Adopts an existing state. schedule() is declared a friend further down,
// which suggests this constructor is not public -- confirm.
616 explicit scheduled_operation(std::shared_ptr<detail::OperationState<void>> state)
617 : state_(std::move(state)) {}
// Drops the current continuation, if any: detaches it from the state,
// deactivates the registration itself, then releases this handle's
// reference to it.
619 void unregister_continuation()
noexcept
621 if (state_ !=
nullptr and continuation_registration_ !=
nullptr)
622 state_->detach_continuation(continuation_registration_);
623 if (continuation_registration_ !=
nullptr)
624 continuation_registration_->unregister();
625 continuation_registration_.reset();
// Unregisters any continuation and takes the state out of this handle,
// leaving state_ null; reports an empty handle through throw_no_state().
// The return statement is not visible in this listing.
628 std::shared_ptr<detail::OperationState<void>> consume_state()
630 unregister_continuation();
631 auto state = std::exchange(state_,
nullptr);
632 if (state ==
nullptr)
633 detail::throw_no_state();
// Checked access to the state; an empty handle goes to throw_no_state().
// The return statement is not visible in this listing.
637 detail::OperationState<void> & state()
639 if (state_ ==
nullptr)
640 detail::throw_no_state();
// Const overload of the checked accessor above.
644 const detail::OperationState<void> & state()
const
646 if (state_ ==
nullptr)
647 detail::throw_no_state();
// schedule() is granted access to this class.
651 template <
typename F,
typename... Args>
652 friend auto schedule(ThreadPool & pool, F && f, Args && ... args)
653 -> scheduled_operation<std::invoke_result_t<F, Args...>>;
// Default construction yields an empty handle (both members are null).
660 scheduled_operation() =
default;
661 scheduled_operation(
const scheduled_operation &) =
delete;
662 scheduled_operation & operator = (
const scheduled_operation &) =
delete;
663 scheduled_operation(scheduled_operation &&)
noexcept =
default;
// Move assignment first drops any continuation registered through this
// object, then takes over the state and registration of other.
// NOTE(review): lines before the unregister call and the return statement
// are not visible (a self-assignment guard may exist there -- confirm).
664 scheduled_operation & operator = (scheduled_operation && other)
noexcept
668 unregister_continuation();
669 state_ = std::move(other.state_);
670 continuation_registration_ = std::move(other.continuation_registration_);
// Destruction unregisters a still-pending continuation.
679 ~scheduled_operation() { unregister_continuation(); }
// True while this handle still owns a state.
688 [[nodiscard]]
bool valid()
const noexcept {
return state_ !=
nullptr; }
// Delegates to the state; goes through state(), so an empty handle reaches
// throw_no_state().
696 [[nodiscard]]
bool is_ready()
const
698 return state().is_ready();
// Body line of a member whose header is not visible in this listing:
// consumes the state (leaving this handle empty) and calls take() on it.
723 consume_state()->take();
// Awaiter hook: reports whether the operation is already ready.
732 [[nodiscard]]
bool await_ready()
const
734 return state().is_ready();
// Awaiter hook: wraps the coroutine handle in a new registration, installs
// it as the current one and attaches it to the state.
// NOTE(review): the branch structure is only partly visible. On a
// successful attach the previous registration is unregistered (a null
// check is presumably on a missing line); two other paths restore the
// previous registration. The return statements are not visible.
748 bool await_suspend(std::coroutine_handle<> continuation)
751 std::make_shared<detail::ContinuationRegistration>(continuation);
752 auto previous = std::move(continuation_registration_);
753 continuation_registration_ = registration;
756 if (state().attach_continuation(registration))
759 previous->unregister();
765 continuation_registration_ = std::move(previous);
768 continuation_registration_ = std::move(previous);
// Body line of a second member whose header is not visible (presumably
// await_resume -- confirm): consumes the state and calls take() on it.
784 consume_state()->take();
// Function template over a callable F and its arguments Args.
// NOTE(review): the signature lines are not visible in this listing; based
// on the friend declarations in scheduled_operation this is presumably
// schedule(ThreadPool &, F &&, Args && ...) -- confirm. The submission to
// the pool and the try/catch structure are also missing from the listing.
809 template <
typename F,
typename...
Args>
// Callables whose result type is a reference are rejected at compile time.
814 static_assert(
not std::is_reference_v<return_type>,
815 "experimental::schedule() does not support reference returns");
// Shared completion state for this operation.
817 auto state = std::make_shared<detail::OperationState<return_type>>();
// Bundle the callable and its arguments into a nullary closure.
818 auto invocable = detail::make_invocable(std::forward<F>(f),
819 std::forward<Args>(
args)...);
// void results are handled on a separate compile-time branch (branch bodies
// are not visible in this listing).
825 if constexpr (std::is_void_v<return_type>)
// Stores the in-flight exception in the state; presumably this line sits
// inside a catch handler that is not visible here.
837 state->set_exception(std::current_exception());
// Second function template over a callable F and its arguments Args.
// NOTE(review): only the template header and the tail of one forwarding
// call are visible in this listing; presumably an overload of schedule()
// that does not take an explicit pool -- confirm against the real file.
859 template <
typename F,
typename...
Args>
865 std::forward<Args>(
args)...);
Exception handling system with formatted messages for Aleph-w.
#define ah_runtime_error()
Throws std::runtime_error unconditionally.
#define ah_logic_error_if(C)
Throws std::logic_error if condition holds.
Main namespace for Aleph-w library functions.
ThreadPool & default_pool()
Return the default shared thread pool instance.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
A modern, efficient thread pool for parallel task execution.