Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
ah-parallel.H
1/*
2 Aleph_w
3
4 Data structures & Algorithms
5 version 2.0.0b
6 https://github.com/lrleon/Aleph-w
7
8 This file is part of Aleph-w library
9
10 Copyright (c) 2002-2026 Leandro Rabindranath Leon
11
12 Permission is hereby granted, free of charge, to any person obtaining a copy
13 of this software and associated documentation files (the "Software"), to deal
14 in the Software without restriction, including without limitation the rights
15 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 copies of the Software, and to permit persons to whom the Software is
17 furnished to do so, subject to the following conditions:
18
19 The above copyright notice and this permission notice shall be included in all
20 copies or substantial portions of the Software.
21
22 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 SOFTWARE.
29*/
30
31
32#ifndef AH_PARALLEL_H
33#define AH_PARALLEL_H
34
111#include <vector>
112#include <atomic>
113#include <optional>
114#include <algorithm>
115#include <numeric>
116#include <type_traits>
117#include <iterator>
118#include <functional>
119#include <thread_pool.H>
120
121namespace Aleph
122{
123 // =============================================================================
124 // Implementation Details
125 // =============================================================================
126
127 namespace parallel_detail
128 {
130 inline size_t chunk_size(const size_t n, const size_t num_threads, const size_t min_chunk = 64)
131 {
132 if (n == 0) return 1;
133 // Use more chunks than threads for better load balancing
134 const size_t chunks = num_threads * 4;
135 const size_t size = (n + chunks - 1) / chunks;
136 return std::max(size, min_chunk);
137 }
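  // Worked example (editorial note): with n = 10000 and num_threads = 8,
  // chunks = 8 * 4 = 32 and size = (10000 + 31) / 32 = 313; since 313 > 64,
  // each task processes roughly 313 elements.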
138
140 template <typename Container>
141 constexpr bool has_random_access()
142 {
143 using It = decltype(std::begin(std::declval<Container &>()));
144 return std::is_base_of_v<std::random_access_iterator_tag,
145 typename std::iterator_traits<It>::iterator_category>;
146 }
147
 150 template <typename Container>
 151 auto ensure_random_access(const Container & c)
 152 {
153 if constexpr (has_random_access<Container>())
154 return &c; // Return pointer to original
155 else
156 return std::make_unique<std::vector<typename Container::value_type>>(std::begin(c), std::end(c));
157 }
158
160 template <typename T>
161 decltype(auto) deref(T && ptr)
162 {
163 if constexpr (std::is_pointer_v<std::decay_t<T>>)
164 return *ptr;
165 else
166 return *ptr; // unique_ptr also supports *
167 }
168 } // namespace parallel_detail
169
170 // =============================================================================
171 // Parallel Map
172 // =============================================================================
173
204 template <typename ResultT = void, typename Container, typename Op>
205 [[nodiscard]] auto pmaps(ThreadPool & pool, const Container & c, Op op,
206 size_t chunk_size = 0)
207 {
208 using InputT = std::decay_t<decltype(*std::begin(c))>;
209 using ActualResultT = std::conditional_t<
210 std::is_void_v<ResultT>,
211 std::invoke_result_t<Op, const InputT &>,
212 ResultT>;
213
214 const size_t n = std::distance(std::begin(c), std::end(c));
215 if (n == 0)
216 return std::vector<ActualResultT>{};
217
218 if (chunk_size == 0)
219 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
220
 221 // Ensure random access for parallel processing
 222 auto data_holder = parallel_detail::ensure_random_access(c);
 223 const auto & data = parallel_detail::deref(data_holder);
224
225 std::vector<ActualResultT> result(n);
226 std::vector<std::future<void>> futures;
227
228 size_t offset = 0;
229
230 while (offset < n)
231 {
232 size_t chunk_end = std::min(offset + chunk_size, n);
233
234 futures.push_back(pool.enqueue([&result, &data, op, offset, chunk_end]()
235 {
236 auto in_it = std::begin(data);
237 std::advance(in_it, offset);
238 for (size_t i = offset; i < chunk_end; ++i, ++in_it)
239 result[i] = op(*in_it);
240 }));
 241
 242 offset = chunk_end;
 243 }
244
245 for (auto & f: futures)
246 f.get();
247
248 return result;
249 }
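  // Editorial usage sketch (not part of the original header), assuming an
  // already-constructed ThreadPool `pool`:
  //
  //   std::vector<int> v = {1, 2, 3, 4};
  //   auto squares = pmaps(pool, v, [] (int x) { return x * x; });
  //   // squares == std::vector<int>{1, 4, 9, 16}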
250
251 // =============================================================================
252 // Parallel Filter
253 // =============================================================================
254
282 template <typename Container, typename Pred>
283 [[nodiscard]] auto pfilter(ThreadPool & pool, const Container & c, Pred pred,
284 size_t chunk_size = 0)
285 {
286 using T = std::decay_t<decltype(*std::begin(c))>;
287
288 const size_t n = std::distance(std::begin(c), std::end(c));
289 if (n == 0)
290 return std::vector<T>{};
291
292 if (chunk_size == 0)
293 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
 294
 295 auto data_holder = parallel_detail::ensure_random_access(c);
 296 const auto & data = parallel_detail::deref(data_holder);
297
298 // Each chunk produces its own filtered result
299 std::vector<std::future<std::vector<T>>> futures;
300
301 size_t offset = 0;
302 while (offset < n)
303 {
304 size_t chunk_end = std::min(offset + chunk_size, n);
305
306 futures.push_back(pool.enqueue([&data, pred, offset, chunk_end]()
307 {
308 std::vector<T> chunk_result;
309 auto it = std::begin(data);
310 std::advance(it, offset);
311 for (size_t i = offset; i < chunk_end; ++i, ++it)
312 if (pred(*it))
313 chunk_result.push_back(*it);
314 return chunk_result;
315 }));
 316
 317 offset = chunk_end;
 318 }
319
320 // Merge results in order
321 std::vector<T> result;
322 for (auto & f: futures)
323 {
324 auto chunk_result = f.get();
325 result.insert(result.end(),
326 std::make_move_iterator(chunk_result.begin()),
327 std::make_move_iterator(chunk_result.end()));
328 }
329
330 return result;
331 }
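  // Editorial usage sketch: pfilter preserves the relative order of matching
  // elements, assuming `pool` and `v` as above.
  //
  //   auto evens = pfilter(pool, v, [] (int x) { return x % 2 == 0; });
  //   // evens == std::vector<int>{2, 4}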
332
333 // =============================================================================
334 // Parallel Fold/Reduce
335 // =============================================================================
336
374 template <typename T, typename Container, typename BinaryOp>
375 [[nodiscard]] T pfoldl(ThreadPool & pool, const Container & c, T init, BinaryOp op,
376 size_t chunk_size = 0)
377 {
378 const size_t n = std::distance(std::begin(c), std::end(c));
379 if (n == 0)
380 return init;
381
382 if (chunk_size == 0)
383 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
 384
 385 auto data_holder = parallel_detail::ensure_random_access(c);
 386 const auto & data = parallel_detail::deref(data_holder);
387
388 std::vector<std::future<T>> futures;
389
390 size_t offset = 0;
391 while (offset < n)
392 {
393 size_t chunk_end = std::min(offset + chunk_size, n);
394
395 futures.push_back(pool.enqueue([&data, op, offset, chunk_end]()
396 {
397 auto it = std::begin(data);
398 std::advance(it, offset);
399 T local = *it++;
400 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
401 local = op(local, *it);
402 return local;
403 }));
 404
 405 offset = chunk_end;
 406 }
407
408 // Combine partial results
409 T result = init;
410 for (auto & f: futures)
411 result = op(result, f.get());
412
413 return result;
414 }
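  // Editorial usage sketch: partial chunk results are combined with the same
  // operation, so `op` should be associative and `init` a neutral element for
  // a deterministic result.
  //
  //   int total = pfoldl(pool, v, 0, [] (int a, int b) { return a + b; });
  //   // total == 10 for v == {1, 2, 3, 4}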
415
416 // =============================================================================
417 // Parallel For Each
418 // =============================================================================
419
450 template <typename Container, typename Op>
451 void pfor_each(ThreadPool & pool, Container & c, Op op, size_t chunk_size = 0)
452 {
453 const size_t n = std::distance(std::begin(c), std::end(c));
454 if (n == 0)
455 return;
456
457 if (chunk_size == 0)
458 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
459
460 std::vector<std::future<void>> futures;
461 size_t offset = 0;
462
463 while (offset < n)
464 {
465 size_t chunk_end = std::min(offset + chunk_size, n);
466
467 futures.push_back(pool.enqueue([&c, op, offset, chunk_end]()
468 {
469 auto it = std::begin(c);
470 std::advance(it, offset);
471 for (size_t i = offset; i < chunk_end; ++i, ++it)
472 op(*it);
473 }));
 474
 475 offset = chunk_end;
 476 }
477
478 for (auto & f: futures)
479 f.get();
480 }
481
494 template <typename Container, typename Op>
495 void pfor_each(ThreadPool & pool, const Container & c, Op op, size_t chunk_size = 0)
496 {
497 const size_t n = std::distance(std::begin(c), std::end(c));
498 if (n == 0)
499 return;
500
501 if (chunk_size == 0)
502 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
 503
 504 auto data_holder = parallel_detail::ensure_random_access(c);
 505 const auto & data = parallel_detail::deref(data_holder);
506
507 std::vector<std::future<void>> futures;
508
509 size_t offset = 0;
510 while (offset < n)
511 {
512 size_t chunk_end = std::min(offset + chunk_size, n);
513
514 futures.push_back(pool.enqueue([&data, op, offset, chunk_end]()
515 {
516 auto it = std::begin(data);
517 std::advance(it, offset);
518 for (size_t i = offset; i < chunk_end; ++i, ++it)
519 op(*it);
520 }));
 521
 522 offset = chunk_end;
 523 }
524
525 for (auto & f: futures)
526 f.get();
527 }
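  // Editorial usage sketch: the non-const overload mutates elements in place;
  // the const overload may iterate over an internal copy for non-random-access
  // containers, so its op should not rely on element identity.
  //
  //   pfor_each(pool, v, [] (int & x) { x *= 2; });   // v becomes {2, 4, 6, 8}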
528
529 // =============================================================================
530 // Parallel Predicates (all, exists, none)
531 // =============================================================================
532
555 template <typename Container, typename Pred>
556 [[nodiscard]] bool pall(ThreadPool & pool, const Container & c, Pred pred,
557 size_t chunk_size = 0)
558 {
559 const size_t n = std::distance(std::begin(c), std::end(c));
560 if (n == 0)
561 return true;
562
563 if (chunk_size == 0)
564 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
 565
 566 auto data_holder = parallel_detail::ensure_random_access(c);
 567 const auto & data = parallel_detail::deref(data_holder);
568
569 std::atomic<bool> found_false{false};
570 std::vector<std::future<void>> futures;
571
572 size_t offset = 0;
573 while (offset < n)
574 {
575 size_t chunk_end = std::min(offset + chunk_size, n);
576
577 futures.push_back(pool.enqueue([&data, pred, &found_false, offset, chunk_end]()
578 {
579 if (found_false.load(std::memory_order_relaxed))
580 return; // Short-circuit
581
582 auto it = std::begin(data);
583 std::advance(it, offset);
584 for (size_t i = offset; i < chunk_end; ++i, ++it)
585 {
586 if (not pred(*it))
587 {
588 found_false.store(true, std::memory_order_relaxed);
589 return;
590 }
591 if (found_false.load(std::memory_order_relaxed))
592 return;
593 }
594 }));
 595
 596 offset = chunk_end;
 597 }
598
599 for (auto & f: futures)
600 f.get();
601
602 return not found_false.load();
603 }
604
626 template <typename Container, typename Pred>
627 [[nodiscard]] bool pexists(ThreadPool & pool, const Container & c, Pred pred,
628 size_t chunk_size = 0)
629 {
630 const size_t n = std::distance(std::begin(c), std::end(c));
631 if (n == 0)
632 return false;
633
634 if (chunk_size == 0)
635 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
636
637 auto data_holder = parallel_detail::ensure_random_access(c);
638 const auto & data = parallel_detail::deref(data_holder);
639
640 std::atomic<bool> found{false};
641 std::vector<std::future<void>> futures;
642
643 size_t offset = 0;
644 while (offset < n)
645 {
646 size_t chunk_end = std::min(offset + chunk_size, n);
647
648 futures.push_back(pool.enqueue([&data, pred, &found, offset, chunk_end]()
649 {
650 if (found.load(std::memory_order_relaxed))
651 return; // Short-circuit
652
653 auto it = std::begin(data);
654 std::advance(it, offset);
655 for (size_t i = offset; i < chunk_end; ++i, ++it)
656 {
657 if (pred(*it))
658 {
659 found.store(true, std::memory_order_relaxed);
660 return;
661 }
662 if (found.load(std::memory_order_relaxed))
663 return;
664 }
665 }));
666
667 offset = chunk_end;
668 }
669
670 for (auto & f: futures)
671 f.get();
672
673 return found.load();
674 }
675
689 template <typename Container, typename Pred>
690 [[nodiscard]] bool pnone(ThreadPool & pool, const Container & c, Pred pred,
691 size_t chunk_size = 0)
692 {
693 return not pexists(pool, c, pred, chunk_size);
694 }
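  // Editorial usage sketch for the predicate family, assuming `pool` and `v`:
  //
  //   bool all_pos = pall(pool, v, [] (int x) { return x > 0; });
  //   bool any_neg = pexists(pool, v, [] (int x) { return x < 0; });
  //   bool no_zero = pnone(pool, v, [] (int x) { return x == 0; });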
695
696 // =============================================================================
697 // Parallel Count
698 // =============================================================================
699
721 template <typename Container, typename Pred>
722 [[nodiscard]] size_t pcount_if(ThreadPool & pool, const Container & c, Pred pred,
723 size_t chunk_size = 0)
724 {
725 const size_t n = std::distance(std::begin(c), std::end(c));
726 if (n == 0)
727 return 0;
728
729 if (chunk_size == 0)
730 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
731
732 auto data_holder = parallel_detail::ensure_random_access(c);
733 const auto & data = parallel_detail::deref(data_holder);
734
735 std::vector<std::future<size_t>> futures;
736
737 size_t offset = 0;
738 while (offset < n)
739 {
740 size_t chunk_end = std::min(offset + chunk_size, n);
741
742 futures.push_back(pool.enqueue([&data, pred, offset, chunk_end]()
743 {
744 size_t count = 0;
745 auto it = std::begin(data);
746 std::advance(it, offset);
747 for (size_t i = offset; i < chunk_end; ++i, ++it)
748 if (pred(*it))
749 ++count;
750 return count;
751 }));
752
753 offset = chunk_end;
754 }
755
756 size_t total = 0;
757 for (auto & f: futures)
758 total += f.get();
759
760 return total;
761 }
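  // Editorial usage sketch:
  //
  //   size_t n_even = pcount_if(pool, v, [] (int x) { return x % 2 == 0; });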
762
763 // =============================================================================
764 // Parallel Find
765 // =============================================================================
766
792 template <typename Container, typename Pred>
793 [[nodiscard]] std::optional<size_t> pfind(ThreadPool & pool, const Container & c,
794 Pred pred, size_t chunk_size = 0)
795 {
796 const size_t n = std::distance(std::begin(c), std::end(c));
797 if (n == 0)
798 return std::nullopt;
799
800 if (chunk_size == 0)
801 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
802
803 auto data_holder = parallel_detail::ensure_random_access(c);
804 const auto & data = parallel_detail::deref(data_holder);
805
806 // Track minimum found index
807 std::atomic<size_t> min_index{n}; // n means not found
808 std::vector<std::future<void>> futures;
809
810 size_t offset = 0;
811 while (offset < n)
812 {
813 size_t chunk_end = std::min(offset + chunk_size, n);
814
815 futures.push_back(pool.enqueue([&data, pred, &min_index, offset, chunk_end]()
816 {
817 // Skip if we already found something earlier
818 if (min_index.load(std::memory_order_relaxed) <= offset)
819 return;
820
821 auto it = std::begin(data);
822 std::advance(it, offset);
823 for (size_t i = offset; i < chunk_end; ++i, ++it)
824 {
825 // Stop if earlier match found
826 if (min_index.load(std::memory_order_relaxed) <= i)
827 return;
828
829 if (pred(*it))
830 {
831 // Atomically update minimum
832 size_t expected = min_index.load(std::memory_order_relaxed);
833 while (i < expected and
834 not min_index.compare_exchange_weak(expected, i,
835 std::memory_order_relaxed));
836 return;
837 }
838 }
839 }));
840
841 offset = chunk_end;
842 }
843
844 for (auto & f: futures)
845 f.get();
846
847 if (size_t result = min_index.load(); result < n)
848 return result;
849 return std::nullopt;
850 }
851
875 template <typename Container, typename Pred>
876 [[nodiscard]] auto pfind_value(ThreadPool & pool, const Container & c,
877 Pred pred, size_t chunk_size = 0)
878 {
879 using T = std::decay_t<decltype(*std::begin(c))>;
880
881 auto idx = pfind(pool, c, pred, chunk_size);
882 if (not idx)
883 return std::optional<T>{std::nullopt};
884
885 auto it = std::begin(c);
886 std::advance(it, *idx);
887 return std::optional<T>{*it};
888 }
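  // Editorial usage sketch: pfind returns the smallest matching index (or
  // std::nullopt), pfind_value the corresponding element.
  //
  //   std::optional<size_t> idx = pfind(pool, v, [] (int x) { return x > 2; });
  //   auto val = pfind_value(pool, v, [] (int x) { return x > 2; });
  //   // for v == {1, 2, 3, 4}: idx == 2, val == std::optional<int>{3}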
889
890 // =============================================================================
891 // Parallel Numeric Operations
892 // =============================================================================
893
913 template <typename Container,
914 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
915 [[nodiscard]] T psum(ThreadPool & pool, const Container & c, T init = T{},
916 size_t chunk_size = 0)
917 {
918 return pfoldl(pool, c, init, std::plus<T>{}, chunk_size);
919 }
920
932 template <typename Container,
933 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
934 [[nodiscard]] T pproduct(ThreadPool & pool, const Container & c, T init = T{1},
935 size_t chunk_size = 0)
936 {
937 return pfoldl(pool, c, init, std::multiplies<T>{}, chunk_size);
938 }
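  // Editorial usage sketch:
  //
  //   int s = psum(pool, v);        // 1 + 2 + 3 + 4 == 10
  //   int p = pproduct(pool, v);    // 1 * 2 * 3 * 4 == 24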
939
950 template <typename Container>
951 [[nodiscard]] auto pmin(ThreadPool & pool, const Container & c, size_t chunk_size = 0)
952 {
953 using T = std::decay_t<decltype(*std::begin(c))>;
954
955 const size_t n = std::distance(std::begin(c), std::end(c));
956 if (n == 0)
957 return std::optional<T>{std::nullopt};
958
959 if (chunk_size == 0)
960 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
961
962 auto data_holder = parallel_detail::ensure_random_access(c);
963 const auto & data = parallel_detail::deref(data_holder);
964
965 std::vector<std::future<T>> futures;
966
967 size_t offset = 0;
968 while (offset < n)
969 {
970 size_t chunk_end = std::min(offset + chunk_size, n);
971
972 futures.push_back(pool.enqueue([&data, offset, chunk_end]()
973 {
974 auto it = std::begin(data);
975 std::advance(it, offset);
976 T local_min = *it++;
977 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
978 if (*it < local_min)
979 local_min = *it;
980 return local_min;
981 }));
982
983 offset = chunk_end;
984 }
985
986 T result = futures[0].get();
987 for (size_t i = 1; i < futures.size(); ++i)
988 {
989 T val = futures[i].get();
990 if (val < result)
991 result = val;
992 }
993
994 return std::optional<T>{result};
995 }
996
1007 template <typename Container>
1008 [[nodiscard]] auto pmax(ThreadPool & pool, const Container & c, size_t chunk_size = 0)
1009 {
1010 using T = std::decay_t<decltype(*std::begin(c))>;
1011
1012 const size_t n = std::distance(std::begin(c), std::end(c));
1013 if (n == 0)
1014 return std::optional<T>{std::nullopt};
1015
1016 if (chunk_size == 0)
1017 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1018
1019 auto data_holder = parallel_detail::ensure_random_access(c);
1020 const auto & data = parallel_detail::deref(data_holder);
1021
1022 std::vector<std::future<T>> futures;
1023
1024 size_t offset = 0;
1025 while (offset < n)
1026 {
1027 size_t chunk_end = std::min(offset + chunk_size, n);
1028
1029 futures.push_back(pool.enqueue([&data, offset, chunk_end]()
1030 {
1031 auto it = std::begin(data);
1032 std::advance(it, offset);
1033 T local_max = *it++;
1034 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
1035 if (*it > local_max)
1036 local_max = *it;
1037 return local_max;
1038 }));
1039
1040 offset = chunk_end;
1041 }
1042
1043 T result = futures[0].get();
1044 for (size_t i = 1; i < futures.size(); ++i)
1045 {
1046 T val = futures[i].get();
1047 if (val > result)
1048 result = val;
1049 }
1050
1051 return std::optional<T>{result};
1052 }
1053
1064 template <typename Container>
1065 [[nodiscard]] auto pminmax(ThreadPool & pool, const Container & c, size_t chunk_size = 0)
1066 {
1067 using T = std::decay_t<decltype(*std::begin(c))>;
1068
1069 const size_t n = std::distance(std::begin(c), std::end(c));
1070 if (n == 0)
1071 return std::optional<std::pair<T, T>>{std::nullopt};
1072
1073 if (chunk_size == 0)
1074 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1075
1076 auto data_holder = parallel_detail::ensure_random_access(c);
1077 const auto & data = parallel_detail::deref(data_holder);
1078
1079 std::vector<std::future<std::pair<T, T>>> futures;
1080
1081 size_t offset = 0;
1082 while (offset < n)
1083 {
1084 size_t chunk_end = std::min(offset + chunk_size, n);
1085
1086 futures.push_back(pool.enqueue([&data, offset, chunk_end]()
1087 {
1088 auto it = std::begin(data);
1089 std::advance(it, offset);
1090 T local_min = *it;
1091 T local_max = *it++;
1092 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
1093 {
1094 if (*it < local_min) local_min = *it;
1095 if (*it > local_max) local_max = *it;
1096 }
1097 return std::make_pair(local_min, local_max);
1098 }));
1099
1100 offset = chunk_end;
1101 }
1102
1103 auto result = futures[0].get();
1104 for (size_t i = 1; i < futures.size(); ++i)
1105 {
1106 auto [mi, ma] = futures[i].get();
1107 if (mi < result.first) result.first = mi;
1108 if (ma > result.second) result.second = ma;
1109 }
1110
1111 return std::optional<std::pair<T, T>>{result};
1112 }
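  // Editorial usage sketch: all three return std::optional values, empty for
  // an empty container.
  //
  //   auto mn = pmin(pool, v);            // std::optional<int>{1}
  //   auto mx = pmax(pool, v);            // std::optional<int>{4}
  //   if (auto mm = pminmax(pool, v))
  //     { /* mm->first == 1, mm->second == 4 */ }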
1113
1114 // =============================================================================
1115 // Parallel Sort
1116 // =============================================================================
1117
1145 template <typename Container, typename Compare = std::less<>>
1146 void psort(ThreadPool & pool, Container & c, Compare cmp = Compare{},
1147 const size_t min_parallel_size = 1024)
1148 {
1149 const size_t n = std::distance(std::begin(c), std::end(c));
1150 if (n <= 1)
1151 return;
1152
1153 // For small sizes, use regular sort
1154 if (n <= min_parallel_size or pool.num_threads() <= 1)
1155 {
1156 std::sort(std::begin(c), std::end(c), cmp);
1157 return;
1158 }
1159
1160 // Split into chunks, sort each in parallel, then merge
1161 const size_t num_chunks = std::min(pool.num_threads() * 2, n / min_parallel_size);
1162 const size_t chunk_size = (n + num_chunks - 1) / num_chunks;
1163
1164 // Sort chunks in parallel
1165 std::vector<std::future<void>> futures;
1166 for (size_t i = 0; i < n; i += chunk_size)
1167 {
1168 size_t end = std::min(i + chunk_size, n);
1169 auto begin_it = std::begin(c);
1170 std::advance(begin_it, i);
1171 auto end_it = std::begin(c);
1172 std::advance(end_it, end);
1173
1174 futures.push_back(pool.enqueue([begin_it, end_it, cmp]()
1175 {
1176 std::sort(begin_it, end_it, cmp);
1177 }));
1178 }
1179
1180 for (auto & f: futures)
1181 f.get();
1182
1183 // Merge sorted chunks
1184 using T = std::decay_t<decltype(*std::begin(c))>;
1185 std::vector<T> buffer(n);
1186
1187 for (size_t width = chunk_size; width < n; width *= 2)
1188 {
1189 std::vector<std::future<void>> merge_futures;
1190
1191 for (size_t i = 0; i < n; i += 2 * width)
1192 {
1193 size_t mid = std::min(i + width, n);
1194 size_t end = std::min(i + 2 * width, n);
1195
1196 if (mid < end)
1197 {
1198 auto begin_it = std::begin(c);
1199 std::advance(begin_it, i);
1200 auto mid_it = std::begin(c);
1201 std::advance(mid_it, mid);
1202 auto end_it = std::begin(c);
1203 std::advance(end_it, end);
1204
1205 merge_futures.push_back(pool.enqueue([begin_it, mid_it, end_it, &buffer, i, cmp]()
1206 {
1207 std::merge(begin_it, mid_it, mid_it, end_it,
1208 buffer.begin() + i, cmp);
1209 }));
1210 }
1211 else
1212 {
1213 // Copy remaining elements
1214 auto begin_it = std::begin(c);
1215 std::advance(begin_it, i);
1216 auto end_it = std::begin(c);
1217 std::advance(end_it, mid);
1218 std::copy(begin_it, end_it, buffer.begin() + i);
1219 }
1220 }
1221
1222 for (auto & f: merge_futures)
1223 f.get();
1224
1225 // Swap buffer back to container
1226 auto it = std::begin(c);
1227 for (size_t i = 0; i < n; ++i, ++it)
1228 *it = std::move(buffer[i]);
1229 }
1230 }
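  // Editorial usage sketch: psort sorts in place and falls back to std::sort
  // below min_parallel_size.
  //
  //   psort(pool, v);                     // ascending
  //   psort(pool, v, std::greater<>{});   // descending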
1231
1232 // =============================================================================
1233 // Parallel Zip Operations
1234 // =============================================================================
1235
1263 template <typename Container1, typename Container2, typename Op>
1264 void pzip_for_each(ThreadPool & pool, const Container1 & c1, const Container2 & c2,
1265 Op op, size_t chunk_size = 0)
1266 {
1267 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1268 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1269 const size_t n = std::min(n1, n2);
1270
1271 if (n == 0)
1272 return;
1273
1274 if (chunk_size == 0)
1275 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1276
1277 auto h1 = parallel_detail::ensure_random_access(c1);
1278 auto h2 = parallel_detail::ensure_random_access(c2);
1279 const auto & d1 = parallel_detail::deref(h1);
1280 const auto & d2 = parallel_detail::deref(h2);
1281
1282 std::vector<std::future<void>> futures;
1283
1284 size_t offset = 0;
1285 while (offset < n)
1286 {
1287 size_t chunk_end = std::min(offset + chunk_size, n);
1288
1289 futures.push_back(pool.enqueue([&d1, &d2, op, offset, chunk_end]()
1290 {
1291 auto it1 = std::begin(d1);
1292 auto it2 = std::begin(d2);
1293 std::advance(it1, offset);
1294 std::advance(it2, offset);
1295 for (size_t i = offset; i < chunk_end; ++i, ++it1, ++it2)
1296 op(*it1, *it2);
1297 }));
1298
1299 offset = chunk_end;
1300 }
1301
1302 for (auto & f: futures)
1303 f.get();
1304 }
1305
1330 template <typename Container1, typename Container2, typename Op>
1331 [[nodiscard]] auto pzip_maps(ThreadPool & pool, const Container1 & c1,
1332 const Container2 & c2, Op op, size_t chunk_size = 0)
1333 {
1334 using T1 = std::decay_t<decltype(*std::begin(c1))>;
1335 using T2 = std::decay_t<decltype(*std::begin(c2))>;
1336 using ResultT = std::invoke_result_t<Op, const T1 &, const T2 &>;
1337
1338 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1339 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1340 const size_t n = std::min(n1, n2);
1341
1342 if (n == 0)
1343 return std::vector<ResultT>{};
1344
1345 if (chunk_size == 0)
1346 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1347
1348 auto h1 = parallel_detail::ensure_random_access(c1);
1349 auto h2 = parallel_detail::ensure_random_access(c2);
1350 const auto & d1 = parallel_detail::deref(h1);
1351 const auto & d2 = parallel_detail::deref(h2);
1352
1353 std::vector<ResultT> result(n);
1354 std::vector<std::future<void>> futures;
1355
1356 size_t offset = 0;
1357 while (offset < n)
1358 {
1359 size_t chunk_end = std::min(offset + chunk_size, n);
1360
1361 futures.push_back(pool.enqueue([&result, &d1, &d2, op, offset, chunk_end]()
1362 {
1363 auto it1 = std::begin(d1);
1364 auto it2 = std::begin(d2);
1365 std::advance(it1, offset);
1366 std::advance(it2, offset);
1367 for (size_t i = offset; i < chunk_end; ++i, ++it1, ++it2)
1368 result[i] = op(*it1, *it2);
1369 }));
1370
1371 offset = chunk_end;
1372 }
1373
1374 for (auto & f: futures)
1375 f.get();
1376
1377 return result;
1378 }
1379
1408 template <typename Container1, typename Container2, typename T, typename Op>
1409 [[nodiscard]] T pzip_foldl(ThreadPool & pool, const Container1 & c1,
1410 const Container2 & c2, T init, Op op,
1411 size_t chunk_size = 0)
1412 {
1413 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1414 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1415 const size_t n = std::min(n1, n2);
1416
1417 if (n == 0)
1418 return init;
1419
1420 if (chunk_size == 0)
1421 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1422
1423 auto h1 = parallel_detail::ensure_random_access(c1);
1424 auto h2 = parallel_detail::ensure_random_access(c2);
1425 const auto & d1 = parallel_detail::deref(h1);
1426 const auto & d2 = parallel_detail::deref(h2);
1427
1428 std::vector<std::future<T>> futures;
1429
1430 size_t offset = 0;
1431 while (offset < n)
1432 {
1433 size_t chunk_end = std::min(offset + chunk_size, n);
1434
1435 futures.push_back(pool.enqueue([&d1, &d2, &init, op, offset, chunk_end]()
1436 {
1437 auto it1 = std::begin(d1);
1438 auto it2 = std::begin(d2);
1439 std::advance(it1, offset);
1440 std::advance(it2, offset);
1441
1442 T local = op(init, *it1++, *it2++);
1443 for (size_t i = offset + 1; i < chunk_end; ++i, ++it1, ++it2)
1444 local = op(local, *it1, *it2);
1445 return local;
1446 }));
1447
1448 offset = chunk_end;
1449 }
1450
1451 // Binary reduce the partial results
1452 // We need a binary op for this - derive it from the ternary op
1453 T result = futures[0].get();
1454 for (size_t i = 1; i < futures.size(); ++i)
1455 {
1456 T val = futures[i].get();
1457 // Combine using addition - user should use pfoldl + pzip_maps for complex cases
1458 result = result + val - init; // Compensate for init being added in each chunk
1459 }
1460
1461 return result;
1462 }
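  // Editorial usage sketch: pzip_foldl combines partial results with
  // `result + val - init`, so it is only reliable for addition-like folds; for
  // other reductions the code above suggests pzip_maps followed by pfoldl.
  //
  //   std::vector<int> a = {1, 2, 3}, b = {10, 20, 30};
  //   auto sums = pzip_maps(pool, a, b, [] (int x, int y) { return x + y; });
  //   int dot = pzip_foldl(pool, a, b, 0,
  //                        [] (int acc, int x, int y) { return acc + x * y; });
  //   // sums == {11, 22, 33}, dot == 140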
1463
1464 // =============================================================================
1465 // Parallel Partition
1466 // =============================================================================
1467
1489 template <typename Container, typename Pred>
1490 [[nodiscard]] auto ppartition(ThreadPool & pool, const Container & c, Pred pred,
1491 size_t chunk_size = 0)
1492 {
1493 using T = std::decay_t<decltype(*std::begin(c))>;
1494
1495 const size_t n = std::distance(std::begin(c), std::end(c));
1496 if (n == 0)
1497 return std::make_pair(std::vector<T>{}, std::vector<T>{});
1498
1499 if (chunk_size == 0)
1500 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1501
1502 auto data_holder = parallel_detail::ensure_random_access(c);
1503 const auto & data = parallel_detail::deref(data_holder);
1504
1505 using ChunkResult = std::pair<std::vector<T>, std::vector<T>>;
1506 std::vector<std::future<ChunkResult>> futures;
1507
1508 size_t offset = 0;
1509 while (offset < n)
1510 {
1511 size_t chunk_end = std::min(offset + chunk_size, n);
1512
1513 futures.push_back(pool.enqueue([&data, pred, offset, chunk_end]()
1514 {
1515 std::vector<T> yes, no;
1516 auto it = std::begin(data);
1517 std::advance(it, offset);
1518 for (size_t i = offset; i < chunk_end; ++i, ++it)
1519 {
1520 if (pred(*it))
1521 yes.push_back(*it);
1522 else
1523 no.push_back(*it);
1524 }
1525 return std::make_pair(std::move(yes), std::move(no));
1526 }));
1527
1528 offset = chunk_end;
1529 }
1530
1531 // Merge results in order
1532 std::vector<T> yes_result, no_result;
1533 for (auto & f: futures)
1534 {
1535 auto [yes, no] = f.get();
1536 yes_result.insert(yes_result.end(),
1537 std::make_move_iterator(yes.begin()),
1538 std::make_move_iterator(yes.end()));
1539 no_result.insert(no_result.end(),
1540 std::make_move_iterator(no.begin()),
1541 std::make_move_iterator(no.end()));
1542 }
1543
1544 return std::make_pair(std::move(yes_result), std::move(no_result));
1545 }
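  // Editorial usage sketch: ppartition keeps relative order within each of the
  // two output vectors.
  //
  //   auto [evens, odds] = ppartition(pool, v, [] (int x) { return x % 2 == 0; });
  //   // evens == {2, 4}, odds == {1, 3}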
1546
1547 // =============================================================================
1548 // Variadic Parallel Zip Operations (N containers)
1549 // =============================================================================
1550
1551 namespace parallel_zip_detail
1552 {
 1561 template <typename Container>
 1562 struct ContainerHolder
 1563 {
1564 using value_type = std::decay_t<decltype(*std::begin(std::declval<Container &>()))>;
1565 using holder_type = std::conditional_t<
1566 parallel_detail::has_random_access<Container>(),
1567 const Container *,
1568 std::unique_ptr<std::vector<value_type>>>;
 1569
 1570 holder_type data;
 1571 size_t cached_size;
 1572
1573 explicit ContainerHolder(const Container & c)
1574 {
1575 if constexpr (parallel_detail::has_random_access<Container>())
1576 {
1577 data = &c;
1578 // For random access, std::distance is O(1)
1579 cached_size = static_cast<size_t>(std::distance(std::begin(c), std::end(c)));
1580 }
1581 else
1582 {
1583 // Copy to vector (O(n) - unavoidable), then get size from vector (O(1))
1584 data = std::make_unique<std::vector<value_type>>(std::begin(c), std::end(c));
1585 cached_size = data->size();
1586 }
1587 }
1588
1589 decltype(auto) get() const
1590 {
1591 if constexpr (parallel_detail::has_random_access<Container>())
1592 return *data;
1593 else
1594 return *data;
1595 }
1596
1598 [[nodiscard]] size_t size() const noexcept { return cached_size; }
1599
1600 auto begin() const { return std::begin(get()); }
1601 auto end() const { return std::end(get()); }
1602 };
1603
1605 template <typename... Holders, size_t... Is>
1606 size_t min_holder_size_impl(const std::tuple<Holders...> & holders,
1607 std::index_sequence<Is...>)
1608 {
1609 return std::min({std::get<Is>(holders).size()...});
1610 }
1611
1612 template <typename... Holders>
1613 size_t min_holder_size(const std::tuple<Holders...> & holders)
1614 {
1615 return min_holder_size_impl(holders, std::make_index_sequence<sizeof...(Holders)>{});
1616 }
1617
1619 template <typename... Holders, size_t... Is>
1620 auto make_iterators_at(size_t offset, const std::tuple<Holders...> & holders,
1621 std::index_sequence<Is...>)
1622 {
1623 return std::make_tuple([&]()
1624 {
1625 auto it = std::get<Is>(holders).begin();
1626 std::advance(it, offset);
1627 return it;
1628 }()...);
1629 }
1630
1632 template <typename... Iters, size_t... Is>
1633 void advance_all_iters(std::tuple<Iters...> & iters, std::index_sequence<Is...>)
1634 {
1635 (++std::get<Is>(iters), ...);
1636 }
1637
1639 template <typename... Iters, size_t... Is>
1640 auto deref_all_iters(const std::tuple<Iters...> & iters, std::index_sequence<Is...>)
1641 {
1642 return std::make_tuple(*std::get<Is>(iters)...);
1643 }
1644 } // namespace parallel_zip_detail
1645
1678 template <typename Op, typename... Containers>
1679 void pzip_for_each_n(ThreadPool & pool, Op op, const Containers &... cs)
1680 {
1681 static_assert(sizeof...(Containers) >= 2,
1682 "pzip_for_each requires at least 2 containers");
1683
1684 // Convert all containers to random access FIRST
1685 // This is O(n) for non-RA containers, but unavoidable
1686 auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);
1687
1688 // Now get min size - O(1) because all holders have cached sizes
1689 const size_t n = parallel_zip_detail::min_holder_size(holders);
1690 if (n == 0)
1691 return;
1692
1693 const size_t chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1694
1695 std::vector<std::future<void>> futures;
1696
1697 size_t offset = 0;
1698 while (offset < n)
1699 {
1700 size_t chunk_end = std::min(offset + chunk_size, n);
1701
1702 futures.push_back(pool.enqueue([&holders, op, offset, chunk_end]()
1703 {
1704 constexpr size_t N = sizeof...(Containers);
1705 auto iters = parallel_zip_detail::make_iterators_at(
1706 offset, holders, std::make_index_sequence<N>{});
1707
1708 for (size_t i = offset; i < chunk_end; ++i)
1709 {
1710 std::apply(op, parallel_zip_detail::deref_all_iters(
1711 iters, std::make_index_sequence<N>{}));
1712 parallel_zip_detail::advance_all_iters(
1713 iters, std::make_index_sequence<N>{});
1714 }
1715 }));
1716
1717 offset = chunk_end;
1718 }
1719
1720 for (auto & f: futures)
1721 f.get();
1722 }
1723
1755 template <typename Op, typename... Containers>
1756 [[nodiscard]] auto pzip_maps_n(ThreadPool & pool, Op op, const Containers &... cs)
1757 {
1758 static_assert(sizeof...(Containers) >= 2,
1759 "pzip_maps requires at least 2 containers");
1760
1761 // Deduce result type from operation
1762 using ResultT = std::invoke_result_t<Op,
1763 std::decay_t<decltype(*std::begin(cs))>...>;
1764
1765 // Convert all containers to random access FIRST
1766 auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);
1767
1768 // Now get min size - O(1) because all holders have cached sizes
1769 const size_t n = parallel_zip_detail::min_holder_size(holders);
1770 if (n == 0)
1771 return std::vector<ResultT>{};
1772
1773 size_t chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1774
1775 std::vector<ResultT> result(n);
1776 std::vector<std::future<void>> futures;
1777
1778 size_t offset = 0;
1779 while (offset < n)
1780 {
1781 size_t chunk_end = std::min(offset + chunk_size, n);
1782
1783 futures.push_back(pool.enqueue([&result, &holders, op, offset, chunk_end]()
1784 {
1785 constexpr size_t N = sizeof...(Containers);
1786 auto iters = parallel_zip_detail::make_iterators_at(
1787 offset, holders, std::make_index_sequence<N>{});
1788
1789 for (size_t i = offset; i < chunk_end; ++i)
1790 {
1791 result[i] = std::apply(op, parallel_zip_detail::deref_all_iters(
1792 iters, std::make_index_sequence<N>{}));
1793 parallel_zip_detail::advance_all_iters(
1794 iters, std::make_index_sequence<N>{});
1795 }
1796 }));
1797
1798 offset = chunk_end;
1799 }
1800
1801 for (auto & f: futures)
1802 f.get();
1803
1804 return result;
1805 }
1806
1845 template <typename T, typename Op, typename Combiner, typename... Containers>
1846 [[nodiscard]] T pzip_foldl_n(ThreadPool & pool, T init, Op op, Combiner combiner,
1847 const Containers &... cs)
1848 {
1849 static_assert(sizeof...(Containers) >= 2,
1850 "pzip_foldl requires at least 2 containers");
1851
1852 // Convert all containers to random access FIRST
1853 auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);
1854
1855 // Now get min size - O(1) because all holders have cached sizes
1856 const size_t n = parallel_zip_detail::min_holder_size(holders);
1857 if (n == 0)
1858 return init;
1859
1860 size_t chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1861
1862 std::vector<std::future<T>> futures;
1863
1864 size_t offset = 0;
1865 while (offset < n)
1866 {
1867 size_t chunk_end = std::min(offset + chunk_size, n);
1868
1869 futures.push_back(pool.enqueue([&holders, init, op, offset, chunk_end]()
1870 {
1871 constexpr size_t N = sizeof...(Containers);
1872 auto iters = parallel_zip_detail::make_iterators_at(
1873 offset, holders, std::make_index_sequence<N>{});
1874
1875 // First element
1876 auto first_tuple = parallel_zip_detail::deref_all_iters(
1877 iters, std::make_index_sequence<N>{});
1878 T local = std::apply([&op, &init](auto &&... args)
1879 {
1880 return op(init, std::forward<decltype(args)>(args)
1881 ...);
1882 }, first_tuple);
1883 parallel_zip_detail::advance_all_iters(
1884 iters, std::make_index_sequence<N>{});
1885
1886 // Remaining elements
1887 for (size_t i = offset + 1; i < chunk_end; ++i)
1888 {
1889 auto tuple = parallel_zip_detail::deref_all_iters(
1890 iters, std::make_index_sequence<N>{});
1891 local = std::apply([&op, &local](auto &&... args)
1892 {
1893 return op(local,
1894 std::forward<decltype(args)>(args)
1895 ...);
1896 }, tuple);
1897 parallel_zip_detail::advance_all_iters(
1898 iters, std::make_index_sequence<N>{});
1899 }
1900
1901 return local;
1902 }));
1903
1904 offset = chunk_end;
1905 }
1906
1907 // Combine partial results using the combiner
1908 T result = futures[0].get();
1909 for (size_t i = 1; i < futures.size(); ++i)
1910 result = combiner(result, futures[i].get());
1911
1912 return result;
1913 }
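  // Editorial usage sketch for the variadic zip operations, assuming three
  // equally sized std::vector<double> inputs xs, ys, zs:
  //
  //   auto prods = pzip_maps_n(pool,
  //                            [] (double x, double y, double z) { return x * y * z; },
  //                            xs, ys, zs);
  //   double total = pzip_foldl_n(pool, 0.0,
  //                               [] (double acc, double x, double y, double z)
  //                               { return acc + x * y * z; },
  //                               std::plus<double>{},   // combiner for partial results
  //                               xs, ys, zs);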
1914
1943 template <typename Pred, typename... Containers>
1944 [[nodiscard]] bool pzip_all_n(ThreadPool & pool, Pred pred, const Containers &... cs)
1945 {
1946 static_assert(sizeof...(Containers) >= 2,
1947 "pzip_all requires at least 2 containers");
1948
1949 // Convert all containers to random access FIRST
 1950 auto holders = std::make_tuple(
 1951 parallel_zip_detail::ContainerHolder<Containers>(cs)...);
 1952
1953 // Now get min size - O(1) because all holders have cached sizes
1954 const size_t n = parallel_zip_detail::min_holder_size(holders);
1955 if (n == 0)
1956 return true; // Vacuous truth
1957
1958 const size_t chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
1959
1960 std::atomic<bool> found_false{false};
1961 std::vector<std::future<void>> futures;
1962
1963 size_t offset = 0;
1964 while (offset < n)
1965 {
1966 size_t chunk_end = std::min(offset + chunk_size, n);
1967
1968 futures.push_back(pool.enqueue([&holders, pred, &found_false, offset, chunk_end]()
1969 {
1970 if (found_false.load(std::memory_order_relaxed))
1971 return;
1972
1973 constexpr size_t N = sizeof...(Containers);
1974 auto iters = parallel_zip_detail::make_iterators_at(
1975 offset, holders, std::make_index_sequence<N>{});
1976
1977 for (size_t i = offset; i < chunk_end; ++i)
1978 {
1979 auto tuple = parallel_zip_detail::deref_all_iters(
1980 iters, std::make_index_sequence<N>{});
1981 if (! std::apply(pred, tuple))
1982 {
1983 found_false.store(true, std::memory_order_relaxed);
1984 return;
1985 }
1986 if (found_false.load(std::memory_order_relaxed))
1987 return;
1988 parallel_zip_detail::advance_all_iters(
1989 iters, std::make_index_sequence<N>{});
1990 }
1991 }));
1992
1993 offset = chunk_end;
1994 }
1995
1996 for (auto & f: futures)
1997 f.get();
1998
1999 return not found_false.load();
2000 }
2001
2030 template <typename Pred, typename... Containers>
2031 [[nodiscard]] bool pzip_exists_n(ThreadPool & pool, Pred pred, const Containers &... cs)
2032 {
2033 static_assert(sizeof...(Containers) >= 2,
2034 "pzip_exists requires at least 2 containers");
2035
2036 // Convert all containers to random access FIRST
2037 auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);
2038
2039 // Now get min size - O(1) because all holders have cached sizes
2040 const size_t n = parallel_zip_detail::min_holder_size(holders);
2041 if (n == 0)
2042 return false;
2043
2044 const size_t chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
2045
2046 std::atomic<bool> found{false};
2047 std::vector<std::future<void>> futures;
2048
2049 size_t offset = 0;
2050 while (offset < n)
2051 {
2052 size_t chunk_end = std::min(offset + chunk_size, n);
2053
2054 futures.push_back(pool.enqueue([&holders, pred, &found, offset, chunk_end]()
2055 {
2056 if (found.load(std::memory_order_relaxed))
2057 return;
2058
2059 constexpr size_t N = sizeof...(Containers);
2060 auto iters = parallel_zip_detail::make_iterators_at(
2061 offset, holders, std::make_index_sequence<N>{});
2062
2063 for (size_t i = offset; i < chunk_end; ++i)
2064 {
2065 auto tuple = parallel_zip_detail::deref_all_iters(
2066 iters, std::make_index_sequence<N>{});
2067 if (std::apply(pred, tuple))
2068 {
2069 found.store(true, std::memory_order_relaxed);
2070 return;
2071 }
2072 if (found.load(std::memory_order_relaxed))
2073 return;
2074 parallel_zip_detail::advance_all_iters(
2075 iters, std::make_index_sequence<N>{});
2076 }
2077 }));
2078
2079 offset = chunk_end;
2080 }
2081
2082 for (auto & f: futures)
2083 f.get();
2084
2085 return found.load();
2086 }
2087
2103 template <typename Pred, typename... Containers>
2104 [[nodiscard]] size_t pzip_count_if_n(ThreadPool & pool, Pred pred,
2105 const Containers &... cs)
2106 {
2107 static_assert(sizeof...(Containers) >= 2,
2108 "pzip_count_if requires at least 2 containers");
2109
2110 // Convert all containers to random access FIRST
2111 auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);
2112
2113 // Now get min size - O(1) because all holders have cached sizes
2114 const size_t n = parallel_zip_detail::min_holder_size(holders);
2115 if (n == 0)
2116 return 0;
2117
2118 size_t chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
2119
2120 std::vector<std::future<size_t>> futures;
2121
2122 size_t offset = 0;
2123 while (offset < n)
2124 {
2125 size_t chunk_end = std::min(offset + chunk_size, n);
2126
2127 futures.push_back(pool.enqueue([&holders, pred, offset, chunk_end]()
2128 {
2129 constexpr size_t N = sizeof...(Containers);
2130 auto iters = parallel_zip_detail::make_iterators_at(
2131 offset, holders, std::make_index_sequence<N>{});
2132
2133 size_t count = 0;
2134 for (size_t i = offset; i < chunk_end; ++i)
2135 {
2136 auto tuple = parallel_zip_detail::deref_all_iters(
2137 iters, std::make_index_sequence<N>{});
2138 if (std::apply(pred, tuple))
2139 ++count;
2140 parallel_zip_detail::advance_all_iters(
2141 iters, std::make_index_sequence<N>{});
2142 }
2143 return count;
2144 }));
2145
2146 offset = chunk_end;
2147 }
2148
2149 size_t total = 0;
2150 for (auto & f: futures)
2151 total += f.get();
2152
2153 return total;
2154 }
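  // Editorial usage sketch, assuming two std::vector<int> inputs a and b:
  //
  //   bool le   = pzip_all_n(pool, [] (int x, int y) { return x <= y; }, a, b);
  //   bool eq   = pzip_exists_n(pool, [] (int x, int y) { return x == y; }, a, b);
  //   size_t nm = pzip_count_if_n(pool, [] (int x, int y) { return x != y; }, a, b);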
2155
2156 // =============================================================================
2157 // Parallel Enumerate
2158 // =============================================================================
2159
 2185 template <typename Container, typename Op>
 2186 void penumerate_for_each(ThreadPool & pool, Container & c, Op op,
 2187 size_t chunk_size = 0)
2188 {
2189 const size_t n = std::distance(std::begin(c), std::end(c));
2190 if (n == 0)
2191 return;
2192
2193 if (chunk_size == 0)
2194 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
2195
2196 std::vector<std::future<void>> futures;
2197
2198 size_t offset = 0;
2199 while (offset < n)
2200 {
2201 size_t chunk_end = std::min(offset + chunk_size, n);
2202
2203 futures.push_back(pool.enqueue([&c, op, offset, chunk_end]()
2204 {
2205 auto it = std::begin(c);
2206 std::advance(it, offset);
2207 for (size_t i = offset; i < chunk_end; ++i, ++it)
2208 op(i, *it);
2209 }));
2210
2211 offset = chunk_end;
2212 }
2213
2214 for (auto & f: futures)
2215 f.get();
2216 }
2217
2230 template <typename Container, typename Op>
2231 void penumerate_for_each(ThreadPool & pool, const Container & c, Op op,
2232 size_t chunk_size = 0)
2233 {
2234 const size_t n = std::distance(std::begin(c), std::end(c));
2235 if (n == 0)
2236 return;
2237
2238 if (chunk_size == 0)
2239 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
2240
2241 auto data_holder = parallel_detail::ensure_random_access(c);
2242 const auto & data = parallel_detail::deref(data_holder);
2243
2244 std::vector<std::future<void>> futures;
2245
2246 size_t offset = 0;
2247 while (offset < n)
2248 {
2249 size_t chunk_end = std::min(offset + chunk_size, n);
2250
2251 futures.push_back(pool.enqueue([&data, op, offset, chunk_end]()
2252 {
2253 auto it = std::begin(data);
2254 std::advance(it, offset);
2255 for (size_t i = offset; i < chunk_end; ++i, ++it)
2256 op(i, *it);
2257 }));
2258
2259 offset = chunk_end;
2260 }
2261
2262 for (auto & f: futures)
2263 f.get();
2264 }
2265
2294 template <typename Container, typename Op>
2295 [[nodiscard]] auto penumerate_maps(ThreadPool & pool, const Container & c, Op op,
2296 size_t chunk_size = 0)
2297 {
2298 using T = std::decay_t<decltype(*std::begin(c))>;
2299 using ResultT = std::invoke_result_t<Op, size_t, const T &>;
2300
2301 const size_t n = std::distance(std::begin(c), std::end(c));
2302 if (n == 0)
2303 return std::vector<ResultT>{};
2304
2305 if (chunk_size == 0)
2306 chunk_size = parallel_detail::chunk_size(n, pool.num_threads());
2307
2308 auto data_holder = parallel_detail::ensure_random_access(c);
2309 const auto & data = parallel_detail::deref(data_holder);
2310
2311 std::vector<ResultT> result(n);
2312 std::vector<std::future<void>> futures;
2313
2314 size_t offset = 0;
2315 while (offset < n)
2316 {
2317 size_t chunk_end = std::min(offset + chunk_size, n);
2318
2319 futures.push_back(pool.enqueue([&result, &data, op, offset, chunk_end]()
2320 {
2321 auto it = std::begin(data);
2322 std::advance(it, offset);
2323 for (size_t i = offset; i < chunk_end; ++i, ++it)
2324 result[i] = op(i, *it);
2325 }));
2326
2327 offset = chunk_end;
2328 }
2329
2330 for (auto & f: futures)
2331 f.get();
2332
2333 return result;
2334 }
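  // Editorial usage sketch: the op receives the element index as its first
  // argument.
  //
  //   penumerate_for_each(pool, v, [] (size_t i, int & x) { x += int(i); });
  //   auto scaled = penumerate_maps(pool, v, [] (size_t i, int x) { return int(i) * x; });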
2335
2336 // =============================================================================
2337 // Convenience: Default Pool Variants
2338 // =============================================================================
 2339
 2346 inline ThreadPool & parallel_default_pool()
 2347 {
2348 return default_pool();
2349 }
2350
2351 // Convenience macros for using default pool (optional)
2352#ifdef AH_PARALLEL_USE_DEFAULT_POOL
2353
2354#define PMAP(c, op) pmaps(parallel_default_pool(), c, op)
2355#define PFILTER(c, pred) pfilter(parallel_default_pool(), c, pred)
2356#define PFOLD(c, init, op) pfoldl(parallel_default_pool(), c, init, op)
2357#define PFOR_EACH(c, op) pfor_each(parallel_default_pool(), c, op)
2358#define PALL(c, pred) pall(parallel_default_pool(), c, pred)
2359#define PEXISTS(c, pred) pexists(parallel_default_pool(), c, pred)
2360#define PSUM(c) psum(parallel_default_pool(), c)
2361
2362#endif // AH_PARALLEL_USE_DEFAULT_POOL
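  // Editorial usage sketch: with AH_PARALLEL_USE_DEFAULT_POOL defined before
  // including this header, the macros forward to parallel_default_pool():
  //
  //   auto doubled = PMAP(v, [] (int x) { return 2 * x; });
  //   bool ok      = PALL(v, [] (int x) { return x >= 0; });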
2363} // namespace Aleph
2364
2365#endif // AH_PARALLEL_H