129using namespace Aleph;
130using namespace std::chrono_literals;
136 std::cout <<
"+" << std::string(60,
'-') <<
"+\n";
137 std::cout <<
"| " << std::left << std::setw(58) <<
title <<
" |\n";
138 std::cout <<
"+" << std::string(60,
'-') <<
"+\n\n";
160 if (n < 2)
return false;
161 if (n == 2)
return true;
162 if (n % 2 == 0)
return false;
163 for (
int i = 3; i * i <= n; i += 2)
164 if (n % i == 0)
return false;
172 std::cout <<
"GOAL: Find all prime numbers in a range using parallel computation.\n\n";
182 ThreadPool pool(std::thread::hardware_concurrency());
183 std::cout <<
"Created ThreadPool with " << pool.
num_threads() <<
" workers\n\n";
192 std::vector<std::future<bool>>
futures;
195 auto start = std::chrono::high_resolution_clock::now();
209 std::cout <<
"Collecting results...\n";
218 auto end = std::chrono::high_resolution_clock::now();
219 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
222 std::cout <<
"\n✓ RESULT: Found " <<
prime_count <<
" primes in range ["
224 std::cout <<
" Time: " <<
duration.count() <<
" ms\n";
247 std::this_thread::sleep_for(10
ms);
250 size_t hash = std::hash<std::string>{}(filename);
253 static_cast<int>(
hash % 1000 + 100),
254 static_cast<int>((
hash >> 10) % 100 + 10)
260 print_header(
"Example 2: Batch Processing with enqueue_bulk()");
262 std::cout <<
"GOAL: Process multiple files in parallel and aggregate results.\n\n";
265 std::vector<std::string>
files;
266 for (
int i = 1; i <= 50; ++i)
267 files.push_back(
"document_" + std::to_string(i) +
".txt");
271 std::cout <<
"Processing " <<
files.
size() <<
" files with "
274 auto start = std::chrono::high_resolution_clock::now();
286 auto result = f.
get();
291 auto end = std::chrono::high_resolution_clock::now();
292 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
294 std::cout <<
"✓ RESULT:\n";
295 std::cout <<
" Files processed: " <<
files.
size() <<
"\n";
296 std::cout <<
" Total words: " <<
total_words <<
"\n";
297 std::cout <<
" Total lines: " <<
total_lines <<
"\n";
298 std::cout <<
" Time: " <<
duration.count() <<
" ms\n";
299 std::cout <<
" (Sequential would take ~" <<
files.
size() * 10 <<
" ms)\n";
324 print_header(
"Example 3: Fire-and-Forget with enqueue_detached()");
326 std::cout <<
"GOAL: Perform background logging without blocking main work.\n\n";
330 std::cout <<
"Main thread does work while logging happens in background:\n\n";
332 for (
int i = 1; i <= 5; ++i)
338 std::this_thread::sleep_for(5
ms);
339 std::ostringstream
oss;
340 oss <<
" [BACKGROUND LOG] Processed item " << i <<
"\n";
341 std::cout <<
oss.str();
345 std::cout <<
"[MAIN THREAD] Working on item " << i <<
"...\n";
346 std::this_thread::sleep_for(30
ms);
349 std::cout <<
"\n[MAIN THREAD] Main work complete. Waiting for logs...\n";
351 std::cout <<
"\n✓ All background logging completed\n";
370 print_header(
"Example 4: Backpressure with enqueue_bounded()");
372 std::cout <<
"GOAL: Prevent queue overflow when producer is faster than consumers.\n\n";
382 std::cout <<
"Queue limits: soft=" <<
soft <<
", hard=" <<
hard <<
"\n\n";
388 std::this_thread::sleep_for(50
ms);
392 std::cout <<
"Enqueueing 20 slow tasks (50ms each)...\n";
393 std::cout <<
"Watch how producer is throttled when queue fills up:\n\n";
395 auto start = std::chrono::high_resolution_clock::now();
397 for (
int i = 1; i <= 20; ++i)
399 auto enqueue_start = std::chrono::high_resolution_clock::now();
404 auto enqueue_end = std::chrono::high_resolution_clock::now();
405 auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(
408 std::cout <<
" Task " << std::setw(2) << i
411 std::cout <<
" ← blocked " <<
wait_time <<
"ms";
417 auto end = std::chrono::high_resolution_clock::now();
418 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
420 std::cout <<
"\n✓ RESULT:\n";
421 std::cout <<
" Processed: " <<
processed <<
" tasks\n";
422 std::cout <<
" Total time: " <<
duration.count() <<
" ms\n";
423 std::cout <<
" Memory was protected by limiting queue size\n";
448 print_header(
"Example 5: Load Shedding with try_enqueue()");
450 std::cout <<
"GOAL: Reject excess tasks when system is overloaded.\n\n";
460 std::this_thread::sleep_for(100
ms);
464 std::cout <<
"Attempting to enqueue 15 tasks with soft_limit=3:\n\n";
466 for (
int i = 1; i <= 15; ++i)
471 std::cout <<
" Task " << std::setw(2) << i <<
": ✓ ACCEPTED\n";
475 std::cout <<
" Task " << std::setw(2) << i <<
": ✗ REJECTED (queue full)\n";
482 std::cout <<
"\n✓ RESULT:\n";
483 std::cout <<
" Accepted and processed: " <<
accepted <<
"\n";
484 std::cout <<
" Rejected (dropped): " <<
rejected <<
"\n";
485 std::cout <<
" System remained responsive - no blocking!\n";
503 std::cout <<
"GOAL: Compare parallel execution vs sequential execution.\n\n";
508 auto compute = [](
int x) {
510 for (
int i = 0; i < 10000; ++i)
511 result += std::sin(x * i * 0.001);
516 std::cout <<
"Running " <<
num_tasks <<
" tasks sequentially...\n";
518 auto seq_start = std::chrono::high_resolution_clock::now();
522 auto seq_end = std::chrono::high_resolution_clock::now();
526 std::cout <<
"Running " <<
num_tasks <<
" tasks in parallel...\n\n";
528 ThreadPool pool(std::thread::hardware_concurrency());
530 auto par_start = std::chrono::high_resolution_clock::now();
531 std::vector<std::future<double>>
futures;
539 auto par_end = std::chrono::high_resolution_clock::now();
545 std::cout <<
"┌────────────────────────────────────────┐\n";
546 std::cout <<
"│ PERFORMANCE RESULTS │\n";
547 std::cout <<
"├────────────────────────────────────────┤\n";
548 std::cout <<
"│ Tasks: " << std::setw(24) <<
num_tasks <<
" │\n";
549 std::cout <<
"│ Threads: " << std::setw(24) << pool.
num_threads() <<
" │\n";
550 std::cout <<
"├────────────────────────────────────────┤\n";
551 std::cout <<
"│ Sequential: " << std::setw(20) <<
seq_ms <<
" ms │\n";
552 std::cout <<
"│ Parallel: " << std::setw(20) <<
par_ms <<
" ms │\n";
553 std::cout <<
"├────────────────────────────────────────┤\n";
554 std::cout <<
"│ SPEEDUP: " << std::setw(20) << std::fixed
555 << std::setprecision(1) <<
speedup <<
"x │\n";
556 std::cout <<
"└────────────────────────────────────────┘\n";
559 std::cout <<
"\n✓ Results match: " << (std::abs(
seq_result -
par_result) < 0.01 ?
"YES" :
"NO") <<
"\n";
569 std::cout <<
"╔════════════════════════════════════════════════════════════════╗\n";
570 std::cout <<
"║ ║\n";
571 std::cout <<
"║ ALEPH-W THREADPOOL USAGE EXAMPLES ║\n";
572 std::cout <<
"║ ║\n";
573 std::cout <<
"║ Learn how to use the ThreadPool for parallel execution ║\n";
574 std::cout <<
"║ ║\n";
575 std::cout <<
"╚════════════════════════════════════════════════════════════════╝\n";
577 std::cout <<
"\nThis program demonstrates 6 common ThreadPool usage patterns.\n";
578 std::cout <<
"Read the source code comments for detailed explanations.\n";
588 std::cout <<
"╔════════════════════════════════════════════════════════════════╗\n";
589 std::cout <<
"║ ✓ ALL EXAMPLES COMPLETED SUCCESSFULLY ║\n";
590 std::cout <<
"║ ║\n";
591 std::cout <<
"║ QUICK REFERENCE: ║\n";
592 std::cout <<
"║ enqueue(f, args...) → std::future<T> ║\n";
593 std::cout <<
"║ enqueue_detached(f, args...) → void (fire-and-forget) ║\n";
594 std::cout <<
"║ enqueue_bounded(f, args...) → std::future<T> (backpressure)║\n";
595 std::cout <<
"║ try_enqueue(f, args...) → optional<future> (non-block) ║\n";
596 std::cout <<
"║ enqueue_bulk(f, container) → vector<future> (batch) ║\n";
597 std::cout <<
"╚════════════════════════════════════════════════════════════════╝\n\n";
size_t size() const noexcept
Counts the number of elements in the list.
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
void wait_all(std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void set_queue_limits(size_t soft_limit, size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking the result.
size_t pending_tasks() const
Get the number of pending tasks in the queue.
Main namespace for Aleph-w library functions.
DynList< T > maps(const C &c, Op op)
Classic map operation.
A modern, efficient thread pool for parallel task execution.
void example_load_shedding()
void example_performance()
void example_backpressure()
FileResult process_file(const std::string &filename)
void example_fire_and_forget()
void example_batch_processing()
void example_basic_parallel()