Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
Threads and Concurrency

Threading utilities (thread pools, worker pools, timed event queues) and synchronization helpers. More...

Files

file  concurrency_utils.H
 Modern synchronization helpers for channels, shared state, and small producer-consumer queues.
 
file  experimental_async.H
 Opt-in experimental coroutine-friendly bridge over ThreadPool.
 
file  thread_pool.H
 A modern, efficient thread pool for parallel task execution.
 
file  timeoutQueue.H
 Priority queue for scheduling timed events.
 
file  useCondVar.H
 Legacy wrapper class for POSIX condition variables.
 
file  useMutex.H
 Legacy RAII-style wrapper for POSIX mutexes.
 
file  worker_pool.H
 General-purpose worker thread pool for parallel task execution.
 

Classes

class  UseCondVar
 Legacy wrapper class for POSIX condition variables. More...
 
class  UseMutex
 Legacy RAII-style mutex lock guard. More...
 

Functions

 UseCondVar::UseCondVar (pthread_cond_t *c, pthread_mutex_t *m)
 Construct wrapper for condition variable and mutex.
 

Detailed Description

Threading utilities (thread pools, worker pools, timed event queues) and synchronization helpers.

This module groups Aleph-w facilities related to concurrency and thread-based execution.

It covers:

ThreadPool (task-based)

See thread_pool.H and the runnable example thread_pool_example.cc.

Typical usage pattern:

#include <thread_pool.H>
auto f = pool.enqueue([](int x) { return x * x; }, 5);
int v = f.get();
(void) v;
A reusable thread pool for efficient parallel task execution.
A modern, efficient thread pool for parallel task execution.

Structured task groups:

#include <thread_pool.H>
Aleph::TaskGroup group(pool);
group.launch([] { /* task A */ });
group.launch([] { /* task B */ });
group.wait();
Minimal structured-concurrency helper over ThreadPool futures.

Cooperative cancellation:

auto token = cancel.token();
pool.enqueue([token] {
while (not token.stop_requested())
do_some_work_step();
});
cancel.request_cancel();
Cooperative cancellation source paired with CancellationToken.
CancellationToken token() const noexcept
Return a token observing this source.
void request_cancel() noexcept
Request cancellation for all derived tokens.

Parallel algorithms can now be configured through ParallelOptions, including min_size, chunk_size, max_tasks, pool, and cancel_token.

This applies to the main helpers in ah-parallel.H, including map/filter/fold, parallel sort, scan/merge wrappers, zip, and enumerate operations. Existing ThreadPool& overloads remain available as compatibility wrappers.

If cancel_token is signaled and the algorithm observes it, the operation throws Aleph::operation_canceled.

Foundational building blocks now available in thread_pool.H:

Aleph::parallel_invoke(options, [] { /* branch A */ }, [] { /* branch B */ });
Aleph::pscan(first, last, out, std::plus<int>{}, options);
Aleph::pexclusive_scan(first, last, out, 0, std::plus<int>{}, options);
Aleph::pmerge(a_first, a_last, b_first, b_last, out, std::less<int>{}, options);
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
void parallel_invoke(ThreadPool &pool, Fs &&... fs)
Invoke a small set of related callables in parallel.
static struct argp_option options[]
Definition ntreepic.C:1886

Modern coordination helpers in concurrency_utils.H:

ch.send(10);
if (auto item = ch.recv())
items.with_lock([&](auto &out) { out.push_back(*item); });
shared_sum.with_write_lock([](int &value) { value += 5; });
const int snapshot = shared_sum.with_read_lock([](const int &value) {
return value;
});
(void) snapshot;
handoff.try_push(1);
auto popped = handoff.try_pop();
Bounded blocking channel for producer-consumer workflows.
Read/write-lock protected shared object wrapper.
Bounded single-producer/single-consumer queue.
Mutex-protected shared object wrapper.
Modern synchronization helpers for channels, shared state, and small producer-consumer queues.

bounded_channel<T> also offers cancellation-aware blocking operations:

try
{
jobs.send(42, cancel.token());
auto item = jobs.recv(cancel.token());
(void) item;
}
{
// stop blocked send/recv cleanly
}
Exception thrown when cooperative cancellation is observed.

UseMutex and UseCondVar are still available for backward compatibility with older POSIX-style code, but new code should generally prefer concurrency_utils.H and standard C++ synchronization primitives.

See also the runnable example Examples/concurrency_utils_example.cc.

Work stealing was evaluated for this phase and intentionally deferred; see docs/work_stealing_backend_note.md for the design note and rationale.

TimeoutQueue (timed events)

See timeoutQueue.H and the runnable example timeoutQueue_example.C.

Notes

Function Documentation

◆ UseCondVar()

UseCondVar::UseCondVar ( pthread_cond_t *  c,
pthread_mutex_t *  m 
)
inline

Construct wrapper for condition variable and mutex.

Precondition
c != nullptr && m != nullptr
Parameters
cPointer to initialized condition variable
mPointer to initialized mutex (must be held when waiting)
Exceptions
None
Complexity O(1)
Exception safety No-throw

Definition at line 129 of file useCondVar.H.

References UseCondVar::cond, m, and UseCondVar::mutex.