85#ifndef ALEPH_CONCURRENCY_UTILS_H
86#define ALEPH_CONCURRENCY_UTILS_H
89#include <condition_variable>
96#include <shared_mutex>
140 template <
typename T>
158 template <
typename Predicate>
160 std::unique_lock<std::mutex> & lock,
179 template <
typename U>
182 std::unique_lock<std::mutex> lock(
mutex_);
186 queue_.push_back(std::forward<U>(value));
200 template <
typename U>
203 std::unique_lock<std::mutex> lock(
mutex_);
209 queue_.push_back(std::forward<U>(value));
226 throw std::invalid_argument(
"BoundedChannel capacity must be greater than zero");
249 std::lock_guard<std::mutex> lock(
mutex_);
261 std::lock_guard<std::mutex> lock(
mutex_);
273 std::lock_guard<std::mutex> lock(
mutex_);
293 std::lock_guard<std::mutex> lock(
mutex_);
366 return send_impl(std::move(value), token);
380 template <
typename...
Args>
381 requires (
sizeof...(Args) == 0)
384 std::unique_lock<std::mutex> lock(
mutex_);
388 queue_.emplace_back(std::forward<Args>(
args)...);
410 template <
typename First,
typename...
Rest>
411 requires std::constructible_from<
T, First,
Rest...>
414 std::unique_lock<std::mutex> lock(
mutex_);
418 queue_.emplace_back(std::forward<First>(first),
419 std::forward<Rest>(
rest)...);
442 template <
typename First,
typename...
Args>
444 (
not std::constructible_from<
T, First,
Args...>) &&
445 std::constructible_from<T, Args...>
448 std::unique_lock<std::mutex> lock(
mutex_);
454 queue_.emplace_back(std::forward<Args>(
args)...);
471 std::lock_guard<std::mutex> lock(
mutex_);
490 std::lock_guard<std::mutex> lock(
mutex_);
493 queue_.push_back(std::move(value));
513 std::unique_lock<std::mutex> lock(
mutex_);
517 T value(std::move_if_noexcept(
queue_.front()));
518 out = std::move(value);
542 std::unique_lock<std::mutex> lock(
mutex_);
548 T value(std::move_if_noexcept(
queue_.front()));
549 out = std::move(value);
570 std::unique_lock<std::mutex> lock(
mutex_);
574 T value(std::move_if_noexcept(
queue_.front()));
575 std::optional<T> result;
576 if constexpr (std::is_nothrow_move_constructible_v<T>
or
577 not std::is_copy_constructible_v<T>)
578 result.emplace(std::move(value));
580 result.emplace(value);
584 if constexpr (std::is_nothrow_move_constructible_v<T>
or
585 not std::is_copy_constructible_v<T>)
588 return static_cast<const std::optional<T> &
>(result);
608 std::unique_lock<std::mutex> lock(
mutex_);
614 T value(std::move_if_noexcept(
queue_.front()));
615 std::optional<T> result;
616 if constexpr (std::is_nothrow_move_constructible_v<T>
or
617 not std::is_copy_constructible_v<T>)
618 result.emplace(std::move(value));
620 result.emplace(value);
624 if constexpr (std::is_nothrow_move_constructible_v<T>
or
625 not std::is_copy_constructible_v<T>)
628 return static_cast<const std::optional<T> &
>(result);
644 std::lock_guard<std::mutex> lock(
mutex_);
647 T value(std::move_if_noexcept(
queue_.front()));
648 out = std::move(value);
666 std::lock_guard<std::mutex> lock(
mutex_);
669 T value(std::move_if_noexcept(
queue_.front()));
670 std::optional<T> result;
671 if constexpr (std::is_nothrow_move_constructible_v<T>
or
672 not std::is_copy_constructible_v<T>)
673 result.emplace(std::move(value));
675 result.emplace(value);
678 if constexpr (std::is_nothrow_move_constructible_v<T>
or
679 not std::is_copy_constructible_v<T>)
682 return static_cast<const std::optional<T> &
>(result);
704 template <
typename T,
typename Mutex = std::mutex>
816 template <
typename...
Args>
855 template <
typename F>
858 using Result = std::invoke_result_t<F, T &>;
859 static_assert(
not std::is_reference_v<Result>,
860 "Synchronized::with_lock callback must not return a reference");
862 return std::invoke(std::forward<F>(f),
value_);
880 template <
typename F>
883 using Result = std::invoke_result_t<F, const T &>;
884 static_assert(
not std::is_reference_v<Result>,
885 "Synchronized::with_lock callback must not return a reference");
887 return std::invoke(std::forward<F>(f), std::as_const(
value_));
904 template <
typename T,
typename SharedMutex = std::shared_mutex>
913 std::shared_lock<SharedMutex>
lock_;
947 std::unique_lock<SharedMutex>
lock_;
1013 template <
typename...
Args>
1050 template <
typename F>
1053 using Result = std::invoke_result_t<F, const T &>;
1054 static_assert(
not std::is_reference_v<Result>,
1055 "RwSynchronized::with_read_lock callback must not return a reference");
1056 std::shared_lock<SharedMutex> lock(
mutex_);
1057 return std::invoke(std::forward<F>(f), std::as_const(
value_));
1071 template <
typename F>
1074 using Result = std::invoke_result_t<F, T &>;
1075 static_assert(
not std::is_reference_v<Result>,
1076 "RwSynchronized::with_write_lock callback must not return a reference");
1077 std::unique_lock<SharedMutex> lock(
mutex_);
1078 return std::invoke(std::forward<F>(f),
value_);
1100 template <
typename T>
1105 alignas(64) std::atomic<size_t>
head_{0};
1106 alignas(64) std::atomic<size_t>
tail_{0};
1115 template <
typename...
Args>
1118 const size_t tail =
tail_.load(std::memory_order_relaxed);
1120 if (
next ==
head_.load(std::memory_order_acquire))
1122 slots_[tail].emplace(std::forward<Args>(
args)...);
1123 tail_.store(
next, std::memory_order_release);
1138 throw std::invalid_argument(
"SpscQueue capacity must be greater than zero");
1161 return head_.load(std::memory_order_acquire) ==
1162 tail_.load(std::memory_order_acquire);
1172 const size_t tail =
tail_.load(std::memory_order_acquire);
1173 return advance(tail) ==
head_.load(std::memory_order_acquire);
1183 const size_t head =
head_.load(std::memory_order_acquire);
1184 const size_t tail =
tail_.load(std::memory_order_acquire);
1216 template <
typename...
Args>
1232 const size_t head =
head_.load(std::memory_order_relaxed);
1233 if (head ==
tail_.load(std::memory_order_acquire))
1235 T value(std::move_if_noexcept(*
slots_[head]));
1236 out = std::move(value);
1253 const size_t head =
head_.load(std::memory_order_relaxed);
1254 if (head ==
tail_.load(std::memory_order_acquire))
1255 return std::nullopt;
1256 T value(std::move_if_noexcept(*
slots_[head]));
1257 std::optional<T> result;
1258 if constexpr (std::is_nothrow_move_constructible_v<T>
or
1259 not std::is_copy_constructible_v<T>)
1260 result.emplace(std::move(value));
1262 result.emplace(value);
1265 if constexpr (std::is_nothrow_move_constructible_v<T>
or
1266 not std::is_copy_constructible_v<T>)
1269 return static_cast<const std::optional<T> &
>(result);
1278 template <
typename T>
1287 template <
typename T,
typename Mutex = std::mutex>
1296 template <
typename T,
typename SharedMutex = std::shared_mutex>
1304 template <
typename T>
Bounded blocking channel for producer-consumer workflows.
std::optional< T > recv(const CancellationToken &token)
Cancellation-aware receive of the next item from the channel.
bool emplace(Args &&... args)
Construct an element in-place at the back of the queue (no args).
size_t size() const
Return the number of currently queued items.
bool emplace(First &&token, Args &&... args)
Cancellation-aware in-place construction.
bool is_closed() const
Check whether the channel has been closed.
bool empty() const
Check whether the channel currently holds no queued items.
BoundedChannel(const BoundedChannel &)=delete
Deleted copy constructor.
size_t capacity() const noexcept
Return the maximum number of queued items.
bool recv(T &out)
Receive an item from the channel into out.
bool try_send(const T &value)
Attempt to send a value without blocking.
bool try_send(T &&value)
Attempt to send a value without blocking (move).
std::optional< T > try_recv()
Attempt to receive an item without blocking.
bool try_recv(T &out)
Attempt to receive an item into out without blocking.
bool send_impl(U &&value)
Common implementation for send operations.
bool send(const T &value, const CancellationToken &token)
Cancellation-aware blocking send (copy).
std::condition_variable not_empty_
void close()
Close the channel.
bool send(T &&value)
Send a value by moving it into the channel.
bool recv(T &out, const CancellationToken &token)
Cancellation-aware blocking receive into out.
std::condition_variable not_full_
BoundedChannel(size_t capacity)
Construct a channel with a fixed capacity.
bool emplace(First &&first, Rest &&... rest)
Construct an element in-place at the back of the queue.
bool send_impl(U &&value, const CancellationToken &token)
Common implementation for cancellation-aware send operations.
bool send(T &&value, const CancellationToken &token)
Cancellation-aware blocking send (move).
std::optional< T > recv()
Receive the next item from the channel.
void wait_with_cancellation(std::condition_variable &cv, std::unique_lock< std::mutex > &lock, Predicate pred, const CancellationToken &token)
Wait on a condition variable with cancellation support.
BoundedChannel & operator=(const BoundedChannel &)=delete
Deleted copy assignment operator.
bool send(const T &value)
Send a value by copying it into the channel.
Read-only cooperative cancellation token.
ConditionVariableRegistration notify_on_cancel(std::condition_variable &cv) const
Request notification of a condition variable on cancellation.
bool stop_requested() const noexcept
Return true if cancellation has been requested.
void throw_if_cancellation_requested() const
Throw operation_canceled if cancellation was requested.
RAII guard for shared (read-only) access.
std::shared_lock< SharedMutex > lock_
ReadLockedPtr(SharedMutex &mutex, const T &value)
Construct a guard and acquire a shared lock.
const T * operator->() const noexcept
Access members of the protected value.
const T & get() const noexcept
Access the protected value.
const T & operator*() const noexcept
Access the protected value.
RAII guard for exclusive (write) access.
T & operator*() noexcept
Access the protected value.
std::unique_lock< SharedMutex > lock_
WriteLockedPtr(SharedMutex &mutex, T &value)
Construct a guard and acquire an exclusive lock.
T * operator->() noexcept
Access members of the protected value.
T & get() noexcept
Access the protected value.
Read/write-lock protected shared object wrapper.
RwSynchronized()=default
Default-construct the protected value.
decltype(auto) with_read_lock(F &&f) const
Execute a callback with shared (read-only) access.
RwSynchronized(std::in_place_t, Args &&... args)
Construct the protected value in-place.
ReadLockedPtr read() const
Acquire a shared lock and return a guard.
WriteLockedPtr write()
Acquire an exclusive lock and return a guard.
decltype(auto) with_write_lock(F &&f)
Execute a callback with exclusive (write) access.
RwSynchronized(T &&value)
Construct by moving from value.
Bounded single-producer/single-consumer queue.
bool try_pop(T &out)
Attempt to pop an item from the queue into out.
std::atomic< size_t > head_
SpscQueue & operator=(const SpscQueue &)=delete
Deleted copy assignment operator.
bool empty() const noexcept
Check if the queue is empty.
SpscQueue(size_t capacity)
Construct a queue with the specified capacity.
bool emplace_impl(Args &&... args)
Internal implementation for push/emplace.
SpscQueue(const SpscQueue &)=delete
Deleted copy constructor.
size_t capacity() const noexcept
Return the queue's capacity.
bool try_push(T &&value)
Attempt to push an item (move) into the queue.
std::vector< std::optional< T > > slots_
bool full() const noexcept
Check if the queue is full.
size_t size() const noexcept
Return the number of items currently in the queue.
size_t advance(size_t idx) const noexcept
Compute the next index in the circular buffer.
bool emplace(Args &&... args)
Construct an item in-place at the tail of the queue.
bool try_push(const T &value)
Attempt to push an item (copy) into the queue.
std::optional< T > try_pop()
Attempt to pop an item from the queue.
std::atomic< size_t > tail_
RAII guard for read-only access to the synchronized value.
ConstLockedPtr(Mutex &mutex, const T &value)
Construct a guard and acquire the lock.
const T * operator->() const noexcept
Access members of the protected value.
std::unique_lock< Mutex > lock_
const T & operator*() const noexcept
Access the protected value.
const T & get() const noexcept
Access the protected value.
RAII guard for exclusive access to the synchronized value.
LockedPtr(Mutex &mutex, T &value)
Construct a guard and acquire the lock.
T * operator->() noexcept
Access members of the protected value.
T & operator*() noexcept
Access the protected value.
std::unique_lock< Mutex > lock_
T & get() noexcept
Access the protected value.
Mutex-protected shared object wrapper.
decltype(auto) with_lock(F &&f)
Execute a callback with exclusive access to the value.
Synchronized()=default
Default-construct the protected value.
ConstLockedPtr lock() const
Acquire an exclusive lock (for const access) and return a guard.
Synchronized(std::in_place_t, Args &&... args)
Construct the protected value in-place.
decltype(auto) with_lock(F &&f) const
Execute a callback with read-only access to the value.
LockedPtr lock()
Acquire an exclusive lock and return a guard.
Synchronized(T &&value)
Construct by moving from value.
Freq_Node * pred
Predecessor node in level-order traversal.
Main namespace for Aleph-w library functions.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
std::decay_t< typename HeadC::Item_Type > T
void next()
Advance all underlying iterators (bounds-checked).
A modern, efficient thread pool for parallel task execution.