Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
ah-parallel.H
Go to the documentation of this file.
1/*
2 Aleph_w
3
4 Data structures & Algorithms
5 version 2.0.0b
6 https://github.com/lrleon/Aleph-w
7
8 This file is part of Aleph-w library
9
10 Copyright (c) 2002-2026 Leandro Rabindranath Leon
11
12 Permission is hereby granted, free of charge, to any person obtaining a copy
13 of this software and associated documentation files (the "Software"), to deal
14 in the Software without restriction, including without limitation the rights
15 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 copies of the Software, and to permit persons to whom the Software is
17 furnished to do so, subject to the following conditions:
18
19 The above copyright notice and this permission notice shall be included in all
20 copies or substantial portions of the Software.
21
22 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 SOFTWARE.
29*/
30
31
32#ifndef AH_PARALLEL_H
33#define AH_PARALLEL_H
34
115#include <vector>
116#include <atomic>
117#include <optional>
118#include <algorithm>
119#include <numeric>
120#include <type_traits>
121#include <iterator>
122#include <functional>
123#include <thread_pool.H>
124
125namespace Aleph
126{
127 // =============================================================================
128 // Implementation Details
129 // =============================================================================
130
131 namespace parallel_detail
132 {
// chunk_bounds: half-open [offset, end) index range of one work chunk.
// NOTE(review): the extraction dropped source line 133 here — the
// `struct chunk_bounds` head that opens this aggregate. Verify against
// the original header.
134 {
135 size_t offset;
136 size_t end;
137 };
138
// Picks the pool named in options, falling back to the library default
// pool when options.pool is null.
// NOTE(review): the extraction dropped source line 139 — presumably the
// signature `inline ThreadPool & selected_parallel_pool(const
// ParallelOptions & options)` — verify against the original header.
140 {
141 return options.pool == nullptr ? default_pool() : *options.pool;
142 }
143
144 inline size_t chunk_count(const size_t n, const size_t chunk_size) noexcept
145 {
146 return n == 0 ? 0 : (n + chunk_size - 1) / chunk_size;
147 }
148
149 inline chunk_bounds bounds_for_chunk(const size_t idx, const size_t n,
150 const size_t chunk_size) noexcept
151 {
152 const size_t offset = idx * chunk_size;
153 return {offset, std::min(offset + chunk_size, n)};
154 }
155
157 inline size_t chunk_size(const size_t n, const size_t num_threads, const size_t min_chunk = 64)
158 {
159 if (n == 0) return 1;
160 // Use more chunks than threads for better load balancing
161 const size_t chunks = num_threads * 4;
162 const size_t size = (n + chunks - 1) / chunks;
163 return std::max(size, min_chunk);
164 }
165
167 template <typename Container>
168 constexpr bool has_random_access()
169 {
170 using It = decltype(std::begin(std::declval<Container &>()));
171 return std::is_base_of_v<std::random_access_iterator_tag,
172 typename std::iterator_traits<It>::iterator_category>;
173 }
174
// Provides random-access data for parallel chunking: random-access
// containers are used in place (a raw pointer to them is returned),
// everything else is copied into a heap-allocated std::vector.
// NOTE(review): the extraction dropped source line 178 — presumably the
// signature, e.g. `auto ensure_random_access(const Container & c)` —
// verify against the original header.
177 template <typename Container>
179 {
180 if constexpr (has_random_access<Container>())
181 return &c; // Return pointer to original
182 else
183 return std::make_unique<std::vector<typename Container::value_type>>(std::begin(c), std::end(c));
184 }
185
187 template <typename T>
188 decltype(auto) deref(T && ptr)
189 {
190 if constexpr (std::is_pointer_v<std::decay_t<T>>)
191 return *ptr;
192 else
193 return *ptr; // unique_ptr also supports *
194 }
195
196 inline size_t effective_parallel_chunk_size(const size_t n,
197 const ThreadPool & pool,
198 const ParallelOptions & options,
199 const size_t min_chunk = 64)
200 {
201 size_t effective = options.chunk_size == 0 ? chunk_size(n, pool.num_threads(), min_chunk) : options.chunk_size;
202
203 if (options.max_tasks > 0)
204 {
205 const size_t min_for_cap = (n + options.max_tasks - 1) / options.max_tasks;
206 effective = std::max(effective, std::max(min_chunk, min_for_cap));
207 }
208
209 return std::max<size_t>(1, effective);
210 }
211
212 inline bool use_sequential_parallel_path(const size_t n,
213 const ThreadPool & pool,
214 const ParallelOptions & options) noexcept
215 {
216 if (n == 0)
217 return true;
218 if (options.cancel_token.stop_requested())
219 return true;
220 if (options.max_tasks == 1)
221 return true;
222 if (options.min_size > 0 and n < options.min_size)
223 return true;
224 return pool.num_threads() <= 1;
225 }
226
// NOTE(review): the extraction dropped source lines 227 and 229 — the
// signature and body of `throw_if_parallel_canceled(token)`. From call
// sites it presumably throws when the cancel token has a stop request;
// verify against the original header.
228 {
230 }
231 } // namespace parallel_detail
232
233 // =============================================================================
234 // Parallel Map
235 // =============================================================================
236
// Convenience overload: run pmaps on an explicit pool with an optional
// chunk size (0 = choose automatically).
// NOTE(review): the extraction dropped source line 271 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
267 template <typename ResultT = void, typename Container, typename Op>
268 [[nodiscard]] auto pmaps(ThreadPool & pool, const Container & c, Op op,
269 size_t chunk_size = 0)
270 {
272 options.pool = &pool;
273 options.chunk_size = chunk_size;
274 return pmaps<ResultT>(c, op, options);
275 }
276
// Parallel map: applies `op` to every element of `c` and returns a
// std::vector of the results in input order. ResultT may be forced
// explicitly; by default it is deduced from invoking Op on an element.
// NOTE(review): the extraction dropped source lines 281 (presumably the
// `ThreadPool & pool = parallel_detail::selected_parallel_pool(options);`
// declaration), 292 (the sequential-path condition), 298/301 (likely
// cancellation checks), 306 (the effective chunk-size expression), 309
// (the ensure_random_access call declaring `data_holder`) and 333
// (presumably `offset = chunk_end;`). Verify against the original header.
277 template <typename ResultT = void, typename Container, typename Op>
278 [[nodiscard]] auto pmaps(const Container & c, Op op,
279 const ParallelOptions & options = {})
280 {
282 using InputT = std::decay_t<decltype(*std::begin(c))>;
283 using ActualResultT = std::conditional_t<
284 std::is_void_v<ResultT>,
285 std::invoke_result_t<Op, const InputT &>,
286 ResultT>;
287
288 const size_t n = std::distance(std::begin(c), std::end(c));
289 if (n == 0)
290 return std::vector<ActualResultT>{};
291
293 {
294 std::vector<ActualResultT> result;
295 result.reserve(n);
296 for (auto it = std::begin(c); it != std::end(c); ++it)
297 {
299 result.push_back(op(*it));
300 }
302 return result;
303 }
304
305 const size_t chunk_size =
307
308 // Ensure random access for parallel processing
310 const auto & data = parallel_detail::deref(data_holder);
311
312 std::vector<std::optional<ActualResultT>> slots(n);
313 std::vector<std::future<void>> futures;
314
315 size_t offset = 0;
316
317 while (offset < n)
318 {
319 size_t chunk_end = std::min(offset + chunk_size, n);
320
321 futures.push_back(pool.enqueue([&slots, &data, op, offset, chunk_end,
322 token = options.cancel_token]()
323 {
324 auto in_it = std::begin(data);
325 std::advance(in_it, offset);
326 for (size_t i = offset; i < chunk_end; ++i, ++in_it)
327 {
328 parallel_detail::throw_if_parallel_canceled(token);
329 slots[i] = op(*in_it);
330 }
331 }));
332
334 }
335
// Drain every future before rethrowing: tasks hold references to the
// local `slots` and `data`, so no task may outlive this frame. The
// first exception is preserved; the rest are swallowed.
336 {
337 std::exception_ptr ep;
338 for (auto & f: futures)
339 try { f.get(); } catch (...) { if (not ep) ep = std::current_exception(); }
340 if (ep)
341 std::rethrow_exception(ep);
342 }
343
344 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
345
346 std::vector<ActualResultT> result;
347 result.reserve(n);
348 for (auto & slot : slots)
349 result.push_back(std::move(*slot));
350
351 return result;
352 }
353
354 // =============================================================================
355 // Parallel Filter
356 // =============================================================================
357
// Convenience overload: run pfilter on an explicit pool.
// NOTE(review): the extraction dropped source line 389 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
385 template <typename Container, typename Pred>
386 [[nodiscard]] auto pfilter(ThreadPool & pool, const Container & c, Pred pred,
387 size_t chunk_size = 0)
388 {
390 options.pool = &pool;
391 options.chunk_size = chunk_size;
392 return pfilter(c, pred, options);
393 }
394
395 template <typename Container, typename Pred>
396 [[nodiscard]] auto pfilter(const Container & c, Pred pred,
397 const ParallelOptions & options = {})
398 {
399 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
400 using T = std::decay_t<decltype(*std::begin(c))>;
401
402 const size_t n = std::distance(std::begin(c), std::end(c));
403 if (n == 0)
404 return std::vector<T>{};
405
406 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
407 {
408 std::vector<T> result;
409 for (auto it = std::begin(c); it != std::end(c); ++it)
410 {
411 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
412 if (pred(*it))
413 result.push_back(*it);
414 }
415 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
416 return result;
417 }
418
419 const size_t chunk_size =
420 parallel_detail::effective_parallel_chunk_size(n, pool, options);
421
422 auto data_holder = parallel_detail::ensure_random_access(c);
423 const auto & data = parallel_detail::deref(data_holder);
424
425 std::vector<std::future<std::vector<T>>> futures;
426 const size_t num_chunks = parallel_detail::chunk_count(n, chunk_size);
427 futures.reserve(num_chunks);
428
429 for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx)
430 {
431 const auto bounds = parallel_detail::bounds_for_chunk(chunk_idx, n, chunk_size);
432
433 futures.push_back(pool.enqueue([&data, pred,
434 offset = bounds.offset,
435 chunk_end = bounds.end,
436 token = options.cancel_token]()
437 {
438 std::vector<T> chunk_result;
439 auto it = std::begin(data);
440 std::advance(it, offset);
441 for (size_t i = offset; i < chunk_end; ++i, ++it)
442 {
443 parallel_detail::throw_if_parallel_canceled(token);
444 if (pred(*it))
445 chunk_result.push_back(*it);
446 }
447 return chunk_result;
448 }));
449 }
450
451 std::vector<T> result;
452 for (auto & f: futures)
453 {
454 auto chunk_result = f.get();
455 result.insert(result.end(),
456 std::make_move_iterator(chunk_result.begin()),
457 std::make_move_iterator(chunk_result.end()));
458 }
459
460 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
461
462 return result;
463 }
464
465 // =============================================================================
466 // Parallel Fold/Reduce
467 // =============================================================================
468
// Convenience overload: run pfoldl on an explicit pool.
// NOTE(review): the extraction dropped source line 510 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
506 template <typename T, typename Container, typename BinaryOp>
507 [[nodiscard]] T pfoldl(ThreadPool & pool, const Container & c, T init, BinaryOp op,
508 size_t chunk_size = 0)
509 {
511 options.pool = &pool;
512 options.chunk_size = chunk_size;
513 return pfoldl(c, init, op, options);
514 }
515
516 template <typename T, typename Container, typename BinaryOp>
517 [[nodiscard]] T pfoldl(const Container & c, T init, BinaryOp op,
518 const ParallelOptions & options = {})
519 {
520 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
521 const size_t n = std::distance(std::begin(c), std::end(c));
522 if (n == 0)
523 return init;
524
525 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
526 {
527 T result = init;
528 for (auto it = std::begin(c); it != std::end(c); ++it)
529 {
530 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
531 result = op(result, *it);
532 }
533 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
534 return result;
535 }
536
537 const size_t chunk_size =
538 parallel_detail::effective_parallel_chunk_size(n, pool, options);
539
540 auto data_holder = parallel_detail::ensure_random_access(c);
541 const auto & data = parallel_detail::deref(data_holder);
542
543 std::vector<std::future<T>> futures;
544
545 size_t offset = 0;
546 while (offset < n)
547 {
548 size_t chunk_end = std::min(offset + chunk_size, n);
549
550 futures.push_back(pool.enqueue([&data, op, offset, chunk_end,
551 token = options.cancel_token]()
552 {
553 auto it = std::begin(data);
554 std::advance(it, offset);
555 parallel_detail::throw_if_parallel_canceled(token);
556 T local = *it++;
557 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
558 {
559 parallel_detail::throw_if_parallel_canceled(token);
560 local = op(local, *it);
561 }
562 return local;
563 }));
564
565 offset = chunk_end;
566 }
567
568 // Combine partial results
569 T result = init;
570 for (auto & f: futures)
571 result = op(result, f.get());
572
573 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
574
575 return result;
576 }
577
578 // =============================================================================
579 // Parallel For Each
580 // =============================================================================
581
// Convenience overload: run the mutable pfor_each on an explicit pool.
// NOTE(review): the extraction dropped source line 615 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
612 template <typename Container, typename Op>
613 void pfor_each(ThreadPool & pool, Container & c, Op op, size_t chunk_size = 0)
614 {
616 options.pool = &pool;
617 options.chunk_size = chunk_size;
618 pfor_each(c, op, options);
619 }
620
621 template <typename Container, typename Op>
622 void pfor_each(Container & c, Op op, const ParallelOptions & options = {})
623 {
624 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
625 const size_t n = std::distance(std::begin(c), std::end(c));
626 if (n == 0)
627 return;
628
629 const size_t chunk_size =
630 parallel_detail::effective_parallel_chunk_size(n, pool, options);
631
632 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
633 {
634 for (auto it = std::begin(c); it != std::end(c); ++it)
635 {
636 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
637 op(*it);
638 }
639 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
640 return;
641 }
642
643 std::vector<std::future<void>> futures;
644 size_t offset = 0;
645
646 while (offset < n)
647 {
648 size_t chunk_end = std::min(offset + chunk_size, n);
649
650 futures.push_back(pool.enqueue([&c, op, offset, chunk_end,
651 token = options.cancel_token]()
652 {
653 auto it = std::begin(c);
654 std::advance(it, offset);
655 for (size_t i = offset; i < chunk_end; ++i, ++it)
656 {
657 parallel_detail::throw_if_parallel_canceled(token);
658 op(*it);
659 }
660 }));
661
662 offset = chunk_end;
663 }
664
665 for (auto & f: futures)
666 f.get();
667
668 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
669 }
670
// Convenience overload: run the const pfor_each on an explicit pool.
// NOTE(review): the extraction dropped source line 686 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
683 template <typename Container, typename Op>
684 void pfor_each(ThreadPool & pool, const Container & c, Op op, size_t chunk_size = 0)
685 {
687 options.pool = &pool;
688 options.chunk_size = chunk_size;
689 pfor_each(c, op, options);
690 }
691
692 template <typename Container, typename Op>
693 void pfor_each(const Container & c, Op op, const ParallelOptions & options = {})
694 {
695 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
696 const size_t n = std::distance(std::begin(c), std::end(c));
697 if (n == 0)
698 return;
699
700 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
701 {
702 for (auto it = std::begin(c); it != std::end(c); ++it)
703 {
704 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
705 op(*it);
706 }
707 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
708 return;
709 }
710
711 const size_t chunk_size =
712 parallel_detail::effective_parallel_chunk_size(n, pool, options);
713
714 auto data_holder = parallel_detail::ensure_random_access(c);
715 const auto & data = parallel_detail::deref(data_holder);
716
717 std::vector<std::future<void>> futures;
718
719 size_t offset = 0;
720 while (offset < n)
721 {
722 size_t chunk_end = std::min(offset + chunk_size, n);
723
724 futures.push_back(pool.enqueue([&data, op, offset, chunk_end,
725 token = options.cancel_token]()
726 {
727 auto it = std::begin(data);
728 std::advance(it, offset);
729 for (size_t i = offset; i < chunk_end; ++i, ++it)
730 {
731 parallel_detail::throw_if_parallel_canceled(token);
732 op(*it);
733 }
734 }));
735
736 offset = chunk_end;
737 }
738
739 for (auto & f: futures)
740 f.get();
741
742 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
743 }
744
745 // =============================================================================
746 // Parallel Predicates (all, exists, none)
747 // =============================================================================
748
// Convenience overload: run pall on an explicit pool.
// NOTE(review): the extraction dropped source line 775 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
771 template <typename Container, typename Pred>
772 [[nodiscard]] bool pall(ThreadPool & pool, const Container & c, Pred pred,
773 size_t chunk_size = 0)
774 {
776 options.pool = &pool;
777 options.chunk_size = chunk_size;
778 return pall(c, pred, options);
779 }
780
781 template <typename Container, typename Pred>
782 [[nodiscard]] bool pall(const Container & c, Pred pred,
783 const ParallelOptions & options = {})
784 {
785 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
786 const size_t n = std::distance(std::begin(c), std::end(c));
787 if (n == 0)
788 return true;
789
790 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
791 {
792 for (auto it = std::begin(c); it != std::end(c); ++it)
793 {
794 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
795 if (not pred(*it))
796 return false;
797 }
798 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
799 return true;
800 }
801
802 const size_t chunk_size =
803 parallel_detail::effective_parallel_chunk_size(n, pool, options);
804
805 auto data_holder = parallel_detail::ensure_random_access(c);
806 const auto & data = parallel_detail::deref(data_holder);
807
808 std::atomic<bool> found_false{false};
809 std::vector<std::future<void>> futures;
810
811 size_t offset = 0;
812 while (offset < n)
813 {
814 size_t chunk_end = std::min(offset + chunk_size, n);
815
816 futures.push_back(pool.enqueue([&data, pred, &found_false, offset, chunk_end,
817 token = options.cancel_token]()
818 {
819 parallel_detail::throw_if_parallel_canceled(token);
820 if (found_false.load(std::memory_order_relaxed))
821 return; // Short-circuit
822
823 auto it = std::begin(data);
824 std::advance(it, offset);
825 for (size_t i = offset; i < chunk_end; ++i, ++it)
826 {
827 if (not pred(*it))
828 {
829 found_false.store(true, std::memory_order_relaxed);
830 return;
831 }
832 parallel_detail::throw_if_parallel_canceled(token);
833 if (found_false.load(std::memory_order_relaxed))
834 return;
835 }
836 }));
837
838 offset = chunk_end;
839 }
840
841 for (auto & f: futures)
842 f.get();
843
844 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
845 return not found_false.load();
846 }
847
// Convenience overload: run pexists on an explicit pool.
// NOTE(review): the extraction dropped source line 873 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
869 template <typename Container, typename Pred>
870 [[nodiscard]] bool pexists(ThreadPool & pool, const Container & c, Pred pred,
871 size_t chunk_size = 0)
872 {
874 options.pool = &pool;
875 options.chunk_size = chunk_size;
876 return pexists(c, pred, options);
877 }
878
879 template <typename Container, typename Pred>
880 [[nodiscard]] bool pexists(const Container & c, Pred pred,
881 const ParallelOptions & options = {})
882 {
883 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
884 const size_t n = std::distance(std::begin(c), std::end(c));
885 if (n == 0)
886 return false;
887
888 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
889 {
890 for (auto it = std::begin(c); it != std::end(c); ++it)
891 {
892 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
893 if (pred(*it))
894 return true;
895 }
896 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
897 return false;
898 }
899
900 const size_t chunk_size =
901 parallel_detail::effective_parallel_chunk_size(n, pool, options);
902
903 auto data_holder = parallel_detail::ensure_random_access(c);
904 const auto & data = parallel_detail::deref(data_holder);
905
906 std::atomic<bool> found{false};
907 std::vector<std::future<void>> futures;
908
909 size_t offset = 0;
910 while (offset < n)
911 {
912 size_t chunk_end = std::min(offset + chunk_size, n);
913
914 futures.push_back(pool.enqueue([&data, pred, &found, offset, chunk_end,
915 token = options.cancel_token]()
916 {
917 parallel_detail::throw_if_parallel_canceled(token);
918 if (found.load(std::memory_order_relaxed))
919 return; // Short-circuit
920
921 auto it = std::begin(data);
922 std::advance(it, offset);
923 for (size_t i = offset; i < chunk_end; ++i, ++it)
924 {
925 if (pred(*it))
926 {
927 found.store(true, std::memory_order_relaxed);
928 return;
929 }
930 parallel_detail::throw_if_parallel_canceled(token);
931 if (found.load(std::memory_order_relaxed))
932 return;
933 }
934 }));
935
936 offset = chunk_end;
937 }
938
939 for (auto & f: futures)
940 f.get();
941
942 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
943 return found.load();
944 }
945
959 template <typename Container, typename Pred>
960 [[nodiscard]] bool pnone(ThreadPool & pool, const Container & c, Pred pred,
961 size_t chunk_size = 0)
962 {
963 return not pexists(pool, c, pred, chunk_size);
964 }
965
966 template <typename Container, typename Pred>
967 [[nodiscard]] bool pnone(const Container & c, Pred pred,
968 const ParallelOptions & options = {})
969 {
970 return not pexists(c, pred, options);
971 }
972
973 // =============================================================================
974 // Parallel Count
975 // =============================================================================
976
// Convenience overload: run pcount_if on an explicit pool.
// NOTE(review): the extraction dropped source line 1002 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
998 template <typename Container, typename Pred>
999 [[nodiscard]] size_t pcount_if(ThreadPool & pool, const Container & c, Pred pred,
1000 size_t chunk_size = 0)
1001 {
1003 options.pool = &pool;
1004 options.chunk_size = chunk_size;
1005 return pcount_if(c, pred, options);
1006 }
1007
1008 template <typename Container, typename Pred>
1009 [[nodiscard]] size_t pcount_if(const Container & c, Pred pred,
1010 const ParallelOptions & options = {})
1011 {
1012 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
1013 const size_t n = std::distance(std::begin(c), std::end(c));
1014 if (n == 0)
1015 return 0;
1016
1017 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
1018 {
1019 size_t total = 0;
1020 for (auto it = std::begin(c); it != std::end(c); ++it)
1021 {
1022 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1023 if (pred(*it))
1024 ++total;
1025 }
1026 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1027 return total;
1028 }
1029
1030 const size_t chunk_size =
1031 parallel_detail::effective_parallel_chunk_size(n, pool, options);
1032
1033 auto data_holder = parallel_detail::ensure_random_access(c);
1034 const auto & data = parallel_detail::deref(data_holder);
1035
1036 std::vector<std::future<size_t>> futures;
1037
1038 size_t offset = 0;
1039 while (offset < n)
1040 {
1041 size_t chunk_end = std::min(offset + chunk_size, n);
1042
1043 futures.push_back(pool.enqueue([&data, pred, offset, chunk_end,
1044 token = options.cancel_token]()
1045 {
1046 size_t count = 0;
1047 auto it = std::begin(data);
1048 std::advance(it, offset);
1049 for (size_t i = offset; i < chunk_end; ++i, ++it)
1050 {
1051 parallel_detail::throw_if_parallel_canceled(token);
1052 if (pred(*it))
1053 ++count;
1054 }
1055 return count;
1056 }));
1057
1058 offset = chunk_end;
1059 }
1060
1061 size_t total = 0;
1062 for (auto & f: futures)
1063 total += f.get();
1064
1065 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1066 return total;
1067 }
1068
1069 // =============================================================================
1070 // Parallel Find
1071 // =============================================================================
1072
// Convenience overload: run pfind on an explicit pool.
// NOTE(review): the extraction dropped source line 1102 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
1098 template <typename Container, typename Pred>
1099 [[nodiscard]] std::optional<size_t> pfind(ThreadPool & pool, const Container & c,
1100 Pred pred, size_t chunk_size = 0)
1101 {
1103 options.pool = &pool;
1104 options.chunk_size = chunk_size;
1105 return pfind(c, pred, options);
1106 }
1107
1108 template <typename Container, typename Pred>
1109 [[nodiscard]] std::optional<size_t> pfind(const Container & c,
1110 Pred pred,
1111 const ParallelOptions & options = {})
1112 {
1113 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
1114 const size_t n = std::distance(std::begin(c), std::end(c));
1115 if (n == 0)
1116 return std::nullopt;
1117
1118 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
1119 {
1120 size_t idx = 0;
1121 for (auto it = std::begin(c); it != std::end(c); ++it, ++idx)
1122 {
1123 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1124 if (pred(*it))
1125 return idx;
1126 }
1127 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1128 return std::nullopt;
1129 }
1130
1131 const size_t chunk_size =
1132 parallel_detail::effective_parallel_chunk_size(n, pool, options);
1133
1134 auto data_holder = parallel_detail::ensure_random_access(c);
1135 const auto & data = parallel_detail::deref(data_holder);
1136
1137 // Track minimum found index
1138 std::atomic<size_t> min_index{n}; // n means not found
1139 std::vector<std::future<void>> futures;
1140
1141 size_t offset = 0;
1142 while (offset < n)
1143 {
1144 size_t chunk_end = std::min(offset + chunk_size, n);
1145
1146 futures.push_back(pool.enqueue([&data, pred, &min_index, offset, chunk_end,
1147 token = options.cancel_token]()
1148 {
1149 parallel_detail::throw_if_parallel_canceled(token);
1150 // Skip if we already found something earlier
1151 if (min_index.load(std::memory_order_relaxed) <= offset)
1152 return;
1153
1154 auto it = std::begin(data);
1155 std::advance(it, offset);
1156 for (size_t i = offset; i < chunk_end; ++i, ++it)
1157 {
1158 // Stop if earlier match found
1159 if (min_index.load(std::memory_order_relaxed) <= i)
1160 return;
1161
1162 parallel_detail::throw_if_parallel_canceled(token);
1163 if (pred(*it))
1164 {
1165 // Atomically update minimum
1166 size_t expected = min_index.load(std::memory_order_relaxed);
1167 while (i < expected and
1168 not min_index.compare_exchange_weak(expected, i,
1169 std::memory_order_relaxed));
1170 return;
1171 }
1172 }
1173 }));
1174
1175 offset = chunk_end;
1176 }
1177
1178 for (auto & f: futures)
1179 f.get();
1180
1181 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1182 if (size_t result = min_index.load(); result < n)
1183 return result;
1184 return std::nullopt;
1185 }
1186
// Convenience overload: run pfind_value on an explicit pool.
// NOTE(review): the extraction dropped source line 1214 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
1210 template <typename Container, typename Pred>
1211 [[nodiscard]] auto pfind_value(ThreadPool & pool, const Container & c,
1212 Pred pred, size_t chunk_size = 0)
1213 {
1215 options.pool = &pool;
1216 options.chunk_size = chunk_size;
1217 return pfind_value(c, pred, options);
1218 }
1219
1220 template <typename Container, typename Pred>
1221 [[nodiscard]] auto pfind_value(const Container & c,
1222 Pred pred,
1223 const ParallelOptions & options = {})
1224 {
1225 using T = std::decay_t<decltype(*std::begin(c))>;
1226
1227 auto idx = pfind(c, pred, options);
1228 if (not idx)
1229 return std::optional<T>{std::nullopt};
1230
1231 auto it = std::begin(c);
1232 std::advance(it, *idx);
1233 return std::optional<T>{*it};
1234 }
1235
1236 // =============================================================================
1237 // Parallel Numeric Operations
1238 // =============================================================================
1239
1259 template <typename Container,
1260 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
1261 [[nodiscard]] T psum(ThreadPool & pool, const Container & c, T init = T{},
1262 size_t chunk_size = 0)
1263 {
1264 return pfoldl(pool, c, init, std::plus<T>{}, chunk_size);
1265 }
1266
1267 template <typename Container,
1268 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
1269 [[nodiscard]] T psum(const Container & c, T init, const ParallelOptions & options)
1270 {
1271 return pfoldl(c, init, std::plus<T>{}, options);
1272 }
1273
1285 template <typename Container,
1286 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
1287 [[nodiscard]] T pproduct(ThreadPool & pool, const Container & c, T init = T{1},
1288 size_t chunk_size = 0)
1289 {
1290 return pfoldl(pool, c, init, std::multiplies<T>{}, chunk_size);
1291 }
1292
1293 template <typename Container,
1294 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
1295 [[nodiscard]] T pproduct(const Container & c, T init, const ParallelOptions & options)
1296 {
1297 return pfoldl(c, init, std::multiplies<T>{}, options);
1298 }
1299
// Convenience overload: run pmin on an explicit pool.
// NOTE(review): the extraction dropped source line 1313 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
1310 template <typename Container>
1311 [[nodiscard]] auto pmin(ThreadPool & pool, const Container & c, size_t chunk_size = 0)
1312 {
1314 options.pool = &pool;
1315 options.chunk_size = chunk_size;
1316 return pmin(c, options);
1317 }
1318
1319 template <typename Container>
1320 [[nodiscard]] auto pmin(const Container & c, const ParallelOptions & options = {})
1321 {
1322 ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
1323 using T = std::decay_t<decltype(*std::begin(c))>;
1324
1325 const size_t n = std::distance(std::begin(c), std::end(c));
1326 if (n == 0)
1327 return std::optional<T>{std::nullopt};
1328
1329 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
1330 {
1331 auto it = std::begin(c);
1332 T result = *it++;
1333 for (; it != std::end(c); ++it)
1334 {
1335 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1336 if (*it < result)
1337 result = *it;
1338 }
1339 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1340 return std::optional<T>{result};
1341 }
1342
1343 const size_t chunk_size =
1344 parallel_detail::effective_parallel_chunk_size(n, pool, options);
1345
1346 auto data_holder = parallel_detail::ensure_random_access(c);
1347 const auto & data = parallel_detail::deref(data_holder);
1348
1349 std::vector<std::future<T>> futures;
1350
1351 size_t offset = 0;
1352 while (offset < n)
1353 {
1354 size_t chunk_end = std::min(offset + chunk_size, n);
1355
1356 futures.push_back(pool.enqueue([&data, offset, chunk_end,
1357 token = options.cancel_token]()
1358 {
1359 auto it = std::begin(data);
1360 std::advance(it, offset);
1361 parallel_detail::throw_if_parallel_canceled(token);
1362 T local_min = *it++;
1363 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
1364 {
1365 parallel_detail::throw_if_parallel_canceled(token);
1366 if (*it < local_min)
1367 local_min = *it;
1368 }
1369 return local_min;
1370 }));
1371
1372 offset = chunk_end;
1373 }
1374
1375 T result = futures[0].get();
1376 for (size_t i = 1; i < futures.size(); ++i)
1377 {
1378 T val = futures[i].get();
1379 if (val < result)
1380 result = val;
1381 }
1382
1383 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1384 return std::optional<T>{result};
1385 }
1386
// Convenience overload: run pmax on an explicit pool.
// NOTE(review): the extraction dropped source line 1400 — presumably
// `ParallelOptions options;` — otherwise `options` below is undeclared.
1397 template <typename Container>
1398 [[nodiscard]] auto pmax(ThreadPool & pool, const Container & c, size_t chunk_size = 0)
1399 {
1401 options.pool = &pool;
1402 options.chunk_size = chunk_size;
1403 return pmax(c, options);
1404 }
1405
  /// \brief Parallel maximum over a container.
  ///
  /// \param c any iterable container (materialized into a random-access
  ///        copy first when it does not provide random access)
  /// \param options pool selection, chunking, min-size threshold and
  ///        cancellation token
  /// \return std::optional holding the largest element (compared with
  ///         operator>), or std::nullopt when the container is empty
  template <typename Container>
  [[nodiscard]] auto pmax(const Container & c, const ParallelOptions & options = {})
  {
    ThreadPool & pool = parallel_detail::selected_parallel_pool(options);
    using T = std::decay_t<decltype(*std::begin(c))>;

    const size_t n = std::distance(std::begin(c), std::end(c));
    if (n == 0)
      return std::optional<T>{std::nullopt};

    // Small input (or pool not worth using): plain linear scan.
    if (parallel_detail::use_sequential_parallel_path(n, pool, options))
    {
      auto it = std::begin(c);
      T result = *it++;
      for (; it != std::end(c); ++it)
      {
        parallel_detail::throw_if_parallel_canceled(options.cancel_token);
        if (*it > result)
          result = *it;
      }
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      return std::optional<T>{result};
    }

    const size_t chunk_size =
      parallel_detail::effective_parallel_chunk_size(n, pool, options);

    // Non random-access containers are materialized once so each task can
    // seek to its chunk start cheaply.
    auto data_holder = parallel_detail::ensure_random_access(c);
    const auto & data = parallel_detail::deref(data_holder);

    std::vector<std::future<T>> futures;

    // One task per chunk, each returning its local maximum.
    size_t offset = 0;
    while (offset < n)
    {
      size_t chunk_end = std::min(offset + chunk_size, n);

      futures.push_back(pool.enqueue([&data, offset, chunk_end,
                                      token = options.cancel_token]()
      {
        auto it = std::begin(data);
        std::advance(it, offset);
        parallel_detail::throw_if_parallel_canceled(token);
        T local_max = *it++;   // chunk is non-empty: offset < chunk_end
        for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
        {
          parallel_detail::throw_if_parallel_canceled(token);
          if (*it > local_max)
            local_max = *it;
        }
        return local_max;
      }));

      offset = chunk_end;
    }

    // Reduce the per-chunk maxima; futures is non-empty since n > 0.
    T result = futures[0].get();
    for (size_t i = 1; i < futures.size(); ++i)
    {
      T val = futures[i].get();
      if (val > result)
        result = val;
    }

    parallel_detail::throw_if_parallel_canceled(options.cancel_token);
    return std::optional<T>{result};
  }
1473
  /// \brief Parallel simultaneous minimum and maximum.
  ///
  /// \param pool thread pool to run on
  /// \param c any iterable container
  /// \param chunk_size elements per task; 0 lets the library pick one
  /// \return std::optional pair {min, max}, or std::nullopt when empty
  ///
  /// NOTE(review): unlike pmax, this overload has no ParallelOptions form,
  /// no sequential fallback and no cancellation support — confirm whether
  /// that asymmetry is intended.
  template <typename Container>
  [[nodiscard]] auto pminmax(ThreadPool & pool, const Container & c, size_t chunk_size = 0)
  {
    using T = std::decay_t<decltype(*std::begin(c))>;

    const size_t n = std::distance(std::begin(c), std::end(c));
    if (n == 0)
      return std::optional<std::pair<T, T>>{std::nullopt};

    if (chunk_size == 0)
      chunk_size = parallel_detail::chunk_size(n, pool.num_threads());

    // Materialize non random-access containers once for cheap chunk seeks.
    auto data_holder = parallel_detail::ensure_random_access(c);
    const auto & data = parallel_detail::deref(data_holder);

    std::vector<std::future<std::pair<T, T>>> futures;

    // One task per chunk, each returning {local min, local max}.
    size_t offset = 0;
    while (offset < n)
    {
      size_t chunk_end = std::min(offset + chunk_size, n);

      futures.push_back(pool.enqueue([&data, offset, chunk_end]()
      {
        auto it = std::begin(data);
        std::advance(it, offset);
        T local_min = *it;
        T local_max = *it++;   // both extrema seeded from the first element
        for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
        {
          if (*it < local_min) local_min = *it;
          if (*it > local_max) local_max = *it;
        }
        return std::make_pair(local_min, local_max);
      }));

      offset = chunk_end;
    }

    // Merge per-chunk extrema; futures is non-empty since n > 0.
    auto result = futures[0].get();
    for (size_t i = 1; i < futures.size(); ++i)
    {
      auto [mi, ma] = futures[i].get();
      if (mi < result.first) result.first = mi;
      if (ma > result.second) result.second = ma;
    }

    return std::optional<std::pair<T, T>>{result};
  }
1533
1534 // =============================================================================
1535 // Parallel Sort
1536 // =============================================================================
1537
1565 template <typename Container, typename Compare = std::less<>>
1566 void psort(ThreadPool & pool, Container & c, Compare cmp = Compare{},
1567 const size_t min_parallel_size = 1024)
1568 {
1569 ParallelOptions options;
1570 options.pool = &pool;
1571 options.min_size = min_parallel_size;
1572 psort(c, std::move(cmp), options);
1573 }
1574
  /// \brief In-place parallel merge sort.
  ///
  /// \param c container to sort (iterators must satisfy std::sort's
  ///        random-access requirement)
  /// \param cmp strict weak ordering (std::less<> by default)
  /// \param options pool, min_size (parallel threshold, defaults to 1024
  ///        when 0), max_tasks cap and cancellation token
  ///
  /// Strategy: split into chunks, std::sort each chunk on the pool, then
  /// run log(#chunks) bottom-up passes of pairwise merges via a buffer.
  template <typename Container, typename Compare = std::less<>>
  void psort(Container & c, Compare cmp = Compare{},
             const ParallelOptions & options = {})
  {
    const size_t n = std::distance(std::begin(c), std::end(c));
    if (n <= 1)
      return;

    auto & pool = parallel_detail::selected_parallel_pool(options);
    const size_t min_parallel_size = options.min_size == 0 ? 1024 : options.min_size;

    // For small sizes (or an unusable pool), use regular sort.
    if (parallel_detail::use_sequential_parallel_path(n, pool, options)
        or n <= min_parallel_size)
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      std::sort(std::begin(c), std::end(c), cmp);
      return;
    }

    // Split into chunks, sort each in parallel, then merge.  Twice as many
    // chunks as threads gives the pool load-balancing slack while each
    // chunk keeps at least min_parallel_size elements (n > min_parallel_size
    // here, so num_chunks >= 1 before the clamp).
    size_t num_chunks = std::min(pool.num_threads() * 2, n / min_parallel_size);
    if (options.max_tasks > 0)
      num_chunks = std::min(num_chunks, options.max_tasks);
    num_chunks = std::max<size_t>(1, num_chunks);
    const size_t chunk_size = (n + num_chunks - 1) / num_chunks;  // ceil(n / num_chunks)

    // Phase 1: sort each chunk independently on the pool.
    std::vector<std::future<void>> futures;
    for (size_t i = 0; i < n; i += chunk_size)
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      size_t end = std::min(i + chunk_size, n);
      auto begin_it = std::begin(c);
      std::advance(begin_it, i);
      auto end_it = std::begin(c);
      std::advance(end_it, end);

      futures.push_back(pool.enqueue([begin_it, end_it, cmp, token = options.cancel_token]()
      {
        parallel_detail::throw_if_parallel_canceled(token);
        std::sort(begin_it, end_it, cmp);
      }));
    }

    for (auto & f: futures)
      f.get();

    // Phase 2: bottom-up merge of the sorted runs.  optional<T> slots let
    // the buffer hold non-default-constructible T.
    using T = std::decay_t<decltype(*std::begin(c))>;
    std::vector<std::optional<T>> buffer(n);

    ParallelOptions merge_options = options;
    merge_options.pool = &pool;

    // Each pass merges pairs of runs of `width` elements into the buffer,
    // then moves the buffer back; `width` doubles until one run remains.
    for (size_t width = chunk_size; width < n; width *= 2)
    {
      for (size_t i = 0; i < n; i += 2 * width)
      {
        parallel_detail::throw_if_parallel_canceled(options.cancel_token);
        size_t mid = std::min(i + width, n);
        size_t end = std::min(i + 2 * width, n);

        if (mid < end)
        {
          // Two runs [i, mid) and [mid, end): merge them into the buffer.
          auto begin_it = std::begin(c);
          std::advance(begin_it, i);
          auto mid_it = std::begin(c);
          std::advance(mid_it, mid);
          auto end_it = std::begin(c);
          std::advance(end_it, end);

          Aleph::pmerge(begin_it, mid_it, mid_it, end_it,
                        buffer.begin() + i, cmp, merge_options);
        }
        else
        {
          // Lone trailing run (mid == end): copy remaining elements as-is.
          auto begin_it = std::begin(c);
          std::advance(begin_it, i);
          auto end_it = std::begin(c);
          std::advance(end_it, mid);
          std::copy(begin_it, end_it, buffer.begin() + i);
        }
      }

      // Move this pass's result from the buffer back into the container.
      auto it = std::begin(c);
      for (size_t i = 0; i < n; ++i, ++it)
        *it = std::move(*buffer[i]);
    }
  }
1667
1668 // =============================================================================
1669 // Parallel Zip Operations
1670 // =============================================================================
1671
1699 template <typename Container1, typename Container2, typename Op>
1700 void pzip_for_each(ThreadPool & pool, const Container1 & c1, const Container2 & c2,
1701 Op op, size_t chunk_size = 0)
1702 {
1704 options.pool = &pool;
1705 options.chunk_size = chunk_size;
1706 pzip_for_each(c1, c2, op, options);
1707 }
1708
  /// \brief Apply op(*it1, *it2) over two containers zipped element-wise.
  ///
  /// \param c1, c2 input containers; iteration stops at the shorter one
  /// \param op binary callable; its return value is discarded
  /// \param options pool, chunking and cancellation settings
  ///
  /// NOTE(review): in the parallel path, non random-access containers are
  /// presumably copied by ensure_random_access, so op then observes the
  /// copy's elements rather than c1/c2's own — verify if op relies on
  /// element identity.
  template <typename Container1, typename Container2, typename Op>
  void pzip_for_each(const Container1 & c1, const Container2 & c2, Op op,
                     const ParallelOptions & options = {})
  {
    const size_t n1 = std::distance(std::begin(c1), std::end(c1));
    const size_t n2 = std::distance(std::begin(c2), std::end(c2));
    const size_t n = std::min(n1, n2);  // zip length: the shorter container

    if (n == 0)
      return;

    auto & pool = parallel_detail::selected_parallel_pool(options);
    if (parallel_detail::use_sequential_parallel_path(n, pool, options))
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      auto it1 = std::begin(c1);
      auto it2 = std::begin(c2);
      for (size_t i = 0; i < n; ++i, ++it1, ++it2)
      {
        parallel_detail::throw_if_parallel_canceled(options.cancel_token);
        op(*it1, *it2);
      }
      return;
    }

    size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
                                                                       pool.num_threads());

    // Materialize non random-access inputs so tasks can seek cheaply.
    auto h1 = parallel_detail::ensure_random_access(c1);
    auto h2 = parallel_detail::ensure_random_access(c2);
    const auto & d1 = parallel_detail::deref(h1);
    const auto & d2 = parallel_detail::deref(h2);

    std::vector<std::future<void>> futures;

    // One task per chunk; each walks its own disjoint index range.
    size_t offset = 0;
    while (offset < n)
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      size_t chunk_end = std::min(offset + chunk_size, n);

      futures.push_back(pool.enqueue([&d1, &d2, op, offset, chunk_end,
                                      token = options.cancel_token]()
      {
        auto it1 = std::begin(d1);
        auto it2 = std::begin(d2);
        std::advance(it1, offset);
        std::advance(it2, offset);
        for (size_t i = offset; i < chunk_end; ++i, ++it1, ++it2)
        {
          parallel_detail::throw_if_parallel_canceled(token);
          op(*it1, *it2);
        }
      }));

      offset = chunk_end;
    }

    for (auto & f: futures)
      f.get();
  }
1770
1795 template <typename Container1, typename Container2, typename Op>
1796 [[nodiscard]] auto pzip_maps(ThreadPool & pool, const Container1 & c1,
1797 const Container2 & c2, Op op, size_t chunk_size = 0)
1798 {
1800 options.pool = &pool;
1801 options.chunk_size = chunk_size;
1802 return pzip_maps(c1, c2, op, options);
1803 }
1804
  /// \brief Map a binary op over two zipped containers into a new vector.
  ///
  /// \param c1, c2 input containers; zip length is the shorter one
  /// \param op callable invoked as op(const T1 &, const T2 &)
  /// \param options pool, chunking and cancellation settings
  /// \return std::vector of op's results, in input order
  template <typename Container1, typename Container2, typename Op>
  [[nodiscard]] auto pzip_maps(const Container1 & c1, const Container2 & c2, Op op,
                               const ParallelOptions & options = {})
  {
    using T1 = std::decay_t<decltype(*std::begin(c1))>;
    using T2 = std::decay_t<decltype(*std::begin(c2))>;
    using ResultT = std::invoke_result_t<Op, const T1 &, const T2 &>;

    const size_t n1 = std::distance(std::begin(c1), std::end(c1));
    const size_t n2 = std::distance(std::begin(c2), std::end(c2));
    const size_t n = std::min(n1, n2);  // zip length: the shorter container

    if (n == 0)
      return std::vector<ResultT>{};

    auto & pool = parallel_detail::selected_parallel_pool(options);
    if (parallel_detail::use_sequential_parallel_path(n, pool, options))
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      std::vector<ResultT> result;
      result.reserve(n);
      auto it1 = std::begin(c1);
      auto it2 = std::begin(c2);
      for (size_t i = 0; i < n; ++i, ++it1, ++it2)
      {
        parallel_detail::throw_if_parallel_canceled(options.cancel_token);
        result.push_back(op(*it1, *it2));
      }
      return result;
    }

    size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
                                                                       pool.num_threads());

    // Materialize non random-access inputs so tasks can seek cheaply.
    auto h1 = parallel_detail::ensure_random_access(c1);
    auto h2 = parallel_detail::ensure_random_access(c2);
    const auto & d1 = parallel_detail::deref(h1);
    const auto & d2 = parallel_detail::deref(h2);

    // optional<ResultT> slots: no default-constructibility required, and
    // each task writes only its own disjoint [offset, chunk_end) range, so
    // no synchronization is needed.
    std::vector<std::optional<ResultT>> slots(n);
    std::vector<std::future<void>> futures;

    size_t offset = 0;
    while (offset < n)
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      size_t chunk_end = std::min(offset + chunk_size, n);

      futures.push_back(pool.enqueue([&slots, &d1, &d2, op, offset, chunk_end,
                                      token = options.cancel_token]()
      {
        auto it1 = std::begin(d1);
        auto it2 = std::begin(d2);
        std::advance(it1, offset);
        std::advance(it2, offset);
        for (size_t i = offset; i < chunk_end; ++i, ++it1, ++it2)
        {
          parallel_detail::throw_if_parallel_canceled(token);
          slots[i] = op(*it1, *it2);
        }
      }));

      offset = chunk_end;
    }

    for (auto & f: futures)
      f.get();

    // Unwrap the optionals into the final, densely packed result.
    std::vector<ResultT> result;
    result.reserve(n);
    for (auto & slot : slots)
      result.push_back(std::move(*slot));

    return result;
  }
1880
1909 template <typename Container1, typename Container2, typename T, typename Op>
1910 [[nodiscard]] T pzip_foldl(ThreadPool & pool, const Container1 & c1,
1911 const Container2 & c2, T init, Op op,
1912 size_t chunk_size = 0)
1913 {
1915 options.pool = &pool;
1916 options.chunk_size = chunk_size;
1917 return pzip_foldl(c1, c2, init, op, options);
1918 }
1919
1920 template <typename Container1, typename Container2, typename T, typename Op>
1921 [[nodiscard]] T pzip_foldl(const Container1 & c1, const Container2 & c2, T init, Op op,
1922 const ParallelOptions & options = {})
1923 {
1924 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1925 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1926 const size_t n = std::min(n1, n2);
1927
1928 if (n == 0)
1929 return init;
1930
1931 auto & pool = parallel_detail::selected_parallel_pool(options);
1932 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
1933 {
1934 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1935 auto it1 = std::begin(c1);
1936 auto it2 = std::begin(c2);
1937 T result = init;
1938 for (size_t i = 0; i < n; ++i, ++it1, ++it2)
1939 {
1940 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1941 result = op(result, *it1, *it2);
1942 }
1943 return result;
1944 }
1945
1946 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
1947 pool.num_threads());
1948
1949 auto h1 = parallel_detail::ensure_random_access(c1);
1950 auto h2 = parallel_detail::ensure_random_access(c2);
1951 const auto & d1 = parallel_detail::deref(h1);
1952 const auto & d2 = parallel_detail::deref(h2);
1953
1954 std::vector<std::future<T>> futures;
1955
1956 size_t offset = 0;
1957 while (offset < n)
1958 {
1959 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
1960 size_t chunk_end = std::min(offset + chunk_size, n);
1961
1962 futures.push_back(pool.enqueue([&d1, &d2, &init, op, offset, chunk_end,
1963 token = options.cancel_token]()
1964 {
1965 auto it1 = std::begin(d1);
1966 auto it2 = std::begin(d2);
1967 std::advance(it1, offset);
1968 std::advance(it2, offset);
1969
1970 parallel_detail::throw_if_parallel_canceled(token);
1971 T local = op(init, *it1++, *it2++);
1972 for (size_t i = offset + 1; i < chunk_end; ++i, ++it1, ++it2)
1973 {
1974 parallel_detail::throw_if_parallel_canceled(token);
1975 local = op(local, *it1, *it2);
1976 }
1977 return local;
1978 }));
1979
1980 offset = chunk_end;
1981 }
1982
1983 // Binary reduce the partial results
1984 // We need a binary op for this - derive it from the ternary op
1985 T result = futures[0].get();
1986 for (size_t i = 1; i < futures.size(); ++i)
1987 {
1988 T val = futures[i].get();
1989 // Combine using addition - user should use pfoldl + pzip_maps for complex cases
1990 result = result + val - init; // Compensate for init being added in each chunk
1991 }
1992
1993 return result;
1994 }
1995
1996 // =============================================================================
1997 // Parallel Partition
1998 // =============================================================================
1999
2021 template <typename Container, typename Pred>
2022 [[nodiscard]] auto ppartition(ThreadPool & pool, const Container & c, Pred pred,
2023 size_t chunk_size = 0)
2024 {
2026 options.pool = &pool;
2027 options.chunk_size = chunk_size;
2028 return ppartition(c, pred, options);
2029 }
2030
  /// \brief Stable parallel partition into (matching, non-matching) copies.
  ///
  /// \param c input container (not modified; elements are copied)
  /// \param pred unary predicate on elements
  /// \param options pool, chunking and cancellation settings
  /// \return pair of vectors: first holds copies of elements for which
  ///         pred is true, second the rest; both preserve input order
  ///
  /// NOTE(review): the parallel path evaluates pred twice per element
  /// (count pass and fill pass) — pred must be deterministic or the
  /// precomputed offsets will not match the fill.
  template <typename Container, typename Pred>
  [[nodiscard]] auto ppartition(const Container & c, Pred pred,
                                const ParallelOptions & options = {})
  {
    using T = std::decay_t<decltype(*std::begin(c))>;
    ThreadPool & pool = parallel_detail::selected_parallel_pool(options);

    const size_t n = std::distance(std::begin(c), std::end(c));
    if (n == 0)
      return std::make_pair(std::vector<T>{}, std::vector<T>{});

    // Small input or unusable pool: one sequential pass.
    if (parallel_detail::use_sequential_parallel_path(n, pool, options))
    {
      std::vector<T> yes_result, no_result;
      for (auto it = std::begin(c); it != std::end(c); ++it)
      {
        parallel_detail::throw_if_parallel_canceled(options.cancel_token);
        if (pred(*it))
          yes_result.push_back(*it);
        else
          no_result.push_back(*it);
      }
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      return std::make_pair(std::move(yes_result), std::move(no_result));
    }

    const size_t chunk_size =
      parallel_detail::effective_parallel_chunk_size(n, pool, options);

    auto data_holder = parallel_detail::ensure_random_access(c);
    const auto & data = parallel_detail::deref(data_holder);

    // Phase 1: count per chunk how many elements match / don't match.
    const size_t num_chunks = parallel_detail::chunk_count(n, chunk_size);
    std::vector<size_t> yes_counts(num_chunks, 0);
    std::vector<size_t> no_counts(num_chunks, 0);
    std::vector<std::future<void>> count_futures;
    count_futures.reserve(num_chunks);

    for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx)
    {
      const auto bounds = parallel_detail::bounds_for_chunk(chunk_idx, n, chunk_size);

      // Each task writes only its own counts slot: no synchronization.
      count_futures.push_back(pool.enqueue([&data, &yes_counts, &no_counts, pred,
                                            chunk_idx,
                                            offset = bounds.offset,
                                            chunk_end = bounds.end,
                                            token = options.cancel_token]()
      {
        size_t yes = 0;
        size_t no = 0;
        auto it = std::begin(data);
        std::advance(it, offset);
        for (size_t i = offset; i < chunk_end; ++i, ++it)
        {
          parallel_detail::throw_if_parallel_canceled(token);
          if (pred(*it))
            ++yes;
          else
            ++no;
        }
        yes_counts[chunk_idx] = yes;
        no_counts[chunk_idx] = no;
      }));
    }

    for (auto & f: count_futures)
      f.get();

    parallel_detail::throw_if_parallel_canceled(options.cancel_token);

    // Phase 2: exclusive scans turn per-chunk counts into start offsets in
    // the output arrays, giving every chunk a disjoint destination range.
    std::vector<size_t> yes_offsets(num_chunks, 0);
    std::vector<size_t> no_offsets(num_chunks, 0);
    if (num_chunks > 0)
    {
      Aleph::pexclusive_scan(yes_counts.begin(), yes_counts.end(), yes_offsets.begin(),
                             size_t{0}, std::plus<size_t>{}, options);
      Aleph::pexclusive_scan(no_counts.begin(), no_counts.end(), no_offsets.begin(),
                             size_t{0}, std::plus<size_t>{}, options);
    }

    const size_t yes_total = num_chunks == 0 ? 0 : yes_offsets.back() + yes_counts.back();
    const size_t no_total = num_chunks == 0 ? 0 : no_offsets.back() + no_counts.back();

    // Phase 3: copy elements into their precomputed slots in parallel.
    // optional<T> slots avoid requiring T to be default-constructible.
    std::vector<std::optional<T>> yes_slots(yes_total);
    std::vector<std::optional<T>> no_slots(no_total);
    std::vector<std::future<void>> fill_futures;
    fill_futures.reserve(num_chunks);

    for (size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx)
    {
      const auto bounds = parallel_detail::bounds_for_chunk(chunk_idx, n, chunk_size);
      const size_t yes_offset = yes_offsets[chunk_idx];
      const size_t no_offset = no_offsets[chunk_idx];

      fill_futures.push_back(pool.enqueue([&data, &yes_slots, &no_slots, pred,
                                           offset = bounds.offset,
                                           chunk_end = bounds.end,
                                           yes_offset, no_offset,
                                           token = options.cancel_token]()
      {
        auto it = std::begin(data);
        std::advance(it, offset);
        size_t yes_pos = yes_offset;
        size_t no_pos = no_offset;
        for (size_t i = offset; i < chunk_end; ++i, ++it)
        {
          parallel_detail::throw_if_parallel_canceled(token);
          if (pred(*it))
            yes_slots[yes_pos++].emplace(*it);
          else
            no_slots[no_pos++].emplace(*it);
        }
      }));
    }

    for (auto & f: fill_futures)
      f.get();

    parallel_detail::throw_if_parallel_canceled(options.cancel_token);

    // Phase 4: unwrap the optionals into the final result vectors.
    std::vector<T> yes_result, no_result;
    yes_result.reserve(yes_total);
    no_result.reserve(no_total);

    for (auto & slot: yes_slots)
      yes_result.push_back(std::move(*slot));
    for (auto & slot: no_slots)
      no_result.push_back(std::move(*slot));

    return std::make_pair(std::move(yes_result), std::move(no_result));
  }
2162
2163 // =============================================================================
2164 // Parallel Scan and Merge Wrappers
2165 // =============================================================================
2166
2190 template <typename Container, typename BinaryOp>
2191 [[nodiscard]] auto pscan(ThreadPool & pool, const Container & c, BinaryOp op,
2192 size_t chunk_size = 0)
2193 {
2195 options.pool = &pool;
2196 options.chunk_size = chunk_size;
2197 return pscan(c, op, options);
2198 }
2199
2220 template <typename Container, typename BinaryOp>
2221 [[nodiscard]] auto pscan(const Container & c, BinaryOp op,
2222 const ParallelOptions & options = {})
2223 {
2224 using T = std::decay_t<decltype(*std::begin(c))>;
2225
2226 auto holder = parallel_detail::ensure_random_access(c);
2227 const auto & data = parallel_detail::deref(holder);
2228 const size_t n = static_cast<size_t>(std::distance(std::begin(data), std::end(data)));
2229 std::vector<std::optional<T>> slots(n);
2230
2231 Aleph::pscan(std::begin(data), std::end(data), slots.begin(), op, options);
2232
2233 std::vector<T> result;
2234 result.reserve(n);
2235 for (auto & slot: slots)
2236 result.push_back(std::move(*slot));
2237 return result;
2238 }
2239
2266 template <typename Container, typename T, typename BinaryOp>
2267 [[nodiscard]] auto pexclusive_scan(ThreadPool & pool, const Container & c, T init,
2268 BinaryOp op, size_t chunk_size = 0)
2269 {
2271 options.pool = &pool;
2272 options.chunk_size = chunk_size;
2273 return pexclusive_scan(c, init, op, options);
2274 }
2275
2296 template <typename Container, typename T, typename BinaryOp>
2297 [[nodiscard]] auto pexclusive_scan(const Container & c, T init, BinaryOp op,
2298 const ParallelOptions & options = {})
2299 {
2300 auto holder = parallel_detail::ensure_random_access(c);
2301 const auto & data = parallel_detail::deref(holder);
2302 const size_t n = static_cast<size_t>(std::distance(std::begin(data), std::end(data)));
2303 std::vector<std::optional<T>> slots(n);
2304
2305 Aleph::pexclusive_scan(std::begin(data), std::end(data), slots.begin(),
2306 init, op, options);
2307
2308 std::vector<T> result;
2309 result.reserve(n);
2310 for (auto & slot: slots)
2311 result.push_back(std::move(*slot));
2312 return result;
2313 }
2314
2340 template <typename Container1, typename Container2, typename Compare = std::less<>>
2341 [[nodiscard]] auto pmerge(ThreadPool & pool, const Container1 & c1, const Container2 & c2,
2342 Compare comp = Compare{}, size_t chunk_size = 0)
2343 {
2344 ParallelOptions options;
2345 options.pool = &pool;
2346 options.chunk_size = chunk_size;
2347 return pmerge(c1, c2, std::move(comp), options);
2348 }
2349
2370 template <typename Container1, typename Container2, typename Compare = std::less<>>
2371 [[nodiscard]] auto pmerge(const Container1 & c1, const Container2 & c2,
2372 Compare comp = Compare{},
2373 const ParallelOptions & options = {})
2374 {
2375 using T1 = std::decay_t<decltype(*std::begin(c1))>;
2376 using T2 = std::decay_t<decltype(*std::begin(c2))>;
2377 using ResultT = std::common_type_t<T1, T2>;
2378
2379 auto h1 = parallel_detail::ensure_random_access(c1);
2380 auto h2 = parallel_detail::ensure_random_access(c2);
2381 const auto & d1 = parallel_detail::deref(h1);
2382 const auto & d2 = parallel_detail::deref(h2);
2383
2384 const size_t total = static_cast<size_t>(std::distance(std::begin(d1), std::end(d1))
2385 + std::distance(std::begin(d2), std::end(d2)));
2386 std::vector<std::optional<ResultT>> slots(total);
2387
2388 Aleph::pmerge(std::begin(d1), std::end(d1),
2389 std::begin(d2), std::end(d2),
2390 slots.begin(), comp, options);
2391
2392 std::vector<ResultT> result;
2393 result.reserve(total);
2394 for (auto & slot: slots)
2395 result.push_back(std::move(*slot));
2396 return result;
2397 }
2398
2399 // =============================================================================
2400 // Variadic Parallel Zip Operations (N containers)
2401 // =============================================================================
2402
2403 namespace parallel_zip_detail
2404 {
2413 template <typename Container>
2415 {
2416 using value_type = std::decay_t<decltype(*std::begin(std::declval<Container &>()))>;
2417 using holder_type = std::conditional_t<
2418 parallel_detail::has_random_access<Container>(),
2419 const Container *,
2420 std::unique_ptr<std::vector<value_type>>>;
2421
2424
2425 explicit ContainerHolder(const Container & c)
2426 {
2427 if constexpr (parallel_detail::has_random_access<Container>())
2428 {
2429 data = &c;
2430 // For random access, std::distance is O(1)
2431 cached_size = static_cast<size_t>(std::distance(std::begin(c), std::end(c)));
2432 }
2433 else
2434 {
2435 // Copy to vector (O(n) - unavoidable), then get size from vector (O(1))
2436 data = std::make_unique<std::vector<value_type>>(std::begin(c), std::end(c));
2437 cached_size = data->size();
2438 }
2439 }
2440
2441 decltype(auto) get() const
2442 {
2443 if constexpr (parallel_detail::has_random_access<Container>())
2444 return *data;
2445 else
2446 return *data;
2447 }
2448
2450 [[nodiscard]] size_t size() const noexcept { return cached_size; }
2451
2452 auto begin() const { return std::begin(get()); }
2453 auto end() const { return std::end(get()); }
2454 };
2455
2457 template <typename... Holders, size_t... Is>
2458 size_t min_holder_size_impl(const std::tuple<Holders...> & holders,
2459 std::index_sequence<Is...>)
2460 {
2461 return std::min({std::get<Is>(holders).size()...});
2462 }
2463
2464 template <typename... Holders>
2465 size_t min_holder_size(const std::tuple<Holders...> & holders)
2466 {
2467 return min_holder_size_impl(holders, std::make_index_sequence<sizeof...(Holders)>{});
2468 }
2469
2471 template <typename... Holders, size_t... Is>
2472 auto make_iterators_at(size_t offset, const std::tuple<Holders...> & holders,
2473 std::index_sequence<Is...>)
2474 {
2475 return std::make_tuple([&]()
2476 {
2477 auto it = std::get<Is>(holders).begin();
2478 std::advance(it, offset);
2479 return it;
2480 }()...);
2481 }
2482
2484 template <typename... Iters, size_t... Is>
2485 void advance_all_iters(std::tuple<Iters...> & iters, std::index_sequence<Is...>)
2486 {
2487 (++std::get<Is>(iters), ...);
2488 }
2489
2491 template <typename... Iters, size_t... Is>
2492 auto deref_all_iters(const std::tuple<Iters...> & iters, std::index_sequence<Is...>)
2493 {
2494 return std::make_tuple(*std::get<Is>(iters)...);
2495 }
2496 } // namespace parallel_zip_detail
2497
2530 template <typename Op, typename... Containers>
2531 void pzip_for_each_n(ThreadPool & pool, Op op, const Containers &... cs)
2532 {
2534 options.pool = &pool;
2535 pzip_for_each_n(op, options, cs...);
2536 }
2537
  /// \brief Apply op(e1, ..., eN) over N containers zipped element-wise.
  ///
  /// \param op callable taking one argument per container; result discarded
  /// \param options pool, chunking and cancellation settings
  /// \param cs at least two containers; iteration stops at the shortest
  template <typename Op, typename... Containers>
  void pzip_for_each_n(Op op, const ParallelOptions & options, const Containers &... cs)
  {
    static_assert(sizeof...(Containers) >= 2,
                  "pzip_for_each requires at least 2 containers");

    // Convert all containers to random access FIRST
    // This is O(n) for non-RA containers, but unavoidable
    auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);

    // Now get min size - O(1) because all holders have cached sizes
    const size_t n = parallel_zip_detail::min_holder_size(holders);
    if (n == 0)
      return;

    auto & pool = parallel_detail::selected_parallel_pool(options);
    if (parallel_detail::use_sequential_parallel_path(n, pool, options))
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      constexpr size_t N = sizeof...(Containers);
      auto iters = parallel_zip_detail::make_iterators_at(
        0, holders, std::make_index_sequence<N>{});
      // Lockstep walk: deref all N iterators, apply op, advance all N.
      for (size_t i = 0; i < n; ++i)
      {
        parallel_detail::throw_if_parallel_canceled(options.cancel_token);
        std::apply(op, parallel_zip_detail::deref_all_iters(
          iters, std::make_index_sequence<N>{}));
        parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
      }
      return;
    }

    const size_t chunk_size = parallel_detail::effective_parallel_chunk_size(
      n, pool, options, pool.num_threads());

    std::vector<std::future<void>> futures;

    // One task per chunk; each seeks its iterators to `offset` and walks
    // its own disjoint index range.
    size_t offset = 0;
    while (offset < n)
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      size_t chunk_end = std::min(offset + chunk_size, n);

      futures.push_back(pool.enqueue([&holders, op, offset, chunk_end,
                                      token = options.cancel_token]()
      {
        constexpr size_t N = sizeof...(Containers);
        auto iters = parallel_zip_detail::make_iterators_at(
          offset, holders, std::make_index_sequence<N>{});

        for (size_t i = offset; i < chunk_end; ++i)
        {
          parallel_detail::throw_if_parallel_canceled(token);
          std::apply(op, parallel_zip_detail::deref_all_iters(
            iters, std::make_index_sequence<N>{}));
          parallel_zip_detail::advance_all_iters(
            iters, std::make_index_sequence<N>{});
        }
      }));

      offset = chunk_end;
    }

    for (auto & f: futures)
      f.get();
  }
2604
2636 template <typename Op, typename... Containers>
2637 [[nodiscard]] auto pzip_maps_n(ThreadPool & pool, Op op, const Containers &... cs)
2638 {
2640 options.pool = &pool;
2641 return pzip_maps_n(op, options, cs...);
2642 }
2643
  /// \brief Map op(e1, ..., eN) over N zipped containers into a vector.
  ///
  /// \param op callable taking one argument per container
  /// \param options pool, chunking and cancellation settings
  /// \param cs at least two containers; zip length is the shortest
  /// \return std::vector of op's results, in input order
  template <typename Op, typename... Containers>
  [[nodiscard]] auto pzip_maps_n(Op op, const ParallelOptions & options,
                                 const Containers &... cs)
  {
    static_assert(sizeof...(Containers) >= 2,
                  "pzip_maps requires at least 2 containers");

    // Deduce result type from operation
    using ResultT = std::invoke_result_t<Op,
                                         std::decay_t<decltype(*std::begin(cs))>...>;

    // Convert all containers to random access FIRST
    auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);

    // Now get min size - O(1) because all holders have cached sizes
    const size_t n = parallel_zip_detail::min_holder_size(holders);
    if (n == 0)
      return std::vector<ResultT>{};

    auto & pool = parallel_detail::selected_parallel_pool(options);
    if (parallel_detail::use_sequential_parallel_path(n, pool, options))
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      // optional<ResultT> staging avoids requiring ResultT to be
      // default-constructible.
      std::vector<std::optional<ResultT>> slots(n);
      constexpr size_t N = sizeof...(Containers);
      auto iters = parallel_zip_detail::make_iterators_at(
        0, holders, std::make_index_sequence<N>{});
      for (size_t i = 0; i < n; ++i)
      {
        parallel_detail::throw_if_parallel_canceled(options.cancel_token);
        slots[i] = std::apply(op, parallel_zip_detail::deref_all_iters(
          iters, std::make_index_sequence<N>{}));
        parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
      }

      std::vector<ResultT> result;
      result.reserve(n);
      for (auto & slot: slots)
        result.push_back(std::move(*slot));
      return result;
    }

    size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
                                                                       pool.num_threads());

    // Each task writes only its own disjoint slot range: no locking needed.
    std::vector<std::optional<ResultT>> slots(n);
    std::vector<std::future<void>> futures;

    size_t offset = 0;
    while (offset < n)
    {
      parallel_detail::throw_if_parallel_canceled(options.cancel_token);
      size_t chunk_end = std::min(offset + chunk_size, n);

      futures.push_back(pool.enqueue([&slots, &holders, op, offset, chunk_end,
                                      token = options.cancel_token]()
      {
        constexpr size_t N = sizeof...(Containers);
        auto iters = parallel_zip_detail::make_iterators_at(
          offset, holders, std::make_index_sequence<N>{});

        for (size_t i = offset; i < chunk_end; ++i)
        {
          parallel_detail::throw_if_parallel_canceled(token);
          slots[i] = std::apply(op, parallel_zip_detail::deref_all_iters(
            iters, std::make_index_sequence<N>{}));
          parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
        }
      }));
      offset = chunk_end;
    }

    for (auto & f: futures)
      f.get();

    // Unwrap the optionals into the densely packed final vector.
    std::vector<ResultT> result;
    result.reserve(n);
    for (auto & slot: slots)
      result.push_back(std::move(*slot));
    return result;

  }
2726
2765 template <typename T, typename Op, typename Combiner, typename... Containers>
2766 [[nodiscard]] T pzip_foldl_n(ThreadPool & pool, T init, Op op, Combiner combiner,
2767 const Containers &... cs)
2768 {
2770 options.pool = &pool;
2771 return pzip_foldl_n(init, op, combiner, options, cs...);
2772 }
2773
2774 template <typename T, typename Op, typename Combiner, typename... Containers>
2775 [[nodiscard]] T pzip_foldl_n(T init, Op op, Combiner combiner,
2776 const ParallelOptions & options, const Containers &... cs)
2777 {
2778 static_assert(sizeof...(Containers) >= 2,
2779 "pzip_foldl requires at least 2 containers");
2780
2781 // Convert all containers to random access FIRST
2782 auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);
2783
2784 // Now get min size - O(1) because all holders have cached sizes
2785 const size_t n = parallel_zip_detail::min_holder_size(holders);
2786 if (n == 0)
2787 return init;
2788
2789 auto & pool = parallel_detail::selected_parallel_pool(options);
2790 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
2791 {
2792 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
2793 constexpr size_t N = sizeof...(Containers);
2794 auto iters = parallel_zip_detail::make_iterators_at(
2795 0, holders, std::make_index_sequence<N>{});
2796 T result = init;
2797 for (size_t i = 0; i < n; ++i)
2798 {
2799 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
2800 auto tuple = parallel_zip_detail::deref_all_iters(iters, std::make_index_sequence<N>{});
2801 result = std::apply([&op, &result](auto &&... args)
2802 {
2803 return op(result, std::forward<decltype(args)>(args)...);
2804 }, tuple);
2805 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
2806 }
2807 return result;
2808 }
2809
2810 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
2811 pool.num_threads());
2812
2813 std::vector<std::future<T>> futures;
2814
2815 size_t offset = 0;
2816 while (offset < n)
2817 {
2818 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
2819 size_t chunk_end = std::min(offset + chunk_size, n);
2820
2821 futures.push_back(pool.enqueue([&holders, init, op, offset, chunk_end,
2822 token = options.cancel_token]()
2823 {
2824 constexpr size_t N = sizeof...(Containers);
2825 auto iters = parallel_zip_detail::make_iterators_at(
2826 offset, holders, std::make_index_sequence<N>{});
2827
2828 // First element
2829 parallel_detail::throw_if_parallel_canceled(token);
2830 auto first_tuple = parallel_zip_detail::deref_all_iters(
2831 iters, std::make_index_sequence<N>{});
2832 T local = std::apply([&op, &init](auto &&... args)
2833 {
2834 return op(init, std::forward<decltype(args)>(args)
2835 ...);
2836 }, first_tuple);
2837 parallel_zip_detail::advance_all_iters(
2838 iters, std::make_index_sequence<N>{});
2839
2840 // Remaining elements
2841 for (size_t i = offset + 1; i < chunk_end; ++i)
2842 {
2843 parallel_detail::throw_if_parallel_canceled(token);
2844 auto tuple = parallel_zip_detail::deref_all_iters(
2845 iters, std::make_index_sequence<N>{});
2846 local = std::apply([&op, &local](auto &&... args)
2847 {
2848 return op(local,
2849 std::forward<decltype(args)>(args)
2850 ...);
2851 }, tuple);
2852 parallel_zip_detail::advance_all_iters(
2853 iters, std::make_index_sequence<N>{});
2854 }
2855
2856 return local;
2857 }));
2858
2859 offset = chunk_end;
2860 }
2861
2862 // Combine partial results using the combiner
2863 T result = futures[0].get();
2864 for (size_t i = 1; i < futures.size(); ++i)
2865 result = combiner(result, futures[i].get());
2866
2867 return result;
2868 }
2869
2898 template <typename Pred, typename... Containers>
2899 [[nodiscard]] bool pzip_all_n(ThreadPool & pool, Pred pred, const Containers &... cs)
2900 {
2902 options.pool = &pool;
2903 return pzip_all_n(pred, options, cs...);
2904 }
2905
2906 template <typename Pred, typename... Containers>
2907 [[nodiscard]] bool pzip_all_n(Pred pred, const ParallelOptions & options,
2908 const Containers &... cs)
2909 {
2910 static_assert(sizeof...(Containers) >= 2,
2911 "pzip_all requires at least 2 containers");
2912
2913 // Convert all containers to random access FIRST
2914 auto holders = std::make_tuple(
2916
2917 // Now get min size - O(1) because all holders have cached sizes
2918 const size_t n = parallel_zip_detail::min_holder_size(holders);
2919 if (n == 0)
2920 return true; // Vacuous truth
2921
2922 auto & pool = parallel_detail::selected_parallel_pool(options);
2923 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
2924 {
2925 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
2926 constexpr size_t N = sizeof...(Containers);
2927 auto iters = parallel_zip_detail::make_iterators_at(
2928 0, holders, std::make_index_sequence<N>{});
2929 for (size_t i = 0; i < n; ++i)
2930 {
2931 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
2932 auto tuple = parallel_zip_detail::deref_all_iters(iters, std::make_index_sequence<N>{});
2933 if (! std::apply(pred, tuple))
2934 return false;
2935 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
2936 }
2937 return true;
2938 }
2939
2940 const size_t chunk_size = parallel_detail::effective_parallel_chunk_size(
2941 n, pool, options, pool.num_threads());
2942
2943 std::atomic<bool> found_false{false};
2944 std::vector<std::future<void>> futures;
2945
2946 size_t offset = 0;
2947 while (offset < n)
2948 {
2949 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
2950 size_t chunk_end = std::min(offset + chunk_size, n);
2951
2952 futures.push_back(pool.enqueue([&holders, pred, &found_false, offset, chunk_end,
2953 token = options.cancel_token]()
2954 {
2955 if (found_false.load(std::memory_order_relaxed))
2956 return;
2957
2958 constexpr size_t N = sizeof...(Containers);
2959 auto iters = parallel_zip_detail::make_iterators_at(
2960 offset, holders, std::make_index_sequence<N>{});
2961
2962 for (size_t i = offset; i < chunk_end; ++i)
2963 {
2964 parallel_detail::throw_if_parallel_canceled(token);
2965 auto tuple = parallel_zip_detail::deref_all_iters(
2966 iters, std::make_index_sequence<N>{});
2967 if (! std::apply(pred, tuple))
2968 {
2969 found_false.store(true, std::memory_order_relaxed);
2970 return;
2971 }
2972 if (found_false.load(std::memory_order_relaxed))
2973 return;
2974 parallel_zip_detail::advance_all_iters(
2975 iters, std::make_index_sequence<N>{});
2976 }
2977 }));
2978
2979 offset = chunk_end;
2980 }
2981
2982 for (auto & f: futures)
2983 f.get();
2984
2985 return not found_false.load();
2986 }
2987
3016 template <typename Pred, typename... Containers>
3017 [[nodiscard]] bool pzip_exists_n(ThreadPool & pool, Pred pred, const Containers &... cs)
3018 {
3020 options.pool = &pool;
3021 return pzip_exists_n(pred, options, cs...);
3022 }
3023
3024 template <typename Pred, typename... Containers>
3025 [[nodiscard]] bool pzip_exists_n(Pred pred, const ParallelOptions & options,
3026 const Containers &... cs)
3027 {
3028 static_assert(sizeof...(Containers) >= 2,
3029 "pzip_exists requires at least 2 containers");
3030
3031 // Convert all containers to random access FIRST
3032 auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);
3033
3034 // Now get min size - O(1) because all holders have cached sizes
3035 const size_t n = parallel_zip_detail::min_holder_size(holders);
3036 if (n == 0)
3037 return false;
3038
3039 auto & pool = parallel_detail::selected_parallel_pool(options);
3040 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
3041 {
3042 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3043 constexpr size_t N = sizeof...(Containers);
3044 auto iters = parallel_zip_detail::make_iterators_at(
3045 0, holders, std::make_index_sequence<N>{});
3046 for (size_t i = 0; i < n; ++i)
3047 {
3048 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3049 auto tuple = parallel_zip_detail::deref_all_iters(iters, std::make_index_sequence<N>{});
3050 if (std::apply(pred, tuple))
3051 return true;
3052 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
3053 }
3054 return false;
3055 }
3056
3057 const size_t chunk_size = parallel_detail::effective_parallel_chunk_size(
3058 n, pool, options, pool.num_threads());
3059
3060 std::atomic<bool> found{false};
3061 std::vector<std::future<void>> futures;
3062
3063 size_t offset = 0;
3064 while (offset < n)
3065 {
3066 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3067 size_t chunk_end = std::min(offset + chunk_size, n);
3068
3069 futures.push_back(pool.enqueue([&holders, pred, &found, offset, chunk_end,
3070 token = options.cancel_token]()
3071 {
3072 if (found.load(std::memory_order_relaxed))
3073 return;
3074
3075 constexpr size_t N = sizeof...(Containers);
3076 auto iters = parallel_zip_detail::make_iterators_at(
3077 offset, holders, std::make_index_sequence<N>{});
3078
3079 for (size_t i = offset; i < chunk_end; ++i)
3080 {
3081 parallel_detail::throw_if_parallel_canceled(token);
3082 auto tuple = parallel_zip_detail::deref_all_iters(
3083 iters, std::make_index_sequence<N>{});
3084 if (std::apply(pred, tuple))
3085 {
3086 found.store(true, std::memory_order_relaxed);
3087 return;
3088 }
3089 if (found.load(std::memory_order_relaxed))
3090 return;
3091 parallel_zip_detail::advance_all_iters(
3092 iters, std::make_index_sequence<N>{});
3093 }
3094 }));
3095
3096 offset = chunk_end;
3097 }
3098
3099 for (auto & f: futures)
3100 f.get();
3101
3102 return found.load();
3103 }
3104
3120 template <typename Pred, typename... Containers>
3121 [[nodiscard]] size_t pzip_count_if_n(ThreadPool & pool, Pred pred,
3122 const Containers &... cs)
3123 {
3125 options.pool = &pool;
3126 return pzip_count_if_n(pred, options, cs...);
3127 }
3128
3129 template <typename Pred, typename... Containers>
3130 [[nodiscard]] size_t pzip_count_if_n(Pred pred, const ParallelOptions & options,
3131 const Containers &... cs)
3132 {
3133 static_assert(sizeof...(Containers) >= 2,
3134 "pzip_count_if requires at least 2 containers");
3135
3136 // Convert all containers to random access FIRST
3137 auto holders = std::make_tuple(parallel_zip_detail::ContainerHolder<Containers>(cs)...);
3138
3139 // Now get min size - O(1) because all holders have cached sizes
3140 const size_t n = parallel_zip_detail::min_holder_size(holders);
3141 if (n == 0)
3142 return 0;
3143
3144 auto & pool = parallel_detail::selected_parallel_pool(options);
3145 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
3146 {
3147 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3148 constexpr size_t N = sizeof...(Containers);
3149 auto iters = parallel_zip_detail::make_iterators_at(
3150 0, holders, std::make_index_sequence<N>{});
3151 size_t count = 0;
3152 for (size_t i = 0; i < n; ++i)
3153 {
3154 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3155 auto tuple = parallel_zip_detail::deref_all_iters(iters, std::make_index_sequence<N>{});
3156 if (std::apply(pred, tuple))
3157 ++count;
3158 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
3159 }
3160 return count;
3161 }
3162
3163 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
3164 pool.num_threads());
3165
3166 std::vector<std::future<size_t>> futures;
3167
3168 size_t offset = 0;
3169 while (offset < n)
3170 {
3171 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3172 size_t chunk_end = std::min(offset + chunk_size, n);
3173
3174 futures.push_back(pool.enqueue([&holders, pred, offset, chunk_end,
3175 token = options.cancel_token]()
3176 {
3177 constexpr size_t N = sizeof...(Containers);
3178 auto iters = parallel_zip_detail::make_iterators_at(
3179 offset, holders, std::make_index_sequence<N>{});
3180
3181 size_t count = 0;
3182 for (size_t i = offset; i < chunk_end; ++i)
3183 {
3184 parallel_detail::throw_if_parallel_canceled(token);
3185 auto tuple = parallel_zip_detail::deref_all_iters(
3186 iters, std::make_index_sequence<N>{});
3187 if (std::apply(pred, tuple))
3188 ++count;
3189 parallel_zip_detail::advance_all_iters(
3190 iters, std::make_index_sequence<N>{});
3191 }
3192 return count;
3193 }));
3194
3195 offset = chunk_end;
3196 }
3197
3198 size_t total = 0;
3199 for (auto & f: futures)
3200 total += f.get();
3201
3202 return total;
3203 }
3204
3205 // =============================================================================
3206 // Parallel Enumerate
3207 // =============================================================================
3208
3234 template <typename Container, typename Op>
3236 size_t chunk_size = 0)
3237 {
3239 options.pool = &pool;
3240 options.chunk_size = chunk_size;
3242 }
3243
3244 template <typename Container, typename Op>
3246 const ParallelOptions & options = {})
3247 {
3248 const size_t n = std::distance(std::begin(c), std::end(c));
3249 if (n == 0)
3250 return;
3251
3252 auto & pool = parallel_detail::selected_parallel_pool(options);
3253 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
3254 {
3255 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3256 auto it = std::begin(c);
3257 for (size_t i = 0; i < n; ++i, ++it)
3258 {
3259 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3260 op(i, *it);
3261 }
3262 return;
3263 }
3264
3265 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
3266 pool.num_threads());
3267
3268 std::vector<std::future<void>> futures;
3269
3270 size_t offset = 0;
3271 while (offset < n)
3272 {
3273 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3274 size_t chunk_end = std::min(offset + chunk_size, n);
3275
3276 futures.push_back(pool.enqueue([&c, op, offset, chunk_end,
3277 token = options.cancel_token]()
3278 {
3279 auto it = std::begin(c);
3280 std::advance(it, offset);
3281 for (size_t i = offset; i < chunk_end; ++i, ++it)
3282 {
3283 parallel_detail::throw_if_parallel_canceled(token);
3284 op(i, *it);
3285 }
3286 }));
3287
3288 offset = chunk_end;
3289 }
3290
3291 for (auto & f: futures)
3292 f.get();
3293 }
3294
3307 template <typename Container, typename Op>
3308 void penumerate_for_each(ThreadPool & pool, const Container & c, Op op,
3309 size_t chunk_size = 0)
3310 {
3312 options.pool = &pool;
3313 options.chunk_size = chunk_size;
3315 }
3316
3317 template <typename Container, typename Op>
3318 void penumerate_for_each(const Container & c, Op op,
3319 const ParallelOptions & options = {})
3320 {
3321 const size_t n = std::distance(std::begin(c), std::end(c));
3322 if (n == 0)
3323 return;
3324
3325 auto & pool = parallel_detail::selected_parallel_pool(options);
3326 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
3327 {
3328 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3329 auto it = std::begin(c);
3330 for (size_t i = 0; i < n; ++i, ++it)
3331 {
3332 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3333 op(i, *it);
3334 }
3335 return;
3336 }
3337
3338 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
3339 pool.num_threads());
3340
3341 auto data_holder = parallel_detail::ensure_random_access(c);
3342 const auto & data = parallel_detail::deref(data_holder);
3343
3344 std::vector<std::future<void>> futures;
3345
3346 size_t offset = 0;
3347 while (offset < n)
3348 {
3349 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3350 size_t chunk_end = std::min(offset + chunk_size, n);
3351
3352 futures.push_back(pool.enqueue([&data, op, offset, chunk_end,
3353 token = options.cancel_token]()
3354 {
3355 auto it = std::begin(data);
3356 std::advance(it, offset);
3357 for (size_t i = offset; i < chunk_end; ++i, ++it)
3358 {
3359 parallel_detail::throw_if_parallel_canceled(token);
3360 op(i, *it);
3361 }
3362 }));
3363
3364 offset = chunk_end;
3365 }
3366
3367 for (auto & f: futures)
3368 f.get();
3369 }
3370
3399 template <typename Container, typename Op>
3400 [[nodiscard]] auto penumerate_maps(ThreadPool & pool, const Container & c, Op op,
3401 size_t chunk_size = 0)
3402 {
3404 options.pool = &pool;
3405 options.chunk_size = chunk_size;
3406 return penumerate_maps(c, op, options);
3407 }
3408
3409 template <typename Container, typename Op>
3410 [[nodiscard]] auto penumerate_maps(const Container & c, Op op,
3411 const ParallelOptions & options = {})
3412 {
3413 using T = std::decay_t<decltype(*std::begin(c))>;
3414 using ResultT = std::invoke_result_t<Op, size_t, const T &>;
3415
3416 const size_t n = std::distance(std::begin(c), std::end(c));
3417 if (n == 0)
3418 return std::vector<ResultT>{};
3419
3420 auto & pool = parallel_detail::selected_parallel_pool(options);
3421 if (parallel_detail::use_sequential_parallel_path(n, pool, options))
3422 {
3423 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3424 std::vector<ResultT> result;
3425 result.reserve(n);
3426 auto it = std::begin(c);
3427 for (size_t i = 0; i < n; ++i, ++it)
3428 {
3429 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3430 result.push_back(op(i, *it));
3431 }
3432 return result;
3433 }
3434
3435 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool, options,
3436 pool.num_threads());
3437
3438 auto data_holder = parallel_detail::ensure_random_access(c);
3439 const auto & data = parallel_detail::deref(data_holder);
3440
3441 std::vector<std::optional<ResultT>> slots(n);
3442 std::vector<std::future<void>> futures;
3443
3444 size_t offset = 0;
3445 while (offset < n)
3446 {
3447 parallel_detail::throw_if_parallel_canceled(options.cancel_token);
3448 size_t chunk_end = std::min(offset + chunk_size, n);
3449
3450 futures.push_back(pool.enqueue([&slots, &data, op, offset, chunk_end,
3451 token = options.cancel_token]()
3452 {
3453 auto it = std::begin(data);
3454 std::advance(it, offset);
3455 for (size_t i = offset; i < chunk_end; ++i, ++it)
3456 {
3457 parallel_detail::throw_if_parallel_canceled(token);
3458 slots[i] = op(i, *it);
3459 }
3460 }));
3461 offset = chunk_end;
3462 }
3463
3464 for (auto & f: futures)
3465 f.get();
3466
3467 std::vector<ResultT> result;
3468 result.reserve(n);
3469 for (auto & slot: slots)
3470 result.push_back(std::move(*slot));
3471 return result;
3472
3473 }
3474
3475 // =============================================================================
3476 // Convenience: Default Pool Variants
3477 // =============================================================================
3478
3486 {
3487 return default_pool();
3488 }
3489
3490 // Convenience macros for using default pool (optional)
3491#ifdef AH_PARALLEL_USE_DEFAULT_POOL
3492
3493#define PMAP(c, op) pmaps(parallel_default_pool(), c, op)
3494#define PFILTER(c, pred) pfilter(parallel_default_pool(), c, pred)
3495#define PFOLD(c, init, op) pfoldl(parallel_default_pool(), c, init, op)
3496#define PFOR_EACH(c, op) pfor_each(parallel_default_pool(), c, op)
3497#define PALL(c, pred) pall(parallel_default_pool(), c, pred)
3498#define PEXISTS(c, pred) pexists(parallel_default_pool(), c, pred)
3499#define PSUM(c) psum(parallel_default_pool(), c)
3500
3501#endif // AH_PARALLEL_USE_DEFAULT_POOL
3502} // namespace Aleph
3503
3504#endif // AH_PARALLEL_H
Read-only cooperative cancellation token.
void throw_if_cancellation_requested() const
Throw operation_canceled if cancellation was requested.
A reusable thread pool for efficient parallel task execution.
size_t num_threads() const noexcept
Get the number of worker threads.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
#define N
Definition fib.C:294
int cmp(const __gmp_expr< T, U > &expr1, const __gmp_expr< V, W > &expr2)
Definition gmpfrxx.h:4118
Freq_Node * pred
Predecessor node in level-order traversal.
const long double offset[]
Offset values indexed by symbol string length (bounded by MAX_OFFSET_INDEX)
decltype(auto) deref(T &&ptr)
Get reference from pointer or unique_ptr.
bool use_sequential_parallel_path(const size_t n, const ThreadPool &pool, const ParallelOptions &options) noexcept
void throw_if_parallel_canceled(const CancellationToken &token)
size_t chunk_count(const size_t n, const size_t chunk_size) noexcept
ThreadPool & selected_parallel_pool(const ParallelOptions &options)
auto ensure_random_access(const Container &c)
For containers with random access, just return a pointer to it For non-random access,...
size_t effective_parallel_chunk_size(const size_t n, const ThreadPool &pool, const ParallelOptions &options, const size_t min_chunk=64)
size_t chunk_size(const size_t n, const size_t num_threads, const size_t min_chunk=64)
Calculate optimal chunk size based on data size and thread count.
constexpr bool has_random_access()
Check if container supports random access.
chunk_bounds bounds_for_chunk(const size_t idx, const size_t n, const size_t chunk_size) noexcept
size_t min_holder_size_impl(const std::tuple< Holders... > &holders, std::index_sequence< Is... >)
Get minimum size from tuple of holders - always O(1) per holder.
void advance_all_iters(std::tuple< Iters... > &iters, std::index_sequence< Is... >)
Advance all iterators in tuple.
auto make_iterators_at(size_t offset, const std::tuple< Holders... > &holders, std::index_sequence< Is... >)
Create tuple of iterators at given offset.
size_t min_holder_size(const std::tuple< Holders... > &holders)
auto deref_all_iters(const std::tuple< Iters... > &iters, std::index_sequence< Is... >)
Dereference all iterators and make tuple.
Main namespace for Aleph-w library functions.
Definition ah-arena.H:89
and
Check uniqueness with explicit hash + equality functors.
bool pall(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel all predicate (short-circuit).
bool pnone(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel none predicate.
auto pmaps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel map operation.
auto pmin(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel minimum element.
T pzip_foldl_n(ThreadPool &pool, T init, Op op, Combiner combiner, const Containers &... cs)
Parallel fold/reduce over N zipped containers (variadic).
ThreadPool & default_pool()
Return the default shared thread pool instance.
T pzip_foldl(ThreadPool &pool, const Container1 &c1, const Container2 &c2, T init, Op op, size_t chunk_size=0)
Parallel zip + fold.
void pzip_for_each(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + for_each.
size_t size(Node *root) noexcept
ThreadPool & parallel_default_pool()
Global default pool for parallel operations.
auto penumerate_maps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel enumerate with map.
void penumerate_for_each(ThreadPool &pool, Container &c, Op op, size_t chunk_size=0)
Parallel for_each with index (enumerate).
bool pzip_all_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel all predicate over N zipped containers (variadic).
size_t pcount_if(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel count_if operation.
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
std::optional< size_t > pfind(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find operation (returns index).
bool pzip_exists_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel exists predicate over N zipped containers (variadic).
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
std::decay_t< typename HeadC::Item_Type > T
Definition ah-zip.H:105
void psort(ThreadPool &pool, Container &c, Compare cmp=Compare{}, const size_t min_parallel_size=1024)
Parallel sort (in-place).
void pfor_each(ThreadPool &pool, Container &c, Op op, size_t chunk_size=0)
Parallel for_each operation.
auto pfilter(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel filter operation.
auto ppartition(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel partition (stable).
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
T pproduct(ThreadPool &pool, const Container &c, T init=T{1}, size_t chunk_size=0)
Parallel product of elements.
auto pmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel maximum element.
void pzip_for_each_n(ThreadPool &pool, Op op, const Containers &... cs)
Parallel for_each over N zipped containers (variadic).
auto pzip_maps_n(ThreadPool &pool, Op op, const Containers &... cs)
Parallel map over N zipped containers (variadic).
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
T pfoldl(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel left fold (reduce).
auto pfind_value(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find with value return.
auto pzip_maps(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + map.
T psum(ThreadPool &pool, const Container &c, T init=T{}, size_t chunk_size=0)
Parallel sum of elements.
auto pminmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel min and max elements.
size_t pzip_count_if_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel count over N zipped containers (variadic).
bool pexists(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel exists predicate (short-circuit).
Itor::difference_type count(const Itor &beg, const Itor &end, const T &value)
Count elements equal to a value.
Definition ahAlgo.H:127
STL namespace.
static struct argp_option options[]
Definition ntreepic.C:1886
Common configuration object for parallel algorithms.
ThreadPool * pool
Executor to use (nullptr = default_pool()).
Holder for converted containers (either pointer or unique_ptr to vector).
size_t cached_size
Cached size for O(1) access.
std::decay_t< decltype(*std::begin(std::declval< Container & >()))> value_type
std::conditional_t< parallel_detail::has_random_access< Container >(), const Container *, std::unique_ptr< std::vector< value_type > > > holder_type
size_t size() const noexcept
Size is always O(1) - either from random access or from cached vector size.
Filter_Iterator< DynList< int >, DynList< int >::Iterator, Par > It
Definition test_htlist.C:56
A modern, efficient thread pool for parallel task execution.