Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
thread_pool.H
Go to the documentation of this file.
1/*
2 Aleph_w
3
4 Data structures & Algorithms
5 version 2.0.0b
6 https://github.com/lrleon/Aleph-w
7
8 This file is part of Aleph-w library
9
10 Copyright (c) 2002-2026 Leandro Rabindranath Leon
11
12 Permission is hereby granted, free of charge, to any person obtaining a copy
13 of this software and associated documentation files (the "Software"), to deal
14 in the Software without restriction, including without limitation the rights
15 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 copies of the Software, and to permit persons to whom the Software is
17 furnished to do so, subject to the following conditions:
18
19 The above copyright notice and this permission notice shall be included in all
20 copies or substantial portions of the Software.
21
22 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 SOFTWARE.
29*/
30
31
109#ifndef ALEPH_THREAD_POOL_H
110#define ALEPH_THREAD_POOL_H
111
112#include <future>
113#include <queue>
114#include <functional>
115#include <algorithm>
116#include <condition_variable>
117#include <atomic>
118#include <vector>
119#include <thread>
120#include <mutex>
121#include <stdexcept>
122#include <type_traits>
123#include <tuple>
124#include <memory>
125#include <limits>
126#include <optional>
127#include <chrono>
128#include <numeric>
129#include <iterator>
130#include <string>
131
132namespace Aleph
133{
140 class operation_canceled : public std::runtime_error
141 {
142 public:
144 : std::runtime_error("operation canceled") {}
145
146 explicit operation_canceled(const std::string & message)
147 : std::runtime_error(message) {}
148 };
149
151 class ThreadPool;
152
156 ThreadPool &default_pool();
157
160 {
161 std::atomic<bool> stop_requested{false};
162 std::mutex observer_mutex;
163 std::vector<std::condition_variable *> observers;
164
165 void register_condition_variable(std::condition_variable * cv)
166 {
167 std::lock_guard<std::mutex> lock(observer_mutex);
168 observers.push_back(cv);
169 }
170
171 void unregister_condition_variable(std::condition_variable * cv)
172 {
173 std::lock_guard<std::mutex> lock(observer_mutex);
174 auto it = std::find(observers.begin(), observers.end(), cv);
175 if (it != observers.end())
176 observers.erase(it);
177 }
178
180 {
181 std::vector<std::condition_variable *> to_notify;
182 {
183 std::lock_guard<std::mutex> lock(observer_mutex);
185 }
186
187 for (auto * cv : to_notify)
188 if (cv != nullptr)
189 cv->notify_all();
190 }
191 };
192
200 {
201 std::shared_ptr<CancellationState> state_;
202
203 explicit CancellationToken(std::shared_ptr<CancellationState> state) noexcept
204 : state_(std::move(state)) {}
205
206 friend class CancellationSource;
207
208 public:
211 {
212 std::shared_ptr<CancellationState> state_;
213 std::condition_variable *cv_ = nullptr;
214
216 {
217 if (state_ != nullptr and cv_ != nullptr)
218 state_->unregister_condition_variable(cv_);
219 cv_ = nullptr;
220 }
221
222 public:
225
230 ConditionVariableRegistration(std::shared_ptr<CancellationState> state,
231 std::condition_variable * cv) noexcept
232 : state_(std::move(state)), cv_(cv) {}
233
238 (const ConditionVariableRegistration &) = delete;
239
241 : state_(std::move(other.state_)), cv_(other.cv_)
242 {
243 other.cv_ = nullptr;
244 }
245
248 {
249 if (this == &other)
250 return *this;
251 unregister();
252 state_ = std::move(other.state_);
253 cv_ = other.cv_;
254 other.cv_ = nullptr;
255 return *this;
256 }
257
262 };
263
265 CancellationToken() = default;
266
269 {
270 return state_ != nullptr and state_->stop_requested.load(std::memory_order_relaxed);
271 }
272
278
281 {
282 if (stop_requested())
283 throw operation_canceled();
284 }
285
287 [[nodiscard]] bool valid() const noexcept { return state_ != nullptr; }
288
295 std::condition_variable & cv) const
296 {
297 if (state_ == nullptr or stop_requested())
298 return {};
299 state_->register_condition_variable(&cv);
300 if (stop_requested())
301 {
302 state_->unregister_condition_variable(&cv);
303 cv.notify_all();
304 return {};
305 }
307 }
308 };
309
312 {
313 std::shared_ptr<CancellationState> state_ =
314 std::make_shared<CancellationState>();
315
316 public:
319
325
328 {
329 state_->stop_requested.store(true, std::memory_order_relaxed);
330 state_->notify_observers();
331 }
332
335
338 {
339 return state_->stop_requested.load(std::memory_order_relaxed);
340 }
341
343 void reset() { state_ = std::make_shared<CancellationState>(); }
344 };
345
353 {
354 size_t min_size = 0;
355 size_t chunk_size = 0;
356 size_t max_tasks = 0;
357 ThreadPool *pool = nullptr;
359 };
360
366 class queue_overflow_error : public std::overflow_error
367 {
368 public:
370 : std::overflow_error("ThreadPool queue overflow: " +
371 std::to_string(current_size) + " >= hard_limit " +
374
377
380
381 private:
384 };
385
391 {
392 size_t tasks_completed = 0;
393 size_t tasks_failed = 0;
395 size_t current_active = 0;
396 size_t num_workers = 0;
397 size_t peak_queue_size = 0;
398
401 {
403 }
404
406 [[nodiscard]] double queue_utilization(size_t soft_limit) const noexcept
407 {
408 if (soft_limit == 0 or soft_limit == std::numeric_limits<size_t>::max())
409 return 0.0;
410 return (100.0 * current_queue_size) / soft_limit;
411 }
412 };
413
415 using ExceptionCallback = std::function<void(std::exception_ptr)>;
416
439 {
441 struct TaskBase
442 {
444 virtual ~TaskBase() = default;
445
446 virtual void execute() = 0;
447 };
448
450 template <typename F>
451 struct Task : TaskBase
452 {
454
455 explicit Task(F && f) : func(std::move(f)) {}
456
457 void execute() override { func(); }
458 };
459
460 std::vector<std::thread> workers;
461 std::queue<std::unique_ptr<TaskBase>> tasks;
462 mutable std::mutex queue_mutex;
463 std::condition_variable condition;
464 std::condition_variable space_available;
465 std::condition_variable idle_condition;
466 std::atomic<bool> stop{false};
467 std::atomic<size_t> active_tasks{0};
468
469 // Queue limits for bounded enqueue
470 size_t soft_limit_ = std::numeric_limits<size_t>::max();
471 size_t hard_limit_ = std::numeric_limits<size_t>::max();
472
473 // Statistics
474 std::atomic<size_t> tasks_completed_{0};
475 std::atomic<size_t> tasks_failed_{0};
476 std::atomic<size_t> peak_queue_size_{0};
477
478 // Exception callback for detached tasks
480
483 {
484 while (true)
485 {
486 std::unique_ptr<TaskBase> task;
487 bool should_notify_space = false; {
488 std::unique_lock<std::mutex> lock(queue_mutex);
489 condition.wait(lock, [this]
490 {
491 return stop or not tasks.empty();
492 });
493
494 if (stop and tasks.empty())
495 return;
496
497 // Check if we should notify waiters after popping
499
500 task = std::move(tasks.front());
501 tasks.pop();
502 ++active_tasks;
503 }
504
505 // Notify any blocked enqueuers that space is available
507 space_available.notify_all();
508
509 task->execute();
510
511 // Update stats and check for idle state
512 --active_tasks;
513
514 // Notify wait_all if we're now idle
515 {
516 std::unique_lock<std::mutex> lock(queue_mutex);
517 if (tasks.empty() and active_tasks == 0)
518 idle_condition.notify_all();
519 }
520 }
521 }
522
524 void start_workers(size_t n)
525 {
526 workers.reserve(n);
527 for (size_t i = 0; i < n; ++i)
528 workers.emplace_back(&ThreadPool::worker_loop, this);
529 }
530
533 { {
534 std::unique_lock<std::mutex> lock(queue_mutex);
535 stop = true;
536 }
537 condition.notify_all();
538
539 for (auto & worker: workers)
540 if (worker.joinable())
541 worker.join();
542
543 workers.clear();
544 }
545
547 template <typename F, typename... Args>
548 static auto make_invocable(F && f, Args &&... args)
549 {
550 // Decay types to store copies/moves, not references
551 return [func = std::forward<F>(f),
552 args_tuple = std::make_tuple(std::forward<Args>(args)...)]() mutable
553 {
554 return std::apply([&func](auto &&... a)
555 {
556 return std::invoke(std::move(func), std::forward<decltype(a)>(a)...);
557 }, std::move(args_tuple));
558 };
559 }
560
561 public:
567 explicit ThreadPool(size_t n_threads = std::thread::hardware_concurrency())
568 {
569 if (n_threads == 0)
570 n_threads = 1; // Ensure at least one worker
571
573 }
574
576 ThreadPool(const ThreadPool &) = delete;
577
579 ThreadPool &operator=(const ThreadPool &) = delete;
580
582 ThreadPool(ThreadPool &&) = delete;
583
586
592 {
593 stop_workers();
594 }
595
639 template <typename F, typename... Args>
640 [[nodiscard]] auto enqueue(F && f, Args &&... args)
641 -> std::future<std::invoke_result_t<F, Args...>>
642 {
643 using return_type = std::invoke_result_t<F, Args...>;
644
645 // Create the invocable that captures function and arguments
646 auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);
647
648 // Wrap in packaged_task for future support
649 auto promise = std::make_shared<std::promise<return_type>>();
650 std::future<return_type> result = promise->get_future();
651
652 // Capture stats counter for task completion tracking
654
655 // Create the task that will execute and set the promise
656 auto task_func = [invocable = std::move(invocable), promise, completed_counter]() mutable
657 {
658 try
659 {
660 if constexpr (std::is_void_v<return_type>)
661 {
662 invocable();
663 promise->set_value();
664 }
665 else
666 {
667 promise->set_value(invocable());
668 }
669 ++(*completed_counter);
670 }
671 catch (...)
672 {
673 promise->set_exception(std::current_exception());
674 ++(*completed_counter); // Still counts as processed
675 }
676 }; {
677 std::unique_lock<std::mutex> lock(queue_mutex);
678
679 if (stop)
680 throw std::runtime_error("enqueue on stopped ThreadPool");
681
682 tasks.push(std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));
683
684 // Update peak queue size
685 const size_t current_size = tasks.size();
686 size_t peak = peak_queue_size_.load();
687 while (current_size > peak and
688 not peak_queue_size_.compare_exchange_weak(peak, current_size));
689 }
690
691 condition.notify_one();
692 return result;
693 }
694
721 template <typename F, typename... Args>
722 void enqueue_detached(F && f, Args &&... args)
723 {
724 auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);
725
726 // Capture stats and callback
729 ExceptionCallback callback; {
730 std::unique_lock<std::mutex> lock(queue_mutex);
731 callback = exception_callback_;
732 }
733
734 auto task_func = [invocable = std::move(invocable),
736 callback = std::move(callback)]() mutable
737 {
738 try
739 {
740 invocable();
741 ++(*completed_counter);
742 }
743 catch (...)
744 {
745 ++(*failed_counter);
746 // Call exception callback if set
747 if (callback)
748 {
749 try { callback(std::current_exception()); }
750 catch (...) { /* Ignore callback exceptions */ }
751 }
752 }
753 }; {
754 std::unique_lock<std::mutex> lock(queue_mutex);
755
756 if (stop)
757 throw std::runtime_error("enqueue_detached on stopped ThreadPool");
758
759 tasks.push(std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));
760
761 // Update peak queue size
762 const size_t current_size = tasks.size();
763 size_t peak = peak_queue_size_.load();
764 while (current_size > peak and
765 not peak_queue_size_.compare_exchange_weak(peak, current_size));
766 }
767
768 condition.notify_one();
769 }
770
791 template <typename F, typename Container>
793 -> std::vector<std::future<std::invoke_result_t<F, typename Container::value_type>>>
794 {
795 using return_type = std::invoke_result_t<F, typename Container::value_type>;
796 std::vector<std::future<return_type>> results;
797 results.reserve(args_container.size());
798
799 for (const auto & arg: args_container)
800 results.push_back(enqueue(f, arg));
801
802 return results;
803 }
804
826 void set_queue_limits(const size_t soft_limit,
827 const size_t hard_limit = std::numeric_limits<size_t>::max())
828 {
829 std::unique_lock<std::mutex> lock(queue_mutex);
831 hard_limit_ = (hard_limit == std::numeric_limits<size_t>::max() and
832 soft_limit != std::numeric_limits<size_t>::max()) ?
833 soft_limit * 10 // Default hard = 10x soft
834 :
835 hard_limit;
836 }
837
842 [[nodiscard]] std::pair<size_t, size_t> get_queue_limits() const
843 {
844 std::unique_lock<std::mutex> lock(queue_mutex);
845 return {soft_limit_, hard_limit_};
846 }
847
882 template <typename F, typename... Args>
883 [[nodiscard]] auto enqueue_bounded(F && f, Args &&... args)
884 -> std::future<std::invoke_result_t<F, Args...>>
885 {
886 using return_type = std::invoke_result_t<F, Args...>;
887
888 auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);
889
890 auto promise = std::make_shared<std::promise<return_type>>();
891 std::future<return_type> result = promise->get_future();
892
893 auto task_func = [invocable = std::move(invocable), promise]() mutable
894 {
895 try
896 {
897 if constexpr (std::is_void_v<return_type>)
898 {
899 invocable();
900 promise->set_value();
901 }
902 else
903 {
904 promise->set_value(invocable());
905 }
906 }
907 catch (...)
908 {
909 promise->set_exception(std::current_exception());
910 }
911 }; {
912 std::unique_lock<std::mutex> lock(queue_mutex);
913
914 // Check hard limit first (throws immediately)
915 if (tasks.size() >= hard_limit_)
917
918 // Block if at soft limit (wait for space)
919 space_available.wait(lock, [this]
920 {
921 return stop or tasks.size() < soft_limit_;
922 });
923
924 if (stop)
925 throw std::runtime_error("enqueue_bounded on stopped ThreadPool");
926
927 // Re-check hard limit after waking up
928 if (tasks.size() >= hard_limit_)
930
931 tasks.push(std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));
932 }
933
934 condition.notify_one();
935 return result;
936 }
937
952 template <typename F, typename... Args>
954 {
955 auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);
956
957 auto task_func = [invocable = std::move(invocable)]() mutable
958 {
959 try
960 {
961 invocable();
962 }
963 catch (...)
964 {
965 // Silently ignore exceptions in detached tasks
966 }
967 }; {
968 std::unique_lock<std::mutex> lock(queue_mutex);
969
970 // Check hard limit first
971 if (tasks.size() >= hard_limit_)
973
974 // Block if at soft limit
975 space_available.wait(lock, [this]
976 {
977 return stop or tasks.size() < soft_limit_;
978 });
979
980 if (stop)
981 throw std::runtime_error("enqueue_bounded_detached on stopped ThreadPool");
982
983 // Re-check hard limit after waking up
984 if (tasks.size() >= hard_limit_)
986
987 tasks.push(std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));
988 }
989
990 condition.notify_one();
991 }
992
1027 template <typename F, typename... Args>
1028 [[nodiscard]] auto try_enqueue(F && f, Args &&... args)
1029 -> std::optional<std::future<std::invoke_result_t<F, Args...>>>
1030 { {
1031 std::unique_lock<std::mutex> lock(queue_mutex);
1032
1033 if (stop)
1034 throw std::runtime_error("try_enqueue on stopped ThreadPool");
1035
1036 // Check if queue is at or above soft limit
1037 if (tasks.size() >= soft_limit_)
1038 return std::nullopt;
1039 }
1040
1041 // Queue has space, delegate to regular enqueue
1042 return enqueue(std::forward<F>(f), std::forward<Args>(args)...);
1043 }
1044
1067 template <typename F, typename... Args>
1069 { {
1070 std::unique_lock<std::mutex> lock(queue_mutex);
1071
1072 if (stop)
1073 throw std::runtime_error("try_enqueue_detached on stopped ThreadPool");
1074
1075 if (tasks.size() >= soft_limit_)
1076 return false;
1077 }
1078
1079 enqueue_detached(std::forward<F>(f), std::forward<Args>(args)...);
1080 return true;
1081 }
1082
1087 {
1088 return workers.size();
1089 }
1090
1094 [[nodiscard]] size_t pending_tasks() const
1095 {
1096 std::unique_lock<std::mutex> lock(queue_mutex);
1097 return tasks.size();
1098 }
1099
1104 {
1105 return active_tasks.load();
1106 }
1107
1111 [[nodiscard]] bool is_idle() const
1112 {
1113 std::unique_lock<std::mutex> lock(queue_mutex);
1114 return tasks.empty() and active_tasks == 0;
1115 }
1116
1121 {
1122 return stop.load();
1123 }
1124
1136 { {
1137 std::unique_lock<std::mutex> lock(queue_mutex);
1138 if (stop.exchange(true))
1139 return; // Already stopped
1140 }
1141 condition.notify_all();
1142
1143 for (auto & worker: workers)
1144 if (worker.joinable())
1145 worker.join();
1146
1147 workers.clear();
1148 }
1149
1163 void resize(size_t new_size)
1164 {
1165 if (new_size == 0)
1166 new_size = 1;
1167
1168 if (new_size == workers.size())
1169 return;
1170
1171 if (stop)
1172 throw std::runtime_error("cannot resize a stopped ThreadPool");
1173
1174 // Stop current workers (but don't clear tasks)
1175 {
1176 std::unique_lock<std::mutex> lock(queue_mutex);
1177 stop = true;
1178 }
1179 condition.notify_all();
1180
1181 for (auto & worker: workers)
1182 if (worker.joinable())
1183 worker.join();
1184
1185 workers.clear();
1186
1187 // Restart with new size
1188 stop = false;
1190 }
1191
1199 void wait_all(const std::chrono::milliseconds poll_interval = std::chrono::milliseconds(1))
1200 {
1201 while (not is_idle())
1202 std::this_thread::sleep_for(poll_interval);
1203 }
1204
1214 template <typename Rep, typename Period>
1215 [[nodiscard]] bool wait_all_for(std::chrono::duration<Rep, Period> timeout)
1216 {
1217 std::unique_lock<std::mutex> lock(queue_mutex);
1218 return idle_condition.wait_for(lock, timeout, [this]
1219 {
1220 return tasks.empty() and active_tasks == 0;
1221 });
1222 }
1223
1233 template <typename Clock, typename Duration>
1234 [[nodiscard]] bool wait_all_until(std::chrono::time_point<Clock, Duration> deadline)
1235 {
1236 std::unique_lock<std::mutex> lock(queue_mutex);
1237 return idle_condition.wait_until(lock, deadline, [this]
1238 {
1239 return tasks.empty() and active_tasks == 0;
1240 });
1241 }
1242
1250 {
1251 ThreadPoolStats stats;
1252 stats.tasks_completed = tasks_completed_.load();
1253 stats.tasks_failed = tasks_failed_.load();
1254 stats.current_active = active_tasks.load();
1255 stats.num_workers = workers.size();
1256 stats.peak_queue_size = peak_queue_size_.load(); {
1257 std::unique_lock<std::mutex> lock(queue_mutex);
1258 stats.current_queue_size = tasks.size();
1259 }
1260
1261 return stats;
1262 }
1263
1267 {
1268 tasks_completed_ = 0;
1269 tasks_failed_ = 0;
1270 peak_queue_size_ = 0;
1271 }
1272
1295 {
1296 std::unique_lock<std::mutex> lock(queue_mutex);
1297 exception_callback_ = std::move(callback);
1298 }
1299
1322 template <typename F, typename Container>
1324 -> std::vector<std::future<decltype(std::apply(f, *std::begin(args_list)))>>
1325 {
1326 using ArgsTuple = typename Container::value_type;
1327 using return_type = decltype(std::apply(f, std::declval<ArgsTuple>()));
1328
1329 std::vector<std::future<return_type>> results;
1330 results.reserve(std::distance(std::begin(args_list), std::end(args_list)));
1331
1332 std::vector<std::unique_ptr<TaskBase>> batch_tasks;
1333 batch_tasks.reserve(results.capacity());
1334
1335 // Capture stats counter
1337
1338 for (const auto & args: args_list)
1339 {
1340 auto promise = std::make_shared<std::promise<return_type>>();
1341 results.push_back(promise->get_future());
1342
1343 auto task_func = [func = f, args, promise, completed_counter]() mutable
1344 {
1345 try
1346 {
1347 if constexpr (std::is_void_v<return_type>)
1348 {
1349 std::apply(func, args);
1350 promise->set_value();
1351 }
1352 else
1353 {
1354 promise->set_value(std::apply(func, args));
1355 }
1356 ++(*completed_counter);
1357 }
1358 catch (...)
1359 {
1360 promise->set_exception(std::current_exception());
1361 ++(*completed_counter);
1362 }
1363 };
1364
1365 batch_tasks.push_back(
1366 std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));
1367 } {
1368 std::unique_lock<std::mutex> lock(queue_mutex);
1369
1370 if (stop)
1371 throw std::runtime_error("enqueue_batch on stopped ThreadPool");
1372
1373 for (auto & task: batch_tasks)
1374 tasks.push(std::move(task));
1375
1376 // Update peak queue size
1377 const size_t current_size = tasks.size();
1378 size_t peak = peak_queue_size_.load();
1379 while (current_size > peak and
1380 not peak_queue_size_.compare_exchange_weak(peak, current_size));
1381 }
1382
1383 // Notify all workers for batch
1384 condition.notify_all();
1385
1386 return results;
1387 }
1388 };
1389
1390 // =============================================================================
1391 // Parallel Algorithms
1392 // =============================================================================
1393
1394 namespace parallel_detail
1395 {
1397 {
1398 return options.pool == nullptr ? default_pool() : *options.pool;
1399 }
1400
1401 inline size_t resolve_chunk_size(const size_t total,
1402 const size_t num_threads,
1403 const ParallelOptions & options,
1404 const size_t min_chunk = 1)
1405 {
1406 if (total == 0)
1407 return 1;
1408
1409 size_t chunk_size = options.chunk_size;
1410 if (chunk_size == 0)
1411 chunk_size = std::max(min_chunk,
1412 total / std::max<size_t>(1, num_threads * 4));
1413
1414 if (options.max_tasks > 0)
1415 {
1416 const size_t min_for_cap =
1417 (total + options.max_tasks - 1) / options.max_tasks;
1418 chunk_size = std::max(chunk_size, std::max(min_chunk, min_for_cap));
1419 }
1420
1421 return std::max<size_t>(1, chunk_size);
1422 }
1423
1424 inline bool use_sequential_path(const size_t total,
1425 const ThreadPool & pool,
1426 const ParallelOptions & options) noexcept
1427 {
1428 if (total == 0)
1429 return true;
1430
1431 if (options.cancel_token.stop_requested())
1432 return true;
1433
1434 if (options.max_tasks == 1)
1435 return true;
1436
1437 if (options.min_size > 0 and total < options.min_size)
1438 return true;
1439
1440 return pool.num_threads() <= 1;
1441 }
1442
1443 inline void throw_if_canceled(const CancellationToken & token)
1444 {
1446 }
1447
1448 template <typename InputIt1, typename InputIt2, typename OutputIt, typename Compare>
1451 OutputIt d_first, Compare comp,
1452 const CancellationToken & token)
1453 {
1454 while (first1 != last1 and first2 != last2)
1455 {
1456 throw_if_canceled(token);
1457 if (comp(*first2, *first1))
1458 *d_first++ = *first2++;
1459 else
1460 *d_first++ = *first1++;
1461 }
1462
1463 while (first1 != last1)
1464 {
1465 throw_if_canceled(token);
1466 *d_first++ = *first1++;
1467 }
1468
1469 while (first2 != last2)
1470 {
1471 throw_if_canceled(token);
1472 *d_first++ = *first2++;
1473 }
1474 }
1475 } // namespace parallel_detail
1476
1508 template <typename Iterator, typename F>
1509 void parallel_for(ThreadPool & pool, Iterator begin, Iterator end, F && f,
1510 size_t chunk_size = 0)
1511 {
1513 options.pool = &pool;
1514 options.chunk_size = chunk_size;
1515 parallel_for(begin, end, std::forward<F>(f), options);
1516 }
1517
1519 template <typename Iterator, typename F>
1520 void parallel_for(Iterator begin, Iterator end, F && f,
1521 const ParallelOptions & options = {})
1522 {
1523 ThreadPool & pool = parallel_detail::select_pool(options);
1524 const size_t total = std::distance(begin, end);
1525 if (total == 0)
1526 return;
1527
1528 const size_t chunk_size =
1530
1532 {
1534 if constexpr (std::is_invocable_v<F, Iterator, Iterator>)
1535 {
1536 f(begin, end);
1537 }
1538 else
1539 {
1540 for (Iterator it = begin; it != end; ++it)
1541 {
1543 f(*it);
1544 }
1545 }
1547 return;
1548 }
1549
1550 std::vector<std::future<void>> futures;
1551 futures.reserve((total + chunk_size - 1) / chunk_size);
1552
1553 // Check if F accepts (Iterator, Iterator) or (T&)
1554 if constexpr (std::is_invocable_v<F, Iterator, Iterator>)
1555 {
1556 // Chunk-based function
1557 for (Iterator chunk_begin = begin; chunk_begin < end;)
1558 {
1559 Iterator chunk_end = chunk_begin;
1560 std::advance(chunk_end, std::min(chunk_size,
1561 static_cast<size_t>(std::distance(chunk_begin, end))));
1562
1563 futures.push_back(pool.enqueue([f, chunk_begin, chunk_end, token = options.cancel_token]()
1564 {
1565 parallel_detail::throw_if_canceled(token);
1566 f(chunk_begin, chunk_end);
1567 }));
1568
1570 }
1571 }
1572 else
1573 {
1574 // Per-element function
1575 for (Iterator chunk_begin = begin; chunk_begin < end;)
1576 {
1577 Iterator chunk_end = chunk_begin;
1578 std::advance(chunk_end, std::min(chunk_size,
1579 static_cast<size_t>(std::distance(chunk_begin, end))));
1580
1581 futures.push_back(pool.enqueue([f, chunk_begin, chunk_end, token = options.cancel_token]()
1582 {
1583 for (Iterator it = chunk_begin; it != chunk_end; ++it)
1584 {
1585 parallel_detail::throw_if_canceled(token);
1586 f(*it);
1587 }
1588 }));
1589
1591 }
1592 }
1593
1594 // Wait for all chunks to complete
1595 for (auto & fut: futures)
1596 fut.get();
1597
1599 }
1600
1630 template <typename InputIt, typename OutputIt, typename F>
1631 OutputIt parallel_transform(ThreadPool & pool, InputIt first, InputIt last,
1632 OutputIt d_first, F && f, size_t chunk_size = 0)
1633 {
1635 options.pool = &pool;
1636 options.chunk_size = chunk_size;
1637 return parallel_transform(first, last, d_first, std::forward<F>(f), options);
1638 }
1639
1641 template <typename InputIt, typename OutputIt, typename F>
1642 OutputIt parallel_transform(InputIt first, InputIt last, OutputIt d_first,
1643 F && f, const ParallelOptions & options = {})
1644 {
1645 ThreadPool & pool = parallel_detail::select_pool(options);
1646 const size_t total = std::distance(first, last);
1647 if (total == 0)
1648 return d_first;
1649
1650 const size_t chunk_size =
1651 parallel_detail::resolve_chunk_size(total, pool.num_threads(), options);
1652
1653 if (parallel_detail::use_sequential_path(total, pool, options))
1654 {
1655 auto in = first;
1656 auto out = d_first;
1657 while (in != last)
1658 {
1659 parallel_detail::throw_if_canceled(options.cancel_token);
1660 *out = f(*in);
1661 ++in;
1662 ++out;
1663 }
1664 parallel_detail::throw_if_canceled(options.cancel_token);
1665 return d_first + total;
1666 }
1667
1668 std::vector<std::future<void>> futures;
1669 futures.reserve((total + chunk_size - 1) / chunk_size);
1670
1671 InputIt chunk_in = first;
1672 OutputIt chunk_out = d_first;
1673
1674 while (chunk_in < last)
1675 {
1676 size_t chunk_len = std::min(chunk_size,
1677 static_cast<size_t>(std::distance(chunk_in, last)));
1678 InputIt chunk_in_end = chunk_in;
1679 std::advance(chunk_in_end, chunk_len);
1680
1681 futures.push_back(pool.enqueue([f, chunk_in, chunk_in_end, chunk_out,
1682 token = options.cancel_token]()
1683 {
1684 InputIt in = chunk_in;
1685 OutputIt out = chunk_out;
1686 while (in != chunk_in_end)
1687 {
1688 parallel_detail::throw_if_canceled(token);
1689 *out = f(*in);
1690 ++in;
1691 ++out;
1692 }
1693 }));
1694
1695 chunk_in = chunk_in_end;
1696 std::advance(chunk_out, chunk_len);
1697 }
1698
1699 for (auto & fut: futures)
1700 fut.get();
1701
1702 parallel_detail::throw_if_canceled(options.cancel_token);
1703
1704 return d_first + total;
1705 }
1706
1741 template <typename Iterator, typename T, typename BinaryOp>
1742 T parallel_reduce(ThreadPool & pool, Iterator first, Iterator last,
1743 T init, BinaryOp op, size_t chunk_size = 0)
1744 {
1746 options.pool = &pool;
1747 options.chunk_size = chunk_size;
1748 return parallel_reduce(first, last, init, op, options);
1749 }
1750
1752 template <typename Iterator, typename T, typename BinaryOp>
1753 T parallel_reduce(Iterator first, Iterator last, T init, BinaryOp op,
1754 const ParallelOptions & options = {})
1755 {
1756 ThreadPool & pool = parallel_detail::select_pool(options);
1757 const size_t total = std::distance(first, last);
1758 if (total == 0)
1759 return init;
1760
1761 const size_t chunk_size =
1762 parallel_detail::resolve_chunk_size(total, pool.num_threads(), options);
1763
1764 if (parallel_detail::use_sequential_path(total, pool, options))
1765 {
1766 T result = init;
1767 for (Iterator it = first; it != last; ++it)
1768 {
1769 parallel_detail::throw_if_canceled(options.cancel_token);
1770 result = op(result, *it);
1771 }
1772 parallel_detail::throw_if_canceled(options.cancel_token);
1773 return result;
1774 }
1775
1776 std::vector<std::future<T>> futures;
1777 futures.reserve((total + chunk_size - 1) / chunk_size);
1778
1779 for (Iterator chunk_begin = first; chunk_begin < last;)
1780 {
1781 Iterator chunk_end = chunk_begin;
1782 std::advance(chunk_end, std::min(chunk_size,
1783 static_cast<size_t>(std::distance(chunk_begin, last))));
1784
1785 futures.push_back(pool.enqueue([op, chunk_begin, chunk_end,
1786 token = options.cancel_token]()
1787 {
1788 parallel_detail::throw_if_canceled(token);
1789 T local_result = *chunk_begin;
1790 for (Iterator it = chunk_begin + 1; it != chunk_end; ++it)
1791 {
1792 parallel_detail::throw_if_canceled(token);
1793 local_result = op(local_result, *it);
1794 }
1795 return local_result;
1796 }));
1797
1798 chunk_begin = chunk_end;
1799 }
1800
1801 // Reduce partial results
1802 T result = init;
1803 for (auto & fut: futures)
1804 result = op(result, fut.get());
1805
1806 parallel_detail::throw_if_canceled(options.cancel_token);
1807
1808 return result;
1809 }
1810
1830 template <typename F>
1831 void parallel_for_index(ThreadPool & pool, size_t start, size_t end, F && f,
1832 size_t chunk_size = 0)
1833 {
1835 options.pool = &pool;
1836 options.chunk_size = chunk_size;
1837 parallel_for_index(start, end, std::forward<F>(f), options);
1838 }
1839
1841 template <typename F>
1842 void parallel_for_index(size_t start, size_t end, F && f,
1843 const ParallelOptions & options = {})
1844 {
1845 ThreadPool & pool = parallel_detail::select_pool(options);
1846 if (start >= end)
1847 return;
1848
1849 const size_t total = end - start;
1850
1851 const size_t chunk_size =
1852 parallel_detail::resolve_chunk_size(total, pool.num_threads(), options);
1853
1854 if (parallel_detail::use_sequential_path(total, pool, options))
1855 {
1856 for (size_t i = start; i < end; ++i)
1857 {
1858 parallel_detail::throw_if_canceled(options.cancel_token);
1859 f(i);
1860 }
1861 parallel_detail::throw_if_canceled(options.cancel_token);
1862 return;
1863 }
1864
1865 std::vector<std::future<void>> futures;
1866 futures.reserve((total + chunk_size - 1) / chunk_size);
1867
1868 for (size_t chunk_start = start; chunk_start < end;)
1869 {
1870 size_t chunk_end = std::min(chunk_start + chunk_size, end);
1871
1872 futures.push_back(pool.enqueue([f, chunk_start, chunk_end,
1873 token = options.cancel_token]()
1874 {
1875 for (size_t i = chunk_start; i < chunk_end; ++i)
1876 {
1877 parallel_detail::throw_if_canceled(token);
1878 f(i);
1879 }
1880 }));
1881
1882 chunk_start = chunk_end;
1883 }
1884
1885 for (auto & fut: futures)
1886 fut.get();
1887
1888 parallel_detail::throw_if_canceled(options.cancel_token);
1889 }
1890
1897 template <typename... Fs>
1898 void parallel_invoke(ThreadPool & pool, Fs &&... fs)
1899 {
1901 options.pool = &pool;
1902 parallel_invoke(options, std::forward<Fs>(fs)...);
1903 }
1904
// Invoke every callable in `fs` and wait for all of them to complete.
// Uses the heuristics in parallel_detail to decide between inline
// (sequential) execution and one pool task per callable.  The first
// exception thrown by any task is remembered and rethrown only after
// every future has been drained, so no future is abandoned.  The
// cancellation token is checked before each invocation and once more
// after all tasks finish.
 1906 template <typename... Fs>
 1907 void parallel_invoke(const ParallelOptions & options, Fs &&... fs)
 1908 {
 1909 constexpr size_t task_count = sizeof...(Fs);
 1910 if constexpr (task_count == 0)
 1911 {
 1912 return;
 1913 }
 1914 else
 1915 {
 1916 ThreadPool & pool = parallel_detail::select_pool(options);
 1917
// Sequential fallback: run the callables in argument order on the
// calling thread, honoring cancellation between invocations.
 1918 if (parallel_detail::use_sequential_path(task_count, pool, options))
 1919 {
 1920 auto invoke_one = [&](auto && fn)
 1921 {
 1922 parallel_detail::throw_if_canceled(options.cancel_token);
// if-constexpr split keeps void-returning callables legal while
// explicitly discarding non-void results.
 1923 if constexpr (std::is_void_v<std::invoke_result_t<decltype(fn)>>)
 1924 std::invoke(std::forward<decltype(fn)>(fn));
 1925 else
 1926 static_cast<void>(std::invoke(std::forward<decltype(fn)>(fn)));
 1927 };
 1928
 1929 (invoke_one(std::forward<Fs>(fs)), ...);
 1930 parallel_detail::throw_if_canceled(options.cancel_token);
 1931 return;
 1932 }
 1933
// Parallel path: one pool task per callable; each task owns a moved/
// copied callable plus a cancellation token.
 1934 std::vector<std::future<void>> futures;
 1935 futures.reserve(task_count);
 1936
 1937 auto launch_one = [&](auto && fn)
 1938 {
 1939 futures.push_back(pool.enqueue(
 1940 [func = std::forward<decltype(fn)>(fn),
 1941 token = options.cancel_token]() mutable
 1942 {
 1943 parallel_detail::throw_if_canceled(token);
 1944 if constexpr (std::is_void_v<std::invoke_result_t<decltype(func) &>>)
 1945 std::invoke(func);
 1946 else
 1947 static_cast<void>(std::invoke(func));
 1948 }));
 1949 };
 1950
 1951 (launch_one(std::forward<Fs>(fs)), ...);
 1952
// Drain every future even if one throws, so none is left unobserved;
// only the first captured exception is rethrown afterwards.
 1953 std::exception_ptr first_exception;
 1954 for (auto & fut: futures)
 1955 {
 1956 try
 1957 {
 1958 fut.get();
 1959 }
 1960 catch (...)
 1961 {
 1962 if (first_exception == nullptr)
 1963 first_exception = std::current_exception();
 1964 }
 1965 }
 1966
 1967 if (first_exception != nullptr)
 1968 std::rethrow_exception(first_exception);
 1969
 1970 parallel_detail::throw_if_canceled(options.cancel_token);
 1971 }
 1972 }
 1973
1980 template <typename InputIt, typename OutputIt, typename BinaryOp>
1981 OutputIt pscan(ThreadPool & pool, InputIt first, InputIt last,
1982 OutputIt d_first, BinaryOp op, size_t chunk_size = 0)
1983 {
1985 options.pool = &pool;
1986 options.chunk_size = chunk_size;
1987 return pscan(first, last, d_first, op, options);
1988 }
1989
// Parallel inclusive scan (generalized prefix sum under `op`) of
// [first, last) into d_first; returns one past the last written element.
//
// Parallel path, three phases:
//   1. Split the input into fixed-size chunks and run an independent
//      inclusive scan per chunk (`partials` gets every prefix value,
//      `chunk_totals` the final value of each chunk).
//   2. Sequentially fold the chunk totals into per-chunk carries.
//   3. In parallel, combine each chunk's partials with its carry.
// `op` must be associative for the result to match a sequential scan.
//
// NOTE(review): despite the `InputIt` name, both paths use expressions
// like `first + offset` and `d_first + total`, so random-access
// iterators are assumed — confirm intended iterator category.
// std::optional is used for intermediates so Value need not be
// default-constructible; each slot is written exactly once before read.
 1991 template <typename InputIt, typename OutputIt, typename BinaryOp>
 1992 OutputIt pscan(InputIt first, InputIt last, OutputIt d_first,
 1993 BinaryOp op, const ParallelOptions & options = {})
 1994 {
 1995 using Value = std::decay_t<std::invoke_result_t<BinaryOp, decltype(*first), decltype(*first)>>;
 1996
 1997 ThreadPool & pool = parallel_detail::select_pool(options);
 1998 const size_t total = std::distance(first, last);
 1999 if (total == 0)
 2000 return d_first;
 2001
 2002 const size_t chunk_size =
 2003 parallel_detail::resolve_chunk_size(total, pool.num_threads(), options);
 2004
// Sequential fallback: plain left-to-right running scan with
// cooperative cancellation checks.
 2005 if (parallel_detail::use_sequential_path(total, pool, options))
 2006 {
 2007 parallel_detail::throw_if_canceled(options.cancel_token);
 2008
 2009 InputIt in = first;
 2010 OutputIt out = d_first;
 2011 Value running = *in;
 2012 *out = running;
 2013 ++in;
 2014 ++out;
 2015
 2016 while (in != last)
 2017 {
 2018 parallel_detail::throw_if_canceled(options.cancel_token);
 2019 running = op(running, *in);
 2020 *out = running;
 2021 ++in;
 2022 ++out;
 2023 }
 2024
 2025 parallel_detail::throw_if_canceled(options.cancel_token);
 2026 return d_first + total;
 2027 }
 2028
// Phase 1: independent per-chunk inclusive scans. Each task writes a
// disjoint slice of `partials` and one slot of `chunk_totals`, so no
// synchronization beyond the futures is needed.
 2029 const size_t num_chunks = (total + chunk_size - 1) / chunk_size;
 2030 std::vector<std::optional<Value>> partials(total);
 2031 std::vector<std::optional<Value>> chunk_totals(num_chunks);
 2032 std::vector<std::future<void>> futures;
 2033 futures.reserve(num_chunks);
 2034
 2035 for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx)
 2036 {
 2037 const size_t offset = chunk_idx * chunk_size;
 2038 const size_t chunk_end = std::min(offset + chunk_size, total);
 2039 InputIt chunk_first = first + offset;
 2040 InputIt chunk_last = first + chunk_end;
 2041 futures.push_back(pool.enqueue(
 2042 [chunk_first, chunk_last, &partials, offset,
 2043 &chunk_totals, chunk_idx, op,
 2044 token = options.cancel_token]() mutable
 2045 {
 2046 parallel_detail::throw_if_canceled(token);
 2047
 2048 InputIt in = chunk_first;
 2049 Value running = *in;
 2050 partials[offset].emplace(running);
 2051 ++in;
 2052 size_t out_idx = offset + 1;
 2053
 2054 while (in != chunk_last)
 2055 {
 2056 parallel_detail::throw_if_canceled(token);
 2057 running = op(running, *in);
 2058 partials[out_idx++].emplace(running);
 2059 ++in;
 2060 }
 2061
 2062 chunk_totals[chunk_idx].emplace(running);
 2063 }));
 2064 }
 2065
 2066 for (auto & fut: futures)
 2067 fut.get();
 2068
 2069 parallel_detail::throw_if_canceled(options.cancel_token);
 2070
// Phase 2 (sequential, O(num_chunks)): carries[i] is the combined total
// of chunks 0..i-1. Phase 3 then folds each carry into its chunk's
// partials in parallel; `carry` is captured by value so the carries
// vector may die before the tasks are scheduled — but note the tasks
// are joined below, inside this block, before `carries` goes out of scope.
 2071 if (num_chunks > 1)
 2072 {
 2073 std::vector<std::optional<Value>> carries(num_chunks);
 2074 for (size_t i = 1; i < num_chunks; ++i)
 2075 {
 2076 parallel_detail::throw_if_canceled(options.cancel_token);
 2077 carries[i].emplace((i == 1)
 2078 ? *chunk_totals[0]
 2079 : op(*carries[i - 1], *chunk_totals[i - 1]));
 2080 }
 2081
 2082 std::vector<std::future<void>> adjust_futures;
 2083 adjust_futures.reserve(num_chunks - 1);
 2084
 2085 for (size_t chunk_idx = 1; chunk_idx < num_chunks; ++chunk_idx)
 2086 {
 2087 const size_t offset = chunk_idx * chunk_size;
 2088 const size_t chunk_end = std::min(offset + chunk_size, total);
 2089 Value carry = *carries[chunk_idx];
 2090
 2091 adjust_futures.push_back(pool.enqueue(
 2092 [&partials, chunk_end, offset, carry, op,
 2093 token = options.cancel_token]() mutable
 2094 {
 2095 for (size_t i = offset; i < chunk_end; ++i)
 2096 {
 2097 parallel_detail::throw_if_canceled(token);
 2098 partials[i] = op(carry, *partials[i]);
 2099 }
 2100 }));
 2101 }
 2102
 2103 for (auto & fut: adjust_futures)
 2104 fut.get();
 2105 }
 2106
// Move the finished prefix values into the caller's output range.
 2107 OutputIt out = d_first;
 2108 for (auto & partial: partials)
 2109 {
 2110 *out = std::move(*partial);
 2111 ++out;
 2112 }
 2113
 2114 parallel_detail::throw_if_canceled(options.cancel_token);
 2115 return d_first + total;
 2116 }
2117
2123 template <typename InputIt, typename OutputIt, typename T, typename BinaryOp>
2124 OutputIt pexclusive_scan(ThreadPool & pool, InputIt first, InputIt last,
2125 OutputIt d_first, T init, BinaryOp op,
2126 size_t chunk_size = 0)
2127 {
2129 options.pool = &pool;
2130 options.chunk_size = chunk_size;
2131 return pexclusive_scan(first, last, d_first, init, op, options);
2132 }
2133
2135 template <typename InputIt, typename OutputIt, typename T, typename BinaryOp>
2136 OutputIt pexclusive_scan(InputIt first, InputIt last, OutputIt d_first,
2137 T init, BinaryOp op, const ParallelOptions & options = {})
2138 {
2139 using Value = std::decay_t<T>;
2140
2141 ThreadPool & pool = parallel_detail::select_pool(options);
2142 const size_t total = std::distance(first, last);
2143 if (total == 0)
2144 return d_first;
2145
2146 if (parallel_detail::use_sequential_path(total, pool, options))
2147 {
2148 T running = init;
2149 for (size_t i = 0; i < total; ++i)
2150 {
2151 parallel_detail::throw_if_canceled(options.cancel_token);
2152 d_first[i] = running;
2153 running = op(running, first[i]);
2154 }
2155 parallel_detail::throw_if_canceled(options.cancel_token);
2156 return d_first + total;
2157 }
2158
2159 std::vector<std::optional<Value>> inclusive(total);
2160 pscan(first, last, inclusive.begin(), op, options);
2161
2162 parallel_for_index(0, total,
2163 [&](size_t i)
2164 {
2165 if (i == 0)
2166 d_first[i] = init;
2167 else
2168 d_first[i] = op(init, *inclusive[i - 1]);
2169 },
2170 options);
2171
2172 parallel_detail::throw_if_canceled(options.cancel_token);
2173 return d_first + total;
2174 }
2175
2182 template <typename InputIt1, typename InputIt2, typename OutputIt,
2183 typename Compare = std::less<>>
2184 OutputIt pmerge(ThreadPool & pool,
2185 InputIt1 first1, InputIt1 last1,
2186 InputIt2 first2, InputIt2 last2,
2187 OutputIt d_first, Compare comp = Compare{},
2188 size_t chunk_size = 0)
2189 {
2190 ParallelOptions options;
2191 options.pool = &pool;
2192 options.chunk_size = chunk_size;
2193 return pmerge(first1, last1, first2, last2, d_first, std::move(comp), options);
2194 }
2195
// Merge two sorted ranges into d_first in parallel; returns one past the
// last written element.
//
// Strategy: cut the larger input into roughly equal chunks; for each
// chunk boundary a binary search locates the matching split point in the
// other range, so every task merges two disjoint sub-ranges into a
// disjoint output slice (out = d_first + left_offset + right_offset).
// lower_bound is used when the boundary value comes from range 1 and
// upper_bound when it comes from range 2; this keeps elements that
// compare equal in std::merge order (range-1 elements before range-2
// elements) across split points.
//
// NOTE(review): first1/first2/d_first are indexed and offset with `+`,
// so random-access iterators are assumed — confirm intended iterator
// category.
 2197 template <typename InputIt1, typename InputIt2, typename OutputIt,
 2198 typename Compare = std::less<>>
 2199 OutputIt pmerge(InputIt1 first1, InputIt1 last1,
 2200 InputIt2 first2, InputIt2 last2,
 2201 OutputIt d_first, Compare comp = Compare{},
 2202 const ParallelOptions & options = {})
 2203 {
 2204 ThreadPool & pool = parallel_detail::select_pool(options);
 2205 const size_t n1 = std::distance(first1, last1);
 2206 const size_t n2 = std::distance(first2, last2);
 2207 const size_t total = n1 + n2;
 2208
 2209 if (total == 0)
 2210 return d_first;
 2211
// Sequential fallback also covers the degenerate cases where one input
// is empty (nothing to split against).
 2212 if (parallel_detail::use_sequential_path(total, pool, options) or n1 == 0 or n2 == 0)
 2213 {
 2214 parallel_detail::merge_sequential(first1, last1, first2, last2, d_first,
 2215 comp, options.cancel_token);
 2216 parallel_detail::throw_if_canceled(options.cancel_token);
 2217 return d_first + total;
 2218 }
 2219
 2220 const size_t chunk_hint =
 2221 parallel_detail::resolve_chunk_size(total, pool.num_threads(), options);
 2222 const size_t desired_tasks = std::max<size_t>(1, (total + chunk_hint - 1) / chunk_hint);
 2223
 2224 std::vector<std::future<void>> futures;
 2225
// Chunk along the larger range so split boundaries are well spread;
// the branches below are mirror images of each other.
 2226 if (n1 >= n2)
 2227 {
 2228 const size_t task_count = std::max<size_t>(1, std::min(desired_tasks, n1));
 2229 const size_t base_chunk = (n1 + task_count - 1) / task_count;
 2230 futures.reserve(task_count);
 2231
 2232 size_t left_offset = 0;
 2233 size_t right_offset = 0;
 2234 for (size_t task = 0; task < task_count; ++task)
 2235 {
 2236 parallel_detail::throw_if_canceled(options.cancel_token);
 2237 const size_t next_left = std::min(left_offset + base_chunk, n1);
// lower_bound: range-2 elements equal to the boundary value stay in the
// next task, after their range-1 equals (std::merge ordering).
 2238 const size_t next_right =
 2239 (next_left < n1)
 2240 ? static_cast<size_t>(
 2241 std::lower_bound(first2 + right_offset, last2,
 2242 first1[next_left], comp) - first2)
 2243 : n2;
 2244
 2245 futures.push_back(pool.enqueue(
 2246 [left_begin = first1 + left_offset,
 2247 left_end = first1 + next_left,
 2248 right_begin = first2 + right_offset,
 2249 right_end = first2 + next_right,
 2250 out = d_first + left_offset + right_offset,
 2251 comp, token = options.cancel_token]() mutable
 2252 {
 2253 parallel_detail::merge_sequential(left_begin, left_end,
 2254 right_begin, right_end,
 2255 out, comp, token);
 2256 }));
 2257
 2258 left_offset = next_left;
 2259 right_offset = next_right;
 2260 }
 2261 }
 2262 else
 2263 {
 2264 const size_t task_count = std::max<size_t>(1, std::min(desired_tasks, n2));
 2265 const size_t base_chunk = (n2 + task_count - 1) / task_count;
 2266 futures.reserve(task_count);
 2267
 2268 size_t left_offset = 0;
 2269 size_t right_offset = 0;
 2270 for (size_t task = 0; task < task_count; ++task)
 2271 {
 2272 parallel_detail::throw_if_canceled(options.cancel_token);
 2273 const size_t next_right = std::min(right_offset + base_chunk, n2);
// upper_bound: range-1 elements equal to the boundary value stay in
// this task, before their range-2 equals (std::merge ordering).
 2274 const size_t next_left =
 2275 (next_right < n2)
 2276 ? static_cast<size_t>(
 2277 std::upper_bound(first1 + left_offset, last1,
 2278 first2[next_right], comp) - first1)
 2279 : n1;
 2280
 2281 futures.push_back(pool.enqueue(
 2282 [left_begin = first1 + left_offset,
 2283 left_end = first1 + next_left,
 2284 right_begin = first2 + right_offset,
 2285 right_end = first2 + next_right,
 2286 out = d_first + left_offset + right_offset,
 2287 comp, token = options.cancel_token]() mutable
 2288 {
 2289 parallel_detail::merge_sequential(left_begin, left_end,
 2290 right_begin, right_end,
 2291 out, comp, token);
 2292 }));
 2293
 2294 left_offset = next_left;
 2295 right_offset = next_right;
 2296 }
 2297 }
 2298
 2299 for (auto & fut: futures)
 2300 fut.get();
 2301
 2302 parallel_detail::throw_if_canceled(options.cancel_token);
 2303 return d_first + total;
 2304 }
2305
2320 {
2321 static ThreadPool pool;
2322 return pool;
2323 }
2324
2332 {
2333 ThreadPool *pool_ = nullptr;
2334 std::vector<std::future<void>> tasks_;
2335
2336 template <typename F, typename... Args>
2337 static auto make_invocable(F && f, Args &&... args)
2338 {
2339 return [func = std::forward<F>(f),
2340 args_tuple = std::make_tuple(std::forward<Args>(args)...)]() mutable
2341 {
2342 return std::apply([&func](auto &&... a)
2343 {
2344 return std::invoke(std::move(func), std::forward<decltype(a)>(a)...);
2345 }, std::move(args_tuple));
2346 };
2347 }
2348
2349 void drain(const bool propagate)
2350 {
2351 std::exception_ptr first_exception;
2352 for (auto & task: tasks_)
2353 {
2354 try
2355 {
2356 task.get();
2357 }
2358 catch (...)
2359 {
2360 if (first_exception == nullptr)
2361 first_exception = std::current_exception();
2362 }
2363 }
2364 tasks_.clear();
2365 if (propagate and first_exception != nullptr)
2366 std::rethrow_exception(first_exception);
2367 }
2368
2369 public:
2370 explicit TaskGroup(ThreadPool & pool = default_pool()) noexcept
2371 : pool_(&pool) {}
2372
2374 TaskGroup(const TaskGroup &) = delete;
2375
2377 TaskGroup &operator=(const TaskGroup &) = delete;
2378
2380 TaskGroup(TaskGroup &&) = delete;
2381
2384
2385 ~TaskGroup() { drain(false); }
2386
2388 template <typename F, typename... Args>
2389 void launch(F && f, Args &&... args)
2390 {
2391 auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);
2392 tasks_.push_back(pool_->enqueue([invocable = std::move(invocable)]() mutable
2393 {
2394 if constexpr (std::is_void_v<std::invoke_result_t<decltype(invocable) &>>)
2395 invocable();
2396 else
2397 static_cast<void>(invocable());
2398 }));
2399 }
2400
2402 template <typename F, typename... Args>
2403 void run(F && f, Args &&... args)
2404 {
2405 launch(std::forward<F>(f), std::forward<Args>(args)...);
2406 }
2407
2409 void wait() { drain(true); }
2410
2412 [[nodiscard]] size_t size() const noexcept { return tasks_.size(); }
2413
2415 [[nodiscard]] bool is_empty() const noexcept { return tasks_.empty(); }
2416 };
2417} // end namespace Aleph
2418
2419#endif // ALEPH_THREAD_POOL_H
Cooperative cancellation source paired with CancellationToken.
CancellationToken token() const noexcept
Return a token observing this source.
bool stop_requested() const noexcept
Return true if cancellation has already been requested.
std::shared_ptr< CancellationState > state_
void request_cancel() noexcept
Request cancellation for all derived tokens.
CancellationSource()=default
Default constructor.
void reset()
Reset the source so future tokens observe a fresh state.
void cancel() noexcept
Synonym for request_cancel().
RAII guard for condition variable cancellation registration.
std::shared_ptr< CancellationState > state_
ConditionVariableRegistration(std::shared_ptr< CancellationState > state, std::condition_variable *cv) noexcept
Construct with shared state and condition variable.
ConditionVariableRegistration()=default
Default constructor.
ConditionVariableRegistration(ConditionVariableRegistration &&other) noexcept
ConditionVariableRegistration(const ConditionVariableRegistration &)=delete
Deleted copy constructor.
Read-only cooperative cancellation token.
ConditionVariableRegistration notify_on_cancel(std::condition_variable &cv) const
Request notification of a condition variable on cancellation.
CancellationToken(std::shared_ptr< CancellationState > state) noexcept
bool stop_requested() const noexcept
Return true if cancellation has been requested.
std::shared_ptr< CancellationState > state_
CancellationToken()=default
Default constructor.
bool valid() const noexcept
Return true if the token is connected to a source.
bool is_cancellation_requested() const noexcept
Synonym for stop_requested().
void throw_if_cancellation_requested() const
Throw operation_canceled if cancellation was requested.
Minimal structured-concurrency helper over ThreadPool futures.
bool is_empty() const noexcept
Return true if the group has no pending tasks.
TaskGroup & operator=(const TaskGroup &)=delete
Deleted copy assignment operator.
void run(F &&f, Args &&... args)
Compatibility wrapper for launch().
std::vector< std::future< void > > tasks_
TaskGroup(ThreadPool &pool=default_pool()) noexcept
void wait()
Wait for all launched tasks and rethrow the first exception.
size_t size() const noexcept
Return the number of tasks currently tracked by the group.
TaskGroup(TaskGroup &&)=delete
Deleted move constructor.
void drain(const bool propagate)
TaskGroup & operator=(TaskGroup &&)=delete
Deleted move assignment operator.
static auto make_invocable(F &&f, Args &&... args)
TaskGroup(const TaskGroup &)=delete
Deleted copy constructor.
void launch(F &&f, Args &&... args)
Launch a task that belongs to this group.
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
std::mutex queue_mutex
bool wait_all_for(std::chrono::duration< Rep, Period > timeout)
Wait until all current tasks complete, with timeout.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
void set_exception_callback(ExceptionCallback callback)
Set callback for exceptions in detached tasks.
auto enqueue_batch(F &&f, const Container &args_list) -> std::vector< std::future< decltype(std::apply(f, *std::begin(args_list)))> >
Submit multiple tasks with a single notification.
size_t running_tasks() const noexcept
Get the number of tasks currently being executed.
bool is_idle() const
Check if the pool is idle (no pending or running tasks).
auto enqueue_bounded(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task with backpressure and memory protection.
ThreadPool(ThreadPool &&)=delete
Deleted move constructor.
static auto make_invocable(F &&f, Args &&... args)
Helper to create a callable from function + arguments using std::invoke.
std::condition_variable idle_condition
Notifies wait_all() of idle state.
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void resize(size_t new_size)
Resize the pool to a different number of workers.
ThreadPoolStats get_stats() const
Get current pool statistics.
void reset_stats()
Reset statistics counters to zero.
bool wait_all_until(std::chrono::time_point< Clock, Duration > deadline)
Wait until all current tasks complete, with deadline.
std::condition_variable space_available
Notifies enqueuers of queue space.
void start_workers(size_t n)
Start n worker threads.
std::atomic< size_t > peak_queue_size_
ThreadPool & operator=(ThreadPool &&)=delete
Deleted move assignment operator.
bool is_stopped() const noexcept
Check if the pool has been shut down.
void wait_all(const std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
ThreadPool(size_t n_threads=std::thread::hardware_concurrency())
Construct a thread pool with specified number of workers.
std::atomic< size_t > tasks_failed_
size_t hard_limit_
Exception threshold.
~ThreadPool()
Destructor - stops all workers and waits for completion.
size_t soft_limit_
Block threshold.
std::condition_variable condition
Notifies workers of new tasks.
std::vector< std::thread > workers
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking result.
void stop_workers()
Stop and join all workers.
std::atomic< bool > stop
void worker_loop()
Worker thread main loop.
std::atomic< size_t > active_tasks
size_t pending_tasks() const
Get the number of pending tasks in the queue.
void set_queue_limits(const size_t soft_limit, const size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
void shutdown()
Shut down the pool, completing all pending tasks.
std::atomic< size_t > tasks_completed_
ThreadPool(const ThreadPool &)=delete
Deleted copy constructor.
auto try_enqueue(F &&f, Args &&... args) -> std::optional< std::future< std::invoke_result_t< F, Args... > > >
Try to submit a task without blocking or throwing.
ThreadPool & operator=(const ThreadPool &)=delete
Deleted copy assignment operator.
std::queue< std::unique_ptr< TaskBase > > tasks
ExceptionCallback exception_callback_
Exception thrown when cooperative cancellation is observed.
operation_canceled(const std::string &message)
Exception thrown when the task queue exceeds its hard limit.
size_t current_size() const noexcept
Current queue size when exception was thrown.
queue_overflow_error(size_t current_size, size_t hard_limit)
size_t hard_limit() const noexcept
Hard limit that was exceeded.
const long double offset[]
Offset values indexed by symbol string length (bounded by MAX_OFFSET_INDEX)
ThreadPool & select_pool(const ParallelOptions &options)
size_t resolve_chunk_size(const size_t total, const size_t num_threads, const ParallelOptions &options, const size_t min_chunk=1)
void throw_if_canceled(const CancellationToken &token)
void merge_sequential(InputIt1 first1, InputIt1 last1, InputIt2 first2, InputIt2 last2, OutputIt d_first, Compare comp, const CancellationToken &token)
size_t chunk_size(const size_t n, const size_t num_threads, const size_t min_chunk=64)
Calculate optimal chunk size based on data size and thread count.
bool use_sequential_path(const size_t total, const ThreadPool &pool, const ParallelOptions &options) noexcept
Main namespace for Aleph-w library functions.
Definition ah-arena.H:89
and
Check uniqueness with explicit hash + equality functors.
T parallel_reduce(ThreadPool &pool, Iterator first, Iterator last, T init, BinaryOp op, size_t chunk_size=0)
Reduce elements in parallel.
OutputIt parallel_transform(ThreadPool &pool, InputIt first, InputIt last, OutputIt d_first, F &&f, size_t chunk_size=0)
Transform elements in parallel and store results.
ThreadPool & default_pool()
Return the default shared thread pool instance.
void message(const char *file, int line, const char *format,...)
Print an informational message with file and line info.
Definition ahDefs.C:100
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
void parallel_for_index(ThreadPool &pool, size_t start, size_t end, F &&f, size_t chunk_size=0)
Apply a function to each element in parallel (index-based).
void parallel_for(ThreadPool &pool, Iterator begin, Iterator end, F &&f, size_t chunk_size=0)
Execute a function in parallel over a range.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
std::decay_t< typename HeadC::Item_Type > T
Definition ah-zip.H:105
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
std::string to_string(const time_t t, const std::string &format)
Format a time_t value into a string using format.
Definition ah-date.H:140
void parallel_invoke(ThreadPool &pool, Fs &&... fs)
Invoke a small set of related callables in parallel.
std::function< void(std::exception_ptr)> ExceptionCallback
Type for exception callback in detached tasks.
STL namespace.
static struct argp_option options[]
Definition ntreepic.C:1886
Shared state for cooperative cancellation.
void register_condition_variable(std::condition_variable *cv)
void notify_observers() noexcept
std::atomic< bool > stop_requested
std::vector< std::condition_variable * > observers
void unregister_condition_variable(std::condition_variable *cv)
Common configuration object for parallel algorithms.
size_t chunk_size
Elements per chunk (0 = auto).
ThreadPool * pool
Executor to use (nullptr = default_pool()).
size_t min_size
Run sequentially below this size.
size_t max_tasks
Maximum number of scheduled tasks (0 = auto).
CancellationToken cancel_token
Cooperative cancellation observer.
Statistics collected by ThreadPool.
double queue_utilization(size_t soft_limit) const noexcept
Queue utilization as percentage (0-100)
size_t current_queue_size
Current pending tasks.
size_t num_workers
Number of worker threads.
size_t tasks_completed
Total tasks completed.
size_t peak_queue_size
Maximum queue size observed.
size_t total_processed() const noexcept
Total tasks processed (completed + failed)
size_t current_active
Currently executing tasks.
size_t tasks_failed
Tasks that threw exceptions (detached)
Type-erased task wrapper.
virtual void execute()=0
virtual ~TaskBase()=default
Virtual destructor.
Concrete task implementation with type preservation.
void execute() override