Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Aleph::ThreadPool Class Reference

A reusable thread pool for efficient parallel task execution.

#include <thread_pool.H>

Classes

struct  Task
 Concrete task implementation with type preservation.
 
struct  TaskBase
 Type-erased task wrapper.
 

Public Member Functions

 ThreadPool (size_t n_threads=std::thread::hardware_concurrency())
 Construct a thread pool with specified number of workers.
 
 ThreadPool (const ThreadPool &)=delete
 Non-copyable.
 
ThreadPool & operator= (const ThreadPool &)=delete
 
 ThreadPool (ThreadPool &&)=delete
 Non-movable (due to mutex and condition_variable)
 
ThreadPool & operator= (ThreadPool &&)=delete
 
 ~ThreadPool ()
 Destructor - stops all workers and waits for completion.
 
template<typename F , typename... Args>
auto enqueue (F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
 Submit a task for execution and get a future for the result.
 
template<typename F , typename... Args>
void enqueue_detached (F &&f, Args &&... args)
 Submit a task without tracking the result (fire-and-forget).
 
template<typename F , typename Container >
auto enqueue_bulk (F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
 Submit multiple tasks and collect all futures.
 
void set_queue_limits (size_t soft_limit, size_t hard_limit=std::numeric_limits< size_t >::max())
 Set queue limits for bounded enqueue operations.
 
std::pair< size_t, size_t > get_queue_limits () const
 Get current queue limits.
 
template<typename F , typename... Args>
auto enqueue_bounded (F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
 Submit a task with backpressure and memory protection.
 
template<typename F , typename... Args>
void enqueue_bounded_detached (F &&f, Args &&... args)
 Submit a task with backpressure, without tracking result.
 
template<typename F , typename... Args>
auto try_enqueue (F &&f, Args &&... args) -> std::optional< std::future< std::invoke_result_t< F, Args... > > >
 Try to submit a task without blocking or throwing.
 
template<typename F , typename... Args>
bool try_enqueue_detached (F &&f, Args &&... args)
 Try to submit a detached task without blocking or throwing.
 
size_t num_threads () const noexcept
 Get the number of worker threads.
 
size_t pending_tasks () const
 Get the number of pending tasks in the queue.
 
size_t running_tasks () const noexcept
 Get the number of tasks currently being executed.
 
bool is_idle () const
 Check if the pool is idle (no pending or running tasks).
 
bool is_stopped () const noexcept
 Check if the pool has been shut down.
 
void shutdown ()
 Shut down the pool, completing all pending tasks.
 
void resize (size_t new_size)
 Resize the pool to a different number of workers.
 
void wait_all (std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
 Wait until all current tasks complete.
 
template<typename Rep , typename Period >
bool wait_all_for (std::chrono::duration< Rep, Period > timeout)
 Wait until all current tasks complete, with timeout.
 
template<typename Clock , typename Duration >
bool wait_all_until (std::chrono::time_point< Clock, Duration > deadline)
 Wait until all current tasks complete, with deadline.
 
ThreadPoolStats get_stats () const
 Get current pool statistics.
 
void reset_stats ()
 Reset statistics counters to zero.
 
void set_exception_callback (ExceptionCallback callback)
 Set callback for exceptions in detached tasks.
 
template<typename F , typename Container >
auto enqueue_batch (F &&f, const Container &args_list) -> std::vector< std::future< decltype(std::apply(f, *std::begin(args_list)))> >
 Submit multiple tasks with a single notification.
 

Private Member Functions

void worker_loop ()
 Worker thread main loop.
 
void start_workers (size_t n)
 Start n worker threads.
 
void stop_workers ()
 Stop and join all workers.
 

Static Private Member Functions

template<typename F , typename... Args>
static auto make_invocable (F &&f, Args &&... args)
 Helper to create a callable from function + arguments using std::invoke.
 

Private Attributes

std::vector< std::thread > workers
 
std::queue< std::unique_ptr< TaskBase > > tasks
 
std::mutex queue_mutex
 
std::condition_variable condition
 Notifies workers of new tasks.
 
std::condition_variable space_available
 Notifies enqueuers of queue space.
 
std::condition_variable idle_condition
 Notifies wait_all() of idle state.
 
std::atomic< bool > stop {false}
 
std::atomic< size_t > active_tasks {0}
 
size_t soft_limit_ = std::numeric_limits<size_t>::max()
 Block threshold.
 
size_t hard_limit_ = std::numeric_limits<size_t>::max()
 Exception threshold.
 
std::atomic< size_t > tasks_completed_ {0}
 
std::atomic< size_t > tasks_failed_ {0}
 
std::atomic< size_t > peak_queue_size_ {0}
 
ExceptionCallback exception_callback_
 

Detailed Description

A reusable thread pool for efficient parallel task execution.

Maintains a pool of worker threads that wait for tasks and execute them. This avoids the overhead of creating and destroying threads for each task.

The pool uses a shared task queue protected by a mutex. Workers wait on a condition variable and are notified when new tasks arrive.

Implementation Notes

This implementation uses std::invoke with std::tuple + std::apply instead of std::bind for better performance and move-only support. The overhead per task is approximately 50-100 nanoseconds.
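
The tuple-based binding described above can be illustrated with the standard library alone. What follows is a minimal sketch under stated assumptions, not the library's make_invocable(): the helper name bind_with_tuple and the sample types add and Counter are hypothetical. It stores decayed copies of the arguments in a std::tuple and later forwards them through std::apply and std::invoke, which is what allows move-only arguments such as std::unique_ptr to reach the callee.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <tuple>
#include <utility>

// Hypothetical helper, not the library's make_invocable(): packs a callable
// and its arguments into a single-shot task that takes no arguments.
template <typename F, typename... Args>
auto bind_with_tuple(F&& f, Args&&... args)
{
  return [fn = std::forward<F>(f),
          stored = std::make_tuple(std::forward<Args>(args)...)]() mutable
             -> decltype(auto)
  {
    // Move the stored arguments out so that move-only types reach the callee.
    // std::ref() arguments are stored as references and stay references.
    return std::apply(
      [&fn](auto&&... unpacked) -> decltype(auto) {
        return std::invoke(std::move(fn),
                           std::forward<decltype(unpacked)>(unpacked)...);
      },
      std::move(stored));
  };
}

// Sample callables used to exercise the helper (hypothetical).
int add(int a, int b) { return a + b; }

struct Counter
{
  int base;
  int plus(int x) const { return base + x; }
};
```

Because the stored arguments are moved out on invocation, a task built this way is meant to be called once, which matches how a queued task is consumed by a worker.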

Note
The pool automatically joins all workers on destruction.
Tasks that throw exceptions will have those exceptions stored in the returned future and rethrown when get() is called.
Author
Leandro Rabindranath León

Definition at line 211 of file thread_pool.H.

Constructor & Destructor Documentation

◆ ThreadPool() [1/3]

Aleph::ThreadPool::ThreadPool ( size_t  n_threads = std::thread::hardware_concurrency())
inline explicit

Construct a thread pool with specified number of workers.

Parameters
n_threads  Number of worker threads to create. Defaults to std::thread::hardware_concurrency().

Definition at line 338 of file thread_pool.H.

References Aleph::maps(), and start_workers().

◆ ThreadPool() [2/3]

Aleph::ThreadPool::ThreadPool ( const ThreadPool &  )
delete

Non-copyable.

◆ ThreadPool() [3/3]

Aleph::ThreadPool::ThreadPool ( ThreadPool &&  )
delete

Non-movable (due to mutex and condition_variable)

◆ ~ThreadPool()

Aleph::ThreadPool::~ThreadPool ( )
inline

Destructor - stops all workers and waits for completion.

All pending tasks in the queue will be executed before shutdown.

Definition at line 358 of file thread_pool.H.

References stop_workers().

Member Function Documentation

◆ enqueue()

template<typename F , typename... Args>
auto Aleph::ThreadPool::enqueue ( F &&  f,
Args &&...  args 
) -> std::future<std::invoke_result_t<F, Args...>>
inline

Submit a task for execution and get a future for the result.

This is the most general overload, supporting:

  • Any callable (lambda, function, functor, member function pointer)
  • Any number of arguments
  • Move-only callables and arguments
  • Member functions with object pointer/reference
  • Reference arguments via std::ref() / std::cref()
Template Parameters
F  Callable type.
Args  Argument types for the callable.
Parameters
f  The callable to execute.
args  Arguments to pass to the callable (via std::invoke). Use std::ref(x) to pass by reference.
Returns
A std::future that will hold the result (or exception).
Exceptions
std::runtime_error  if the pool has been shut down.

Examples

// Lambda
auto f1 = pool.enqueue([](int x) { return x * 2; }, 21);
// Free function (std::sqrt is overloaded, so select one overload explicitly)
auto f2 = pool.enqueue(static_cast<double (*)(double)>(std::sqrt), 16.0);
// Member function pointer (std::invoke syntax)
struct Foo { int bar(int x) { return x + 1; } };
Foo foo;
auto f3 = pool.enqueue(&Foo::bar, &foo, 41);
// Move-only capture
auto ptr = std::make_unique<int>(100);
auto f4 = pool.enqueue([p = std::move(ptr)]() { return *p; });
// Reference argument (use std::ref)
int value = 0;
pool.enqueue([](int& x) { ++x; }, std::ref(value));

Definition at line 407 of file thread_pool.H.
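
The statement that the returned future holds either the result or the exception can be seen with standard components. The sketch below uses std::packaged_task as a stand-in; whether the pool uses std::packaged_task or std::promise internally is not stated here, and run_and_report, ok_task and failing_task are hypothetical names. It shows that an exception thrown on the worker thread surfaces only when get() is called.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>
#include <thread>

// Runs `work` on a separate thread and reports what the caller observes
// through the future: either the value or the exception message.
std::string run_and_report(int (*work)())
{
  std::packaged_task<int()> task(work);
  std::future<int> result = task.get_future();

  std::thread worker(std::move(task));
  worker.join();  // the task has finished; any exception is held by the future

  try {
    return "value: " + std::to_string(result.get());  // get() rethrows here
  } catch (const std::exception& e) {
    return std::string("error: ") + e.what();
  }
}

// Sample tasks (hypothetical).
int ok_task() { return 42; }
int failing_task() { throw std::runtime_error("boom"); }
```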

References condition, make_invocable(), Aleph::maps(), peak_queue_size_, queue_mutex, stop, tasks, and tasks_completed_.

Referenced by enqueue_bulk(), example_basic_parallel(), example_performance(), Aleph::pall(), Aleph::parallel_for(), Aleph::parallel_for_index(), Aleph::parallel_reduce(), Aleph::parallel_transform(), Aleph::pcount_if(), Aleph::penumerate_for_each(), Aleph::penumerate_maps(), Aleph::pexists(), Aleph::pfilter(), Aleph::pfind(), Aleph::pfoldl(), Aleph::pfor_each(), Aleph::pmaps(), Aleph::pmax(), Aleph::pmin(), Aleph::pminmax(), Aleph::ppartition(), Aleph::pzip_all_n(), Aleph::pzip_count_if_n(), Aleph::pzip_exists_n(), Aleph::pzip_foldl(), Aleph::pzip_foldl_n(), Aleph::pzip_for_each(), Aleph::pzip_for_each_n(), Aleph::pzip_maps(), Aleph::pzip_maps_n(), TEST_F() (multiple tests), and try_enqueue().

◆ enqueue_batch()

template<typename F , typename Container >
auto Aleph::ThreadPool::enqueue_batch ( F &&  f,
const Container &  args_list 
) -> std::vector<std::future<decltype(std::apply(f, *std::begin(args_list)))>>
inline

Submit multiple tasks with a single notification.

More efficient than calling enqueue() multiple times when you have many tasks to submit, as it reduces mutex contention and notification overhead.

Template Parameters
F  Callable type.
Container  Container of argument tuples.
Parameters
f  The callable to execute for each argument set.
args_list  Container where each element is the arguments for one call.
Returns
Vector of futures for all submitted tasks.

Example

std::vector<std::tuple<int, int>> work = {{1, 2}, {3, 4}, {5, 6}};
auto futures = pool.enqueue_batch([](int a, int b) { return a + b; }, work);
// Submits all 3 tasks with one lock acquisition

Definition at line 1097 of file thread_pool.H.
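
The saving from batching can be sketched with a small counting queue. This is an illustration under assumptions, not the library's implementation: CountingQueue and its members are hypothetical, and the items are plain integers rather than tasks. It contrasts one lock acquisition and notification per item with a single acquisition and a single notify_all() for the whole batch.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <vector>

// Hypothetical queue that counts how often its mutex is taken by producers.
class CountingQueue
{
  std::queue<int> items_;
  std::mutex mutex_;
  std::condition_variable condition_;
  std::size_t lock_acquisitions_ = 0;

public:
  // One lock acquisition and one notification per item.
  void push_one(int item)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++lock_acquisitions_;
      items_.push(item);
    }
    condition_.notify_one();
  }

  // One lock acquisition and one notify_all() for the whole batch.
  void push_batch(const std::vector<int>& batch)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ++lock_acquisitions_;
      for (int item : batch)
        items_.push(item);
    }
    condition_.notify_all();
  }

  // Unsynchronized accessors, intended for single-threaded inspection only.
  std::size_t lock_acquisitions() const { return lock_acquisitions_; }
  std::size_t size() const { return items_.size(); }
};
```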

References condition, Aleph::maps(), peak_queue_size_, queue_mutex, stop, tasks, and tasks_completed_.

Referenced by TEST_F(), TEST_F(), and TEST_F().

◆ enqueue_bounded()

template<typename F , typename... Args>
auto Aleph::ThreadPool::enqueue_bounded ( F &&  f,
Args &&...  args 
) -> std::future<std::invoke_result_t<F, Args...>>
inline

Submit a task with backpressure and memory protection.

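Unlike enqueue(), this method respects queue limits:

  • If the queue size is below soft_limit, the task is enqueued immediately.
  • If the queue size is at or above soft_limit, the call blocks until space is available.
  • If the queue size is at or above hard_limit, queue_overflow_error is thrown.

This provides natural backpressure (the producer slows down) and protects against memory exhaustion.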

Template Parameters
F  Callable type.
Args  Argument types for the callable.
Parameters
f  The callable to execute.
args  Arguments to pass to the callable.
Returns
A std::future that will hold the result.
Exceptions
queue_overflow_error  if queue size >= hard_limit.
std::runtime_error  if the pool has been shut down.

Example

ThreadPool pool(4);
pool.set_queue_limits(100, 1000); // soft=100, hard=1000
try {
  for (int i = 0; i < 10000; ++i)
    pool.enqueue_bounded(expensive_task); // Will block/throw appropriately
} catch (const queue_overflow_error& e) {
  std::cerr << "Queue overflow: " << e.what() << std::endl;
}

Definition at line 654 of file thread_pool.H.
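
The two-limit behaviour can be sketched with a small standalone queue. This is a minimal model under stated assumptions, not the library's code: BoundedQueue and blocked_producer_demo are hypothetical names, the items are integers rather than tasks, std::overflow_error stands in for queue_overflow_error, and the order of the checks (hard limit first, then the soft-limit wait) is an assumption.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>

// Hypothetical stand-in for a task queue with a soft and a hard limit.
class BoundedQueue
{
  std::queue<int> items_;
  mutable std::mutex mutex_;
  std::condition_variable space_available_;
  const std::size_t soft_limit_;
  const std::size_t hard_limit_;

public:
  BoundedQueue(std::size_t soft_limit, std::size_t hard_limit)
    : soft_limit_(soft_limit), hard_limit_(hard_limit) {}

  // Models enqueue(): ignores both limits.
  void push_unbounded(int item)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    items_.push(item);
  }

  // Models enqueue_bounded(): throws at the hard limit, otherwise waits
  // while the queue is at or above the soft limit.
  void push_bounded(int item)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    if (items_.size() >= hard_limit_)
      throw std::overflow_error("queue reached its hard limit");
    space_available_.wait(lock, [this] { return items_.size() < soft_limit_; });
    items_.push(item);
  }

  // Models try_enqueue(): never blocks, reports failure instead.
  bool try_push(int item)
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (items_.size() >= soft_limit_)
      return false;
    items_.push(item);
    return true;
  }

  // Consumer side: removes the oldest item and wakes one waiting producer.
  // Precondition: the queue is not empty.
  int pop()
  {
    int item;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      item = items_.front();
      items_.pop();
    }
    space_available_.notify_one();
    return item;
  }

  std::size_t size() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return items_.size();
  }
};

// Fills the queue to its soft limit, then lets a consumer free one slot so
// that the waiting producer can proceed. Returns the final queue size.
std::size_t blocked_producer_demo()
{
  BoundedQueue queue(2, 4);
  queue.push_bounded(1);
  queue.push_bounded(2);  // the queue is now at the soft limit
  std::thread producer([&queue] { queue.push_bounded(3); });  // waits if still full
  queue.pop();            // frees a slot and notifies the producer
  producer.join();
  return queue.size();
}
```

In this model the hard limit is reachable only because push_unbounded() bypasses the soft limit, mirroring the note that regular enqueue() is unaffected by the limits.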

References condition, hard_limit_, make_invocable(), Aleph::maps(), queue_mutex, soft_limit_, space_available, stop, and tasks.

Referenced by TEST_F() (multiple tests).

◆ enqueue_bounded_detached()

template<typename F , typename... Args>
void Aleph::ThreadPool::enqueue_bounded_detached ( F &&  f,
Args &&...  args 
)
inline

Submit a task with backpressure, without tracking result.

Fire-and-forget version of enqueue_bounded(). Respects queue limits but doesn't return a future.

Template Parameters
F  Callable type.
Args  Argument types for the callable.
Parameters
f  The callable to execute.
args  Arguments to pass to the callable.
Exceptions
queue_overflow_error  if queue size >= hard_limit.
std::runtime_error  if the pool has been shut down.

Definition at line 724 of file thread_pool.H.

References condition, hard_limit_, make_invocable(), Aleph::maps(), queue_mutex, soft_limit_, space_available, stop, and tasks.

Referenced by example_backpressure() and TEST_F() (multiple tests).

◆ enqueue_bulk()

template<typename F , typename Container >
auto Aleph::ThreadPool::enqueue_bulk ( F &&  f,
const Container &  args_container 
) -> std::vector<std::future<std::invoke_result_t<F, typename Container::value_type>>>
inline

Submit multiple tasks and collect all futures.

Convenience method for submitting a batch of similar tasks.

Template Parameters
F  Callable type.
Container  Container of argument sets (e.g., vector<tuple<Args...>>).
Parameters
f  The callable to execute for each argument set.
args_container  Container where each element is the arguments for one call.
Returns
Vector of futures for all submitted tasks.

Example

std::vector<int> inputs = {1, 2, 3, 4, 5};
auto futures = pool.enqueue_bulk([](int x) { return x * x; }, inputs);
// Calling get() on the futures yields 1, 4, 9, 16, 25

Definition at line 564 of file thread_pool.H.

References enqueue(), Aleph::maps(), and Aleph::HTList::size().

Referenced by example_batch_processing(), TEST_F(), TEST_F(), and TEST_F().

◆ enqueue_detached()

template<typename F , typename... Args>
void Aleph::ThreadPool::enqueue_detached ( F &&  f,
Args &&...  args 
)
inline

Submit a task without tracking the result (fire-and-forget).

More efficient than enqueue() when you don't need the result, as it avoids the overhead of promise/future.

Template Parameters
F  Callable type.
Args  Argument types for the callable.
Parameters
f  The callable to execute.
args  Arguments to pass to the callable.
Exceptions
std::runtime_error  if the pool has been shut down.
Warning
Exceptions thrown by the task are not propagated to the caller. They are silently ignored unless a callback has been registered with set_exception_callback(). Use enqueue() if you need to handle exceptions through a future.

Example

// Fire-and-forget logging
pool.enqueue_detached([](const std::string& msg) {
  std::ofstream log("app.log", std::ios::app);
  log << msg << std::endl;
}, "User logged in");

Definition at line 491 of file thread_pool.H.

References condition, exception_callback_, make_invocable(), Aleph::maps(), peak_queue_size_, queue_mutex, stop, tasks, tasks_completed_, and tasks_failed_.

Referenced by example_fire_and_forget(), TEST_F() (multiple tests), and try_enqueue_detached().

◆ get_queue_limits()

std::pair< size_t, size_t > Aleph::ThreadPool::get_queue_limits ( ) const
inline

Get current queue limits.

Returns
Pair of (soft_limit, hard_limit).

Definition at line 613 of file thread_pool.H.

References hard_limit_, queue_mutex, and soft_limit_.

Referenced by example_backpressure(), TEST_F(), TEST_F(), and TEST_F().

◆ get_stats()

ThreadPoolStats Aleph::ThreadPool::get_stats ( ) const
inline

◆ is_idle()

bool Aleph::ThreadPool::is_idle ( ) const
inline

Check if the pool is idle (no pending or running tasks).

Returns
true if no tasks are queued or running.

Definition at line 884 of file thread_pool.H.

References active_tasks, queue_mutex, and tasks.

Referenced by TEST_F(), TEST_F(), TEST_F(), TEST_F(), and wait_all().

◆ is_stopped()

bool Aleph::ThreadPool::is_stopped ( ) const
inline noexcept

Check if the pool has been shut down.

Returns
true if shutdown() was called or destructor is running.

Definition at line 893 of file thread_pool.H.

References stop.

Referenced by TEST_F(), and TEST_F().

◆ make_invocable()

template<typename F , typename... Args>
static auto Aleph::ThreadPool::make_invocable ( F &&  f,
Args &&...  args 
)
inline static private

Helper to create a callable from function + arguments using std::invoke.

Definition at line 320 of file thread_pool.H.

References Aleph::maps().

Referenced by enqueue(), enqueue_bounded(), enqueue_bounded_detached(), and enqueue_detached().

◆ num_threads()

◆ operator=() [1/2]

ThreadPool & Aleph::ThreadPool::operator= ( const ThreadPool &  )
delete

◆ operator=() [2/2]

ThreadPool & Aleph::ThreadPool::operator= ( ThreadPool &&  )
delete

◆ pending_tasks()

size_t Aleph::ThreadPool::pending_tasks ( ) const
inline

Get the number of pending tasks in the queue.

Returns
Number of tasks waiting to be executed.

Definition at line 867 of file thread_pool.H.

References queue_mutex, and tasks.

Referenced by example_backpressure(), TEST_F(), and TEST_F().

◆ reset_stats()

void Aleph::ThreadPool::reset_stats ( )
inline

Reset statistics counters to zero.

Definition at line 1040 of file thread_pool.H.

References peak_queue_size_, tasks_completed_, and tasks_failed_.

Referenced by TEST_F().

◆ resize()

void Aleph::ThreadPool::resize ( size_t  new_size)
inline

Resize the pool to a different number of workers.

Stops all current workers and starts a new set with the specified size. Any tasks currently in the queue will be preserved and executed by the new workers.

Parameters
new_size  New number of worker threads.
Warning
This method should not be called while tasks are being enqueued from other threads.
Exceptions
std::runtime_error  if the pool has been shut down.

Definition at line 937 of file thread_pool.H.

References condition, Aleph::maps(), queue_mutex, Aleph::HTList::size(), start_workers(), stop, and workers.

Referenced by TEST_F() (multiple tests).

◆ running_tasks()

size_t Aleph::ThreadPool::running_tasks ( ) const
inline noexcept

Get the number of tasks currently being executed.

Returns
Number of active tasks.

Definition at line 876 of file thread_pool.H.

References active_tasks.

◆ set_exception_callback()

void Aleph::ThreadPool::set_exception_callback ( ExceptionCallback  callback)
inline

Set callback for exceptions in detached tasks.

When a detached task throws an exception, this callback is invoked with the exception pointer. This allows logging or handling of exceptions that would otherwise be silently ignored.

Parameters
callback  Function to call with exception_ptr on task failure. Pass nullptr to disable.

Example

pool.set_exception_callback([](std::exception_ptr ep) {
  try {
    std::rethrow_exception(ep);
  } catch (const std::exception& e) {
    std::cerr << "Detached task failed: " << e.what() << std::endl;
  }
});

Definition at line 1068 of file thread_pool.H.
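
How a worker might route a detached task's exception to such a callback can be sketched as follows. This is a hedged illustration, not the library's worker code: run_detached and describe are hypothetical helpers, and the alias below assumes ExceptionCallback is compatible with std::function<void(std::exception_ptr)>, which the parameter description suggests but does not confirm.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>

// Assumed shape of the callback type (see the note above).
using ExceptionCallback = std::function<void(std::exception_ptr)>;

// Hypothetical wrapper: runs a detached task, forwards any exception to the
// callback if one is set, and reports whether the task succeeded.
template <typename Task>
bool run_detached(Task&& task, const ExceptionCallback& on_error)
{
  try {
    std::forward<Task>(task)();
    return true;
  } catch (...) {
    if (on_error)  // an empty callback means "ignore silently"
      on_error(std::current_exception());
    return false;
  }
}

// Turns an exception_ptr into a readable message.
std::string describe(std::exception_ptr ep)
{
  try {
    std::rethrow_exception(ep);
  } catch (const std::exception& e) {
    return e.what();
  } catch (...) {
    return "unknown exception";
  }
}
```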

References exception_callback_, and queue_mutex.

Referenced by TEST_F(), and TEST_F().

◆ set_queue_limits()

void Aleph::ThreadPool::set_queue_limits ( size_t  soft_limit,
size_t  hard_limit = std::numeric_limits<size_t>::max() 
)
inline

Set queue limits for bounded enqueue operations.

Configures backpressure and memory protection for the task queue.

Parameters
soft_limit  When queue reaches this size, enqueue_bounded() blocks until space is available. Set to SIZE_MAX to disable blocking.
hard_limit  When queue reaches this size, enqueue_bounded() throws queue_overflow_error. Defaults to 10x soft_limit. Set to SIZE_MAX to disable the hard limit.
Note
These limits only affect enqueue_bounded() and enqueue_bounded_detached(). Regular enqueue() and enqueue_detached() are unaffected.

Example

ThreadPool pool(4);
pool.set_queue_limits(1000); // soft=1000, hard=10000
pool.set_queue_limits(1000, 5000); // soft=1000, hard=5000

Definition at line 598 of file thread_pool.H.

References hard_limit_, Aleph::maps(), queue_mutex, and soft_limit_.

Referenced by example_backpressure(), example_load_shedding(), and TEST_F() (multiple tests).

◆ shutdown()

void Aleph::ThreadPool::shutdown ( )
inline

Shut down the pool, completing all pending tasks.

After calling this method:

  • No new tasks can be enqueued (will throw)
  • All pending tasks will complete
  • All workers will be joined
Note
This method blocks until all workers have finished.
Calling shutdown() multiple times is safe (no-op after first).

Definition at line 908 of file thread_pool.H.

References condition, Aleph::maps(), queue_mutex, stop, and workers.

Referenced by TEST_F() (multiple tests).

◆ start_workers()

void Aleph::ThreadPool::start_workers ( size_t  n)
inline private

Start n worker threads.

Definition at line 295 of file thread_pool.H.

References worker_loop(), and workers.

Referenced by ThreadPool(), and resize().

◆ stop_workers()

void Aleph::ThreadPool::stop_workers ( )
inline private

Stop and join all workers.

Definition at line 303 of file thread_pool.H.

References condition, Aleph::maps(), queue_mutex, stop, and workers.

Referenced by ~ThreadPool().

◆ try_enqueue()

template<typename F , typename... Args>
auto Aleph::ThreadPool::try_enqueue ( F &&  f,
Args &&...  args 
) -> std::optional<std::future<std::invoke_result_t<F, Args...>>>
inline

Try to submit a task without blocking or throwing.

Non-blocking version of enqueue_bounded(). Returns immediately with either a future (if task was queued) or std::nullopt (if queue is full).

Template Parameters
F  Callable type.
Args  Argument types for the callable.
Parameters
f  The callable to execute.
args  Arguments to pass to the callable.
Returns
std::optional<std::future<R>> - contains the future if the task was successfully queued, or std::nullopt if the queue is at or above the soft limit.
Exceptions
std::runtime_error  only if the pool has been shut down.
Note
Uses soft_limit as the threshold. Configure with set_queue_limits().

Example

ThreadPool pool(4);
pool.set_queue_limits(100);
if (auto future = pool.try_enqueue(compute, arg)) {
  // Task was queued, can use future->get() later
  results.push_back(std::move(*future));
} else {
  // Queue is full, handle backpressure
  std::cerr << "Queue full, dropping task\n";
}

Definition at line 799 of file thread_pool.H.

References enqueue(), Aleph::maps(), queue_mutex, soft_limit_, stop, and tasks.

Referenced by TEST_F() (multiple tests).

◆ try_enqueue_detached()

template<typename F , typename... Args>
bool Aleph::ThreadPool::try_enqueue_detached ( F &&  f,
Args &&...  args 
)
inline

Try to submit a detached task without blocking or throwing.

Non-blocking version of enqueue_bounded_detached().

Template Parameters
F  Callable type.
Args  Argument types for the callable.
Parameters
f  The callable to execute.
args  Arguments to pass to the callable.
Returns
true if the task was queued, false if the queue is full.
Exceptions
std::runtime_error  only if the pool has been shut down.

Example

if (!pool.try_enqueue_detached(log_message, msg)) {
  // Queue full, message dropped
}

Definition at line 840 of file thread_pool.H.

References enqueue_detached(), Aleph::maps(), queue_mutex, soft_limit_, stop, and tasks.

Referenced by example_load_shedding(), TEST_F(), TEST_F(), and TEST_F().

◆ wait_all()

void Aleph::ThreadPool::wait_all ( std::chrono::milliseconds  poll_interval = std::chrono::milliseconds(1))
inline

Wait until all current tasks complete.

Blocks until the task queue is empty and no tasks are running. New tasks can still be enqueued while waiting.

Parameters
poll_interval  How often to check for completion (default 1ms).

Definition at line 973 of file thread_pool.H.
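
The polling strategy implied by poll_interval can be sketched as a plain loop. This is a minimal illustration, not the library's wait_all(): poll_until_idle is a hypothetical helper that takes the idle check as a callable and additionally counts how many times it slept, which the real method does not report.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical helper: sleeps for poll_interval between idle checks and
// returns the number of sleeps that were needed before is_idle() held.
template <typename IsIdle>
std::size_t poll_until_idle(
  IsIdle is_idle,
  std::chrono::milliseconds poll_interval = std::chrono::milliseconds(1))
{
  std::size_t sleeps = 0;
  while (!is_idle())
  {
    std::this_thread::sleep_for(poll_interval);
    ++sleeps;
  }
  return sleeps;
}
```

A shorter interval reacts faster to the pool becoming idle at the cost of more wake-ups. The timed variants reference idle_condition rather than is_idle(), so they appear to wait on a condition variable instead of polling.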

References is_idle(), and Aleph::maps().

Referenced by example_backpressure(), example_fire_and_forget(), example_load_shedding(), and TEST_F() (multiple tests).

◆ wait_all_for()

template<typename Rep , typename Period >
bool Aleph::ThreadPool::wait_all_for ( std::chrono::duration< Rep, Period >  timeout)
inline

Wait until all current tasks complete, with timeout.

Blocks until either:

  • The task queue is empty and no tasks are running, OR
  • The timeout expires
Parameters
timeout  Maximum time to wait.
Returns
true if pool became idle, false if timeout expired.

Definition at line 989 of file thread_pool.H.
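
A timed wait of this kind is commonly built on std::condition_variable::wait_for with a predicate. The sketch below is an assumption-laden model, not the library's implementation: IdleTracker and finishes_within are hypothetical, and a single counter of outstanding tasks stands in for the pool's queue and active-task count.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>

// Hypothetical tracker of outstanding work with a timed idle wait.
class IdleTracker
{
  std::mutex mutex_;
  std::condition_variable idle_condition_;
  std::size_t outstanding_ = 0;

public:
  void task_started()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    ++outstanding_;
  }

  void task_finished()
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      --outstanding_;
    }
    idle_condition_.notify_all();  // waiters re-check the predicate
  }

  // Returns true if no work is outstanding before the timeout expires.
  template <typename Rep, typename Period>
  bool wait_idle_for(std::chrono::duration<Rep, Period> timeout)
  {
    std::unique_lock<std::mutex> lock(mutex_);
    return idle_condition_.wait_for(lock, timeout,
                                    [this] { return outstanding_ == 0; });
  }
};

// Runs one simulated task lasting `work` and waits at most `timeout` for it.
bool finishes_within(std::chrono::milliseconds work,
                     std::chrono::milliseconds timeout)
{
  IdleTracker tracker;
  tracker.task_started();
  std::thread worker([&tracker, work] {
    std::this_thread::sleep_for(work);
    tracker.task_finished();
  });
  const bool became_idle = tracker.wait_idle_for(timeout);
  worker.join();
  return became_idle;
}
```

The predicate form of wait_for guards against spurious wake-ups and returns the predicate's final value, which maps directly onto the documented true/false result.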

References active_tasks, Aleph::DynList< T >::empty(), idle_condition, Aleph::maps(), queue_mutex, and tasks.

Referenced by TEST_F(), and TEST_F().

◆ wait_all_until()

template<typename Clock , typename Duration >
bool Aleph::ThreadPool::wait_all_until ( std::chrono::time_point< Clock, Duration >  deadline)
inline

Wait until all current tasks complete, with deadline.

Blocks until either:

  • The task queue is empty and no tasks are running, OR
  • The deadline is reached
Parameters
deadline  Time point at which to stop waiting.
Returns
true if pool became idle, false if deadline reached.

Definition at line 1007 of file thread_pool.H.

References active_tasks, Aleph::DynList< T >::empty(), idle_condition, Aleph::maps(), queue_mutex, and tasks.

Referenced by TEST_F().

◆ worker_loop()

void Aleph::ThreadPool::worker_loop ( )
inline private

Worker thread main loop.

Definition at line 253 of file thread_pool.H.

References active_tasks, condition, Aleph::DynList< T >::empty(), idle_condition, Aleph::maps(), queue_mutex, soft_limit_, space_available, stop, and tasks.

Referenced by start_workers().

Member Data Documentation

◆ active_tasks

std::atomic<size_t> Aleph::ThreadPool::active_tasks {0}
private

◆ condition

std::condition_variable Aleph::ThreadPool::condition
private

Notifies workers of new tasks.

Definition at line 234 of file thread_pool.H.

Referenced by enqueue(), enqueue_batch(), enqueue_bounded(), enqueue_bounded_detached(), enqueue_detached(), resize(), shutdown(), stop_workers(), and worker_loop().

◆ exception_callback_

ExceptionCallback Aleph::ThreadPool::exception_callback_
private

Definition at line 250 of file thread_pool.H.

Referenced by enqueue_detached(), and set_exception_callback().

◆ hard_limit_

size_t Aleph::ThreadPool::hard_limit_ = std::numeric_limits<size_t>::max()
private

Exception threshold.

Definition at line 242 of file thread_pool.H.

Referenced by enqueue_bounded(), enqueue_bounded_detached(), get_queue_limits(), and set_queue_limits().

◆ idle_condition

std::condition_variable Aleph::ThreadPool::idle_condition
private

Notifies wait_all() of idle state.

Definition at line 236 of file thread_pool.H.

Referenced by wait_all_for(), wait_all_until(), and worker_loop().

◆ peak_queue_size_

std::atomic<size_t> Aleph::ThreadPool::peak_queue_size_ {0}
private

Definition at line 247 of file thread_pool.H.

Referenced by enqueue(), enqueue_batch(), enqueue_detached(), get_stats(), and reset_stats().

◆ queue_mutex

◆ soft_limit_

size_t Aleph::ThreadPool::soft_limit_ = std::numeric_limits<size_t>::max()
private

◆ space_available

std::condition_variable Aleph::ThreadPool::space_available
private

Notifies enqueuers of queue space.

Definition at line 235 of file thread_pool.H.

Referenced by enqueue_bounded(), enqueue_bounded_detached(), and worker_loop().

◆ stop

◆ tasks

◆ tasks_completed_

std::atomic<size_t> Aleph::ThreadPool::tasks_completed_ {0}
private

Definition at line 245 of file thread_pool.H.

Referenced by enqueue(), enqueue_batch(), enqueue_detached(), get_stats(), and reset_stats().

◆ tasks_failed_

std::atomic<size_t> Aleph::ThreadPool::tasks_failed_ {0}
private

Definition at line 246 of file thread_pool.H.

Referenced by enqueue_detached(), get_stats(), and reset_stats().

◆ workers

std::vector<std::thread> Aleph::ThreadPool::workers
private

Definition at line 231 of file thread_pool.H.

Referenced by get_stats(), num_threads(), resize(), shutdown(), start_workers(), and stop_workers().


The documentation for this class was generated from the following file:

  • thread_pool.H