// thread_pool_test.cc — unit tests for the Aleph-w 3.0 ThreadPool
// (recovered from a Doxygen-generated listing of this file).
2/*
3 Aleph_w
4
5 Data structures & Algorithms
6 version 2.0.0b
7 https://github.com/lrleon/Aleph-w
8
9 This file is part of Aleph-w library
10
11 Copyright (c) 2002-2026 Leandro Rabindranath Leon
12
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 SOFTWARE.
30*/
31
32
33
#include <gtest/gtest.h>

#include <thread_pool.H>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <functional>
#include <future>
#include <iomanip>
#include <memory>
#include <mutex>
#include <numeric>
#include <queue>
#include <string>
#include <thread>
#include <vector>
48
49using namespace Aleph;
50using namespace std::chrono_literals;
51
52class ThreadPoolTest : public ::testing::Test
53{
54protected:
55 void SetUp() override {}
56 void TearDown() override {}
57};
58
59// ============================================================================
60// Basic Functionality Tests
61// ============================================================================
62
70
76
82
84{
85 ThreadPool pool(2);
86
87 auto future = pool.enqueue([] { return 42; });
88
89 EXPECT_EQ(future.get(), 42);
90}
91
93{
94 ThreadPool pool(2);
95
96 auto future = pool.enqueue([](int a, int b) { return a + b; }, 10, 20);
97
98 EXPECT_EQ(future.get(), 30);
99}
100
102{
103 ThreadPool pool(2);
104 int value = 100;
105
106 auto future = pool.enqueue([&value] { return value * 2; });
107
108 EXPECT_EQ(future.get(), 200);
109}
110
112{
113 ThreadPool pool(2);
114 std::atomic<bool> executed{false};
115
116 auto future = pool.enqueue([&executed] { executed = true; });
117 future.get();
118
119 EXPECT_TRUE(executed);
120}
121
123{
124 ThreadPool pool(4);
125 const int num_tasks = 100;
126 std::vector<std::future<int>> futures;
127
128 for (int i = 0; i < num_tasks; ++i)
129 futures.push_back(pool.enqueue([i] { return i * i; }));
130
131 for (int i = 0; i < num_tasks; ++i)
132 EXPECT_EQ(futures[i].get(), i * i);
133}
134
135// ============================================================================
136// Status and Query Tests
137// ============================================================================
138
140{
141 ThreadPool pool(1);
142 std::atomic<bool> block{true};
143
144 // Block the single worker
145 pool.enqueue_detached([&block] { while (block) std::this_thread::yield(); });
146
147 // Queue more tasks
148 for (int i = 0; i < 5; ++i)
149 pool.enqueue_detached([] {});
150
151 // Should have pending tasks
152 EXPECT_GT(pool.pending_tasks(), 0u);
153
154 // Unblock
155 block = false;
156 pool.wait_all();
157
158 EXPECT_EQ(pool.pending_tasks(), 0u);
159}
160
162{
163 ThreadPool pool(2);
164
165 auto f1 = pool.enqueue([] { return 1; });
166 auto f2 = pool.enqueue([] { return 2; });
167
168 f1.get();
169 f2.get();
170
171 pool.wait_all();
172 EXPECT_TRUE(pool.is_idle());
173}
174
175// ============================================================================
176// Shutdown Tests
177// ============================================================================
178
180{
181 ThreadPool pool(2);
182 std::atomic<int> counter{0};
183
184 for (int i = 0; i < 10; ++i)
185 pool.enqueue_detached([&counter] { ++counter; });
186
187 pool.shutdown();
188
189 EXPECT_EQ(counter.load(), 10);
190 EXPECT_TRUE(pool.is_stopped());
191}
192
194{
195 ThreadPool pool(2);
196 pool.shutdown();
197
198 EXPECT_THROW(pool.enqueue([] { return 0; }), std::runtime_error);
199}
200
202{
203 ThreadPool pool(2);
204 pool.shutdown();
206}
207
208// ============================================================================
209// Resize Tests
210// ============================================================================
211
213{
214 ThreadPool pool(2);
215 EXPECT_EQ(pool.num_threads(), 2u);
216
217 pool.resize(4);
218 EXPECT_EQ(pool.num_threads(), 4u);
219
220 // Verify new workers work
221 auto future = pool.enqueue([] { return 42; });
222 EXPECT_EQ(future.get(), 42);
223}
224
226{
227 ThreadPool pool(4);
228 EXPECT_EQ(pool.num_threads(), 4u);
229
230 pool.resize(2);
231 EXPECT_EQ(pool.num_threads(), 2u);
232
233 // Verify remaining workers work
234 auto future = pool.enqueue([] { return 42; });
235 EXPECT_EQ(future.get(), 42);
236}
237
239{
240 ThreadPool pool(4);
241 pool.resize(4);
242 EXPECT_EQ(pool.num_threads(), 4u);
243}
244
246{
247 ThreadPool pool(2);
248 pool.shutdown();
249
250 EXPECT_THROW(pool.resize(4), std::runtime_error);
251}
252
254{
255 ThreadPool pool(1);
256 std::atomic<bool> block{true};
257 std::atomic<int> completed{0};
258
259 // Block the worker
260 pool.enqueue_detached([&block] { while (block) std::this_thread::yield(); });
261
262 // Queue tasks
263 for (int i = 0; i < 5; ++i)
264 pool.enqueue_detached([&completed] { ++completed; });
265
266 // Unblock and resize
267 block = false;
268 pool.resize(4);
269
270 pool.wait_all();
271
272 // All tasks should complete (first blocking task + 5 counting tasks)
273 EXPECT_EQ(completed.load(), 5);
274}
275
276// ============================================================================
277// Exception Handling Tests
278// ============================================================================
279
281{
282 ThreadPool pool(2);
283
284 auto future = pool.enqueue([] {
285 throw std::runtime_error("test exception");
286 return 0;
287 });
288
289 EXPECT_THROW(future.get(), std::runtime_error);
290}
291
293{
294 ThreadPool pool(2);
295
296 auto f1 = pool.enqueue([] {
297 throw std::runtime_error("test");
298 return 0;
299 });
300
301 auto f2 = pool.enqueue([] { return 42; });
302
303 EXPECT_THROW(f1.get(), std::runtime_error);
304 EXPECT_EQ(f2.get(), 42);
305}
306
307// ============================================================================
308// Concurrency Tests
309// ============================================================================
310
312{
313 ThreadPool pool(4);
314 std::atomic<int> counter{0};
315 const int tasks_per_thread = 100;
316 const int num_enqueue_threads = 4;
317
318 std::vector<std::thread> enqueuers;
319 for (int t = 0; t < num_enqueue_threads; ++t)
320 {
321 enqueuers.emplace_back([&pool, &counter, tasks_per_thread] {
322 for (int i = 0; i < tasks_per_thread; ++i)
323 pool.enqueue_detached([&counter] { ++counter; });
324 });
325 }
326
327 for (auto& t : enqueuers)
328 t.join();
329
330 pool.wait_all();
331
333}
334
336{
337 ThreadPool pool(4);
338 std::atomic<int> concurrent_count{0};
339 std::atomic<int> max_concurrent{0};
340
341 std::vector<std::future<void>> futures;
342 for (int i = 0; i < 100; ++i)
343 {
344 futures.push_back(pool.enqueue([&concurrent_count, &max_concurrent] {
345 int current = ++concurrent_count;
346
347 // Update max if this is higher
348 int prev_max = max_concurrent.load();
349 while (current > prev_max &&
350 !max_concurrent.compare_exchange_weak(prev_max, current))
351 ;
352
353 std::this_thread::sleep_for(1ms);
354 --concurrent_count;
355 }));
356 }
357
358 for (auto& f : futures)
359 f.get();
360
361 // Should have had multiple concurrent executions
362 EXPECT_GT(max_concurrent.load(), 1);
363}
364
365// ============================================================================
366// Performance Tests (not strictly unit tests, but useful)
367// ============================================================================
368
370{
371 ThreadPool pool(std::thread::hardware_concurrency());
372 const int num_tasks = 10000;
373 std::atomic<int> sum{0};
374
375 auto start = std::chrono::high_resolution_clock::now();
376
377 std::vector<std::future<void>> futures;
378 futures.reserve(num_tasks);
379
380 for (int i = 0; i < num_tasks; ++i)
381 futures.push_back(pool.enqueue([&sum, i] { sum += i; }));
382
383 for (auto& f : futures)
384 f.get();
385
386 auto end = std::chrono::high_resolution_clock::now();
387 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
388
389 // Expected sum: 0 + 1 + 2 + ... + (n-1) = n*(n-1)/2
390 int expected = num_tasks * (num_tasks - 1) / 2;
391 EXPECT_EQ(sum.load(), expected);
392
393 // Should complete reasonably fast (less than 5 seconds)
394 EXPECT_LT(duration.count(), 5000);
395}
396
398{
399 ThreadPool pool(std::thread::hardware_concurrency());
400 const int num_tasks = 100;
401
402 // Compute-intensive task: calculate sum of squares
403 auto compute = [](int n) {
404 double sum = 0;
405 for (int i = 0; i < n; ++i)
406 sum += std::sqrt(static_cast<double>(i));
407 return sum;
408 };
409
410 std::vector<std::future<double>> futures;
411 for (int i = 0; i < num_tasks; ++i)
412 futures.push_back(pool.enqueue(compute, 10000));
413
414 for (auto& f : futures)
415 f.get();
416
417 // Wait for pool to transition to idle state (fixes race condition in CI)
418 const auto start = std::chrono::steady_clock::now();
419 while (!pool.is_idle() &&
420 std::chrono::steady_clock::now() - start < 1s)
421 {
422 std::this_thread::sleep_for(1ms);
423 }
424
425 EXPECT_TRUE(pool.is_idle());
426}
427
428// ============================================================================
429// Return Type Tests
430// ============================================================================
431
433{
434 ThreadPool pool(2);
435
436 auto future = pool.enqueue([] { return std::string("hello"); });
437
438 EXPECT_EQ(future.get(), "hello");
439}
440
442{
443 ThreadPool pool(2);
444
445 auto future = pool.enqueue([] {
446 return std::vector<int>{1, 2, 3, 4, 5};
447 });
448
449 auto result = future.get();
450 EXPECT_EQ(result.size(), 5u);
451 EXPECT_EQ(result[2], 3);
452}
453
455{
456 ThreadPool pool(2);
457
458 auto future = pool.enqueue([](int a, int b) {
459 return std::make_pair(a + b, a * b);
460 }, 3, 4);
461
462 auto [sum, product] = future.get();
463 EXPECT_EQ(sum, 7);
464 EXPECT_EQ(product, 12);
465}
466
467// ============================================================================
468// Callable Types Tests
469// ============================================================================
470
// Doubles its argument; used to test enqueueing a plain free function.
int free_function(int x)
{
  return x + x;
}
472
// Callable object used to test enqueueing a functor.
// NOTE(review): the `struct Functor` header line was lost in doc
// extraction; reconstructed from the call sites below.
struct Functor
{
  int value; // Added to the call argument.
  int operator()(int x) const { return x + value; }
};
478
480{
481 ThreadPool pool(2);
482
483 auto future = pool.enqueue(free_function, 21);
484
485 EXPECT_EQ(future.get(), 42);
486}
487
489{
490 ThreadPool pool(2);
491 Functor f{10};
492
493 auto future = pool.enqueue(f, 32);
494
495 EXPECT_EQ(future.get(), 42);
496}
497
499{
500 ThreadPool pool(2);
501 std::function<int(int)> func = [](int x) { return x * x; };
502
503 auto future = pool.enqueue(func, 6);
504
505 EXPECT_EQ(future.get(), 36);
506}
507
508// ============================================================================
509// Member Function Tests
510// ============================================================================
511
// Helper type used to test enqueueing member functions.
// NOTE(review): the `struct Calculator` header line was lost in doc
// extraction; reconstructed from the member-function tests below.
struct Calculator
{
  int value = 10; // Base operand for add().

  int add(int x) { return value + x; }
  int multiply(int a, int b) const { return a * b; }
  static int square(int x) { return x * x; }
};
520
522{
523 ThreadPool pool(2);
525 calc.value = 20;
526
527 auto future = pool.enqueue(&Calculator::add, &calc, 22);
528
529 EXPECT_EQ(future.get(), 42);
530}
531
533{
534 ThreadPool pool(2);
536
537 auto future = pool.enqueue(&Calculator::multiply, &calc, 6, 7);
538
539 EXPECT_EQ(future.get(), 42);
540}
541
543{
544 ThreadPool pool(2);
545
546 auto future = pool.enqueue(&Calculator::square, 7);
547
548 EXPECT_EQ(future.get(), 49);
549}
550
552{
553 ThreadPool pool(2);
555 calc.value = 30;
556
557 // Using reference wrapper
558 auto future = pool.enqueue(&Calculator::add, std::ref(calc), 12);
559
560 EXPECT_EQ(future.get(), 42);
561}
562
563// ============================================================================
564// Move-Only Tests
565// ============================================================================
566
568{
569 ThreadPool pool(2);
570
571 auto ptr = std::make_unique<int>(42);
572 auto future = pool.enqueue([p = std::move(ptr)]() { return *p; });
573
574 EXPECT_EQ(future.get(), 42);
575}
576
578{
579 ThreadPool pool(2);
580
581 auto ptr = std::make_unique<int>(100);
582 auto future = pool.enqueue([](std::unique_ptr<int> p) { return *p * 2; },
583 std::move(ptr));
584
585 EXPECT_EQ(future.get(), 200);
586}
587
589{
590 ThreadPool pool(2);
591
592 struct MoveOnlyFunctor
593 {
594 std::unique_ptr<int> data;
595
596 MoveOnlyFunctor(int v) : data(std::make_unique<int>(v)) {}
597 MoveOnlyFunctor(MoveOnlyFunctor&&) = default;
598 MoveOnlyFunctor& operator=(MoveOnlyFunctor&&) = default;
599 MoveOnlyFunctor(const MoveOnlyFunctor&) = delete;
600
601 int operator()() { return *data; }
602 };
603
604 auto future = pool.enqueue(MoveOnlyFunctor{42});
605
606 EXPECT_EQ(future.get(), 42);
607}
608
609// ============================================================================
610// Enqueue Detached Tests
611// ============================================================================
612
614{
615 ThreadPool pool(2);
616 std::atomic<int> counter{0};
617
618 for (int i = 0; i < 10; ++i)
619 pool.enqueue_detached([&counter] { ++counter; });
620
621 pool.wait_all();
622
623 EXPECT_EQ(counter.load(), 10);
624}
625
627{
628 ThreadPool pool(2);
629 std::atomic<int> sum{0};
630
631 for (int i = 1; i <= 5; ++i)
632 pool.enqueue_detached([&sum](int x) { sum += x; }, i);
633
634 pool.wait_all();
635
636 EXPECT_EQ(sum.load(), 15); // 1+2+3+4+5
637}
638
640{
641 ThreadPool pool(2);
642 std::atomic<int> counter{0};
643
644 // This should not crash - exceptions are silently ignored
645 pool.enqueue_detached([] { throw std::runtime_error("ignored"); });
646 pool.enqueue_detached([&counter] { ++counter; });
647
648 pool.wait_all();
649
650 EXPECT_EQ(counter.load(), 1);
651}
652
654{
655 ThreadPool pool(2);
656 pool.shutdown();
657
658 EXPECT_THROW(pool.enqueue_detached([] {}), std::runtime_error);
659}
660
661// ============================================================================
662// Enqueue Bulk Tests
663// ============================================================================
664
666{
667 ThreadPool pool(4);
668 std::vector<int> inputs = {1, 2, 3, 4, 5};
669
670 auto futures = pool.enqueue_bulk([](int x) { return x * x; }, inputs);
671
672 ASSERT_EQ(futures.size(), 5u);
673 EXPECT_EQ(futures[0].get(), 1);
674 EXPECT_EQ(futures[1].get(), 4);
675 EXPECT_EQ(futures[2].get(), 9);
676 EXPECT_EQ(futures[3].get(), 16);
677 EXPECT_EQ(futures[4].get(), 25);
678}
679
681{
682 ThreadPool pool(2);
683 std::vector<std::string> inputs = {"hello", "world", "test"};
684
685 auto futures = pool.enqueue_bulk([](const std::string& s) {
686 return s.size();
687 }, inputs);
688
689 ASSERT_EQ(futures.size(), 3u);
690 EXPECT_EQ(futures[0].get(), 5u);
691 EXPECT_EQ(futures[1].get(), 5u);
692 EXPECT_EQ(futures[2].get(), 4u);
693}
694
696{
697 ThreadPool pool(2);
698 std::vector<int> empty;
699
700 auto futures = pool.enqueue_bulk([](int x) { return x; }, empty);
701
703}
704
705// ============================================================================
706// Default Pool Tests
707// ============================================================================
708
710{
711 auto& pool = Aleph::default_pool();
712
713 EXPECT_GT(pool.num_threads(), 0u);
714 EXPECT_FALSE(pool.is_stopped());
715}
716
718{
719 auto future = Aleph::default_pool().enqueue([](int x) { return x * 2; }, 21);
720
721 EXPECT_EQ(future.get(), 42);
722}
723
731
732// ============================================================================
733// Reference Argument Tests
734// ============================================================================
735
// Bumps the referenced value by one; used to test std::ref argument passing.
void increment_ref(int& x)
{
  x += 1;
}
// Adds `amount` to the referenced value; used to test std::ref with extra args.
void add_to_ref(int& x, int amount)
{
  x = x + amount;
}
738
740{
741 ThreadPool pool(2);
742 int value = 10;
743
744 auto future = pool.enqueue(increment_ref, std::ref(value));
745 future.get();
746
747 EXPECT_EQ(value, 11);
748}
749
751{
752 ThreadPool pool(2);
753 int value = 100;
754
755 auto future = pool.enqueue(add_to_ref, std::ref(value), 50);
756 future.get();
757
758 EXPECT_EQ(value, 150);
759}
760
762{
763 ThreadPool pool(2);
764 const int value = 77;
765
766 auto future = pool.enqueue([](const int& x) { return x * 2; }, std::cref(value));
767
768 EXPECT_EQ(future.get(), 154);
769}
770
772{
773 ThreadPool pool(2);
774 int value = 0;
775
776 auto future = pool.enqueue([&value](int x) { value = x; }, 42);
777 future.get();
778
779 EXPECT_EQ(value, 42);
780}
781
783{
784 ThreadPool pool(2);
786 calc.value = 30;
787
788 auto future = pool.enqueue(&Calculator::add, std::ref(calc), 12);
789
790 EXPECT_EQ(future.get(), 42);
791}
792
793// ============================================================================
794// WaitAll Tests
795// ============================================================================
796
798{
799 ThreadPool pool(2);
800 std::atomic<int> counter{0};
801
802 for (int i = 0; i < 10; ++i)
803 pool.enqueue_detached([&counter] {
804 std::this_thread::sleep_for(10ms);
805 ++counter;
806 });
807
808 pool.wait_all();
809
810 EXPECT_EQ(counter.load(), 10);
811}
812
814{
815 ThreadPool pool(2);
816
817 // Should return immediately
818 auto start = std::chrono::high_resolution_clock::now();
819 pool.wait_all();
820 auto end = std::chrono::high_resolution_clock::now();
821
822 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
823 EXPECT_LT(duration.count(), 100); // Should be nearly instant
824}
825
826// ============================================================================
827// Stress Tests
828// ============================================================================
829
831{
832 ThreadPool pool(std::thread::hardware_concurrency());
833 const int num_tasks = 100000;
834 std::atomic<int> counter{0};
835
836 for (int i = 0; i < num_tasks; ++i)
837 pool.enqueue_detached([&counter] { ++counter; });
838
839 pool.wait_all();
840
841 EXPECT_EQ(counter.load(), num_tasks);
842}
843
845{
846 ThreadPool pool(8);
847 const int num_enqueuers = 16;
848 const int tasks_per_enqueuer = 1000;
849 std::atomic<int> counter{0};
850
851 std::vector<std::thread> enqueuers;
852 for (int i = 0; i < num_enqueuers; ++i)
853 {
854 enqueuers.emplace_back([&pool, &counter, tasks_per_enqueuer] {
855 for (int j = 0; j < tasks_per_enqueuer; ++j)
856 pool.enqueue_detached([&counter] { ++counter; });
857 });
858 }
859
860 for (auto& t : enqueuers)
861 t.join();
862
863 pool.wait_all();
864
865 EXPECT_EQ(counter.load(), num_enqueuers * tasks_per_enqueuer);
866}
867
869{
870 ThreadPool pool(4);
871 std::atomic<int> fast_count{0};
872 std::atomic<int> slow_count{0};
873
874 // Mix of fast and slow tasks
875 for (int i = 0; i < 100; ++i)
876 {
877 pool.enqueue_detached([&fast_count] { ++fast_count; });
879 std::this_thread::sleep_for(1ms);
880 ++slow_count;
881 });
882 }
883
884 pool.wait_all();
885
886 EXPECT_EQ(fast_count.load(), 100);
887 EXPECT_EQ(slow_count.load(), 100);
888}
889
890// ============================================================================
891// Edge Cases
892// ============================================================================
893
895{
896 ThreadPool pool(1);
897 std::vector<int> results;
898 std::mutex mtx;
899
900 // Tasks should execute sequentially
901 for (int i = 0; i < 10; ++i)
902 {
903 pool.enqueue_detached([&results, &mtx, i] {
904 std::lock_guard<std::mutex> lock(mtx);
905 results.push_back(i);
906 });
907 }
908
909 pool.wait_all();
910
911 EXPECT_EQ(results.size(), 10u);
912}
913
915{
916 ThreadPool pool(4);
917
918 for (int i = 0; i < 1000; ++i)
919 pool.enqueue_detached([] {});
920
921 pool.wait_all();
922
923 EXPECT_TRUE(pool.is_idle());
924}
925
927{
928 ThreadPool pool(2);
929
930 auto future = pool.enqueue([] {
931 std::vector<int> large(10000);
932 std::iota(large.begin(), large.end(), 0);
933 return large;
934 });
935
936 auto result = future.get();
937 EXPECT_EQ(result.size(), 10000u);
938 EXPECT_EQ(result[9999], 9999);
939}
940
942{
943 ThreadPool pool(2);
944
945 auto future = pool.enqueue([&pool] {
946 // Enqueue from inside a task
947 return pool.enqueue([] { return 42; });
948 });
949
950 auto inner_future = future.get();
952}
953
955{
956 ThreadPool pool(4);
957 std::atomic<int> counter{0};
958
959 std::function<void(int)> recursive = [&pool, &counter, &recursive](int depth) {
960 ++counter;
961 if (depth > 0)
962 pool.enqueue_detached(recursive, depth - 1);
963 };
964
965 pool.enqueue_detached(recursive, 10);
966
967 // Wait a bit for all recursive tasks
968 std::this_thread::sleep_for(100ms);
969 pool.wait_all();
970
971 EXPECT_EQ(counter.load(), 11); // 0 to 10 inclusive
972}
973
975{
976 ThreadPool pool(4);
977
978 auto f1 = pool.enqueue([]() -> int { throw std::runtime_error("runtime"); });
979 auto f2 = pool.enqueue([]() -> int { throw std::logic_error("logic"); });
980 auto f3 = pool.enqueue([]() -> int { throw std::out_of_range("range"); });
981 auto f4 = pool.enqueue([] { return 42; }); // Normal task
982
983 EXPECT_THROW(f1.get(), std::runtime_error);
984 EXPECT_THROW(f2.get(), std::logic_error);
985 EXPECT_THROW(f3.get(), std::out_of_range);
986 EXPECT_EQ(f4.get(), 42);
987}
988
990{
991 ThreadPool pool(4);
992 std::atomic<int> exception_count{0};
993
994 std::vector<std::future<int>> futures;
995 for (int i = 0; i < 100; ++i)
996 {
997 futures.push_back(pool.enqueue([]() -> int {
998 throw std::runtime_error("test");
999 }));
1000 }
1001
1002 for (auto& f : futures)
1003 {
1004 try
1005 {
1006 f.get();
1007 }
1008 catch (const std::runtime_error&)
1009 {
1011 }
1012 }
1013
1014 EXPECT_EQ(exception_count.load(), 100);
1015}
1016
1017// ============================================================================
1018// Shutdown and Lifecycle Tests
1019// ============================================================================
1020
1022{
1023 ThreadPool pool(2);
1024 std::atomic<int> started{0};
1025 std::atomic<int> finished{0};
1026 std::atomic<bool> can_finish{false};
1027
1028 // Start some blocking tasks
1029 for (int i = 0; i < 4; ++i)
1030 {
1032 ++started;
1033 while (!can_finish)
1034 std::this_thread::sleep_for(1ms);
1035 ++finished;
1036 });
1037 }
1038
1039 // Wait for tasks to start
1040 while (started < 2)
1041 std::this_thread::sleep_for(1ms);
1042
1043 // Allow tasks to finish
1044 can_finish = true;
1045
1046 // Shutdown should wait for all tasks
1047 pool.shutdown();
1048
1049 EXPECT_EQ(finished.load(), 4);
1050}
1051
1053{
1054 std::atomic<int> counter{0};
1055
1056 {
1057 ThreadPool pool(2);
1058 for (int i = 0; i < 10; ++i)
1059 {
1060 pool.enqueue_detached([&counter] {
1061 std::this_thread::sleep_for(10ms);
1062 ++counter;
1063 });
1064 }
1065 // Destructor should wait
1066 }
1067
1068 EXPECT_EQ(counter.load(), 10);
1069}
1070
1072{
1073 ThreadPool pool(2);
1074 std::atomic<int> counter{0};
1075 std::atomic<bool> keep_running{true};
1076 std::atomic<bool> resize_in_progress{false};
1077
1078 // Start continuous work
1079 std::thread producer([&pool, &counter, &keep_running, &resize_in_progress] {
1080 while (keep_running)
1081 {
1082 // Skip enqueue during resize to avoid race
1083 if (!resize_in_progress)
1084 {
1085 try
1086 {
1087 pool.enqueue_detached([&counter] { ++counter; });
1088 }
1089 catch (const std::runtime_error&)
1090 {
1091 // Pool might be stopped during resize, ignore
1092 }
1093 }
1094 std::this_thread::sleep_for(1ms);
1095 }
1096 });
1097
1098 // Resize while under load
1099 std::this_thread::sleep_for(20ms);
1100 resize_in_progress = true;
1101 pool.resize(8);
1102 resize_in_progress = false;
1103 std::this_thread::sleep_for(20ms);
1104 resize_in_progress = true;
1105 pool.resize(2);
1106 resize_in_progress = false;
1107 std::this_thread::sleep_for(20ms);
1108
1109 keep_running = false;
1110 producer.join();
1111 pool.wait_all();
1112
1113 EXPECT_GT(counter.load(), 0);
1114}
1115
1116// ============================================================================
1117// Data Integrity Tests
1118// ============================================================================
1119
1121{
1122 ThreadPool pool(8);
1123 const int num_tasks = 10000;
1124 std::vector<std::future<int>> futures;
1125 futures.reserve(num_tasks);
1126
1127 // Each task returns its index
1128 for (int i = 0; i < num_tasks; ++i)
1129 futures.push_back(pool.enqueue([i] { return i; }));
1130
1131 // Verify all results are correct
1132 for (int i = 0; i < num_tasks; ++i)
1133 EXPECT_EQ(futures[i].get(), i);
1134}
1135
1137{
1138 ThreadPool pool(8);
1139 std::atomic<long long> sum{0};
1140 const int num_tasks = 10000;
1141
1142 for (int i = 1; i <= num_tasks; ++i)
1143 pool.enqueue_detached([&sum, i] { sum += i; });
1144
1145 pool.wait_all();
1146
1147 // Sum of 1 to n = n*(n+1)/2
1148 long long expected = static_cast<long long>(num_tasks) * (num_tasks + 1) / 2;
1149 EXPECT_EQ(sum.load(), expected);
1150}
1151
1153{
1154 ThreadPool pool(8);
1155 std::vector<int> shared_vec;
1156 std::mutex mtx;
1157 const int num_tasks = 1000;
1158
1159 for (int i = 0; i < num_tasks; ++i)
1160 {
1161 pool.enqueue_detached([&shared_vec, &mtx, i] {
1162 std::lock_guard<std::mutex> lock(mtx);
1163 shared_vec.push_back(i);
1164 });
1165 }
1166
1167 pool.wait_all();
1168
1169 EXPECT_EQ(shared_vec.size(), static_cast<size_t>(num_tasks));
1170
1171 // Sort and verify all values present
1172 std::sort(shared_vec.begin(), shared_vec.end());
1173 for (int i = 0; i < num_tasks; ++i)
1174 EXPECT_EQ(shared_vec[i], i);
1175}
1176
1177// ============================================================================
1178// Timing and Performance Tests
1179// ============================================================================
1180
1182{
1183 const int num_tasks = 100;
1184 const auto work_duration = 10ms;
1185
1186 auto do_work = [work_duration] {
1187 std::this_thread::sleep_for(work_duration);
1188 };
1189
1190 // Sequential baseline (single thread)
1192 auto start_single = std::chrono::high_resolution_clock::now();
1193 for (int i = 0; i < num_tasks; ++i)
1194 pool_single.enqueue_detached(do_work);
1195 pool_single.wait_all();
1196 auto end_single = std::chrono::high_resolution_clock::now();
1197 auto duration_single = std::chrono::duration_cast<std::chrono::milliseconds>(end_single - start_single);
1198
1199 // Parallel (multiple threads)
1200 size_t num_threads = std::min(8u, std::thread::hardware_concurrency());
1201 ThreadPool pool_parallel(num_threads);
1202 auto start_parallel = std::chrono::high_resolution_clock::now();
1203 for (int i = 0; i < num_tasks; ++i)
1204 pool_parallel.enqueue_detached(do_work);
1205 pool_parallel.wait_all();
1206 auto end_parallel = std::chrono::high_resolution_clock::now();
1207 auto duration_parallel = std::chrono::duration_cast<std::chrono::milliseconds>(end_parallel - start_parallel);
1208
1209 // Parallel should be significantly faster
1210 double speedup = static_cast<double>(duration_single.count()) / duration_parallel.count();
1211 EXPECT_GT(speedup, 1.5); // At least 1.5x speedup
1212}
1213
1215{
1216 ThreadPool pool(4);
1217 const int num_tasks = 1000;
1218
1219 auto start = std::chrono::high_resolution_clock::now();
1220
1221 std::vector<std::future<int>> futures;
1222 futures.reserve(num_tasks);
1223 for (int i = 0; i < num_tasks; ++i)
1224 futures.push_back(pool.enqueue([i] { return i; }));
1225
1226 for (auto& f : futures)
1227 f.get();
1228
1229 auto end = std::chrono::high_resolution_clock::now();
1230 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
1231
1232 // Should complete 1000 trivial tasks in less than 100ms
1233 EXPECT_LT(duration.count(), 100000);
1234}
1235
1236// ============================================================================
1237// Complex Callable Tests
1238// ============================================================================
1239
1241{
1242 ThreadPool pool(2);
1243
1244 auto outer = [](int x) {
1245 return [x](int y) {
1246 return x + y;
1247 };
1248 };
1249
1250 auto future = pool.enqueue([outer] {
1251 auto inner = outer(10);
1252 return inner(32);
1253 });
1254
1255 EXPECT_EQ(future.get(), 42);
1256}
1257
1259{
1260 ThreadPool pool(2);
1261
1262 auto add = [](int a, int b, int c) { return a + b + c; };
1263 auto bound = std::bind(add, 10, std::placeholders::_1, 20);
1264
1265 auto future = pool.enqueue(bound, 12);
1266
1267 EXPECT_EQ(future.get(), 42);
1268}
1269
1271{
1272 ThreadPool pool(2);
1273
1274 // Generic lambda (C++14+)
1275 auto generic_add = [](auto a, auto b) { return a + b; };
1276
1277 auto future_int = pool.enqueue(generic_add, 20, 22);
1278 auto future_double = pool.enqueue(generic_add, 20.5, 21.5);
1279
1280 EXPECT_EQ(future_int.get(), 42);
1282}
1283
1285{
1286 ThreadPool pool(2);
1287
1288 std::vector<int> large_data(1000, 42);
1289
1290 auto future = pool.enqueue([data = std::move(large_data)]() mutable {
1291 return std::accumulate(data.begin(), data.end(), 0);
1292 });
1293
1294 EXPECT_EQ(future.get(), 42000);
1295}
1296
1297// ============================================================================
1298// Edge Case: Very Many Arguments
1299// ============================================================================
1300
1302{
1303 ThreadPool pool(2);
1304
1305 auto sum_all = [](int a, int b, int c, int d, int e,
1306 int f, int g, int h, int i, int j) {
1307 return a + b + c + d + e + f + g + h + i + j;
1308 };
1309
1310 auto future = pool.enqueue(sum_all, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
1311
1312 EXPECT_EQ(future.get(), 55);
1313}
1314
1315// ============================================================================
1316// Producer-Consumer Pattern
1317// ============================================================================
1318
1320{
1321 ThreadPool pool(4);
1322 std::queue<int> work_queue;
1323 std::mutex queue_mutex;
1324 std::condition_variable cv;
1325 std::atomic<bool> done{false};
1326 std::atomic<int> consumed{0};
1327
1328 // Producer task
1329 pool.enqueue_detached([&work_queue, &queue_mutex, &cv, &done] {
1330 for (int i = 0; i < 100; ++i)
1331 {
1332 {
1333 std::lock_guard<std::mutex> lock(queue_mutex);
1334 work_queue.push(i);
1335 }
1336 cv.notify_one();
1337 std::this_thread::sleep_for(1ms);
1338 }
1339 done = true;
1340 cv.notify_all();
1341 });
1342
1343 // Consumer tasks
1344 for (int c = 0; c < 3; ++c)
1345 {
1346 pool.enqueue_detached([&work_queue, &queue_mutex, &cv, &done, &consumed] {
1347 while (true)
1348 {
1349 {
1350 std::unique_lock<std::mutex> lock(queue_mutex);
1351 cv.wait_for(lock, 10ms, [&] { return !work_queue.empty() || done; });
1352 if (work_queue.empty() && done)
1353 return;
1354 if (work_queue.empty())
1355 continue;
1356 work_queue.pop(); // Consume the item
1357 }
1358 ++consumed;
1359 }
1360 });
1361 }
1362
1363 pool.wait_all();
1364
1365 EXPECT_EQ(consumed.load(), 100);
1366}
1367
1368// ============================================================================
1369// Bounded Queue Tests
1370// ============================================================================
1371
1373{
1374 ThreadPool pool(2);
1375
1376 pool.set_queue_limits(100, 500);
1377 auto [soft, hard] = pool.get_queue_limits();
1378
1379 EXPECT_EQ(soft, 100u);
1380 EXPECT_EQ(hard, 500u);
1381}
1382
1384{
1385 ThreadPool pool(2);
1386
1387 pool.set_queue_limits(100); // hard should default to 10x soft
1388 auto [soft, hard] = pool.get_queue_limits();
1389
1390 EXPECT_EQ(soft, 100u);
1391 EXPECT_EQ(hard, 1000u); // 10 * 100
1392}
1393
1395{
1396 ThreadPool pool(4);
1397 pool.set_queue_limits(100, 1000);
1398
1399 // Should not block when below soft limit
1400 std::vector<std::future<int>> futures;
1401 for (int i = 0; i < 50; ++i)
1402 futures.push_back(pool.enqueue_bounded([i] { return i; }));
1403
1404 for (int i = 0; i < 50; ++i)
1405 EXPECT_EQ(futures[i].get(), i);
1406}
1407
1409{
1410 ThreadPool pool(1); // Single worker
1411 pool.set_queue_limits(5, 100);
1412
1413 std::atomic<bool> worker_blocked{true};
1414 std::atomic<int> enqueued{0};
1415
1416 // Block the only worker
1418 while (worker_blocked)
1419 std::this_thread::sleep_for(1ms);
1420 });
1421
1422 // Enqueue up to soft limit (should succeed immediately)
1423 for (int i = 0; i < 5; ++i)
1424 {
1426 }
1427
1428 EXPECT_EQ(pool.pending_tasks(), 5u);
1429
1430 // Next enqueue should block
1431 std::atomic<bool> enqueue_completed{false};
1432 std::thread blocker([&pool, &enqueue_completed] {
1433 pool.enqueue_bounded_detached([] {});
1434 enqueue_completed = true;
1435 });
1436
1437 // Give it time to try to enqueue
1438 std::this_thread::sleep_for(50ms);
1439 EXPECT_FALSE(enqueue_completed); // Should still be blocked
1440
1441 // Release the worker
1442 worker_blocked = false;
1443
1444 // Now it should complete
1445 blocker.join();
1446 pool.wait_all();
1447
1449}
1450
1452{
1453 ThreadPool pool(1);
1454 pool.set_queue_limits(10, 15);
1455
1456 std::atomic<bool> worker_blocked{true};
1457
1458 // Block the worker
1460 while (worker_blocked)
1461 std::this_thread::sleep_for(1ms);
1462 });
1463
1464 // Fill up to hard limit using regular enqueue (bypasses limits)
1465 for (int i = 0; i < 15; ++i)
1466 pool.enqueue_detached([] {});
1467
1468 // Now bounded enqueue should throw
1470
1471 // Clean up
1472 worker_blocked = false;
1473 pool.wait_all();
1474}
1475
1477{
1478 ThreadPool pool(1);
1479 pool.set_queue_limits(5, 10);
1480
1481 std::atomic<bool> worker_blocked{true};
1483 while (worker_blocked)
1484 std::this_thread::sleep_for(1ms);
1485 });
1486
1487 // Fill queue beyond hard limit
1488 for (int i = 0; i < 10; ++i)
1489 pool.enqueue_detached([] {});
1490
1491 try
1492 {
1493 pool.enqueue_bounded([] { return 0; });
1494 FAIL() << "Expected queue_overflow_error";
1495 }
1496 catch (const Aleph::queue_overflow_error& e)
1497 {
1498 EXPECT_GE(e.current_size(), 10u);
1499 EXPECT_EQ(e.hard_limit(), 10u);
1500 EXPECT_NE(std::string(e.what()).find("overflow"), std::string::npos);
1501 }
1502
1503 worker_blocked = false;
1504 pool.wait_all();
1505}
1506
1508{
1509 ThreadPool pool(1);
1510 pool.set_queue_limits(3, 100);
1511
1512 std::atomic<bool> worker_blocked{true};
1514 while (worker_blocked)
1515 std::this_thread::sleep_for(1ms);
1516 });
1517
1518 // Fill to soft limit
1519 for (int i = 0; i < 3; ++i)
1520 pool.enqueue_bounded_detached([] {});
1521
1522 // Next should block
1523 std::atomic<bool> done{false};
1524 std::thread t([&pool, &done] {
1525 pool.enqueue_bounded_detached([] {});
1526 done = true;
1527 });
1528
1529 std::this_thread::sleep_for(30ms);
1531
1532 worker_blocked = false;
1533 t.join();
1534 pool.wait_all();
1535
1537}
1538
1540{
1541 ThreadPool pool(2);
1542 pool.set_queue_limits(100, 1000);
1543 pool.shutdown();
1544
1545 EXPECT_THROW(pool.enqueue_bounded([] { return 0; }), std::runtime_error);
1546 EXPECT_THROW(pool.enqueue_bounded_detached([] {}), std::runtime_error);
1547}
1548
1550{
1551 ThreadPool pool(2);
1552 pool.set_queue_limits(10, 100);
1553
1554 std::atomic<int> produced{0};
1555 std::atomic<int> consumed{0};
1556 std::atomic<bool> stop_producing{false};
1557
1558 // Consumer tasks (slow)
1559 auto consume = [&consumed] {
1560 std::this_thread::sleep_for(5ms);
1561 ++consumed;
1562 };
1563
1564 // Producer thread (fast)
1565 std::thread producer([&] {
1566 while (!stop_producing && produced < 50)
1567 {
1568 try
1569 {
1571 ++produced;
1572 }
1573 catch (const Aleph::queue_overflow_error&)
1574 {
1575 // Hard limit reached, stop
1576 break;
1577 }
1578 }
1579 });
1580
1581 // Let it run for a bit
1582 std::this_thread::sleep_for(200ms);
1583 stop_producing = true;
1584 producer.join();
1585 pool.wait_all();
1586
1587 // Producer should have been slowed down by backpressure
1588 // (can't produce much faster than consumers can consume)
1589 EXPECT_EQ(consumed.load(), produced.load());
1590}
1591
1593{
1594 ThreadPool pool(4);
1595 pool.set_queue_limits(50, 200);
1596
1597 std::atomic<int> counter{0};
1598 const int num_producers = 8;
1599 const int tasks_per_producer = 100;
1600
1601 std::vector<std::thread> producers;
1602 for (int p = 0; p < num_producers; ++p)
1603 {
1604 producers.emplace_back([&pool, &counter, tasks_per_producer] {
1605 for (int i = 0; i < tasks_per_producer; ++i)
1606 {
1607 try
1608 {
1609 pool.enqueue_bounded_detached([&counter] { ++counter; });
1610 }
1611 catch (const Aleph::queue_overflow_error&)
1612 {
1613 // Retry after small delay
1614 std::this_thread::sleep_for(1ms);
1615 --i;
1616 }
1617 }
1618 });
1619 }
1620
1621 for (auto& t : producers)
1622 t.join();
1623
1624 pool.wait_all();
1625
1626 EXPECT_EQ(counter.load(), num_producers * tasks_per_producer);
1627}
1628
1630{
1631 ThreadPool pool(4);
1632 pool.set_queue_limits(20, 100);
1633
1634 std::vector<std::future<int>> futures;
1635 for (int i = 0; i < 100; ++i)
1636 futures.push_back(pool.enqueue_bounded([i] { return i * i; }));
1637
1638 for (int i = 0; i < 100; ++i)
1639 EXPECT_EQ(futures[i].get(), i * i);
1640}
1641
1643{
1644 ThreadPool pool(2);
1645 pool.set_queue_limits(10, 100);
1646
1647 auto future = pool.enqueue_bounded([]() -> int {
1648 throw std::runtime_error("test");
1649 });
1650
1651 EXPECT_THROW(future.get(), std::runtime_error);
1652}
1653
1655{
1656 ThreadPool pool(4);
1657 pool.set_queue_limits(10, 50);
1658
1659 std::atomic<int> counter{0};
1660
1661 // Mix bounded and unbounded
1662 for (int i = 0; i < 100; ++i)
1663 {
1664 if (i % 2 == 0)
1665 pool.enqueue_detached([&counter] { ++counter; });
1666 else
1667 {
1668 try
1669 {
1670 pool.enqueue_bounded_detached([&counter] { ++counter; });
1671 }
1672 catch (const Aleph::queue_overflow_error&)
1673 {
1674 // Unbounded doesn't care about limits
1675 pool.enqueue_detached([&counter] { ++counter; });
1676 }
1677 }
1678 }
1679
1680 pool.wait_all();
1681 EXPECT_EQ(counter.load(), 100);
1682}
1683
1685{
1686 ThreadPool pool(8);
1687 pool.set_queue_limits(100, 500);
1688
1689 std::atomic<int> counter{0};
1690 const int num_tasks = 10000;
1691
1692 for (int i = 0; i < num_tasks; ++i)
1693 {
1694 // Keep trying until successful
1695 bool success = false;
1696 while (!success)
1697 {
1698 try
1699 {
1700 pool.enqueue_bounded_detached([&counter] { ++counter; });
1701 success = true;
1702 }
1703 catch (const Aleph::queue_overflow_error&)
1704 {
1705 std::this_thread::sleep_for(100us);
1706 }
1707 }
1708 }
1709
1710 pool.wait_all();
1711 EXPECT_EQ(counter.load(), num_tasks);
1712}
1713
// Note: soft_limit=0 is not a practical use case and has edge case issues,
// so we don't test it. Use soft_limit >= 1 for bounded queues.
1716
1718{
1719 ThreadPool pool(2);
1720
1721 auto [soft, hard] = pool.get_queue_limits();
1722
1723 EXPECT_EQ(soft, std::numeric_limits<size_t>::max());
1724 EXPECT_EQ(hard, std::numeric_limits<size_t>::max());
1725}
1726
1728{
1729 ThreadPool pool(2);
1730 pool.set_queue_limits(10, 100);
1731
1732 auto future = pool.enqueue_bounded([](int a, int b, int c) {
1733 return a + b + c;
1734 }, 10, 20, 12);
1735
1736 EXPECT_EQ(future.get(), 42);
1737}
1738
1740{
1741 ThreadPool pool(2);
1742 pool.set_queue_limits(10, 100);
1743
1744 int value = 10;
1745 auto future = pool.enqueue_bounded([](int& x) { x *= 2; return x; },
1746 std::ref(value));
1747
1748 EXPECT_EQ(future.get(), 20);
1749 EXPECT_EQ(value, 20);
1750}
1751
1752// ============================================================================
1753// Try Enqueue Tests
1754// ============================================================================
1755
1757{
1758 ThreadPool pool(2);
1759 pool.set_queue_limits(10, 100);
1760
1761 auto result = pool.try_enqueue([] { return 42; });
1762
1763 ASSERT_TRUE(result.has_value());
1764 EXPECT_EQ(result->get(), 42);
1765}
1766
1768{
1769 ThreadPool pool(1);
1770 pool.set_queue_limits(5, 100);
1771
1772 std::atomic<bool> worker_blocked{true};
1774 while (worker_blocked)
1775 std::this_thread::sleep_for(1ms);
1776 });
1777
1778 // Fill to soft limit
1779 for (int i = 0; i < 5; ++i)
1780 pool.enqueue_detached([] {});
1781
1782 // try_enqueue should return nullopt
1783 auto result = pool.try_enqueue([] { return 0; });
1784 EXPECT_FALSE(result.has_value());
1785
1786 worker_blocked = false;
1787 pool.wait_all();
1788}
1789
1791{
1792 ThreadPool pool(2);
1793 pool.set_queue_limits(10, 100);
1794 std::atomic<int> counter{0};
1795
1796 bool success = pool.try_enqueue_detached([&counter] { ++counter; });
1797
1799 pool.wait_all();
1800 EXPECT_EQ(counter.load(), 1);
1801}
1802
1804{
1805 ThreadPool pool(1);
1806 pool.set_queue_limits(3, 100);
1807
1808 std::atomic<bool> worker_blocked{true};
1810 while (worker_blocked)
1811 std::this_thread::sleep_for(1ms);
1812 });
1813
1814 // Fill to soft limit
1815 for (int i = 0; i < 3; ++i)
1816 pool.enqueue_detached([] {});
1817
1818 // try_enqueue_detached should return false
1819 bool success = pool.try_enqueue_detached([] {});
1821
1822 worker_blocked = false;
1823 pool.wait_all();
1824}
1825
1827{
1828 ThreadPool pool(2);
1829 pool.shutdown();
1830
1831 EXPECT_THROW(pool.try_enqueue([] { return 0; }), std::runtime_error);
1832 EXPECT_THROW(pool.try_enqueue_detached([] {}), std::runtime_error);
1833}
1834
1836{
1837 ThreadPool pool(2);
1838 pool.set_queue_limits(10, 100);
1839
1840 auto result = pool.try_enqueue([](int a, int b) { return a + b; }, 20, 22);
1841
1842 ASSERT_TRUE(result.has_value());
1843 EXPECT_EQ(result->get(), 42);
1844}
1845
1847{
1848 ThreadPool pool(1);
1849 pool.set_queue_limits(2, 100);
1850
1851 std::atomic<bool> worker_blocked{true};
1853 while (worker_blocked)
1854 std::this_thread::sleep_for(1ms);
1855 });
1856
1857 // Fill queue
1858 pool.enqueue_detached([] {});
1859 pool.enqueue_detached([] {});
1860
1861 // Measure time for try_enqueue when queue is full
1862 auto start = std::chrono::high_resolution_clock::now();
1863 for (int i = 0; i < 1000; ++i)
1864 pool.try_enqueue([] { return 0; }); // Should all fail fast
1865 auto end = std::chrono::high_resolution_clock::now();
1866
1867 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
1868
1869 // 1000 non-blocking calls should complete in < 10ms
1870 EXPECT_LT(duration.count(), 10000);
1871
1872 worker_blocked = false;
1873 pool.wait_all();
1874}
1875
// ============================================================================
// Benchmark Tests
// ============================================================================
1879
1881{
1882 const int num_tasks = 1000;
1883 const auto work_duration = 100us;
1884
1885 auto do_work = [work_duration] {
1886 auto start = std::chrono::high_resolution_clock::now();
1887 while (std::chrono::high_resolution_clock::now() - start < work_duration)
1888 ; // Busy wait
1889 return 1;
1890 };
1891
1892 // Sequential baseline
1893 auto seq_start = std::chrono::high_resolution_clock::now();
1894 int seq_sum = 0;
1895 for (int i = 0; i < num_tasks; ++i)
1896 seq_sum += do_work();
1897 auto seq_end = std::chrono::high_resolution_clock::now();
1898 auto seq_duration = std::chrono::duration_cast<std::chrono::milliseconds>(seq_end - seq_start);
1899
1900 // ThreadPool parallel
1901 size_t num_threads = std::thread::hardware_concurrency();
1902 ThreadPool pool(num_threads);
1903
1904 auto pool_start = std::chrono::high_resolution_clock::now();
1905 std::vector<std::future<int>> futures;
1906 futures.reserve(num_tasks);
1907 for (int i = 0; i < num_tasks; ++i)
1908 futures.push_back(pool.enqueue(do_work));
1909
1910 int pool_sum = 0;
1911 for (auto& f : futures)
1912 pool_sum += f.get();
1913 auto pool_end = std::chrono::high_resolution_clock::now();
1914 auto pool_duration = std::chrono::duration_cast<std::chrono::milliseconds>(pool_end - pool_start);
1915
1917
1918 double speedup = static_cast<double>(seq_duration.count()) / pool_duration.count();
1919
1920 std::cout << "\n=== Benchmark: ThreadPool vs Sequential ===\n";
1921 std::cout << "Tasks: " << num_tasks << ", Work per task: 100μs\n";
1922 std::cout << "Threads: " << num_threads << "\n";
1923 std::cout << "Sequential: " << seq_duration.count() << " ms\n";
1924 std::cout << "ThreadPool: " << pool_duration.count() << " ms\n";
1925 std::cout << "Speedup: " << std::fixed << std::setprecision(2) << speedup << "x\n";
1926 std::cout << "============================================\n\n";
1927
1928 // Should achieve at least some speedup with multiple cores
1929 if (num_threads > 1)
1930 EXPECT_GT(speedup, 1.2);
1931}
1932
1934{
1935 ThreadPool pool(4);
1936 const int num_tasks = 100000;
1937
1938 // Measure enqueue overhead (tasks do nothing)
1939 auto start = std::chrono::high_resolution_clock::now();
1940 for (int i = 0; i < num_tasks; ++i)
1941 pool.enqueue_detached([] {});
1942 pool.wait_all();
1943 auto end = std::chrono::high_resolution_clock::now();
1944
1945 auto total_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
1946 double ns_per_task = static_cast<double>(total_ns) / num_tasks;
1947
1948 std::cout << "\n=== Benchmark: Enqueue Overhead ===\n";
1949 std::cout << "Tasks: " << num_tasks << "\n";
1950 std::cout << "Total time: " << total_ns / 1000000.0 << " ms\n";
1951 std::cout << "Per task: " << std::fixed << std::setprecision(0) << ns_per_task << " ns\n";
1952 std::cout << "Throughput: " << std::fixed << std::setprecision(0)
1953 << (num_tasks * 1e9 / total_ns) << " tasks/sec\n";
1954 std::cout << "===================================\n\n";
1955
1956 // Should be able to enqueue at least 50k tasks/sec
1957 // (relaxed for slower CI environments)
1958 EXPECT_LT(ns_per_task, 20000); // < 20μs per task
1959}
1960
1962{
1963 const int num_tasks = 500; // Fewer tasks because async is slow
1964
1965 auto compute = [](int x) -> long long {
1966 long long sum = 0;
1967 for (int i = 0; i < 1000; ++i)
1968 sum += static_cast<long long>(i) * x;
1969 return sum;
1970 };
1971
1972 // std::async
1973 auto async_start = std::chrono::high_resolution_clock::now();
1974 std::vector<std::future<long long>> async_futures;
1975 async_futures.reserve(num_tasks);
1976 for (int i = 0; i < num_tasks; ++i)
1977 async_futures.push_back(std::async(std::launch::async, compute, i));
1978
1979 long long async_sum = 0;
1980 for (auto& f : async_futures)
1981 async_sum += f.get();
1982 auto async_end = std::chrono::high_resolution_clock::now();
1983 auto async_duration = std::chrono::duration_cast<std::chrono::microseconds>(async_end - async_start);
1984
1985 // ThreadPool
1986 ThreadPool pool(std::thread::hardware_concurrency());
1987
1988 auto pool_start = std::chrono::high_resolution_clock::now();
1989 std::vector<std::future<long long>> pool_futures;
1990 pool_futures.reserve(num_tasks);
1991 for (int i = 0; i < num_tasks; ++i)
1992 pool_futures.push_back(pool.enqueue(compute, i));
1993
1994 long long pool_sum = 0;
1995 for (auto& f : pool_futures)
1996 pool_sum += f.get();
1997 auto pool_end = std::chrono::high_resolution_clock::now();
1998 auto pool_duration = std::chrono::duration_cast<std::chrono::microseconds>(pool_end - pool_start);
1999
2001
2002 double speedup = static_cast<double>(async_duration.count()) / pool_duration.count();
2003
2004 std::cout << "\n=== Benchmark: ThreadPool vs std::async ===\n";
2005 std::cout << "Tasks: " << num_tasks << "\n";
2006 std::cout << "std::async: " << async_duration.count() / 1000.0 << " ms\n";
2007 std::cout << "ThreadPool: " << pool_duration.count() / 1000.0 << " ms\n";
2008 std::cout << "Speedup: " << std::fixed << std::setprecision(2) << speedup << "x\n";
2009 std::cout << "============================================\n\n";
2010
2011 // ThreadPool should be faster than creating new threads each time
2012 EXPECT_GT(speedup, 1.0);
2013}
2014
// =============================================================================
// New Features Tests
// =============================================================================

// --- Statistics Tests ---
2020
2022{
2023 ThreadPool pool(2);
2024 auto stats = pool.get_stats();
2025
2026 EXPECT_EQ(stats.tasks_completed, 0u);
2027 EXPECT_EQ(stats.tasks_failed, 0u);
2028 EXPECT_EQ(stats.current_queue_size, 0u);
2029 EXPECT_EQ(stats.current_active, 0u);
2030 EXPECT_EQ(stats.num_workers, 2u);
2031 EXPECT_EQ(stats.peak_queue_size, 0u);
2032 EXPECT_EQ(stats.total_processed(), 0u);
2033}
2034
2036{
2037 ThreadPool pool(2);
2038
2039 std::vector<std::future<int>> futures;
2040 for (int i = 0; i < 10; ++i)
2041 futures.push_back(pool.enqueue([] { return 42; }));
2042
2043 for (auto& f : futures)
2044 f.get();
2045
2046 pool.wait_all();
2047
2048 auto stats = pool.get_stats();
2049 EXPECT_EQ(stats.tasks_completed, 10u);
2050 EXPECT_EQ(stats.total_processed(), 10u);
2051}
2052
2054{
2055 ThreadPool pool(1);
2056
2057 // Block the worker
2058 std::promise<void> blocker;
2059 pool.enqueue([&blocker] { blocker.get_future().wait(); });
2060
2061 // Queue up several tasks
2062 for (int i = 0; i < 5; ++i)
2063 pool.enqueue_detached([] {});
2064
2065 std::this_thread::sleep_for(std::chrono::milliseconds(10));
2066
2067 auto stats = pool.get_stats();
2068 EXPECT_GE(stats.peak_queue_size, 5u);
2069
2070 // Release worker
2071 blocker.set_value();
2072 pool.wait_all();
2073}
2074
2076{
2077 ThreadPool pool(2);
2078
2079 pool.enqueue([] { return 1; }).get();
2080 // Wait for stats to be updated (tasks_completed incremented after future is set)
2081 pool.wait_all();
2082
2083 auto stats1 = pool.get_stats();
2084 EXPECT_GE(stats1.tasks_completed, 1u);
2085
2086 pool.reset_stats();
2087
2088 auto stats2 = pool.get_stats();
2089 EXPECT_EQ(stats2.tasks_completed, 0u);
2090 EXPECT_EQ(stats2.peak_queue_size, 0u);
2091}
2092
2093// --- Exception Callback Tests ---
2094
2096{
2097 ThreadPool pool(2);
2098
2099 std::atomic<bool> callback_called{false};
2100 std::atomic<bool> correct_exception{false};
2101
2102 pool.set_exception_callback([&](std::exception_ptr ep) {
2103 callback_called = true;
2104 try {
2105 std::rethrow_exception(ep);
2106 } catch (const std::runtime_error& e) {
2107 correct_exception = (std::string(e.what()) == "test error");
2108 } catch (...) {
2109 correct_exception = false;
2110 }
2111 });
2112
2113 pool.enqueue_detached([] {
2114 throw std::runtime_error("test error");
2115 });
2116
2117 pool.wait_all();
2118 std::this_thread::sleep_for(std::chrono::milliseconds(50));
2119
2122}
2123
2125{
2126 ThreadPool pool(2);
2127
2128 std::atomic<int> callback_count{0};
2129
2130 pool.set_exception_callback([&](std::exception_ptr) {
2132 });
2133
2134 pool.enqueue_detached([] { throw std::runtime_error("error"); });
2135 pool.wait_all();
2136 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2137
2138 EXPECT_EQ(callback_count.load(), 1);
2139
2140 // Clear callback
2141 pool.set_exception_callback(nullptr);
2142
2143 pool.enqueue_detached([] { throw std::runtime_error("error"); });
2144 pool.wait_all();
2145 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2146
2147 // Count should still be 1
2148 EXPECT_EQ(callback_count.load(), 1);
2149}
2150
2152{
2153 ThreadPool pool(2);
2154
2155 pool.enqueue_detached([] { throw std::runtime_error("fail"); });
2156 pool.enqueue_detached([] { throw std::logic_error("fail"); });
2157 pool.enqueue_detached([] { /* success */ });
2158
2159 pool.wait_all();
2160 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2161
2162 auto stats = pool.get_stats();
2163 EXPECT_EQ(stats.tasks_failed, 2u);
2164 EXPECT_EQ(stats.tasks_completed, 1u);
2165 EXPECT_EQ(stats.total_processed(), 3u);
2166}
2167
2168// --- wait_all_for/wait_all_until Tests ---
2169
2171{
2172 ThreadPool pool(2);
2173
2174 pool.enqueue([] { return 42; }).get();
2175
2176 bool result = pool.wait_all_for(std::chrono::milliseconds(100));
2177 EXPECT_TRUE(result);
2178}
2179
2181{
2182 ThreadPool pool(1);
2183
2184 // Start a long task
2185 pool.enqueue_detached([] {
2186 std::this_thread::sleep_for(std::chrono::milliseconds(500));
2187 });
2188
2189 std::this_thread::sleep_for(std::chrono::milliseconds(10));
2190
2191 bool result = pool.wait_all_for(std::chrono::milliseconds(50));
2192 EXPECT_FALSE(result);
2193
2194 // Wait for cleanup
2195 pool.wait_all();
2196}
2197
2199{
2200 ThreadPool pool(2);
2201
2202 pool.enqueue([] { return 1; }).get();
2203
2204 auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
2205 bool result = pool.wait_all_until(deadline);
2206 EXPECT_TRUE(result);
2207}
2208
2209// --- enqueue_batch Tests ---
2210
2212{
2213 ThreadPool pool(4);
2214
2215 std::vector<std::tuple<int, int>> args = {
2216 {1, 2}, {3, 4}, {5, 6}, {7, 8}
2217 };
2218
2219 auto futures = pool.enqueue_batch([](int a, int b) { return a + b; }, args);
2220
2221 EXPECT_EQ(futures.size(), 4u);
2222
2223 std::vector<int> results;
2224 for (auto& f : futures)
2225 results.push_back(f.get());
2226
2227 EXPECT_EQ(results, (std::vector<int>{3, 7, 11, 15}));
2228}
2229
2231{
2232 ThreadPool pool(4);
2233
2234 std::vector<std::tuple<int>> args;
2235 for (int i = 0; i < 100; ++i)
2236 args.emplace_back(i);
2237
2238 auto futures = pool.enqueue_batch([](int x) { return x * x; }, args);
2239
2240 int sum = 0;
2241 for (auto& f : futures)
2242 sum += f.get();
2243
2244 // Sum of squares 0^2 + 1^2 + ... + 99^2 = 328350
2245 EXPECT_EQ(sum, 328350);
2246}
2247
2249{
2250 ThreadPool pool(2);
2251
2252 std::vector<std::tuple<int>> empty;
2253 auto futures = pool.enqueue_batch([](int x) { return x; }, empty);
2254
2256}
2257
2258// --- parallel_for Tests ---
2259
2261{
2262 ThreadPool pool(4);
2263
2264 std::vector<int> data(100, 1);
2265
2266 parallel_for(pool, data.begin(), data.end(), [](int& x) { x *= 2; });
2267
2268 for (int x : data)
2269 EXPECT_EQ(x, 2);
2270}
2271
2273{
2274 ThreadPool pool(4);
2275
2276 std::vector<int> data(100, 1);
2277
2278 parallel_for(pool, data.begin(), data.end(),
2279 [](auto begin, auto end) {
2280 for (auto it = begin; it != end; ++it)
2281 *it = 5;
2282 });
2283
2284 for (int x : data)
2285 EXPECT_EQ(x, 5);
2286}
2287
2289{
2290 ThreadPool pool(2);
2291
2292 std::vector<int> empty;
2293
2294 // Should not throw or crash
2295 parallel_for(pool, empty.begin(), empty.end(), [](int& x) { x = 0; });
2296
2297 EXPECT_TRUE(empty.empty());
2298}
2299
2301{
2302 ThreadPool pool(2);
2303
2304 std::vector<std::atomic<int>> data(50);
2305 for (auto& x : data)
2306 x = 0;
2307
2308 parallel_for(pool, data.begin(), data.end(),
2309 [](std::atomic<int>& x) { ++x; },
2310 10); // chunk size = 10
2311
2312 for (const auto& x : data)
2313 EXPECT_EQ(x.load(), 1);
2314}
2315
2316// --- parallel_for_index Tests ---
2317
2319{
2320 ThreadPool pool(4);
2321
2322 std::vector<int> data(100, 0);
2323
2324 parallel_for_index(pool, 0, data.size(), [&data](size_t i) {
2325 data[i] = static_cast<int>(i * 2);
2326 });
2327
2328 for (size_t i = 0; i < data.size(); ++i)
2329 EXPECT_EQ(data[i], static_cast<int>(i * 2));
2330}
2331
2333{
2334 ThreadPool pool(2);
2335
2336 // Should not throw
2337 parallel_for_index(pool, 10, 10, [](size_t) { /* nothing */ });
2338 parallel_for_index(pool, 10, 5, [](size_t) { /* nothing */ });
2339}
2340
2341// --- parallel_transform Tests ---
2342
2344{
2345 ThreadPool pool(4);
2346
2347 std::vector<int> input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
2348 std::vector<int> output(10);
2349
2351 [](int x) { return x * x; });
2352
2353 EXPECT_EQ(output, (std::vector<int>{1, 4, 9, 16, 25, 36, 49, 64, 81, 100}));
2354}
2355
2357{
2358 ThreadPool pool(4);
2359
2360 std::vector<int> input(1000);
2361 std::iota(input.begin(), input.end(), 0);
2362
2363 std::vector<int> output(1000);
2364
2366 [](int x) { return x + 1; });
2367
2368 for (int i = 0; i < 1000; ++i)
2369 EXPECT_EQ(output[i], i + 1);
2370}
2371
2373{
2374 ThreadPool pool(2);
2375
2376 std::vector<int> input;
2377 std::vector<int> output;
2378
2379 auto end = parallel_transform(pool, input.begin(), input.end(),
2380 output.begin(), [](int x) { return x; });
2381
2382 EXPECT_EQ(end, output.begin());
2383}
2384
2385// --- parallel_reduce Tests ---
2386
2388{
2389 ThreadPool pool(4);
2390
2391 std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
2392
2393 int sum = parallel_reduce(pool, data.begin(), data.end(), 0, std::plus<int>());
2394
2395 EXPECT_EQ(sum, 55);
2396}
2397
2399{
2400 ThreadPool pool(4);
2401
2402 std::vector<int> data = {1, 2, 3, 4, 5};
2403
2404 int product = parallel_reduce(pool, data.begin(), data.end(), 1,
2405 std::multiplies<int>());
2406
2407 EXPECT_EQ(product, 120);
2408}
2409
2411{
2412 ThreadPool pool(4);
2413
2414 std::vector<int> data = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5};
2415
2416 int max_val = parallel_reduce(pool, data.begin(), data.end(),
2417 std::numeric_limits<int>::min(),
2418 [](int a, int b) { return std::max(a, b); });
2419
2420 EXPECT_EQ(max_val, 9);
2421}
2422
2424{
2425 ThreadPool pool(4);
2426
2427 std::vector<long long> data(10000);
2428 std::iota(data.begin(), data.end(), 1LL);
2429
2430 long long sum = parallel_reduce(pool, data.begin(), data.end(), 0LL,
2431 std::plus<long long>());
2432
2433 // Sum 1+2+...+10000 = 10000*10001/2 = 50005000
2434 EXPECT_EQ(sum, 50005000LL);
2435}
2436
2438{
2439 ThreadPool pool(2);
2440
2441 std::vector<int> empty;
2442
2443 int result = parallel_reduce(pool, empty.begin(), empty.end(), 42,
2444 std::plus<int>());
2445
2446 EXPECT_EQ(result, 42); // Returns init for empty range
2447}
2448
2449// --- ThreadPoolStats Tests ---
2450
2452{
2453 ThreadPoolStats stats;
2454 stats.current_queue_size = 50;
2455
2456 EXPECT_NEAR(stats.queue_utilization(100), 50.0, 0.01);
2457 EXPECT_NEAR(stats.queue_utilization(200), 25.0, 0.01);
2458 EXPECT_NEAR(stats.queue_utilization(0), 0.0, 0.01); // Edge case
2459 EXPECT_NEAR(stats.queue_utilization(std::numeric_limits<size_t>::max()), 0.0, 0.01);
2460}
2461
2462int main(int argc, char **argv)
2463{
2464 ::testing::InitGoogleTest(&argc, argv);
2465 return RUN_ALL_TESTS();
2466}
2467
int main()
long double h
Definition btreepic.C:154
T & push(const T &item)
Definition htlist.H:1523
void empty() noexcept
empty the list
Definition htlist.H:1689
size_t size() const noexcept
Count the number of elements of the list.
Definition htlist.H:1319
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
bool wait_all_for(std::chrono::duration< Rep, Period > timeout)
Wait until all current tasks complete, with timeout.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
void set_exception_callback(ExceptionCallback callback)
Set callback for exceptions in detached tasks.
auto enqueue_batch(F &&f, const Container &args_list) -> std::vector< std::future< decltype(std::apply(f, *std::begin(args_list)))> >
Submit multiple tasks with a single notification.
bool is_idle() const
Check if the pool is idle (no pending or running tasks).
auto enqueue_bounded(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task with backpressure and memory protection.
void wait_all(std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void resize(size_t new_size)
Resize the pool to a different number of workers.
ThreadPoolStats get_stats() const
Get current pool statistics.
void reset_stats()
Reset statistics counters to zero.
bool wait_all_until(std::chrono::time_point< Clock, Duration > deadline)
Wait until all current tasks complete, with deadline.
void set_queue_limits(size_t soft_limit, size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
bool is_stopped() const noexcept
Check if the pool has been shut down.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking result.
size_t pending_tasks() const
Get the number of pending tasks in the queue.
void shutdown()
Shut down the pool, completing all pending tasks.
auto try_enqueue(F &&f, Args &&... args) -> std::optional< std::future< std::invoke_result_t< F, Args... > > >
Try to submit a task without blocking or throwing.
Exception thrown when the task queue exceeds its hard limit.
size_t current_size() const noexcept
Current queue size when exception was thrown.
size_t hard_limit() const noexcept
Hard limit that was exceeded.
iterator end() noexcept
Return an STL-compatible end iterator.
iterator begin() noexcept
Return an STL-compatible iterator to the first element.
void SetUp() override
void TearDown() override
#define FAIL(msg)
static mpfr_t y
Definition mpfr_mul_d.c:3
Main namespace for Aleph-w library functions.
Definition ah-arena.H:89
T parallel_reduce(ThreadPool &pool, Iterator first, Iterator last, T init, BinaryOp op, size_t chunk_size=0)
Reduce elements in parallel.
OutputIt parallel_transform(ThreadPool &pool, InputIt first, InputIt last, OutputIt d_first, F &&f, size_t chunk_size=0)
Transform elements in parallel and store results.
ThreadPool & default_pool()
Global default thread pool.
bool completed() const noexcept
Return true if all underlying iterators are finished.
Definition ah-zip.H:140
void parallel_for_index(ThreadPool &pool, size_t start, size_t end, F &&f, size_t chunk_size=0)
Apply a function to each element in parallel (index-based).
void parallel_for(ThreadPool &pool, Iterator begin, Iterator end, F &&f, size_t chunk_size=0)
Execute a function in parallel over a range.
T product(const Container &container, const T &init=T{1})
Compute product of all elements.
DynList< T > maps(const C &c, Op op)
Classic map operation.
T sum(const Container &container, const T &init=T{})
Compute sum of all elements.
Basic arithmetic operations for the calculator.
double add(double a, double b)
#define LL
Definition ran_array.c:24
Statistics collected by ThreadPool.
double queue_utilization(size_t soft_limit) const noexcept
Queue utilization as percentage (0-100)
size_t current_queue_size
Current pending tasks.
static int square(int x)
int multiply(int a, int b) const
int operator()(int x) const
static StlIterator begin(SetType &s)
Create an iterator positioned at the first element of the container.
A modern, efficient thread pool for parallel task execution.
int free_function(int x)
void increment_ref(int &x)
TEST_F(ThreadPoolTest, DefaultConstruction)
void add_to_ref(int &x, int amount)
ofstream output
Definition writeHeap.C:213