|
Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
|
A reusable thread pool for efficient parallel task execution. More...
#include <thread_pool.H>
Classes | |
| struct | Task |
| Concrete task implementation with type preservation. More... | |
| struct | TaskBase |
| Type-erased task wrapper. More... | |
Public Member Functions | |
| ThreadPool (size_t n_threads=std::thread::hardware_concurrency()) | |
| Construct a thread pool with specified number of workers. | |
| ThreadPool (const ThreadPool &)=delete | |
| Non-copyable. | |
| ThreadPool & | operator= (const ThreadPool &)=delete |
| ThreadPool (ThreadPool &&)=delete | |
| Non-movable (due to mutex and condition_variable) | |
| ThreadPool & | operator= (ThreadPool &&)=delete |
| ~ThreadPool () | |
| Destructor - stops all workers and waits for completion. | |
| template<typename F , typename... Args> | |
| auto | enqueue (F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > > |
| Submit a task for execution and get a future for the result. | |
| template<typename F , typename... Args> | |
| void | enqueue_detached (F &&f, Args &&... args) |
| Submit a task without tracking the result (fire-and-forget). | |
| template<typename F , typename Container > | |
| auto | enqueue_bulk (F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > > |
| Submit multiple tasks and collect all futures. | |
| void | set_queue_limits (size_t soft_limit, size_t hard_limit=std::numeric_limits< size_t >::max()) |
| Set queue limits for bounded enqueue operations. | |
| std::pair< size_t, size_t > | get_queue_limits () const |
| Get current queue limits. | |
| template<typename F , typename... Args> | |
| auto | enqueue_bounded (F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > > |
| Submit a task with backpressure and memory protection. | |
| template<typename F , typename... Args> | |
| void | enqueue_bounded_detached (F &&f, Args &&... args) |
| Submit a task with backpressure, without tracking result. | |
| template<typename F , typename... Args> | |
| auto | try_enqueue (F &&f, Args &&... args) -> std::optional< std::future< std::invoke_result_t< F, Args... > > > |
| Try to submit a task without blocking or throwing. | |
| template<typename F , typename... Args> | |
| bool | try_enqueue_detached (F &&f, Args &&... args) |
| Try to submit a detached task without blocking or throwing. | |
| size_t | num_threads () const noexcept |
| Get the number of worker threads. | |
| size_t | pending_tasks () const |
| Get the number of pending tasks in the queue. | |
| size_t | running_tasks () const noexcept |
| Get the number of tasks currently being executed. | |
| bool | is_idle () const |
| Check if the pool is idle (no pending or running tasks). | |
| bool | is_stopped () const noexcept |
| Check if the pool has been shut down. | |
| void | shutdown () |
| Shut down the pool, completing all pending tasks. | |
| void | resize (size_t new_size) |
| Resize the pool to a different number of workers. | |
| void | wait_all (std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1)) |
| Wait until all current tasks complete. | |
| template<typename Rep , typename Period > | |
| bool | wait_all_for (std::chrono::duration< Rep, Period > timeout) |
| Wait until all current tasks complete, with timeout. | |
| template<typename Clock , typename Duration > | |
| bool | wait_all_until (std::chrono::time_point< Clock, Duration > deadline) |
| Wait until all current tasks complete, with deadline. | |
| ThreadPoolStats | get_stats () const |
| Get current pool statistics. | |
| void | reset_stats () |
| Reset statistics counters to zero. | |
| void | set_exception_callback (ExceptionCallback callback) |
| Set callback for exceptions in detached tasks. | |
| template<typename F , typename Container > | |
| auto | enqueue_batch (F &&f, const Container &args_list) -> std::vector< std::future< decltype(std::apply(f, *std::begin(args_list)))> > |
| Submit multiple tasks with a single notification. | |
Private Member Functions | |
| void | worker_loop () |
| Worker thread main loop. | |
| void | start_workers (size_t n) |
| Start n worker threads. | |
| void | stop_workers () |
| Stop and join all workers. | |
Static Private Member Functions | |
| template<typename F , typename... Args> | |
| static auto | make_invocable (F &&f, Args &&... args) |
| Helper to create a callable from function + arguments using std::invoke. | |
Private Attributes | |
| std::vector< std::thread > | workers |
| std::queue< std::unique_ptr< TaskBase > > | tasks |
| std::mutex | queue_mutex |
| std::condition_variable | condition |
| Notifies workers of new tasks. | |
| std::condition_variable | space_available |
| Notifies enqueuers of queue space. | |
| std::condition_variable | idle_condition |
| Notifies wait_all() of idle state. | |
| std::atomic< bool > | stop {false} |
| std::atomic< size_t > | active_tasks {0} |
| size_t | soft_limit_ = std::numeric_limits<size_t>::max() |
| Block threshold. | |
| size_t | hard_limit_ = std::numeric_limits<size_t>::max() |
| Exception threshold. | |
| std::atomic< size_t > | tasks_completed_ {0} |
| std::atomic< size_t > | tasks_failed_ {0} |
| std::atomic< size_t > | peak_queue_size_ {0} |
| ExceptionCallback | exception_callback_ |
A reusable thread pool for efficient parallel task execution.
Maintains a pool of worker threads that wait for tasks and execute them. This avoids the overhead of creating and destroying threads for each task.
The pool uses a shared task queue protected by a mutex. Workers wait on a condition variable and are notified when new tasks arrive.
This implementation uses std::invoke with std::tuple + std::apply instead of std::bind for better performance and move-only support. The overhead per task is approximately 50-100 nanoseconds.
get() is called.Definition at line 211 of file thread_pool.H.
|
inlineexplicit |
Construct a thread pool with specified number of workers.
| n_threads | Number of worker threads to create. Defaults to std::thread::hardware_concurrency(). |
Definition at line 338 of file thread_pool.H.
References Aleph::maps(), and start_workers().
|
delete |
Non-copyable.
|
delete |
Non-movable (due to mutex and condition_variable)
|
inline |
Destructor - stops all workers and waits for completion.
All pending tasks in the queue will be executed before shutdown.
Definition at line 358 of file thread_pool.H.
References stop_workers().
|
inline |
Submit a task for execution and get a future for the result.
This is the most general overload, supporting:
std::ref() / std::cref()| F | Callable type. |
| Args | Argument types for the callable. |
| f | The callable to execute. |
| args | Arguments to pass to the callable (via std::invoke). Use std::ref(x) to pass by reference. |
std::future that will hold the result (or exception).| std::runtime_error | if the pool has been shut down. |
Definition at line 407 of file thread_pool.H.
References condition, make_invocable(), Aleph::maps(), peak_queue_size_, queue_mutex, stop, tasks, and tasks_completed_.
Referenced by enqueue_bulk(), example_basic_parallel(), example_performance(), Aleph::pall(), Aleph::parallel_for(), Aleph::parallel_for_index(), Aleph::parallel_reduce(), Aleph::parallel_transform(), Aleph::pcount_if(), Aleph::penumerate_for_each(), Aleph::penumerate_for_each(), Aleph::penumerate_maps(), Aleph::pexists(), Aleph::pfilter(), Aleph::pfind(), Aleph::pfoldl(), Aleph::pfor_each(), Aleph::pfor_each(), Aleph::pmaps(), Aleph::pmax(), Aleph::pmin(), Aleph::pminmax(), Aleph::ppartition(), Aleph::pzip_all_n(), Aleph::pzip_count_if_n(), Aleph::pzip_exists_n(), Aleph::pzip_foldl(), Aleph::pzip_foldl_n(), Aleph::pzip_for_each(), Aleph::pzip_for_each_n(), Aleph::pzip_maps(), Aleph::pzip_maps_nand try_enqueue().
|
inline |
Submit multiple tasks with a single notification.
More efficient than calling enqueue() multiple times when you have many tasks to submit, as it reduces mutex contention and notification overhead.
| f | The callable to execute for each argument set. |
| args_list | Container where each element is the arguments for one call. |
Definition at line 1097 of file thread_pool.H.
References condition, Aleph::maps(), peak_queue_size_, queue_mutex, stop, tasks, and tasks_completed_.
|
inline |
Submit a task with backpressure and memory protection.
Unlike enqueue(), this method respects queue limits:
pending_tasks() >= soft_limit: blocks until space availablepending_tasks() >= hard_limit: throws queue_overflow_errorThis provides natural backpressure (producer slows down) and protects against memory exhaustion.
| F | Callable type. |
| Args | Argument types for the callable. |
| f | The callable to execute. |
| args | Arguments to pass to the callable. |
std::future that will hold the result.| queue_overflow_error | if queue size >= hard_limit. |
| std::runtime_error | if the pool has been shut down. |
Definition at line 654 of file thread_pool.H.
References condition, hard_limit_, make_invocable(), Aleph::maps(), queue_mutex, soft_limit_, space_available, stop, and tasks.
Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
Submit a task with backpressure, without tracking result.
Fire-and-forget version of enqueue_bounded(). Respects queue limits but doesn't return a future.
| F | Callable type. |
| Args | Argument types for the callable. |
| f | The callable to execute. |
| args | Arguments to pass to the callable. |
| queue_overflow_error | if queue size >= hard_limit. |
| std::runtime_error | if the pool has been shut down. |
Definition at line 724 of file thread_pool.H.
References condition, hard_limit_, make_invocable(), Aleph::maps(), queue_mutex, soft_limit_, space_available, stop, and tasks.
Referenced by example_backpressure(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
|
inline |
Submit multiple tasks and collect all futures.
Convenience method for submitting a batch of similar tasks.
| F | Callable type. |
| Container | Container of argument sets (e.g., vector<tuple<Args...>>). |
| f | The callable to execute for each argument set. |
| args_container | Container where each element is the arguments for one call. |
Definition at line 564 of file thread_pool.H.
References enqueue(), Aleph::maps(), and Aleph::HTList::size().
Referenced by example_batch_processing(), TEST_F(), TEST_F(), and TEST_F().
Submit a task without tracking the result (fire-and-forget).
More efficient than enqueue() when you don't need the result, as it avoids the overhead of promise/future.
| F | Callable type. |
| Args | Argument types for the callable. |
| f | The callable to execute. |
| args | Arguments to pass to the callable. |
| std::runtime_error | if the pool has been shut down. |
enqueue() if you need exception handling.Definition at line 491 of file thread_pool.H.
References condition, exception_callback_, make_invocable(), Aleph::maps(), peak_queue_size_, queue_mutex, stop, tasks, tasks_completed_, and tasks_failed_.
Referenced by example_fire_and_forget(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and try_enqueue_detached().
Get current queue limits.
Definition at line 613 of file thread_pool.H.
References hard_limit_, queue_mutex, and soft_limit_.
Referenced by example_backpressure(), TEST_F(), TEST_F(), and TEST_F().
|
inline |
Get current pool statistics.
Returns a snapshot of various performance counters.
Definition at line 1021 of file thread_pool.H.
References active_tasks, Aleph::ThreadPoolStats::current_active, Aleph::ThreadPoolStats::current_queue_size, Aleph::ThreadPoolStats::num_workers, Aleph::ThreadPoolStats::peak_queue_size, peak_queue_size_, queue_mutex, tasks, Aleph::ThreadPoolStats::tasks_completed, tasks_completed_, Aleph::ThreadPoolStats::tasks_failed, tasks_failed_, and workers.
Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
|
inline |
Check if the pool is idle (no pending or running tasks).
Definition at line 884 of file thread_pool.H.
References active_tasks, queue_mutex, and tasks.
Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), and wait_all().
|
inlinenoexcept |
Check if the pool has been shut down.
Definition at line 893 of file thread_pool.H.
References stop.
Helper to create a callable from function + arguments using std::invoke.
Definition at line 320 of file thread_pool.H.
References Aleph::maps().
Referenced by enqueue(), enqueue_bounded(), enqueue_bounded_detached(), and enqueue_detached().
|
inlinenoexcept |
Get the number of worker threads.
Definition at line 859 of file thread_pool.H.
References Aleph::HTList::size(), and workers.
Referenced by example_basic_parallel(), example_batch_processing(), example_parallel_map(), example_performance(), example_performance_comparison(), Aleph::pall(), Aleph::parallel_for(), Aleph::parallel_for_index(), Aleph::parallel_reduce(), Aleph::parallel_transform(), Aleph::pcount_if(), Aleph::penumerate_for_each(), Aleph::penumerate_for_each(), Aleph::penumerate_maps(), Aleph::pexists(), Aleph::pfilter(), Aleph::pfind(), Aleph::pfoldl(), Aleph::pfor_each(), Aleph::pfor_each(), Aleph::pmaps(), Aleph::pmax(), Aleph::pmin(), Aleph::pminmax(), Aleph::ppartition(), Aleph::pzip_all_n(), Aleph::pzip_count_if_n(), Aleph::pzip_exists_n(), Aleph::pzip_foldl(), Aleph::pzip_foldl_n(), Aleph::pzip_for_each(), Aleph::pzip_for_each_n(), Aleph::pzip_maps(), Aleph::pzip_maps_n(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
|
delete |
|
delete |
|
inline |
Get the number of pending tasks in the queue.
Definition at line 867 of file thread_pool.H.
References queue_mutex, and tasks.
Referenced by example_backpressure(), TEST_F(), and TEST_F().
|
inline |
Reset statistics counters to zero.
Definition at line 1040 of file thread_pool.H.
References peak_queue_size_, tasks_completed_, and tasks_failed_.
Referenced by TEST_F().
Resize the pool to a different number of workers.
Stops all current workers and starts a new set with the specified size. Any tasks currently in the queue will be preserved and executed by the new workers.
| new_size | New number of worker threads. |
| std::runtime_error | if the pool has been shut down. |
Definition at line 937 of file thread_pool.H.
References condition, Aleph::maps(), queue_mutex, Aleph::HTList::size(), start_workers(), stop, and workers.
Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
|
inlinenoexcept |
Get the number of tasks currently being executed.
Definition at line 876 of file thread_pool.H.
References active_tasks.
|
inline |
Set callback for exceptions in detached tasks.
When a detached task throws an exception, this callback is invoked with the exception pointer. This allows logging or handling of exceptions that would otherwise be silently ignored.
| callback | Function to call with exception_ptr on task failure. Pass nullptr to disable. |
Definition at line 1068 of file thread_pool.H.
References exception_callback_, and queue_mutex.
|
inline |
Set queue limits for bounded enqueue operations.
Configures backpressure and memory protection for the task queue.
| soft_limit | When queue reaches this size, enqueue_bounded() blocks until space is available. Set to SIZE_MAX to disable blocking. |
| hard_limit | When queue reaches this size, enqueue_bounded() throws queue_overflow_error. Defaults to 10x soft_limit. Set to SIZE_MAX to disable the hard limit. |
enqueue_bounded() and enqueue_bounded_detached(). Regular enqueue() and enqueue_detached() are unaffected.Definition at line 598 of file thread_pool.H.
References hard_limit_, Aleph::maps(), queue_mutex, and soft_limit_.
Referenced by example_backpressure(), example_load_shedding(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
|
inline |
Shut down the pool, completing all pending tasks.
After calling this method:
Definition at line 908 of file thread_pool.H.
References condition, Aleph::maps(), queue_mutex, stop, and workers.
Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
Start n worker threads.
Definition at line 295 of file thread_pool.H.
References worker_loop(), and workers.
Referenced by ThreadPool(), and resize().
|
inlineprivate |
Stop and join all workers.
Definition at line 303 of file thread_pool.H.
References condition, Aleph::maps(), queue_mutex, stop, and workers.
Referenced by ~ThreadPool().
|
inline |
Try to submit a task without blocking or throwing.
Non-blocking version of enqueue_bounded(). Returns immediately with either a future (if task was queued) or std::nullopt (if queue is full).
| F | Callable type. |
| Args | Argument types for the callable. |
| f | The callable to execute. |
| args | Arguments to pass to the callable. |
std::optional<std::future<R>> - contains the future if the task was successfully queued, or std::nullopt if the queue is at or above the soft limit.| std::runtime_error | only if the pool has been shut down. |
soft_limit as the threshold. Configure with set_queue_limits().Definition at line 799 of file thread_pool.H.
References enqueue(), Aleph::maps(), queue_mutex, soft_limit_, stop, and tasks.
Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
Try to submit a detached task without blocking or throwing.
Non-blocking version of enqueue_bounded_detached().
| F | Callable type. |
| Args | Argument types for the callable. |
| f | The callable to execute. |
| args | Arguments to pass to the callable. |
true if the task was queued, false if the queue is full.| std::runtime_error | only if the pool has been shut down. |
Definition at line 840 of file thread_pool.H.
References enqueue_detached(), Aleph::maps(), queue_mutex, soft_limit_, stop, and tasks.
Referenced by example_load_shedding(), TEST_F(), TEST_F(), and TEST_F().
|
inline |
Wait until all current tasks complete.
Blocks until the task queue is empty and no tasks are running. New tasks can still be enqueued while waiting.
| poll_interval | How often to check for completion (default 1ms). |
Definition at line 973 of file thread_pool.H.
References is_idle(), and Aleph::maps().
Referenced by example_backpressure(), example_fire_and_forget(), example_load_shedding(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), TEST_F(), and TEST_F().
Wait until all current tasks complete, with timeout.
Blocks until either:
| timeout | Maximum time to wait. |
Definition at line 989 of file thread_pool.H.
References active_tasks, Aleph::DynList< T >::empty(), idle_condition, Aleph::maps(), queue_mutex, and tasks.
|
inline |
Wait until all current tasks complete, with deadline.
Blocks until either:
| deadline | Time point at which to stop waiting. |
Definition at line 1007 of file thread_pool.H.
References active_tasks, Aleph::DynList< T >::empty(), idle_condition, Aleph::maps(), queue_mutex, and tasks.
Referenced by TEST_F().
|
inlineprivate |
Worker thread main loop.
Definition at line 253 of file thread_pool.H.
References active_tasks, condition, Aleph::DynList< T >::empty(), idle_condition, Aleph::maps(), queue_mutex, soft_limit_, space_available, stop, and tasks.
Referenced by start_workers().
|
private |
Definition at line 238 of file thread_pool.H.
Referenced by get_stats(), is_idle(), running_tasks(), wait_all_for(), wait_all_until(), and worker_loop().
|
private |
Notifies workers of new tasks.
Definition at line 234 of file thread_pool.H.
Referenced by enqueue(), enqueue_batch(), enqueue_bounded(), enqueue_bounded_detached(), enqueue_detached(), resize(), shutdown(), stop_workers(), and worker_loop().
|
private |
Definition at line 250 of file thread_pool.H.
Referenced by enqueue_detached(), and set_exception_callback().
Exception threshold.
Definition at line 242 of file thread_pool.H.
Referenced by enqueue_bounded(), enqueue_bounded_detached(), get_queue_limits(), and set_queue_limits().
|
private |
Notifies wait_all() of idle state.
Definition at line 236 of file thread_pool.H.
Referenced by wait_all_for(), wait_all_until(), and worker_loop().
|
private |
Definition at line 247 of file thread_pool.H.
Referenced by enqueue(), enqueue_batch(), enqueue_detached(), get_stats(), and reset_stats().
|
mutableprivate |
Definition at line 233 of file thread_pool.H.
Referenced by enqueue(), enqueue_batch(), enqueue_bounded(), enqueue_bounded_detached(), enqueue_detached(), get_queue_limits(), get_stats(), is_idle(), pending_tasks(), resize(), set_exception_callback(), set_queue_limits(), shutdown(), stop_workers(), try_enqueue(), try_enqueue_detached(), wait_all_for(), wait_all_until(), and worker_loop().
Block threshold.
Definition at line 241 of file thread_pool.H.
Referenced by enqueue_bounded(), enqueue_bounded_detached(), get_queue_limits(), set_queue_limits(), try_enqueue(), try_enqueue_detached(), and worker_loop().
|
private |
Notifies enqueuers of queue space.
Definition at line 235 of file thread_pool.H.
Referenced by enqueue_bounded(), enqueue_bounded_detached(), and worker_loop().
Definition at line 237 of file thread_pool.H.
Referenced by enqueue(), enqueue_batch(), enqueue_bounded(), enqueue_bounded_detached(), enqueue_detached(), is_stopped(), resize(), shutdown(), stop_workers(), try_enqueue(), try_enqueue_detached(), and worker_loop().
|
private |
Definition at line 232 of file thread_pool.H.
Referenced by enqueue(), enqueue_batch(), enqueue_bounded(), enqueue_bounded_detached(), enqueue_detached(), get_stats(), is_idle(), pending_tasks(), try_enqueue(), try_enqueue_detached(), wait_all_for(), wait_all_until(), and worker_loop().
|
private |
Definition at line 245 of file thread_pool.H.
Referenced by enqueue(), enqueue_batch(), enqueue_detached(), get_stats(), and reset_stats().
|
private |
Definition at line 246 of file thread_pool.H.
Referenced by enqueue_detached(), get_stats(), and reset_stats().
|
private |
Definition at line 231 of file thread_pool.H.
Referenced by get_stats(), num_threads(), resize(), shutdown(), start_workers(), and stop_workers().