90using namespace std::chrono_literals;
96 std::cout <<
"+" << std::string(65,
'-') <<
"+\n";
97 std::cout <<
"| " << std::left << std::setw(63) <<
title <<
" |\n";
98 std::cout <<
"+" << std::string(65,
'-') <<
"+\n\n";
117 std::cout <<
"GOAL: Transform a large dataset in parallel.\n\n";
119 ThreadPool pool(std::thread::hardware_concurrency());
120 std::cout <<
"Using ThreadPool with " << pool.
num_threads() <<
" workers\n\n";
123 std::vector<int>
numbers(1000000);
126 std::cout <<
"Input: " <<
numbers.
size() <<
" integers\n";
129 auto start = std::chrono::high_resolution_clock::now();
133 return static_cast<long long>(x) * x;
136 auto end = std::chrono::high_resolution_clock::now();
137 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
139 std::cout <<
"Output: " <<
squares.
size() <<
" squared values\n\n";
142 std::cout <<
"First 5 results: ";
143 for (
size_t i = 0; i < 5; ++i)
144 std::cout <<
squares[i] <<
" ";
147 std::cout <<
"Last 5 results: ";
149 std::cout <<
squares[i] <<
" ";
152 std::cout <<
"✓ Completed in " <<
ms <<
" ms\n";
167 std::cout <<
"GOAL: Find all prime numbers in a range using parallel filtering.\n\n";
169 ThreadPool pool(std::thread::hardware_concurrency());
175 std::cout <<
"Checking " <<
candidates.
size() <<
" candidates for primality...\n";
179 if (n < 2)
return false;
180 if (n == 2)
return true;
181 if (n % 2 == 0)
return false;
182 for (
int i = 3; i * i <= n; i += 2)
183 if (n % i == 0)
return false;
187 auto start = std::chrono::high_resolution_clock::now();
191 auto end = std::chrono::high_resolution_clock::now();
192 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
194 std::cout <<
"Found " <<
primes.
size() <<
" primes\n\n";
197 std::cout <<
"First 10: ";
198 for (
size_t i = 0; i < 10 && i <
primes.size(); ++i)
199 std::cout <<
primes[i] <<
" ";
202 std::cout <<
"Last 10: ";
204 std::cout <<
primes[i] <<
" ";
207 std::cout <<
"✓ Completed in " <<
ms <<
" ms\n";
226 std::cout <<
"GOAL: Compute sum and product of a large dataset in parallel.\n\n";
228 ThreadPool pool(std::thread::hardware_concurrency());
231 std::vector<double> data(100000);
232 for (
size_t i = 0; i < data.size(); ++i)
233 data[i] = 1.0 + 1.0 / (i + 1);
235 std::cout <<
"Data size: " << data.size() <<
" elements\n\n";
238 auto sum =
pfoldl(pool, data, 0.0, std::plus<double>());
239 std::cout <<
"Sum: " << std::fixed << std::setprecision(2) <<
sum <<
"\n";
243 std::cout <<
"Sum (psum): " <<
sum2 <<
"\n\n";
246 std::vector<std::string> words = {
"Parallel",
" ",
"functional",
" ",
247 "programming",
" ",
"is",
" ",
"powerful!"};
249 auto sentence =
pfoldl(pool, words, std::string{}, std::plus<std::string>());
250 std::cout <<
"Concatenated: \"" <<
sentence <<
"\"\n\n";
252 std::cout <<
"✓ Fold operations completed\n";
272 std::cout <<
"GOAL: Test conditions on large datasets efficiently.\n\n";
274 ThreadPool pool(std::thread::hardware_concurrency());
277 std::vector<int> data(1000000);
278 std::iota(data.begin(), data.end(), 1);
280 std::cout <<
"Dataset: integers 1 to " << data.size() <<
"\n\n";
284 std::cout <<
"All positive? " << (
all_positive ?
"YES" :
"NO") <<
"\n";
287 bool all_even =
pall(pool, data, [](
int x) {
return x % 2 == 0; });
288 std::cout <<
"All even? " << (
all_even ?
"YES" :
"NO") <<
" (short-circuits early!)\n";
292 std::cout <<
"Has number divisible by 12345? " << (
has_special ?
"YES" :
"NO") <<
"\n";
296 std::cout <<
"No negatives? " << (
no_negatives ?
"YES" :
"NO") <<
"\n";
299 size_t sevens =
pcount_if(pool, data, [](
int x) {
return x % 7 == 0; });
300 std::cout <<
"Multiples of 7: " <<
sevens <<
"\n\n";
302 std::cout <<
"✓ Predicate tests completed\n";
316 print_header(
"Example 5: Parallel Find (pfind, pfind_value)");
318 std::cout <<
"GOAL: Search for elements in parallel with early termination.\n\n";
320 ThreadPool pool(std::thread::hardware_concurrency());
323 std::vector<int> data(1000000);
324 std::iota(data.begin(), data.end(), 0);
325 std::mt19937
rng(42);
326 std::shuffle(data.begin(), data.end(),
rng);
328 std::cout <<
"Shuffled dataset of " << data.size() <<
" elements\n\n";
331 auto idx =
pfind(pool, data, [](
int x) {
return x == 500000; });
333 std::cout <<
"Value 500000 found at index " << *idx <<
"\n";
335 std::cout <<
"Value 500000 not found\n";
338 auto val =
pfind_value(pool, data, [](
int x) {
return x > 999990; });
340 std::cout <<
"First value > 999990: " << *val <<
"\n";
342 std::cout <<
"No value > 999990\n";
345 auto missing =
pfind(pool, data, [](
int x) {
return x == -1; });
346 std::cout <<
"Value -1: " << (
missing ?
"found" :
"not found") <<
"\n\n";
348 std::cout <<
"✓ Search operations completed\n";
362 std::cout <<
"GOAL: Compute statistics on large datasets in parallel.\n\n";
364 ThreadPool pool(std::thread::hardware_concurrency());
367 std::vector<double> data(500000);
368 std::mt19937
rng(123);
369 std::uniform_real_distribution<double> dist(-1000.0, 1000.0);
373 std::cout <<
"Dataset: " << data.size() <<
" random doubles in [-1000, 1000]\n\n";
381 std::cout << std::fixed << std::setprecision(4);
382 std::cout <<
"Sum: " <<
sum <<
"\n";
385 std::cout <<
"Min: " << *
min_opt <<
"\n";
387 std::cout <<
"Max: " << *
max_opt <<
"\n";
391 double mean =
sum / data.size();
392 std::cout <<
"Mean: " <<
mean <<
"\n\n";
394 std::cout <<
"✓ Aggregation operations completed\n";
408 std::cout <<
"GOAL: Sort large datasets using parallel merge sort.\n\n";
410 ThreadPool pool(std::thread::hardware_concurrency());
413 std::vector<int> data(500000);
414 std::mt19937
rng(456);
418 std::cout <<
"Dataset: " << data.size() <<
" random integers\n";
419 std::cout <<
"First 10 (unsorted): ";
420 for (
size_t i = 0; i < 10; ++i)
421 std::cout << data[i] <<
" ";
424 auto start = std::chrono::high_resolution_clock::now();
428 auto end = std::chrono::high_resolution_clock::now();
429 auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
431 std::cout <<
"First 10 (sorted): ";
432 for (
size_t i = 0; i < 10; ++i)
433 std::cout << data[i] <<
" ";
436 std::cout <<
"Last 10 (sorted): ";
437 for (
size_t i = data.size() - 10; i < data.size(); ++i)
438 std::cout << data[i] <<
" ";
442 bool is_sorted = std::is_sorted(data.begin(), data.end());
443 std::cout <<
"Correctly sorted? " << (
is_sorted ?
"YES" :
"NO") <<
"\n";
444 std::cout <<
"Time: " <<
ms <<
" ms\n\n";
446 std::cout <<
"✓ Parallel sort completed\n";
460 print_header(
"Example 8: Parallel Zip Operations (2 containers)");
462 std::cout <<
"GOAL: Process corresponding elements from two containers.\n\n";
464 ThreadPool pool(std::thread::hardware_concurrency());
467 std::vector<double> a(100000);
468 std::vector<double> b(100000);
469 for (
size_t i = 0; i < a.size(); ++i)
471 a[i] =
static_cast<double>(i);
472 b[i] =
static_cast<double>(i) * 2;
475 std::cout <<
"Vectors a and b, each with " << a.size() <<
" elements\n\n";
482 std::cout <<
"Element-wise products (first 5): ";
483 for (
size_t i = 0; i < 5; ++i)
489 [](
double acc,
double x,
double y) {
return acc + x *
y; });
491 std::cout <<
"Dot product: " << std::fixed << std::setprecision(0)
495 std::atomic<double>
sum{0};
500 std::cout <<
"Sum of all pairs: " <<
sum.load() <<
"\n\n";
502 std::cout <<
"✓ Zip operations completed\n";
520 std::cout <<
"GOAL: Process corresponding elements from 3+ containers.\n\n";
522 ThreadPool pool(std::thread::hardware_concurrency());
525 std::vector<int> x = {1, 2, 3, 4, 5};
526 std::vector<int>
y = {10, 20, 30, 40, 50};
527 std::vector<int>
z = {100, 200, 300, 400, 500};
529 std::cout <<
"x = {1, 2, 3, 4, 5}\n";
530 std::cout <<
"y = {10, 20, 30, 40, 50}\n";
531 std::cout <<
"z = {100, 200, 300, 400, 500}\n\n";
538 std::cout <<
"x + y + z = ";
540 std::cout << v <<
" ";
545 return a < b && b < c;
548 std::cout <<
"All x[i] < y[i] < z[i]? " << (
all_ordered ?
"YES" :
"NO") <<
"\n";
552 return a + b + c > 100;
555 std::cout <<
"Triplets with sum > 100: " <<
count <<
"\n\n";
558 std::vector<double> v1 = {1.0, 2.0, 3.0};
559 std::vector<double> v2 = {1.0, 2.0, 3.0};
560 std::vector<double>
v3 = {1.0, 2.0, 3.0};
561 std::vector<double>
v4 = {1.0, 2.0, 3.0};
564 return a * b * c * d;
567 std::cout <<
"v1 * v2 * v3 * v4 = ";
569 std::cout << v <<
" ";
572 std::cout <<
"✓ Variadic zip operations completed\n";
588 std::cout <<
"GOAL: Process elements along with their indices in parallel.\n\n";
590 ThreadPool pool(std::thread::hardware_concurrency());
593 std::vector<int> data(10, 0);
597 x =
static_cast<int>(i * 10);
600 std::cout <<
"After penumerate_for_each (x = i * 10): ";
602 std::cout << x <<
" ";
606 std::vector<std::string> words = {
"apple",
"banana",
"cherry",
"date",
"elderberry"};
609 [](
size_t i,
const std::string& s) {
610 return "[" + std::to_string(i) +
"] " + s;
613 std::cout <<
"Indexed strings:\n";
615 std::cout <<
" " << s <<
"\n";
618 std::cout <<
"✓ Enumerate operations completed\n";
629 std::cout <<
"GOAL: Compare parallel vs sequential execution times.\n\n";
631 ThreadPool pool(std::thread::hardware_concurrency());
634 std::vector<double> data(2000000);
635 std::iota(data.begin(), data.end(), 1.0);
637 std::cout <<
"Dataset: " << data.size() <<
" elements\n";
638 std::cout <<
"Threads: " << pool.
num_threads() <<
"\n\n";
643 for (
int i = 0; i < 50; ++i)
644 result = std::sin(result) * std::cos(result) + std::sqrt(std::abs(result));
649 std::cout <<
"Running sequential map...\n";
650 auto seq_start = std::chrono::high_resolution_clock::now();
655 auto seq_end = std::chrono::high_resolution_clock::now();
659 std::cout <<
"Running parallel map (pmaps)...\n\n";
660 auto par_start = std::chrono::high_resolution_clock::now();
664 auto par_end = std::chrono::high_resolution_clock::now();
670 std::cout <<
"┌────────────────────────────────────────┐\n";
671 std::cout <<
"│ PERFORMANCE RESULTS │\n";
672 std::cout <<
"├────────────────────────────────────────┤\n";
673 std::cout <<
"│ Sequential: " << std::setw(20) <<
seq_ms <<
" ms │\n";
674 std::cout <<
"│ Parallel: " << std::setw(20) <<
par_ms <<
" ms │\n";
675 std::cout <<
"├────────────────────────────────────────┤\n";
676 std::cout <<
"│ SPEEDUP: " << std::setw(20) << std::fixed
677 << std::setprecision(2) <<
speedup <<
"x │\n";
678 std::cout <<
"└────────────────────────────────────────┘\n";
682 [](
double a,
double b) { return std::abs(a - b) < 1e-10; });
683 std::cout <<
"\n✓ Results match: " << (
match ?
"YES" :
"NO") <<
"\n";
693 std::cout <<
"╔════════════════════════════════════════════════════════════════════╗\n";
694 std::cout <<
"║ ║\n";
695 std::cout <<
"║ ALEPH-W PARALLEL FUNCTIONAL PROGRAMMING EXAMPLES ║\n";
696 std::cout <<
"║ ║\n";
697 std::cout <<
"║ ML-style operations (map, filter, fold, zip, etc.) ║\n";
698 std::cout <<
"║ accelerated with multi-threading via ThreadPool ║\n";
699 std::cout <<
"║ ║\n";
700 std::cout <<
"╚════════════════════════════════════════════════════════════════════╝\n";
702 std::cout <<
"\nThis program demonstrates 11 parallel functional programming patterns.\n";
703 std::cout <<
"Read the source code comments for detailed explanations.\n";
718 std::cout <<
"╔════════════════════════════════════════════════════════════════════╗\n";
719 std::cout <<
"║ ✓ ALL EXAMPLES COMPLETED SUCCESSFULLY ║\n";
720 std::cout <<
"║ ║\n";
721 std::cout <<
"║ QUICK REFERENCE: ║\n";
722 std::cout <<
"║ pmaps(pool, c, f) → parallel map ║\n";
723 std::cout <<
"║ pfilter(pool, c, pred) → parallel filter ║\n";
724 std::cout <<
"║ pfoldl(pool, c, init, op) → parallel fold ║\n";
725 std::cout <<
"║ pfor_each(pool, c, f) → parallel for_each ║\n";
726 std::cout <<
"║ pall/pexists/pnone → parallel predicates ║\n";
727 std::cout <<
"║ pfind/pfind_value → parallel search ║\n";
728 std::cout <<
"║ psum/pproduct/pmin/pmax → parallel aggregations ║\n";
729 std::cout <<
"║ psort → parallel merge sort ║\n";
730 std::cout <<
"║ pzip_* → parallel zip (2 containers) ║\n";
731 std::cout <<
"║ pzip_*_n → parallel zip (N containers) ║\n";
732 std::cout <<
"║ penumerate_* → parallel enumerate ║\n";
733 std::cout <<
"╚════════════════════════════════════════════════════════════════════╝\n\n";
Parallel functional programming operations using ThreadPool.
void example_parallel_filter()
void example_parallel_sort()
void example_parallel_map()
void example_parallel_find()
void example_parallel_zip()
void example_parallel_aggregations()
void example_parallel_predicates()
void example_performance_comparison()
void example_variadic_zip()
void example_parallel_fold()
void example_parallel_enumerate()
size_t size() const noexcept
Count the number of elements of the list.
A reusable thread pool for efficient parallel task execution.
size_t num_threads() const noexcept
Get the number of worker threads.
iterator end() noexcept
Return an STL-compatible end iterator.
iterator begin() noexcept
Return an STL-compatible iterator to the first element.
Main namespace for Aleph-w library functions.
bool pall(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel all predicate (short-circuit).
bool is_sorted(const Container< T > &cont, const Compare &cmp=Compare())
Check if a container is sorted in ascending order.
bool pnone(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel none predicate.
auto pmaps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel map operation.
auto pmin(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel minimum element.
T pzip_foldl(ThreadPool &pool, const Container1 &c1, const Container2 &c2, T init, Op op, size_t chunk_size=0)
Parallel zip + fold.
void pzip_for_each(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + for_each.
auto penumerate_maps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel enumerate with map.
void penumerate_for_each(ThreadPool &pool, Container &c, Op op, size_t chunk_size=0)
Parallel for_each with index (enumerate).
bool pzip_all_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel all predicate over N zipped containers (variadic).
size_t pcount_if(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel count_if operation.
std::optional< size_t > pfind(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find operation (returns index).
void psort(ThreadPool &pool, Container &c, Compare cmp=Compare{}, const size_t min_parallel_size=1024)
Parallel sort (in-place).
auto pfilter(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel filter operation.
auto mean(const Container &data) -> std::decay_t< decltype(*std::begin(data))>
Compute the arithmetic mean.
auto pmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel maximum element.
auto pzip_maps_n(ThreadPool &pool, Op op, const Containers &... cs)
Parallel map over N zipped containers (variadic).
T pfoldl(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel left fold (reduce).
auto pfind_value(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find with value return.
auto pzip_maps(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + map.
T psum(ThreadPool &pool, const Container &c, T init=T{}, size_t chunk_size=0)
Parallel sum of elements.
auto pminmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel min and max elements.
size_t pzip_count_if_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel count over N zipped containers (variadic).
bool pexists(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel exists predicate (short-circuit).
DynList< T > maps(const C &c, Op op)
Classic map operation.
Itor::difference_type count(const Itor &beg, const Itor &end, const T &value)
Count elements equal to a value.
T sum(const Container &container, const T &init=T{})
Compute sum of all elements.