Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
thread_pool_test.cc
Go to the documentation of this file.
1
2/*
3 Aleph_w
4
5 Data structures & Algorithms
6 version 2.0.0b
7 https://github.com/lrleon/Aleph-w
8
9 This file is part of Aleph-w library
10
11 Copyright (c) 2002-2026 Leandro Rabindranath Leon
12
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 SOFTWARE.
30*/
31
32
33
40#include <gtest/gtest.h>
41#include <thread_pool.H>
42#include <atomic>
43#include <chrono>
44#include <vector>
45#include <cmath>
46#include <numeric>
47#include <iomanip>
48#include <algorithm>
49
50using namespace Aleph;
51using namespace std::chrono_literals;
52
53class ThreadPoolTest : public ::testing::Test
54{
55protected:
56 void SetUp() override {}
57 void TearDown() override {}
58};
59
60// ============================================================================
61// Basic Functionality Tests
62// ============================================================================
63
71
77
83
85{
86 ThreadPool pool(2);
87
88 auto future = pool.enqueue([] { return 42; });
89
90 EXPECT_EQ(future.get(), 42);
91}
92
94{
95 ThreadPool pool(2);
96
97 auto future = pool.enqueue([](int a, int b) { return a + b; }, 10, 20);
98
99 EXPECT_EQ(future.get(), 30);
100}
101
103{
104 ThreadPool pool(2);
105 int value = 100;
106
107 auto future = pool.enqueue([&value] { return value * 2; });
108
109 EXPECT_EQ(future.get(), 200);
110}
111
113{
114 ThreadPool pool(2);
115 std::atomic<bool> executed{false};
116
117 auto future = pool.enqueue([&executed] { executed = true; });
118 future.get();
119
120 EXPECT_TRUE(executed);
121}
122
124{
125 ThreadPool pool(4);
126 const int num_tasks = 100;
127 std::vector<std::future<int>> futures;
128
129 for (int i = 0; i < num_tasks; ++i)
130 futures.push_back(pool.enqueue([i] { return i * i; }));
131
132 for (int i = 0; i < num_tasks; ++i)
133 EXPECT_EQ(futures[i].get(), i * i);
134}
135
137{
138 CancellationSource source;
139 auto token = source.token();
140
141 EXPECT_FALSE(token.stop_requested());
142 source.request_cancel();
143 EXPECT_TRUE(token.stop_requested());
144 EXPECT_THROW(token.throw_if_cancellation_requested(), operation_canceled);
145}
146
148{
149 ThreadPool pool(4);
150 TaskGroup group(pool);
151 std::atomic<int> sum{0};
152
153 for (int i = 1; i <= 8; ++i)
154 group.launch([&sum, i] { sum.fetch_add(i, std::memory_order_relaxed); });
155
156 EXPECT_EQ(group.size(), 8u);
157 EXPECT_NO_THROW(group.wait());
158 EXPECT_EQ(sum.load(), 36);
159 EXPECT_TRUE(group.is_empty());
160}
161
163{
164 ThreadPool pool(2);
165 TaskGroup group(pool);
166 std::atomic<int> completed{0};
167
168 group.launch([&completed] {
169 ++completed;
170 });
171 group.launch([] {
172 throw std::logic_error("boom");
173 });
174 group.launch([&completed] {
175 ++completed;
176 });
177
178 EXPECT_THROW(group.wait(), std::logic_error);
179 EXPECT_EQ(completed.load(), 2);
180 EXPECT_TRUE(group.is_empty());
181}
182
184{
185 ThreadPool pool(4);
186 CancellationSource source;
187 std::atomic<size_t> processed{0};
188
190 options.pool = &pool;
191 options.chunk_size = 16;
192 options.cancel_token = source.token();
193
195 [&](size_t i) {
196 if (i == 64)
197 source.request_cancel();
198 ++processed;
199 },
200 options),
202 EXPECT_LT(processed.load(), 1024u);
203}
204
206{
207 ThreadPool pool(4);
208 std::atomic<int> sum{0};
209
210 parallel_invoke(pool,
211 [&] { sum.fetch_add(1, std::memory_order_relaxed); },
212 [&] { sum.fetch_add(10, std::memory_order_relaxed); },
213 [&] { sum.fetch_add(100, std::memory_order_relaxed); });
214
215 EXPECT_EQ(sum.load(), 111);
216}
217
219{
220 ThreadPool pool(3);
221 std::atomic<int> completed{0};
222
224 [&] { ++completed; },
225 [] { throw std::runtime_error("parallel_invoke boom"); },
226 [&] { ++completed; }),
227 std::runtime_error);
228 EXPECT_EQ(completed.load(), 2);
229}
230
232{
233 ThreadPool pool(2);
234 CancellationSource source;
236 options.pool = &pool;
237 options.cancel_token = source.token();
238 source.request_cancel();
239
241 [] {},
242 [] {}),
244}
245
247{
248 ThreadPool pool(4);
249 std::vector<int> input(1024);
250 std::iota(input.begin(), input.end(), 1);
251 std::vector<int> output(input.size());
252 std::vector<int> expected(input.size());
253
254 std::partial_sum(input.begin(), input.end(), expected.begin());
255 pscan(pool, input.begin(), input.end(), output.begin(), std::plus<int>{});
256
258}
259
261{
262 ThreadPool pool(4);
263 std::vector<int> input = {3, 1, 4, 1, 5, 9};
264 std::vector<int> output(input.size());
266 options.pool = &pool;
267 options.chunk_size = 2;
268
269 pscan(input.begin(), input.end(), output.begin(), std::plus<int>{}, options);
270 EXPECT_EQ(output, (std::vector<int>{3, 4, 8, 9, 14, 23}));
271}
272
274{
275 ThreadPool pool(4);
276 std::vector<int> input = {1, 2, 3, 4, 5};
277 std::vector<int> output(input.size());
278
279 pexclusive_scan(pool, input.begin(), input.end(), output.begin(), 0, std::plus<int>{});
280 EXPECT_EQ(output, (std::vector<int>{0, 1, 3, 6, 10}));
281}
282
284{
285 ThreadPool pool(4);
286 std::vector<int> input(2048, 1);
287 std::vector<int> output(input.size());
288 CancellationSource source;
290 options.pool = &pool;
291 options.chunk_size = 64;
292 options.cancel_token = source.token();
293 source.request_cancel();
294
295 EXPECT_THROW(pscan(input.begin(), input.end(), output.begin(),
296 std::plus<int>{}, options),
298}
299
301{
302 ThreadPool pool(4);
303 std::vector<int> left(500), right(700), merged(1200), expected(1200);
304 std::iota(left.begin(), left.end(), 0);
305 std::iota(right.begin(), right.end(), 250);
306
307 std::merge(left.begin(), left.end(), right.begin(), right.end(), expected.begin());
308 pmerge(pool, left.begin(), left.end(), right.begin(), right.end(), merged.begin());
309
310 EXPECT_EQ(merged, expected);
311}
312
314{
315 ThreadPool pool(4);
316 std::vector<int> left = {9, 7, 5, 3, 1};
317 std::vector<int> right = {10, 8, 6, 4, 2};
318 std::vector<int> merged(left.size() + right.size());
320 options.pool = &pool;
321 options.chunk_size = 3;
322
323 pmerge(left.begin(), left.end(), right.begin(), right.end(),
324 merged.begin(), std::greater<int>{}, options);
325
326 EXPECT_EQ(merged, (std::vector<int>{10, 9, 8, 7, 6, 5, 4, 3, 2, 1}));
327}
328
330{
331 ThreadPool pool(4);
332 std::vector<int> left(1024), right(1024), merged(2048);
333 std::iota(left.begin(), left.end(), 0);
334 std::iota(right.begin(), right.end(), 0);
335 CancellationSource source;
337 options.pool = &pool;
338 options.chunk_size = 64;
339 options.cancel_token = source.token();
340 source.request_cancel();
341
342 EXPECT_THROW(pmerge(left.begin(), left.end(), right.begin(), right.end(),
343 merged.begin(), std::less<int>{}, options),
345}
346
347// ============================================================================
348// Boundary Cases: empty / single-element
349// ============================================================================
350
352{
353 ThreadPool pool(4);
354 std::vector<int> input, output;
355 pscan(pool, input.begin(), input.end(), output.begin(), std::plus<int>{});
356 EXPECT_TRUE(output.empty());
357
359 options.pool = &pool;
360 pscan(input.begin(), input.end(), output.begin(), std::plus<int>{}, options);
361 EXPECT_TRUE(output.empty());
362}
363
365{
366 ThreadPool pool(4);
367 std::vector<int> input = {42};
368 std::vector<int> output(1);
369 pscan(pool, input.begin(), input.end(), output.begin(), std::plus<int>{});
370 EXPECT_EQ(output, (std::vector<int>{42}));
371
373 options.pool = &pool;
374 std::fill(output.begin(), output.end(), 0);
375 pscan(input.begin(), input.end(), output.begin(), std::plus<int>{}, options);
376 EXPECT_EQ(output, (std::vector<int>{42}));
377}
378
380{
381 ThreadPool pool(4);
382 std::vector<int> input, output;
383 pexclusive_scan(pool, input.begin(), input.end(), output.begin(), 0, std::plus<int>{});
384 EXPECT_TRUE(output.empty());
385
387 options.pool = &pool;
388 pexclusive_scan(input.begin(), input.end(), output.begin(), 0, std::plus<int>{}, options);
389 EXPECT_TRUE(output.empty());
390}
391
393{
394 ThreadPool pool(4);
395 std::vector<int> input = {7};
396 std::vector<int> output(1);
397 pexclusive_scan(pool, input.begin(), input.end(), output.begin(), 0, std::plus<int>{});
398 EXPECT_EQ(output, (std::vector<int>{0}));
399
401 options.pool = &pool;
402 std::fill(output.begin(), output.end(), -1);
403 pexclusive_scan(input.begin(), input.end(), output.begin(), 0, std::plus<int>{}, options);
404 EXPECT_EQ(output, (std::vector<int>{0}));
405}
406
408{
409 ThreadPool pool(4);
410 std::vector<int> left, right = {1, 2, 3};
411 std::vector<int> merged(right.size());
412 pmerge(pool, left.begin(), left.end(), right.begin(), right.end(), merged.begin());
413 EXPECT_EQ(merged, right);
414
416 options.pool = &pool;
417 std::fill(merged.begin(), merged.end(), 0);
418 pmerge(left.begin(), left.end(), right.begin(), right.end(),
419 merged.begin(), std::less<int>{}, options);
420 EXPECT_EQ(merged, right);
421}
422
424{
425 ThreadPool pool(4);
426 std::vector<int> left = {1, 2, 3}, right;
427 std::vector<int> merged(left.size());
428 pmerge(pool, left.begin(), left.end(), right.begin(), right.end(), merged.begin());
429 EXPECT_EQ(merged, left);
430
432 options.pool = &pool;
433 std::fill(merged.begin(), merged.end(), 0);
434 pmerge(left.begin(), left.end(), right.begin(), right.end(),
435 merged.begin(), std::less<int>{}, options);
436 EXPECT_EQ(merged, left);
437}
438
440{
441 ThreadPool pool(4);
442 std::vector<int> left, right, merged;
443 pmerge(pool, left.begin(), left.end(), right.begin(), right.end(), merged.begin());
444 EXPECT_TRUE(merged.empty());
445
447 options.pool = &pool;
448 pmerge(left.begin(), left.end(), right.begin(), right.end(),
449 merged.begin(), std::less<int>{}, options);
450 EXPECT_TRUE(merged.empty());
451}
452
454{
455 ThreadPool pool(4);
456 std::vector<long long> scan_input(200000);
457 std::iota(scan_input.begin(), scan_input.end(), 1LL);
458 std::vector<long long> scan_output(scan_input.size());
459
460 auto scan_start = std::chrono::high_resolution_clock::now();
461 pscan(pool, scan_input.begin(), scan_input.end(), scan_output.begin(), std::plus<long long>{});
462 auto scan_end = std::chrono::high_resolution_clock::now();
463
464 std::vector<int> left(100000), right(100000), merged(200000);
465 std::iota(left.begin(), left.end(), 0);
466 std::iota(right.begin(), right.end(), 50000);
467
468 auto merge_start = std::chrono::high_resolution_clock::now();
469 pmerge(pool, left.begin(), left.end(), right.begin(), right.end(), merged.begin());
470 auto merge_end = std::chrono::high_resolution_clock::now();
471
472 auto scan_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
473 scan_end - scan_start).count();
474 auto merge_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
475 merge_end - merge_start).count();
476
477 EXPECT_TRUE(std::is_sorted(merged.begin(), merged.end()));
478 EXPECT_EQ(scan_output.back(),
479 std::accumulate(scan_input.begin(), scan_input.end(), 0LL));
480
481 std::cout << "\n=== Benchmark: pscan / pmerge ===\n";
482 std::cout << "pscan(200K): " << scan_ms << " ms\n";
483 std::cout << "pmerge(200K): " << merge_ms << " ms\n";
484 std::cout << "=================================\n\n";
485}
486
487// ============================================================================
488// Status and Query Tests
489// ============================================================================
490
492{
493 ThreadPool pool(1);
494 std::atomic<bool> block{true};
495
496 // Block the single worker
497 pool.enqueue_detached([&block] { while (block) std::this_thread::yield(); });
498
499 // Queue more tasks
500 for (int i = 0; i < 5; ++i)
501 pool.enqueue_detached([] {});
502
503 // Should have pending tasks
504 EXPECT_GT(pool.pending_tasks(), 0u);
505
506 // Unblock
507 block = false;
508 pool.wait_all();
509
510 EXPECT_EQ(pool.pending_tasks(), 0u);
511}
512
514{
515 ThreadPool pool(2);
516
517 auto f1 = pool.enqueue([] { return 1; });
518 auto f2 = pool.enqueue([] { return 2; });
519
520 f1.get();
521 f2.get();
522
523 pool.wait_all();
524 EXPECT_TRUE(pool.is_idle());
525}
526
527// ============================================================================
528// Shutdown Tests
529// ============================================================================
530
532{
533 ThreadPool pool(2);
534 std::atomic<int> counter{0};
535
536 for (int i = 0; i < 10; ++i)
537 pool.enqueue_detached([&counter] { ++counter; });
538
539 pool.shutdown();
540
541 EXPECT_EQ(counter.load(), 10);
542 EXPECT_TRUE(pool.is_stopped());
543}
544
546{
547 ThreadPool pool(2);
548 pool.shutdown();
549
550 EXPECT_THROW(pool.enqueue([] { return 0; }), std::runtime_error);
551}
552
554{
555 ThreadPool pool(2);
556 pool.shutdown();
558}
559
560// ============================================================================
561// Resize Tests
562// ============================================================================
563
565{
566 ThreadPool pool(2);
567 EXPECT_EQ(pool.num_threads(), 2u);
568
569 pool.resize(4);
570 EXPECT_EQ(pool.num_threads(), 4u);
571
572 // Verify new workers work
573 auto future = pool.enqueue([] { return 42; });
574 EXPECT_EQ(future.get(), 42);
575}
576
578{
579 ThreadPool pool(4);
580 EXPECT_EQ(pool.num_threads(), 4u);
581
582 pool.resize(2);
583 EXPECT_EQ(pool.num_threads(), 2u);
584
585 // Verify remaining workers work
586 auto future = pool.enqueue([] { return 42; });
587 EXPECT_EQ(future.get(), 42);
588}
589
591{
592 ThreadPool pool(4);
593 pool.resize(4);
594 EXPECT_EQ(pool.num_threads(), 4u);
595}
596
598{
599 ThreadPool pool(2);
600 pool.shutdown();
601
602 EXPECT_THROW(pool.resize(4), std::runtime_error);
603}
604
606{
607 ThreadPool pool(1);
608 std::atomic<bool> block{true};
609 std::atomic<int> completed{0};
610
611 // Block the worker
612 pool.enqueue_detached([&block] { while (block) std::this_thread::yield(); });
613
614 // Queue tasks
615 for (int i = 0; i < 5; ++i)
616 pool.enqueue_detached([&completed] { ++completed; });
617
618 // Unblock and resize
619 block = false;
620 pool.resize(4);
621
622 pool.wait_all();
623
624 // All tasks should complete (first blocking task + 5 counting tasks)
625 EXPECT_EQ(completed.load(), 5);
626}
627
628// ============================================================================
629// Exception Handling Tests
630// ============================================================================
631
633{
634 ThreadPool pool(2);
635
636 auto future = pool.enqueue([] {
637 throw std::runtime_error("test exception");
638 return 0;
639 });
640
641 EXPECT_THROW(future.get(), std::runtime_error);
642}
643
645{
646 ThreadPool pool(2);
647
648 auto f1 = pool.enqueue([] {
649 throw std::runtime_error("test");
650 return 0;
651 });
652
653 auto f2 = pool.enqueue([] { return 42; });
654
655 EXPECT_THROW(f1.get(), std::runtime_error);
656 EXPECT_EQ(f2.get(), 42);
657}
658
659// ============================================================================
660// Concurrency Tests
661// ============================================================================
662
664{
665 ThreadPool pool(4);
666 std::atomic<int> counter{0};
667 const int tasks_per_thread = 100;
668 const int num_enqueue_threads = 4;
669
670 std::vector<std::thread> enqueuers;
671 for (int t = 0; t < num_enqueue_threads; ++t)
672 {
673 enqueuers.emplace_back([&pool, &counter, tasks_per_thread] {
674 for (int i = 0; i < tasks_per_thread; ++i)
675 pool.enqueue_detached([&counter] { ++counter; });
676 });
677 }
678
679 for (auto& t : enqueuers)
680 t.join();
681
682 pool.wait_all();
683
685}
686
688{
689 ThreadPool pool(4);
690 std::atomic<int> concurrent_count{0};
691 std::atomic<int> max_concurrent{0};
692
693 std::vector<std::future<void>> futures;
694 for (int i = 0; i < 100; ++i)
695 {
696 futures.push_back(pool.enqueue([&concurrent_count, &max_concurrent] {
697 int current = ++concurrent_count;
698
699 // Update max if this is higher
700 int prev_max = max_concurrent.load();
701 while (current > prev_max &&
702 !max_concurrent.compare_exchange_weak(prev_max, current))
703 ;
704
705 std::this_thread::sleep_for(1ms);
706 --concurrent_count;
707 }));
708 }
709
710 for (auto& f : futures)
711 f.get();
712
713 // Should have had multiple concurrent executions
714 EXPECT_GT(max_concurrent.load(), 1);
715}
716
717// ============================================================================
718// Performance Tests (not strictly unit tests, but useful)
719// ============================================================================
720
722{
723 ThreadPool pool(std::thread::hardware_concurrency());
724 const int num_tasks = 10000;
725 std::atomic<int> sum{0};
726
727 auto start = std::chrono::high_resolution_clock::now();
728
729 std::vector<std::future<void>> futures;
730 futures.reserve(num_tasks);
731
732 for (int i = 0; i < num_tasks; ++i)
733 futures.push_back(pool.enqueue([&sum, i] { sum += i; }));
734
735 for (auto& f : futures)
736 f.get();
737
738 auto end = std::chrono::high_resolution_clock::now();
739 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
740
741 // Expected sum: 0 + 1 + 2 + ... + (n-1) = n*(n-1)/2
742 int expected = num_tasks * (num_tasks - 1) / 2;
743 EXPECT_EQ(sum.load(), expected);
744
745 // Should complete reasonably fast (less than 5 seconds)
746 EXPECT_LT(duration.count(), 5000);
747}
748
750{
751 ThreadPool pool(std::thread::hardware_concurrency());
752 const int num_tasks = 100;
753
754 // Compute-intensive task: calculate sum of squares
755 auto compute = [](int n) {
756 double sum = 0;
757 for (int i = 0; i < n; ++i)
758 sum += std::sqrt(static_cast<double>(i));
759 return sum;
760 };
761
762 std::vector<std::future<double>> futures;
763 for (int i = 0; i < num_tasks; ++i)
764 futures.push_back(pool.enqueue(compute, 10000));
765
766 for (auto& f : futures)
767 f.get();
768
769 // Wait for pool to transition to idle state (fixes race condition in CI)
770 const auto start = std::chrono::steady_clock::now();
771 while (!pool.is_idle() &&
772 std::chrono::steady_clock::now() - start < 1s)
773 {
774 std::this_thread::sleep_for(1ms);
775 }
776
777 EXPECT_TRUE(pool.is_idle());
778}
779
780// ============================================================================
781// Return Type Tests
782// ============================================================================
783
785{
786 ThreadPool pool(2);
787
788 auto future = pool.enqueue([] { return std::string("hello"); });
789
790 EXPECT_EQ(future.get(), "hello");
791}
792
794{
795 ThreadPool pool(2);
796
797 auto future = pool.enqueue([] {
798 return std::vector<int>{1, 2, 3, 4, 5};
799 });
800
801 auto result = future.get();
802 EXPECT_EQ(result.size(), 5u);
803 EXPECT_EQ(result[2], 3);
804}
805
807{
808 ThreadPool pool(2);
809
810 auto future = pool.enqueue([](int a, int b) {
811 return std::make_pair(a + b, a * b);
812 }, 3, 4);
813
814 auto [sum, product] = future.get();
815 EXPECT_EQ(sum, 7);
816 EXPECT_EQ(product, 12);
817}
818
819// ============================================================================
820// Callable Types Tests
821// ============================================================================
822
/// Doubles its argument; exists to exercise enqueue() with a plain free function.
int free_function(int x)
{
  const int doubled = 2 * x;
  return doubled;
}
824
826{
827 int value;
828 int operator()(int x) const { return x + value; }
829};
830
832{
833 ThreadPool pool(2);
834
835 auto future = pool.enqueue(free_function, 21);
836
837 EXPECT_EQ(future.get(), 42);
838}
839
841{
842 ThreadPool pool(2);
843 Functor f{10};
844
845 auto future = pool.enqueue(f, 32);
846
847 EXPECT_EQ(future.get(), 42);
848}
849
851{
852 ThreadPool pool(2);
853 std::function<int(int)> func = [](int x) { return x * x; };
854
855 auto future = pool.enqueue(func, 6);
856
857 EXPECT_EQ(future.get(), 36);
858}
859
860// ============================================================================
861// Member Function Tests
862// ============================================================================
863
865{
866 int value = 10;
867
868 int add(int x) { return value + x; }
869 int multiply(int a, int b) const { return a * b; }
870 static int square(int x) { return x * x; }
871};
872
874{
875 ThreadPool pool(2);
877 calc.value = 20;
878
879 auto future = pool.enqueue(&Calculator::add, &calc, 22);
880
881 EXPECT_EQ(future.get(), 42);
882}
883
885{
886 ThreadPool pool(2);
888
889 auto future = pool.enqueue(&Calculator::multiply, &calc, 6, 7);
890
891 EXPECT_EQ(future.get(), 42);
892}
893
895{
896 ThreadPool pool(2);
897
898 auto future = pool.enqueue(&Calculator::square, 7);
899
900 EXPECT_EQ(future.get(), 49);
901}
902
904{
905 ThreadPool pool(2);
907 calc.value = 30;
908
909 // Using reference wrapper
910 auto future = pool.enqueue(&Calculator::add, std::ref(calc), 12);
911
912 EXPECT_EQ(future.get(), 42);
913}
914
915// ============================================================================
916// Move-Only Tests
917// ============================================================================
918
920{
921 ThreadPool pool(2);
922
923 auto ptr = std::make_unique<int>(42);
924 auto future = pool.enqueue([p = std::move(ptr)]() { return *p; });
925
926 EXPECT_EQ(future.get(), 42);
927}
928
930{
931 ThreadPool pool(2);
932
933 auto ptr = std::make_unique<int>(100);
934 auto future = pool.enqueue([](std::unique_ptr<int> p) { return *p * 2; },
935 std::move(ptr));
936
937 EXPECT_EQ(future.get(), 200);
938}
939
941{
942 ThreadPool pool(2);
943
944 struct MoveOnlyFunctor
945 {
946 std::unique_ptr<int> data;
947
948 MoveOnlyFunctor(int v) : data(std::make_unique<int>(v)) {}
949 MoveOnlyFunctor(MoveOnlyFunctor&&) = default;
950 MoveOnlyFunctor& operator=(MoveOnlyFunctor&&) = default;
951 MoveOnlyFunctor(const MoveOnlyFunctor&) = delete;
952
953 int operator()() { return *data; }
954 };
955
956 auto future = pool.enqueue(MoveOnlyFunctor{42});
957
958 EXPECT_EQ(future.get(), 42);
959}
960
961// ============================================================================
962// Enqueue Detached Tests
963// ============================================================================
964
966{
967 ThreadPool pool(2);
968 std::atomic<int> counter{0};
969
970 for (int i = 0; i < 10; ++i)
971 pool.enqueue_detached([&counter] { ++counter; });
972
973 pool.wait_all();
974
975 EXPECT_EQ(counter.load(), 10);
976}
977
979{
980 ThreadPool pool(2);
981 std::atomic<int> sum{0};
982
983 for (int i = 1; i <= 5; ++i)
984 pool.enqueue_detached([&sum](int x) { sum += x; }, i);
985
986 pool.wait_all();
987
988 EXPECT_EQ(sum.load(), 15); // 1+2+3+4+5
989}
990
992{
993 ThreadPool pool(2);
994 std::atomic<int> counter{0};
995
996 // This should not crash - exceptions are silently ignored
997 pool.enqueue_detached([] { throw std::runtime_error("ignored"); });
998 pool.enqueue_detached([&counter] { ++counter; });
999
1000 pool.wait_all();
1001
1002 EXPECT_EQ(counter.load(), 1);
1003}
1004
1006{
1007 ThreadPool pool(2);
1008 pool.shutdown();
1009
1010 EXPECT_THROW(pool.enqueue_detached([] {}), std::runtime_error);
1011}
1012
1013// ============================================================================
1014// Enqueue Bulk Tests
1015// ============================================================================
1016
1018{
1019 ThreadPool pool(4);
1020 std::vector<int> inputs = {1, 2, 3, 4, 5};
1021
1022 auto futures = pool.enqueue_bulk([](int x) { return x * x; }, inputs);
1023
1024 ASSERT_EQ(futures.size(), 5u);
1025 EXPECT_EQ(futures[0].get(), 1);
1026 EXPECT_EQ(futures[1].get(), 4);
1027 EXPECT_EQ(futures[2].get(), 9);
1028 EXPECT_EQ(futures[3].get(), 16);
1029 EXPECT_EQ(futures[4].get(), 25);
1030}
1031
1033{
1034 ThreadPool pool(2);
1035 std::vector<std::string> inputs = {"hello", "world", "test"};
1036
1037 auto futures = pool.enqueue_bulk([](const std::string& s) {
1038 return s.size();
1039 }, inputs);
1040
1041 ASSERT_EQ(futures.size(), 3u);
1042 EXPECT_EQ(futures[0].get(), 5u);
1043 EXPECT_EQ(futures[1].get(), 5u);
1044 EXPECT_EQ(futures[2].get(), 4u);
1045}
1046
1048{
1049 ThreadPool pool(2);
1050 std::vector<int> empty;
1051
1052 auto futures = pool.enqueue_bulk([](int x) { return x; }, empty);
1053
1054 EXPECT_TRUE(futures.empty());
1055}
1056
1057// ============================================================================
1058// Default Pool Tests
1059// ============================================================================
1060
1062{
1063 auto& pool = Aleph::default_pool();
1064
1065 EXPECT_GT(pool.num_threads(), 0u);
1066 EXPECT_FALSE(pool.is_stopped());
1067}
1068
1070{
1071 auto future = Aleph::default_pool().enqueue([](int x) { return x * 2; }, 21);
1072
1073 EXPECT_EQ(future.get(), 42);
1074}
1075
1083
1084// ============================================================================
1085// Reference Argument Tests
1086// ============================================================================
1087
/// Increments the referenced integer in place; used to verify that
/// std::ref-wrapped arguments reach the task by reference.
void increment_ref(int& x)
{
  x += 1;
}
/// Adds `amount` to the referenced integer in place; companion helper for
/// the reference-argument tests.
void add_to_ref(int& x, int amount)
{
  x = x + amount;
}
1090
1092{
1093 ThreadPool pool(2);
1094 int value = 10;
1095
1096 auto future = pool.enqueue(increment_ref, std::ref(value));
1097 future.get();
1098
1099 EXPECT_EQ(value, 11);
1100}
1101
1103{
1104 ThreadPool pool(2);
1105 int value = 100;
1106
1107 auto future = pool.enqueue(add_to_ref, std::ref(value), 50);
1108 future.get();
1109
1110 EXPECT_EQ(value, 150);
1111}
1112
1114{
1115 ThreadPool pool(2);
1116 const int value = 77;
1117
1118 auto future = pool.enqueue([](const int& x) { return x * 2; }, std::cref(value));
1119
1120 EXPECT_EQ(future.get(), 154);
1121}
1122
1124{
1125 ThreadPool pool(2);
1126 int value = 0;
1127
1128 auto future = pool.enqueue([&value](int x) { value = x; }, 42);
1129 future.get();
1130
1131 EXPECT_EQ(value, 42);
1132}
1133
1135{
1136 ThreadPool pool(2);
1138 calc.value = 30;
1139
1140 auto future = pool.enqueue(&Calculator::add, std::ref(calc), 12);
1141
1142 EXPECT_EQ(future.get(), 42);
1143}
1144
1145// ============================================================================
1146// WaitAll Tests
1147// ============================================================================
1148
1150{
1151 ThreadPool pool(2);
1152 std::atomic<int> counter{0};
1153
1154 for (int i = 0; i < 10; ++i)
1155 pool.enqueue_detached([&counter] {
1156 std::this_thread::sleep_for(10ms);
1157 ++counter;
1158 });
1159
1160 pool.wait_all();
1161
1162 EXPECT_EQ(counter.load(), 10);
1163}
1164
1166{
1167 ThreadPool pool(2);
1168
1169 // Should return immediately
1170 auto start = std::chrono::high_resolution_clock::now();
1171 pool.wait_all();
1172 auto end = std::chrono::high_resolution_clock::now();
1173
1174 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
1175 EXPECT_LT(duration.count(), 100); // Should be nearly instant
1176}
1177
1178// ============================================================================
1179// Stress Tests
1180// ============================================================================
1181
1183{
1184 ThreadPool pool(std::thread::hardware_concurrency());
1185 const int num_tasks = 100000;
1186 std::atomic<int> counter{0};
1187
1188 for (int i = 0; i < num_tasks; ++i)
1189 pool.enqueue_detached([&counter] { ++counter; });
1190
1191 pool.wait_all();
1192
1193 EXPECT_EQ(counter.load(), num_tasks);
1194}
1195
1197{
1198 ThreadPool pool(8);
1199 const int num_enqueuers = 16;
1200 const int tasks_per_enqueuer = 1000;
1201 std::atomic<int> counter{0};
1202
1203 std::vector<std::thread> enqueuers;
1204 for (int i = 0; i < num_enqueuers; ++i)
1205 {
1206 enqueuers.emplace_back([&pool, &counter, tasks_per_enqueuer] {
1207 for (int j = 0; j < tasks_per_enqueuer; ++j)
1208 pool.enqueue_detached([&counter] { ++counter; });
1209 });
1210 }
1211
1212 for (auto& t : enqueuers)
1213 t.join();
1214
1215 pool.wait_all();
1216
1218}
1219
1221{
1222 ThreadPool pool(4);
1223 std::atomic<int> fast_count{0};
1224 std::atomic<int> slow_count{0};
1225
1226 // Mix of fast and slow tasks
1227 for (int i = 0; i < 100; ++i)
1228 {
1229 pool.enqueue_detached([&fast_count] { ++fast_count; });
1231 std::this_thread::sleep_for(1ms);
1232 ++slow_count;
1233 });
1234 }
1235
1236 pool.wait_all();
1237
1238 EXPECT_EQ(fast_count.load(), 100);
1239 EXPECT_EQ(slow_count.load(), 100);
1240}
1241
1242// ============================================================================
1243// Edge Cases
1244// ============================================================================
1245
1247{
1248 ThreadPool pool(1);
1249 std::vector<int> results;
1250 std::mutex mtx;
1251
1252 // Tasks should execute sequentially
1253 for (int i = 0; i < 10; ++i)
1254 {
1255 pool.enqueue_detached([&results, &mtx, i] {
1256 std::lock_guard<std::mutex> lock(mtx);
1257 results.push_back(i);
1258 });
1259 }
1260
1261 pool.wait_all();
1262
1263 EXPECT_EQ(results.size(), 10u);
1264}
1265
1267{
1268 ThreadPool pool(4);
1269
1270 for (int i = 0; i < 1000; ++i)
1271 pool.enqueue_detached([] {});
1272
1273 pool.wait_all();
1274
1275 EXPECT_TRUE(pool.is_idle());
1276}
1277
1279{
1280 ThreadPool pool(2);
1281
1282 auto future = pool.enqueue([] {
1283 std::vector<int> large(10000);
1284 std::iota(large.begin(), large.end(), 0);
1285 return large;
1286 });
1287
1288 auto result = future.get();
1289 EXPECT_EQ(result.size(), 10000u);
1290 EXPECT_EQ(result[9999], 9999);
1291}
1292
1294{
1295 ThreadPool pool(2);
1296
1297 auto future = pool.enqueue([&pool] {
1298 // Enqueue from inside a task
1299 return pool.enqueue([] { return 42; });
1300 });
1301
1302 auto inner_future = future.get();
1303 EXPECT_EQ(inner_future.get(), 42);
1304}
1305
1307{
1308 ThreadPool pool(4);
1309 std::atomic<int> counter{0};
1310
1311 std::function<void(int)> recursive = [&pool, &counter, &recursive](int depth) {
1312 ++counter;
1313 if (depth > 0)
1314 pool.enqueue_detached(recursive, depth - 1);
1315 };
1316
1317 pool.enqueue_detached(recursive, 10);
1318
1319 // Wait a bit for all recursive tasks
1320 std::this_thread::sleep_for(100ms);
1321 pool.wait_all();
1322
1323 EXPECT_EQ(counter.load(), 11); // 0 to 10 inclusive
1324}
1325
1327{
1328 ThreadPool pool(4);
1329
1330 auto f1 = pool.enqueue([]() -> int { throw std::runtime_error("runtime"); });
1331 auto f2 = pool.enqueue([]() -> int { throw std::logic_error("logic"); });
1332 auto f3 = pool.enqueue([]() -> int { throw std::out_of_range("range"); });
1333 auto f4 = pool.enqueue([] { return 42; }); // Normal task
1334
1335 EXPECT_THROW(f1.get(), std::runtime_error);
1336 EXPECT_THROW(f2.get(), std::logic_error);
1337 EXPECT_THROW(f3.get(), std::out_of_range);
1338 EXPECT_EQ(f4.get(), 42);
1339}
1340
1342{
1343 ThreadPool pool(4);
1344 std::atomic<int> exception_count{0};
1345
1346 std::vector<std::future<int>> futures;
1347 for (int i = 0; i < 100; ++i)
1348 {
1349 futures.push_back(pool.enqueue([]() -> int {
1350 throw std::runtime_error("test");
1351 }));
1352 }
1353
1354 for (auto& f : futures)
1355 {
1356 try
1357 {
1358 f.get();
1359 }
1360 catch (const std::runtime_error&)
1361 {
1363 }
1364 }
1365
1366 EXPECT_EQ(exception_count.load(), 100);
1367}
1368
1369// ============================================================================
1370// Shutdown and Lifecycle Tests
1371// ============================================================================
1372
1374{
1375 ThreadPool pool(2);
1376 std::atomic<int> started{0};
1377 std::atomic<int> finished{0};
1378 std::atomic<bool> can_finish{false};
1379
1380 // Start some blocking tasks
1381 for (int i = 0; i < 4; ++i)
1382 {
1384 ++started;
1385 while (!can_finish)
1386 std::this_thread::sleep_for(1ms);
1387 ++finished;
1388 });
1389 }
1390
1391 // Wait for tasks to start
1392 while (started < 2)
1393 std::this_thread::sleep_for(1ms);
1394
1395 // Allow tasks to finish
1396 can_finish = true;
1397
1398 // Shutdown should wait for all tasks
1399 pool.shutdown();
1400
1401 EXPECT_EQ(finished.load(), 4);
1402}
1403
1405{
1406 std::atomic<int> counter{0};
1407
1408 {
1409 ThreadPool pool(2);
1410 for (int i = 0; i < 10; ++i)
1411 {
1412 pool.enqueue_detached([&counter] {
1413 std::this_thread::sleep_for(10ms);
1414 ++counter;
1415 });
1416 }
1417 // Destructor should wait
1418 }
1419
1420 EXPECT_EQ(counter.load(), 10);
1421}
1422
1424{
1425 ThreadPool pool(2);
1426 std::atomic<int> counter{0};
1427 std::atomic<bool> keep_running{true};
1428 std::atomic<bool> resize_in_progress{false};
1429
1430 // Start continuous work
1431 std::thread producer([&pool, &counter, &keep_running, &resize_in_progress] {
1432 while (keep_running)
1433 {
1434 // Skip enqueue during resize to avoid race
1435 if (!resize_in_progress)
1436 {
1437 try
1438 {
1439 pool.enqueue_detached([&counter] { ++counter; });
1440 }
1441 catch (const std::runtime_error&)
1442 {
1443 // Pool might be stopped during resize, ignore
1444 }
1445 }
1446 std::this_thread::sleep_for(1ms);
1447 }
1448 });
1449
1450 // Resize while under load
1451 std::this_thread::sleep_for(20ms);
1452 resize_in_progress = true;
1453 pool.resize(8);
1454 resize_in_progress = false;
1455 std::this_thread::sleep_for(20ms);
1456 resize_in_progress = true;
1457 pool.resize(2);
1458 resize_in_progress = false;
1459 std::this_thread::sleep_for(20ms);
1460
1461 keep_running = false;
1462 producer.join();
1463 pool.wait_all();
1464
1465 EXPECT_GT(counter.load(), 0);
1466}
1467
1468// ============================================================================
1469// Data Integrity Tests
1470// ============================================================================
1471
1473{
1474 ThreadPool pool(8);
1475 const int num_tasks = 10000;
1476 std::vector<std::future<int>> futures;
1477 futures.reserve(num_tasks);
1478
1479 // Each task returns its index
1480 for (int i = 0; i < num_tasks; ++i)
1481 futures.push_back(pool.enqueue([i] { return i; }));
1482
1483 // Verify all results are correct
1484 for (int i = 0; i < num_tasks; ++i)
1485 EXPECT_EQ(futures[i].get(), i);
1486}
1487
1489{
1490 ThreadPool pool(8);
1491 std::atomic<long long> sum{0};
1492 const int num_tasks = 10000;
1493
1494 for (int i = 1; i <= num_tasks; ++i)
1495 pool.enqueue_detached([&sum, i] { sum += i; });
1496
1497 pool.wait_all();
1498
1499 // Sum of 1 to n = n*(n+1)/2
1500 long long expected = static_cast<long long>(num_tasks) * (num_tasks + 1) / 2;
1501 EXPECT_EQ(sum.load(), expected);
1502}
1503
1505{
1506 ThreadPool pool(8);
1507 std::vector<int> shared_vec;
1508 std::mutex mtx;
1509 const int num_tasks = 1000;
1510
1511 for (int i = 0; i < num_tasks; ++i)
1512 {
1513 pool.enqueue_detached([&shared_vec, &mtx, i] {
1514 std::lock_guard<std::mutex> lock(mtx);
1515 shared_vec.push_back(i);
1516 });
1517 }
1518
1519 pool.wait_all();
1520
1521 EXPECT_EQ(shared_vec.size(), static_cast<size_t>(num_tasks));
1522
1523 // Sort and verify all values present
1524 std::sort(shared_vec.begin(), shared_vec.end());
1525 for (int i = 0; i < num_tasks; ++i)
1526 EXPECT_EQ(shared_vec[i], i);
1527}
1528
1529// ============================================================================
1530// Timing and Performance Tests
1531// ============================================================================
1532
1534{
1535 const int num_tasks = 100;
1536 const auto work_duration = 10ms;
1537
1538 auto do_work = [work_duration] {
1539 std::this_thread::sleep_for(work_duration);
1540 };
1541
1542 // Sequential baseline (single thread)
1544 auto start_single = std::chrono::high_resolution_clock::now();
1545 for (int i = 0; i < num_tasks; ++i)
1546 pool_single.enqueue_detached(do_work);
1547 pool_single.wait_all();
1548 auto end_single = std::chrono::high_resolution_clock::now();
1549 auto duration_single = std::chrono::duration_cast<std::chrono::milliseconds>(end_single - start_single);
1550
1551 // Parallel (multiple threads)
1552 size_t num_threads = std::min(8u, std::thread::hardware_concurrency());
1553 ThreadPool pool_parallel(num_threads);
1554 auto start_parallel = std::chrono::high_resolution_clock::now();
1555 for (int i = 0; i < num_tasks; ++i)
1556 pool_parallel.enqueue_detached(do_work);
1557 pool_parallel.wait_all();
1558 auto end_parallel = std::chrono::high_resolution_clock::now();
1559 auto duration_parallel = std::chrono::duration_cast<std::chrono::milliseconds>(end_parallel - start_parallel);
1560
1561 // Parallel should be significantly faster
1562 double speedup = static_cast<double>(duration_single.count()) / duration_parallel.count();
1563 EXPECT_GT(speedup, 1.5); // At least 1.5x speedup
1564}
1565
1567{
1568 ThreadPool pool(4);
1569 const int num_tasks = 1000;
1570
1571 auto start = std::chrono::high_resolution_clock::now();
1572
1573 std::vector<std::future<int>> futures;
1574 futures.reserve(num_tasks);
1575 for (int i = 0; i < num_tasks; ++i)
1576 futures.push_back(pool.enqueue([i] { return i; }));
1577
1578 for (auto& f : futures)
1579 f.get();
1580
1581 auto end = std::chrono::high_resolution_clock::now();
1582 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
1583
1584 // Should complete 1000 trivial tasks in less than 100ms
1585 EXPECT_LT(duration.count(), 100000);
1586}
1587
1588// ============================================================================
1589// Complex Callable Tests
1590// ============================================================================
1591
1593{
1594 ThreadPool pool(2);
1595
1596 auto outer = [](int x) {
1597 return [x](int y) {
1598 return x + y;
1599 };
1600 };
1601
1602 auto future = pool.enqueue([outer] {
1603 auto inner = outer(10);
1604 return inner(32);
1605 });
1606
1607 EXPECT_EQ(future.get(), 42);
1608}
1609
1611{
1612 ThreadPool pool(2);
1613
1614 auto add = [](int a, int b, int c) { return a + b + c; };
1615 auto bound = std::bind(add, 10, std::placeholders::_1, 20);
1616
1617 auto future = pool.enqueue(bound, 12);
1618
1619 EXPECT_EQ(future.get(), 42);
1620}
1621
1623{
1624 ThreadPool pool(2);
1625
1626 // Generic lambda (C++14+)
1627 auto generic_add = [](auto a, auto b) { return a + b; };
1628
1629 auto future_int = pool.enqueue(generic_add, 20, 22);
1630 auto future_double = pool.enqueue(generic_add, 20.5, 21.5);
1631
1632 EXPECT_EQ(future_int.get(), 42);
1633 EXPECT_DOUBLE_EQ(future_double.get(), 42.0);
1634}
1635
1637{
1638 ThreadPool pool(2);
1639
1640 std::vector<int> large_data(1000, 42);
1641
1642 auto future = pool.enqueue([data = std::move(large_data)]() mutable {
1643 return std::accumulate(data.begin(), data.end(), 0);
1644 });
1645
1646 EXPECT_EQ(future.get(), 42000);
1647}
1648
1649// ============================================================================
1650// Edge Case: Very Many Arguments
1651// ============================================================================
1652
1654{
1655 ThreadPool pool(2);
1656
1657 auto sum_all = [](int a, int b, int c, int d, int e,
1658 int f, int g, int h, int i, int j) {
1659 return a + b + c + d + e + f + g + h + i + j;
1660 };
1661
1662 auto future = pool.enqueue(sum_all, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
1663
1664 EXPECT_EQ(future.get(), 55);
1665}
1666
1667// ============================================================================
1668// Producer-Consumer Pattern
1669// ============================================================================
1670
1672{
1673 ThreadPool pool(4);
1674 std::queue<int> work_queue;
1675 std::mutex queue_mutex;
1676 std::condition_variable cv;
1677 std::atomic<bool> done{false};
1678 std::atomic<int> consumed{0};
1679
1680 // Producer task
1681 pool.enqueue_detached([&work_queue, &queue_mutex, &cv, &done] {
1682 for (int i = 0; i < 100; ++i)
1683 {
1684 {
1685 std::lock_guard<std::mutex> lock(queue_mutex);
1686 work_queue.push(i);
1687 }
1688 cv.notify_one();
1689 std::this_thread::sleep_for(1ms);
1690 }
1691 done = true;
1692 cv.notify_all();
1693 });
1694
1695 // Consumer tasks
1696 for (int c = 0; c < 3; ++c)
1697 {
1698 pool.enqueue_detached([&work_queue, &queue_mutex, &cv, &done, &consumed] {
1699 while (true)
1700 {
1701 {
1702 std::unique_lock<std::mutex> lock(queue_mutex);
1703 cv.wait_for(lock, 10ms, [&] { return !work_queue.empty() || done; });
1704 if (work_queue.empty() && done)
1705 return;
1706 if (work_queue.empty())
1707 continue;
1708 work_queue.pop(); // Consume the item
1709 }
1710 ++consumed;
1711 }
1712 });
1713 }
1714
1715 pool.wait_all();
1716
1717 EXPECT_EQ(consumed.load(), 100);
1718}
1719
1720// ============================================================================
1721// Bounded Queue Tests
1722// ============================================================================
1723
1725{
1726 ThreadPool pool(2);
1727
1728 pool.set_queue_limits(100, 500);
1729 auto [soft, hard] = pool.get_queue_limits();
1730
1731 EXPECT_EQ(soft, 100u);
1732 EXPECT_EQ(hard, 500u);
1733}
1734
1736{
1737 ThreadPool pool(2);
1738
1739 pool.set_queue_limits(100); // hard should default to 10x soft
1740 auto [soft, hard] = pool.get_queue_limits();
1741
1742 EXPECT_EQ(soft, 100u);
1743 EXPECT_EQ(hard, 1000u); // 10 * 100
1744}
1745
1747{
1748 ThreadPool pool(4);
1749 pool.set_queue_limits(100, 1000);
1750
1751 // Should not block when below soft limit
1752 std::vector<std::future<int>> futures;
1753 for (int i = 0; i < 50; ++i)
1754 futures.push_back(pool.enqueue_bounded([i] { return i; }));
1755
1756 for (int i = 0; i < 50; ++i)
1757 EXPECT_EQ(futures[i].get(), i);
1758}
1759
1761{
1762 ThreadPool pool(1); // Single worker
1763 pool.set_queue_limits(5, 100);
1764
1765 std::atomic<bool> worker_blocked{true};
1766 std::atomic<int> enqueued{0};
1767
1768 // Block the only worker
1770 while (worker_blocked)
1771 std::this_thread::sleep_for(1ms);
1772 });
1773
1774 // Enqueue up to soft limit (should succeed immediately)
1775 for (int i = 0; i < 5; ++i)
1776 {
1778 }
1779
1780 EXPECT_EQ(pool.pending_tasks(), 5u);
1781
1782 // Next enqueue should block
1783 std::atomic<bool> enqueue_completed{false};
1784 std::thread blocker([&pool, &enqueue_completed] {
1785 pool.enqueue_bounded_detached([] {});
1786 enqueue_completed = true;
1787 });
1788
1789 // Give it time to try to enqueue
1790 std::this_thread::sleep_for(50ms);
1791 EXPECT_FALSE(enqueue_completed); // Should still be blocked
1792
1793 // Release the worker
1794 worker_blocked = false;
1795
1796 // Now it should complete
1797 blocker.join();
1798 pool.wait_all();
1799
1801}
1802
1804{
1805 ThreadPool pool(1);
1806 pool.set_queue_limits(10, 15);
1807
1808 std::atomic<bool> worker_blocked{true};
1809
1810 // Block the worker
1812 while (worker_blocked)
1813 std::this_thread::sleep_for(1ms);
1814 });
1815
1816 // Fill up to hard limit using regular enqueue (bypasses limits)
1817 for (int i = 0; i < 15; ++i)
1818 pool.enqueue_detached([] {});
1819
1820 // Now bounded enqueue should throw
1822
1823 // Clean up
1824 worker_blocked = false;
1825 pool.wait_all();
1826}
1827
1829{
1830 ThreadPool pool(1);
1831 pool.set_queue_limits(5, 10);
1832
1833 std::atomic<bool> worker_blocked{true};
1835 while (worker_blocked)
1836 std::this_thread::sleep_for(1ms);
1837 });
1838
1839 // Fill queue beyond hard limit
1840 for (int i = 0; i < 10; ++i)
1841 pool.enqueue_detached([] {});
1842
1843 try
1844 {
1845 pool.enqueue_bounded([] { return 0; });
1846 FAIL() << "Expected queue_overflow_error";
1847 }
1848 catch (const Aleph::queue_overflow_error& e)
1849 {
1850 EXPECT_GE(e.current_size(), 10u);
1851 EXPECT_EQ(e.hard_limit(), 10u);
1852 EXPECT_NE(std::string(e.what()).find("overflow"), std::string::npos);
1853 }
1854
1855 worker_blocked = false;
1856 pool.wait_all();
1857}
1858
1860{
1861 ThreadPool pool(1);
1862 pool.set_queue_limits(3, 100);
1863
1864 std::atomic<bool> worker_blocked{true};
1866 while (worker_blocked)
1867 std::this_thread::sleep_for(1ms);
1868 });
1869
1870 // Fill to soft limit
1871 for (int i = 0; i < 3; ++i)
1872 pool.enqueue_bounded_detached([] {});
1873
1874 // Next should block
1875 std::atomic<bool> done{false};
1876 std::thread t([&pool, &done] {
1877 pool.enqueue_bounded_detached([] {});
1878 done = true;
1879 });
1880
1881 std::this_thread::sleep_for(30ms);
1883
1884 worker_blocked = false;
1885 t.join();
1886 pool.wait_all();
1887
1889}
1890
1892{
1893 ThreadPool pool(2);
1894 pool.set_queue_limits(100, 1000);
1895 pool.shutdown();
1896
1897 EXPECT_THROW(pool.enqueue_bounded([] { return 0; }), std::runtime_error);
1898 EXPECT_THROW(pool.enqueue_bounded_detached([] {}), std::runtime_error);
1899}
1900
1902{
1903 ThreadPool pool(2);
1904 pool.set_queue_limits(10, 100);
1905
1906 std::atomic<int> produced{0};
1907 std::atomic<int> consumed{0};
1908 std::atomic<bool> stop_producing{false};
1909
1910 // Consumer tasks (slow)
1911 auto consume = [&consumed] {
1912 std::this_thread::sleep_for(5ms);
1913 ++consumed;
1914 };
1915
1916 // Producer thread (fast)
1917 std::thread producer([&] {
1918 while (!stop_producing && produced < 50)
1919 {
1920 try
1921 {
1923 ++produced;
1924 }
1925 catch (const Aleph::queue_overflow_error&)
1926 {
1927 // Hard limit reached, stop
1928 break;
1929 }
1930 }
1931 });
1932
1933 // Let it run for a bit
1934 std::this_thread::sleep_for(200ms);
1935 stop_producing = true;
1936 producer.join();
1937 pool.wait_all();
1938
1939 // Producer should have been slowed down by backpressure
1940 // (can't produce much faster than consumers can consume)
1941 EXPECT_EQ(consumed.load(), produced.load());
1942}
1943
1945{
1946 ThreadPool pool(4);
1947 pool.set_queue_limits(50, 200);
1948
1949 std::atomic<int> counter{0};
1950 const int num_producers = 8;
1951 const int tasks_per_producer = 100;
1952
1953 std::vector<std::thread> producers;
1954 for (int p = 0; p < num_producers; ++p)
1955 {
1956 producers.emplace_back([&pool, &counter, tasks_per_producer] {
1957 for (int i = 0; i < tasks_per_producer; ++i)
1958 {
1959 try
1960 {
1962 }
1963 catch (const Aleph::queue_overflow_error&)
1964 {
1965 // Retry after small delay
1966 std::this_thread::sleep_for(1ms);
1967 --i;
1968 }
1969 }
1970 });
1971 }
1972
1973 for (auto& t : producers)
1974 t.join();
1975
1976 pool.wait_all();
1977
1979}
1980
1982{
1983 ThreadPool pool(4);
1984 pool.set_queue_limits(20, 100);
1985
1986 std::vector<std::future<int>> futures;
1987 for (int i = 0; i < 100; ++i)
1988 futures.push_back(pool.enqueue_bounded([i] { return i * i; }));
1989
1990 for (int i = 0; i < 100; ++i)
1991 EXPECT_EQ(futures[i].get(), i * i);
1992}
1993
1995{
1996 ThreadPool pool(2);
1997 pool.set_queue_limits(10, 100);
1998
1999 auto future = pool.enqueue_bounded([]() -> int {
2000 throw std::runtime_error("test");
2001 });
2002
2003 EXPECT_THROW(future.get(), std::runtime_error);
2004}
2005
2007{
2008 ThreadPool pool(4);
2009 pool.set_queue_limits(10, 50);
2010
2011 std::atomic<int> counter{0};
2012
2013 // Mix bounded and unbounded
2014 for (int i = 0; i < 100; ++i)
2015 {
2016 if (i % 2 == 0)
2017 pool.enqueue_detached([&counter] { ++counter; });
2018 else
2019 {
2020 try
2021 {
2023 }
2024 catch (const Aleph::queue_overflow_error&)
2025 {
2026 // Unbounded doesn't care about limits
2027 pool.enqueue_detached([&counter] { ++counter; });
2028 }
2029 }
2030 }
2031
2032 pool.wait_all();
2033 EXPECT_EQ(counter.load(), 100);
2034}
2035
2037{
2038 ThreadPool pool(8);
2039 pool.set_queue_limits(100, 500);
2040
2041 std::atomic<int> counter{0};
2042 const int num_tasks = 10000;
2043
2044 for (int i = 0; i < num_tasks; ++i)
2045 {
2046 // Keep trying until successful
2047 bool success = false;
2048 while (!success)
2049 {
2050 try
2051 {
2053 success = true;
2054 }
2055 catch (const Aleph::queue_overflow_error&)
2056 {
2057 std::this_thread::sleep_for(100us);
2058 }
2059 }
2060 }
2061
2062 pool.wait_all();
2063 EXPECT_EQ(counter.load(), num_tasks);
2064}
2065
2066// Note: soft_limit=0 is not a practical use case and has edge case issues,
2067// so we don't test it. Use soft_limit >= 1 for bounded queues.
2068
2070{
2071 ThreadPool pool(2);
2072
2073 auto [soft, hard] = pool.get_queue_limits();
2074
2075 EXPECT_EQ(soft, std::numeric_limits<size_t>::max());
2076 EXPECT_EQ(hard, std::numeric_limits<size_t>::max());
2077}
2078
2080{
2081 ThreadPool pool(2);
2082 pool.set_queue_limits(10, 100);
2083
2084 auto future = pool.enqueue_bounded([](int a, int b, int c) {
2085 return a + b + c;
2086 }, 10, 20, 12);
2087
2088 EXPECT_EQ(future.get(), 42);
2089}
2090
2092{
2093 ThreadPool pool(2);
2094 pool.set_queue_limits(10, 100);
2095
2096 int value = 10;
2097 auto future = pool.enqueue_bounded([](int& x) { x *= 2; return x; },
2098 std::ref(value));
2099
2100 EXPECT_EQ(future.get(), 20);
2101 EXPECT_EQ(value, 20);
2102}
2103
2104// ============================================================================
2105// Try Enqueue Tests
2106// ============================================================================
2107
2109{
2110 ThreadPool pool(2);
2111 pool.set_queue_limits(10, 100);
2112
2113 auto result = pool.try_enqueue([] { return 42; });
2114
2115 ASSERT_TRUE(result.has_value());
2116 EXPECT_EQ(result->get(), 42);
2117}
2118
2120{
2121 ThreadPool pool(1);
2122 pool.set_queue_limits(5, 100);
2123
2124 std::atomic<bool> worker_blocked{true};
2126 while (worker_blocked)
2127 std::this_thread::sleep_for(1ms);
2128 });
2129
2130 // Fill to soft limit
2131 for (int i = 0; i < 5; ++i)
2132 pool.enqueue_detached([] {});
2133
2134 // try_enqueue should return nullopt
2135 auto result = pool.try_enqueue([] { return 0; });
2136 EXPECT_FALSE(result.has_value());
2137
2138 worker_blocked = false;
2139 pool.wait_all();
2140}
2141
2143{
2144 ThreadPool pool(2);
2145 pool.set_queue_limits(10, 100);
2146 std::atomic<int> counter{0};
2147
2148 bool success = pool.try_enqueue_detached([&counter] { ++counter; });
2149
2151 pool.wait_all();
2152 EXPECT_EQ(counter.load(), 1);
2153}
2154
2156{
2157 ThreadPool pool(1);
2158 pool.set_queue_limits(3, 100);
2159
2160 std::atomic<bool> worker_blocked{true};
2162 while (worker_blocked)
2163 std::this_thread::sleep_for(1ms);
2164 });
2165
2166 // Fill to soft limit
2167 for (int i = 0; i < 3; ++i)
2168 pool.enqueue_detached([] {});
2169
2170 // try_enqueue_detached should return false
2171 bool success = pool.try_enqueue_detached([] {});
2173
2174 worker_blocked = false;
2175 pool.wait_all();
2176}
2177
2179{
2180 ThreadPool pool(2);
2181 pool.shutdown();
2182
2183 EXPECT_THROW(pool.try_enqueue([] { return 0; }), std::runtime_error);
2184 EXPECT_THROW(pool.try_enqueue_detached([] {}), std::runtime_error);
2185}
2186
2188{
2189 ThreadPool pool(2);
2190 pool.set_queue_limits(10, 100);
2191
2192 auto result = pool.try_enqueue([](int a, int b) { return a + b; }, 20, 22);
2193
2194 ASSERT_TRUE(result.has_value());
2195 EXPECT_EQ(result->get(), 42);
2196}
2197
2199{
2200 ThreadPool pool(1);
2201 pool.set_queue_limits(2, 100);
2202
2203 std::atomic<bool> worker_blocked{true};
2205 while (worker_blocked)
2206 std::this_thread::sleep_for(1ms);
2207 });
2208
2209 // Fill queue
2210 pool.enqueue_detached([] {});
2211 pool.enqueue_detached([] {});
2212
2213 // Measure time for try_enqueue when queue is full
2214 auto start = std::chrono::high_resolution_clock::now();
2215 for (int i = 0; i < 1000; ++i)
2216 pool.try_enqueue([] { return 0; }); // Should all fail fast
2217 auto end = std::chrono::high_resolution_clock::now();
2218
2219 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
2220
2221 // 1000 non-blocking calls should complete in < 10ms
2222 EXPECT_LT(duration.count(), 10000);
2223
2224 worker_blocked = false;
2225 pool.wait_all();
2226}
2227
2228// ============================================================================
2229// Benchmark Tests
2230// ============================================================================
2231
2233{
2234 const int num_tasks = 1000;
2235 const auto work_duration = 100us;
2236
2237 auto do_work = [work_duration] {
2238 auto start = std::chrono::high_resolution_clock::now();
2239 while (std::chrono::high_resolution_clock::now() - start < work_duration)
2240 ; // Busy wait
2241 return 1;
2242 };
2243
2244 // Sequential baseline
2245 auto seq_start = std::chrono::high_resolution_clock::now();
2246 int seq_sum = 0;
2247 for (int i = 0; i < num_tasks; ++i)
2248 seq_sum += do_work();
2249 auto seq_end = std::chrono::high_resolution_clock::now();
2250 auto seq_duration = std::chrono::duration_cast<std::chrono::milliseconds>(seq_end - seq_start);
2251
2252 // ThreadPool parallel
2253 size_t num_threads = std::thread::hardware_concurrency();
2254 ThreadPool pool(num_threads);
2255
2256 auto pool_start = std::chrono::high_resolution_clock::now();
2257 std::vector<std::future<int>> futures;
2258 futures.reserve(num_tasks);
2259 for (int i = 0; i < num_tasks; ++i)
2260 futures.push_back(pool.enqueue(do_work));
2261
2262 int pool_sum = 0;
2263 for (auto& f : futures)
2264 pool_sum += f.get();
2265 auto pool_end = std::chrono::high_resolution_clock::now();
2266 auto pool_duration = std::chrono::duration_cast<std::chrono::milliseconds>(pool_end - pool_start);
2267
2269
2270 double speedup = static_cast<double>(seq_duration.count()) / pool_duration.count();
2271
2272 std::cout << "\n=== Benchmark: ThreadPool vs Sequential ===\n";
2273 std::cout << "Tasks: " << num_tasks << ", Work per task: 100μs\n";
2274 std::cout << "Threads: " << num_threads << "\n";
2275 std::cout << "Sequential: " << seq_duration.count() << " ms\n";
2276 std::cout << "ThreadPool: " << pool_duration.count() << " ms\n";
2277 std::cout << "Speedup: " << std::fixed << std::setprecision(2) << speedup << "x\n";
2278 std::cout << "============================================\n\n";
2279
2280 // Should achieve at least some speedup with multiple cores
2281 if (num_threads > 1)
2282 EXPECT_GT(speedup, 1.2);
2283}
2284
2286{
2287 ThreadPool pool(4);
2288 const int num_tasks = 100000;
2289
2290 // Measure enqueue overhead (tasks do nothing)
2291 auto start = std::chrono::high_resolution_clock::now();
2292 for (int i = 0; i < num_tasks; ++i)
2293 pool.enqueue_detached([] {});
2294 pool.wait_all();
2295 auto end = std::chrono::high_resolution_clock::now();
2296
2297 auto total_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
2298 double ns_per_task = static_cast<double>(total_ns) / num_tasks;
2299
2300 std::cout << "\n=== Benchmark: Enqueue Overhead ===\n";
2301 std::cout << "Tasks: " << num_tasks << "\n";
2302 std::cout << "Total time: " << total_ns / 1000000.0 << " ms\n";
2303 std::cout << "Per task: " << std::fixed << std::setprecision(0) << ns_per_task << " ns\n";
2304 std::cout << "Throughput: " << std::fixed << std::setprecision(0)
2305 << (num_tasks * 1e9 / total_ns) << " tasks/sec\n";
2306 std::cout << "===================================\n\n";
2307
2308 // Should be able to enqueue at least 50k tasks/sec
2309 // (relaxed for slower CI environments)
2310 EXPECT_LT(ns_per_task, 25000); // < 25μs per task
2311}
2312
2314{
2315 const int num_tasks = 500; // Fewer tasks because async is slow
2316
2317 auto compute = [](int x) -> long long {
2318 long long sum = 0;
2319 for (int i = 0; i < 1000; ++i)
2320 sum += static_cast<long long>(i) * x;
2321 return sum;
2322 };
2323
2324 // std::async
2325 auto async_start = std::chrono::high_resolution_clock::now();
2326 std::vector<std::future<long long>> async_futures;
2327 async_futures.reserve(num_tasks);
2328 for (int i = 0; i < num_tasks; ++i)
2329 async_futures.push_back(std::async(std::launch::async, compute, i));
2330
2331 long long async_sum = 0;
2332 for (auto& f : async_futures)
2333 async_sum += f.get();
2334 auto async_end = std::chrono::high_resolution_clock::now();
2335 auto async_duration = std::chrono::duration_cast<std::chrono::microseconds>(async_end - async_start);
2336
2337 // ThreadPool
2338 ThreadPool pool(std::thread::hardware_concurrency());
2339
2340 auto pool_start = std::chrono::high_resolution_clock::now();
2341 std::vector<std::future<long long>> pool_futures;
2342 pool_futures.reserve(num_tasks);
2343 for (int i = 0; i < num_tasks; ++i)
2344 pool_futures.push_back(pool.enqueue(compute, i));
2345
2346 long long pool_sum = 0;
2347 for (auto& f : pool_futures)
2348 pool_sum += f.get();
2349 auto pool_end = std::chrono::high_resolution_clock::now();
2350 auto pool_duration = std::chrono::duration_cast<std::chrono::microseconds>(pool_end - pool_start);
2351
2353
2354 double speedup = static_cast<double>(async_duration.count()) / pool_duration.count();
2355
2356 std::cout << "\n=== Benchmark: ThreadPool vs std::async ===\n";
2357 std::cout << "Tasks: " << num_tasks << "\n";
2358 std::cout << "std::async: " << async_duration.count() / 1000.0 << " ms\n";
2359 std::cout << "ThreadPool: " << pool_duration.count() / 1000.0 << " ms\n";
2360 std::cout << "Speedup: " << std::fixed << std::setprecision(2) << speedup << "x\n";
2361 std::cout << "============================================\n\n";
2362
2363 // ThreadPool should be faster than creating new threads each time
2364 EXPECT_GT(speedup, 1.0);
2365}
2366
2367// =============================================================================
2368// New Features Tests
2369// =============================================================================
2370
2371// --- Statistics Tests ---
2372
2374{
2375 ThreadPool pool(2);
2376 auto stats = pool.get_stats();
2377
2378 EXPECT_EQ(stats.tasks_completed, 0u);
2379 EXPECT_EQ(stats.tasks_failed, 0u);
2380 EXPECT_EQ(stats.current_queue_size, 0u);
2381 EXPECT_EQ(stats.current_active, 0u);
2382 EXPECT_EQ(stats.num_workers, 2u);
2383 EXPECT_EQ(stats.peak_queue_size, 0u);
2384 EXPECT_EQ(stats.total_processed(), 0u);
2385}
2386
2388{
2389 ThreadPool pool(2);
2390
2391 std::vector<std::future<int>> futures;
2392 for (int i = 0; i < 10; ++i)
2393 futures.push_back(pool.enqueue([] { return 42; }));
2394
2395 for (auto& f : futures)
2396 f.get();
2397
2398 pool.wait_all();
2399
2400 auto stats = pool.get_stats();
2401 EXPECT_EQ(stats.tasks_completed, 10u);
2402 EXPECT_EQ(stats.total_processed(), 10u);
2403}
2404
2406{
2407 ThreadPool pool(1);
2408
2409 // Block the worker
2410 std::promise<void> blocker;
2411 pool.enqueue([&blocker] { blocker.get_future().wait(); });
2412
2413 // Queue up several tasks
2414 for (int i = 0; i < 5; ++i)
2415 pool.enqueue_detached([] {});
2416
2417 std::this_thread::sleep_for(std::chrono::milliseconds(10));
2418
2419 auto stats = pool.get_stats();
2420 EXPECT_GE(stats.peak_queue_size, 5u);
2421
2422 // Release worker
2423 blocker.set_value();
2424 pool.wait_all();
2425}
2426
2428{
2429 ThreadPool pool(2);
2430
2431 pool.enqueue([] { return 1; }).get();
2432 // Wait for stats to be updated (tasks_completed incremented after future is set)
2433 pool.wait_all();
2434
2435 auto stats1 = pool.get_stats();
2436 EXPECT_GE(stats1.tasks_completed, 1u);
2437
2438 pool.reset_stats();
2439
2440 auto stats2 = pool.get_stats();
2441 EXPECT_EQ(stats2.tasks_completed, 0u);
2442 EXPECT_EQ(stats2.peak_queue_size, 0u);
2443}
2444
2445// --- Exception Callback Tests ---
2446
2448{
2449 ThreadPool pool(2);
2450
2451 std::atomic<bool> callback_called{false};
2452 std::atomic<bool> correct_exception{false};
2453
2454 pool.set_exception_callback([&](std::exception_ptr ep) {
2455 callback_called = true;
2456 try {
2457 std::rethrow_exception(ep);
2458 } catch (const std::runtime_error& e) {
2459 correct_exception = (std::string(e.what()) == "test error");
2460 } catch (...) {
2461 correct_exception = false;
2462 }
2463 });
2464
2465 pool.enqueue_detached([] {
2466 throw std::runtime_error("test error");
2467 });
2468
2469 pool.wait_all();
2470 std::this_thread::sleep_for(std::chrono::milliseconds(50));
2471
2474}
2475
2477{
2478 ThreadPool pool(2);
2479
2480 std::atomic<int> callback_count{0};
2481
2482 pool.set_exception_callback([&](std::exception_ptr) {
2484 });
2485
2486 pool.enqueue_detached([] { throw std::runtime_error("error"); });
2487 pool.wait_all();
2488 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2489
2490 EXPECT_EQ(callback_count.load(), 1);
2491
2492 // Clear callback
2493 pool.set_exception_callback(nullptr);
2494
2495 pool.enqueue_detached([] { throw std::runtime_error("error"); });
2496 pool.wait_all();
2497 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2498
2499 // Count should still be 1
2500 EXPECT_EQ(callback_count.load(), 1);
2501}
2502
2504{
2505 ThreadPool pool(2);
2506
2507 pool.enqueue_detached([] { throw std::runtime_error("fail"); });
2508 pool.enqueue_detached([] { throw std::logic_error("fail"); });
2509 pool.enqueue_detached([] { /* success */ });
2510
2511 pool.wait_all();
2512 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2513
2514 auto stats = pool.get_stats();
2515 EXPECT_EQ(stats.tasks_failed, 2u);
2516 EXPECT_EQ(stats.tasks_completed, 1u);
2517 EXPECT_EQ(stats.total_processed(), 3u);
2518}
2519
2520// --- wait_all_for/wait_all_until Tests ---
2521
2523{
2524 ThreadPool pool(2);
2525
2526 pool.enqueue([] { return 42; }).get();
2527
2528 bool result = pool.wait_all_for(std::chrono::milliseconds(100));
2529 EXPECT_TRUE(result);
2530}
2531
2533{
2534 ThreadPool pool(1);
2535
2536 // Start a long task
2537 pool.enqueue_detached([] {
2538 std::this_thread::sleep_for(std::chrono::milliseconds(500));
2539 });
2540
2541 std::this_thread::sleep_for(std::chrono::milliseconds(10));
2542
2543 bool result = pool.wait_all_for(std::chrono::milliseconds(50));
2544 EXPECT_FALSE(result);
2545
2546 // Wait for cleanup
2547 pool.wait_all();
2548}
2549
2551{
2552 ThreadPool pool(2);
2553
2554 pool.enqueue([] { return 1; }).get();
2555
2556 auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
2557 bool result = pool.wait_all_until(deadline);
2558 EXPECT_TRUE(result);
2559}
2560
2561// --- enqueue_batch Tests ---
2562
2564{
2565 ThreadPool pool(4);
2566
2567 std::vector<std::tuple<int, int>> args = {
2568 {1, 2}, {3, 4}, {5, 6}, {7, 8}
2569 };
2570
2571 auto futures = pool.enqueue_batch([](int a, int b) { return a + b; }, args);
2572
2573 EXPECT_EQ(futures.size(), 4u);
2574
2575 std::vector<int> results;
2576 for (auto& f : futures)
2577 results.push_back(f.get());
2578
2579 EXPECT_EQ(results, (std::vector<int>{3, 7, 11, 15}));
2580}
2581
2583{
2584 ThreadPool pool(4);
2585
2586 std::vector<std::tuple<int>> args;
2587 for (int i = 0; i < 100; ++i)
2588 args.emplace_back(i);
2589
2590 auto futures = pool.enqueue_batch([](int x) { return x * x; }, args);
2591
2592 int sum = 0;
2593 for (auto& f : futures)
2594 sum += f.get();
2595
2596 // Sum of squares 0^2 + 1^2 + ... + 99^2 = 328350
2597 EXPECT_EQ(sum, 328350);
2598}
2599
2601{
2602 ThreadPool pool(2);
2603
2604 std::vector<std::tuple<int>> empty;
2605 auto futures = pool.enqueue_batch([](int x) { return x; }, empty);
2606
2607 EXPECT_TRUE(futures.empty());
2608}
2609
2610// --- parallel_for Tests ---
2611
2613{
2614 ThreadPool pool(4);
2615
2616 std::vector<int> data(100, 1);
2617
2618 parallel_for(pool, data.begin(), data.end(), [](int& x) { x *= 2; });
2619
2620 for (int x : data)
2621 EXPECT_EQ(x, 2);
2622}
2623
2625{
2626 ThreadPool pool(4);
2627
2628 std::vector<int> data(100, 1);
2629
2630 parallel_for(pool, data.begin(), data.end(),
2631 [](auto begin, auto end) {
2632 for (auto it = begin; it != end; ++it)
2633 *it = 5;
2634 });
2635
2636 for (int x : data)
2637 EXPECT_EQ(x, 5);
2638}
2639
2641{
2642 ThreadPool pool(2);
2643
2644 std::vector<int> empty;
2645
2646 // Should not throw or crash
2647 parallel_for(pool, empty.begin(), empty.end(), [](int& x) { x = 0; });
2648
2649 EXPECT_TRUE(empty.empty());
2650}
2651
2653{
2654 ThreadPool pool(2);
2655
2656 std::vector<std::atomic<int>> data(50);
2657 for (auto& x : data)
2658 x = 0;
2659
2660 parallel_for(pool, data.begin(), data.end(),
2661 [](std::atomic<int>& x) { ++x; },
2662 10); // chunk size = 10
2663
2664 for (const auto& x : data)
2665 EXPECT_EQ(x.load(), 1);
2666}
2667
2668// --- parallel_for_index Tests ---
2669
2671{
2672 ThreadPool pool(4);
2673
2674 std::vector<int> data(100, 0);
2675
2676 parallel_for_index(pool, 0, data.size(), [&data](size_t i) {
2677 data[i] = static_cast<int>(i * 2);
2678 });
2679
2680 for (size_t i = 0; i < data.size(); ++i)
2681 EXPECT_EQ(data[i], static_cast<int>(i * 2));
2682}
2683
2685{
2686 ThreadPool pool(2);
2687
2688 // Should not throw
2689 parallel_for_index(pool, 10, 10, [](size_t) { /* nothing */ });
2690 parallel_for_index(pool, 10, 5, [](size_t) { /* nothing */ });
2691}
2692
2693// --- parallel_transform Tests ---
2694
2696{
2697 ThreadPool pool(4);
2698
2699 std::vector<int> input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
2700 std::vector<int> output(10);
2701
2702 parallel_transform(pool, input.begin(), input.end(), output.begin(),
2703 [](int x) { return x * x; });
2704
2705 EXPECT_EQ(output, (std::vector<int>{1, 4, 9, 16, 25, 36, 49, 64, 81, 100}));
2706}
2707
2709{
2710 ThreadPool pool(4);
2711
2712 std::vector<int> input(1000);
2713 std::iota(input.begin(), input.end(), 0);
2714
2715 std::vector<int> output(1000);
2716
2717 parallel_transform(pool, input.begin(), input.end(), output.begin(),
2718 [](int x) { return x + 1; });
2719
2720 for (int i = 0; i < 1000; ++i)
2721 EXPECT_EQ(output[i], i + 1);
2722}
2723
2725{
2726 ThreadPool pool(2);
2727
2728 std::vector<int> input;
2729 std::vector<int> output;
2730
2731 auto end = parallel_transform(pool, input.begin(), input.end(),
2732 output.begin(), [](int x) { return x; });
2733
2734 EXPECT_EQ(end, output.begin());
2735}
2736
2737// --- parallel_reduce Tests ---
2738
2740{
2741 ThreadPool pool(4);
2742
2743 std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
2744
2745 int sum = parallel_reduce(pool, data.begin(), data.end(), 0, std::plus<int>());
2746
2747 EXPECT_EQ(sum, 55);
2748}
2749
2751{
2752 ThreadPool pool(4);
2753
2754 std::vector<int> data = {1, 2, 3, 4, 5};
2755
2756 int product = parallel_reduce(pool, data.begin(), data.end(), 1,
2757 std::multiplies<int>());
2758
2759 EXPECT_EQ(product, 120);
2760}
2761
2763{
2764 ThreadPool pool(4);
2765
2766 std::vector<int> data = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5};
2767
2768 int max_val = parallel_reduce(pool, data.begin(), data.end(),
2769 std::numeric_limits<int>::min(),
2770 [](int a, int b) { return std::max(a, b); });
2771
2772 EXPECT_EQ(max_val, 9);
2773}
2774
2776{
2777 ThreadPool pool(4);
2778
2779 std::vector<long long> data(10000);
2780 std::iota(data.begin(), data.end(), 1LL);
2781
2782 long long sum = parallel_reduce(pool, data.begin(), data.end(), 0LL,
2783 std::plus<long long>());
2784
2785 // Sum 1+2+...+10000 = 10000*10001/2 = 50005000
2786 EXPECT_EQ(sum, 50005000LL);
2787}
2788
2790{
2791 ThreadPool pool(2);
2792
2793 std::vector<int> empty;
2794
2795 int result = parallel_reduce(pool, empty.begin(), empty.end(), 42,
2796 std::plus<int>());
2797
2798 EXPECT_EQ(result, 42); // Returns init for empty range
2799}
2800
2801// --- ThreadPoolStats Tests ---
2802
2804{
2805 ThreadPoolStats stats;
2806 stats.current_queue_size = 50;
2807
2808 EXPECT_NEAR(stats.queue_utilization(100), 50.0, 0.01);
2809 EXPECT_NEAR(stats.queue_utilization(200), 25.0, 0.01);
2810 EXPECT_NEAR(stats.queue_utilization(0), 0.0, 0.01); // Edge case
2811 EXPECT_NEAR(stats.queue_utilization(std::numeric_limits<size_t>::max()), 0.0, 0.01);
2812}
2813
2814int main(int argc, char **argv)
2815{
2816 ::testing::InitGoogleTest(&argc, argv);
2817 return RUN_ALL_TESTS();
2818}
long double h
Definition btreepic.C:154
Cooperative cancellation source paired with CancellationToken.
CancellationToken token() const noexcept
Return a token observing this source.
void request_cancel() noexcept
Request cancellation for all derived tokens.
Minimal structured-concurrency helper over ThreadPool futures.
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
bool wait_all_for(std::chrono::duration< Rep, Period > timeout)
Wait until all current tasks complete, with timeout.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
void set_exception_callback(ExceptionCallback callback)
Set callback for exceptions in detached tasks.
auto enqueue_batch(F &&f, const Container &args_list) -> std::vector< std::future< decltype(std::apply(f, *std::begin(args_list)))> >
Submit multiple tasks with a single notification.
bool is_idle() const
Check if the pool is idle (no pending or running tasks).
auto enqueue_bounded(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task with backpressure and memory protection.
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void resize(size_t new_size)
Resize the pool to a different number of workers.
ThreadPoolStats get_stats() const
Get current pool statistics.
void reset_stats()
Reset statistics counters to zero.
bool wait_all_until(std::chrono::time_point< Clock, Duration > deadline)
Wait until all current tasks complete, with deadline.
bool is_stopped() const noexcept
Check if the pool has been shut down.
void wait_all(const std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking result.
size_t pending_tasks() const
Get the number of pending tasks in the queue.
void set_queue_limits(const size_t soft_limit, const size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
void shutdown()
Shut down the pool, completing all pending tasks.
auto try_enqueue(F &&f, Args &&... args) -> std::optional< std::future< std::invoke_result_t< F, Args... > > >
Try to submit a task without blocking or throwing.
Exception thrown when cooperative cancellation is observed.
Exception thrown when the task queue exceeds its hard limit.
size_t current_size() const noexcept
Current queue size when exception was thrown.
size_t hard_limit() const noexcept
Hard limit that was exceeded.
void SetUp() override
void TearDown() override
#define FAIL(msg)
static mpfr_t y
Definition mpfr_mul_d.c:3
Main namespace for Aleph-w library functions.
Definition ah-arena.H:89
T parallel_reduce(ThreadPool &pool, Iterator first, Iterator last, T init, BinaryOp op, size_t chunk_size=0)
Reduce elements in parallel.
OutputIt parallel_transform(ThreadPool &pool, InputIt first, InputIt last, OutputIt d_first, F &&f, size_t chunk_size=0)
Transform elements in parallel and store results.
ThreadPool & default_pool()
Return the default shared thread pool instance.
bool completed() const noexcept
Return true if all underlying iterators are finished.
Definition ah-zip.H:136
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
void parallel_for_index(ThreadPool &pool, size_t start, size_t end, F &&f, size_t chunk_size=0)
Apply a function to each element in parallel (index-based).
void parallel_for(ThreadPool &pool, Iterator begin, Iterator end, F &&f, size_t chunk_size=0)
Execute a function in parallel over a range.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
T product(const Container &container, const T &init=T{1})
Compute product of all elements.
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
void parallel_invoke(ThreadPool &pool, Fs &&... fs)
Invoke a small set of related callables in parallel.
T sum(const Container &container, const T &init=T{})
Compute sum of all elements.
Basic arithmetic operations for the calculator.
double add(double a, double b)
static struct argp_option options[]
Definition ntreepic.C:1886
#define LL
Definition ran_array.c:24
Common configuration object for parallel algorithms.
ThreadPool * pool
Executor to use (nullptr = default_pool()).
Statistics collected by ThreadPool.
double queue_utilization(size_t soft_limit) const noexcept
Queue utilization as percentage (0-100)
size_t current_queue_size
Current pending tasks.
static int square(int x)
int multiply(int a, int b) const
int operator()(int x) const
static long counter
Definition test-splice.C:35
A modern, efficient thread pool for parallel task execution.
int free_function(int x)
void increment_ref(int &x)
TEST_F(ThreadPoolTest, DefaultConstruction)
void add_to_ref(int &x, int amount)
ofstream output
Definition writeHeap.C:215