40#include <gtest/gtest.h>
50using namespace std::chrono_literals;
96 auto future = pool.
enqueue([](
int a,
int b) {
return a + b; }, 10, 20);
114 std::atomic<bool> executed{
false};
116 auto future = pool.
enqueue([&executed] { executed =
true; });
126 std::vector<std::future<int>>
futures;
142 std::atomic<bool> block{
true};
145 pool.
enqueue_detached([&block] {
while (block) std::this_thread::yield(); });
148 for (
int i = 0; i < 5; ++i)
165 auto f1 = pool.
enqueue([] {
return 1; });
166 auto f2 = pool.
enqueue([] {
return 2; });
182 std::atomic<int> counter{0};
184 for (
int i = 0; i < 10; ++i)
256 std::atomic<bool> block{
true};
260 pool.
enqueue_detached([&block] {
while (block) std::this_thread::yield(); });
263 for (
int i = 0; i < 5; ++i)
285 throw std::runtime_error(
"test exception");
297 throw std::runtime_error(
"test");
301 auto f2 = pool.
enqueue([] {
return 42; });
314 std::atomic<int> counter{0};
341 std::vector<std::future<void>>
futures;
342 for (
int i = 0; i < 100; ++i)
345 int current = ++concurrent_count;
348 int prev_max = max_concurrent.load();
349 while (current > prev_max &&
350 !max_concurrent.compare_exchange_weak(prev_max, current))
353 std::this_thread::sleep_for(1ms);
371 ThreadPool pool(std::thread::hardware_concurrency());
373 std::atomic<int>
sum{0};
375 auto start = std::chrono::high_resolution_clock::now();
377 std::vector<std::future<void>>
futures;
386 auto end = std::chrono::high_resolution_clock::now();
387 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
399 ThreadPool pool(std::thread::hardware_concurrency());
403 auto compute = [](
int n) {
405 for (
int i = 0; i < n; ++i)
406 sum += std::sqrt(
static_cast<double>(i));
410 std::vector<std::future<double>>
futures;
418 const auto start = std::chrono::steady_clock::now();
420 std::chrono::steady_clock::now() - start < 1s)
422 std::this_thread::sleep_for(1
ms);
436 auto future = pool.
enqueue([] {
return std::string(
"hello"); });
446 return std::vector<int>{1, 2, 3, 4, 5};
459 return std::make_pair(a + b, a * b);
501 std::function<
int(
int)> func = [](
int x) {
return x * x; };
516 int add(
int x) {
return value + x; }
518 static int square(
int x) {
return x * x; }
571 auto ptr = std::make_unique<int>(42);
572 auto future = pool.
enqueue([p = std::move(ptr)]() {
return *p; });
581 auto ptr = std::make_unique<int>(100);
582 auto future = pool.
enqueue([](std::unique_ptr<int> p) {
return *p * 2; },
594 std::unique_ptr<int> data;
601 int operator()() {
return *data; }
616 std::atomic<int> counter{0};
618 for (
int i = 0; i < 10; ++i)
629 std::atomic<int>
sum{0};
631 for (
int i = 1; i <= 5; ++i)
642 std::atomic<int> counter{0};
668 std::vector<int>
inputs = {1, 2, 3, 4, 5};
683 std::vector<std::string>
inputs = {
"hello",
"world",
"test"};
698 std::vector<int> empty;
764 const int value = 77;
766 auto future = pool.
enqueue([](
const int& x) {
return x * 2; }, std::cref(value));
776 auto future = pool.
enqueue([&value](
int x) { value = x; }, 42);
800 std::atomic<int> counter{0};
802 for (
int i = 0; i < 10; ++i)
804 std::this_thread::sleep_for(10ms);
818 auto start = std::chrono::high_resolution_clock::now();
820 auto end = std::chrono::high_resolution_clock::now();
822 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
832 ThreadPool pool(std::thread::hardware_concurrency());
834 std::atomic<int> counter{0};
849 std::atomic<int> counter{0};
875 for (
int i = 0; i < 100; ++i)
879 std::this_thread::sleep_for(1
ms);
901 for (
int i = 0; i < 10; ++i)
904 std::lock_guard<std::mutex> lock(mtx);
918 for (
int i = 0; i < 1000; ++i)
931 std::vector<int>
large(10000);
947 return pool.
enqueue([] {
return 42; });
957 std::atomic<int> counter{0};
968 std::this_thread::sleep_for(100
ms);
978 auto f1 = pool.
enqueue([]() ->
int {
throw std::runtime_error(
"runtime"); });
979 auto f2 = pool.
enqueue([]() ->
int {
throw std::logic_error(
"logic"); });
980 auto f3 = pool.
enqueue([]() ->
int {
throw std::out_of_range(
"range"); });
981 auto f4 = pool.
enqueue([] {
return 42; });
994 std::vector<std::future<int>>
futures;
995 for (
int i = 0; i < 100; ++i)
998 throw std::runtime_error(
"test");
1008 catch (
const std::runtime_error&)
1029 for (
int i = 0; i < 4; ++i)
1034 std::this_thread::sleep_for(1
ms);
1041 std::this_thread::sleep_for(1
ms);
1054 std::atomic<int> counter{0};
1058 for (
int i = 0; i < 10; ++i)
1061 std::this_thread::sleep_for(10
ms);
1074 std::atomic<int> counter{0};
1089 catch (
const std::runtime_error&)
1094 std::this_thread::sleep_for(1
ms);
1099 std::this_thread::sleep_for(20
ms);
1103 std::this_thread::sleep_for(20
ms);
1107 std::this_thread::sleep_for(20
ms);
1124 std::vector<std::future<int>>
futures;
1139 std::atomic<long long>
sum{0};
1162 std::lock_guard<std::mutex> lock(mtx);
1192 auto start_single = std::chrono::high_resolution_clock::now();
1196 auto end_single = std::chrono::high_resolution_clock::now();
1200 size_t num_threads = std::min(8u, std::thread::hardware_concurrency());
1206 auto end_parallel = std::chrono::high_resolution_clock::now();
1219 auto start = std::chrono::high_resolution_clock::now();
1221 std::vector<std::future<int>>
futures;
1229 auto end = std::chrono::high_resolution_clock::now();
1230 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
1244 auto outer = [](
int x) {
1262 auto add = [](
int a,
int b,
int c) {
return a + b + c; };
1263 auto bound = std::bind(add, 10, std::placeholders::_1, 20);
1275 auto generic_add = [](
auto a,
auto b) {
return a + b; };
1291 return std::accumulate(data.begin(), data.end(), 0);
1305 auto sum_all = [](
int a,
int b,
int c,
int d,
int e,
1306 int f,
int g,
int h,
int i,
int j) {
1307 return a + b + c + d + e + f + g +
h + i + j;
1310 auto future = pool.
enqueue(
sum_all, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
1323 std::mutex queue_mutex;
1324 std::condition_variable cv;
1325 std::atomic<bool>
done{
false};
1330 for (
int i = 0; i < 100; ++i)
1333 std::lock_guard<std::mutex> lock(queue_mutex);
1337 std::this_thread::sleep_for(1
ms);
1344 for (
int c = 0; c < 3; ++c)
1350 std::unique_lock<std::mutex> lock(queue_mutex);
1400 std::vector<std::future<int>>
futures;
1401 for (
int i = 0; i < 50; ++i)
1404 for (
int i = 0; i < 50; ++i)
1419 std::this_thread::sleep_for(1
ms);
1423 for (
int i = 0; i < 5; ++i)
1438 std::this_thread::sleep_for(50
ms);
1461 std::this_thread::sleep_for(1
ms);
1465 for (
int i = 0; i < 15; ++i)
1484 std::this_thread::sleep_for(1
ms);
1488 for (
int i = 0; i < 10; ++i)
1494 FAIL() <<
"Expected queue_overflow_error";
1500 EXPECT_NE(std::string(e.what()).find(
"overflow"), std::string::npos);
1515 std::this_thread::sleep_for(1
ms);
1519 for (
int i = 0; i < 3; ++i)
1523 std::atomic<bool>
done{
false};
1524 std::thread t([&pool, &
done] {
1529 std::this_thread::sleep_for(30
ms);
1560 std::this_thread::sleep_for(5
ms);
1582 std::this_thread::sleep_for(200
ms);
1597 std::atomic<int> counter{0};
1614 std::this_thread::sleep_for(1
ms);
1634 std::vector<std::future<int>>
futures;
1635 for (
int i = 0; i < 100; ++i)
1638 for (
int i = 0; i < 100; ++i)
1648 throw std::runtime_error(
"test");
1659 std::atomic<int> counter{0};
1662 for (
int i = 0; i < 100; ++i)
1689 std::atomic<int> counter{0};
1705 std::this_thread::sleep_for(100us);
1761 auto result = pool.
try_enqueue([] {
return 42; });
1775 std::this_thread::sleep_for(1
ms);
1779 for (
int i = 0; i < 5; ++i)
1794 std::atomic<int> counter{0};
1811 std::this_thread::sleep_for(1
ms);
1815 for (
int i = 0; i < 3; ++i)
1840 auto result = pool.
try_enqueue([](
int a,
int b) {
return a + b; }, 20, 22);
1854 std::this_thread::sleep_for(1
ms);
1862 auto start = std::chrono::high_resolution_clock::now();
1863 for (
int i = 0; i < 1000; ++i)
1865 auto end = std::chrono::high_resolution_clock::now();
1867 auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
1886 auto start = std::chrono::high_resolution_clock::now();
1887 while (std::chrono::high_resolution_clock::now() - start <
work_duration)
1893 auto seq_start = std::chrono::high_resolution_clock::now();
1897 auto seq_end = std::chrono::high_resolution_clock::now();
1901 size_t num_threads = std::thread::hardware_concurrency();
1904 auto pool_start = std::chrono::high_resolution_clock::now();
1905 std::vector<std::future<int>>
futures;
1913 auto pool_end = std::chrono::high_resolution_clock::now();
1920 std::cout <<
"\n=== Benchmark: ThreadPool vs Sequential ===\n";
1921 std::cout <<
"Tasks: " <<
num_tasks <<
", Work per task: 100μs\n";
1922 std::cout <<
"Threads: " << num_threads <<
"\n";
1923 std::cout <<
"Sequential: " <<
seq_duration.count() <<
" ms\n";
1924 std::cout <<
"ThreadPool: " <<
pool_duration.count() <<
" ms\n";
1925 std::cout <<
"Speedup: " << std::fixed << std::setprecision(2) <<
speedup <<
"x\n";
1926 std::cout <<
"============================================\n\n";
1929 if (num_threads > 1)
1939 auto start = std::chrono::high_resolution_clock::now();
1943 auto end = std::chrono::high_resolution_clock::now();
1945 auto total_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
1948 std::cout <<
"\n=== Benchmark: Enqueue Overhead ===\n";
1949 std::cout <<
"Tasks: " <<
num_tasks <<
"\n";
1950 std::cout <<
"Total time: " <<
total_ns / 1000000.0 <<
" ms\n";
1951 std::cout <<
"Per task: " << std::fixed << std::setprecision(0) <<
ns_per_task <<
" ns\n";
1952 std::cout <<
"Throughput: " << std::fixed << std::setprecision(0)
1954 std::cout <<
"===================================\n\n";
1965 auto compute = [](
int x) ->
long long {
1967 for (
int i = 0; i < 1000; ++i)
1968 sum +=
static_cast<long long>(i) * x;
1973 auto async_start = std::chrono::high_resolution_clock::now();
1977 async_futures.push_back(std::async(std::launch::async, compute, i));
1982 auto async_end = std::chrono::high_resolution_clock::now();
1986 ThreadPool pool(std::thread::hardware_concurrency());
1988 auto pool_start = std::chrono::high_resolution_clock::now();
1997 auto pool_end = std::chrono::high_resolution_clock::now();
2004 std::cout <<
"\n=== Benchmark: ThreadPool vs std::async ===\n";
2005 std::cout <<
"Tasks: " <<
num_tasks <<
"\n";
2006 std::cout <<
"std::async: " <<
async_duration.count() / 1000.0 <<
" ms\n";
2007 std::cout <<
"ThreadPool: " <<
pool_duration.count() / 1000.0 <<
" ms\n";
2008 std::cout <<
"Speedup: " << std::fixed << std::setprecision(2) <<
speedup <<
"x\n";
2009 std::cout <<
"============================================\n\n";
2028 EXPECT_EQ(stats.current_queue_size, 0u);
2039 std::vector<std::future<int>>
futures;
2040 for (
int i = 0; i < 10; ++i)
2050 EXPECT_EQ(stats.total_processed(), 10u);
2062 for (
int i = 0; i < 5; ++i)
2065 std::this_thread::sleep_for(std::chrono::milliseconds(10));
2079 pool.
enqueue([] {
return 1; }).get();
2105 std::rethrow_exception(
ep);
2106 }
catch (
const std::runtime_error& e) {
2114 throw std::runtime_error(
"test error");
2118 std::this_thread::sleep_for(std::chrono::milliseconds(50));
2136 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2145 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2160 std::this_thread::sleep_for(std::chrono::milliseconds(20));
2174 pool.
enqueue([] {
return 42; }).get();
2176 bool result = pool.
wait_all_for(std::chrono::milliseconds(100));
2186 std::this_thread::sleep_for(std::chrono::milliseconds(500));
2189 std::this_thread::sleep_for(std::chrono::milliseconds(10));
2191 bool result = pool.
wait_all_for(std::chrono::milliseconds(50));
2202 pool.
enqueue([] {
return 1; }).get();
2204 auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(100);
2215 std::vector<std::tuple<int, int>>
args = {
2216 {1, 2}, {3, 4}, {5, 6}, {7, 8}
2234 std::vector<std::tuple<int>>
args;
2235 for (
int i = 0; i < 100; ++i)
2236 args.emplace_back(i);
2252 std::vector<std::tuple<int>> empty;
2264 std::vector<int> data(100, 1);
2266 parallel_for(pool, data.begin(), data.end(), [](
int& x) { x *= 2; });
2276 std::vector<int> data(100, 1);
2279 [](
auto begin,
auto end) {
2280 for (auto it = begin; it != end; ++it)
2292 std::vector<int> empty;
2295 parallel_for(pool, empty.begin(), empty.end(), [](
int& x) { x = 0; });
2304 std::vector<std::atomic<int>> data(50);
2305 for (
auto& x : data)
2309 [](std::atomic<int>& x) { ++x; },
2312 for (
const auto& x : data)
2322 std::vector<int> data(100, 0);
2325 data[i] = static_cast<int>(i * 2);
2328 for (
size_t i = 0; i < data.size(); ++i)
2329 EXPECT_EQ(data[i],
static_cast<int>(i * 2));
2347 std::vector<int>
input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
2348 std::vector<int>
output(10);
2351 [](
int x) { return x * x; });
2353 EXPECT_EQ(
output, (std::vector<int>{1, 4, 9, 16, 25, 36, 49, 64, 81, 100}));
2360 std::vector<int>
input(1000);
2363 std::vector<int>
output(1000);
2366 [](
int x) { return x + 1; });
2368 for (
int i = 0; i < 1000; ++i)
2376 std::vector<int>
input;
2391 std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
2402 std::vector<int> data = {1, 2, 3, 4, 5};
2405 std::multiplies<int>());
2414 std::vector<int> data = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5};
2417 std::numeric_limits<int>::min(),
2418 [](
int a,
int b) { return std::max(a, b); });
2427 std::vector<long long> data(10000);
2428 std::iota(data.begin(), data.end(), 1LL);
2431 std::plus<long long>());
2441 std::vector<int> empty;
2464 ::testing::InitGoogleTest(&
argc,
argv);
void empty() noexcept
Empty the list.
size_t size() const noexcept
Count the number of elements of the list.
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
bool wait_all_for(std::chrono::duration< Rep, Period > timeout)
Wait until all current tasks complete, with timeout.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
void set_exception_callback(ExceptionCallback callback)
Set callback for exceptions in detached tasks.
auto enqueue_batch(F &&f, const Container &args_list) -> std::vector< std::future< decltype(std::apply(f, *std::begin(args_list)))> >
Submit multiple tasks with a single notification.
bool is_idle() const
Check if the pool is idle (no pending or running tasks).
auto enqueue_bounded(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task with backpressure and memory protection.
void wait_all(std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void resize(size_t new_size)
Resize the pool to a different number of workers.
ThreadPoolStats get_stats() const
Get current pool statistics.
void reset_stats()
Reset statistics counters to zero.
bool wait_all_until(std::chrono::time_point< Clock, Duration > deadline)
Wait until all current tasks complete, with deadline.
void set_queue_limits(size_t soft_limit, size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
bool is_stopped() const noexcept
Check if the pool has been shut down.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking result.
size_t pending_tasks() const
Get the number of pending tasks in the queue.
void shutdown()
Shut down the pool, completing all pending tasks.
auto try_enqueue(F &&f, Args &&... args) -> std::optional< std::future< std::invoke_result_t< F, Args... > > >
Try to submit a task without blocking or throwing.
Exception thrown when the task queue exceeds its hard limit.
size_t current_size() const noexcept
Current queue size when exception was thrown.
size_t hard_limit() const noexcept
Hard limit that was exceeded.
iterator end() noexcept
Return an STL-compatible end iterator.
iterator begin() noexcept
Return an STL-compatible iterator to the first element.
Main namespace for Aleph-w library functions.
T parallel_reduce(ThreadPool &pool, Iterator first, Iterator last, T init, BinaryOp op, size_t chunk_size=0)
Reduce elements in parallel.
OutputIt parallel_transform(ThreadPool &pool, InputIt first, InputIt last, OutputIt d_first, F &&f, size_t chunk_size=0)
Transform elements in parallel and store results.
ThreadPool & default_pool()
Global default thread pool.
bool completed() const noexcept
Return true if all underlying iterators are finished.
void parallel_for_index(ThreadPool &pool, size_t start, size_t end, F &&f, size_t chunk_size=0)
Apply a function to each element in parallel (index-based).
void parallel_for(ThreadPool &pool, Iterator begin, Iterator end, F &&f, size_t chunk_size=0)
Execute a function in parallel over a range.
T product(const Container &container, const T &init=T{1})
Compute product of all elements.
DynList< T > maps(const C &c, Op op)
Classic map operation.
T sum(const Container &container, const T &init=T{})
Compute sum of all elements.
Basic arithmetic operations for the calculator.
double add(double a, double b)
Statistics collected by ThreadPool.
double queue_utilization(size_t soft_limit) const noexcept
Queue utilization as a percentage (0-100).
size_t current_queue_size
Current pending tasks.
int multiply(int a, int b) const
int operator()(int x) const
static StlIterator begin(SetType &s)
Create an iterator positioned at the first element of the container.
A modern, efficient thread pool for parallel task execution.
void increment_ref(int &x)
TEST_F(ThreadPoolTest, DefaultConstruction)
void add_to_ref(int &x, int amount)