94using namespace std::chrono_literals;
100 std::cout <<
"+" << std::string(65,
'-') <<
"+\n";
101 std::cout <<
"| " << std::left << std::setw(63) << title <<
" |\n";
102 std::cout <<
"+" << std::string(65,
'-') <<
"+\n\n";
122 std::cout <<
"GOAL: Transform a large dataset in parallel.\n\n";
124 ThreadPool pool(std::thread::hardware_concurrency());
129 std::cout <<
"Using ThreadPool with " << pool.
num_threads() <<
" workers\n\n";
132 std::vector<int>
numbers(1000000);
135 std::cout <<
"Input: " <<
numbers.size() <<
" integers\n";
137 auto start = std::chrono::high_resolution_clock::now();
141 return static_cast<long long>(x) * x;
144 auto end = std::chrono::high_resolution_clock::now();
145 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
147 std::cout <<
"Output: " <<
squares.size() <<
" squared values\n\n";
150 std::cout <<
"First 5 results: ";
151 for (
size_t i = 0; i < 5; ++i)
152 std::cout <<
squares[i] <<
" ";
155 std::cout <<
"Last 5 results: ";
157 std::cout <<
squares[i] <<
" ";
160 std::cout <<
"✓ Completed in " <<
ms <<
" ms\n";
175 std::cout <<
"GOAL: Find all prime numbers in a range using parallel filtering.\n\n";
177 ThreadPool pool(std::thread::hardware_concurrency());
183 std::cout <<
"Checking " <<
candidates.size() <<
" candidates for primality...\n";
187 if (n < 2)
return false;
188 if (n == 2)
return true;
189 if (n % 2 == 0)
return false;
190 for (
int i = 3; i * i <= n; i += 2)
191 if (n % i == 0)
return false;
195 auto start = std::chrono::high_resolution_clock::now();
199 auto end = std::chrono::high_resolution_clock::now();
200 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
202 std::cout <<
"Found " <<
primes.size() <<
" primes\n\n";
205 std::cout <<
"First 10: ";
206 for (
size_t i = 0; i < 10 && i <
primes.size(); ++i)
207 std::cout <<
primes[i] <<
" ";
210 std::cout <<
"Last 10: ";
212 std::cout <<
primes[i] <<
" ";
215 std::cout <<
"✓ Completed in " <<
ms <<
" ms\n";
234 std::cout <<
"GOAL: Compute sum and product of a large dataset in parallel.\n\n";
236 ThreadPool pool(std::thread::hardware_concurrency());
239 std::vector<double> data(100000);
240 for (
size_t i = 0; i < data.size(); ++i)
241 data[i] = 1.0 + 1.0 / (i + 1);
243 std::cout <<
"Data size: " << data.size() <<
" elements\n\n";
246 auto sum =
pfoldl(pool, data, 0.0, std::plus<double>());
247 std::cout <<
"Sum: " << std::fixed << std::setprecision(2) <<
sum <<
"\n";
251 std::cout <<
"Sum (psum): " <<
sum2 <<
"\n\n";
254 std::vector<std::string> words = {
"Parallel",
" ",
"functional",
" ",
255 "programming",
" ",
"is",
" ",
"powerful!"};
257 auto sentence =
pfoldl(pool, words, std::string{}, std::plus<std::string>());
258 std::cout <<
"Concatenated: \"" <<
sentence <<
"\"\n\n";
260 std::cout <<
"✓ Fold operations completed\n";
280 std::cout <<
"GOAL: Test conditions on large datasets efficiently.\n\n";
282 ThreadPool pool(std::thread::hardware_concurrency());
285 std::vector<int> data(1000000);
286 std::iota(data.begin(), data.end(), 1);
288 std::cout <<
"Dataset: integers 1 to " << data.size() <<
"\n\n";
292 std::cout <<
"All positive? " << (
all_positive ?
"YES" :
"NO") <<
"\n";
295 bool all_even =
pall(pool, data, [](
int x) {
return x % 2 == 0; });
296 std::cout <<
"All even? " << (
all_even ?
"YES" :
"NO") <<
" (short-circuits early!)\n";
300 std::cout <<
"Has number divisible by 12345? " << (
has_special ?
"YES" :
"NO") <<
"\n";
304 std::cout <<
"No negatives? " << (
no_negatives ?
"YES" :
"NO") <<
"\n";
307 size_t sevens =
pcount_if(pool, data, [](
int x) {
return x % 7 == 0; });
308 std::cout <<
"Multiples of 7: " <<
sevens <<
"\n\n";
310 std::cout <<
"✓ Predicate tests completed\n";
324 print_header(
"Example 5: Parallel Find (pfind, pfind_value)");
326 std::cout <<
"GOAL: Search for elements in parallel with early termination.\n\n";
328 ThreadPool pool(std::thread::hardware_concurrency());
331 std::vector<int> data(1000000);
332 std::iota(data.begin(), data.end(), 0);
333 std::mt19937
rng(42);
334 std::shuffle(data.begin(), data.end(),
rng);
336 std::cout <<
"Shuffled dataset of " << data.size() <<
" elements\n\n";
339 auto idx =
pfind(pool, data, [](
int x) {
return x == 500000; });
341 std::cout <<
"Value 500000 found at index " << *idx <<
"\n";
343 std::cout <<
"Value 500000 not found\n";
346 auto val =
pfind_value(pool, data, [](
int x) {
return x > 999990; });
348 std::cout <<
"First value > 999990: " << *val <<
"\n";
350 std::cout <<
"No value > 999990\n";
353 auto missing =
pfind(pool, data, [](
int x) {
return x == -1; });
354 std::cout <<
"Value -1: " << (
missing ?
"found" :
"not found") <<
"\n\n";
356 std::cout <<
"✓ Search operations completed\n";
370 std::cout <<
"GOAL: Compute statistics on large datasets in parallel.\n\n";
372 ThreadPool pool(std::thread::hardware_concurrency());
375 std::vector<double> data(500000);
376 std::mt19937
rng(123);
377 std::uniform_real_distribution<double> dist(-1000.0, 1000.0);
381 std::cout <<
"Dataset: " << data.size() <<
" random doubles in [-1000, 1000]\n\n";
389 std::cout << std::fixed << std::setprecision(4);
390 std::cout <<
"Sum: " <<
sum <<
"\n";
393 std::cout <<
"Min: " << *
min_opt <<
"\n";
395 std::cout <<
"Max: " << *
max_opt <<
"\n";
399 double mean =
sum / data.size();
400 std::cout <<
"Mean: " <<
mean <<
"\n\n";
402 std::cout <<
"✓ Aggregation operations completed\n";
416 std::cout <<
"GOAL: Sort large datasets using parallel merge sort.\n\n";
418 ThreadPool pool(std::thread::hardware_concurrency());
425 std::vector<int> data(500000);
426 std::mt19937
rng(456);
430 std::cout <<
"Dataset: " << data.size() <<
" random integers\n";
431 std::cout <<
"First 10 (unsorted): ";
432 for (
size_t i = 0; i < 10; ++i)
433 std::cout << data[i] <<
" ";
436 auto start = std::chrono::high_resolution_clock::now();
440 auto end = std::chrono::high_resolution_clock::now();
441 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
443 std::cout <<
"First 10 (sorted): ";
444 for (
size_t i = 0; i < 10; ++i)
445 std::cout << data[i] <<
" ";
448 std::cout <<
"Last 10 (sorted): ";
449 for (
size_t i = data.size() - 10; i < data.size(); ++i)
450 std::cout << data[i] <<
" ";
454 bool is_sorted = std::is_sorted(data.begin(), data.end());
455 std::cout <<
"Correctly sorted? " << (
is_sorted ?
"YES" :
"NO") <<
"\n";
456 std::cout <<
"Time: " <<
ms <<
" ms\n\n";
458 std::cout <<
"✓ Parallel sort completed\n";
472 print_header(
"Example 8: Parallel Zip Operations (2 containers)");
474 std::cout <<
"GOAL: Process corresponding elements from two containers.\n\n";
476 ThreadPool pool(std::thread::hardware_concurrency());
482 std::vector<double> a(100000);
483 std::vector<double> b(100000);
484 for (
size_t i = 0; i < a.size(); ++i)
486 a[i] =
static_cast<double>(i);
487 b[i] =
static_cast<double>(i) * 2;
490 std::cout <<
"Vectors a and b, each with " << a.size() <<
" elements\n\n";
497 std::cout <<
"Element-wise products (first 5): ";
498 for (
size_t i = 0; i < 5; ++i)
506 std::cout <<
"Dot product: " << std::fixed << std::setprecision(0)
510 std::atomic<double>
sum{0};
515 std::cout <<
"Sum of all pairs: " <<
sum.load() <<
"\n\n";
517 std::cout <<
"✓ Zip operations completed\n";
535 std::cout <<
"GOAL: Process corresponding elements from 3+ containers.\n\n";
537 ThreadPool pool(std::thread::hardware_concurrency());
543 std::vector<int> x = {1, 2, 3, 4, 5};
544 std::vector<int>
y = {10, 20, 30, 40, 50};
545 std::vector<int> z = {100, 200, 300, 400, 500};
547 std::cout <<
"x = {1, 2, 3, 4, 5}\n";
548 std::cout <<
"y = {10, 20, 30, 40, 50}\n";
549 std::cout <<
"z = {100, 200, 300, 400, 500}\n\n";
556 std::cout <<
"x + y + z = ";
558 std::cout << v <<
" ";
563 return a < b && b < c;
566 std::cout <<
"All x[i] < y[i] < z[i]? " << (
all_ordered ?
"YES" :
"NO") <<
"\n";
570 return a + b + c > 100;
573 std::cout <<
"Triplets with sum > 100: " <<
count <<
"\n\n";
576 std::vector<double> v1 = {1.0, 2.0, 3.0};
577 std::vector<double> v2 = {1.0, 2.0, 3.0};
578 std::vector<double>
v3 = {1.0, 2.0, 3.0};
579 std::vector<double>
v4 = {1.0, 2.0, 3.0};
582 return a * b * c * d;
585 std::cout <<
"v1 * v2 * v3 * v4 = ";
587 std::cout << v <<
" ";
590 std::cout <<
"✓ Variadic zip operations completed\n";
606 std::cout <<
"GOAL: Process elements along with their indices in parallel.\n\n";
608 ThreadPool pool(std::thread::hardware_concurrency());
614 std::vector<int> data(10, 0);
618 x =
static_cast<int>(i * 10);
621 std::cout <<
"After penumerate_for_each (x = i * 10): ";
623 std::cout << x <<
" ";
627 std::vector<std::string> words = {
"apple",
"banana",
"cherry",
"date",
"elderberry"};
630 [](
size_t i,
const std::string& s) {
631 return "[" + std::to_string(i) +
"] " + s;
634 std::cout <<
"Indexed strings:\n";
636 std::cout <<
" " << s <<
"\n";
639 std::cout <<
"✓ Enumerate operations completed\n";
652 print_header(
"Example 11: Scan / Merge / Partition Wrappers");
654 std::cout <<
"GOAL: Use container-level wrappers for scan, merge, and partition.\n\n";
656 ThreadPool pool(std::thread::hardware_concurrency());
661 std::vector<int> values = {1, 2, 3, 4, 5, 6};
665 std::cout <<
"Inclusive scan: ";
667 std::cout << x <<
" ";
668 std::cout <<
"\nExclusive scan: ";
669 for (
int x : exclusive)
670 std::cout << x <<
" ";
673 std::vector<int> left = {1, 3, 5, 7};
674 std::vector<int> right = {2, 4, 6, 8};
675 auto merged =
pmerge(left, right, std::less<int>{},
options);
677 std::cout <<
"Merged sorted ranges: ";
679 std::cout << x <<
" ";
683 std::cout <<
"Partitioned evens: ";
685 std::cout << x <<
" ";
686 std::cout <<
"\nPartitioned odds: ";
688 std::cout << x <<
" ";
691 std::cout <<
"✓ Scan / merge / partition wrappers completed\n";
702 std::cout <<
"GOAL: Compare parallel vs sequential execution times.\n\n";
704 ThreadPool pool(std::thread::hardware_concurrency());
707 std::vector<double> data(2000000);
708 std::iota(data.begin(), data.end(), 1.0);
710 std::cout <<
"Dataset: " << data.size() <<
" elements\n";
711 std::cout <<
"Threads: " << pool.
num_threads() <<
"\n\n";
716 for (
int i = 0; i < 50; ++i)
717 result = std::sin(result) * std::cos(result) + std::sqrt(std::abs(result));
722 std::cout <<
"Running sequential map...\n";
723 auto seq_start = std::chrono::high_resolution_clock::now();
728 auto seq_end = std::chrono::high_resolution_clock::now();
732 std::cout <<
"Running parallel map (pmaps)...\n\n";
733 auto par_start = std::chrono::high_resolution_clock::now();
737 auto par_end = std::chrono::high_resolution_clock::now();
743 std::cout <<
"┌────────────────────────────────────────┐\n";
744 std::cout <<
"│ PERFORMANCE RESULTS │\n";
745 std::cout <<
"├────────────────────────────────────────┤\n";
746 std::cout <<
"│ Sequential: " << std::setw(20) <<
seq_ms <<
" ms │\n";
747 std::cout <<
"│ Parallel: " << std::setw(20) <<
par_ms <<
" ms │\n";
748 std::cout <<
"├────────────────────────────────────────┤\n";
749 std::cout <<
"│ SPEEDUP: " << std::setw(20) << std::fixed
750 << std::setprecision(2) <<
speedup <<
"x │\n";
751 std::cout <<
"└────────────────────────────────────────┘\n";
755 [](
double a,
double b) { return std::abs(a - b) < 1e-10; });
756 std::cout <<
"\n✓ Results match: " << (match ?
"YES" :
"NO") <<
"\n";
766 std::cout <<
"╔════════════════════════════════════════════════════════════════════╗\n";
767 std::cout <<
"║ ║\n";
768 std::cout <<
"║ ALEPH-W PARALLEL FUNCTIONAL PROGRAMMING EXAMPLES ║\n";
769 std::cout <<
"║ ║\n";
770 std::cout <<
"║ ML-style operations (map, filter, fold, zip, etc.) ║\n";
771 std::cout <<
"║ accelerated with multi-threading via ThreadPool ║\n";
772 std::cout <<
"║ ║\n";
773 std::cout <<
"╚════════════════════════════════════════════════════════════════════╝\n";
775 std::cout <<
"\nThis program demonstrates 12 parallel functional programming patterns.\n";
776 std::cout <<
"Read the source code comments for detailed explanations.\n";
792 std::cout <<
"╔════════════════════════════════════════════════════════════════════╗\n";
793 std::cout <<
"║ ✓ ALL EXAMPLES COMPLETED SUCCESSFULLY ║\n";
794 std::cout <<
"║ ║\n";
795 std::cout <<
"║ QUICK REFERENCE: ║\n";
796 std::cout <<
"║ pmaps(pool, c, f) → parallel map ║\n";
797 std::cout <<
"║ pfilter(pool, c, pred) → parallel filter ║\n";
798 std::cout <<
"║ pfoldl(pool, c, init, op) → parallel fold ║\n";
799 std::cout <<
"║ pfor_each(pool, c, f) → parallel for_each ║\n";
800 std::cout <<
"║ pall/pexists/pnone → parallel predicates ║\n";
801 std::cout <<
"║ pfind/pfind_value → parallel search ║\n";
802 std::cout <<
"║ psum/pproduct/pmin/pmax → parallel aggregations ║\n";
803 std::cout <<
"║ ppartition → parallel stable partition ║\n";
804 std::cout <<
"║ pscan/pexclusive_scan → parallel prefix scans ║\n";
805 std::cout <<
"║ pmerge → parallel merge of sorted inputs ║\n";
806 std::cout <<
"║ psort → parallel merge sort ║\n";
807 std::cout <<
"║ pzip_* → parallel zip (2 containers) ║\n";
808 std::cout <<
"║ pzip_*_n → parallel zip (N containers) ║\n";
809 std::cout <<
"║ penumerate_* → parallel enumerate ║\n";
810 std::cout <<
"╚════════════════════════════════════════════════════════════════════╝\n\n";
Parallel functional programming operations using ThreadPool.
void example_parallel_filter()
void example_parallel_sort()
void example_parallel_map()
void example_parallel_find()
void example_parallel_zip()
void example_parallel_aggregations()
void example_parallel_predicates()
void example_performance_comparison()
void example_variadic_zip()
void example_parallel_fold()
void example_parallel_scan_merge_partition()
void example_parallel_enumerate()
A reusable thread pool for efficient parallel task execution.
size_t num_threads() const noexcept
Get the number of worker threads.
Main namespace for Aleph-w library functions.
bool pall(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel all predicate (short-circuit).
bool is_sorted(const Container< T > &cont, const Compare &cmp=Compare())
Check if a container is sorted in ascending order.
bool pnone(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel none predicate.
auto pmaps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel map operation.
auto pmin(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel minimum element.
T pzip_foldl(ThreadPool &pool, const Container1 &c1, const Container2 &c2, T init, Op op, size_t chunk_size=0)
Parallel zip + fold.
void pzip_for_each(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + for_each.
auto penumerate_maps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel enumerate with map.
void penumerate_for_each(ThreadPool &pool, Container &c, Op op, size_t chunk_size=0)
Parallel for_each with index (enumerate).
bool pzip_all_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel all predicate over N zipped containers (variadic).
size_t pcount_if(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel count_if operation.
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
std::optional< size_t > pfind(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find operation (returns index).
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
void psort(ThreadPool &pool, Container &c, Compare cmp=Compare{}, const size_t min_parallel_size=1024)
Parallel sort (in-place).
auto pfilter(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel filter operation.
auto ppartition(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel partition (stable).
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
auto mean(const Container &data) -> std::decay_t< decltype(*std::begin(data))>
Compute the arithmetic mean.
auto pmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel maximum element.
auto pzip_maps_n(ThreadPool &pool, Op op, const Containers &... cs)
Parallel map over N zipped containers (variadic).
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
T pfoldl(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel left fold (reduce).
auto pfind_value(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find with value return.
auto pzip_maps(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + map.
T psum(ThreadPool &pool, const Container &c, T init=T{}, size_t chunk_size=0)
Parallel sum of elements.
auto pminmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel min and max elements.
size_t pzip_count_if_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel count over N zipped containers (variadic).
bool pexists(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel exists predicate (short-circuit).
Itor::difference_type count(const Itor &beg, const Itor &end, const T &value)
Count elements equal to a value.
T sum(const Container &container, const T &init=T{})
Compute sum of all elements.
static struct argp_option options[]
Common configuration object for parallel algorithms.
ThreadPool * pool
Executor to use (nullptr = default_pool()).