Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
concurrency_utils_test.cc
Go to the documentation of this file.
1/*
2 Aleph_w
3
4 Data structures & Algorithms
5 version 2.0.0b
6 https://github.com/lrleon/Aleph-w
7
8 This file is part of Aleph-w library
9
10 Copyright (c) 2002-2026 Leandro Rabindranath Leon
11
12 Permission is hereby granted, free of charge, to any person obtaining a copy
13 of this software and associated documentation files (the "Software"), to deal
14 in the Software without restriction, including without limitation the rights
15 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 copies of the Software, and to permit persons to whom the Software is
17 furnished to do so, subject to the following conditions:
18
19 The above copyright notice and this permission notice shall be included in all
20 copies or substantial portions of the Software.
21
22 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 SOFTWARE.
29*/
30
31#include <gtest/gtest.h>
32
33#include <concurrency_utils.H>
34#include <thread_pool.H>
35
36#include <algorithm>
37#include <atomic>
38#include <chrono>
39#include <future>
40#include <memory>
41#include <numeric>
42#include <stdexcept>
43#include <thread>
44#include <vector>
45
46using namespace Aleph;
47using namespace std::chrono_literals;
48
49namespace
50{
// Compile-time requirement on a Sync type: calling sync.with_lock() with a
// generic lambda that takes the value by mutable reference must be well-formed.
// NOTE(review): the line that names this constraint (listing line 52) is
// absent from this listing — confirm against the real source file.
51 template <typename Sync>
53 requires(Sync sync)
54 {
55 sync.with_lock([](auto & value) { return value; });
56 };
57
// Compile-time requirement on a Sync type: calling sync.with_read_lock() with
// a generic lambda that takes the value by const reference must be well-formed.
// NOTE(review): the line that names this constraint (listing line 59) is
// absent from this listing — confirm against the real source file.
58 template <typename Sync>
60 requires(Sync sync)
61 {
62 sync.with_read_lock([](const auto & value) { return value; });
63 };
64
// Compile-time requirement on a Sync type: calling sync.with_write_lock() with
// a generic lambda that takes the value by mutable reference must be well-formed.
// NOTE(review): the line that names this constraint (listing line 66) is
// absent from this listing — confirm against the real source file.
65 template <typename Sync>
67 requires(Sync sync)
68 {
69 sync.with_write_lock([](auto & value) { return value; });
70 };
71
// Test payload built from a CancellationToken plus an int. It has no default
// constructor, so instances must be created from both arguments.
// NOTE(review): the declaration of the `token` member (listing line 74) is
// absent from this listing; the constructor below initializes it.
72 struct TokenPayload
73 {
75 int value;
76
// Takes the token by value and moves it into the member; copies the int.
77 TokenPayload(CancellationToken token_arg, int value_arg)
78 : token(std::move(token_arg)), value(value_arg) {}
79 };
80
// Test payload whose move operations can be switched to throw on demand.
//
// Both move operations first take the source's value and overwrite the source
// with -1, and only then consult the matching switch. A throwing move therefore
// leaves the source already marked as moved-from. Copy operations are defaulted
// and never consult the switches.
struct ExceptionSafePayload
{
  // Class-wide failure-injection switches; shared by every instance.
  static inline bool throw_on_move_construction = false;
  static inline bool throw_on_move_assignment = false;

  int value = 0;

  ExceptionSafePayload() = default;

  explicit ExceptionSafePayload(int initial)
    : value(initial) {}

  ExceptionSafePayload(const ExceptionSafePayload &) = default;
  ExceptionSafePayload & operator = (const ExceptionSafePayload &) = default;

  // Move-construct: steal the value, mark the source with -1, then throw
  // std::runtime_error if throw_on_move_construction is set.
  ExceptionSafePayload(ExceptionSafePayload && source)
    : value(source.value)
  {
    source.value = -1;
    raise_if(throw_on_move_construction, "move construction failed");
  }

  // Move-assign: same sequence as move construction, governed by
  // throw_on_move_assignment. The target is already overwritten when it throws.
  ExceptionSafePayload & operator = (ExceptionSafePayload && source)
  {
    value = source.value;
    source.value = -1;
    raise_if(throw_on_move_assignment, "move assignment failed");
    return *this;
  }

  // Clear both switches so later moves succeed again.
  static void reset_failures() noexcept
  {
    throw_on_move_construction = throw_on_move_assignment = false;
  }

private:
  // Throw std::runtime_error(message) when the given switch is armed.
  static void raise_if(bool armed, const char * message)
  {
    if (armed)
      throw std::runtime_error(message);
  }
};
119}
120
124
// Test body: two sends, a close, then three receives and one more send.
// Expects the two values back in send order, an empty third receive, and a
// rejected send after close.
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing — confirm the channel type and capacity in the real source.
126{
128
129 ASSERT_TRUE(ch.send(10));
130 ASSERT_TRUE(ch.send(20));
131 ch.close();
132
// All three receives happen after close(); the first two must still yield data.
133 auto first = ch.recv();
134 auto second = ch.recv();
135 auto third = ch.recv();
136
137 ASSERT_TRUE(first.has_value());
138 ASSERT_TRUE(second.has_value());
139 EXPECT_EQ(*first, 10);
140 EXPECT_EQ(*second, 20);
141 EXPECT_FALSE(third.has_value());
142 EXPECT_FALSE(ch.send(30));
143}
144
// Test body: the first try_send must succeed and the second must be refused;
// then try_recv must return 7 followed by an empty result.
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing; the expectations presumably rely on a capacity of one — confirm.
146{
148
149 EXPECT_TRUE(ch.try_send(7));
150 EXPECT_FALSE(ch.try_send(8));
151
152 auto first = ch.try_recv();
153 auto second = ch.try_recv();
154
155 ASSERT_TRUE(first.has_value());
156 EXPECT_EQ(*first, 7);
157 EXPECT_FALSE(second.has_value());
158}
159
// Test body in two halves: close a channel while a second thread is calling
// send() on it, then close another channel while a second thread is calling
// recv() on it. The receiver half expects recv() to return an empty optional.
// NOTE(review): the TEST header, the declarations of `full_ch` and `empty_ch`,
// and listing line 176 (between close() and join()) are absent from this
// listing — the check on sender_future is presumably on the missing line.
161{
163 ASSERT_TRUE(full_ch.send(1));
164
165 std::atomic<bool> sender_entered{false};
166 std::promise<bool> sender_result;
167 auto sender_future = sender_result.get_future();
168 std::thread sender([&] {
// The flag is raised before send() is called, so it signals only that the
// thread has started, not that it is already inside send().
169 sender_entered.store(true, std::memory_order_release);
170 sender_result.set_value(full_ch.send(2));
171 });
172
173 while (not sender_entered.load(std::memory_order_acquire))
174 std::this_thread::yield();
175 full_ch.close();
177 sender.join();
178
180 std::atomic<bool> receiver_entered{false};
181 std::promise<std::optional<int>> receiver_result;
182 auto receiver_future = receiver_result.get_future();
183 std::thread receiver([&] {
// As above: raised before recv() is entered.
184 receiver_entered.store(true, std::memory_order_release);
185 receiver_result.set_value(empty_ch.recv());
186 });
187
188 while (not receiver_entered.load(std::memory_order_acquire))
189 std::this_thread::yield();
190 empty_ch.close();
191 EXPECT_FALSE(receiver_future.get().has_value());
192 receiver.join();
193}
194
// Test body: one producer thread sends 0..1999 and closes the channel; one
// consumer thread receives until recv() yields an empty result. Expects every
// send to succeed and the 2000 values to arrive in order.
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing.
196{
198 std::vector<int> received;
199 received.reserve(2000);
200 std::atomic<bool> producer_ok{true};
201
202 std::thread producer([&] {
203 for (int i = 0; i < 2000; ++i)
204 if (not ch.send(i))
205 producer_ok.store(false, std::memory_order_relaxed);
206 ch.close();
207 });
208
// Only this thread touches `received` until it is joined below.
209 std::thread consumer([&] {
210 while (auto item = ch.recv())
211 received.push_back(*item);
212 });
213
214 producer.join();
215 consumer.join();
216
217 EXPECT_TRUE(producer_ok.load(std::memory_order_relaxed));
218 ASSERT_EQ(received.size(), 2000u);
219 for (size_t i = 0; i < received.size(); ++i)
220 EXPECT_EQ(received[i], static_cast<int>(i));
221}
222
// Test body: sends a std::unique_ptr<int> holding 42 and checks that the
// received item is present, non-null, and still points at 42.
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing.
224{
226
227 ASSERT_TRUE(ch.send(std::make_unique<int>(42)));
228 auto item = ch.recv();
229
230 ASSERT_TRUE(item.has_value());
231 ASSERT_TRUE(item->get() != nullptr);
232 EXPECT_EQ(**item, 42);
233}
234
// Test body: after one successful send, a second thread calls
// send(2, token); the main thread then requests cancellation. Expects the
// call to end with operation_canceled (the promise is set to true).
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing; the second send presumably blocks on a full channel — confirm.
236{
238 CancellationSource source;
239 ASSERT_TRUE(ch.send(1));
240
241 std::atomic<bool> started{false};
242 std::promise<bool> canceled;
243 auto future = canceled.get_future();
244
245 std::thread sender([&] {
246 started.store(true, std::memory_order_release);
247 try
248 {
249 (void) ch.send(2, source.token());
// Reached only if send() returned normally instead of throwing.
250 canceled.set_value(false);
251 }
252 catch (const operation_canceled &)
253 {
254 canceled.set_value(true);
255 }
256 });
257
258 while (not started.load(std::memory_order_acquire))
259 std::this_thread::yield();
// Short pause before cancelling; `started` is set before send() is entered.
260 std::this_thread::sleep_for(5ms);
261 source.request_cancel();
262
263 EXPECT_TRUE(future.get());
264 sender.join();
265}
266
// Test body: a second thread calls recv(token) on a channel into which this
// body sends nothing; the main thread then requests cancellation. Expects the
// call to end with operation_canceled (the promise is set to true).
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing.
268{
270 CancellationSource source;
271
272 std::atomic<bool> started{false};
273 std::promise<bool> canceled;
274 auto future = canceled.get_future();
275
276 std::thread receiver([&] {
277 started.store(true, std::memory_order_release);
278 try
279 {
280 (void) ch.recv(source.token());
// Reached only if recv() returned normally instead of throwing.
281 canceled.set_value(false);
282 }
283 catch (const operation_canceled &)
284 {
285 canceled.set_value(true);
286 }
287 });
288
289 while (not started.load(std::memory_order_acquire))
290 std::this_thread::yield();
// Short pause before cancelling; `started` is set before recv() is entered.
291 std::this_thread::sleep_for(5ms);
292 source.request_cancel();
293
294 EXPECT_TRUE(future.get());
295 receiver.join();
296}
297
// Test body: sends 11, closes the channel, and requests cancellation before
// any receive. Both recv(token) calls are made outside any try block, so the
// test expects them to return normally: first 11, then an empty result.
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing.
299{
301 CancellationSource source;
302
303 ASSERT_TRUE(ch.send(11));
304 ch.close();
305 source.request_cancel();
306
307 auto first = ch.recv(source.token());
308 auto second = ch.recv(source.token());
309
310 ASSERT_TRUE(first.has_value());
311 EXPECT_EQ(*first, 11);
312 EXPECT_FALSE(second.has_value());
313}
314
// Test body: after one successful send, a second thread calls
// emplace(token, 2); the main thread then requests cancellation. Expects the
// call to end with operation_canceled (the promise is set to true).
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing; the emplace presumably blocks on a full channel — confirm.
316{
318 CancellationSource source;
319 ASSERT_TRUE(ch.send(1));
320
321 std::atomic<bool> started{false};
322 std::promise<bool> canceled;
323 auto future = canceled.get_future();
324
325 std::thread producer([&] {
326 started.store(true, std::memory_order_release);
327 try
328 {
329 (void) ch.emplace(source.token(), 2);
// Reached only if emplace() returned normally instead of throwing.
330 canceled.set_value(false);
331 }
332 catch (const operation_canceled &)
333 {
334 canceled.set_value(true);
335 }
336 });
337
338 while (not started.load(std::memory_order_acquire))
339 std::this_thread::yield();
// Short pause before cancelling; `started` is set before emplace() is entered.
340 std::this_thread::sleep_for(5ms);
341 source.request_cancel();
342
343 EXPECT_TRUE(future.get());
344 producer.join();
345}
346
// Test body: emplace(token, 42) must store an element whose `value` is 42 and
// whose `token` member tracks `source`: not stopped before request_cancel(),
// stopped afterwards.
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing; the element type presumably is TokenPayload, so the token is
// forwarded as a constructor argument rather than used to cancel — confirm.
348{
350 CancellationSource source;
351 auto token = source.token();
352
353 ASSERT_TRUE(ch.emplace(token, 42));
354
355 auto received = ch.recv();
356 ASSERT_TRUE(received.has_value());
357 EXPECT_EQ(received->value, 42);
358 EXPECT_FALSE(received->token.stop_requested());
359
360 source.request_cancel();
361
362 EXPECT_TRUE(received->token.stop_requested());
363}
364
// Test body: sends ExceptionSafePayload(23), arms throw_on_move_construction,
// and calls recv(). Expects recv() to return normally with value 23.
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing.
366{
367 ExceptionSafePayload::reset_failures();
368
370 ASSERT_TRUE(ch.send(ExceptionSafePayload(23)));
371
// The switch is armed only around recv(), after the send has completed.
372 ExceptionSafePayload::throw_on_move_construction = true;
373 auto received = ch.recv();
374 ExceptionSafePayload::reset_failures();
375
376 ASSERT_TRUE(received.has_value());
377 EXPECT_EQ(received->value, 23);
378}
379
// Test body: sends ExceptionSafePayload(17), arms throw_on_move_assignment,
// and expects try_recv(out) to throw std::runtime_error. After the switches
// are cleared, recv() must still return an element with value 17.
// NOTE(review): the TEST header and the declaration of `ch` are absent from
// this listing.
381{
382 ExceptionSafePayload::reset_failures();
383
385 ASSERT_TRUE(ch.send(ExceptionSafePayload(17)));
386
387 ExceptionSafePayload out;
388 ExceptionSafePayload::throw_on_move_assignment = true;
389 EXPECT_THROW((void) ch.try_recv(out), std::runtime_error);
390 ExceptionSafePayload::reset_failures();
391
// The element must survive the failed try_recv above with its value intact.
392 auto recovered = ch.recv();
393 ASSERT_TRUE(recovered.has_value());
394 EXPECT_EQ(recovered->value, 17);
395}
396
// Test body: appends 1 and 2 through with_lock(), appends 3 through the guard
// returned by lock(), then copies the vector out through with_lock() and
// expects {1, 2, 3}.
// NOTE(review): the TEST header is absent from this listing.
398{
399 synchronized<std::vector<int>> values(std::in_place);
400
401 values.with_lock([](auto & v) {
402 v.push_back(1);
403 v.push_back(2);
404 });
405
// Inner scope so the guard is destroyed before the next with_lock() call.
406 {
407 auto locked = values.lock();
408 locked->push_back(3);
409 }
410
411 const auto copy = values.with_lock([](const auto & v) {
412 return v;
413 });
414
415 EXPECT_EQ(copy, (std::vector<int>{1, 2, 3}));
416}
417
// Test body: launches 8 tasks on a 4-thread pool; each task increments
// `counter` 2000 times through with_lock(). Expects a final value of 16000
// (8 * 2000) once the group has been waited on.
// NOTE(review): the TEST header and the declaration of `counter` (listing
// line 422) are absent from this listing.
419{
420 ThreadPool pool(4);
421 TaskGroup group(pool);
423
424 for (int worker = 0; worker < 8; ++worker)
425 group.launch([&] {
426 for (int i = 0; i < 2000; ++i)
427 counter.with_lock([](int & value) { ++value; });
428 });
429
430 group.wait();
431
432 EXPECT_EQ(counter.with_lock([](const int & value) { return value; }), 16000);
433}
434
// Test body: appends 4 and 5 through with_write_lock(), appends 6 through the
// write() guard, checks size and end elements through the read() guard, and
// finally expects with_read_lock() to sum the elements to 15.
// NOTE(review): the TEST header is absent from this listing.
436{
437 rw_synchronized<std::vector<int>> values(std::in_place);
438
439 values.with_write_lock([](auto & v) {
440 v.push_back(4);
441 v.push_back(5);
442 });
443
// Each guard lives in its own scope so it is destroyed before the next access.
444 {
445 auto write_locked = values.write();
446 write_locked->push_back(6);
447 }
448
449 {
450 auto read_locked = values.read();
451 ASSERT_EQ(read_locked->size(), 3u);
452 EXPECT_EQ((*read_locked)[0], 4);
453 EXPECT_EQ((*read_locked)[2], 6);
454 }
455
456 EXPECT_EQ(values.with_read_lock([](const auto & v) {
457 return std::accumulate(v.begin(), v.end(), 0);
458 }), 15);
459}
460
// Test body: builds a synchronized<Payload> with std::in_place and the
// argument 7, adds 5 under the lock, and expects to read back 12.
// NOTE(review): the TEST header is absent from this listing.
462{
// Local type whose only constructor is explicit and takes an int.
463 struct Payload
464 {
465 explicit Payload(int x) : value(x) {}
466 int value;
467 };
468
469 synchronized<Payload> payload(std::in_place, 7);
470 payload.with_lock([](Payload & p) { p.value += 5; });
471 EXPECT_EQ(payload.with_lock([](const Payload & p) { return p.value; }), 12);
472}
473
// Test body: builds an rw_synchronized<Payload> with std::in_place and the
// argument 9, doubles it under the write lock, and expects to read back 18
// under the read lock.
// NOTE(review): the TEST header is absent from this listing.
475{
// Local type whose only constructor is explicit and takes an int.
476 struct Payload
477 {
478 explicit Payload(int x) : value(x) {}
479 int value;
480 };
481
482 rw_synchronized<Payload> payload(std::in_place, 9);
483 payload.with_write_lock([](Payload & p) { p.value *= 2; });
484 EXPECT_EQ(payload.with_read_lock([](const Payload & p) { return p.value; }), 18);
485}
486
// Test body: on spsc_queue<int>(3), three pushes must succeed, a fourth must
// be refused, and full() must be true. Four pops must then return 1, 2, 3 in
// push order followed by an empty result, leaving empty() true.
// NOTE(review): the TEST header is absent from this listing.
488{
489 spsc_queue<int> queue(3);
490
491 EXPECT_TRUE(queue.try_push(1));
492 EXPECT_TRUE(queue.try_push(2));
493 EXPECT_TRUE(queue.try_push(3));
494 EXPECT_FALSE(queue.try_push(4));
495 EXPECT_TRUE(queue.full());
496
497 auto first = queue.try_pop();
498 auto second = queue.try_pop();
499 auto third = queue.try_pop();
500 auto fourth = queue.try_pop();
501
502 ASSERT_TRUE(first.has_value());
503 ASSERT_TRUE(second.has_value());
504 ASSERT_TRUE(third.has_value());
505 EXPECT_EQ(*first, 1);
506 EXPECT_EQ(*second, 2);
507 EXPECT_EQ(*third, 3);
508 EXPECT_FALSE(fourth.has_value());
509 EXPECT_TRUE(queue.empty());
510}
511
// Test body: one producer thread pushes 0..4999 into spsc_queue<int>(64),
// retrying with yield() whenever try_push() is refused; one consumer thread
// pops until it holds 5000 items. Expects the items in push order.
// NOTE(review): the TEST header is absent from this listing.
513{
514 spsc_queue<int> queue(64);
515 std::vector<int> received;
516 received.reserve(5000);
517
518 std::thread producer([&] {
519 for (int i = 0; i < 5000; ++i)
520 while (not queue.try_push(i))
521 std::this_thread::yield();
522 });
523
// Only this thread touches `received` until it is joined below.
524 std::thread consumer([&] {
525 while (received.size() < 5000)
526 {
527 auto item = queue.try_pop();
528 if (item.has_value())
529 received.push_back(*item);
530 else
531 std::this_thread::yield();
532 }
533 });
534
535 producer.join();
536 consumer.join();
537
538 ASSERT_EQ(received.size(), 5000u);
539 for (size_t i = 0; i < received.size(); ++i)
540 EXPECT_EQ(received[i], static_cast<int>(i));
541}
542
// Test body: pushes a std::unique_ptr<int> holding 42 and checks that the
// popped item is present, non-null, and still points at 42.
// NOTE(review): the TEST header and the declaration of `queue` are absent
// from this listing.
544{
546
547 ASSERT_TRUE(queue.try_push(std::make_unique<int>(42)));
548 auto item = queue.try_pop();
549
550 ASSERT_TRUE(item.has_value());
551 ASSERT_TRUE(item->get() != nullptr);
552 EXPECT_EQ(**item, 42);
553}
554
// Test body: pushes ExceptionSafePayload(31), arms throw_on_move_construction,
// and calls try_pop(). Expects try_pop() to return normally with value 31.
// NOTE(review): the TEST header and the declaration of `queue` are absent
// from this listing.
556{
557 ExceptionSafePayload::reset_failures();
558
560 ASSERT_TRUE(queue.try_push(ExceptionSafePayload(31)));
561
// The switch is armed only around try_pop(), after the push has completed.
562 ExceptionSafePayload::throw_on_move_construction = true;
563 auto item = queue.try_pop();
564 ExceptionSafePayload::reset_failures();
565
566 ASSERT_TRUE(item.has_value());
567 EXPECT_EQ(item->value, 31);
568}
Bounded blocking channel for producer-consumer workflows.
Cooperative cancellation source paired with CancellationToken.
CancellationToken token() const noexcept
Return a token observing this source.
void request_cancel() noexcept
Request cancellation for all derived tokens.
Read-only cooperative cancellation token.
Read/write-lock protected shared object wrapper.
decltype(auto) with_read_lock(F &&f) const
Execute a callback with shared (read-only) access.
ReadLockedPtr read() const
Acquire a shared lock and return a guard.
WriteLockedPtr write()
Acquire an exclusive lock and return a guard.
decltype(auto) with_write_lock(F &&f)
Execute a callback with exclusive (write) access.
Bounded single-producer/single-consumer queue.
bool try_pop(T &out)
Attempt to pop an item from the queue into out.
bool empty() const noexcept
Check if the queue is empty.
bool full() const noexcept
Check if the queue is full.
bool try_push(const T &value)
Attempt to push an item (copy) into the queue.
Mutex-protected shared object wrapper.
decltype(auto) with_lock(F &&f)
Execute a callback with exclusive access to the value.
LockedPtr lock()
Acquire an exclusive lock and return a guard.
Minimal structured-concurrency helper over ThreadPool futures.
A reusable thread pool for efficient parallel task execution.
Exception thrown when cooperative cancellation is observed.
Modern synchronization helpers for channels, shared state, and small producer-consumer queues.
#define TEST(name)
Main namespace for Aleph-w library functions.
Definition ah-arena.H:89
Itor2 copy(Itor1 sourceBeg, const Itor1 &sourceEnd, Itor2 destBeg)
Copy elements from one range to another.
Definition ahAlgo.H:584
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
STL namespace.
static long counter
Definition test-splice.C:35
A modern, efficient thread pool for parallel task execution.