110#ifndef ALEPH_THREAD_POOL_H
111#define ALEPH_THREAD_POOL_H
116#include <condition_variable>
122#include <type_traits>
143 :
std::overflow_error(
"ThreadPool queue overflow: " +
221 template <
typename F>
232 std::queue<std::unique_ptr<TaskBase>>
tasks;
257 std::unique_ptr<TaskBase>
task;
298 for (
size_t i = 0; i < n; ++i)
319 template <
typename F,
typename...
Args>
323 return [func = std::forward<F>(f),
324 args_tuple = std::make_tuple(std::forward<Args>(
args)...)]()
mutable
326 return std::apply([&func](
auto&&... a) {
327 return std::invoke(std::move(func), std::forward<
decltype(a)>(a)...);
406 template <
typename F,
typename...
Args>
408 -> std::future<std::invoke_result_t<
F,
Args...>>
416 auto promise = std::make_shared<std::promise<return_type>>();
417 std::future<return_type> result =
promise->get_future();
426 if constexpr (std::is_void_v<return_type>)
435 ++(*completed_counter);
439 promise->set_exception(std::current_exception());
440 ++(*completed_counter);
448 throw std::runtime_error(
"enqueue on stopped ThreadPool");
453 size_t current_size =
tasks.size();
490 template <
typename F,
typename...
Args>
506 callback = std::move(callback)]()
mutable {
510 ++(*completed_counter);
518 try { callback(std::current_exception()); }
528 throw std::runtime_error(
"enqueue_detached on stopped ThreadPool");
533 size_t current_size =
tasks.size();
563 template <
typename F,
typename Container>
565 -> std::vector<std::future<std::invoke_result_t<F, typename Container::value_type>>>
567 using return_type = std::invoke_result_t<F, typename Container::value_type>;
568 std::vector<std::future<return_type>>
results;
599 size_t hard_limit = std::numeric_limits<size_t>::max())
603 hard_limit_ = (hard_limit == std::numeric_limits<size_t>::max() &&
604 soft_limit != std::numeric_limits<size_t>::max())
653 template <
typename F,
typename...
Args>
655 -> std::future<std::invoke_result_t<
F,
Args...>>
661 auto promise = std::make_shared<std::promise<return_type>>();
662 std::future<return_type> result =
promise->get_future();
667 if constexpr (std::is_void_v<return_type>)
679 promise->set_exception(std::current_exception());
696 throw std::runtime_error(
"enqueue_bounded on stopped ThreadPool");
723 template <
typename F,
typename...
Args>
752 throw std::runtime_error(
"enqueue_bounded_detached on stopped ThreadPool");
798 template <
typename F,
typename...
Args>
800 -> std::optional<std::future<std::invoke_result_t<
F,
Args...>>>
806 throw std::runtime_error(
"try_enqueue on stopped ThreadPool");
814 return enqueue(std::forward<F>(f), std::forward<Args>(
args)...);
839 template <
typename F,
typename...
Args>
846 throw std::runtime_error(
"try_enqueue_detached on stopped ThreadPool");
912 if (
stop.exchange(
true))
946 throw std::runtime_error(
"cannot resize a stopped ThreadPool");
988 template <
typename Rep,
typename Period>
1006 template <
typename Clock,
typename Duration>
1096 template <
typename F,
typename Container>
1098 -> std::vector<std::future<
decltype(std::apply(f, *std::begin(
args_list)))>>
1100 using ArgsTuple =
typename Container::value_type;
1101 using return_type =
decltype(std::apply(f, std::declval<ArgsTuple>()));
1103 std::vector<std::future<return_type>>
results;
1106 std::vector<std::unique_ptr<TaskBase>>
batch_tasks;
1114 auto promise = std::make_shared<std::promise<return_type>>();
1120 if constexpr (std::is_void_v<return_type>)
1122 std::apply(func,
args);
1129 ++(*completed_counter);
1133 promise->set_exception(std::current_exception());
1134 ++(*completed_counter);
1146 throw std::runtime_error(
"enqueue_batch on stopped ThreadPool");
1152 size_t current_size =
tasks.size();
1154 while (current_size >
peak and
1201template <
typename Iterator,
typename F>
1203 size_t chunk_size = 0)
1205 const size_t total = std::distance(begin, end);
1210 if (chunk_size == 0)
1213 std::vector<std::future<void>>
futures;
1214 futures.reserve((
total + chunk_size - 1) / chunk_size);
1217 if constexpr (std::is_invocable_v<F, Iterator, Iterator>)
1223 std::advance(
chunk_end, std::min(chunk_size,
1224 static_cast<size_t>(std::distance(
chunk_begin, end))));
1227 f(chunk_begin, chunk_end);
1239 std::advance(
chunk_end, std::min(chunk_size,
1240 static_cast<size_t>(std::distance(
chunk_begin, end))));
1243 for (Iterator it = chunk_begin; it != chunk_end; ++it)
1285template <
typename InputIt,
typename OutputIt,
typename F>
1289 const size_t total = std::distance(first, last);
1293 if (chunk_size == 0)
1296 std::vector<std::future<void>>
futures;
1297 futures.reserve((
total + chunk_size - 1) / chunk_size);
1305 static_cast<size_t>(std::distance(
chunk_in, last)));
1310 InputIt in = chunk_in;
1311 OutputIt out = chunk_out;
1312 while (in != chunk_in_end)
1364template <
typename Iterator,
typename T,
typename BinaryOp>
1366 T init, BinaryOp op,
size_t chunk_size = 0)
1368 const size_t total = std::distance(first, last);
1372 if (chunk_size == 0)
1373 chunk_size = std::max(
size_t(1), total / (pool.
num_threads() * 4));
1375 std::vector<std::future<T>> futures;
1376 futures.reserve((total + chunk_size - 1) / chunk_size);
1378 for (Iterator chunk_begin = first; chunk_begin < last; )
1380 Iterator chunk_end = chunk_begin;
1381 std::advance(chunk_end, std::min(chunk_size,
1382 static_cast<size_t>(std::distance(chunk_begin, last))));
1384 futures.push_back(pool.
enqueue([op, chunk_begin, chunk_end]() {
1385 T local_result = *chunk_begin;
1386 for (Iterator it = chunk_begin + 1; it != chunk_end; ++it)
1387 local_result = op(local_result, *it);
1388 return local_result;
1391 chunk_begin = chunk_end;
1396 for (
auto& fut : futures)
1397 result = op(result, fut.get());
1421template <
typename F>
1423 size_t chunk_size = 0)
1428 const size_t total = end - start;
1430 if (chunk_size == 0)
1431 chunk_size = std::max(
size_t(1), total / (pool.
num_threads() * 4));
1433 std::vector<std::future<void>> futures;
1434 futures.reserve((total + chunk_size - 1) / chunk_size);
1436 for (
size_t chunk_start = start; chunk_start < end; )
1438 size_t chunk_end = std::min(chunk_start + chunk_size, end);
1440 futures.push_back(pool.
enqueue([f, chunk_start, chunk_end]() {
1441 for (size_t i = chunk_start; i < chunk_end; ++i)
1445 chunk_start = chunk_end;
1448 for (
auto& fut : futures)
void empty() noexcept
Empty the list.
size_t size() const noexcept
Count the number of elements of the list.
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
bool wait_all_for(std::chrono::duration< Rep, Period > timeout)
Wait until all current tasks complete, with timeout.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
void set_exception_callback(ExceptionCallback callback)
Set callback for exceptions in detached tasks.
auto enqueue_batch(F &&f, const Container &args_list) -> std::vector< std::future< decltype(std::apply(f, *std::begin(args_list)))> >
Submit multiple tasks with a single notification.
size_t running_tasks() const noexcept
Get the number of tasks currently being executed.
bool is_idle() const
Check if the pool is idle (no pending or running tasks).
auto enqueue_bounded(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task with backpressure and memory protection.
ThreadPool(ThreadPool &&)=delete
Non-movable (due to mutex and condition_variable)
static auto make_invocable(F &&f, Args &&... args)
Helper to create a callable from function + arguments using std::invoke.
std::condition_variable idle_condition
Notifies wait_all() of idle state.
void wait_all(std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void resize(size_t new_size)
Resize the pool to a different number of workers.
ThreadPoolStats get_stats() const
Get current pool statistics.
void reset_stats()
Reset statistics counters to zero.
bool wait_all_until(std::chrono::time_point< Clock, Duration > deadline)
Wait until all current tasks complete, with deadline.
std::condition_variable space_available
Notifies enqueuers of queue space.
void start_workers(size_t n)
Start n worker threads.
void set_queue_limits(size_t soft_limit, size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
std::atomic< size_t > peak_queue_size_
ThreadPool & operator=(ThreadPool &&)=delete
bool is_stopped() const noexcept
Check if the pool has been shut down.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
ThreadPool(size_t n_threads=std::thread::hardware_concurrency())
Construct a thread pool with specified number of workers.
std::atomic< size_t > tasks_failed_
size_t hard_limit_
Exception threshold.
~ThreadPool()
Destructor - stops all workers and waits for completion.
size_t soft_limit_
Block threshold.
std::condition_variable condition
Notifies workers of new tasks.
std::vector< std::thread > workers
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking result.
void stop_workers()
Stop and join all workers.
void worker_loop()
Worker thread main loop.
std::atomic< size_t > active_tasks
size_t pending_tasks() const
Get the number of pending tasks in the queue.
void shutdown()
Shut down the pool, completing all pending tasks.
std::atomic< size_t > tasks_completed_
ThreadPool(const ThreadPool &)=delete
Non-copyable.
auto try_enqueue(F &&f, Args &&... args) -> std::optional< std::future< std::invoke_result_t< F, Args... > > >
Try to submit a task without blocking or throwing.
ThreadPool & operator=(const ThreadPool &)=delete
std::queue< std::unique_ptr< TaskBase > > tasks
ExceptionCallback exception_callback_
Exception thrown when the task queue exceeds its hard limit.
size_t current_size() const noexcept
Current queue size when exception was thrown.
queue_overflow_error(size_t current_size, size_t hard_limit)
size_t hard_limit() const noexcept
Hard limit that was exceeded.
Main namespace for Aleph-w library functions.
T parallel_reduce(ThreadPool &pool, Iterator first, Iterator last, T init, BinaryOp op, size_t chunk_size=0)
Reduce elements in parallel.
OutputIt parallel_transform(ThreadPool &pool, InputIt first, InputIt last, OutputIt d_first, F &&f, size_t chunk_size=0)
Transform elements in parallel and store results.
ThreadPool & default_pool()
Global default thread pool.
void parallel_for_index(ThreadPool &pool, size_t start, size_t end, F &&f, size_t chunk_size=0)
Apply a function to each element in parallel (index-based).
void parallel_for(ThreadPool &pool, Iterator begin, Iterator end, F &&f, size_t chunk_size=0)
Execute a function in parallel over a range.
std::decay_t< typename HeadC::Item_Type > T
std::string to_string(const time_t t, const std::string &format)
Format a time_t value into a string using the given format specification.
std::function< void(std::exception_ptr)> ExceptionCallback
Type for exception callback in detached tasks.
DynList< T > maps(const C &c, Op op)
Classic map operation.
Statistics collected by ThreadPool.
double queue_utilization(size_t soft_limit) const noexcept
Queue utilization as percentage (0-100)
size_t current_queue_size
Current pending tasks.
size_t num_workers
Number of worker threads.
size_t tasks_completed
Total tasks completed.
size_t peak_queue_size
Maximum queue size observed.
size_t total_processed() const noexcept
Total tasks processed (completed + failed)
size_t current_active
Currently executing tasks.
size_t tasks_failed
Tasks that threw exceptions (detached)
Type-erased task wrapper.
virtual ~TaskBase()=default
Concrete task implementation with type preservation.