Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
thread_pool.H
/*
  Aleph_w

  Data structures & Algorithms
  version 2.0.0b
  https://github.com/lrleon/Aleph-w

  This file is part of Aleph-w library

  Copyright (c) 2002-2026 Leandro Rabindranath Leon

  Permission is hereby granted, free of charge, to any person obtaining a copy
  of this software and associated documentation files (the "Software"), to deal
  in the Software without restriction, including without limitation the rights
  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  copies of the Software, and to permit persons to whom the Software is
  furnished to do so, subject to the following conditions:

  The above copyright notice and this permission notice shall be included in all
  copies or substantial portions of the Software.

  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  SOFTWARE.
*/

#ifndef ALEPH_THREAD_POOL_H
#define ALEPH_THREAD_POOL_H

#include <future>
#include <queue>
#include <functional>
#include <condition_variable>
#include <atomic>
#include <vector>
#include <thread>
#include <mutex>
#include <stdexcept>
#include <type_traits>
#include <tuple>
#include <memory>
#include <limits>
#include <optional>
#include <chrono>
#include <numeric>
#include <iterator>

namespace Aleph
{

/// Exception thrown when the task queue exceeds its hard limit.
class queue_overflow_error : public std::overflow_error
{
public:
  queue_overflow_error(size_t current_size, size_t hard_limit)
    : std::overflow_error("ThreadPool queue overflow: " +
                          std::to_string(current_size) + " >= hard_limit " +
                          std::to_string(hard_limit)),
      current_size_(current_size), hard_limit_(hard_limit) {}

  /// Current queue size when the exception was thrown.
  [[nodiscard]] size_t current_size() const noexcept { return current_size_; }

  /// Hard limit that was exceeded.
  [[nodiscard]] size_t hard_limit() const noexcept { return hard_limit_; }

private:
  size_t current_size_;
  size_t hard_limit_;
};

/// Statistics collected by ThreadPool.
struct ThreadPoolStats
{
  size_t tasks_completed = 0;    ///< Total tasks completed.
  size_t tasks_failed = 0;       ///< Tasks that threw exceptions (detached).
  size_t current_queue_size = 0; ///< Current pending tasks.
  size_t current_active = 0;     ///< Currently executing tasks.
  size_t num_workers = 0;        ///< Number of worker threads.
  size_t peak_queue_size = 0;    ///< Maximum queue size observed.

  /// Total tasks processed (completed + failed).
  [[nodiscard]] size_t total_processed() const noexcept
  {
    return tasks_completed + tasks_failed;
  }

  /// Queue utilization as a percentage (0-100).
  [[nodiscard]] double queue_utilization(size_t soft_limit) const noexcept
  {
    if (soft_limit == 0 || soft_limit == std::numeric_limits<size_t>::max())
      return 0.0;
    return (100.0 * current_queue_size) / soft_limit;
  }
};

/// Type for exception callback in detached tasks.
using ExceptionCallback = std::function<void(std::exception_ptr)>;

/// A reusable thread pool for efficient parallel task execution.
class ThreadPool
{
  /// Type-erased task wrapper.
  struct TaskBase
  {
    virtual ~TaskBase() = default;
    virtual void execute() = 0;
  };

  /// Concrete task implementation with type preservation.
  template <typename F>
  struct Task : TaskBase
  {
    F func;

    explicit Task(F&& f) : func(std::move(f)) {}

    void execute() override { func(); }
  };

  std::vector<std::thread> workers;
  std::queue<std::unique_ptr<TaskBase>> tasks;
  mutable std::mutex queue_mutex;
  std::condition_variable condition;        ///< Notifies workers of new tasks.
  std::condition_variable space_available;  ///< Notifies enqueuers of queue space.
  std::condition_variable idle_condition;   ///< Notifies wait_all() of idle state.
  std::atomic<bool> stop{false};
  std::atomic<size_t> active_tasks{0};

  // Queue limits for bounded enqueue
  size_t soft_limit_ = std::numeric_limits<size_t>::max();  ///< Block threshold.
  size_t hard_limit_ = std::numeric_limits<size_t>::max();  ///< Exception threshold.

  // Statistics
  std::atomic<size_t> tasks_completed_{0};
  std::atomic<size_t> tasks_failed_{0};
  std::atomic<size_t> peak_queue_size_{0};

  // Exception callback for detached tasks
  ExceptionCallback exception_callback_;

  /// Worker thread main loop.
  void worker_loop()
  {
    while (true)
    {
      std::unique_ptr<TaskBase> task;
      bool should_notify_space = false;
      {
        std::unique_lock<std::mutex> lock(queue_mutex);
        condition.wait(lock, [this] {
          return stop or not tasks.empty();
        });

        if (stop and tasks.empty())
          return;

        // Check if we should notify waiters after popping
        should_notify_space = tasks.size() >= soft_limit_;

        task = std::move(tasks.front());
        tasks.pop();
        ++active_tasks;
      }

      // Notify any blocked enqueuers that space is available
      if (should_notify_space)
        space_available.notify_all();

      task->execute();

      // Update stats and check for idle state
      --active_tasks;

      // Notify wait_all if we're now idle
      {
        std::unique_lock<std::mutex> lock(queue_mutex);
        if (tasks.empty() && active_tasks == 0)
          idle_condition.notify_all();
      }
    }
  }

  /// Start n worker threads.
  void start_workers(size_t n)
  {
    workers.reserve(n);
    for (size_t i = 0; i < n; ++i)
      workers.emplace_back(&ThreadPool::worker_loop, this);
  }

  /// Stop and join all workers.
  void stop_workers()
  {
    {
      std::unique_lock<std::mutex> lock(queue_mutex);
      stop = true;
    }
    condition.notify_all();

    for (auto& worker : workers)
      if (worker.joinable())
        worker.join();

    workers.clear();
  }

  /// Helper to create a callable from function + arguments using std::invoke.
  template <typename F, typename... Args>
  static auto make_invocable(F&& f, Args&&... args)
  {
    // Decay types to store copies/moves, not references
    return [func = std::forward<F>(f),
            args_tuple = std::make_tuple(std::forward<Args>(args)...)]() mutable
    {
      return std::apply([&func](auto&&... a) {
        return std::invoke(std::move(func), std::forward<decltype(a)>(a)...);
      }, std::move(args_tuple));
    };
  }

public:
  /// Construct a thread pool with the specified number of workers
  /// (defaults to std::thread::hardware_concurrency()).
  explicit ThreadPool(size_t n_threads = std::thread::hardware_concurrency())
  {
    if (n_threads == 0)
      n_threads = 1; // Ensure at least one worker

    start_workers(n_threads);
  }

  /// Non-copyable.
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  /// Non-movable (due to mutex and condition_variable).
  ThreadPool(ThreadPool&&) = delete;
  ThreadPool& operator=(ThreadPool&&) = delete;

  /// Destructor - stops all workers and waits for completion.
  ~ThreadPool()
  {
    stop_workers();
  }

  /// Submit a task for execution and get a future for the result.
  template <typename F, typename... Args>
  [[nodiscard]] auto enqueue(F&& f, Args&&... args)
    -> std::future<std::invoke_result_t<F, Args...>>
  {
    using return_type = std::invoke_result_t<F, Args...>;

    // Create the invocable that captures function and arguments
    auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);

    // Wrap in a promise/future pair for result delivery
    auto promise = std::make_shared<std::promise<return_type>>();
    std::future<return_type> result = promise->get_future();

    // Capture stats counter for task completion tracking
    auto completed_counter = &tasks_completed_;

    // Create the task that will execute and set the promise
    auto task_func = [invocable = std::move(invocable), promise, completed_counter]() mutable {
      try
      {
        if constexpr (std::is_void_v<return_type>)
        {
          invocable();
          promise->set_value();
        }
        else
        {
          promise->set_value(invocable());
        }
        ++(*completed_counter);
      }
      catch (...)
      {
        promise->set_exception(std::current_exception());
        ++(*completed_counter); // Still counts as processed
      }
    };

    {
      std::unique_lock<std::mutex> lock(queue_mutex);

      if (stop)
        throw std::runtime_error("enqueue on stopped ThreadPool");

      tasks.push(std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));

      // Update peak queue size
      size_t current_size = tasks.size();
      size_t peak = peak_queue_size_.load();
      while (current_size > peak and
             not peak_queue_size_.compare_exchange_weak(peak, current_size))
        ;
    }

    condition.notify_one();
    return result;
  }
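
  /* Usage sketch for enqueue(): submit a callable with its arguments and
     collect the result through the returned std::future. Illustrative only;
     `pool` names a local ThreadPool, not part of this header.

       ThreadPool pool(4);
       auto fut = pool.enqueue([](int a, int b) { return a + b; }, 2, 3);
       int sum = fut.get();  // blocks until the task runs; sum == 5.
                             // get() rethrows any exception from the task.
  */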

  /// Submit a task without tracking the result (fire-and-forget).
  template <typename F, typename... Args>
  void enqueue_detached(F&& f, Args&&... args)
  {
    auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);

    // Capture stats counters and the exception callback
    auto completed_counter = &tasks_completed_;
    auto failed_counter = &tasks_failed_;
    ExceptionCallback callback;
    {
      std::unique_lock<std::mutex> lock(queue_mutex);
      callback = exception_callback_;
    }

    auto task_func = [invocable = std::move(invocable),
                      completed_counter, failed_counter,
                      callback = std::move(callback)]() mutable {
      try
      {
        invocable();
        ++(*completed_counter);
      }
      catch (...)
      {
        ++(*failed_counter);
        // Call exception callback if set
        if (callback)
        {
          try { callback(std::current_exception()); }
          catch (...) { /* Ignore callback exceptions */ }
        }
      }
    };

    {
      std::unique_lock<std::mutex> lock(queue_mutex);

      if (stop)
        throw std::runtime_error("enqueue_detached on stopped ThreadPool");

      tasks.push(std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));

      // Update peak queue size
      size_t current_size = tasks.size();
      size_t peak = peak_queue_size_.load();
      while (current_size > peak and
             not peak_queue_size_.compare_exchange_weak(peak, current_size))
        ;
    }

    condition.notify_one();
  }
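
  /* Fire-and-forget sketch: enqueue_detached() returns no future, so
     failures surface only through the exception callback, if one is set
     (see set_exception_callback below). `log_line` is a hypothetical
     logging helper, used purely for illustration.

       pool.enqueue_detached([](const std::string& msg) {
         log_line(msg);  // hypothetical helper
       }, std::string("task started"));
  */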

  /// Submit multiple tasks and collect all futures.
  template <typename F, typename Container>
  [[nodiscard]] auto enqueue_bulk(F&& f, const Container& args_container)
    -> std::vector<std::future<std::invoke_result_t<F, typename Container::value_type>>>
  {
    using return_type = std::invoke_result_t<F, typename Container::value_type>;
    std::vector<std::future<return_type>> results;
    results.reserve(args_container.size());

    for (const auto& arg : args_container)
      results.push_back(enqueue(f, arg));

    return results;
  }
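
  /* Bulk-submission sketch: one future per container element, returned in
     submission order. Inputs here are arbitrary example values; printing
     assumes <iostream>.

       std::vector<int> inputs = {1, 2, 3, 4};
       auto futs = pool.enqueue_bulk([](int x) { return x * x; }, inputs);
       for (auto& f : futs)
         std::cout << f.get() << ' ';  // 1 4 9 16
  */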

  /// Set queue limits for bounded enqueue operations.
  void set_queue_limits(size_t soft_limit,
                        size_t hard_limit = std::numeric_limits<size_t>::max())
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    soft_limit_ = soft_limit;
    hard_limit_ = (hard_limit == std::numeric_limits<size_t>::max() &&
                   soft_limit != std::numeric_limits<size_t>::max())
                  ? soft_limit * 10 // Default hard = 10x soft
                  : hard_limit;
  }

  /// Get the current queue limits.
  [[nodiscard]] std::pair<size_t, size_t> get_queue_limits() const
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    return {soft_limit_, hard_limit_};
  }
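
  /* Limit-configuration sketch: when the hard limit is left defaulted, it
     becomes 10x the soft limit, per the rule above. Values are arbitrary.

       pool.set_queue_limits(100);               // soft = 100, hard = 1000
       auto [soft, hard] = pool.get_queue_limits();
       pool.set_queue_limits(100, 250);          // explicit hard limit
  */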

  /// Submit a task with backpressure and memory protection.
  template <typename F, typename... Args>
  [[nodiscard]] auto enqueue_bounded(F&& f, Args&&... args)
    -> std::future<std::invoke_result_t<F, Args...>>
  {
    using return_type = std::invoke_result_t<F, Args...>;

    auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);

    auto promise = std::make_shared<std::promise<return_type>>();
    std::future<return_type> result = promise->get_future();

    auto task_func = [invocable = std::move(invocable), promise]() mutable {
      try
      {
        if constexpr (std::is_void_v<return_type>)
        {
          invocable();
          promise->set_value();
        }
        else
        {
          promise->set_value(invocable());
        }
      }
      catch (...)
      {
        promise->set_exception(std::current_exception());
      }
    };

    {
      std::unique_lock<std::mutex> lock(queue_mutex);

      // Check hard limit first (throws immediately)
      if (tasks.size() >= hard_limit_)
        throw queue_overflow_error(tasks.size(), hard_limit_);

      // Block if at soft limit (wait for space)
      space_available.wait(lock, [this] {
        return stop || tasks.size() < soft_limit_;
      });

      if (stop)
        throw std::runtime_error("enqueue_bounded on stopped ThreadPool");

      // Re-check hard limit after waking up
      if (tasks.size() >= hard_limit_)
        throw queue_overflow_error(tasks.size(), hard_limit_);

      tasks.push(std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));
    }

    condition.notify_one();
    return result;
  }
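
  /* Backpressure sketch: enqueue_bounded() blocks while the queue sits at
     the soft limit and throws queue_overflow_error at the hard limit.
     The handling shown is illustrative only.

       try
       {
         auto fut = pool.enqueue_bounded([] { return 42; });
         fut.get();
       }
       catch (const queue_overflow_error& e)
       {
         // Producers far ahead of consumers: compare e.current_size()
         // against e.hard_limit() to decide how to back off.
       }
  */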

  /// Submit a task with backpressure, without tracking the result.
  template <typename F, typename... Args>
  void enqueue_bounded_detached(F&& f, Args&&... args)
  {
    auto invocable = make_invocable(std::forward<F>(f), std::forward<Args>(args)...);

    auto task_func = [invocable = std::move(invocable)]() mutable {
      try
      {
        invocable();
      }
      catch (...)
      {
        // Silently ignore exceptions in detached tasks
      }
    };

    {
      std::unique_lock<std::mutex> lock(queue_mutex);

      // Check hard limit first
      if (tasks.size() >= hard_limit_)
        throw queue_overflow_error(tasks.size(), hard_limit_);

      // Block if at soft limit
      space_available.wait(lock, [this] {
        return stop || tasks.size() < soft_limit_;
      });

      if (stop)
        throw std::runtime_error("enqueue_bounded_detached on stopped ThreadPool");

      // Re-check hard limit after waking up
      if (tasks.size() >= hard_limit_)
        throw queue_overflow_error(tasks.size(), hard_limit_);

      tasks.push(std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));
    }

    condition.notify_one();
  }

  /// Try to submit a task without blocking or throwing on overflow.
  template <typename F, typename... Args>
  [[nodiscard]] auto try_enqueue(F&& f, Args&&... args)
    -> std::optional<std::future<std::invoke_result_t<F, Args...>>>
  {
    {
      std::unique_lock<std::mutex> lock(queue_mutex);

      if (stop)
        throw std::runtime_error("try_enqueue on stopped ThreadPool");

      // Check if the queue is at or above the soft limit
      if (tasks.size() >= soft_limit_)
        return std::nullopt;
    }

    // Queue has space, delegate to regular enqueue
    return enqueue(std::forward<F>(f), std::forward<Args>(args)...);
  }
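
  /* Non-blocking sketch: try_enqueue() yields std::nullopt instead of
     blocking when the queue is at the soft limit, letting the caller shed
     load. `heavy_work` is a hypothetical function.

       if (auto fut = pool.try_enqueue([] { return heavy_work(); }))
         fut->get();
       else
         ;  // queue full: drop the task, retry later, or run it inline
  */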

  /// Try to submit a detached task without blocking or throwing on overflow.
  template <typename F, typename... Args>
  [[nodiscard]] bool try_enqueue_detached(F&& f, Args&&... args)
  {
    {
      std::unique_lock<std::mutex> lock(queue_mutex);

      if (stop)
        throw std::runtime_error("try_enqueue_detached on stopped ThreadPool");

      if (tasks.size() >= soft_limit_)
        return false;
    }

    enqueue_detached(std::forward<F>(f), std::forward<Args>(args)...);
    return true;
  }

  /// Get the number of worker threads.
  [[nodiscard]] size_t num_threads() const noexcept
  {
    return workers.size();
  }

  /// Get the number of pending tasks in the queue.
  [[nodiscard]] size_t pending_tasks() const
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    return tasks.size();
  }

  /// Get the number of tasks currently being executed.
  [[nodiscard]] size_t running_tasks() const noexcept
  {
    return active_tasks.load();
  }

  /// Check if the pool is idle (no pending or running tasks).
  [[nodiscard]] bool is_idle() const
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    return tasks.empty() && active_tasks == 0;
  }

  /// Check if the pool has been shut down.
  [[nodiscard]] bool is_stopped() const noexcept
  {
    return stop.load();
  }

  /// Shut down the pool, completing all pending tasks.
  void shutdown()
  {
    {
      std::unique_lock<std::mutex> lock(queue_mutex);
      if (stop.exchange(true))
        return; // Already stopped
    }
    condition.notify_all();

    for (auto& worker : workers)
      if (worker.joinable())
        worker.join();

    workers.clear();
  }

  /// Resize the pool to a different number of workers.
  void resize(size_t new_size)
  {
    if (new_size == 0)
      new_size = 1;

    if (new_size == workers.size())
      return;

    if (stop)
      throw std::runtime_error("cannot resize a stopped ThreadPool");

    // Stop current workers (but don't clear tasks)
    {
      std::unique_lock<std::mutex> lock(queue_mutex);
      stop = true;
    }
    condition.notify_all();

    for (auto& worker : workers)
      if (worker.joinable())
        worker.join();

    workers.clear();

    // Restart with the new size
    stop = false;
    start_workers(new_size);
  }

  /// Wait until all current tasks complete (polling).
  void wait_all(std::chrono::milliseconds poll_interval = std::chrono::milliseconds(1))
  {
    while (not is_idle())
      std::this_thread::sleep_for(poll_interval);
  }

  /// Wait until all current tasks complete, with a timeout.
  /// Returns true if the pool became idle before the timeout expired.
  template <typename Rep, typename Period>
  [[nodiscard]] bool wait_all_for(std::chrono::duration<Rep, Period> timeout)
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    return idle_condition.wait_for(lock, timeout, [this] {
      return tasks.empty() && active_tasks == 0;
    });
  }

  /// Wait until all current tasks complete, with a deadline.
  /// Returns true if the pool became idle before the deadline passed.
  template <typename Clock, typename Duration>
  [[nodiscard]] bool wait_all_until(std::chrono::time_point<Clock, Duration> deadline)
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    return idle_condition.wait_until(lock, deadline, [this] {
      return tasks.empty() && active_tasks == 0;
    });
  }
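
  /* Waiting sketch: the timed variants return false if the pool did not
     drain in time. Durations here are arbitrary examples.

       pool.wait_all();  // poll until idle
       if (not pool.wait_all_for(std::chrono::seconds(5)))
         ;  // still busy after 5 s
       bool ok = pool.wait_all_until(std::chrono::steady_clock::now()
                                     + std::chrono::seconds(5));
  */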

  /// Get current pool statistics.
  [[nodiscard]] ThreadPoolStats get_stats() const
  {
    ThreadPoolStats stats;
    stats.tasks_completed = tasks_completed_.load();
    stats.tasks_failed = tasks_failed_.load();
    stats.current_active = active_tasks.load();
    stats.num_workers = workers.size();
    stats.peak_queue_size = peak_queue_size_.load();

    {
      std::unique_lock<std::mutex> lock(queue_mutex);
      stats.current_queue_size = tasks.size();
    }

    return stats;
  }

  /// Reset statistics counters to zero.
  void reset_stats()
  {
    tasks_completed_ = 0;
    tasks_failed_ = 0;
    peak_queue_size_ = 0;
  }
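
  /* Statistics sketch: get_stats() takes a snapshot of the counters;
     queue_utilization() is meaningful only once a finite soft limit has
     been set (100 below is an assumed example value; printing assumes
     <iostream>).

       ThreadPoolStats s = pool.get_stats();
       std::cout << s.total_processed() << " processed, "
                 << s.tasks_failed << " failed, peak queue "
                 << s.peak_queue_size << '\n';
       double pct = s.queue_utilization(100);
  */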

  /// Set callback for exceptions in detached tasks.
  void set_exception_callback(ExceptionCallback callback)
  {
    std::unique_lock<std::mutex> lock(queue_mutex);
    exception_callback_ = std::move(callback);
  }
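
  /* Callback sketch: rethrow inside the handler to inspect the exception
     carried by the std::exception_ptr. Note the callback runs on a worker
     thread, not on the caller's thread.

       pool.set_exception_callback([](std::exception_ptr ep) {
         try { std::rethrow_exception(ep); }
         catch (const std::exception& e)
         {
           // e.what() describes the failure of a detached task
         }
       });
  */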

  /// Submit multiple tasks with a single notification.
  template <typename F, typename Container>
  [[nodiscard]] auto enqueue_batch(F&& f, const Container& args_list)
    -> std::vector<std::future<decltype(std::apply(f, *std::begin(args_list)))>>
  {
    using ArgsTuple = typename Container::value_type;
    using return_type = decltype(std::apply(f, std::declval<ArgsTuple>()));

    std::vector<std::future<return_type>> results;
    results.reserve(std::distance(std::begin(args_list), std::end(args_list)));

    std::vector<std::unique_ptr<TaskBase>> batch_tasks;
    batch_tasks.reserve(results.capacity());

    // Capture stats counter
    auto completed_counter = &tasks_completed_;

    for (const auto& args : args_list)
    {
      auto promise = std::make_shared<std::promise<return_type>>();
      results.push_back(promise->get_future());

      auto task_func = [func = f, args, promise, completed_counter]() mutable {
        try
        {
          if constexpr (std::is_void_v<return_type>)
          {
            std::apply(func, args);
            promise->set_value();
          }
          else
          {
            promise->set_value(std::apply(func, args));
          }
          ++(*completed_counter);
        }
        catch (...)
        {
          promise->set_exception(std::current_exception());
          ++(*completed_counter);
        }
      };

      batch_tasks.push_back(
        std::make_unique<Task<decltype(task_func)>>(std::move(task_func)));
    }

    {
      std::unique_lock<std::mutex> lock(queue_mutex);

      if (stop)
        throw std::runtime_error("enqueue_batch on stopped ThreadPool");

      for (auto& task : batch_tasks)
        tasks.push(std::move(task));

      // Update peak queue size
      size_t current_size = tasks.size();
      size_t peak = peak_queue_size_.load();
      while (current_size > peak and
             not peak_queue_size_.compare_exchange_weak(peak, current_size))
        ;
    }

    // Notify all workers for the batch
    condition.notify_all();

    return results;
  }
};
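
/* Batch sketch for enqueue_batch(): each container element is a tuple of
   arguments unpacked with std::apply; all tasks enter the queue under one
   lock acquisition and workers are woken once. Values are illustrative.

     std::vector<std::tuple<int, int>> jobs = {{1, 2}, {3, 4}};
     auto futs = pool.enqueue_batch([](int a, int b) { return a * b; }, jobs);
     int first = futs[0].get();  // 2
*/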

// =============================================================================
// Parallel Algorithms
// =============================================================================

/// Execute a function in parallel over a range.
template <typename Iterator, typename F>
void parallel_for(ThreadPool& pool, Iterator begin, Iterator end, F&& f,
                  size_t chunk_size = 0)
{
  const size_t total = std::distance(begin, end);
  if (total == 0)
    return;

  // Auto-calculate chunk size if not specified
  if (chunk_size == 0)
    chunk_size = std::max(size_t(1), total / (pool.num_threads() * 4));

  std::vector<std::future<void>> futures;
  futures.reserve((total + chunk_size - 1) / chunk_size);

  // Check if F accepts (Iterator, Iterator) or (T&)
  if constexpr (std::is_invocable_v<F, Iterator, Iterator>)
  {
    // Chunk-based function
    for (Iterator chunk_begin = begin; chunk_begin < end; )
    {
      Iterator chunk_end = chunk_begin;
      std::advance(chunk_end, std::min(chunk_size,
                   static_cast<size_t>(std::distance(chunk_begin, end))));

      futures.push_back(pool.enqueue([f, chunk_begin, chunk_end]() {
        f(chunk_begin, chunk_end);
      }));

      chunk_begin = chunk_end;
    }
  }
  else
  {
    // Per-element function
    for (Iterator chunk_begin = begin; chunk_begin < end; )
    {
      Iterator chunk_end = chunk_begin;
      std::advance(chunk_end, std::min(chunk_size,
                   static_cast<size_t>(std::distance(chunk_begin, end))));

      futures.push_back(pool.enqueue([f, chunk_begin, chunk_end]() {
        for (Iterator it = chunk_begin; it != chunk_end; ++it)
          f(*it);
      }));

      chunk_begin = chunk_end;
    }
  }

  // Wait for all chunks to complete
  for (auto& fut : futures)
    fut.get();
}
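
/* parallel_for sketch: the same entry point accepts either a per-element
   or a per-chunk callable, selected via std::is_invocable_v. Illustrative;
   assumes <algorithm> for std::fill.

     std::vector<double> v(1'000'000, 1.0);
     parallel_for(pool, v.begin(), v.end(), [](double& x) { x *= 2; });

     // Chunk form: one call per [b, e) subrange
     parallel_for(pool, v.begin(), v.end(),
                  [](auto b, auto e) { std::fill(b, e, 0.0); });
*/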

/// Transform elements in parallel and store results.
template <typename InputIt, typename OutputIt, typename F>
OutputIt parallel_transform(ThreadPool& pool, InputIt first, InputIt last,
                            OutputIt d_first, F&& f, size_t chunk_size = 0)
{
  const size_t total = std::distance(first, last);
  if (total == 0)
    return d_first;

  if (chunk_size == 0)
    chunk_size = std::max(size_t(1), total / (pool.num_threads() * 4));

  std::vector<std::future<void>> futures;
  futures.reserve((total + chunk_size - 1) / chunk_size);

  InputIt chunk_in = first;
  OutputIt chunk_out = d_first;

  while (chunk_in < last)
  {
    size_t chunk_len = std::min(chunk_size,
                                static_cast<size_t>(std::distance(chunk_in, last)));
    InputIt chunk_in_end = chunk_in;
    std::advance(chunk_in_end, chunk_len);

    futures.push_back(pool.enqueue([f, chunk_in, chunk_in_end, chunk_out]() {
      InputIt in = chunk_in;
      OutputIt out = chunk_out;
      while (in != chunk_in_end)
      {
        *out = f(*in);
        ++in;
        ++out;
      }
    }));

    chunk_in = chunk_in_end;
    std::advance(chunk_out, chunk_len);
  }

  for (auto& fut : futures)
    fut.get();

  return d_first + total;
}
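
/* parallel_transform sketch: the destination must already be sized and must
   not overlap the source, since chunks write through raw output iterators.

     std::vector<int> in = {1, 2, 3, 4};
     std::vector<int> out(in.size());
     parallel_transform(pool, in.begin(), in.end(), out.begin(),
                        [](int x) { return x + 1; });  // out = {2, 3, 4, 5}
*/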

/// Reduce elements in parallel.
template <typename Iterator, typename T, typename BinaryOp>
T parallel_reduce(ThreadPool& pool, Iterator first, Iterator last,
                  T init, BinaryOp op, size_t chunk_size = 0)
{
  const size_t total = std::distance(first, last);
  if (total == 0)
    return init;

  if (chunk_size == 0)
    chunk_size = std::max(size_t(1), total / (pool.num_threads() * 4));

  std::vector<std::future<T>> futures;
  futures.reserve((total + chunk_size - 1) / chunk_size);

  for (Iterator chunk_begin = first; chunk_begin < last; )
  {
    Iterator chunk_end = chunk_begin;
    std::advance(chunk_end, std::min(chunk_size,
                 static_cast<size_t>(std::distance(chunk_begin, last))));

    futures.push_back(pool.enqueue([op, chunk_begin, chunk_end]() {
      T local_result = *chunk_begin;
      for (Iterator it = chunk_begin + 1; it != chunk_end; ++it)
        local_result = op(local_result, *it);
      return local_result;
    }));

    chunk_begin = chunk_end;
  }

  // Reduce partial results
  T result = init;
  for (auto& fut : futures)
    result = op(result, fut.get());

  return result;
}
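
/* parallel_reduce sketch: op must be associative for the per-chunk partial
   results to combine correctly; init is folded in once at the end.

     std::vector<int> v = {1, 2, 3, 4, 5};
     int sum = parallel_reduce(pool, v.begin(), v.end(), 0,
                               [](int a, int b) { return a + b; });  // 15
*/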

/// Apply a function to each index in [start, end) in parallel (index-based).
template <typename F>
void parallel_for_index(ThreadPool& pool, size_t start, size_t end, F&& f,
                        size_t chunk_size = 0)
{
  if (start >= end)
    return;

  const size_t total = end - start;

  if (chunk_size == 0)
    chunk_size = std::max(size_t(1), total / (pool.num_threads() * 4));

  std::vector<std::future<void>> futures;
  futures.reserve((total + chunk_size - 1) / chunk_size);

  for (size_t chunk_start = start; chunk_start < end; )
  {
    size_t chunk_end = std::min(chunk_start + chunk_size, end);

    futures.push_back(pool.enqueue([f, chunk_start, chunk_end]() {
      for (size_t i = chunk_start; i < chunk_end; ++i)
        f(i);
    }));

    chunk_start = chunk_end;
  }

  for (auto& fut : futures)
    fut.get();
}

/// Global default thread pool.
inline ThreadPool& default_pool()
{
  static ThreadPool pool;
  return pool;
}
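
/* default_pool sketch: a shared, lazily constructed pool for callers that do
   not manage their own; it lives until program exit.

     auto fut = Aleph::default_pool().enqueue([] { return 7; });
     int v = fut.get();  // 7
*/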

} // end namespace Aleph

#endif // ALEPH_THREAD_POOL_H