Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
Aleph::BoundedChannel< T > Class Template Reference

Bounded blocking channel for producer-consumer workflows. More...

#include <concurrency_utils.H>

Collaboration diagram for Aleph::BoundedChannel< T >:
[legend]

Public Member Functions

 BoundedChannel (size_t capacity)
 Construct a channel with a fixed capacity.
 
 BoundedChannel (const BoundedChannel &)=delete
 Deleted copy constructor.
 
BoundedChanneloperator= (const BoundedChannel &)=delete
 Deleted copy assignment operator.
 
size_t capacity () const noexcept
 Return the maximum number of queued items.
 
bool is_closed () const
 Check whether the channel has been closed.
 
size_t size () const
 Return the number of currently queued items.
 
bool empty () const
 Check whether the channel currently holds no queued items.
 
void close ()
 Close the channel.
 
bool send (const T &value)
 Send a value by copying it into the channel.
 
bool send (T &&value)
 Send a value by moving it into the channel.
 
bool send (const T &value, const CancellationToken &token)
 Cancellation-aware blocking send (copy).
 
bool send (T &&value, const CancellationToken &token)
 Cancellation-aware blocking send (move).
 
template<typename... Args>
requires (sizeof...(Args) == 0)
bool emplace (Args &&... args)
 Construct an element in-place at the back of the queue (no args).
 
template<typename First , typename... Rest>
requires std::constructible_from<T, First, Rest...>
bool emplace (First &&first, Rest &&... rest)
 Construct an element in-place at the back of the queue.
 
template<typename First , typename... Args>
requires std::same_as<std::remove_cvref_t<First>, CancellationToken> && (not std::constructible_from<T, First, Args...>) && std::constructible_from<T, Args...>
bool emplace (First &&token, Args &&... args)
 Cancellation-aware in-place construction.
 
bool try_send (const T &value)
 Attempt to send a value without blocking.
 
bool try_send (T &&value)
 Attempt to send a value without blocking (move).
 
bool recv (T &out)
 Receive an item from the channel into out.
 
bool recv (T &out, const CancellationToken &token)
 Cancellation-aware blocking receive into out.
 
std::optional< Trecv ()
 Receive the next item from the channel.
 
std::optional< Trecv (const CancellationToken &token)
 Cancellation-aware receive of the next item from the channel.
 
bool try_recv (T &out)
 Attempt to receive an item into out without blocking.
 
std::optional< Ttry_recv ()
 Attempt to receive an item without blocking.
 

Private Member Functions

template<typename Predicate >
void wait_with_cancellation (std::condition_variable &cv, std::unique_lock< std::mutex > &lock, Predicate pred, const CancellationToken &token)
 Wait on a condition variable with cancellation support.
 
template<typename U >
bool send_impl (U &&value)
 Common implementation for send operations.
 
template<typename U >
bool send_impl (U &&value, const CancellationToken &token)
 Common implementation for cancellation-aware send operations.
 

Private Attributes

std::mutex mutex_
 
std::condition_variable not_empty_
 
std::condition_variable not_full_
 
std::deque< Tqueue_
 
size_t capacity_
 
bool closed_ = false
 

Detailed Description

template<typename T>
class Aleph::BoundedChannel< T >

Bounded blocking channel for producer-consumer workflows.

BoundedChannel provides a fixed-capacity FIFO queue for passing values between any number of producer threads and any number of consumer threads. All public member functions are thread-safe. Distinct senders and receivers may operate concurrently, but individual calls are not reentrant on the same object because they synchronize through an internal mutex and condition variables.

Blocking operations wait for space or data:

  • send() / emplace() block while the channel is full.
  • recv() blocks while the channel is empty.
  • try_* operations never block.

Close semantics:

  • close() wakes all blocked senders and receivers.
  • Once closed, future sends fail and return false.
  • Receivers continue draining already queued items.
  • After the queue is drained, receives report end-of-stream by returning false or std::nullopt.

Cancellation-aware overloads accept a Aleph::CancellationToken. They preserve the same close semantics and throw Aleph::operation_canceled if cancellation is requested while the caller is blocked waiting for progress.

Exception safety:

  • If element construction or assignment throws, the channel remains in a valid state and the element stays queued until extraction succeeds.
  • Methods marked noexcept do not throw; all others may propagate exceptions from user-supplied element operations or cancellation.
Template Parameters
TThe type of elements stored in the channel.

Definition at line 141 of file concurrency_utils.H.

Constructor & Destructor Documentation

◆ BoundedChannel() [1/2]

template<typename T >
Aleph::BoundedChannel< T >::BoundedChannel ( size_t  capacity)
inlineexplicit

Construct a channel with a fixed capacity.

Parameters
capacityMaximum number of items the channel can hold.
Exceptions
std::invalid_argumentif capacity is zero.
Note
This constructor does not block and is not thread-safe against concurrent access to the object being constructed.

Definition at line 222 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_.

◆ BoundedChannel() [2/2]

template<typename T >
Aleph::BoundedChannel< T >::BoundedChannel ( const BoundedChannel< T > &  )
delete

Deleted copy constructor.

Member Function Documentation

◆ capacity()

template<typename T >
size_t Aleph::BoundedChannel< T >::capacity ( ) const
inlinenoexcept

Return the maximum number of queued items.

Returns
The channel's capacity.
Exceptions
Nothing.
Note
Thread-safe, non-blocking, reentrant, and noexcept.

Definition at line 239 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_.

◆ close()

template<typename T >
void Aleph::BoundedChannel< T >::close ( )
inline

Close the channel.

Wakes up all blocked senders and receivers. Blocked senders will subsequently return false. Blocked receivers will drain any remaining items before returning false or std::nullopt.

Calling close() on an already closed channel is a no-op.

Note
Returns: Nothing.
Exceptions
Nothingunless locking the internal mutex throws.
Note
Thread-safe, non-reentrant on the same object, and non-blocking apart from internal mutex acquisition. After closure, no send or emplace operation can succeed.

Definition at line 291 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::closed_, Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, and Aleph::BoundedChannel< T >::not_full_.

◆ emplace() [1/3]

template<typename T >
template<typename... Args>
requires (sizeof...(Args) == 0)
bool Aleph::BoundedChannel< T >::emplace ( Args &&...  args)
inline

Construct an element in-place at the back of the queue (no args).

Blocks if full. Returns false if closed.

Parameters
argsConstructor arguments for T (empty by construction).
Returns
true if the element was queued, false if the channel was closed before space became available.
Exceptions
Anyexception thrown while constructing T in-place.
Note
Thread-safe, not reentrant on the same object, and may block. On close, blocked callers wake and return false.

Definition at line 382 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.

◆ emplace() [2/3]

template<typename T >
template<typename First , typename... Rest>
requires std::constructible_from<T, First, Rest...>
bool Aleph::BoundedChannel< T >::emplace ( First &&  first,
Rest &&...  rest 
)
inline

Construct an element in-place at the back of the queue.

Blocks if full. Returns false if closed.

Template Parameters
FirstFirst argument type.
RestRemaining argument types.
Parameters
firstFirst constructor argument.
restRemaining constructor arguments.
Returns
true if the element was queued, false if the channel was closed before space became available.
Exceptions
Anyexception thrown while constructing T in-place.
Note
Thread-safe, not reentrant on the same object, and may block. If T itself is constructible from CancellationToken, this overload remains eligible and is preferred for payload construction.

Definition at line 412 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.

◆ emplace() [3/3]

template<typename T >
template<typename First , typename... Args>
requires std::same_as<std::remove_cvref_t<First>, CancellationToken> && (not std::constructible_from<T, First, Args...>) && std::constructible_from<T, Args...>
bool Aleph::BoundedChannel< T >::emplace ( First &&  token,
Args &&...  args 
)
inline

Cancellation-aware in-place construction.

Blocks like emplace() but monitors token for cancellation.

Template Parameters
FirstType of the first argument (must be CancellationToken).
ArgsTypes of constructor arguments.
Parameters
tokenToken for cooperative cancellation.
argsConstructor arguments.
Returns
true if the element was queued, false if the channel was closed before space became available.
Exceptions
Aleph::operation_canceledif cancellation is requested while the call is blocked waiting for space.
Anyexception thrown while constructing T in-place.
Note
Thread-safe, not reentrant on the same object, and may block. This overload only participates when the first argument is used as a control token rather than a payload constructor argument.

Definition at line 446 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, Aleph::BoundedChannel< T >::queue_, and Aleph::BoundedChannel< T >::wait_with_cancellation().

◆ empty()

template<typename T >
bool Aleph::BoundedChannel< T >::empty ( ) const
inline

Check whether the channel currently holds no queued items.

Returns
true if there are no queued items, otherwise false.
Exceptions
Nothingunless locking the internal mutex throws.
Note
Thread-safe, non-reentrant on the same object, and does not block except to acquire the internal mutex.

Definition at line 271 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::mutex_, and Aleph::BoundedChannel< T >::queue_.

◆ is_closed()

template<typename T >
bool Aleph::BoundedChannel< T >::is_closed ( ) const
inline

Check whether the channel has been closed.

Returns
true if close() has been called, otherwise false.
Exceptions
Nothingunless locking the internal mutex throws.
Note
Thread-safe, non-reentrant on the same object, and does not block except to acquire the internal mutex.

Definition at line 247 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::closed_, and Aleph::BoundedChannel< T >::mutex_.

◆ operator=()

Deleted copy assignment operator.

◆ recv() [1/4]

template<typename T >
std::optional< T > Aleph::BoundedChannel< T >::recv ( )
inline

Receive the next item from the channel.

Blocks until an item becomes available or the channel closes and drains.

Returns
The next queued item wrapped in std::optional, or std::nullopt if the channel is closed and empty.
Exceptions
Anyexception thrown while copying or moving the queued value into the returned optional.
Note
Thread-safe, not reentrant on the same object, and may block. If optional construction throws, the queued element remains present.

Definition at line 568 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.

◆ recv() [2/4]

template<typename T >
std::optional< T > Aleph::BoundedChannel< T >::recv ( const CancellationToken token)
inline

Cancellation-aware receive of the next item from the channel.

Blocks like recv() but monitors token for cancellation.

Parameters
tokenToken for cooperative cancellation.
Returns
The next queued item wrapped in std::optional, or std::nullopt if the channel is closed and empty.
Exceptions
Aleph::operation_canceledif cancellation is requested while the call is blocked waiting for data.
Anyexception thrown while copying or moving the queued value into the returned optional.
Note
Thread-safe, not reentrant on the same object, and may block. If optional construction throws, the queued element remains present.

Definition at line 606 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, Aleph::BoundedChannel< T >::queue_, and Aleph::BoundedChannel< T >::wait_with_cancellation().

◆ recv() [3/4]

template<typename T >
bool Aleph::BoundedChannel< T >::recv ( T out)
inline

Receive an item from the channel into out.

Blocks the calling thread if the channel is empty until an item is available or the channel is closed and drained.

Parameters
outReference where the received item will be moved.
Returns
true if an item was received, false if the channel is closed and empty.
Exceptions
Anyexception thrown while copying or moving the queued value into a temporary or assigning into out.
Note
Thread-safe, not reentrant on the same object, and may block. If extraction throws, the queued element remains present.

Definition at line 511 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.

◆ recv() [4/4]

template<typename T >
bool Aleph::BoundedChannel< T >::recv ( T out,
const CancellationToken token 
)
inline

Cancellation-aware blocking receive into out.

Blocks like recv(T&) but monitors token for cancellation.

Parameters
outReference where the received item will be moved.
tokenToken for cooperative cancellation.
Returns
true if an item was received, false if the channel is closed and empty.
Exceptions
Aleph::operation_canceledif cancellation is requested while the call is blocked waiting for data.
Anyexception thrown while copying or moving the queued value into a temporary or assigning into out.
Note
Thread-safe, not reentrant on the same object, and may block. If extraction throws, the queued element remains present.

Definition at line 540 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, Aleph::BoundedChannel< T >::queue_, and Aleph::BoundedChannel< T >::wait_with_cancellation().

◆ send() [1/4]

template<typename T >
bool Aleph::BoundedChannel< T >::send ( const T value)
inline

Send a value by copying it into the channel.

Blocks the calling thread if the channel is full until space becomes available or the channel is closed.

Parameters
valueThe item to send.
Returns
true if the value was queued, false if the channel was already closed before space became available.
Exceptions
Anyexception thrown while copying T into the queue.
Note
Thread-safe, not reentrant on the same object, and may block. On close, blocked senders wake and return false.

Definition at line 313 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::send_impl().

◆ send() [2/4]

template<typename T >
bool Aleph::BoundedChannel< T >::send ( const T value,
const CancellationToken token 
)
inline

Cancellation-aware blocking send (copy).

Blocks like send(const T&) but monitors token for cancellation.

Parameters
valueThe item to send.
tokenToken for cooperative cancellation.
Returns
true if the value was queued, false if the channel was closed before space became available.
Exceptions
Aleph::operation_canceledif cancellation is requested while the call is blocked waiting for space.
Anyexception thrown while copying T into the queue.
Note
Thread-safe, not reentrant on the same object, and may block. If the channel closes first, blocked senders return false instead of throwing.

Definition at line 344 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::send_impl().

◆ send() [3/4]

template<typename T >
bool Aleph::BoundedChannel< T >::send ( T &&  value)
inline

Send a value by moving it into the channel.

Blocks the calling thread if the channel is full until space becomes available or the channel is closed.

Parameters
valueThe item to send.
Returns
true if the value was queued, false if the channel was already closed before space became available.
Exceptions
Anyexception thrown while moving T into the queue.
Note
Thread-safe, not reentrant on the same object, and may block. On close, blocked senders wake and return false.

Definition at line 327 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::send_impl().

◆ send() [4/4]

template<typename T >
bool Aleph::BoundedChannel< T >::send ( T &&  value,
const CancellationToken token 
)
inline

Cancellation-aware blocking send (move).

Blocks like send(T&&) but monitors token for cancellation.

Parameters
valueThe item to send.
tokenToken for cooperative cancellation.
Returns
true if the value was queued, false if the channel was closed before space became available.
Exceptions
Aleph::operation_canceledif cancellation is requested while the call is blocked waiting for space.
Anyexception thrown while moving T into the queue.
Note
Thread-safe, not reentrant on the same object, and may block. If the channel closes first, blocked senders return false instead of throwing.

Definition at line 364 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::send_impl().

◆ send_impl() [1/2]

template<typename T >
template<typename U >
bool Aleph::BoundedChannel< T >::send_impl ( U &&  value)
inlineprivate

Common implementation for send operations.

Template Parameters
UType of the value to send.
Parameters
valueValue to push into the queue.
Returns
true if sent, false if the channel was closed.

Definition at line 180 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.

Referenced by Aleph::BoundedChannel< T >::send(), Aleph::BoundedChannel< T >::send(), Aleph::BoundedChannel< T >::send(), and Aleph::BoundedChannel< T >::send().

◆ send_impl() [2/2]

template<typename T >
template<typename U >
bool Aleph::BoundedChannel< T >::send_impl ( U &&  value,
const CancellationToken token 
)
inlineprivate

Common implementation for cancellation-aware send operations.

Template Parameters
UType of the value to send.
Parameters
valueValue to push into the queue.
tokenToken for cooperative cancellation.
Returns
true if sent, false if the channel was closed.
Exceptions
Aleph::operation_canceledif canceled while blocked.

Definition at line 201 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, Aleph::BoundedChannel< T >::not_full_, Aleph::BoundedChannel< T >::queue_, and Aleph::BoundedChannel< T >::wait_with_cancellation().

◆ size()

template<typename T >
size_t Aleph::BoundedChannel< T >::size ( ) const
inline

Return the number of currently queued items.

Returns
Current size of the internal queue.
Exceptions
Nothingunless locking the internal mutex throws.
Note
Thread-safe, non-reentrant on the same object, and does not block except to acquire the internal mutex.

Definition at line 259 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::mutex_, and Aleph::BoundedChannel< T >::queue_.

◆ try_recv() [1/2]

template<typename T >
std::optional< T > Aleph::BoundedChannel< T >::try_recv ( )
inline

Attempt to receive an item without blocking.

Returns
The next queued item wrapped in std::optional, or std::nullopt if the channel is empty.
Exceptions
Anyexception thrown while copying or moving the queued value into the returned optional.
Note
Thread-safe, not reentrant on the same object, and never blocks waiting for data. If optional construction throws, the queued element remains present.

Definition at line 664 of file concurrency_utils.H.

References Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.

◆ try_recv() [2/2]

template<typename T >
bool Aleph::BoundedChannel< T >::try_recv ( T out)
inline

Attempt to receive an item into out without blocking.

Parameters
outReference where the received item will be moved.
Returns
true if an item was received immediately, false if the channel is empty.
Exceptions
Anyexception thrown while copying or moving the queued value into a temporary or assigning into out.
Note
Thread-safe, not reentrant on the same object, and never blocks waiting for data. If extraction throws, the queued element remains present.

Definition at line 642 of file concurrency_utils.H.

References Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_full_, and Aleph::BoundedChannel< T >::queue_.

◆ try_send() [1/2]

template<typename T >
bool Aleph::BoundedChannel< T >::try_send ( const T value)
inline

Attempt to send a value without blocking.

Parameters
valueValue to send.
Returns
true if the value was queued immediately, false if the channel is full or closed.
Exceptions
Anyexception thrown while copying T into the queue.
Note
Thread-safe, not reentrant on the same object, and never blocks waiting for capacity. Close is observed immediately.

Definition at line 469 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, and Aleph::BoundedChannel< T >::queue_.

◆ try_send() [2/2]

template<typename T >
bool Aleph::BoundedChannel< T >::try_send ( T &&  value)
inline

Attempt to send a value without blocking (move).

Parameters
valueValue to send.
Returns
true if the value was queued immediately, false if the channel is full or closed.
Exceptions
Anyexception thrown while moving T into the queue.
Note
Thread-safe, not reentrant on the same object, and never blocks waiting for capacity. Close is observed immediately.

Definition at line 488 of file concurrency_utils.H.

References Aleph::BoundedChannel< T >::capacity_, Aleph::BoundedChannel< T >::closed_, Aleph::divide_and_conquer_partition_dp(), Aleph::BoundedChannel< T >::mutex_, Aleph::BoundedChannel< T >::not_empty_, and Aleph::BoundedChannel< T >::queue_.

◆ wait_with_cancellation()

template<typename T >
template<typename Predicate >
void Aleph::BoundedChannel< T >::wait_with_cancellation ( std::condition_variable &  cv,
std::unique_lock< std::mutex > &  lock,
Predicate  pred,
const CancellationToken token 
)
inlineprivate

Wait on a condition variable with cancellation support.

Parameters
cvCondition variable to wait on.
lockLocked mutex protecting the condition.
predPredicate that must be true to stop waiting.
tokenToken to monitor for cancellation.
Exceptions
Aleph::operation_canceledif cancellation is requested.

Definition at line 159 of file concurrency_utils.H.

References Aleph::divide_and_conquer_partition_dp(), Aleph::CancellationToken::notify_on_cancel(), pred, Aleph::CancellationToken::stop_requested(), and Aleph::CancellationToken::throw_if_cancellation_requested().

Referenced by Aleph::BoundedChannel< T >::emplace(), Aleph::BoundedChannel< T >::recv(), Aleph::BoundedChannel< T >::recv(), and Aleph::BoundedChannel< T >::send_impl().

Member Data Documentation

◆ capacity_

◆ closed_

◆ mutex_

◆ not_empty_

◆ not_full_

◆ queue_


The documentation for this class was generated from the following file: