151using namespace Aleph;
152using namespace std::chrono_literals;
158 std::cout <<
"+" << std::string(60,
'-') <<
"+\n";
159 std::cout <<
"| " << std::left << std::setw(58) << title <<
" |\n";
160 std::cout <<
"+" << std::string(60,
'-') <<
"+\n\n";
182 if (n < 2)
return false;
183 if (n == 2)
return true;
184 if (n % 2 == 0)
return false;
185 for (
int i = 3; i * i <= n; i += 2)
186 if (n % i == 0)
return false;
194 std::cout <<
"GOAL: Find all prime numbers in a range using parallel computation.\n\n";
204 ThreadPool pool(std::thread::hardware_concurrency());
205 std::cout <<
"Created ThreadPool with " << pool.
num_threads() <<
" workers\n\n";
214 std::vector<std::future<bool>>
futures;
217 auto start = std::chrono::high_resolution_clock::now();
231 std::cout <<
"Collecting results...\n";
234 for (
size_t i = 0; i <
futures.size(); ++i)
240 auto end = std::chrono::high_resolution_clock::now();
241 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
244 std::cout <<
"\n✓ RESULT: Found " << prime_count <<
" primes in range ["
246 std::cout <<
" Time: " <<
duration.count() <<
" ms\n";
269 std::this_thread::sleep_for(10
ms);
272 size_t hash = std::hash<std::string>{}(filename);
275 static_cast<int>(hash % 1000 + 100),
276 static_cast<int>((hash >> 10) % 100 + 10)
282 print_header(
"Example 2: Batch Processing with enqueue_bulk()");
284 std::cout <<
"GOAL: Process multiple files in parallel and aggregate results.\n\n";
287 std::vector<std::string>
files;
288 for (
int i = 1; i <= 50; ++i)
289 files.push_back(
"document_" + std::to_string(i) +
".txt");
293 std::cout <<
"Processing " <<
files.size() <<
" files with "
296 auto start = std::chrono::high_resolution_clock::now();
308 auto result = f.get();
313 auto end = std::chrono::high_resolution_clock::now();
314 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
316 std::cout <<
"✓ RESULT:\n";
317 std::cout <<
" Files processed: " <<
files.size() <<
"\n";
318 std::cout <<
" Total words: " <<
total_words <<
"\n";
319 std::cout <<
" Total lines: " <<
total_lines <<
"\n";
320 std::cout <<
" Time: " <<
duration.count() <<
" ms\n";
321 std::cout <<
" (Sequential would take ~" <<
files.size() * 10 <<
" ms)\n";
346 print_header(
"Example 3: Fire-and-Forget with enqueue_detached()");
348 std::cout <<
"GOAL: Perform background logging without blocking main work.\n\n";
352 std::cout <<
"Main thread does work while logging happens in background:\n\n";
354 for (
int i = 1; i <= 5; ++i)
360 std::this_thread::sleep_for(5
ms);
361 std::ostringstream
oss;
362 oss <<
" [BACKGROUND LOG] Processed item " << i <<
"\n";
363 std::cout <<
oss.str();
367 std::cout <<
"[MAIN THREAD] Working on item " << i <<
"...\n";
368 std::this_thread::sleep_for(30
ms);
371 std::cout <<
"\n[MAIN THREAD] Main work complete. Waiting for logs...\n";
373 std::cout <<
"\n✓ All background logging completed\n";
392 print_header(
"Example 4: Backpressure with enqueue_bounded()");
394 std::cout <<
"GOAL: Prevent queue overflow when producer is faster than consumers.\n\n";
404 std::cout <<
"Queue limits: soft=" <<
soft <<
", hard=" <<
hard <<
"\n\n";
410 std::this_thread::sleep_for(50
ms);
414 std::cout <<
"Enqueueing 20 slow tasks (50ms each)...\n";
415 std::cout <<
"Watch how producer is throttled when queue fills up:\n\n";
417 auto start = std::chrono::high_resolution_clock::now();
419 for (
int i = 1; i <= 20; ++i)
421 auto enqueue_start = std::chrono::high_resolution_clock::now();
426 auto enqueue_end = std::chrono::high_resolution_clock::now();
427 auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(
430 std::cout <<
" Task " << std::setw(2) << i
433 std::cout <<
" ← blocked " <<
wait_time <<
"ms";
439 auto end = std::chrono::high_resolution_clock::now();
440 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
442 std::cout <<
"\n✓ RESULT:\n";
443 std::cout <<
" Processed: " <<
processed <<
" tasks\n";
444 std::cout <<
" Total time: " <<
duration.count() <<
" ms\n";
445 std::cout <<
" Memory was protected by limiting queue size\n";
470 print_header(
"Example 5: Load Shedding with try_enqueue()");
472 std::cout <<
"GOAL: Reject excess tasks when system is overloaded.\n\n";
482 std::this_thread::sleep_for(100
ms);
486 std::cout <<
"Attempting to enqueue 15 tasks with soft_limit=3:\n\n";
488 for (
int i = 1; i <= 15; ++i)
493 std::cout <<
" Task " << std::setw(2) << i <<
": ✓ ACCEPTED\n";
497 std::cout <<
" Task " << std::setw(2) << i <<
": ✗ REJECTED (queue full)\n";
504 std::cout <<
"\n✓ RESULT:\n";
505 std::cout <<
" Accepted and processed: " <<
accepted <<
"\n";
506 std::cout <<
" Rejected (dropped): " <<
rejected <<
"\n";
507 std::cout <<
" System remained responsive - no blocking!\n";
520 print_header(
"Example 6: Structured Tasks and Cooperative Cancellation");
529 for (int step = 0; step < 20; ++step)
531 if (token.stop_requested())
533 std::this_thread::sleep_for(1ms);
542 std::this_thread::sleep_for(5
ms);
543 std::cout <<
"Requesting cancellation of remaining work...\n";
544 cancel.request_cancel();
551 catch (
const std::exception & e)
553 std::cout <<
"Caught structured exception: " << e.what() <<
"\n";
556 std::cout <<
"Tasks completed before cancellation/exception: "
570 print_header(
"Example 7: parallel_invoke / pscan / pmerge");
572 ThreadPool pool(std::thread::hardware_concurrency());
576 const std::vector<int>
seed = {1, 2, 3, 4};
581 [&] {
sum = std::accumulate(
seed.begin(),
seed.end(), 0); },
583 std::multiplies<int>{}); });
585 std::cout <<
"parallel_invoke results: sum=" <<
sum
586 <<
", product=" <<
product <<
"\n";
588 std::vector<int> values = {1, 2, 3, 4, 5, 6};
589 std::vector<int>
inclusive(values.size());
590 std::vector<int> exclusive(values.size());
595 std::cout <<
"Inclusive scan: ";
597 std::cout << x <<
" ";
598 std::cout <<
"\nExclusive scan: ";
599 for (
int x : exclusive)
600 std::cout << x <<
" ";
603 std::vector<int> left = {1, 3, 5, 7};
604 std::vector<int> right = {2, 4, 6, 8};
605 std::vector<int> merged(left.size() + right.size());
606 pmerge(left.begin(), left.end(), right.begin(), right.end(),
607 merged.begin(), std::less<int>{},
options);
609 std::cout <<
"Merged range: ";
611 std::cout << x <<
" ";
625 print_header(
"Example 8: bounded_channel / synchronized / spsc_queue");
634 for (
int i = 1; i <= 8; ++i)
657 std::cout <<
"Squares received through bounded_channel: ";
659 std::cout << value <<
" ";
661 std::cout <<
"Items processed: "
668 auto first =
handoff.try_pop();
669 auto second =
handoff.try_pop();
670 std::cout <<
"SPSC queue sample: "
671 << (first.has_value() ? *first : -1) <<
", "
672 << (second.has_value() ? *second : -1) <<
"\n";
683 std::cout <<
"Cancellation-aware recv interrupted as expected\n";
702 std::cout <<
"GOAL: Compare parallel execution vs sequential execution.\n\n";
707 auto compute = [](
int x) {
709 for (
int i = 0; i < 10000; ++i)
710 result += std::sin(x * i * 0.001);
715 std::cout <<
"Running " <<
num_tasks <<
" tasks sequentially...\n";
717 auto seq_start = std::chrono::high_resolution_clock::now();
721 auto seq_end = std::chrono::high_resolution_clock::now();
725 std::cout <<
"Running " <<
num_tasks <<
" tasks in parallel...\n\n";
727 ThreadPool pool(std::thread::hardware_concurrency());
729 auto par_start = std::chrono::high_resolution_clock::now();
730 std::vector<std::future<double>>
futures;
738 auto par_end = std::chrono::high_resolution_clock::now();
744 std::cout <<
"┌────────────────────────────────────────┐\n";
745 std::cout <<
"│ PERFORMANCE RESULTS │\n";
746 std::cout <<
"├────────────────────────────────────────┤\n";
747 std::cout <<
"│ Tasks: " << std::setw(24) <<
num_tasks <<
" │\n";
748 std::cout <<
"│ Threads: " << std::setw(24) << pool.
num_threads() <<
" │\n";
749 std::cout <<
"├────────────────────────────────────────┤\n";
750 std::cout <<
"│ Sequential: " << std::setw(20) <<
seq_ms <<
" ms │\n";
751 std::cout <<
"│ Parallel: " << std::setw(20) <<
par_ms <<
" ms │\n";
752 std::cout <<
"├────────────────────────────────────────┤\n";
753 std::cout <<
"│ SPEEDUP: " << std::setw(20) << std::fixed
754 << std::setprecision(1) <<
speedup <<
"x │\n";
755 std::cout <<
"└────────────────────────────────────────┘\n";
758 std::cout <<
"\n✓ Results match: " << (std::abs(
seq_result -
par_result) < 0.01 ?
"YES" :
"NO") <<
"\n";
768 std::cout <<
"╔════════════════════════════════════════════════════════════════╗\n";
769 std::cout <<
"║ ║\n";
770 std::cout <<
"║ ALEPH-W THREADPOOL USAGE EXAMPLES ║\n";
771 std::cout <<
"║ ║\n";
772 std::cout <<
"║ Learn how to use the ThreadPool for parallel execution ║\n";
773 std::cout <<
"║ ║\n";
774 std::cout <<
"╚════════════════════════════════════════════════════════════════╝\n";
776 std::cout <<
"\nThis program demonstrates 9 common ThreadPool/concurrency usage patterns.\n";
777 std::cout <<
"Read the source code comments for detailed explanations.\n";
790 std::cout <<
"╔════════════════════════════════════════════════════════════════╗\n";
791 std::cout <<
"║ ✓ ALL EXAMPLES COMPLETED SUCCESSFULLY ║\n";
792 std::cout <<
"║ ║\n";
793 std::cout <<
"║ QUICK REFERENCE: ║\n";
794 std::cout <<
"║ enqueue(f, args...) → std::future<T> ║\n";
795 std::cout <<
"║ enqueue_detached(f, args...) → void (fire-and-forget) ║\n";
796 std::cout <<
"║ enqueue_bounded(f, args...) → std::future<T> (backpressure)║\n";
797 std::cout <<
"║ try_enqueue(f, args...) → optional<future> (non-block) ║\n";
798 std::cout <<
"║ enqueue_bulk(f, container) → vector<future> (batch) ║\n";
799 std::cout <<
"║ TaskGroup / Cancellation* → structured parallel tasks ║\n";
800 std::cout <<
"║ parallel_invoke / pscan → composable parallel blocks ║\n";
801 std::cout <<
"║ bounded_channel / sync* → producer-consumer helpers ║\n";
802 std::cout <<
"╚════════════════════════════════════════════════════════════════╝\n\n";
Exception handling system with formatted messages for Aleph-w.
#define ah_runtime_error()
Throws std::runtime_error unconditionally.
Bounded blocking channel for producer-consumer workflows.
Cooperative cancellation source paired with CancellationToken.
CancellationToken token() const noexcept
Return a token observing this source.
void request_cancel() noexcept
Request cancellation for all derived tokens.
Read/write-lock protected shared object wrapper.
Bounded single-producer/single-consumer queue.
Mutex-protected shared object wrapper.
Minimal structured-concurrency helper over ThreadPool futures.
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void wait_all(const std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking result.
size_t pending_tasks() const
Get the number of pending tasks in the queue.
void set_queue_limits(const size_t soft_limit, const size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
Exception thrown when cooperative cancellation is observed.
Modern synchronization helpers for channels, shared state, and small producer-consumer queues.
Main namespace for Aleph-w library functions.
bool completed() const noexcept
Return true if all underlying iterators are finished.
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
T product(const Container &container, const T &init=T{1})
Compute product of all elements.
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
void parallel_invoke(ThreadPool &pool, Fs &&... fs)
Invoke a small set of related callables in parallel.
Itor::difference_type count(const Itor &beg, const Itor &end, const T &value)
Count elements equal to a value.
T sum(const Container &container, const T &init=T{})
Compute sum of all elements.
static struct argp_option options[]
Common configuration object for parallel algorithms.
ThreadPool * pool
Executor to use (nullptr = default_pool()).
A modern, efficient thread pool for parallel task execution.
void example_parallel_building_blocks()
void example_structured_concurrency()
void example_load_shedding()
void example_performance()
void example_backpressure()
void example_channels_and_shared_state()
FileResult process_file(const std::string &filename)
void example_fire_and_forget()
void example_batch_processing()
void example_basic_parallel()