Aleph-w 3.0 — A C++ Library for Data Structures and Algorithms
thread_pool_example.cc
Go to the documentation of this file.
#include <thread_pool.H>
#include <concurrency_utils.H>
#include <ah-errors.H>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <fstream>
#include <functional>
#include <future>
#include <iomanip>
#include <iostream>
#include <numeric>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
150
151using namespace Aleph;
152using namespace std::chrono_literals;
153
// Print a section header: the given title rendered inside a fixed-width
// ASCII box (62 columns) on stdout, preceded and followed by blank lines.
void print_header(const std::string& title)
{
  const std::string bar = "+" + std::string(60, '-') + "+\n";
  std::cout << "\n" << bar
            << "| " << std::left << std::setw(58) << title << " |\n"
            << bar << "\n";
}
162
163// =============================================================================
164// EXAMPLE 1: Basic Parallel Computation
165// =============================================================================
166//
167// This example shows the fundamental pattern:
168// 1. Create a ThreadPool
169// 2. Submit tasks with enqueue() - returns std::future<T>
170// 3. Collect results with future.get()
171//
172// USE CASE: When you have many independent computations and need all results.
173//
174// KEY CONCEPTS:
175// - enqueue(function, args...) submits a task and returns a future
176// - The future blocks on .get() until the result is ready
177// - Tasks run in parallel across available workers
178//
179
// Trial-division primality test.
// Returns true iff n is prime; any n < 2 (including negatives) is not prime.
bool is_prime(int n)
{
  if (n < 2)
    return false;
  if (n % 2 == 0)
    return n == 2;              // 2 is the only even prime
  for (int d = 3; d * d <= n; d += 2)
    if (n % d == 0)
      return false;             // found an odd divisor
  return true;
}
189
191{
192 print_header("Example 1: Basic Parallel Computation");
193
194 std::cout << "GOAL: Find all prime numbers in a range using parallel computation.\n\n";
195
196 const int range_start = 1;
197 const int range_end = 100000;
198
199 // STEP 1: Create ThreadPool
200 // -------------------------
201 // By default, uses std::thread::hardware_concurrency() threads.
202 // You can specify the number: ThreadPool pool(4);
203
204 ThreadPool pool(std::thread::hardware_concurrency());
205 std::cout << "Created ThreadPool with " << pool.num_threads() << " workers\n\n";
206
207 // STEP 2: Submit tasks
208 // --------------------
209 // enqueue() returns std::future<T> where T is the return type of the function.
210 // Tasks are queued and executed by available workers.
211
212 std::cout << "Submitting " << (range_end - range_start + 1) << " tasks...\n";
213
214 std::vector<std::future<bool>> futures;
215 futures.reserve(range_end - range_start + 1);
216
217 auto start = std::chrono::high_resolution_clock::now();
218
219 for (int n = range_start; n <= range_end; ++n)
220 {
221 // Submit task: is_prime(n)
222 // The future will hold the boolean result
223 futures.push_back(pool.enqueue(is_prime, n));
224 }
225
226 // STEP 3: Collect results
227 // -----------------------
228 // future.get() blocks until the result is ready.
229 // Results are collected in submission order (not completion order).
230
231 std::cout << "Collecting results...\n";
232
233 int prime_count = 0;
234 for (size_t i = 0; i < futures.size(); ++i)
235 {
236 if (futures[i].get()) // Blocks until result ready
237 ++prime_count;
238 }
239
240 auto end = std::chrono::high_resolution_clock::now();
241 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
242
243 // RESULTS
244 std::cout << "\n✓ RESULT: Found " << prime_count << " primes in range ["
245 << range_start << ", " << range_end << "]\n";
246 std::cout << " Time: " << duration.count() << " ms\n";
247}
248
// =============================================================================
// EXAMPLE 2: Batch Processing with enqueue_bulk()
// =============================================================================
//
// When processing a collection of items with the same function,
// enqueue_bulk() is more convenient than calling enqueue() in a loop.
//
// USE CASE: Processing files, URLs, records, etc. in parallel.
//

// Result of processing one (simulated) file.
// NOTE(review): the struct's name and its two integer fields were lost in the
// extraction; reconstructed from process_file()'s brace-init return and the
// result.word_count / result.line_count reads in example_batch_processing().
struct FileResult
{
  std::string filename;  // file that was processed
  int word_count;        // simulated number of words
  int line_count;        // simulated number of lines
};
265
266FileResult process_file(const std::string& filename)
267{
268 // Simulate file processing (10ms per file)
269 std::this_thread::sleep_for(10ms);
270
271 // Generate fake results based on filename
272 size_t hash = std::hash<std::string>{}(filename);
273 return {
274 filename,
275 static_cast<int>(hash % 1000 + 100),
276 static_cast<int>((hash >> 10) % 100 + 10)
277 };
278}
279
281{
282 print_header("Example 2: Batch Processing with enqueue_bulk()");
283
284 std::cout << "GOAL: Process multiple files in parallel and aggregate results.\n\n";
285
286 // Create a list of files to process
287 std::vector<std::string> files;
288 for (int i = 1; i <= 50; ++i)
289 files.push_back("document_" + std::to_string(i) + ".txt");
290
291 ThreadPool pool(8);
292
293 std::cout << "Processing " << files.size() << " files with "
294 << pool.num_threads() << " workers...\n\n";
295
296 auto start = std::chrono::high_resolution_clock::now();
297
298 // enqueue_bulk() submits the same function for each element in the container
299 // Returns a vector of futures
301
302 // Aggregate results
303 int total_words = 0;
304 int total_lines = 0;
305
306 for (auto& f : futures)
307 {
308 auto result = f.get();
309 total_words += result.word_count;
310 total_lines += result.line_count;
311 }
312
313 auto end = std::chrono::high_resolution_clock::now();
314 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
315
316 std::cout << "✓ RESULT:\n";
317 std::cout << " Files processed: " << files.size() << "\n";
318 std::cout << " Total words: " << total_words << "\n";
319 std::cout << " Total lines: " << total_lines << "\n";
320 std::cout << " Time: " << duration.count() << " ms\n";
321 std::cout << " (Sequential would take ~" << files.size() * 10 << " ms)\n";
322}
323
324// =============================================================================
325// EXAMPLE 3: Fire-and-Forget with enqueue_detached()
326// =============================================================================
327//
328// Sometimes you don't need the result of a task - you just want it to run
329// in the background. enqueue_detached() is perfect for:
330// - Logging
331// - Metrics/telemetry
332// - Cache updates
333// - Notifications
334//
335// BENEFITS:
336// - No future overhead (slightly faster)
337// - Main thread doesn't need to track results
338//
339// WARNING:
340// - Exceptions in detached tasks are silently ignored
341// - Use enqueue() if you need error handling
342//
343
345{
346 print_header("Example 3: Fire-and-Forget with enqueue_detached()");
347
348 std::cout << "GOAL: Perform background logging without blocking main work.\n\n";
349
350 ThreadPool pool(2);
351
352 std::cout << "Main thread does work while logging happens in background:\n\n";
353
354 for (int i = 1; i <= 5; ++i)
355 {
356 // Fire-and-forget: log message asynchronously
357 // Main thread continues immediately
358 pool.enqueue_detached([i] {
359 // Simulate log write latency
360 std::this_thread::sleep_for(5ms);
361 std::ostringstream oss;
362 oss << " [BACKGROUND LOG] Processed item " << i << "\n";
363 std::cout << oss.str();
364 });
365
366 // Main work continues without waiting for log
367 std::cout << "[MAIN THREAD] Working on item " << i << "...\n";
368 std::this_thread::sleep_for(30ms);
369 }
370
371 std::cout << "\n[MAIN THREAD] Main work complete. Waiting for logs...\n";
372 pool.wait_all(); // Ensure all background tasks complete
373 std::cout << "\n✓ All background logging completed\n";
374}
375
376// =============================================================================
377// EXAMPLE 4: Backpressure with enqueue_bounded()
378// =============================================================================
379//
380// PROBLEM: If producers submit tasks faster than workers can process,
381// the queue grows unbounded, potentially causing memory issues.
382//
383// SOLUTION: enqueue_bounded() with queue limits
384// - soft_limit: When queue reaches this size, enqueue BLOCKS
385// - hard_limit: If queue reaches this size, throws queue_overflow_error
386//
387// This creates natural "backpressure" - fast producers slow down automatically.
388//
389
391{
392 print_header("Example 4: Backpressure with enqueue_bounded()");
393
394 std::cout << "GOAL: Prevent queue overflow when producer is faster than consumers.\n\n";
395
396 ThreadPool pool(2);
397
398 // Configure queue limits
399 // soft_limit = 5: Block when queue has 5+ pending tasks
400 // hard_limit = 20: Throw exception if queue exceeds 20 (safety net)
401 pool.set_queue_limits(5, 20);
402
403 auto [soft, hard] = pool.get_queue_limits();
404 std::cout << "Queue limits: soft=" << soft << ", hard=" << hard << "\n\n";
405
406 std::atomic<int> processed{0};
407
408 // Slow consumer task (50ms each)
409 auto slow_task = [&processed] {
410 std::this_thread::sleep_for(50ms);
411 ++processed;
412 };
413
414 std::cout << "Enqueueing 20 slow tasks (50ms each)...\n";
415 std::cout << "Watch how producer is throttled when queue fills up:\n\n";
416
417 auto start = std::chrono::high_resolution_clock::now();
418
419 for (int i = 1; i <= 20; ++i)
420 {
421 auto enqueue_start = std::chrono::high_resolution_clock::now();
422
423 // This will BLOCK if queue is at soft_limit
425
426 auto enqueue_end = std::chrono::high_resolution_clock::now();
427 auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(
428 enqueue_end - enqueue_start).count();
429
430 std::cout << " Task " << std::setw(2) << i
431 << " enqueued (pending: " << pool.pending_tasks() << ")";
432 if (wait_time > 5)
433 std::cout << " ← blocked " << wait_time << "ms";
434 std::cout << "\n";
435 }
436
437 pool.wait_all();
438
439 auto end = std::chrono::high_resolution_clock::now();
440 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
441
442 std::cout << "\n✓ RESULT:\n";
443 std::cout << " Processed: " << processed << " tasks\n";
444 std::cout << " Total time: " << duration.count() << " ms\n";
445 std::cout << " Memory was protected by limiting queue size\n";
446}
447
448// =============================================================================
449// EXAMPLE 5: Load Shedding with try_enqueue()
450// =============================================================================
451//
452// SCENARIO: High-traffic system where you'd rather DROP requests than
453// slow down or run out of memory.
454//
455// try_enqueue() returns immediately:
456// - std::optional<future> if task was queued
457// - std::nullopt if queue is full (at soft_limit)
458//
459// try_enqueue_detached() returns:
460// - true if task was queued
461// - false if queue is full
462//
463// This is useful for:
464// - Web servers that need to reject excess requests
465// - Real-time systems where dropping is better than delaying
466//
467
469{
470 print_header("Example 5: Load Shedding with try_enqueue()");
471
472 std::cout << "GOAL: Reject excess tasks when system is overloaded.\n\n";
473
474 ThreadPool pool(2);
475 pool.set_queue_limits(3, 10); // Accept max 3 pending tasks
476
477 std::atomic<int> accepted{0};
478 int rejected = 0;
479
480 // Slow task (100ms)
481 auto task = [&accepted] {
482 std::this_thread::sleep_for(100ms);
483 ++accepted;
484 };
485
486 std::cout << "Attempting to enqueue 15 tasks with soft_limit=3:\n\n";
487
488 for (int i = 1; i <= 15; ++i)
489 {
490 // try_enqueue_detached returns bool (non-blocking)
491 if (pool.try_enqueue_detached(task))
492 {
493 std::cout << " Task " << std::setw(2) << i << ": ✓ ACCEPTED\n";
494 }
495 else
496 {
497 std::cout << " Task " << std::setw(2) << i << ": ✗ REJECTED (queue full)\n";
498 ++rejected;
499 }
500 }
501
502 pool.wait_all();
503
504 std::cout << "\n✓ RESULT:\n";
505 std::cout << " Accepted and processed: " << accepted << "\n";
506 std::cout << " Rejected (dropped): " << rejected << "\n";
507 std::cout << " System remained responsive - no blocking!\n";
508}
509
510// =============================================================================
511// EXAMPLE 6: Structured Tasks and Cooperative Cancellation
512// =============================================================================
513//
514// TaskGroup groups related tasks and joins them as a unit. Cancellation remains
515// cooperative: tasks observe a token and exit on their own.
516//
517
519{
520 print_header("Example 6: Structured Tasks and Cooperative Cancellation");
521
522 ThreadPool pool(4);
523 TaskGroup group(pool);
524 CancellationSource cancel;
525 std::atomic<int> completed{0};
526
527 for (int task_id = 0; task_id < 8; ++task_id)
528 group.launch([task_id, token = cancel.token(), &completed] {
529 for (int step = 0; step < 20; ++step)
530 {
531 if (token.stop_requested())
532 return;
533 std::this_thread::sleep_for(1ms);
534 }
535 ++completed;
536 if (task_id == 2)
537 ah_runtime_error() << "TaskGroup demo exception";
538 });
539
540 // Request cancellation while tasks are still running to demonstrate
541 // cooperative interruption.
542 std::this_thread::sleep_for(5ms);
543 std::cout << "Requesting cancellation of remaining work...\n";
544 cancel.request_cancel();
545
546 try
547 {
548 // wait() propagates the exception from task 2.
549 group.wait();
550 }
551 catch (const std::exception & e)
552 {
553 std::cout << "Caught structured exception: " << e.what() << "\n";
554 }
555
556 std::cout << "Tasks completed before cancellation/exception: "
557 << completed.load() << "\n";
558}
559
560// =============================================================================
561// EXAMPLE 7: Foundational Parallel Building Blocks
562// =============================================================================
563//
564// These low-level helpers are meant to compose larger parallel algorithms
565// without exposing users to queue plumbing.
566//
567
569{
570 print_header("Example 7: parallel_invoke / pscan / pmerge");
571
572 ThreadPool pool(std::thread::hardware_concurrency());
574 options.pool = &pool;
575 options.chunk_size = 4;
576 const std::vector<int> seed = {1, 2, 3, 4};
577
578 int sum = 0;
579 int product = 1;
581 [&] { sum = std::accumulate(seed.begin(), seed.end(), 0); },
582 [&] { product = std::accumulate(seed.begin(), seed.end(), 1,
583 std::multiplies<int>{}); });
584
585 std::cout << "parallel_invoke results: sum=" << sum
586 << ", product=" << product << "\n";
587
588 std::vector<int> values = {1, 2, 3, 4, 5, 6};
589 std::vector<int> inclusive(values.size());
590 std::vector<int> exclusive(values.size());
591 pscan(values.begin(), values.end(), inclusive.begin(), std::plus<int>{}, options);
592 pexclusive_scan(values.begin(), values.end(), exclusive.begin(), 0,
593 std::plus<int>{}, options);
594
595 std::cout << "Inclusive scan: ";
596 for (int x : inclusive)
597 std::cout << x << " ";
598 std::cout << "\nExclusive scan: ";
599 for (int x : exclusive)
600 std::cout << x << " ";
601 std::cout << "\n";
602
603 std::vector<int> left = {1, 3, 5, 7};
604 std::vector<int> right = {2, 4, 6, 8};
605 std::vector<int> merged(left.size() + right.size());
606 pmerge(left.begin(), left.end(), right.begin(), right.end(),
607 merged.begin(), std::less<int>{}, options);
608
609 std::cout << "Merged range: ";
610 for (int x : merged)
611 std::cout << x << " ";
612 std::cout << "\n";
613}
614
615// =============================================================================
616// EXAMPLE 8: Channels and Synchronized Shared State
617// =============================================================================
618//
619// This example shows how the small coordination helpers can sit on top of the
620// existing thread-pool machinery.
621//
622
624{
625 print_header("Example 8: bounded_channel / synchronized / spsc_queue");
626
627 ThreadPool pool(4);
631 TaskGroup group(pool);
632
633 group.launch([&] {
634 for (int i = 1; i <= 8; ++i)
635 jobs.send(i);
636 jobs.close();
637 });
638
639 for (int worker = 0; worker < 2; ++worker)
640 group.launch([&] {
641 while (auto job = jobs.recv())
642 {
643 results.with_lock([&](auto &out) {
644 out.push_back(*job * *job);
645 });
646 processed.with_write_lock([](int &count) { ++count; });
647 }
648 });
649
650 group.wait();
651
652 auto sorted_results = results.with_lock([](const auto &out) {
653 return out;
654 });
655 std::sort(sorted_results.begin(), sorted_results.end());
656
657 std::cout << "Squares received through bounded_channel: ";
658 for (const int value : sorted_results)
659 std::cout << value << " ";
660 std::cout << "\n";
661 std::cout << "Items processed: "
662 << processed.with_read_lock([](const int &count) { return count; })
663 << "\n";
664
666 handoff.try_push(10);
667 handoff.try_push(20);
668 auto first = handoff.try_pop();
669 auto second = handoff.try_pop();
670 std::cout << "SPSC queue sample: "
671 << (first.has_value() ? *first : -1) << ", "
672 << (second.has_value() ? *second : -1) << "\n";
673
675 CancellationSource cancel;
676 cancel.request_cancel();
677 try
678 {
679 (void) cancel_demo.recv(cancel.token());
680 }
681 catch (const operation_canceled &)
682 {
683 std::cout << "Cancellation-aware recv interrupted as expected\n";
684 }
685}
686
687// =============================================================================
688// EXAMPLE 9: Performance Comparison
689// =============================================================================
690//
691// This example demonstrates the speedup achieved by parallel execution.
692// The speedup depends on:
693// - Number of CPU cores
694// - Task duration (longer tasks = better speedup)
695// - Task independence (no shared state = better speedup)
696//
697
699{
700 print_header("Example 9: Performance Comparison");
701
702 std::cout << "GOAL: Compare parallel execution vs sequential execution.\n\n";
703
704 const int num_tasks = 1000;
705
706 // CPU-intensive computation
707 auto compute = [](int x) {
708 double result = 0;
709 for (int i = 0; i < 10000; ++i)
710 result += std::sin(x * i * 0.001);
711 return result;
712 };
713
714 // SEQUENTIAL EXECUTION
715 std::cout << "Running " << num_tasks << " tasks sequentially...\n";
716
717 auto seq_start = std::chrono::high_resolution_clock::now();
718 double seq_result = 0;
719 for (int i = 0; i < num_tasks; ++i)
720 seq_result += compute(i);
721 auto seq_end = std::chrono::high_resolution_clock::now();
722 auto seq_ms = std::chrono::duration_cast<std::chrono::milliseconds>(seq_end - seq_start).count();
723
724 // PARALLEL EXECUTION
725 std::cout << "Running " << num_tasks << " tasks in parallel...\n\n";
726
727 ThreadPool pool(std::thread::hardware_concurrency());
728
729 auto par_start = std::chrono::high_resolution_clock::now();
730 std::vector<std::future<double>> futures;
731 futures.reserve(num_tasks);
732 for (int i = 0; i < num_tasks; ++i)
733 futures.push_back(pool.enqueue(compute, i));
734
735 double par_result = 0;
736 for (auto& f : futures)
737 par_result += f.get();
738 auto par_end = std::chrono::high_resolution_clock::now();
739 auto par_ms = std::chrono::duration_cast<std::chrono::milliseconds>(par_end - par_start).count();
740
741 // RESULTS
742 double speedup = (par_ms > 0) ? static_cast<double>(seq_ms) / par_ms : 0;
743
744 std::cout << "┌────────────────────────────────────────┐\n";
745 std::cout << "│ PERFORMANCE RESULTS │\n";
746 std::cout << "├────────────────────────────────────────┤\n";
747 std::cout << "│ Tasks: " << std::setw(24) << num_tasks << " │\n";
748 std::cout << "│ Threads: " << std::setw(24) << pool.num_threads() << " │\n";
749 std::cout << "├────────────────────────────────────────┤\n";
750 std::cout << "│ Sequential: " << std::setw(20) << seq_ms << " ms │\n";
751 std::cout << "│ Parallel: " << std::setw(20) << par_ms << " ms │\n";
752 std::cout << "├────────────────────────────────────────┤\n";
753 std::cout << "│ SPEEDUP: " << std::setw(20) << std::fixed
754 << std::setprecision(1) << speedup << "x │\n";
755 std::cout << "└────────────────────────────────────────┘\n";
756
757 // Verify correctness
758 std::cout << "\n✓ Results match: " << (std::abs(seq_result - par_result) < 0.01 ? "YES" : "NO") << "\n";
759}
760
761// =============================================================================
762// MAIN - Run all examples
763// =============================================================================
764
765int main()
766{
767 std::cout << "\n";
768 std::cout << "╔════════════════════════════════════════════════════════════════╗\n";
769 std::cout << "║ ║\n";
770 std::cout << "║ ALEPH-W THREADPOOL USAGE EXAMPLES ║\n";
771 std::cout << "║ ║\n";
772 std::cout << "║ Learn how to use the ThreadPool for parallel execution ║\n";
773 std::cout << "║ ║\n";
774 std::cout << "╚════════════════════════════════════════════════════════════════╝\n";
775
776 std::cout << "\nThis program demonstrates 9 common ThreadPool/concurrency usage patterns.\n";
777 std::cout << "Read the source code comments for detailed explanations.\n";
778
788
789 std::cout << "\n";
790 std::cout << "╔════════════════════════════════════════════════════════════════╗\n";
791 std::cout << "║ ✓ ALL EXAMPLES COMPLETED SUCCESSFULLY ║\n";
792 std::cout << "║ ║\n";
793 std::cout << "║ QUICK REFERENCE: ║\n";
794 std::cout << "║ enqueue(f, args...) → std::future<T> ║\n";
795 std::cout << "║ enqueue_detached(f, args...) → void (fire-and-forget) ║\n";
796 std::cout << "║ enqueue_bounded(f, args...) → std::future<T> (backpressure)║\n";
797 std::cout << "║ try_enqueue(f, args...) → optional<future> (non-block) ║\n";
798 std::cout << "║ enqueue_bulk(f, container) → vector<future> (batch) ║\n";
799 std::cout << "║ TaskGroup / Cancellation* → structured parallel tasks ║\n";
800 std::cout << "║ parallel_invoke / pscan → composable parallel blocks ║\n";
801 std::cout << "║ bounded_channel / sync* → producer-consumer helpers ║\n";
802 std::cout << "╚════════════════════════════════════════════════════════════════╝\n\n";
803
804 return 0;
805}
Exception handling system with formatted messages for Aleph-w.
#define ah_runtime_error()
Throws std::runtime_error unconditionally.
Definition ah-errors.H:282
Bounded blocking channel for producer-consumer workflows.
Cooperative cancellation source paired with CancellationToken.
CancellationToken token() const noexcept
Return a token observing this source.
void request_cancel() noexcept
Request cancellation for all derived tokens.
Read/write-lock protected shared object wrapper.
Bounded single-producer/single-consumer queue.
Mutex-protected shared object wrapper.
Minimal structured-concurrency helper over ThreadPool futures.
A reusable thread pool for efficient parallel task execution.
bool try_enqueue_detached(F &&f, Args &&... args)
Try to submit a detached task without blocking or throwing.
size_t num_threads() const noexcept
Get the number of worker threads.
void enqueue_detached(F &&f, Args &&... args)
Submit a task without tracking the result (fire-and-forget).
auto enqueue_bulk(F &&f, const Container &args_container) -> std::vector< std::future< std::invoke_result_t< F, typename Container::value_type > > >
Submit multiple tasks and collect all futures.
void wait_all(const std::chrono::milliseconds poll_interval=std::chrono::milliseconds(1))
Wait until all current tasks complete.
std::pair< size_t, size_t > get_queue_limits() const
Get current queue limits.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
void enqueue_bounded_detached(F &&f, Args &&... args)
Submit a task with backpressure, without tracking result.
size_t pending_tasks() const
Get the number of pending tasks in the queue.
void set_queue_limits(const size_t soft_limit, const size_t hard_limit=std::numeric_limits< size_t >::max())
Set queue limits for bounded enqueue operations.
Exception thrown when cooperative cancellation is observed.
Modern synchronization helpers for channels, shared state, and small producer-consumer queues.
Main namespace for Aleph-w library functions.
Definition ah-arena.H:89
bool completed() const noexcept
Return true if all underlying iterators are finished.
Definition ah-zip.H:136
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
T product(const Container &container, const T &init=T{1})
Compute product of all elements.
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
void parallel_invoke(ThreadPool &pool, Fs &&... fs)
Invoke a small set of related callables in parallel.
Itor::difference_type count(const Itor &beg, const Itor &end, const T &value)
Count elements equal to a value.
Definition ahAlgo.H:127
T sum(const Container &container, const T &init=T{})
Compute sum of all elements.
STL namespace.
static struct argp_option options[]
Definition ntreepic.C:1886
void print_header()
Common configuration object for parallel algorithms.
ThreadPool * pool
Executor to use (nullptr = default_pool()).
std::string filename
ValueArg< size_t > seed
Definition testHash.C:48
A modern, efficient thread pool for parallel task execution.
void example_parallel_building_blocks()
void example_structured_concurrency()
bool is_prime(int n)
void example_load_shedding()
void example_performance()
void example_backpressure()
void example_channels_and_shared_state()
FileResult process_file(const std::string &filename)
void example_fire_and_forget()
void example_batch_processing()
void example_basic_parallel()
int main()