116#include <type_traits>
127 namespace parallel_detail
132 if (n == 0)
return 1;
134 const size_t chunks = num_threads * 4;
140 template <
typename Container>
143 using It =
decltype(std::begin(std::declval<Container &>()));
144 return std::is_base_of_v<std::random_access_iterator_tag,
145 typename std::iterator_traits<It>::iterator_category>;
150 template <
typename Container>
156 return std::make_unique<std::vector<typename Container::value_type>>(std::begin(c), std::end(c));
160 template <
typename T>
163 if constexpr (std::is_pointer_v<std::decay_t<T>>)
204 template <
typename ResultT =
void,
typename Container,
typename Op>
206 size_t chunk_size = 0)
208 using InputT = std::decay_t<
decltype(*std::begin(c))>;
210 std::is_void_v<ResultT>,
211 std::invoke_result_t<Op, const InputT &>,
214 const size_t n = std::distance(std::begin(c), std::end(c));
216 return std::vector<ActualResultT>{};
225 std::vector<ActualResultT> result(n);
226 std::vector<std::future<void>>
futures;
236 auto in_it = std::begin(data);
237 std::advance(in_it, offset);
238 for (size_t i = offset; i < chunk_end; ++i, ++in_it)
239 result[i] = op(*in_it);
282 template <
typename Container,
typename Pred>
284 size_t chunk_size = 0)
286 using T = std::decay_t<
decltype(*std::begin(c))>;
288 const size_t n = std::distance(std::begin(c), std::end(c));
290 return std::vector<T>{};
299 std::vector<std::future<std::vector<T>>>
futures;
308 std::vector<T> chunk_result;
309 auto it = std::begin(data);
310 std::advance(it, offset);
311 for (size_t i = offset; i < chunk_end; ++i, ++it)
313 chunk_result.push_back(*it);
321 std::vector<T> result;
325 result.insert(result.end(),
374 template <
typename T,
typename Container,
typename BinaryOp>
376 size_t chunk_size = 0)
378 const size_t n = std::distance(std::begin(c), std::end(c));
388 std::vector<std::future<T>>
futures;
397 auto it = std::begin(data);
398 std::advance(it, offset);
400 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
401 local = op(local, *it);
411 result = op(result, f.get());
450 template <
typename Container,
typename Op>
453 const size_t n = std::distance(std::begin(c), std::end(c));
460 std::vector<std::future<void>>
futures;
469 auto it = std::begin(c);
470 std::advance(it, offset);
471 for (size_t i = offset; i < chunk_end; ++i, ++it)
494 template <
typename Container,
typename Op>
497 const size_t n = std::distance(std::begin(c), std::end(c));
507 std::vector<std::future<void>>
futures;
516 auto it = std::begin(data);
517 std::advance(it, offset);
518 for (size_t i = offset; i < chunk_end; ++i, ++it)
555 template <
typename Container,
typename Pred>
557 size_t chunk_size = 0)
559 const size_t n = std::distance(std::begin(c), std::end(c));
570 std::vector<std::future<void>>
futures;
579 if (found_false.load(std::memory_order_relaxed))
582 auto it = std::begin(data);
583 std::advance(it, offset);
584 for (size_t i = offset; i < chunk_end; ++i, ++it)
588 found_false.store(true, std::memory_order_relaxed);
591 if (found_false.load(std::memory_order_relaxed))
626 template <
typename Container,
typename Pred>
628 size_t chunk_size = 0)
630 const size_t n = std::distance(std::begin(c), std::end(c));
635 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
637 auto data_holder = parallel_detail::ensure_random_access(c);
638 const auto & data = parallel_detail::deref(data_holder);
640 std::atomic<bool> found{
false};
641 std::vector<std::future<void>> futures;
646 size_t chunk_end = std::min(
offset + chunk_size, n);
650 if (found.load(std::memory_order_relaxed))
653 auto it = std::begin(data);
654 std::advance(it, offset);
655 for (size_t i = offset; i < chunk_end; ++i, ++it)
659 found.store(true, std::memory_order_relaxed);
662 if (found.load(std::memory_order_relaxed))
670 for (
auto & f: futures)
689 template <
typename Container,
typename Pred>
691 size_t chunk_size = 0)
721 template <
typename Container,
typename Pred>
723 size_t chunk_size = 0)
725 const size_t n = std::distance(std::begin(c), std::end(c));
730 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
732 auto data_holder = parallel_detail::ensure_random_access(c);
733 const auto & data = parallel_detail::deref(data_holder);
735 std::vector<std::future<size_t>> futures;
740 size_t chunk_end = std::min(
offset + chunk_size, n);
745 auto it = std::begin(data);
746 std::advance(it, offset);
747 for (size_t i = offset; i < chunk_end; ++i, ++it)
757 for (
auto & f: futures)
792 template <
typename Container,
typename Pred>
794 Pred
pred,
size_t chunk_size = 0)
796 const size_t n = std::distance(std::begin(c), std::end(c));
801 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
803 auto data_holder = parallel_detail::ensure_random_access(c);
804 const auto & data = parallel_detail::deref(data_holder);
807 std::atomic<size_t> min_index{n};
808 std::vector<std::future<void>> futures;
813 size_t chunk_end = std::min(
offset + chunk_size, n);
818 if (min_index.load(std::memory_order_relaxed) <= offset)
821 auto it = std::begin(data);
822 std::advance(it, offset);
823 for (size_t i = offset; i < chunk_end; ++i, ++it)
826 if (min_index.load(std::memory_order_relaxed) <= i)
832 size_t expected = min_index.load(std::memory_order_relaxed);
833 while (i < expected and
834 not min_index.compare_exchange_weak(expected, i,
835 std::memory_order_relaxed));
844 for (
auto & f: futures)
847 if (
size_t result = min_index.load(); result < n)
875 template <
typename Container,
typename Pred>
877 Pred
pred,
size_t chunk_size = 0)
879 using T = std::decay_t<
decltype(*std::begin(c))>;
881 auto idx =
pfind(pool, c,
pred, chunk_size);
883 return std::optional<T>{std::nullopt};
885 auto it = std::begin(c);
886 std::advance(it, *idx);
887 return std::optional<T>{*it};
914 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
916 size_t chunk_size = 0)
918 return pfoldl(pool, c, init, std::plus<T>{},
chunk_size);
933 typename T = std::decay_t<decltype(*std::begin(std::declval<Container>()))>>
935 size_t chunk_size = 0)
937 return pfoldl(pool, c, init, std::multiplies<T>{},
chunk_size);
950 template <
typename Container>
953 using T = std::decay_t<
decltype(*std::begin(c))>;
955 const size_t n = std::distance(std::begin(c), std::end(c));
957 return std::optional<T>{std::nullopt};
960 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
962 auto data_holder = parallel_detail::ensure_random_access(c);
963 const auto & data = parallel_detail::deref(data_holder);
965 std::vector<std::future<T>> futures;
970 size_t chunk_end = std::min(
offset + chunk_size, n);
974 auto it = std::begin(data);
975 std::advance(it, offset);
977 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
986 T result = futures[0].get();
987 for (
size_t i = 1; i < futures.size(); ++i)
989 T val = futures[i].get();
994 return std::optional<T>{result};
1007 template <
typename Container>
1010 using T = std::decay_t<
decltype(*std::begin(c))>;
1012 const size_t n = std::distance(std::begin(c), std::end(c));
1014 return std::optional<T>{std::nullopt};
1016 if (chunk_size == 0)
1017 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1019 auto data_holder = parallel_detail::ensure_random_access(c);
1020 const auto & data = parallel_detail::deref(data_holder);
1022 std::vector<std::future<T>> futures;
1027 size_t chunk_end = std::min(
offset + chunk_size, n);
1031 auto it = std::begin(data);
1032 std::advance(it, offset);
1033 T local_max = *it++;
1034 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
1035 if (*it > local_max)
1043 T result = futures[0].get();
1044 for (
size_t i = 1; i < futures.size(); ++i)
1046 T val = futures[i].get();
1051 return std::optional<T>{result};
1064 template <
typename Container>
1067 using T = std::decay_t<
decltype(*std::begin(c))>;
1069 const size_t n = std::distance(std::begin(c), std::end(c));
1071 return std::optional<std::pair<T, T>>{std::nullopt};
1073 if (chunk_size == 0)
1074 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1076 auto data_holder = parallel_detail::ensure_random_access(c);
1077 const auto & data = parallel_detail::deref(data_holder);
1079 std::vector<std::future<std::pair<T, T>>> futures;
1084 size_t chunk_end = std::min(
offset + chunk_size, n);
1088 auto it = std::begin(data);
1089 std::advance(it, offset);
1091 T local_max = *it++;
1092 for (size_t i = offset + 1; i < chunk_end; ++i, ++it)
1094 if (*it < local_min) local_min = *it;
1095 if (*it > local_max) local_max = *it;
1097 return std::make_pair(local_min, local_max);
1103 auto result = futures[0].get();
1104 for (
size_t i = 1; i < futures.size(); ++i)
1106 auto [mi, ma] = futures[i].get();
1107 if (mi < result.first) result.first = mi;
1108 if (ma > result.second) result.second = ma;
1111 return std::optional<std::pair<T, T>>{result};
1145 template <
typename Container,
typename Compare = std::less<>>
1147 const size_t min_parallel_size = 1024)
1149 const size_t n = std::distance(std::begin(c), std::end(c));
1154 if (n <= min_parallel_size or pool.
num_threads() <= 1)
1156 std::sort(std::begin(c), std::end(c),
cmp);
1161 const size_t num_chunks = std::min(pool.
num_threads() * 2, n / min_parallel_size);
1162 const size_t chunk_size = (n + num_chunks - 1) / num_chunks;
1165 std::vector<std::future<void>> futures;
1168 size_t end = std::min(i + chunk_size, n);
1169 auto begin_it = std::begin(c);
1170 std::advance(begin_it, i);
1171 auto end_it = std::begin(c);
1172 std::advance(end_it, end);
1174 futures.push_back(pool.
enqueue([begin_it, end_it,
cmp]()
1176 std::sort(begin_it, end_it, cmp);
1180 for (
auto & f: futures)
1184 using T = std::decay_t<
decltype(*std::begin(c))>;
1185 std::vector<T> buffer(n);
1187 for (
size_t width = chunk_size; width < n; width *= 2)
1189 std::vector<std::future<void>> merge_futures;
1191 for (
size_t i = 0; i < n; i += 2 * width)
1193 size_t mid = std::min(i + width, n);
1194 size_t end = std::min(i + 2 * width, n);
1198 auto begin_it = std::begin(c);
1199 std::advance(begin_it, i);
1200 auto mid_it = std::begin(c);
1201 std::advance(mid_it, mid);
1202 auto end_it = std::begin(c);
1203 std::advance(end_it, end);
1205 merge_futures.push_back(pool.
enqueue([begin_it, mid_it, end_it, &buffer, i,
cmp]()
1207 std::merge(begin_it, mid_it, mid_it, end_it,
1208 buffer.begin() + i, cmp);
1214 auto begin_it = std::begin(c);
1215 std::advance(begin_it, i);
1216 auto end_it = std::begin(c);
1217 std::advance(end_it, mid);
1218 std::copy(begin_it, end_it, buffer.begin() + i);
1222 for (
auto & f: merge_futures)
1226 auto it = std::begin(c);
1227 for (
size_t i = 0; i < n; ++i, ++it)
1228 *it = std::move(buffer[i]);
1263 template <
typename Container1,
typename Container2,
typename Op>
1265 Op op,
size_t chunk_size = 0)
1267 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1268 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1269 const size_t n = std::min(n1, n2);
1274 if (chunk_size == 0)
1275 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1277 auto h1 = parallel_detail::ensure_random_access(c1);
1278 auto h2 = parallel_detail::ensure_random_access(c2);
1279 const auto & d1 = parallel_detail::deref(h1);
1280 const auto & d2 = parallel_detail::deref(h2);
1282 std::vector<std::future<void>> futures;
1287 size_t chunk_end = std::min(
offset + chunk_size, n);
1289 futures.push_back(pool.
enqueue([&d1, &d2, op,
offset, chunk_end]()
1291 auto it1 = std::begin(d1);
1292 auto it2 = std::begin(d2);
1293 std::advance(it1, offset);
1294 std::advance(it2, offset);
1295 for (size_t i = offset; i < chunk_end; ++i, ++it1, ++it2)
1302 for (
auto & f: futures)
1330 template <
typename Container1,
typename Container2,
typename Op>
1332 const Container2 & c2, Op op,
size_t chunk_size = 0)
1334 using T1 = std::decay_t<
decltype(*std::begin(c1))>;
1335 using T2 = std::decay_t<
decltype(*std::begin(c2))>;
1336 using ResultT = std::invoke_result_t<Op, const T1 &, const T2 &>;
1338 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1339 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1340 const size_t n = std::min(n1, n2);
1343 return std::vector<ResultT>{};
1345 if (chunk_size == 0)
1346 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1348 auto h1 = parallel_detail::ensure_random_access(c1);
1349 auto h2 = parallel_detail::ensure_random_access(c2);
1350 const auto & d1 = parallel_detail::deref(h1);
1351 const auto & d2 = parallel_detail::deref(h2);
1353 std::vector<ResultT> result(n);
1354 std::vector<std::future<void>> futures;
1359 size_t chunk_end = std::min(
offset + chunk_size, n);
1361 futures.push_back(pool.
enqueue([&result, &d1, &d2, op,
offset, chunk_end]()
1363 auto it1 = std::begin(d1);
1364 auto it2 = std::begin(d2);
1365 std::advance(it1, offset);
1366 std::advance(it2, offset);
1367 for (size_t i = offset; i < chunk_end; ++i, ++it1, ++it2)
1368 result[i] = op(*it1, *it2);
1374 for (
auto & f: futures)
1408 template <
typename Container1,
typename Container2,
typename T,
typename Op>
1410 const Container2 & c2,
T init, Op op,
1411 size_t chunk_size = 0)
1413 const size_t n1 = std::distance(std::begin(c1), std::end(c1));
1414 const size_t n2 = std::distance(std::begin(c2), std::end(c2));
1415 const size_t n = std::min(n1, n2);
1420 if (chunk_size == 0)
1421 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1423 auto h1 = parallel_detail::ensure_random_access(c1);
1424 auto h2 = parallel_detail::ensure_random_access(c2);
1425 const auto & d1 = parallel_detail::deref(h1);
1426 const auto & d2 = parallel_detail::deref(h2);
1428 std::vector<std::future<T>> futures;
1433 size_t chunk_end = std::min(
offset + chunk_size, n);
1435 futures.push_back(pool.
enqueue([&d1, &d2, &init, op,
offset, chunk_end]()
1437 auto it1 = std::begin(d1);
1438 auto it2 = std::begin(d2);
1439 std::advance(it1, offset);
1440 std::advance(it2, offset);
1442 T local = op(init, *it1++, *it2++);
1443 for (size_t i = offset + 1; i < chunk_end; ++i, ++it1, ++it2)
1444 local = op(local, *it1, *it2);
1453 T result = futures[0].get();
1454 for (
size_t i = 1; i < futures.size(); ++i)
1456 T val = futures[i].get();
1458 result = result + val - init;
1489 template <
typename Container,
typename Pred>
1491 size_t chunk_size = 0)
1493 using T = std::decay_t<
decltype(*std::begin(c))>;
1495 const size_t n = std::distance(std::begin(c), std::end(c));
1497 return std::make_pair(std::vector<T>{}, std::vector<T>{});
1499 if (chunk_size == 0)
1500 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1502 auto data_holder = parallel_detail::ensure_random_access(c);
1503 const auto & data = parallel_detail::deref(data_holder);
1505 using ChunkResult = std::pair<std::vector<T>, std::vector<T>>;
1506 std::vector<std::future<ChunkResult>> futures;
1511 size_t chunk_end = std::min(
offset + chunk_size, n);
1515 std::vector<T> yes, no;
1516 auto it = std::begin(data);
1517 std::advance(it, offset);
1518 for (size_t i = offset; i < chunk_end; ++i, ++it)
1525 return std::make_pair(std::move(yes), std::move(no));
1532 std::vector<T> yes_result, no_result;
1533 for (
auto & f: futures)
1535 auto [yes, no] = f.get();
1536 yes_result.insert(yes_result.end(),
1537 std::make_move_iterator(yes.begin()),
1538 std::make_move_iterator(yes.end()));
1539 no_result.insert(no_result.end(),
1540 std::make_move_iterator(no.begin()),
1541 std::make_move_iterator(no.end()));
1544 return std::make_pair(std::move(yes_result), std::move(no_result));
1551 namespace parallel_zip_detail
1561 template <
typename Container>
1564 using value_type = std::decay_t<decltype(*std::begin(std::declval<Container &>()))>;
1566 parallel_detail::has_random_access<Container>(),
1568 std::unique_ptr<std::vector<value_type>>>;
1575 if constexpr (parallel_detail::has_random_access<Container>())
1579 cached_size =
static_cast<size_t>(std::distance(std::begin(c), std::end(c)));
1584 data = std::make_unique<std::vector<value_type>>(std::begin(c), std::end(c));
1585 cached_size = data->size();
1591 if constexpr (parallel_detail::has_random_access<Container>())
1598 [[nodiscard]]
size_t size() const noexcept {
return cached_size; }
1600 auto begin()
const {
return std::begin(get()); }
1601 auto end()
const {
return std::end(get()); }
1605 template <
typename... Holders,
size_t... Is>
1607 std::index_sequence<Is...>)
1609 return std::min({std::get<Is>(holders).size()...});
1612 template <
typename... Holders>
1619 template <
typename... Holders,
size_t... Is>
1621 std::index_sequence<Is...>)
1623 return std::make_tuple([&]()
1625 auto it = std::get<Is>(holders).begin();
1626 std::advance(it,
offset);
1632 template <
typename... Iters,
size_t... Is>
1635 (++std::get<Is>(iters), ...);
1639 template <
typename... Iters,
size_t... Is>
1642 return std::make_tuple(*std::get<Is>(iters)...);
1678 template <
typename Op,
typename... Containers>
1681 static_assert(
sizeof...(Containers) >= 2,
1682 "pzip_for_each requires at least 2 containers");
1689 const size_t n = parallel_zip_detail::min_holder_size(holders);
1693 const size_t chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1695 std::vector<std::future<void>> futures;
1700 size_t chunk_end = std::min(
offset + chunk_size, n);
1702 futures.push_back(pool.
enqueue([&holders, op,
offset, chunk_end]()
1704 constexpr size_t N = sizeof...(Containers);
1705 auto iters = parallel_zip_detail::make_iterators_at(
1706 offset, holders, std::make_index_sequence<N>{});
1708 for (
size_t i =
offset; i < chunk_end; ++i)
1710 std::apply(op, parallel_zip_detail::deref_all_iters(
1711 iters, std::make_index_sequence<N>{}));
1712 parallel_zip_detail::advance_all_iters(
1713 iters, std::make_index_sequence<N>{});
1720 for (
auto & f: futures)
1755 template <
typename Op,
typename... Containers>
1758 static_assert(
sizeof...(Containers) >= 2,
1759 "pzip_maps requires at least 2 containers");
1762 using ResultT = std::invoke_result_t<Op,
1763 std::decay_t<
decltype(*std::begin(cs))>...>;
1769 const size_t n = parallel_zip_detail::min_holder_size(holders);
1771 return std::vector<ResultT>{};
1773 size_t chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1775 std::vector<ResultT> result(n);
1776 std::vector<std::future<void>> futures;
1781 size_t chunk_end = std::min(
offset + chunk_size, n);
1783 futures.push_back(pool.
enqueue([&result, &holders, op,
offset, chunk_end]()
1785 constexpr size_t N = sizeof...(Containers);
1786 auto iters = parallel_zip_detail::make_iterators_at(
1787 offset, holders, std::make_index_sequence<N>{});
1789 for (
size_t i =
offset; i < chunk_end; ++i)
1791 result[i] = std::apply(op, parallel_zip_detail::deref_all_iters(
1792 iters, std::make_index_sequence<N>{}));
1793 parallel_zip_detail::advance_all_iters(
1794 iters, std::make_index_sequence<N>{});
1801 for (
auto & f: futures)
1845 template <
typename T,
typename Op,
typename Combiner,
typename... Containers>
1847 const Containers &... cs)
1849 static_assert(
sizeof...(Containers) >= 2,
1850 "pzip_foldl requires at least 2 containers");
1856 const size_t n = parallel_zip_detail::min_holder_size(holders);
1860 size_t chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1862 std::vector<std::future<T>> futures;
1867 size_t chunk_end = std::min(
offset + chunk_size, n);
1869 futures.push_back(pool.
enqueue([&holders, init, op,
offset, chunk_end]()
1871 constexpr size_t N = sizeof...(Containers);
1872 auto iters = parallel_zip_detail::make_iterators_at(
1873 offset, holders, std::make_index_sequence<N>{});
1876 auto first_tuple = parallel_zip_detail::deref_all_iters(
1877 iters, std::make_index_sequence<N>{});
1878 T local = std::apply([&op, &init](
auto &&... args)
1880 return op(init, std::forward<
decltype(args)>(args)
1883 parallel_zip_detail::advance_all_iters(
1884 iters, std::make_index_sequence<N>{});
1887 for (
size_t i =
offset + 1; i < chunk_end; ++i)
1889 auto tuple = parallel_zip_detail::deref_all_iters(
1890 iters, std::make_index_sequence<N>{});
1891 local = std::apply([&op, &local](
auto &&... args)
1894 std::forward<
decltype(args)>(args)
1897 parallel_zip_detail::advance_all_iters(
1898 iters, std::make_index_sequence<N>{});
1908 T result = futures[0].get();
1909 for (
size_t i = 1; i < futures.size(); ++i)
1910 result = combiner(result, futures[i].get());
1943 template <
typename Pred,
typename... Containers>
1946 static_assert(
sizeof...(Containers) >= 2,
1947 "pzip_all requires at least 2 containers");
1950 auto holders = std::make_tuple(
1954 const size_t n = parallel_zip_detail::min_holder_size(holders);
1958 const size_t chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
1960 std::atomic<bool> found_false{
false};
1961 std::vector<std::future<void>> futures;
1966 size_t chunk_end = std::min(
offset + chunk_size, n);
1968 futures.push_back(pool.
enqueue([&holders,
pred, &found_false,
offset, chunk_end]()
1970 if (found_false.load(std::memory_order_relaxed))
1973 constexpr size_t N = sizeof...(Containers);
1974 auto iters = parallel_zip_detail::make_iterators_at(
1975 offset, holders, std::make_index_sequence<N>{});
1977 for (
size_t i =
offset; i < chunk_end; ++i)
1979 auto tuple = parallel_zip_detail::deref_all_iters(
1980 iters, std::make_index_sequence<N>{});
1981 if (! std::apply(
pred, tuple))
1983 found_false.store(
true, std::memory_order_relaxed);
1986 if (found_false.load(std::memory_order_relaxed))
1988 parallel_zip_detail::advance_all_iters(
1989 iters, std::make_index_sequence<N>{});
1996 for (
auto & f: futures)
1999 return not found_false.load();
2030 template <
typename Pred,
typename... Containers>
2033 static_assert(
sizeof...(Containers) >= 2,
2034 "pzip_exists requires at least 2 containers");
2040 const size_t n = parallel_zip_detail::min_holder_size(holders);
2044 const size_t chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
2046 std::atomic<bool> found{
false};
2047 std::vector<std::future<void>> futures;
2052 size_t chunk_end = std::min(
offset + chunk_size, n);
2056 if (found.load(std::memory_order_relaxed))
2059 constexpr size_t N = sizeof...(Containers);
2060 auto iters = parallel_zip_detail::make_iterators_at(
2061 offset, holders, std::make_index_sequence<N>{});
2063 for (
size_t i =
offset; i < chunk_end; ++i)
2065 auto tuple = parallel_zip_detail::deref_all_iters(
2066 iters, std::make_index_sequence<N>{});
2067 if (std::apply(
pred, tuple))
2069 found.store(
true, std::memory_order_relaxed);
2072 if (found.load(std::memory_order_relaxed))
2074 parallel_zip_detail::advance_all_iters(
2075 iters, std::make_index_sequence<N>{});
2082 for (
auto & f: futures)
2085 return found.load();
2103 template <
typename Pred,
typename... Containers>
2105 const Containers &... cs)
2107 static_assert(
sizeof...(Containers) >= 2,
2108 "pzip_count_if requires at least 2 containers");
2114 const size_t n = parallel_zip_detail::min_holder_size(holders);
2118 size_t chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
2120 std::vector<std::future<size_t>> futures;
2125 size_t chunk_end = std::min(
offset + chunk_size, n);
2129 constexpr size_t N = sizeof...(Containers);
2130 auto iters = parallel_zip_detail::make_iterators_at(
2131 offset, holders, std::make_index_sequence<N>{});
2134 for (
size_t i =
offset; i < chunk_end; ++i)
2136 auto tuple = parallel_zip_detail::deref_all_iters(
2137 iters, std::make_index_sequence<N>{});
2138 if (std::apply(
pred, tuple))
2140 parallel_zip_detail::advance_all_iters(
2141 iters, std::make_index_sequence<N>{});
2150 for (
auto & f: futures)
2185 template <
typename Container,
typename Op>
2187 size_t chunk_size = 0)
2189 const size_t n = std::distance(std::begin(c), std::end(c));
2193 if (chunk_size == 0)
2194 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
2196 std::vector<std::future<void>> futures;
2201 size_t chunk_end = std::min(
offset + chunk_size, n);
2203 futures.push_back(pool.
enqueue([&c, op,
offset, chunk_end]()
2205 auto it = std::begin(c);
2206 std::advance(it, offset);
2207 for (size_t i = offset; i < chunk_end; ++i, ++it)
2214 for (
auto & f: futures)
2230 template <
typename Container,
typename Op>
2232 size_t chunk_size = 0)
2234 const size_t n = std::distance(std::begin(c), std::end(c));
2238 if (chunk_size == 0)
2239 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
2241 auto data_holder = parallel_detail::ensure_random_access(c);
2242 const auto & data = parallel_detail::deref(data_holder);
2244 std::vector<std::future<void>> futures;
2249 size_t chunk_end = std::min(
offset + chunk_size, n);
2251 futures.push_back(pool.
enqueue([&data, op,
offset, chunk_end]()
2253 auto it = std::begin(data);
2254 std::advance(it, offset);
2255 for (size_t i = offset; i < chunk_end; ++i, ++it)
2262 for (
auto & f: futures)
2294 template <
typename Container,
typename Op>
2296 size_t chunk_size = 0)
2298 using T = std::decay_t<
decltype(*std::begin(c))>;
2299 using ResultT = std::invoke_result_t<Op, size_t, const T &>;
2301 const size_t n = std::distance(std::begin(c), std::end(c));
2303 return std::vector<ResultT>{};
2305 if (chunk_size == 0)
2306 chunk_size = parallel_detail::chunk_size(n, pool.
num_threads());
2308 auto data_holder = parallel_detail::ensure_random_access(c);
2309 const auto & data = parallel_detail::deref(data_holder);
2311 std::vector<ResultT> result(n);
2312 std::vector<std::future<void>> futures;
2317 size_t chunk_end = std::min(
offset + chunk_size, n);
2319 futures.push_back(pool.
enqueue([&result, &data, op,
offset, chunk_end]()
2321 auto it = std::begin(data);
2322 std::advance(it, offset);
2323 for (size_t i = offset; i < chunk_end; ++i, ++it)
2324 result[i] = op(i, *it);
2330 for (
auto & f: futures)
2352#ifdef AH_PARALLEL_USE_DEFAULT_POOL
2354#define PMAP(c, op) pmaps(parallel_default_pool(), c, op)
2355#define PFILTER(c, pred) pfilter(parallel_default_pool(), c, pred)
2356#define PFOLD(c, init, op) pfoldl(parallel_default_pool(), c, init, op)
2357#define PFOR_EACH(c, op) pfor_each(parallel_default_pool(), c, op)
2358#define PALL(c, pred) pall(parallel_default_pool(), c, pred)
2359#define PEXISTS(c, pred) pexists(parallel_default_pool(), c, pred)
2360#define PSUM(c) psum(parallel_default_pool(), c)
A reusable thread pool for efficient parallel task execution.
size_t num_threads() const noexcept
Get the number of worker threads.
auto enqueue(F &&f, Args &&... args) -> std::future< std::invoke_result_t< F, Args... > >
Submit a task for execution and get a future for the result.
iterator end() noexcept
Return an STL-compatible end iterator.
iterator begin() noexcept
Return an STL-compatible iterator to the first element.
int cmp(const __gmp_expr< T, U > &expr1, const __gmp_expr< V, W > &expr2)
Freq_Node * pred
Predecessor node in level-order traversal.
const long double offset[]
Offset values indexed by symbol string length (bounded by MAX_OFFSET_INDEX)
decltype(auto) deref(T &&ptr)
Get reference from pointer or unique_ptr.
auto ensure_random_access(const Container &c)
For containers with random access, just return a pointer to it. For non-random access,...
size_t chunk_size(const size_t n, const size_t num_threads, const size_t min_chunk=64)
Calculate optimal chunk size based on data size and thread count.
constexpr bool has_random_access()
Check if container supports random access.
size_t min_holder_size_impl(const std::tuple< Holders... > &holders, std::index_sequence< Is... >)
Get minimum size from tuple of holders - always O(1) per holder.
void advance_all_iters(std::tuple< Iters... > &iters, std::index_sequence< Is... >)
Advance all iterators in tuple.
auto make_iterators_at(size_t offset, const std::tuple< Holders... > &holders, std::index_sequence< Is... >)
Create tuple of iterators at given offset.
size_t min_holder_size(const std::tuple< Holders... > &holders)
auto deref_all_iters(const std::tuple< Iters... > &iters, std::index_sequence< Is... >)
Dereference all iterators and make tuple.
Main namespace for Aleph-w library functions.
bool pall(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel all predicate (short-circuit).
bool pnone(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel none predicate.
auto pmaps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel map operation.
auto pmin(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel minimum element.
T pzip_foldl_n(ThreadPool &pool, T init, Op op, Combiner combiner, const Containers &... cs)
Parallel fold/reduce over N zipped containers (variadic).
ThreadPool & default_pool()
Global default thread pool.
T pzip_foldl(ThreadPool &pool, const Container1 &c1, const Container2 &c2, T init, Op op, size_t chunk_size=0)
Parallel zip + fold.
void pzip_for_each(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + for_each.
size_t size(Node *root) noexcept
ThreadPool & parallel_default_pool()
Global default pool for parallel operations.
auto penumerate_maps(ThreadPool &pool, const Container &c, Op op, size_t chunk_size=0)
Parallel enumerate with map.
void penumerate_for_each(ThreadPool &pool, Container &c, Op op, size_t chunk_size=0)
Parallel for_each with index (enumerate).
bool pzip_all_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel all predicate over N zipped containers (variadic).
size_t pcount_if(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel count_if operation.
std::optional< size_t > pfind(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find operation (returns index).
bool pzip_exists_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel exists predicate over N zipped containers (variadic).
std::decay_t< typename HeadC::Item_Type > T
void psort(ThreadPool &pool, Container &c, Compare cmp=Compare{}, const size_t min_parallel_size=1024)
Parallel sort (in-place).
void pfor_each(ThreadPool &pool, Container &c, Op op, size_t chunk_size=0)
Parallel for_each operation.
auto pfilter(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel filter operation.
auto ppartition(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel partition (stable).
T pproduct(ThreadPool &pool, const Container &c, T init=T{1}, size_t chunk_size=0)
Parallel product of elements.
auto pmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel maximum element.
void pzip_for_each_n(ThreadPool &pool, Op op, const Containers &... cs)
Parallel for_each over N zipped containers (variadic).
auto pzip_maps_n(ThreadPool &pool, Op op, const Containers &... cs)
Parallel map over N zipped containers (variadic).
T pfoldl(ThreadPool &pool, const Container &c, T init, BinaryOp op, size_t chunk_size=0)
Parallel left fold (reduce).
auto pfind_value(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel find with value return.
auto pzip_maps(ThreadPool &pool, const Container1 &c1, const Container2 &c2, Op op, size_t chunk_size=0)
Parallel zip + map.
T psum(ThreadPool &pool, const Container &c, T init=T{}, size_t chunk_size=0)
Parallel sum of elements.
auto pminmax(ThreadPool &pool, const Container &c, size_t chunk_size=0)
Parallel min and max elements.
size_t pzip_count_if_n(ThreadPool &pool, Pred pred, const Containers &... cs)
Parallel count over N zipped containers (variadic).
bool pexists(ThreadPool &pool, const Container &c, Pred pred, size_t chunk_size=0)
Parallel exists predicate (short-circuit).
DynList< T > maps(const C &c, Op op)
Classic map operation.
Itor::difference_type count(const Itor &beg, const Itor &end, const T &value)
Count elements equal to a value.
Holder for converted containers (either pointer or unique_ptr to vector).
size_t cached_size
Cached size for O(1) access.
std::decay_t< decltype(*std::begin(std::declval< Container & >()))> value_type
std::conditional_t< parallel_detail::has_random_access< Container >(), const Container *, std::unique_ptr< std::vector< value_type > > > holder_type
ContainerHolder(const Container &c)
decltype(auto) get() const
size_t size() const noexcept
Size is always O(1) - either from random access or from cached vector size.
A modern, efficient thread pool for parallel task execution.