Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
thread_pool_example.cc
#include <thread_pool.H>
#include <iostream>
#include <iomanip>
#include <vector>
#include <cmath>
#include <chrono>
#include <fstream>
#include <sstream>
#include <string>   // std::string, std::to_string, std::hash<std::string>
#include <thread>   // std::thread::hardware_concurrency, std::this_thread::sleep_for
#include <future>   // std::future returned by enqueue()
#include <atomic>   // std::atomic counters in Examples 4 and 5

using namespace Aleph;
using namespace std::chrono_literals;

// Helper to print section headers
void print_header(const std::string& title)
{
  std::cout << "\n";
  std::cout << "+" << std::string(60, '-') << "+\n";
  std::cout << "| " << std::left << std::setw(58) << title << " |\n";
  std::cout << "+" << std::string(60, '-') << "+\n\n";
}

// =============================================================================
// EXAMPLE 1: Basic Parallel Computation
// =============================================================================
//
// This example shows the fundamental pattern:
//   1. Create a ThreadPool
//   2. Submit tasks with enqueue() - returns std::future<T>
//   3. Collect results with future.get()
//
// USE CASE: When you have many independent computations and need all results.
//
// KEY CONCEPTS:
//   - enqueue(function, args...) submits a task and returns a future
//   - The future blocks on .get() until the result is ready
//   - Tasks run in parallel across available workers
//
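// A minimal sketch of the same pattern with a lambda (the values here are
// illustrative only). An exception thrown inside a task travels through the
// returned std::future and is rethrown by future.get(), which is why the
// Example 3 warning below recommends enqueue() when you need error handling:
//
//   ThreadPool pool(4);
//   auto fut = pool.enqueue([](int x) { return x * x; }, 21);
//   try {
//     int answer = fut.get();  // 441, or rethrows the task's exception
//   } catch (const std::exception& e) {
//     std::cerr << "task failed: " << e.what() << "\n";
//   }
//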

bool is_prime(int n)
{
  if (n < 2) return false;
  if (n == 2) return true;
  if (n % 2 == 0) return false;
  for (int i = 3; i * i <= n; i += 2)
    if (n % i == 0) return false;
  return true;
}

void example_basic_parallel()
{
  print_header("Example 1: Basic Parallel Computation");

  std::cout << "GOAL: Find all prime numbers in a range using parallel computation.\n\n";

  const int range_start = 1;
  const int range_end = 100000;

  // STEP 1: Create ThreadPool
  // -------------------------
  // By default, uses std::thread::hardware_concurrency() threads.
  // You can specify the number explicitly: ThreadPool pool(4);

  ThreadPool pool(std::thread::hardware_concurrency());
  std::cout << "Created ThreadPool with " << pool.num_threads() << " workers\n\n";

  // STEP 2: Submit tasks
  // --------------------
  // enqueue() returns std::future<T> where T is the return type of the function.
  // Tasks are queued and executed by available workers.

  std::cout << "Submitting " << (range_end - range_start + 1) << " tasks...\n";

  std::vector<std::future<bool>> futures;
  futures.reserve(range_end - range_start + 1);

  auto start = std::chrono::high_resolution_clock::now();

  for (int n = range_start; n <= range_end; ++n)
  {
    // Submit task: is_prime(n)
    // The future will hold the boolean result
    futures.push_back(pool.enqueue(is_prime, n));
  }

  // STEP 3: Collect results
  // -----------------------
  // future.get() blocks until the result is ready.
  // Results are collected in submission order (not completion order).

  std::cout << "Collecting results...\n";

  int prime_count = 0;
  for (size_t i = 0; i < futures.size(); ++i)
  {
    if (futures[i].get())  // Blocks until this result is ready
      ++prime_count;
  }

  auto end = std::chrono::high_resolution_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

  // RESULTS
  std::cout << "\n✓ RESULT: Found " << prime_count << " primes in range ["
            << range_start << ", " << range_end << "]\n";
  std::cout << "  Time: " << duration.count() << " ms\n";
}

// =============================================================================
// EXAMPLE 2: Batch Processing with enqueue_bulk()
// =============================================================================
//
// When processing a collection of items with the same function,
// enqueue_bulk() is more convenient than calling enqueue() in a loop.
//
// USE CASE: Processing files, URLs, records, etc. in parallel.
//
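// For comparison, the equivalent hand-rolled loop would look like this
// (a sketch, using only calls shown elsewhere in this file):
//
//   std::vector<std::future<FileResult>> futs;
//   for (const auto& name : files)
//     futs.push_back(pool.enqueue(process_file, name));
//
// enqueue_bulk() collapses this into a single call.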

struct FileResult
{
  std::string filename;
  int word_count;
  int line_count;
};

FileResult process_file(const std::string& filename)
{
  // Simulate file processing (10ms per file)
  std::this_thread::sleep_for(10ms);

  // Generate fake results based on the filename
  size_t hash = std::hash<std::string>{}(filename);
  return {
    filename,
    static_cast<int>(hash % 1000 + 100),
    static_cast<int>((hash >> 10) % 100 + 10)
  };
}

void example_batch_processing()
{
  print_header("Example 2: Batch Processing with enqueue_bulk()");

  std::cout << "GOAL: Process multiple files in parallel and aggregate results.\n\n";

  // Create a list of files to process
  std::vector<std::string> files;
  for (int i = 1; i <= 50; ++i)
    files.push_back("document_" + std::to_string(i) + ".txt");

  ThreadPool pool(8);

  std::cout << "Processing " << files.size() << " files with "
            << pool.num_threads() << " workers...\n\n";

  auto start = std::chrono::high_resolution_clock::now();

  // enqueue_bulk() submits the same function for each element in the container
  // and returns a vector of futures
  auto futures = pool.enqueue_bulk(process_file, files);

  // Aggregate results
  int total_words = 0;
  int total_lines = 0;

  for (auto& f : futures)
  {
    auto result = f.get();
    total_words += result.word_count;
    total_lines += result.line_count;
  }

  auto end = std::chrono::high_resolution_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

  std::cout << "✓ RESULT:\n";
  std::cout << "  Files processed: " << files.size() << "\n";
  std::cout << "  Total words: " << total_words << "\n";
  std::cout << "  Total lines: " << total_lines << "\n";
  std::cout << "  Time: " << duration.count() << " ms\n";
  std::cout << "  (Sequential would take ~" << files.size() * 10 << " ms)\n";
}

// =============================================================================
// EXAMPLE 3: Fire-and-Forget with enqueue_detached()
// =============================================================================
//
// Sometimes you don't need the result of a task - you just want it to run
// in the background. enqueue_detached() is perfect for:
//   - Logging
//   - Metrics/telemetry
//   - Cache updates
//   - Notifications
//
// BENEFITS:
//   - No future overhead (slightly faster)
//   - Main thread doesn't need to track results
//
// WARNING:
//   - Exceptions in detached tasks are silently ignored
//   - Use enqueue() if you need error handling
//
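// A hedged sketch of one way to keep error visibility in detached tasks:
// wrap the body in try/catch yourself. Both risky_background_work() and
// handle_error() are hypothetical placeholders, not part of this library:
//
//   pool.enqueue_detached([] {
//     try {
//       risky_background_work();     // hypothetical
//     } catch (const std::exception& e) {
//       handle_error(e.what());      // hypothetical sink: log, counter, ...
//     }
//   });
//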

void example_fire_and_forget()
{
  print_header("Example 3: Fire-and-Forget with enqueue_detached()");

  std::cout << "GOAL: Perform background logging without blocking main work.\n\n";

  ThreadPool pool(2);

  std::cout << "Main thread does work while logging happens in background:\n\n";

  for (int i = 1; i <= 5; ++i)
  {
    // Fire-and-forget: log the message asynchronously.
    // The main thread continues immediately.
    pool.enqueue_detached([i] {
      // Simulate log write latency
      std::this_thread::sleep_for(5ms);
      std::ostringstream oss;
      oss << "  [BACKGROUND LOG] Processed item " << i << "\n";
      std::cout << oss.str();
    });

    // Main work continues without waiting for the log
    std::cout << "[MAIN THREAD] Working on item " << i << "...\n";
    std::this_thread::sleep_for(30ms);
  }

  std::cout << "\n[MAIN THREAD] Main work complete. Waiting for logs...\n";
  pool.wait_all();  // Ensure all background tasks complete
  std::cout << "\n✓ All background logging completed\n";
}

// =============================================================================
// EXAMPLE 4: Backpressure with enqueue_bounded()
// =============================================================================
//
// PROBLEM: If producers submit tasks faster than workers can process them,
// the queue grows without bound, potentially exhausting memory.
//
// SOLUTION: enqueue_bounded() with queue limits
//   - soft_limit: when the queue reaches this size, enqueue BLOCKS
//   - hard_limit: if the queue reaches this size, throws queue_overflow_error
//
// This creates natural "backpressure" - fast producers slow down automatically.
//
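// A minimal sketch of guarding against the hard limit (assuming
// queue_overflow_error is the exception type named above; the recovery
// branch is illustrative):
//
//   try {
//     pool.enqueue_bounded_detached(slow_task);  // may block at soft_limit
//   } catch (const queue_overflow_error&) {
//     // hard_limit reached: shed the task, retry later, or alert
//   }
//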

void example_backpressure()
{
  print_header("Example 4: Backpressure with enqueue_bounded()");

  std::cout << "GOAL: Prevent queue overflow when producer is faster than consumers.\n\n";

  ThreadPool pool(2);

  // Configure queue limits
  //   soft_limit = 5:  block when the queue has 5+ pending tasks
  //   hard_limit = 20: throw an exception if the queue exceeds 20 (safety net)
  pool.set_queue_limits(5, 20);

  auto [soft, hard] = pool.get_queue_limits();
  std::cout << "Queue limits: soft=" << soft << ", hard=" << hard << "\n\n";

  std::atomic<int> processed{0};

  // Slow consumer task (50ms each)
  auto slow_task = [&processed] {
    std::this_thread::sleep_for(50ms);
    ++processed;
  };

  std::cout << "Enqueueing 20 slow tasks (50ms each)...\n";
  std::cout << "Watch how the producer is throttled when the queue fills up:\n\n";

  auto start = std::chrono::high_resolution_clock::now();

  for (int i = 1; i <= 20; ++i)
  {
    auto enqueue_start = std::chrono::high_resolution_clock::now();

    // This will BLOCK if the queue is at soft_limit
    pool.enqueue_bounded_detached(slow_task);

    auto enqueue_end = std::chrono::high_resolution_clock::now();
    auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(
        enqueue_end - enqueue_start).count();

    std::cout << "  Task " << std::setw(2) << i
              << " enqueued (pending: " << pool.pending_tasks() << ")";
    if (wait_time > 5)
      std::cout << " ← blocked " << wait_time << "ms";
    std::cout << "\n";
  }

  pool.wait_all();

  auto end = std::chrono::high_resolution_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

  std::cout << "\n✓ RESULT:\n";
  std::cout << "  Processed: " << processed << " tasks\n";
  std::cout << "  Total time: " << duration.count() << " ms\n";
  std::cout << "  Memory was protected by limiting queue size\n";
}

// =============================================================================
// EXAMPLE 5: Load Shedding with try_enqueue()
// =============================================================================
//
// SCENARIO: High-traffic system where you'd rather DROP requests than
// slow down or run out of memory.
//
// try_enqueue() returns immediately:
//   - std::optional<future> if the task was queued
//   - std::nullopt if the queue is full (at soft_limit)
//
// try_enqueue_detached() returns:
//   - true if the task was queued
//   - false if the queue is full
//
// This is useful for:
//   - Web servers that need to reject excess requests
//   - Real-time systems where dropping is better than delaying
//
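// A hedged sketch of the future-returning variant described above (this
// example only exercises try_enqueue_detached(); values are illustrative):
//
//   if (auto fut = pool.try_enqueue(is_prime, 97))
//     std::cout << "accepted: " << fut->get() << "\n";  // task was queued
//   else
//     std::cout << "rejected: queue full\n";            // drop or degrade
//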

void example_load_shedding()
{
  print_header("Example 5: Load Shedding with try_enqueue()");

  std::cout << "GOAL: Reject excess tasks when the system is overloaded.\n\n";

  ThreadPool pool(2);
  pool.set_queue_limits(3, 10);  // Accept at most 3 pending tasks

  std::atomic<int> accepted{0};
  int rejected = 0;

  // Slow task (100ms)
  auto task = [&accepted] {
    std::this_thread::sleep_for(100ms);
    ++accepted;
  };

  std::cout << "Attempting to enqueue 15 tasks with soft_limit=3:\n\n";

  for (int i = 1; i <= 15; ++i)
  {
    // try_enqueue_detached() returns bool (non-blocking)
    if (pool.try_enqueue_detached(task))
    {
      std::cout << "  Task " << std::setw(2) << i << ": ✓ ACCEPTED\n";
    }
    else
    {
      std::cout << "  Task " << std::setw(2) << i << ": ✗ REJECTED (queue full)\n";
      ++rejected;
    }
  }

  pool.wait_all();

  std::cout << "\n✓ RESULT:\n";
  std::cout << "  Accepted and processed: " << accepted << "\n";
  std::cout << "  Rejected (dropped): " << rejected << "\n";
  std::cout << "  System remained responsive - no blocking!\n";
}

// =============================================================================
// EXAMPLE 6: Performance Comparison
// =============================================================================
//
// This example demonstrates the speedup achieved by parallel execution.
// The speedup depends on:
//   - Number of CPU cores
//   - Task duration (longer tasks = better speedup)
//   - Task independence (no shared state = better speedup)
//
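// A rough upper bound to calibrate expectations (Amdahl's law): if a
// fraction p of the work parallelizes across N workers, then
//
//   speedup <= 1 / ((1 - p) + p / N)
//
// so the serial parts (queueing, future overhead, result collection) keep
// the measured speedup below the worker count.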

void example_performance()
{
  print_header("Example 6: Performance Comparison");

  std::cout << "GOAL: Compare parallel execution vs sequential execution.\n\n";

  const int num_tasks = 1000;

  // CPU-intensive computation
  auto compute = [](int x) {
    double result = 0;
    for (int i = 0; i < 10000; ++i)
      result += std::sin(x * i * 0.001);
    return result;
  };

  // SEQUENTIAL EXECUTION
  std::cout << "Running " << num_tasks << " tasks sequentially...\n";

  auto seq_start = std::chrono::high_resolution_clock::now();
  double seq_result = 0;
  for (int i = 0; i < num_tasks; ++i)
    seq_result += compute(i);
  auto seq_end = std::chrono::high_resolution_clock::now();
  auto seq_ms = std::chrono::duration_cast<std::chrono::milliseconds>(seq_end - seq_start).count();

  // PARALLEL EXECUTION
  std::cout << "Running " << num_tasks << " tasks in parallel...\n\n";

  ThreadPool pool(std::thread::hardware_concurrency());

  auto par_start = std::chrono::high_resolution_clock::now();
  std::vector<std::future<double>> futures;
  futures.reserve(num_tasks);
  for (int i = 0; i < num_tasks; ++i)
    futures.push_back(pool.enqueue(compute, i));

  double par_result = 0;
  for (auto& f : futures)
    par_result += f.get();
  auto par_end = std::chrono::high_resolution_clock::now();
  auto par_ms = std::chrono::duration_cast<std::chrono::milliseconds>(par_end - par_start).count();

  // RESULTS
  double speedup = (par_ms > 0) ? static_cast<double>(seq_ms) / par_ms : 0;

  std::cout << "┌────────────────────────────────────────┐\n";
  std::cout << "│          PERFORMANCE RESULTS           │\n";
  std::cout << "├────────────────────────────────────────┤\n";
  std::cout << "│ Tasks:        " << std::setw(23) << num_tasks << " │\n";
  std::cout << "│ Threads:      " << std::setw(23) << pool.num_threads() << " │\n";
  std::cout << "├────────────────────────────────────────┤\n";
  std::cout << "│ Sequential:   " << std::setw(20) << seq_ms << " ms │\n";
  std::cout << "│ Parallel:     " << std::setw(20) << par_ms << " ms │\n";
  std::cout << "├────────────────────────────────────────┤\n";
  std::cout << "│ SPEEDUP:      " << std::setw(22) << std::fixed
            << std::setprecision(1) << speedup << "x │\n";
  std::cout << "└────────────────────────────────────────┘\n";

  // Verify correctness
  std::cout << "\n✓ Results match: " << (std::abs(seq_result - par_result) < 0.01 ? "YES" : "NO") << "\n";
}

// =============================================================================
// MAIN - Run all examples
// =============================================================================

int main()
{
  std::cout << "\n";
  std::cout << "╔════════════════════════════════════════════════════════════════╗\n";
  std::cout << "║                                                                ║\n";
  std::cout << "║               ALEPH-W THREADPOOL USAGE EXAMPLES                ║\n";
  std::cout << "║                                                                ║\n";
  std::cout << "║     Learn how to use the ThreadPool for parallel execution     ║\n";
  std::cout << "║                                                                ║\n";
  std::cout << "╚════════════════════════════════════════════════════════════════╝\n";

  std::cout << "\nThis program demonstrates 6 common ThreadPool usage patterns.\n";
  std::cout << "Read the source code comments for detailed explanations.\n";

  example_basic_parallel();
  example_batch_processing();
  example_fire_and_forget();
  example_backpressure();
  example_load_shedding();
  example_performance();

  std::cout << "\n";
  std::cout << "╔════════════════════════════════════════════════════════════════╗\n";
  std::cout << "║             ✓ ALL EXAMPLES COMPLETED SUCCESSFULLY              ║\n";
  std::cout << "║                                                                ║\n";
  std::cout << "║  QUICK REFERENCE:                                              ║\n";
  std::cout << "║   enqueue(f, args...)           → std::future<T>               ║\n";
  std::cout << "║   enqueue_detached(f, args...)  → void (fire-and-forget)       ║\n";
  std::cout << "║   enqueue_bounded(f, args...)   → std::future<T> (backpressure)║\n";
  std::cout << "║   try_enqueue(f, args...)       → optional<future> (non-block) ║\n";
  std::cout << "║   enqueue_bulk(f, container)    → vector<future> (batch)       ║\n";
  std::cout << "╚════════════════════════════════════════════════════════════════╝\n\n";

  return 0;
}
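
// Build sketch (hypothetical paths; adjust to your Aleph-w installation):
//
//   g++ -std=c++17 -O2 -I/path/to/aleph-w thread_pool_example.cc -pthread -o thread_pool_example
//
// C++17 is assumed for the structured bindings used above (auto [soft, hard]);
// -pthread is needed on typical Linux toolchains for std::thread/std::future.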
Referenced ThreadPool members (thread_pool.H, a modern, efficient thread pool for parallel task execution):

  ThreadPool: a reusable thread pool for efficient parallel task execution.

  auto enqueue(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
      Submit a task for execution and get a future for the result.

  void enqueue_detached(F&& f, Args&&... args)
      Submit a task without tracking the result (fire-and-forget).

  auto enqueue_bulk(F&& f, const Container& args_container)
      -> std::vector<std::future<std::invoke_result_t<F, typename Container::value_type>>>
      Submit multiple tasks and collect all futures.

  void enqueue_bounded_detached(F&& f, Args&&... args)
      Submit a task with backpressure, without tracking the result.

  bool try_enqueue_detached(F&& f, Args&&... args)
      Try to submit a detached task without blocking or throwing.

  void set_queue_limits(size_t soft_limit, size_t hard_limit = std::numeric_limits<size_t>::max())
      Set queue limits for bounded enqueue operations.

  std::pair<size_t, size_t> get_queue_limits() const
      Get current queue limits.

  size_t num_threads() const noexcept
      Get the number of worker threads.

  size_t pending_tasks() const
      Get the number of pending tasks in the queue.

  void wait_all(std::chrono::milliseconds poll_interval = std::chrono::milliseconds(1))
      Wait until all current tasks complete.