109#ifndef ALEPH_THREAD_POOL_H
110#define ALEPH_THREAD_POOL_H
116#include <condition_variable>
122#include <type_traits>
144 :
std::runtime_error(
"operation canceled") {}
181 std::vector<std::condition_variable *>
to_notify;
201 std::shared_ptr<CancellationState>
state_;
204 :
state_(std::move(state)) {}
212 std::shared_ptr<CancellationState>
state_;
213 std::condition_variable *
cv_ =
nullptr;
218 state_->unregister_condition_variable(
cv_);
231 std::condition_variable * cv)
noexcept
270 return state_ !=
nullptr and state_->stop_requested.load(std::memory_order_relaxed);
295 std::condition_variable & cv)
const
299 state_->register_condition_variable(&cv);
302 state_->unregister_condition_variable(&cv);
313 std::shared_ptr<CancellationState>
state_ =
314 std::make_shared<CancellationState>();
329 state_->stop_requested.store(
true, std::memory_order_relaxed);
330 state_->notify_observers();
339 return state_->stop_requested.load(std::memory_order_relaxed);
370 :
std::overflow_error(
"ThreadPool queue overflow: " +
450 template <
typename F>
461 std::queue<std::unique_ptr<TaskBase>>
tasks;
486 std::unique_ptr<TaskBase>
task;
527 for (
size_t i = 0; i < n; ++i)
547 template <
typename F,
typename...
Args>
551 return [func = std::forward<F>(f),
552 args_tuple = std::make_tuple(std::forward<Args>(
args)...)]()
mutable
554 return std::apply([&func](
auto &&... a)
556 return std::invoke(std::move(func), std::forward<
decltype(a)>(a)...);
639 template <
typename F,
typename...
Args>
641 -> std::future<std::invoke_result_t<
F,
Args...>>
649 auto promise = std::make_shared<std::promise<return_type>>();
650 std::future<return_type> result =
promise->get_future();
660 if constexpr (std::is_void_v<return_type>)
669 ++(*completed_counter);
673 promise->set_exception(std::current_exception());
674 ++(*completed_counter);
680 throw std::runtime_error(
"enqueue on stopped ThreadPool");
685 const size_t current_size =
tasks.size();
721 template <
typename F,
typename...
Args>
736 callback = std::move(callback)]()
mutable
741 ++(*completed_counter);
749 try { callback(std::current_exception()); }
757 throw std::runtime_error(
"enqueue_detached on stopped ThreadPool");
762 const size_t current_size =
tasks.size();
791 template <
typename F,
typename Container>
793 -> std::vector<std::future<std::invoke_result_t<F, typename Container::value_type>>>
795 using return_type = std::invoke_result_t<F, typename Container::value_type>;
796 std::vector<std::future<return_type>>
results;
827 const size_t hard_limit = std::numeric_limits<size_t>::max())
831 hard_limit_ = (hard_limit == std::numeric_limits<size_t>::max()
and
832 soft_limit != std::numeric_limits<size_t>::max()) ?
882 template <
typename F,
typename...
Args>
884 -> std::future<std::invoke_result_t<
F,
Args...>>
890 auto promise = std::make_shared<std::promise<return_type>>();
891 std::future<return_type> result =
promise->get_future();
897 if constexpr (std::is_void_v<return_type>)
909 promise->set_exception(std::current_exception());
925 throw std::runtime_error(
"enqueue_bounded on stopped ThreadPool");
952 template <
typename F,
typename...
Args>
981 throw std::runtime_error(
"enqueue_bounded_detached on stopped ThreadPool");
1027 template <
typename F,
typename...
Args>
1029 -> std::optional<std::future<std::invoke_result_t<
F,
Args...>>>
1034 throw std::runtime_error(
"try_enqueue on stopped ThreadPool");
1038 return std::nullopt;
1042 return enqueue(std::forward<F>(f), std::forward<Args>(
args)...);
1067 template <
typename F,
typename...
Args>
1073 throw std::runtime_error(
"try_enqueue_detached on stopped ThreadPool");
1097 return tasks.size();
1138 if (
stop.exchange(
true))
1172 throw std::runtime_error(
"cannot resize a stopped ThreadPool");
1214 template <
typename Rep,
typename Period>
1233 template <
typename Clock,
typename Duration>
1322 template <
typename F,
typename Container>
1324 -> std::vector<std::future<
decltype(std::apply(f, *std::begin(
args_list)))>>
1326 using ArgsTuple =
typename Container::value_type;
1327 using return_type =
decltype(std::apply(f, std::declval<ArgsTuple>()));
1329 std::vector<std::future<return_type>>
results;
1332 std::vector<std::unique_ptr<TaskBase>>
batch_tasks;
1340 auto promise = std::make_shared<std::promise<return_type>>();
1347 if constexpr (std::is_void_v<return_type>)
1349 std::apply(func,
args);
1356 ++(*completed_counter);
1360 promise->set_exception(std::current_exception());
1361 ++(*completed_counter);
1371 throw std::runtime_error(
"enqueue_batch on stopped ThreadPool");
1377 const size_t current_size =
tasks.size();
1379 while (current_size >
peak and
1394 namespace parallel_detail
1402 const size_t num_threads,
1412 total / std::max<size_t>(1, num_threads * 4));
1431 if (
options.cancel_token.stop_requested())
1440 return pool.num_threads() <= 1;
1448 template <
typename InputIt1,
typename InputIt2,
typename OutputIt,
typename Compare>
1508 template <
typename Iterator,
typename F>
1510 size_t chunk_size = 0)
1514 options.chunk_size = chunk_size;
1519 template <
typename Iterator,
typename F>
1524 const size_t total = std::distance(begin, end);
1528 const size_t chunk_size =
1534 if constexpr (std::is_invocable_v<F, Iterator, Iterator>)
1540 for (Iterator it = begin; it != end; ++it)
1550 std::vector<std::future<void>>
futures;
1551 futures.reserve((
total + chunk_size - 1) / chunk_size);
1554 if constexpr (std::is_invocable_v<F, Iterator, Iterator>)
1560 std::advance(
chunk_end, std::min(chunk_size,
1561 static_cast<size_t>(std::distance(
chunk_begin, end))));
1565 parallel_detail::throw_if_canceled(token);
1566 f(chunk_begin, chunk_end);
1578 std::advance(
chunk_end, std::min(chunk_size,
1579 static_cast<size_t>(std::distance(
chunk_begin, end))));
1583 for (Iterator it = chunk_begin; it != chunk_end; ++it)
1585 parallel_detail::throw_if_canceled(token);
1630 template <
typename InputIt,
typename OutputIt,
typename F>
1632 OutputIt d_first, F && f,
size_t chunk_size = 0)
1636 options.chunk_size = chunk_size;
1641 template <
typename InputIt,
typename OutputIt,
typename F>
1645 ThreadPool & pool = parallel_detail::select_pool(
options);
1646 const size_t total = std::distance(first, last);
1650 const size_t chunk_size =
1651 parallel_detail::resolve_chunk_size(total, pool.num_threads(),
options);
1653 if (parallel_detail::use_sequential_path(total, pool,
options))
1659 parallel_detail::throw_if_canceled(
options.cancel_token);
1664 parallel_detail::throw_if_canceled(
options.cancel_token);
1665 return d_first + total;
1668 std::vector<std::future<void>> futures;
1669 futures.reserve((total + chunk_size - 1) / chunk_size);
1671 InputIt chunk_in = first;
1672 OutputIt chunk_out = d_first;
1674 while (chunk_in < last)
1676 size_t chunk_len = std::min(chunk_size,
1677 static_cast<size_t>(std::distance(chunk_in, last)));
1678 InputIt chunk_in_end = chunk_in;
1679 std::advance(chunk_in_end, chunk_len);
1681 futures.push_back(pool.enqueue([f, chunk_in, chunk_in_end, chunk_out,
1682 token =
options.cancel_token]()
1684 InputIt in = chunk_in;
1685 OutputIt out = chunk_out;
1686 while (in != chunk_in_end)
1688 parallel_detail::throw_if_canceled(token);
1695 chunk_in = chunk_in_end;
1696 std::advance(chunk_out, chunk_len);
1699 for (
auto & fut: futures)
1702 parallel_detail::throw_if_canceled(
options.cancel_token);
1704 return d_first + total;
1741 template <
typename Iterator,
typename T,
typename BinaryOp>
1743 T init, BinaryOp op,
size_t chunk_size = 0)
1747 options.chunk_size = chunk_size;
1752 template <
typename Iterator,
typename T,
typename BinaryOp>
1756 ThreadPool & pool = parallel_detail::select_pool(
options);
1757 const size_t total = std::distance(first, last);
1761 const size_t chunk_size =
1762 parallel_detail::resolve_chunk_size(total, pool.num_threads(),
options);
1764 if (parallel_detail::use_sequential_path(total, pool,
options))
1767 for (Iterator it = first; it != last; ++it)
1769 parallel_detail::throw_if_canceled(
options.cancel_token);
1770 result = op(result, *it);
1772 parallel_detail::throw_if_canceled(
options.cancel_token);
1776 std::vector<std::future<T>> futures;
1777 futures.reserve((total + chunk_size - 1) / chunk_size);
1779 for (Iterator chunk_begin = first; chunk_begin < last;)
1781 Iterator chunk_end = chunk_begin;
1782 std::advance(chunk_end, std::min(chunk_size,
1783 static_cast<size_t>(std::distance(chunk_begin, last))));
1785 futures.push_back(pool.enqueue([op, chunk_begin, chunk_end,
1786 token =
options.cancel_token]()
1788 parallel_detail::throw_if_canceled(token);
1789 T local_result = *chunk_begin;
1790 for (Iterator it = chunk_begin + 1; it != chunk_end; ++it)
1792 parallel_detail::throw_if_canceled(token);
1793 local_result = op(local_result, *it);
1795 return local_result;
1798 chunk_begin = chunk_end;
1803 for (
auto & fut: futures)
1804 result = op(result, fut.get());
1806 parallel_detail::throw_if_canceled(
options.cancel_token);
1830 template <
typename F>
1832 size_t chunk_size = 0)
1836 options.chunk_size = chunk_size;
1841 template <
typename F>
1845 ThreadPool & pool = parallel_detail::select_pool(
options);
1849 const size_t total = end - start;
1851 const size_t chunk_size =
1852 parallel_detail::resolve_chunk_size(total, pool.num_threads(),
options);
1854 if (parallel_detail::use_sequential_path(total, pool,
options))
1856 for (
size_t i = start; i < end; ++i)
1858 parallel_detail::throw_if_canceled(
options.cancel_token);
1861 parallel_detail::throw_if_canceled(
options.cancel_token);
1865 std::vector<std::future<void>> futures;
1866 futures.reserve((total + chunk_size - 1) / chunk_size);
1868 for (
size_t chunk_start = start; chunk_start < end;)
1870 size_t chunk_end = std::min(chunk_start + chunk_size, end);
1872 futures.push_back(pool.enqueue([f, chunk_start, chunk_end,
1873 token =
options.cancel_token]()
1875 for (size_t i = chunk_start; i < chunk_end; ++i)
1877 parallel_detail::throw_if_canceled(token);
1882 chunk_start = chunk_end;
1885 for (
auto & fut: futures)
1888 parallel_detail::throw_if_canceled(
options.cancel_token);
1897 template <
typename... Fs>
1906 template <
typename... Fs>
1909 constexpr size_t task_count =
sizeof...(Fs);
1910 if constexpr (task_count == 0)
1918 if (parallel_detail::use_sequential_path(task_count, pool,
options))
1920 auto invoke_one = [&](
auto && fn)
1922 parallel_detail::throw_if_canceled(
options.cancel_token);
1923 if constexpr (std::is_void_v<std::invoke_result_t<
decltype(fn)>>)
1924 std::invoke(std::forward<
decltype(fn)>(fn));
1926 static_cast<void>(std::invoke(std::forward<
decltype(fn)>(fn)));
1929 (invoke_one(std::forward<Fs>(fs)), ...);
1930 parallel_detail::throw_if_canceled(
options.cancel_token);
1934 std::vector<std::future<void>> futures;
1935 futures.reserve(task_count);
1937 auto launch_one = [&](
auto && fn)
1939 futures.push_back(pool.
enqueue(
1940 [func = std::forward<
decltype(fn)>(fn),
1941 token =
options.cancel_token]()
mutable
1943 parallel_detail::throw_if_canceled(token);
1944 if constexpr (std::is_void_v<std::invoke_result_t<decltype(func) &>>)
1947 static_cast<void>(std::invoke(func));
1951 (launch_one(std::forward<Fs>(fs)), ...);
1953 std::exception_ptr first_exception;
1954 for (
auto & fut: futures)
1962 if (first_exception ==
nullptr)
1963 first_exception = std::current_exception();
1967 if (first_exception !=
nullptr)
1968 std::rethrow_exception(first_exception);
1970 parallel_detail::throw_if_canceled(
options.cancel_token);
1980 template <
typename InputIt,
typename OutputIt,
typename BinaryOp>
1982 OutputIt d_first, BinaryOp op,
size_t chunk_size = 0)
1986 options.chunk_size = chunk_size;
1991 template <
typename InputIt,
typename OutputIt,
typename BinaryOp>
1992 OutputIt
pscan(InputIt first, InputIt last, OutputIt d_first,
1995 using Value = std::decay_t<std::invoke_result_t<BinaryOp,
decltype(*first),
decltype(*first)>>;
1997 ThreadPool & pool = parallel_detail::select_pool(
options);
1998 const size_t total = std::distance(first, last);
2002 const size_t chunk_size =
2003 parallel_detail::resolve_chunk_size(total, pool.num_threads(),
options);
2005 if (parallel_detail::use_sequential_path(total, pool,
options))
2007 parallel_detail::throw_if_canceled(
options.cancel_token);
2010 OutputIt out = d_first;
2011 Value running = *in;
2018 parallel_detail::throw_if_canceled(
options.cancel_token);
2019 running = op(running, *in);
2025 parallel_detail::throw_if_canceled(
options.cancel_token);
2026 return d_first + total;
2029 const size_t num_chunks = (total +
chunk_size - 1) / chunk_size;
2030 std::vector<std::optional<Value>> partials(total);
2031 std::vector<std::optional<Value>> chunk_totals(num_chunks);
2032 std::vector<std::future<void>> futures;
2033 futures.reserve(num_chunks);
2035 for (
size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx)
2038 const size_t chunk_end = std::min(
offset + chunk_size, total);
2039 InputIt chunk_first = first +
offset;
2040 InputIt chunk_last = first + chunk_end;
2041 futures.push_back(pool.enqueue(
2042 [chunk_first, chunk_last, &partials,
offset,
2043 &chunk_totals, chunk_idx, op,
2044 token =
options.cancel_token]()
mutable
2046 parallel_detail::throw_if_canceled(token);
2048 InputIt in = chunk_first;
2049 Value running = *in;
2050 partials[offset].emplace(running);
2052 size_t out_idx = offset + 1;
2054 while (in != chunk_last)
2056 parallel_detail::throw_if_canceled(token);
2057 running = op(running, *in);
2058 partials[out_idx++].emplace(running);
2062 chunk_totals[chunk_idx].emplace(running);
2066 for (
auto & fut: futures)
2069 parallel_detail::throw_if_canceled(
options.cancel_token);
2073 std::vector<std::optional<Value>> carries(num_chunks);
2074 for (
size_t i = 1; i < num_chunks; ++i)
2076 parallel_detail::throw_if_canceled(
options.cancel_token);
2077 carries[i].emplace((i == 1)
2079 : op(*carries[i - 1], *chunk_totals[i - 1]));
2082 std::vector<std::future<void>> adjust_futures;
2083 adjust_futures.reserve(num_chunks - 1);
2085 for (
size_t chunk_idx = 1; chunk_idx < num_chunks; ++chunk_idx)
2088 const size_t chunk_end = std::min(
offset + chunk_size, total);
2089 Value carry = *carries[chunk_idx];
2091 adjust_futures.push_back(pool.enqueue(
2092 [&partials, chunk_end,
offset, carry, op,
2093 token =
options.cancel_token]()
mutable
2095 for (size_t i = offset; i < chunk_end; ++i)
2097 parallel_detail::throw_if_canceled(token);
2098 partials[i] = op(carry, *partials[i]);
2103 for (
auto & fut: adjust_futures)
2107 OutputIt out = d_first;
2108 for (
auto & partial: partials)
2110 *out = std::move(*partial);
2114 parallel_detail::throw_if_canceled(
options.cancel_token);
2115 return d_first + total;
2123 template <
typename InputIt,
typename OutputIt,
typename T,
typename BinaryOp>
2125 OutputIt d_first,
T init, BinaryOp op,
2126 size_t chunk_size = 0)
2130 options.chunk_size = chunk_size;
2135 template <
typename InputIt,
typename OutputIt,
typename T,
typename BinaryOp>
2139 using Value = std::decay_t<T>;
2141 ThreadPool & pool = parallel_detail::select_pool(
options);
2142 const size_t total = std::distance(first, last);
2146 if (parallel_detail::use_sequential_path(total, pool,
options))
2149 for (
size_t i = 0; i < total; ++i)
2151 parallel_detail::throw_if_canceled(
options.cancel_token);
2152 d_first[i] = running;
2153 running = op(running, first[i]);
2155 parallel_detail::throw_if_canceled(
options.cancel_token);
2156 return d_first + total;
2159 std::vector<std::optional<Value>> inclusive(total);
2168 d_first[i] = op(init, *inclusive[i - 1]);
2172 parallel_detail::throw_if_canceled(
options.cancel_token);
2173 return d_first + total;
2182 template <
typename InputIt1,
typename InputIt2,
typename OutputIt,
2183 typename Compare = std::less<>>
2185 InputIt1 first1, InputIt1 last1,
2186 InputIt2 first2, InputIt2 last2,
2187 OutputIt d_first, Compare comp = Compare{},
2188 size_t chunk_size = 0)
2192 options.chunk_size = chunk_size;
2193 return pmerge(first1, last1, first2, last2, d_first, std::move(comp),
options);
2197 template <
typename InputIt1,
typename InputIt2,
typename OutputIt,
2198 typename Compare = std::less<>>
2199 OutputIt
pmerge(InputIt1 first1, InputIt1 last1,
2200 InputIt2 first2, InputIt2 last2,
2201 OutputIt d_first, Compare comp = Compare{},
2202 const ParallelOptions &
options = {})
2204 ThreadPool & pool = parallel_detail::select_pool(
options);
2205 const size_t n1 = std::distance(first1, last1);
2206 const size_t n2 = std::distance(first2, last2);
2207 const size_t total = n1 + n2;
2212 if (parallel_detail::use_sequential_path(total, pool,
options) or n1 == 0 or n2 == 0)
2214 parallel_detail::merge_sequential(first1, last1, first2, last2, d_first,
2216 parallel_detail::throw_if_canceled(
options.cancel_token);
2217 return d_first + total;
2220 const size_t chunk_hint =
2221 parallel_detail::resolve_chunk_size(total, pool.num_threads(),
options);
2222 const size_t desired_tasks = std::max<size_t>(1, (total + chunk_hint - 1) / chunk_hint);
2224 std::vector<std::future<void>> futures;
2228 const size_t task_count = std::max<size_t>(1, std::min(desired_tasks, n1));
2229 const size_t base_chunk = (n1 + task_count - 1) / task_count;
2230 futures.reserve(task_count);
2232 size_t left_offset = 0;
2233 size_t right_offset = 0;
2234 for (
size_t task = 0; task < task_count; ++task)
2236 parallel_detail::throw_if_canceled(
options.cancel_token);
2237 const size_t next_left = std::min(left_offset + base_chunk, n1);
2238 const size_t next_right =
2240 ?
static_cast<size_t>(
2241 std::lower_bound(first2 + right_offset, last2,
2242 first1[next_left], comp) - first2)
2245 futures.push_back(pool.enqueue(
2246 [left_begin = first1 + left_offset,
2247 left_end = first1 + next_left,
2248 right_begin = first2 + right_offset,
2249 right_end = first2 + next_right,
2250 out = d_first + left_offset + right_offset,
2251 comp, token =
options.cancel_token]()
mutable
2253 parallel_detail::merge_sequential(left_begin, left_end,
2254 right_begin, right_end,
2258 left_offset = next_left;
2259 right_offset = next_right;
2264 const size_t task_count = std::max<size_t>(1, std::min(desired_tasks, n2));
2265 const size_t base_chunk = (n2 + task_count - 1) / task_count;
2266 futures.reserve(task_count);
2268 size_t left_offset = 0;
2269 size_t right_offset = 0;
2270 for (
size_t task = 0; task < task_count; ++task)
2272 parallel_detail::throw_if_canceled(
options.cancel_token);
2273 const size_t next_right = std::min(right_offset + base_chunk, n2);
2274 const size_t next_left =
2276 ?
static_cast<size_t>(
2277 std::upper_bound(first1 + left_offset, last1,
2278 first2[next_right], comp) - first1)
2281 futures.push_back(pool.enqueue(
2282 [left_begin = first1 + left_offset,
2283 left_end = first1 + next_left,
2284 right_begin = first2 + right_offset,
2285 right_end = first2 + next_right,
2286 out = d_first + left_offset + right_offset,
2287 comp, token =
options.cancel_token]()
mutable
2289 parallel_detail::merge_sequential(left_begin, left_end,
2290 right_begin, right_end,
2294 left_offset = next_left;
2295 right_offset = next_right;
2299 for (
auto & fut: futures)
2302 parallel_detail::throw_if_canceled(
options.cancel_token);
2303 return d_first + total;
2336 template <
typename F,
typename... Args>
2339 return [func = std::forward<F>(f),
2340 args_tuple = std::make_tuple(std::forward<Args>(args)...)]()
mutable
2342 return std::apply([&func](
auto &&... a)
2344 return std::invoke(std::move(func), std::forward<
decltype(a)>(a)...);
2345 }, std::move(args_tuple));
2351 std::exception_ptr first_exception;
2352 for (
auto & task: tasks_)
2360 if (first_exception ==
nullptr)
2361 first_exception = std::current_exception();
2365 if (propagate and first_exception !=
nullptr)
2366 std::rethrow_exception(first_exception);
2388 template <
typename F,
typename... Args>
2391 auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);
2392 tasks_.push_back(pool_->
enqueue([invocable = std::move(invocable)]()
mutable
2394 if constexpr (std::is_void_v<std::invoke_result_t<decltype(invocable) &>>)
2397 static_cast<void>(invocable());
2402 template <
typename F,
typename... Args>
2403 void run(F && f, Args &&... args)
2405 launch(std::forward<F>(f), std::forward<Args>(args)...);
2412 [[nodiscard]]
size_t size() const noexcept {
return tasks_.size(); }
2415 [[nodiscard]]
bool is_empty() const noexcept {
return tasks_.empty(); }
Cooperative cancellation source paired with CancellationToken.
CancellationToken token() const noexcept
Return a token observing this source.
bool stop_requested() const noexcept
Return true if cancellation has already been requested.
std::shared_ptr< CancellationState > state_
void request_cancel() noexcept
Request cancellation for all derived tokens.
CancellationSource()=default
Default constructor.
void reset()
Reset the source so future tokens observe a fresh state.
void cancel() noexcept
Synonym for request_cancel().
RAII guard for condition variable cancellation registration.
~ConditionVariableRegistration()
std::shared_ptr< CancellationState > state_
ConditionVariableRegistration(std::shared_ptr< CancellationState > state, std::condition_variable *cv) noexcept
Construct with shared state and condition variable.
ConditionVariableRegistration()=default
Default constructor.
std::condition_variable * cv_
ConditionVariableRegistration(ConditionVariableRegistration &&other) noexcept
void unregister() noexcept
ConditionVariableRegistration(const ConditionVariableRegistration &)=delete
Deleted copy constructor.
Read-only cooperative cancellation token.
ConditionVariableRegistration notify_on_cancel(std::condition_variable &cv) const
Request notification of a condition variable on cancellation.
CancellationToken(std::shared_ptr< CancellationState > state) noexcept
bool stop_requested() const noexcept
Return true if cancellation has been requested.
std::shared_ptr< CancellationState > state_
CancellationToken()=default
Default constructor.
bool valid() const noexcept
Return true if the token is connected to a source.
bool is_cancellation_requested() const noexcept
Synonym for stop_requested().
void throw_if_cancellation_requested() const
Throw operation_canceled if cancellation was requested.
Minimal structured-concurrency helper over ThreadPool futures.
bool is_empty() const noexcept
Return true if the group has no pending tasks.
TaskGroup & operator=(const TaskGroup &)=delete
Deleted copy assignment operator.
void run(F &&f, Args &&... args)
Compatibility wrapper for launch().
std::vector< std::future< void > > tasks_
TaskGroup(ThreadPool &pool=default_pool()) noexcept
void wait()
Wait for all launched tasks and rethrow the first exception.
size_t size() const noexcept
Return the number of tasks currently tracked by the group.
TaskGroup(TaskGroup &&)=delete
Deleted move constructor.
void drain(const bool propagate)
TaskGroup & operator=(TaskGroup &&)=delete
Deleted move assignment operator.
static auto make_invocable(F &&f, Args &&... args)
TaskGroup(const TaskGroup &)=delete
Deleted copy constructor.
void launch(F &&f, Args &&... args)
Launch a task that belongs to this group.
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
bool wait_all_for(std::chrono::duration< Rep, Period > timeout)
Wait until all current tasks complete, with timeout.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
void set_exception_callback(ExceptionCallback callback)
Set callback for exceptions in detached tasks.
auto enqueue_batch(F &&f, const Container &args_list) -> std::vector< std::future< decltype(std::apply(f, *std::begin(args_list)))> >
Submit multiple tasks with a single notification.
size_t running_tasks() const noexcept
Get the number of tasks currently being executed.
bool is_idle() const
Check if the pool is idle (no pending or running tasks).
auto enqueue_bounded(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task with backpressure and memory protection.
ThreadPool(ThreadPool &&)=delete
Deleted move constructor.
static auto make_invocable(F &&f, Args &&... args)
Helper to create a callable from function + arguments using std::invoke.
std::condition_variable idle_condition
Notifies wait_all() of idle state.
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void resize(size_t new_size)
Resize the pool to a different number of workers.
ThreadPoolStats get_stats() const
Get current pool statistics.
void reset_stats()
Reset statistics counters to zero.
bool wait_all_until(std::chrono::time_point< Clock, Duration > deadline)
Wait until all current tasks complete, with deadline.
std::condition_variable space_available
Notifies enqueuers of queue space.
void start_workers(size_t n)
Start n worker threads.
std::atomic< size_t > peak_queue_size_
ThreadPool & operator=(ThreadPool &&)=delete
Deleted move assignment operator.
bool is_stopped() const noexcept
Check if the pool has been shut down.
void wait_all(const std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
ThreadPool(size_t n_threads=std::thread::hardware_concurrency())
Construct a thread pool with specified number of workers.
std::atomic< size_t > tasks_failed_
size_t hard_limit_
Hard queue-size limit; exceeding it causes bounded enqueues to throw queue_overflow_error.
~ThreadPool()
Destructor - stops all workers and waits for completion.
size_t soft_limit_
Soft queue-size limit; bounded enqueues block (apply backpressure) once the queue reaches this size.
std::condition_variable condition
Notifies workers of new tasks.
std::vector< std::thread > workers
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking result.
void stop_workers()
Stop and join all workers.
void worker_loop()
Worker thread main loop.
std::atomic< size_t > active_tasks
size_t pending_tasks() const
Get the number of pending tasks in the queue.
void set_queue_limits(const size_t soft_limit, const size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
void shutdown()
Shut down the pool, completing all pending tasks.
std::atomic< size_t > tasks_completed_
ThreadPool(const ThreadPool &)=delete
Deleted copy constructor.
auto try_enqueue(F &&f, Args &&... args) -> std::optional< std::future< std::invoke_result_t< F, Args... > > >
Try to submit a task without blocking or throwing.
ThreadPool & operator=(const ThreadPool &)=delete
Deleted copy assignment operator.
std::queue< std::unique_ptr< TaskBase > > tasks
ExceptionCallback exception_callback_
Exception thrown when cooperative cancellation is observed.
operation_canceled(const std::string &message)
Exception thrown when the task queue exceeds its hard limit.
size_t current_size() const noexcept
Current queue size when exception was thrown.
queue_overflow_error(size_t current_size, size_t hard_limit)
size_t hard_limit() const noexcept
Hard limit that was exceeded.
const long double offset[]
Offset values indexed by symbol string length (bounded by MAX_OFFSET_INDEX)
ThreadPool & select_pool(const ParallelOptions &options)
size_t resolve_chunk_size(const size_t total, const size_t num_threads, const ParallelOptions &options, const size_t min_chunk=1)
void throw_if_canceled(const CancellationToken &token)
void merge_sequential(InputIt1 first1, InputIt1 last1, InputIt2 first2, InputIt2 last2, OutputIt d_first, Compare comp, const CancellationToken &token)
size_t chunk_size(const size_t n, const size_t num_threads, const size_t min_chunk=64)
Calculate optimal chunk size based on data size and thread count.
bool use_sequential_path(const size_t total, const ThreadPool &pool, const ParallelOptions &options) noexcept
Main namespace for Aleph-w library functions.
and
Check uniqueness with explicit hash + equality functors.
T parallel_reduce(ThreadPool &pool, Iterator first, Iterator last, T init, BinaryOp op, size_t chunk_size=0)
Reduce elements in parallel.
OutputIt parallel_transform(ThreadPool &pool, InputIt first, InputIt last, OutputIt d_first, F &&f, size_t chunk_size=0)
Transform elements in parallel and store results.
ThreadPool & default_pool()
Return the default shared thread pool instance.
void message(const char *file, int line, const char *format,...)
Print an informational message with file and line info.
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
void parallel_for_index(ThreadPool &pool, size_t start, size_t end, F &&f, size_t chunk_size=0)
Apply a function to each element in parallel (index-based).
void parallel_for(ThreadPool &pool, Iterator begin, Iterator end, F &&f, size_t chunk_size=0)
Execute a function in parallel over a range.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
std::decay_t< typename HeadC::Item_Type > T
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
std::string to_string(const time_t t, const std::string &format)
Format a time_t value into a string using format.
void parallel_invoke(ThreadPool &pool, Fs &&... fs)
Invoke a small set of related callables in parallel.
std::function< void(std::exception_ptr)> ExceptionCallback
Type for exception callback in detached tasks.
static struct argp_option options[]
Shared state for cooperative cancellation.
void register_condition_variable(std::condition_variable *cv)
void notify_observers() noexcept
std::atomic< bool > stop_requested
std::vector< std::condition_variable * > observers
void unregister_condition_variable(std::condition_variable *cv)
std::mutex observer_mutex
Common configuration object for parallel algorithms.
size_t chunk_size
Elements per chunk (0 = auto).
ThreadPool * pool
Executor to use (nullptr = default_pool()).
size_t min_size
Run sequentially below this size.
size_t max_tasks
Maximum number of scheduled tasks (0 = auto).
CancellationToken cancel_token
Cooperative cancellation observer.
Statistics collected by ThreadPool.
double queue_utilization(size_t soft_limit) const noexcept
Queue utilization as percentage (0-100)
size_t current_queue_size
Current pending tasks.
size_t num_workers
Number of worker threads.
size_t tasks_completed
Total tasks completed.
size_t peak_queue_size
Maximum queue size observed.
size_t total_processed() const noexcept
Total tasks processed (completed + failed)
size_t current_active
Currently executing tasks.
size_t tasks_failed
Tasks that threw exceptions (detached)
Type-erased task wrapper.
virtual ~TaskBase()=default
Virtual destructor.
Concrete task implementation with type preservation.