120#include <type_traits>
131 namespace parallel_detail
159 if (n == 0)
return 1;
161 const size_t chunks = num_threads * 4;
167 template <
typename Container>
170 using It =
decltype(std::begin(std::declval<Container &>()));
171 return std::is_base_of_v<std::random_access_iterator_tag,
172 typename std::iterator_traits<It>::iterator_category>;
177 template <
typename Container>
183 return std::make_unique<std::vector<typename Container::value_type>>(std::begin(c), std::end(c));
187 template <
typename T>
190 if constexpr (std::is_pointer_v<std::decay_t<T>>)
218 if (
options.cancel_token.stop_requested())
224 return pool.num_threads() <= 1;
267 template <
typename ResultT =
void,
typename Container,
typename Op>
269 size_t chunk_size = 0)
273 options.chunk_size = chunk_size;
277 template <
typename ResultT =
void,
typename Container,
typename Op>
282 using InputT = std::decay_t<
decltype(*std::begin(c))>;
284 std::is_void_v<ResultT>,
285 std::invoke_result_t<Op, const InputT &>,
288 const size_t n = std::distance(std::begin(c), std::end(c));
290 return std::vector<ActualResultT>{};
294 std::vector<ActualResultT> result;
296 for (
auto it = std::begin(c); it != std::end(c); ++it)
299 result.push_back(op(*it));
312 std::vector<std::optional<ActualResultT>> slots(n);
313 std::vector<std::future<void>>
futures;
322 token =
options.cancel_token]()
324 auto in_it = std::begin(data);
325 std::advance(in_it, offset);
326 for (size_t i = offset; i < chunk_end; ++i, ++in_it)
328 parallel_detail::throw_if_parallel_canceled(token);
329 slots[i] = op(*in_it);
337 std::exception_ptr
ep;
339 try { f.get(); }
catch (...) {
if (
not ep)
ep = std::current_exception(); }
341 std::rethrow_exception(
ep);
344 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
346 std::vector<ActualResultT> result;
348 for (
auto & slot : slots)
349 result.push_back(
std::move(*slot));
385 template <
typename Container,
typename Pred>
387 size_t chunk_size = 0)
391 options.chunk_size = chunk_size;
395 template <
typename Container,
typename Pred>
399 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
400 using T = std::decay_t<
decltype(*std::begin(c))>;
402 const size_t n = std::distance(std::begin(c), std::end(c));
404 return std::vector<T>{};
406 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
408 std::vector<T> result;
409 for (
auto it = std::begin(c); it != std::end(c); ++it)
411 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
413 result.push_back(*it);
415 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
420 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
422 auto data_holder = parallel_detail::ensure_random_access(c);
423 const auto & data = parallel_detail::deref(data_holder);
425 std::vector<std::future<std::vector<T>>> futures;
426 const size_t num_chunks = parallel_detail::chunk_count(n, chunk_size);
427 futures.reserve(num_chunks);
429 for (
size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx)
431 const auto bounds = parallel_detail::bounds_for_chunk(chunk_idx, n, chunk_size);
433 futures.push_back(pool.enqueue([&data,
pred,
435 chunk_end = bounds.end,
436 token =
options.cancel_token]()
438 std::vector<T> chunk_result;
439 auto it = std::begin(data);
440 std::advance(it, offset);
441 for (size_t i = offset; i < chunk_end; ++i, ++it)
443 parallel_detail::throw_if_parallel_canceled(token);
445 chunk_result.push_back(*it);
451 std::vector<T> result;
452 for (
auto & f: futures)
454 auto chunk_result = f.get();
455 result.insert(result.end(),
456 std::make_move_iterator(chunk_result.begin()),
457 std::make_move_iterator(chunk_result.end()));
460 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
506 template <
typename T,
typename Container,
typename BinaryOp>
508 size_t chunk_size = 0)
512 options.chunk_size = chunk_size;
516 template <
typename T,
typename Container,
typename BinaryOp>
520 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
521 const size_t n = std::distance(std::begin(c), std::end(c));
525 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
528 for (
auto it = std::begin(c); it != std::end(c); ++it)
530 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
531 result = op(result, *it);
533 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
538 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
540 auto data_holder = parallel_detail::ensure_random_access(c);
541 const auto & data = parallel_detail::deref(data_holder);
543 std::vector<std::future<T>> futures;
548 size_t chunk_end = std::min(
offset + chunk_size, n);
550 futures.push_back(pool.enqueue([&data, op,
offset, chunk_end,
551 token =
options.cancel_token]()
553 auto it = std::begin(data);
554 std::advance(it, offset);
555 parallel_detail::throw_if_parallel_canceled(token);
557 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
559 parallel_detail::throw_if_parallel_canceled(token);
560 local = op(local, *it);
570 for (
auto & f: futures)
571 result = op(result, f.get());
573 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
612 template <
typename Container,
typename Op>
617 options.chunk_size = chunk_size;
621 template <
typename Container,
typename Op>
624 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
625 const size_t n = std::distance(std::begin(c), std::end(c));
629 const size_t chunk_size =
630 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
632 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
634 for (
auto it = std::begin(c); it != std::end(c); ++it)
636 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
639 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
643 std::vector<std::future<void>> futures;
648 size_t chunk_end = std::min(
offset + chunk_size, n);
650 futures.push_back(pool.enqueue([&c, op,
offset, chunk_end,
651 token =
options.cancel_token]()
653 auto it = std::begin(c);
654 std::advance(it, offset);
655 for (size_t i = offset; i < chunk_end; ++i, ++it)
657 parallel_detail::throw_if_parallel_canceled(token);
665 for (
auto & f: futures)
668 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
683 template <
typename Container,
typename Op>
688 options.chunk_size = chunk_size;
692 template <
typename Container,
typename Op>
695 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
696 const size_t n = std::distance(std::begin(c), std::end(c));
700 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
702 for (
auto it = std::begin(c); it != std::end(c); ++it)
704 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
707 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
712 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
714 auto data_holder = parallel_detail::ensure_random_access(c);
715 const auto & data = parallel_detail::deref(data_holder);
717 std::vector<std::future<void>> futures;
722 size_t chunk_end = std::min(
offset + chunk_size, n);
724 futures.push_back(pool.enqueue([&data, op,
offset, chunk_end,
725 token =
options.cancel_token]()
727 auto it = std::begin(data);
728 std::advance(it, offset);
729 for (size_t i = offset; i < chunk_end; ++i, ++it)
731 parallel_detail::throw_if_parallel_canceled(token);
739 for (
auto & f: futures)
742 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
771 template <
typename Container,
typename Pred>
773 size_t chunk_size = 0)
777 options.chunk_size = chunk_size;
781 template <
typename Container,
typename Pred>
785 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
786 const size_t n = std::distance(std::begin(c), std::end(c));
790 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
792 for (
auto it = std::begin(c); it != std::end(c); ++it)
794 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
798 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
803 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
805 auto data_holder = parallel_detail::ensure_random_access(c);
806 const auto & data = parallel_detail::deref(data_holder);
808 std::atomic<bool> found_false{
false};
809 std::vector<std::future<void>> futures;
814 size_t chunk_end = std::min(
offset + chunk_size, n);
816 futures.push_back(pool.enqueue([&data,
pred, &found_false,
offset, chunk_end,
817 token =
options.cancel_token]()
819 parallel_detail::throw_if_parallel_canceled(token);
820 if (found_false.load(std::memory_order_relaxed))
823 auto it = std::begin(data);
824 std::advance(it, offset);
825 for (size_t i = offset; i < chunk_end; ++i, ++it)
829 found_false.store(true, std::memory_order_relaxed);
832 parallel_detail::throw_if_parallel_canceled(token);
833 if (found_false.load(std::memory_order_relaxed))
841 for (
auto & f: futures)
844 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
845 return not found_false.load();
869 template <
typename Container,
typename Pred>
871 size_t chunk_size = 0)
875 options.chunk_size = chunk_size;
879 template <
typename Container,
typename Pred>
883 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
884 const size_t n = std::distance(std::begin(c), std::end(c));
888 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
890 for (
auto it = std::begin(c); it != std::end(c); ++it)
892 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
896 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
901 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
903 auto data_holder = parallel_detail::ensure_random_access(c);
904 const auto & data = parallel_detail::deref(data_holder);
906 std::atomic<bool> found{
false};
907 std::vector<std::future<void>> futures;
912 size_t chunk_end = std::min(
offset + chunk_size, n);
914 futures.push_back(pool.enqueue([&data,
pred, &found,
offset, chunk_end,
915 token =
options.cancel_token]()
917 parallel_detail::throw_if_parallel_canceled(token);
918 if (found.load(std::memory_order_relaxed))
921 auto it = std::begin(data);
922 std::advance(it, offset);
923 for (size_t i = offset; i < chunk_end; ++i, ++it)
927 found.store(true, std::memory_order_relaxed);
930 parallel_detail::throw_if_parallel_canceled(token);
931 if (found.load(std::memory_order_relaxed))
939 for (
auto & f: futures)
942 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
959 template <
typename Container,
typename Pred>
961 size_t chunk_size = 0)
966 template <
typename Container,
typename Pred>
998 template <
typename Container,
typename Pred>
1000 size_t chunk_size = 0)
1004 options.chunk_size = chunk_size;
1008 template <
typename Container,
typename Pred>
1012 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
1013 const size_t n = std::distance(std::begin(c), std::end(c));
1017 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
1020 for (
auto it = std::begin(c); it != std::end(c); ++it)
1022 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1026 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1031 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
1033 auto data_holder = parallel_detail::ensure_random_access(c);
1034 const auto & data = parallel_detail::deref(data_holder);
1036 std::vector<std::future<size_t>> futures;
1041 size_t chunk_end = std::min(
offset + chunk_size, n);
1043 futures.push_back(pool.enqueue([&data,
pred,
offset, chunk_end,
1044 token =
options.cancel_token]()
1047 auto it = std::begin(data);
1048 std::advance(it, offset);
1049 for (size_t i = offset; i < chunk_end; ++i, ++it)
1051 parallel_detail::throw_if_parallel_canceled(token);
1062 for (
auto & f: futures)
1065 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1098 template <
typename Container,
typename Pred>
1100 Pred
pred,
size_t chunk_size = 0)
1104 options.chunk_size = chunk_size;
1108 template <
typename Container,
typename Pred>
1113 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
1114 const size_t n = std::distance(std::begin(c), std::end(c));
1116 return std::nullopt;
1118 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
1121 for (
auto it = std::begin(c); it != std::end(c); ++it, ++idx)
1123 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1127 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1128 return std::nullopt;
1132 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
1134 auto data_holder = parallel_detail::ensure_random_access(c);
1135 const auto & data = parallel_detail::deref(data_holder);
1138 std::atomic<size_t> min_index{n};
1139 std::vector<std::future<void>> futures;
1144 size_t chunk_end = std::min(
offset + chunk_size, n);
1146 futures.push_back(pool.enqueue([&data,
pred, &min_index,
offset, chunk_end,
1147 token =
options.cancel_token]()
1149 parallel_detail::throw_if_parallel_canceled(token);
1151 if (min_index.load(std::memory_order_relaxed) <= offset)
1154 auto it = std::begin(data);
1155 std::advance(it, offset);
1156 for (size_t i = offset; i < chunk_end; ++i, ++it)
1159 if (min_index.load(std::memory_order_relaxed) <= i)
1162 parallel_detail::throw_if_parallel_canceled(token);
1166 size_t expected = min_index.load(std::memory_order_relaxed);
1167 while (i < expected and
1168 not min_index.compare_exchange_weak(expected, i,
1169 std::memory_order_relaxed));
1178 for (
auto & f: futures)
1181 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1182 if (
size_t result = min_index.load(); result < n)
1184 return std::nullopt;
1210 template <
typename Container,
typename Pred>
1212 Pred
pred,
size_t chunk_size = 0)
1216 options.chunk_size = chunk_size;
1220 template <
typename Container,
typename Pred>
1225 using T = std::decay_t<
decltype(*std::begin(c))>;
1229 return std::optional<T>{std::nullopt};
1231 auto it = std::begin(c);
1232 std::advance(it, *idx);
1233 return std::optional<T>{*it};
1260 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
1262 size_t chunk_size = 0)
1264 return pfoldl(pool, c, init, std::plus<T>{},
chunk_size);
1268 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
1286 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
1288 size_t chunk_size = 0)
1290 return pfoldl(pool, c, init, std::multiplies<T>{},
chunk_size);
1294 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
1310 template <
typename Container>
1315 options.chunk_size = chunk_size;
1319 template <
typename Container>
1322 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
1323 using T = std::decay_t<
decltype(*std::begin(c))>;
1325 const size_t n = std::distance(std::begin(c), std::end(c));
1327 return std::optional<T>{std::nullopt};
1329 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
1331 auto it = std::begin(c);
1333 for (; it != std::end(c); ++it)
1335 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1339 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1340 return std::optional<T>{result};
1344 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
1346 auto data_holder = parallel_detail::ensure_random_access(c);
1347 const auto & data = parallel_detail::deref(data_holder);
1349 std::vector<std::future<T>> futures;
1354 size_t chunk_end = std::min(
offset + chunk_size, n);
1356 futures.push_back(pool.enqueue([&data,
offset, chunk_end,
1357 token =
options.cancel_token]()
1359 auto it = std::begin(data);
1360 std::advance(it, offset);
1361 parallel_detail::throw_if_parallel_canceled(token);
1362 T local_min = *it++;
1363 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
1365 parallel_detail::throw_if_parallel_canceled(token);
1366 if (*it < local_min)
1375 T result = futures[0].get();
1376 for (
size_t i = 1; i < futures.size(); ++i)
1378 T val = futures[i].get();
1383 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1384 return std::optional<T>{result};
1397 template <
typename Container>
1402 options.chunk_size = chunk_size;
1406 template <
typename Container>
1409 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
1410 using T = std::decay_t<
decltype(*std::begin(c))>;
1412 const size_t n = std::distance(std::begin(c), std::end(c));
1414 return std::optional<T>{std::nullopt};
1416 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
1418 auto it = std::begin(c);
1420 for (; it != std::end(c); ++it)
1422 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1426 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1427 return std::optional<T>{result};
1431 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
1433 auto data_holder = parallel_detail::ensure_random_access(c);
1434 const auto & data = parallel_detail::deref(data_holder);
1436 std::vector<std::future<T>> futures;
1441 size_t chunk_end = std::min(
offset + chunk_size, n);
1443 futures.push_back(pool.enqueue([&data,
offset, chunk_end,
1444 token =
options.cancel_token]()
1446 auto it = std::begin(data);
1447 std::advance(it, offset);
1448 parallel_detail::throw_if_parallel_canceled(token);
1449 T local_max = *it++;
1450 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
1452 parallel_detail::throw_if_parallel_canceled(token);
1453 if (*it > local_max)
1462 T result = futures[0].get();
1463 for (
size_t i = 1; i < futures.size(); ++i)
1465 T val = futures[i].get();
1470 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1471 return std::optional<T>{result};
1484 template <
typename Container>
1487 using T = std::decay_t<
decltype(*std::begin(c))>;
1489 const size_t n = std::distance(std::begin(c), std::end(c));
1491 return std::optional<std::pair<T, T>>{std::nullopt};
1493 if (chunk_size == 0)
1494 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1496 auto data_holder = parallel_detail::ensure_random_access(c);
1497 const auto & data = parallel_detail::deref(data_holder);
1499 std::vector<std::future<std::pair<T, T>>> futures;
1504 size_t chunk_end = std::min(
offset + chunk_size, n);
1508 auto it = std::begin(data);
1509 std::advance(it, offset);
1511 T local_max = *it++;
1512 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
1514 if (*it < local_min) local_min = *it;
1515 if (*it > local_max) local_max = *it;
1517 return std::make_pair(local_min, local_max);
1523 auto result = futures[0].get();
1524 for (
size_t i = 1; i < futures.size(); ++i)
1526 auto [mi, ma] = futures[i].get();
1527 if (mi < result.first) result.first = mi;
1528 if (ma > result.second) result.second = ma;
1531 return std::optional<std::pair<T, T>>{result};
1565 template <
typename Container,
typename Compare = std::less<>>
1567 const size_t min_parallel_size = 1024)
1571 options.min_size = min_parallel_size;
1575 template <
typename Container,
typename Compare = std::less<>>
1577 const ParallelOptions &
options = {})
1579 const size_t n = std::distance(std::begin(c), std::end(c));
1583 auto & pool = parallel_detail::selected_parallel_pool(
options);
1584 const size_t min_parallel_size =
options.min_size == 0 ? 1024 :
options.min_size;
1587 if (parallel_detail::use_sequential_parallel_path(n, pool,
options)
1588 or n <= min_parallel_size)
1590 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1591 std::sort(std::begin(c), std::end(c),
cmp);
1596 size_t num_chunks = std::min(pool.
num_threads() * 2, n / min_parallel_size);
1598 num_chunks = std::min(num_chunks,
options.max_tasks);
1599 num_chunks = std::max<size_t>(1, num_chunks);
1600 const size_t chunk_size = (n + num_chunks - 1) / num_chunks;
1603 std::vector<std::future<void>> futures;
1606 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1607 size_t end = std::min(i + chunk_size, n);
1608 auto begin_it = std::begin(c);
1609 std::advance(begin_it, i);
1610 auto end_it = std::begin(c);
1611 std::advance(end_it, end);
1613 futures.push_back(pool.
enqueue([begin_it, end_it,
cmp, token =
options.cancel_token]()
1615 parallel_detail::throw_if_parallel_canceled(token);
1616 std::sort(begin_it, end_it, cmp);
1620 for (
auto & f: futures)
1624 using T = std::decay_t<
decltype(*std::begin(c))>;
1625 std::vector<std::optional<T>> buffer(n);
1627 ParallelOptions merge_options =
options;
1628 merge_options.pool = &pool;
1630 for (
size_t width = chunk_size; width < n; width *= 2)
1632 for (
size_t i = 0; i < n; i += 2 * width)
1634 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1635 size_t mid = std::min(i + width, n);
1636 size_t end = std::min(i + 2 * width, n);
1640 auto begin_it = std::begin(c);
1641 std::advance(begin_it, i);
1642 auto mid_it = std::begin(c);
1643 std::advance(mid_it, mid);
1644 auto end_it = std::begin(c);
1645 std::advance(end_it, end);
1648 buffer.begin() + i,
cmp, merge_options);
1653 auto begin_it = std::begin(c);
1654 std::advance(begin_it, i);
1655 auto end_it = std::begin(c);
1656 std::advance(end_it, mid);
1657 std::copy(begin_it, end_it, buffer.begin() + i);
1662 auto it = std::begin(c);
1663 for (
size_t i = 0; i < n; ++i, ++it)
1664 *it = std::move(*buffer[i]);
1699 template <
typename Container1,
typename Container2,
typename Op>
1701 Op op,
size_t chunk_size = 0)
1705 options.chunk_size = chunk_size;
1709 template <
typename Container1,
typename Container2,
typename Op>
1713 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1714 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1715 const size_t n = std::min(n1, n2);
1720 auto & pool = parallel_detail::selected_parallel_pool(
options);
1721 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
1723 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1724 auto it1 = std::begin(c1);
1725 auto it2 = std::begin(c2);
1726 for (
size_t i = 0; i < n; ++i, ++it1, ++it2)
1728 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1734 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
1737 auto h1 = parallel_detail::ensure_random_access(c1);
1738 auto h2 = parallel_detail::ensure_random_access(c2);
1739 const auto & d1 = parallel_detail::deref(h1);
1740 const auto & d2 = parallel_detail::deref(h2);
1742 std::vector<std::future<void>> futures;
1747 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1748 size_t chunk_end = std::min(
offset + chunk_size, n);
1750 futures.push_back(pool.
enqueue([&d1, &d2, op,
offset, chunk_end,
1751 token =
options.cancel_token]()
1753 auto it1 = std::begin(d1);
1754 auto it2 = std::begin(d2);
1755 std::advance(it1, offset);
1756 std::advance(it2, offset);
1757 for (size_t i = offset; i < chunk_end; ++i, ++it1, ++it2)
1759 parallel_detail::throw_if_parallel_canceled(token);
1767 for (
auto & f: futures)
1795 template <
typename Container1,
typename Container2,
typename Op>
1797 const Container2 & c2, Op op,
size_t chunk_size = 0)
1801 options.chunk_size = chunk_size;
1805 template <
typename Container1,
typename Container2,
typename Op>
1806 [[nodiscard]]
auto pzip_maps(
const Container1 & c1,
const Container2 & c2, Op op,
1809 using T1 = std::decay_t<
decltype(*std::begin(c1))>;
1810 using T2 = std::decay_t<
decltype(*std::begin(c2))>;
1811 using ResultT = std::invoke_result_t<Op, const T1 &, const T2 &>;
1813 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1814 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1815 const size_t n = std::min(n1, n2);
1818 return std::vector<ResultT>{};
1820 auto & pool = parallel_detail::selected_parallel_pool(
options);
1821 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
1823 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1824 std::vector<ResultT> result;
1826 auto it1 = std::begin(c1);
1827 auto it2 = std::begin(c2);
1828 for (
size_t i = 0; i < n; ++i, ++it1, ++it2)
1830 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1831 result.push_back(op(*it1, *it2));
1836 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
1839 auto h1 = parallel_detail::ensure_random_access(c1);
1840 auto h2 = parallel_detail::ensure_random_access(c2);
1841 const auto & d1 = parallel_detail::deref(h1);
1842 const auto & d2 = parallel_detail::deref(h2);
1844 std::vector<std::optional<ResultT>> slots(n);
1845 std::vector<std::future<void>> futures;
1850 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1851 size_t chunk_end = std::min(
offset + chunk_size, n);
1853 futures.push_back(pool.
enqueue([&slots, &d1, &d2, op,
offset, chunk_end,
1854 token =
options.cancel_token]()
1856 auto it1 = std::begin(d1);
1857 auto it2 = std::begin(d2);
1858 std::advance(it1, offset);
1859 std::advance(it2, offset);
1860 for (size_t i = offset; i < chunk_end; ++i, ++it1, ++it2)
1862 parallel_detail::throw_if_parallel_canceled(token);
1863 slots[i] = op(*it1, *it2);
1870 for (
auto & f: futures)
1873 std::vector<ResultT> result;
1875 for (
auto & slot : slots)
1876 result.push_back(
std::move(*slot));
1909 template <
typename Container1,
typename Container2,
typename T,
typename Op>
1911 const Container2 & c2,
T init, Op op,
1912 size_t chunk_size = 0)
1916 options.chunk_size = chunk_size;
1920 template <
typename Container1,
typename Container2,
typename T,
typename Op>
1921 [[nodiscard]]
T pzip_foldl(
const Container1 & c1,
const Container2 & c2,
T init, Op op,
1924 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1925 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1926 const size_t n = std::min(n1, n2);
1931 auto & pool = parallel_detail::selected_parallel_pool(
options);
1932 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
1934 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1935 auto it1 = std::begin(c1);
1936 auto it2 = std::begin(c2);
1938 for (
size_t i = 0; i < n; ++i, ++it1, ++it2)
1940 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1941 result = op(result, *it1, *it2);
1946 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
1949 auto h1 = parallel_detail::ensure_random_access(c1);
1950 auto h2 = parallel_detail::ensure_random_access(c2);
1951 const auto & d1 = parallel_detail::deref(h1);
1952 const auto & d2 = parallel_detail::deref(h2);
1954 std::vector<std::future<T>> futures;
1959 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
1960 size_t chunk_end = std::min(
offset + chunk_size, n);
1962 futures.push_back(pool.
enqueue([&d1, &d2, &init, op,
offset, chunk_end,
1963 token =
options.cancel_token]()
1965 auto it1 = std::begin(d1);
1966 auto it2 = std::begin(d2);
1967 std::advance(it1, offset);
1968 std::advance(it2, offset);
1970 parallel_detail::throw_if_parallel_canceled(token);
1971 T local = op(init, *it1++, *it2++);
1972 for (size_t i = offset + 1; i < chunk_end; ++i, ++it1, ++it2)
1974 parallel_detail::throw_if_parallel_canceled(token);
1975 local = op(local, *it1, *it2);
1985 T result = futures[0].get();
1986 for (
size_t i = 1; i < futures.size(); ++i)
1988 T val = futures[i].get();
1990 result = result + val - init;
2021 template <
typename Container,
typename Pred>
2023 size_t chunk_size = 0)
2027 options.chunk_size = chunk_size;
2031 template <
typename Container,
typename Pred>
2035 using T = std::decay_t<
decltype(*std::begin(c))>;
2036 ThreadPool & pool = parallel_detail::selected_parallel_pool(
options);
2038 const size_t n = std::distance(std::begin(c), std::end(c));
2040 return std::make_pair(std::vector<T>{}, std::vector<T>{});
2042 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
2044 std::vector<T> yes_result, no_result;
2045 for (
auto it = std::begin(c); it != std::end(c); ++it)
2047 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2049 yes_result.push_back(*it);
2051 no_result.push_back(*it);
2053 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2054 return std::make_pair(std::move(yes_result), std::move(no_result));
2058 parallel_detail::effective_parallel_chunk_size(n, pool,
options);
2060 auto data_holder = parallel_detail::ensure_random_access(c);
2061 const auto & data = parallel_detail::deref(data_holder);
2063 const size_t num_chunks = parallel_detail::chunk_count(n, chunk_size);
2064 std::vector<size_t> yes_counts(num_chunks, 0);
2065 std::vector<size_t> no_counts(num_chunks, 0);
2066 std::vector<std::future<void>> count_futures;
2067 count_futures.reserve(num_chunks);
2069 for (
size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx)
2071 const auto bounds = parallel_detail::bounds_for_chunk(chunk_idx, n, chunk_size);
2073 count_futures.push_back(pool.enqueue([&data, &yes_counts, &no_counts,
pred,
2076 chunk_end = bounds.end,
2077 token =
options.cancel_token]()
2081 auto it = std::begin(data);
2082 std::advance(it, offset);
2083 for (size_t i = offset; i < chunk_end; ++i, ++it)
2085 parallel_detail::throw_if_parallel_canceled(token);
2091 yes_counts[chunk_idx] = yes;
2092 no_counts[chunk_idx] = no;
2096 for (
auto & f: count_futures)
2099 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2101 std::vector<size_t> yes_offsets(num_chunks, 0);
2102 std::vector<size_t> no_offsets(num_chunks, 0);
2106 size_t{0}, std::plus<size_t>{},
options);
2108 size_t{0}, std::plus<size_t>{},
options);
2111 const size_t yes_total = num_chunks == 0 ? 0 : yes_offsets.back() + yes_counts.back();
2112 const size_t no_total = num_chunks == 0 ? 0 : no_offsets.back() + no_counts.back();
2114 std::vector<std::optional<T>> yes_slots(yes_total);
2115 std::vector<std::optional<T>> no_slots(no_total);
2116 std::vector<std::future<void>> fill_futures;
2117 fill_futures.reserve(num_chunks);
2119 for (
size_t chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx)
2121 const auto bounds = parallel_detail::bounds_for_chunk(chunk_idx, n, chunk_size);
2122 const size_t yes_offset = yes_offsets[chunk_idx];
2123 const size_t no_offset = no_offsets[chunk_idx];
2125 fill_futures.push_back(pool.enqueue([&data, &yes_slots, &no_slots,
pred,
2127 chunk_end = bounds.end,
2128 yes_offset, no_offset,
2129 token =
options.cancel_token]()
2131 auto it = std::begin(data);
2132 std::advance(it, offset);
2133 size_t yes_pos = yes_offset;
2134 size_t no_pos = no_offset;
2135 for (size_t i = offset; i < chunk_end; ++i, ++it)
2137 parallel_detail::throw_if_parallel_canceled(token);
2139 yes_slots[yes_pos++].emplace(*it);
2141 no_slots[no_pos++].emplace(*it);
2146 for (
auto & f: fill_futures)
2149 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2151 std::vector<T> yes_result, no_result;
2152 yes_result.reserve(yes_total);
2153 no_result.reserve(no_total);
2155 for (
auto & slot: yes_slots)
2156 yes_result.push_back(
std::move(*slot));
2157 for (
auto & slot: no_slots)
2158 no_result.push_back(
std::move(*slot));
2160 return std::make_pair(std::move(yes_result), std::move(no_result));
2190 template <
typename Container,
typename BinaryOp>
2192 size_t chunk_size = 0)
2196 options.chunk_size = chunk_size;
2220 template <
typename Container,
typename BinaryOp>
2224 using T = std::decay_t<
decltype(*std::begin(c))>;
2226 auto holder = parallel_detail::ensure_random_access(c);
2227 const auto & data = parallel_detail::deref(holder);
2228 const size_t n =
static_cast<size_t>(std::distance(std::begin(data), std::end(data)));
2229 std::vector<std::optional<T>> slots(n);
2233 std::vector<T> result;
2235 for (
auto & slot: slots)
2236 result.push_back(
std::move(*slot));
2266 template <
typename Container,
typename T,
typename BinaryOp>
2268 BinaryOp op,
size_t chunk_size = 0)
2272 options.chunk_size = chunk_size;
2296 template <
typename Container,
typename T,
typename BinaryOp>
2300 auto holder = parallel_detail::ensure_random_access(c);
2301 const auto & data = parallel_detail::deref(holder);
2302 const size_t n =
static_cast<size_t>(std::distance(std::begin(data), std::end(data)));
2303 std::vector<std::optional<T>> slots(n);
2308 std::vector<T> result;
2310 for (
auto & slot: slots)
2311 result.push_back(
std::move(*slot));
2340 template <
typename Container1,
typename Container2,
typename Compare = std::less<>>
2342 Compare comp = Compare{},
size_t chunk_size = 0)
2346 options.chunk_size = chunk_size;
2347 return pmerge(c1, c2, std::move(comp),
options);
2370 template <
typename Container1,
typename Container2,
typename Compare = std::less<>>
2371 [[nodiscard]]
auto pmerge(
const Container1 & c1,
const Container2 & c2,
2372 Compare comp = Compare{},
2373 const ParallelOptions &
options = {})
2375 using T1 = std::decay_t<
decltype(*std::begin(c1))>;
2376 using T2 = std::decay_t<
decltype(*std::begin(c2))>;
2377 using ResultT = std::common_type_t<T1, T2>;
2379 auto h1 = parallel_detail::ensure_random_access(c1);
2380 auto h2 = parallel_detail::ensure_random_access(c2);
2381 const auto & d1 = parallel_detail::deref(h1);
2382 const auto & d2 = parallel_detail::deref(h2);
2384 const size_t total =
static_cast<size_t>(std::distance(std::begin(d1), std::end(d1))
2385 + std::distance(std::begin(d2), std::end(d2)));
2386 std::vector<std::optional<ResultT>> slots(total);
2389 std::begin(d2), std::end(d2),
2390 slots.begin(), comp,
options);
2392 std::vector<ResultT> result;
2393 result.reserve(total);
2394 for (
auto & slot: slots)
2395 result.push_back(
std::move(*slot));
2403 namespace parallel_zip_detail
2413 template <
typename Container>
2416 using value_type = std::decay_t<decltype(*std::begin(std::declval<Container &>()))>;
2418 parallel_detail::has_random_access<Container>(),
2420 std::unique_ptr<std::vector<value_type>>>;
2427 if constexpr (parallel_detail::has_random_access<Container>())
2431 cached_size =
static_cast<size_t>(std::distance(std::begin(c), std::end(c)));
2436 data = std::make_unique<std::vector<value_type>>(std::begin(c), std::end(c));
2437 cached_size = data->size();
2443 if constexpr (parallel_detail::has_random_access<Container>())
2450 [[nodiscard]]
size_t size() const noexcept {
return cached_size; }
2452 auto begin()
const {
return std::begin(get()); }
2453 auto end()
const {
return std::end(get()); }
2457 template <
typename... Holders,
size_t... Is>
2459 std::index_sequence<Is...>)
2461 return std::min({std::get<Is>(holders).size()...});
2464 template <
typename... Holders>
2471 template <
typename... Holders,
size_t... Is>
2473 std::index_sequence<Is...>)
2475 return std::make_tuple([&]()
2477 auto it = std::get<Is>(holders).begin();
2478 std::advance(it,
offset);
2484 template <
typename... Iters,
size_t... Is>
2487 (++std::get<Is>(iters), ...);
2491 template <
typename... Iters,
size_t... Is>
2494 return std::make_tuple(*std::get<Is>(iters)...);
2530 template <
typename Op,
typename... Containers>
2538 template <
typename Op,
typename... Containers>
2541 static_assert(
sizeof...(Containers) >= 2,
2542 "pzip_for_each requires at least 2 containers");
2549 const size_t n = parallel_zip_detail::min_holder_size(holders);
2553 auto & pool = parallel_detail::selected_parallel_pool(
options);
2554 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
2556 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2557 constexpr size_t N =
sizeof...(Containers);
2558 auto iters = parallel_zip_detail::make_iterators_at(
2559 0, holders, std::make_index_sequence<N>{});
2560 for (
size_t i = 0; i < n; ++i)
2562 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2563 std::apply(op, parallel_zip_detail::deref_all_iters(
2564 iters, std::make_index_sequence<N>{}));
2565 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
2570 const size_t chunk_size = parallel_detail::effective_parallel_chunk_size(
2573 std::vector<std::future<void>> futures;
2578 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2579 size_t chunk_end = std::min(
offset + chunk_size, n);
2581 futures.push_back(pool.
enqueue([&holders, op,
offset, chunk_end,
2582 token =
options.cancel_token]()
2584 constexpr size_t N = sizeof...(Containers);
2585 auto iters = parallel_zip_detail::make_iterators_at(
2586 offset, holders, std::make_index_sequence<N>{});
2588 for (
size_t i =
offset; i < chunk_end; ++i)
2590 parallel_detail::throw_if_parallel_canceled(token);
2591 std::apply(op, parallel_zip_detail::deref_all_iters(
2592 iters, std::make_index_sequence<N>{}));
2593 parallel_zip_detail::advance_all_iters(
2594 iters, std::make_index_sequence<N>{});
2601 for (
auto & f: futures)
2636 template <
typename Op,
typename... Containers>
2644 template <
typename Op,
typename... Containers>
2646 const Containers &... cs)
2648 static_assert(
sizeof...(Containers) >= 2,
2649 "pzip_maps requires at least 2 containers");
2652 using ResultT = std::invoke_result_t<Op,
2653 std::decay_t<
decltype(*std::begin(cs))>...>;
2659 const size_t n = parallel_zip_detail::min_holder_size(holders);
2661 return std::vector<ResultT>{};
2663 auto & pool = parallel_detail::selected_parallel_pool(
options);
2664 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
2666 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2667 std::vector<std::optional<ResultT>> slots(n);
2668 constexpr size_t N =
sizeof...(Containers);
2669 auto iters = parallel_zip_detail::make_iterators_at(
2670 0, holders, std::make_index_sequence<N>{});
2671 for (
size_t i = 0; i < n; ++i)
2673 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2674 slots[i] = std::apply(op, parallel_zip_detail::deref_all_iters(
2675 iters, std::make_index_sequence<N>{}));
2676 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
2679 std::vector<ResultT> result;
2681 for (
auto & slot: slots)
2682 result.push_back(std::move(*slot));
2686 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
2689 std::vector<std::optional<ResultT>> slots(n);
2690 std::vector<std::future<void>> futures;
2695 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2696 size_t chunk_end = std::min(
offset + chunk_size, n);
2698 futures.push_back(pool.
enqueue([&slots, &holders, op,
offset, chunk_end,
2699 token =
options.cancel_token]()
2701 constexpr size_t N = sizeof...(Containers);
2702 auto iters = parallel_zip_detail::make_iterators_at(
2703 offset, holders, std::make_index_sequence<N>{});
2705 for (
size_t i =
offset; i < chunk_end; ++i)
2707 parallel_detail::throw_if_parallel_canceled(token);
2708 slots[i] = std::apply(op, parallel_zip_detail::deref_all_iters(
2709 iters, std::make_index_sequence<N>{}));
2710 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
2716 for (
auto & f: futures)
2719 std::vector<ResultT> result;
2721 for (
auto & slot: slots)
2722 result.push_back(
std::move(*slot));
2765 template <
typename T,
typename Op,
typename Combiner,
typename... Containers>
2767 const Containers &... cs)
2774 template <
typename T,
typename Op,
typename Combiner,
typename... Containers>
2778 static_assert(
sizeof...(Containers) >= 2,
2779 "pzip_foldl requires at least 2 containers");
2785 const size_t n = parallel_zip_detail::min_holder_size(holders);
2789 auto & pool = parallel_detail::selected_parallel_pool(
options);
2790 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
2792 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2793 constexpr size_t N =
sizeof...(Containers);
2794 auto iters = parallel_zip_detail::make_iterators_at(
2795 0, holders, std::make_index_sequence<N>{});
2797 for (
size_t i = 0; i < n; ++i)
2799 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2800 auto tuple = parallel_zip_detail::deref_all_iters(iters, std::make_index_sequence<N>{});
2801 result = std::apply([&op, &result](
auto &&... args)
2803 return op(result, std::forward<
decltype(args)>(args)...);
2805 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
2810 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
2813 std::vector<std::future<T>> futures;
2818 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2819 size_t chunk_end = std::min(
offset + chunk_size, n);
2821 futures.push_back(pool.
enqueue([&holders, init, op,
offset, chunk_end,
2822 token =
options.cancel_token]()
2824 constexpr size_t N = sizeof...(Containers);
2825 auto iters = parallel_zip_detail::make_iterators_at(
2826 offset, holders, std::make_index_sequence<N>{});
2829 parallel_detail::throw_if_parallel_canceled(token);
2830 auto first_tuple = parallel_zip_detail::deref_all_iters(
2831 iters, std::make_index_sequence<N>{});
2832 T local = std::apply([&op, &init](
auto &&... args)
2834 return op(init, std::forward<
decltype(args)>(args)
2837 parallel_zip_detail::advance_all_iters(
2838 iters, std::make_index_sequence<N>{});
2841 for (
size_t i =
offset + 1; i < chunk_end; ++i)
2843 parallel_detail::throw_if_parallel_canceled(token);
2844 auto tuple = parallel_zip_detail::deref_all_iters(
2845 iters, std::make_index_sequence<N>{});
2846 local = std::apply([&op, &local](
auto &&... args)
2849 std::forward<
decltype(args)>(args)
2852 parallel_zip_detail::advance_all_iters(
2853 iters, std::make_index_sequence<N>{});
2863 T result = futures[0].get();
2864 for (
size_t i = 1; i < futures.size(); ++i)
2865 result = combiner(result, futures[i].get());
2898 template <
typename Pred,
typename... Containers>
2906 template <
typename Pred,
typename... Containers>
2908 const Containers &... cs)
2910 static_assert(
sizeof...(Containers) >= 2,
2911 "pzip_all requires at least 2 containers");
2914 auto holders = std::make_tuple(
2918 const size_t n = parallel_zip_detail::min_holder_size(holders);
2922 auto & pool = parallel_detail::selected_parallel_pool(
options);
2923 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
2925 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2926 constexpr size_t N =
sizeof...(Containers);
2927 auto iters = parallel_zip_detail::make_iterators_at(
2928 0, holders, std::make_index_sequence<N>{});
2929 for (
size_t i = 0; i < n; ++i)
2931 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2932 auto tuple = parallel_zip_detail::deref_all_iters(iters, std::make_index_sequence<N>{});
2933 if (! std::apply(
pred, tuple))
2935 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
2940 const size_t chunk_size = parallel_detail::effective_parallel_chunk_size(
2943 std::atomic<bool> found_false{
false};
2944 std::vector<std::future<void>> futures;
2949 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
2950 size_t chunk_end = std::min(
offset + chunk_size, n);
2952 futures.push_back(pool.
enqueue([&holders,
pred, &found_false,
offset, chunk_end,
2953 token =
options.cancel_token]()
2955 if (found_false.load(std::memory_order_relaxed))
2958 constexpr size_t N = sizeof...(Containers);
2959 auto iters = parallel_zip_detail::make_iterators_at(
2960 offset, holders, std::make_index_sequence<N>{});
2962 for (
size_t i =
offset; i < chunk_end; ++i)
2964 parallel_detail::throw_if_parallel_canceled(token);
2965 auto tuple = parallel_zip_detail::deref_all_iters(
2966 iters, std::make_index_sequence<N>{});
2967 if (! std::apply(
pred, tuple))
2969 found_false.store(
true, std::memory_order_relaxed);
2972 if (found_false.load(std::memory_order_relaxed))
2974 parallel_zip_detail::advance_all_iters(
2975 iters, std::make_index_sequence<N>{});
2982 for (
auto & f: futures)
2985 return not found_false.load();
3016 template <
typename Pred,
typename... Containers>
3024 template <
typename Pred,
typename... Containers>
3026 const Containers &... cs)
3028 static_assert(
sizeof...(Containers) >= 2,
3029 "pzip_exists requires at least 2 containers");
3035 const size_t n = parallel_zip_detail::min_holder_size(holders);
3039 auto & pool = parallel_detail::selected_parallel_pool(
options);
3040 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
3042 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3043 constexpr size_t N =
sizeof...(Containers);
3044 auto iters = parallel_zip_detail::make_iterators_at(
3045 0, holders, std::make_index_sequence<N>{});
3046 for (
size_t i = 0; i < n; ++i)
3048 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3049 auto tuple = parallel_zip_detail::deref_all_iters(iters, std::make_index_sequence<N>{});
3050 if (std::apply(
pred, tuple))
3052 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
3057 const size_t chunk_size = parallel_detail::effective_parallel_chunk_size(
3060 std::atomic<bool> found{
false};
3061 std::vector<std::future<void>> futures;
3066 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3067 size_t chunk_end = std::min(
offset + chunk_size, n);
3070 token =
options.cancel_token]()
3072 if (found.load(std::memory_order_relaxed))
3075 constexpr size_t N = sizeof...(Containers);
3076 auto iters = parallel_zip_detail::make_iterators_at(
3077 offset, holders, std::make_index_sequence<N>{});
3079 for (
size_t i =
offset; i < chunk_end; ++i)
3081 parallel_detail::throw_if_parallel_canceled(token);
3082 auto tuple = parallel_zip_detail::deref_all_iters(
3083 iters, std::make_index_sequence<N>{});
3084 if (std::apply(
pred, tuple))
3086 found.store(
true, std::memory_order_relaxed);
3089 if (found.load(std::memory_order_relaxed))
3091 parallel_zip_detail::advance_all_iters(
3092 iters, std::make_index_sequence<N>{});
3099 for (
auto & f: futures)
3102 return found.load();
3120 template <
typename Pred,
typename... Containers>
3122 const Containers &... cs)
3129 template <
typename Pred,
typename... Containers>
3131 const Containers &... cs)
3133 static_assert(
sizeof...(Containers) >= 2,
3134 "pzip_count_if requires at least 2 containers");
3140 const size_t n = parallel_zip_detail::min_holder_size(holders);
3144 auto & pool = parallel_detail::selected_parallel_pool(
options);
3145 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
3147 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3148 constexpr size_t N =
sizeof...(Containers);
3149 auto iters = parallel_zip_detail::make_iterators_at(
3150 0, holders, std::make_index_sequence<N>{});
3152 for (
size_t i = 0; i < n; ++i)
3154 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3155 auto tuple = parallel_zip_detail::deref_all_iters(iters, std::make_index_sequence<N>{});
3156 if (std::apply(
pred, tuple))
3158 parallel_zip_detail::advance_all_iters(iters, std::make_index_sequence<N>{});
3163 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
3166 std::vector<std::future<size_t>> futures;
3171 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3172 size_t chunk_end = std::min(
offset + chunk_size, n);
3175 token =
options.cancel_token]()
3177 constexpr size_t N = sizeof...(Containers);
3178 auto iters = parallel_zip_detail::make_iterators_at(
3179 offset, holders, std::make_index_sequence<N>{});
3182 for (
size_t i =
offset; i < chunk_end; ++i)
3184 parallel_detail::throw_if_parallel_canceled(token);
3185 auto tuple = parallel_zip_detail::deref_all_iters(
3186 iters, std::make_index_sequence<N>{});
3187 if (std::apply(
pred, tuple))
3189 parallel_zip_detail::advance_all_iters(
3190 iters, std::make_index_sequence<N>{});
3199 for (
auto & f: futures)
3234 template <
typename Container,
typename Op>
3236 size_t chunk_size = 0)
3240 options.chunk_size = chunk_size;
3244 template <
typename Container,
typename Op>
3248 const size_t n = std::distance(std::begin(c), std::end(c));
3252 auto & pool = parallel_detail::selected_parallel_pool(
options);
3253 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
3255 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3256 auto it = std::begin(c);
3257 for (
size_t i = 0; i < n; ++i, ++it)
3259 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3265 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
3268 std::vector<std::future<void>> futures;
3273 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3274 size_t chunk_end = std::min(
offset + chunk_size, n);
3277 token =
options.cancel_token]()
3279 auto it = std::begin(c);
3280 std::advance(it, offset);
3281 for (size_t i = offset; i < chunk_end; ++i, ++it)
3283 parallel_detail::throw_if_parallel_canceled(token);
3291 for (
auto & f: futures)
3307 template <
typename Container,
typename Op>
3309 size_t chunk_size = 0)
3313 options.chunk_size = chunk_size;
3317 template <
typename Container,
typename Op>
3321 const size_t n = std::distance(std::begin(c), std::end(c));
3325 auto & pool = parallel_detail::selected_parallel_pool(
options);
3326 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
3328 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3329 auto it = std::begin(c);
3330 for (
size_t i = 0; i < n; ++i, ++it)
3332 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3338 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
3341 auto data_holder = parallel_detail::ensure_random_access(c);
3342 const auto & data = parallel_detail::deref(data_holder);
3344 std::vector<std::future<void>> futures;
3349 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3350 size_t chunk_end = std::min(
offset + chunk_size, n);
3352 futures.push_back(pool.
enqueue([&data, op,
offset, chunk_end,
3353 token =
options.cancel_token]()
3355 auto it = std::begin(data);
3356 std::advance(it, offset);
3357 for (size_t i = offset; i < chunk_end; ++i, ++it)
3359 parallel_detail::throw_if_parallel_canceled(token);
3367 for (
auto & f: futures)
3399 template <
typename Container,
typename Op>
3401 size_t chunk_size = 0)
3405 options.chunk_size = chunk_size;
3409 template <
typename Container,
typename Op>
3413 using T = std::decay_t<
decltype(*std::begin(c))>;
3414 using ResultT = std::invoke_result_t<Op, size_t, const T &>;
3416 const size_t n = std::distance(std::begin(c), std::end(c));
3418 return std::vector<ResultT>{};
3420 auto & pool = parallel_detail::selected_parallel_pool(
options);
3421 if (parallel_detail::use_sequential_parallel_path(n, pool,
options))
3423 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3424 std::vector<ResultT> result;
3426 auto it = std::begin(c);
3427 for (
size_t i = 0; i < n; ++i, ++it)
3429 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3430 result.push_back(op(i, *it));
3435 size_t chunk_size = parallel_detail::effective_parallel_chunk_size(n, pool,
options,
3438 auto data_holder = parallel_detail::ensure_random_access(c);
3439 const auto & data = parallel_detail::deref(data_holder);
3441 std::vector<std::optional<ResultT>> slots(n);
3442 std::vector<std::future<void>> futures;
3447 parallel_detail::throw_if_parallel_canceled(
options.cancel_token);
3448 size_t chunk_end = std::min(
offset + chunk_size, n);
3450 futures.push_back(pool.
enqueue([&slots, &data, op,
offset, chunk_end,
3451 token =
options.cancel_token]()
3453 auto it = std::begin(data);
3454 std::advance(it, offset);
3455 for (size_t i = offset; i < chunk_end; ++i, ++it)
3457 parallel_detail::throw_if_parallel_canceled(token);
3458 slots[i] = op(i, *it);
3464 for (
auto & f: futures)
3467 std::vector<ResultT> result;
3469 for (
auto & slot: slots)
3470 result.push_back(
std::move(*slot));
3491#ifdef AH_PARALLEL_USE_DEFAULT_POOL
3493#define PMAP(c, op) pmaps(parallel_default_pool(), c, op)
3494#define PFILTER(c, pred) pfilter(parallel_default_pool(), c, pred)
3495#define PFOLD(c, init, op) pfoldl(parallel_default_pool(), c, init, op)
3496#define PFOR_EACH(c, op) pfor_each(parallel_default_pool(), c, op)
3497#define PALL(c, pred) pall(parallel_default_pool(), c, pred)
3498#define PEXISTS(c, pred) pexists(parallel_default_pool(), c, pred)
3499#define PSUM(c) psum(parallel_default_pool(), c)
Read-only cooperative cancellation token.
void throw_if_cancellation_requested() const
Throw operation_canceled if cancellation was requested.
A reusable thread pool for efficient parallel task execution.
size_t num_threads() const noexcept
Get the number of worker threads.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
int cmp(const __gmp_expr< T, U > &expr1, const __gmp_expr< V, W > &expr2)
Freq_Node * pred
Predecessor node in level-order traversal.
const long double offset[]
Offset values indexed by symbol string length (bounded by MAX_OFFSET_INDEX)
decltype(auto) deref(T &&ptr)
Get reference from pointer or unique_ptr.
bool use_sequential_parallel_path(const size_t n, const ThreadPool &pool, const ParallelOptions &options) noexcept
void throw_if_parallel_canceled(const CancellationToken &token)
size_t chunk_count(const size_t n, const size_t chunk_size) noexcept
ThreadPool & selected_parallel_pool(const ParallelOptions &options)
auto ensure_random_access(const Container &c)
For containers with random access, just return a pointer to it. For non-random access, copy the elements into a std::vector and return a unique_ptr to it.
size_t effective_parallel_chunk_size(const size_t n, const ThreadPool &pool, const ParallelOptions &options, const size_t min_chunk=64)
size_t chunk_size(const size_t n, const size_t num_threads, const size_t min_chunk=64)
Calculate optimal chunk size based on data size and thread count.
constexpr bool has_random_access()
Check if container supports random access.
chunk_bounds bounds_for_chunk(const size_t idx, const size_t n, const size_t chunk_size) noexcept
size_t min_holder_size_impl(const std::tuple< Holders... > &holders, std::index_sequence< Is... >)
Get minimum size from tuple of holders - always O(1) per holder.
void advance_all_iters(std::tuple< Iters... > &iters, std::index_sequence< Is... >)
Advance all iterators in tuple.
auto make_iterators_at(size_t offset, const std::tuple< Holders... > &holders, std::index_sequence< Is... >)
Create tuple of iterators at given offset.
size_t min_holder_size(const std::tuple< Holders... > &holders)
auto deref_all_iters(const std::tuple< Iters... > &iters, std::index_sequence< Is... >)
Dereference all iterators and make tuple.
Main namespace for Aleph-w library functions.
and
Check uniqueness with explicit hash + equality functors.
bool pall(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel all predicate (short-circuit).
bool pnone(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel none predicate.
auto pmaps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel map operation.
auto pmin(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel minimum element.
T pzip_foldl_n(ThreadPool &pool, T init, Op op, Combiner combiner, const Containers &... cs)
Parallel fold/reduce over N zipped containers (variadic).
ThreadPool & default_pool()
Return the default shared thread pool instance.
T pzip_foldl(ThreadPool &pool, const Container1 &c1, const Container2 &c2, T init, Op op, size_t chunk_size=0)
Parallel zip + fold.
void pzip_for_each(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + for_each.
size_t size(Node *root) noexcept
ThreadPool & parallel_default_pool()
Global default pool for parallel operations.
auto penumerate_maps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel enumerate with map.
void penumerate_for_each(ThreadPool &pool, Container &c, Op op, size_t chunk_size=0)
Parallel for_each with index (enumerate).
bool pzip_all_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel all predicate over N zipped containers (variadic).
size_t pcount_if(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel count_if operation.
auto pexclusive_scan(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel exclusive scan over a container.
std::optional< size_t > pfind(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find operation (returns index).
bool pzip_exists_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel exists predicate over N zipped containers (variadic).
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
std::decay_t< typename HeadC::Item_Type > T
void psort(ThreadPool &pool, Container &c, Compare cmp=Compare{}, const size_t min_parallel_size=1024)
Parallel sort (in-place).
void pfor_each(ThreadPool &pool, Container &c, Op op, size_t chunk_size=0)
Parallel for_each operation.
auto pfilter(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel filter operation.
auto ppartition(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel partition (stable).
auto pmerge(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Compare comp=Compare{}, size_t chunk_size=0)
Parallel merge of two sorted containers.
T pproduct(ThreadPool &pool, const Container &c, T init=T{1}, size_t chunk_size=0)
Parallel product of elements.
auto pmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel maximum element.
void pzip_for_each_n(ThreadPool &pool, Op op, const Containers &... cs)
Parallel for_each over N zipped containers (variadic).
auto pzip_maps_n(ThreadPool &pool, Op op, const Containers &... cs)
Parallel map over N zipped containers (variadic).
auto pscan(ThreadPool &pool, const Container &c, BinaryOp op, size_t chunk_size=0)
Parallel inclusive scan over a container.
T pfoldl(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel left fold (reduce).
auto pfind_value(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find with value return.
auto pzip_maps(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + map.
T psum(ThreadPool &pool, const Container &c, T init=T{}, size_t chunk_size=0)
Parallel sum of elements.
auto pminmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel min and max elements.
size_t pzip_count_if_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel count over N zipped containers (variadic).
bool pexists(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel exists predicate (short-circuit).
Itor::difference_type count(const Itor &beg, const Itor &end, const T &value)
Count elements equal to a value.
static struct argp_option options[]
Common configuration object for parallel algorithms.
ThreadPool * pool
Executor to use (nullptr = default_pool()).
Holder for converted containers (either pointer or unique_ptr to vector).
size_t cached_size
Cached size for O(1) access.
std::decay_t< decltype(*std::begin(std::declval< Container & >()))> value_type
std::conditional_t< parallel_detail::has_random_access< Container >(), const Container *, std::unique_ptr< std::vector< value_type > > > holder_type
ContainerHolder(const Container &c)
decltype(auto) get() const
size_t size() const noexcept
Size is always O(1) - either from random access or from cached vector size.
Filter_Iterator< DynList< int >, DynList< int >::Iterator, Par > It
A modern, efficient thread pool for parallel task execution.