31#include <gtest/gtest.h>
47using namespace std::chrono_literals;
51 template <
typename Sync>
55 sync.with_lock([](
auto & value) {
return value; });
58 template <
typename Sync>
62 sync.with_read_lock([](
const auto & value) {
return value; });
65 template <
typename Sync>
69 sync.with_write_lock([](
auto & value) {
return value; });
81 struct ExceptionSafePayload
83 static inline bool throw_on_move_construction =
false;
84 static inline bool throw_on_move_assignment =
false;
88 ExceptionSafePayload() =
default;
90 explicit ExceptionSafePayload(
int value_arg)
93 ExceptionSafePayload(
const ExceptionSafePayload &) =
default;
94 ExceptionSafePayload & operator = (
const ExceptionSafePayload &) =
default;
96 ExceptionSafePayload(ExceptionSafePayload &&
other)
100 if (throw_on_move_construction)
101 throw std::runtime_error(
"move construction failed");
104 ExceptionSafePayload & operator = (ExceptionSafePayload &&
other)
108 if (throw_on_move_assignment)
109 throw std::runtime_error(
"move assignment failed");
115 throw_on_move_construction =
false;
116 throw_on_move_assignment =
false;
133 auto first =
ch.recv();
134 auto second =
ch.recv();
135 auto third =
ch.recv();
152 auto first =
ch.try_recv();
153 auto second =
ch.try_recv();
174 std::this_thread::yield();
189 std::this_thread::yield();
203 for (
int i = 0; i < 2000; ++i)
205 producer_ok.store(
false, std::memory_order_relaxed);
210 while (
auto item =
ch.recv())
219 for (
size_t i = 0; i <
received.size(); ++i)
228 auto item =
ch.recv();
241 std::atomic<bool>
started{
false};
246 started.store(
true, std::memory_order_release);
258 while (
not started.load(std::memory_order_acquire))
259 std::this_thread::yield();
260 std::this_thread::sleep_for(5
ms);
272 std::atomic<bool>
started{
false};
277 started.store(
true, std::memory_order_release);
289 while (
not started.load(std::memory_order_acquire))
290 std::this_thread::yield();
291 std::this_thread::sleep_for(5
ms);
307 auto first =
ch.recv(source.
token());
308 auto second =
ch.recv(source.
token());
321 std::atomic<bool>
started{
false};
326 started.store(
true, std::memory_order_release);
338 while (
not started.load(std::memory_order_acquire))
339 std::this_thread::yield();
340 std::this_thread::sleep_for(5
ms);
351 auto token = source.
token();
367 ExceptionSafePayload::reset_failures();
372 ExceptionSafePayload::throw_on_move_construction =
true;
374 ExceptionSafePayload::reset_failures();
382 ExceptionSafePayload::reset_failures();
387 ExceptionSafePayload
out;
388 ExceptionSafePayload::throw_on_move_assignment =
true;
390 ExceptionSafePayload::reset_failures();
426 for (int i = 0; i < 2000; ++i)
427 counter.with_lock([](int & value) { ++value; });
432 EXPECT_EQ(
counter.with_lock([](
const int & value) { return value; }), 16000);
457 return std::accumulate(v.begin(), v.end(), 0);
465 explicit Payload(
int x) : value(x) {}
478 explicit Payload(
int x) : value(x) {}
519 for (
int i = 0; i < 5000; ++i)
521 std::this_thread::yield();
527 auto item = queue.try_pop();
528 if (item.has_value())
529 received.push_back(*item);
531 std::this_thread::yield();
539 for (
size_t i = 0; i <
received.size(); ++i)
557 ExceptionSafePayload::reset_failures();
562 ExceptionSafePayload::throw_on_move_construction =
true;
564 ExceptionSafePayload::reset_failures();
Bounded blocking channel for producer-consumer workflows.
Cooperative cancellation source paired with CancellationToken.
CancellationToken token() const noexcept
Return a token observing this source.
void request_cancel() noexcept
Request cancellation for all derived tokens.
Read-only cooperative cancellation token.
Read/write-lock protected shared object wrapper.
decltype(auto) with_read_lock(F &&f) const
Execute a callback with shared (read-only) access.
ReadLockedPtr read() const
Acquire a shared lock and return a guard.
WriteLockedPtr write()
Acquire an exclusive lock and return a guard.
decltype(auto) with_write_lock(F &&f)
Execute a callback with exclusive (write) access.
Bounded single-producer/single-consumer queue.
bool try_pop(T &out)
Attempt to pop an item from the queue into out.
bool empty() const noexcept
Check if the queue is empty.
bool full() const noexcept
Check if the queue is full.
bool try_push(const T &value)
Attempt to push an item (copy) into the queue.
Mutex-protected shared object wrapper.
decltype(auto) with_lock(F &&f)
Execute a callback with exclusive access to the value.
LockedPtr lock()
Acquire an exclusive lock and return a guard.
Minimal structured-concurrency helper over ThreadPool futures.
A reusable thread pool for efficient parallel task execution.
Exception thrown when cooperative cancellation is observed.
Modern synchronization helpers for channels, shared state, and small producer-consumer queues.
Main namespace for Aleph-w library functions.
Itor2 copy(Itor1 sourceBeg, const Itor1 &sourceEnd, Itor2 destBeg)
Copy elements from one range to another.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
A modern, efficient thread pool for parallel task execution.