|
Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
|
Bounded blocking channel for producer-consumer workflows.
#include <concurrency_utils.H>
Public Member Functions | |
| BoundedChannel (size_t capacity) | |
| Construct a channel with a fixed capacity. | |
| BoundedChannel (const BoundedChannel &)=delete | |
| Deleted copy constructor. | |
| BoundedChannel & | operator= (const BoundedChannel &)=delete |
| Deleted copy assignment operator. | |
| size_t | capacity () const noexcept |
| Return the maximum number of queued items. | |
| bool | is_closed () const |
| Check whether the channel has been closed. | |
| size_t | size () const |
| Return the number of currently queued items. | |
| bool | empty () const |
| Check whether the channel currently holds no queued items. | |
| void | close () |
| Close the channel. | |
| bool | send (const T &value) |
| Send a value by copying it into the channel. | |
| bool | send (T &&value) |
| Send a value by moving it into the channel. | |
| bool | send (const T &value, const CancellationToken &token) |
| Cancellation-aware blocking send (copy). | |
| bool | send (T &&value, const CancellationToken &token) |
| Cancellation-aware blocking send (move). | |
| template<typename... Args> requires (sizeof...(Args) == 0) | |
| bool | emplace (Args &&... args) |
| Construct an element in-place at the back of the queue (no args). | |
| template<typename First , typename... Rest> requires std::constructible_from<T, First, Rest...> | |
| bool | emplace (First &&first, Rest &&... rest) |
| Construct an element in-place at the back of the queue. | |
| template<typename First , typename... Args> requires std::same_as<std::remove_cvref_t<First>, CancellationToken> && (not std::constructible_from<T, First, Args...>) && std::constructible_from<T, Args...> | |
| bool | emplace (First &&token, Args &&... args) |
| Cancellation-aware in-place construction. | |
| bool | try_send (const T &value) |
| Attempt to send a value without blocking. | |
| bool | try_send (T &&value) |
| Attempt to send a value without blocking (move). | |
| bool | recv (T &out) |
| Receive an item from the channel into out. | |
| bool | recv (T &out, const CancellationToken &token) |
| Cancellation-aware blocking receive into out. | |
| std::optional< T > | recv () |
| Receive the next item from the channel. | |
| std::optional< T > | recv (const CancellationToken &token) |
| Cancellation-aware receive of the next item from the channel. | |
| bool | try_recv (T &out) |
| Attempt to receive an item into out without blocking. | |
| std::optional< T > | try_recv () |
| Attempt to receive an item without blocking. | |
Private Member Functions | |
| template<typename Predicate > | |
| void | wait_with_cancellation (std::condition_variable &cv, std::unique_lock< std::mutex > &lock, Predicate pred, const CancellationToken &token) |
| Wait on a condition variable with cancellation support. | |
| template<typename U > | |
| bool | send_impl (U &&value) |
| Common implementation for send operations. | |
| template<typename U > | |
| bool | send_impl (U &&value, const CancellationToken &token) |
| Common implementation for cancellation-aware send operations. | |
Private Attributes | |
| std::mutex | mutex_ |
| std::condition_variable | not_empty_ |
| std::condition_variable | not_full_ |
| std::deque< T > | queue_ |
| size_t | capacity_ |
| bool | closed_ = false |
Bounded blocking channel for producer-consumer workflows.
BoundedChannel provides a fixed-capacity FIFO queue for passing values between any number of producer threads and any number of consumer threads. All public member functions are thread-safe. Distinct senders and receivers may operate concurrently, but individual calls are not reentrant on the same object because they synchronize through an internal mutex and condition variables.
Blocking operations wait for space or data:
send() / emplace() block while the channel is full.
recv() blocks while the channel is empty.
try_* operations never block.
Close semantics:
close() wakes all blocked senders and receivers.
After close(), senders return false.
After close(), receivers drain any remaining items and then return false or std::nullopt.
Cancellation-aware overloads accept an Aleph::CancellationToken. They preserve the same close semantics and throw Aleph::operation_canceled if cancellation is requested while the caller is blocked waiting for progress.
Exception safety:
Functions marked noexcept do not throw; all others may propagate exceptions from user-supplied element operations or cancellation.
| T | The type of elements stored in the channel. |
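The blocking and close rules above can be illustrated with a small, self-contained sketch. MiniChannel and run_pipeline are hypothetical names introduced here; the class is a simplified stand-in written against the standard library only, not the Aleph-w implementation, and it omits emplace(), the try_* operations, and cancellation.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in, NOT the Aleph-w implementation: a minimal channel
// that follows the blocking and close rules described above.
template <typename T>
class MiniChannel {
  std::mutex mutex_;
  std::condition_variable not_empty_, not_full_;
  std::deque<T> queue_;
  std::size_t capacity_;
  bool closed_ = false;

public:
  explicit MiniChannel(std::size_t capacity) : capacity_(capacity) {}

  // Blocks while full; returns false once the channel is closed.
  bool send(T value) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [&] { return closed_ || queue_.size() < capacity_; });
    if (closed_) return false;
    queue_.push_back(std::move(value));
    not_empty_.notify_one();
    return true;
  }

  // Blocks while empty; after close, drains remaining items, then nullopt.
  std::optional<T> recv() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [&] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) return std::nullopt;
    std::optional<T> out(std::move(queue_.front()));
    queue_.pop_front();
    not_full_.notify_one();
    return out;
  }

  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    not_empty_.notify_all();
    not_full_.notify_all();
  }
};

// One producer sends 0..n-1 through a capacity-2 channel, then closes it.
// The consumer loop ends when recv() reports "closed and drained".
std::vector<int> run_pipeline(int n) {
  MiniChannel<int> ch(2);
  std::vector<int> received;
  std::thread producer([&] {
    for (int i = 0; i < n; ++i) ch.send(i);
    ch.close();
  });
  while (auto item = ch.recv()) received.push_back(*item);
  producer.join();
  assert(!ch.send(-1));  // sends after close() fail
  return received;
}
```

Calling run_pipeline(5) in this sketch yields the values 0 through 4 in FIFO order, even though the channel never holds more than two items at a time.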
Definition at line 141 of file concurrency_utils.H.
|
inline explicit |
Construct a channel with a fixed capacity.
| capacity | Maximum number of items the channel can hold. |
| std::invalid_argument | if capacity is zero. |
Definition at line 222 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_.
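As a rough illustration of the constructor contract, the sketch below rejects a zero capacity with std::invalid_argument. CapacityChecked and rejects are hypothetical names, and the error message is an assumption; only the exception type comes from the documentation above.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in showing only the constructor check described above.
class CapacityChecked {
  std::size_t capacity_;

public:
  explicit CapacityChecked(std::size_t capacity) : capacity_(capacity) {
    if (capacity_ == 0)
      throw std::invalid_argument("channel capacity must be positive");
  }

  std::size_t capacity() const noexcept { return capacity_; }
};

// Returns true when constructing with the given capacity is rejected.
bool rejects(std::size_t capacity) {
  try {
    CapacityChecked ch(capacity);
    (void)ch;
    return false;
  } catch (const std::invalid_argument &) {
    return true;
  }
}
```

Callers that compute the capacity at run time may want to validate it first or catch the exception at the construction site.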
|
delete |
Deleted copy constructor.
|
inline noexcept |
Return the maximum number of queued items.
| Nothing. |
This function is noexcept.
Definition at line 239 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_.
|
inline |
Close the channel.
Wakes up all blocked senders and receivers. Blocked senders will subsequently return false. Blocked receivers will drain any remaining items before returning false or std::nullopt.
Calling close() on an already closed channel is a no-op.
| Nothing | unless locking the internal mutex throws. |
Definition at line 291 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::closed_, Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, and Aleph::BoundedChannel< T >::not_full_.
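The close behaviour can be sketched as follows. ClosableQueue is a hypothetical stand-in (unbounded and int-only for brevity), not the Aleph-w code; it shows a receiver being released by close(), items queued before close() still being delivered, and a repeated close() having no further effect.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical stand-in (unbounded for brevity), NOT the Aleph-w code.
class ClosableQueue {
  std::mutex mutex_;
  std::condition_variable not_empty_;
  std::deque<int> queue_;
  bool closed_ = false;

public:
  bool send(int v) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (closed_) return false;  // senders fail after close()
    queue_.push_back(v);
    not_empty_.notify_one();
    return true;
  }

  std::optional<int> recv() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [&] { return closed_ || !queue_.empty(); });
    if (queue_.empty()) return std::nullopt;  // closed and drained
    int v = queue_.front();
    queue_.pop_front();
    return v;
  }

  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;  // setting the flag again changes nothing
    }
    not_empty_.notify_all();  // wake every blocked receiver
  }
};

// A receiver waiting on an empty queue is released by close().
bool blocked_receiver_sees_nullopt() {
  ClosableQueue q;
  std::optional<int> result = 42;
  std::thread receiver([&] { result = q.recv(); });
  q.close();
  receiver.join();
  return !result.has_value();
}

// Items queued before close() are still delivered afterwards.
bool drains_after_close() {
  ClosableQueue q;
  q.send(1);
  q.send(2);
  q.close();
  q.close();  // second call is a no-op
  return q.recv() == std::optional<int>(1) &&
         q.recv() == std::optional<int>(2) &&
         !q.recv().has_value() && !q.send(3);
}
```

In the first function the receiver thread may or may not have started waiting when close() runs; either way recv() observes the closed, empty queue and returns std::nullopt.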
|
inline |
Construct an element in-place at the back of the queue (no args).
Blocks if full. Returns false if closed.
| args | Constructor arguments for T (empty by construction). |
Returns true if the element was queued, false if the channel was closed before space became available.
| Any | exception thrown while constructing T in-place. |
false. Definition at line 382 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.
|
inline |
Construct an element in-place at the back of the queue.
Blocks if full. Returns false if closed.
| First | First argument type. |
| Rest | Remaining argument types. |
| first | First constructor argument. |
| rest | Remaining constructor arguments. |
Returns true if the element was queued, false if the channel was closed before space became available.
| Any | exception thrown while constructing T in-place. |
If T itself is constructible from CancellationToken, this overload remains eligible and is preferred for payload construction.
Definition at line 412 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.
|
inline |
Cancellation-aware in-place construction.
Blocks like emplace() but monitors token for cancellation.
| First | Type of the first argument (must be CancellationToken). |
| Args | Types of constructor arguments. |
| token | Token for cooperative cancellation. |
| args | Constructor arguments. |
Returns true if the element was queued, false if the channel was closed before space became available.
| Aleph::operation_canceled | if cancellation is requested while the call is blocked waiting for space. |
| Any | exception thrown while constructing T in-place. |
Definition at line 446 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, Aleph::BoundedChannel< T >::queue_, and Aleph::BoundedChannel< T >::wait_with_cancellation().
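The way the three emplace() overloads divide the argument lists can be modelled at compile time. The sketch below is an assumption-laden illustration: Token is a hypothetical stand-in for Aleph::CancellationToken, route() is a made-up helper, and C++17 type traits are used in place of the requires clauses shown in the signatures, so it mirrors the documented constraints rather than reproducing the library's code.

```cpp
#include <string>
#include <type_traits>

struct Token {};  // hypothetical stand-in for Aleph::CancellationToken

enum class Route { Payload, CancellationAware };

// Mirrors the documented constraints with C++17 traits: the first argument
// is treated as a token only if T cannot be built from the full list but
// can be built from the remaining arguments.
template <typename T, typename First, typename... Rest>
constexpr Route route() {
  using Bare = std::remove_cv_t<std::remove_reference_t<First>>;
  constexpr bool first_is_token = std::is_same_v<Bare, Token>;
  constexpr bool full_ok = std::is_constructible_v<T, First, Rest...>;
  constexpr bool rest_ok = std::is_constructible_v<T, Rest...>;
  static_assert(full_ok || (first_is_token && rest_ok),
                "no viable emplace overload");
  return (first_is_token && !full_ok && rest_ok) ? Route::CancellationAware
                                                 : Route::Payload;
}

// A payload type that is itself constructible from the token type.
struct TakesToken {
  explicit TakesToken(const Token &) {}
};

// Plain constructor arguments: payload construction.
static_assert(route<std::string, const char *>() == Route::Payload);
// Token first, then arguments T accepts: cancellation-aware path.
static_assert(route<std::string, const Token &, const char *>() ==
              Route::CancellationAware);
// T constructible from the token: payload construction is preferred.
static_assert(route<TakesToken, const Token &>() == Route::Payload);
```

The last assertion corresponds to the note on the payload overload: when T can be constructed from a CancellationToken, a leading token is passed to the constructor of T instead of being interpreted as a cancellation request.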
|
inline |
Check whether the channel currently holds no queued items.
Returns true if there are no queued items, otherwise false.
| Nothing | unless locking the internal mutex throws. |
Definition at line 271 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::mutex_, and Aleph::BoundedChannel< T >::queue_.
|
inline |
Check whether the channel has been closed.
Returns true if close() has been called, otherwise false.
| Nothing | unless locking the internal mutex throws. |
Definition at line 247 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::closed_, and Aleph::BoundedChannel< T >::mutex_.
|
delete |
Deleted copy assignment operator.
|
inline |
Receive the next item from the channel.
Blocks until an item becomes available or the channel closes and drains.
Returns the received item wrapped in std::optional, or std::nullopt if the channel is closed and empty.
| Any | exception thrown while copying or moving the queued value into the returned optional. |
Definition at line 568 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.
|
inline |
Cancellation-aware receive of the next item from the channel.
Blocks like recv() but monitors token for cancellation.
| token | Token for cooperative cancellation. |
Returns the received item wrapped in std::optional, or std::nullopt if the channel is closed and empty.
| Aleph::operation_canceled | if cancellation is requested while the call is blocked waiting for data. |
| Any | exception thrown while copying or moving the queued value into the returned optional. |
Definition at line 606 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, Aleph::BoundedChannel< T >::queue_, and Aleph::BoundedChannel< T >::wait_with_cancellation().
Receive an item from the channel into out.
Blocks the calling thread if the channel is empty until an item is available or the channel is closed and drained.
| out | Reference where the received item will be moved. |
Returns true if an item was received, false if the channel is closed and empty.
| Any | exception thrown while copying or moving the queued value into a temporary or assigning into out. |
Definition at line 511 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.
|
inline |
Cancellation-aware blocking receive into out.
Blocks like recv(T&) but monitors token for cancellation.
| out | Reference where the received item will be moved. |
| token | Token for cooperative cancellation. |
Returns true if an item was received, false if the channel is closed and empty.
| Aleph::operation_canceled | if cancellation is requested while the call is blocked waiting for data. |
| Any | exception thrown while copying or moving the queued value into a temporary or assigning into out. |
Definition at line 540 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, Aleph::BoundedChannel< T >::queue_, and Aleph::BoundedChannel< T >::wait_with_cancellation().
Send a value by copying it into the channel.
Blocks the calling thread if the channel is full until space becomes available or the channel is closed.
| value | The item to send. |
Returns true if the value was queued, false if the channel was already closed before space became available.
| Any | exception thrown while copying T into the queue. |
false. Definition at line 313 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::send_impl().
|
inline |
Cancellation-aware blocking send (copy).
Blocks like send(const T&) but monitors token for cancellation.
| value | The item to send. |
| token | Token for cooperative cancellation. |
Returns true if the value was queued, false if the channel was closed before space became available.
| Aleph::operation_canceled | if cancellation is requested while the call is blocked waiting for space. |
| Any | exception thrown while copying T into the queue. |
false instead of throwing. Definition at line 344 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::send_impl().
Send a value by moving it into the channel.
Blocks the calling thread if the channel is full until space becomes available or the channel is closed.
| value | The item to send. |
Returns true if the value was queued, false if the channel was already closed before space became available.
| Any | exception thrown while moving T into the queue. |
false. Definition at line 327 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::send_impl().
|
inline |
Cancellation-aware blocking send (move).
Blocks like send(T&&) but monitors token for cancellation.
| value | The item to send. |
| token | Token for cooperative cancellation. |
Returns true if the value was queued, false if the channel was closed before space became available.
| Aleph::operation_canceled | if cancellation is requested while the call is blocked waiting for space. |
| Any | exception thrown while moving T into the queue. |
false instead of throwing. Definition at line 364 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::send_impl().
|
inline private |
Common implementation for send operations.
| U | Type of the value to send. |
| value | Value to push into the queue. |
Definition at line 180 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.
Referenced by Aleph::BoundedChannel< T >::send(), Aleph::BoundedChannel< T >::send(), Aleph::BoundedChannel< T >::send(), and Aleph::BoundedChannel< T >::send().
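A shared send_impl(U &&) lets the copy and move overloads of send() reuse one body while preserving the value category of the argument. The sketch below illustrates that idea only; ForwardingQueue and Tracked are hypothetical names, and the locking, blocking, and close handling of the real function are deliberately left out.

```cpp
#include <deque>
#include <utility>

// Counts how instances are constructed, to make copy vs. move visible.
struct Tracked {
  static inline int copies = 0;
  static inline int moves = 0;
  Tracked() = default;
  Tracked(const Tracked &) { ++copies; }
  Tracked(Tracked &&) noexcept { ++moves; }
};

// Hypothetical stand-in: locking, blocking and close handling are omitted.
template <typename T>
class ForwardingQueue {
  std::deque<T> queue_;

  // One implementation serves both overloads via perfect forwarding.
  template <typename U>
  bool send_impl(U &&value) {
    queue_.push_back(std::forward<U>(value));
    return true;
  }

public:
  bool send(const T &value) { return send_impl(value); }
  bool send(T &&value) { return send_impl(std::move(value)); }
};

// Sends one lvalue and one rvalue and checks how each was stored.
bool forwarding_preserves_value_category() {
  Tracked::copies = 0;
  Tracked::moves = 0;
  ForwardingQueue<Tracked> q;
  Tracked a, b;
  q.send(a);             // lvalue: copied into the queue
  q.send(std::move(b));  // rvalue: moved into the queue
  return Tracked::copies == 1 && Tracked::moves == 1;
}
```

With this arrangement the public overloads stay trivial, and any change to the queuing logic has to be made in one place only.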
|
inline private |
Common implementation for cancellation-aware send operations.
| U | Type of the value to send. |
| value | Value to push into the queue. |
| token | Token for cooperative cancellation. |
| Aleph::operation_canceled | if canceled while blocked. |
Definition at line 201 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, Aleph::BoundedChannel< T >::queue_, and Aleph::BoundedChannel< T >::wait_with_cancellation().
|
inline |
Return the number of currently queued items.
| Nothing | unless locking the internal mutex throws. |
Definition at line 259 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::mutex_, and Aleph::BoundedChannel< T >::queue_.
|
inline |
Attempt to receive an item without blocking.
Returns the received item wrapped in std::optional, or std::nullopt if the channel is empty.
| Any | exception thrown while copying or moving the queued value into the returned optional. |
Definition at line 664 of file concurrency_utils.H.
References Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.
Attempt to receive an item into out without blocking.
| out | Reference where the received item will be moved. |
Returns true if an item was received immediately, false if the channel is empty.
| Any | exception thrown while copying or moving the queued value into a temporary or assigning into out. |
Definition at line 642 of file concurrency_utils.H.
References Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.
Attempt to send a value without blocking.
| value | Value to send. |
Returns true if the value was queued immediately, false if the channel is full or closed.
| Any | exception thrown while copying T into the queue. |
Definition at line 469 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, and Aleph::BoundedChannel< T >::queue_.
Attempt to send a value without blocking (move).
| value | Value to send. |
Returns true if the value was queued immediately, false if the channel is full or closed.
| Any | exception thrown while moving T into the queue. |
Definition at line 488 of file concurrency_utils.H.
References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, and Aleph::BoundedChannel< T >::queue_.
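The non-blocking operations documented above can be summarised in a short sketch. TryChannel is a hypothetical stand-in that implements only try_send(), try_recv(), and close(); it takes the value by value for brevity instead of providing separate copy and move overloads.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical stand-in covering only the non-blocking operations.
template <typename T>
class TryChannel {
  std::mutex mutex_;
  std::deque<T> queue_;
  std::size_t capacity_;
  bool closed_ = false;

public:
  explicit TryChannel(std::size_t capacity) : capacity_(capacity) {}

  // Fails immediately when the channel is full or closed.
  bool try_send(T value) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (closed_ || queue_.size() >= capacity_) return false;
    queue_.push_back(std::move(value));
    return true;
  }

  // Fails immediately when nothing is queued.
  std::optional<T> try_recv() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return std::nullopt;
    std::optional<T> out(std::move(queue_.front()));
    queue_.pop_front();
    return out;
  }

  void close() {
    std::lock_guard<std::mutex> lock(mutex_);
    closed_ = true;
  }
};

// Walks through the full, empty, and closed cases on a capacity-1 channel.
bool try_ops_behave_as_described() {
  TryChannel<int> ch(1);
  bool ok = ch.try_send(7);                           // space available
  ok = ok && !ch.try_send(8);                         // full: rejected at once
  ok = ok && ch.try_recv() == std::optional<int>(7);  // item delivered
  ok = ok && !ch.try_recv().has_value();              // empty
  ch.close();
  ok = ok && !ch.try_send(9);                         // closed
  return ok;
}
```

Because a false result from try_send() does not distinguish a full channel from a closed one, a caller that needs to tell them apart can consult is_closed() afterwards, keeping in mind that the state may change between the two calls.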
|
inline private |
Wait on a condition variable with cancellation support.
| cv | Condition variable to wait on. |
| lock | Locked mutex protecting the condition. |
| pred | Predicate that must be true to stop waiting. |
| token | Token to monitor for cancellation. |
| Aleph::operation_canceled | if cancellation is requested. |
Definition at line 159 of file concurrency_utils.H.
References Aleph::divide_and_conquer_partition_dp(), Aleph::CancellationToken::notify_on_cancel(), pred, Aleph::CancellationToken::stop_requested(), and Aleph::CancellationToken::throw_if_cancellation_requested().
Referenced by Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), and Aleph::BoundedChannel< T >::send_impl().
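One simple way to combine a condition-variable wait with cooperative cancellation is to wake up periodically and poll the token. The sketch below uses that polling approach as a substitute technique; the References list above suggests the library relies on CancellationToken::notify_on_cancel() instead, whose signature is not shown here. PollToken, Canceled, wait_or_cancel, and wait_demo are hypothetical names, and the 5 ms polling interval is an arbitrary assumption.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-ins; the real Aleph types have richer interfaces.
struct Canceled : std::runtime_error {
  Canceled() : std::runtime_error("operation canceled") {}
};

struct PollToken {
  std::atomic<bool> flag{false};
  void cancel() { flag.store(true); }
  bool stop_requested() const { return flag.load(); }
};

// Waits until pred() holds, waking periodically to poll the token.
// Assumption: polling stands in for a notification-based wake-up.
template <typename Predicate>
void wait_or_cancel(std::condition_variable &cv,
                    std::unique_lock<std::mutex> &lock, Predicate pred,
                    const PollToken &token) {
  using namespace std::chrono_literals;
  while (!pred()) {
    if (token.stop_requested()) throw Canceled();
    cv.wait_for(lock, 5ms);  // releases the lock while waiting
  }
}

// Returns true if the wait finished normally, false if it was canceled.
bool wait_demo(bool ready, bool cancel_first) {
  std::mutex m;
  std::condition_variable cv;
  PollToken token;
  if (cancel_first) token.cancel();
  std::unique_lock<std::mutex> lock(m);
  try {
    wait_or_cancel(cv, lock, [&] { return ready; }, token);
    return true;
  } catch (const Canceled &) {
    return false;
  }
}
```

In this sketch a satisfied predicate takes priority over a pending cancellation, so a call that can make progress immediately does not throw; whether the library makes the same choice is not stated in this page.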
|
private |
Definition at line 147 of file concurrency_utils.H.
Referenced by Aleph::BoundedChannel< T >::BoundedChannel(), Aleph::BoundedChannel< T >::capacity(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::try_send(), and Aleph::BoundedChannel< T >::try_send().
Definition at line 148 of file concurrency_utils.H.
Referenced by Aleph::BoundedChannel< T >::close(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::is_closed(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::try_send(), and Aleph::BoundedChannel< T >::try_send().
|
mutable private |
Definition at line 143 of file concurrency_utils.H.
Referenced by Aleph::BoundedChannel< T >::close(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::empty(), Aleph::BoundedChannel< T >::is_closed(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::size(), Aleph::BoundedChannel< T >::try_recv(), Aleph::BoundedChannel< T >::try_recv(), Aleph::BoundedChannel< T >::try_send(), and Aleph::BoundedChannel< T >::try_send().
|
private |
Definition at line 144 of file concurrency_utils.H.
Referenced by Aleph::BoundedChannel< T >::close(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::try_send(), and Aleph::BoundedChannel< T >::try_send().
|
private |
Definition at line 145 of file concurrency_utils.H.
Referenced by Aleph::BoundedChannel< T >::close(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::try_recv(), and Aleph::BoundedChannel< T >::try_recv().
|
private |
Definition at line 146 of file concurrency_utils.H.
Referenced by Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::empty(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::send_impl(), Aleph::BoundedChannel< T >::size(), Aleph::BoundedChannel< T >::try_recv(), Aleph::BoundedChannel< T >::try_recv(), Aleph::BoundedChannel< T >::try_send(), and Aleph::BoundedChannel< T >::try_send().