Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
timeoutQueue_test.cc
Go to the documentation of this file.
1
11# include <gtest/gtest.h>
12# include <stdexcept>
13# include <atomic>
14# include <chrono>
15# include <thread>
16# include <vector>
17# include <mutex>
18# include <condition_variable>
19# include <timeoutQueue.H>
20
21using namespace std;
22
23// Global queue (singleton)
24static TimeoutQueue* g_queue = nullptr;
25
26// Helper to get the current time plus milliseconds
28{
30}
31
32// Test event that tracks execution
34{
35public:
36 atomic<bool> executed{false};
37 atomic<int> execution_count{0};
39
40 TestEvent(const Time& t) : Event(t) {}
41 TestEvent(const Time& t, function<void()> cb) : Event(t), callback(move(cb)) {}
42
43 void EventFct() override
44 {
45 executed = true;
47 if (callback) callback();
48 }
49};
50
51// Test event that signals a condition variable
53{
54public:
55 mutex& mtx;
57 bool& flag;
58
59 SignalingEvent(const Time& t, mutex& m, condition_variable& c, bool& f)
60 : Event(t), mtx(m), cv(c), flag(f) {}
61
62 void EventFct() override
63 {
65 flag = true;
66 cv.notify_all();
67 }
68};
69
70// Test event that records execution time
72{
73public:
74 chrono::steady_clock::time_point scheduled_at;
75 chrono::steady_clock::time_point executed_at;
76 atomic<bool> executed{false};
77
78 TimingEvent(const Time& t) : Event(t), scheduled_at(chrono::steady_clock::now()) {}
79
80 void EventFct() override
81 {
82 executed_at = chrono::steady_clock::now();
83 executed = true;
84 }
85
86 [[nodiscard]] int elapsed_ms() const
87 {
88 return chrono::duration_cast<chrono::milliseconds>(
89 executed_at - scheduled_at).count();
90 }
91};
92
93// Test event that can reschedule itself
95{
96public:
100 atomic<int> execution_count{0};
101
104
105 void EventFct() override
106 {
109 {
112 }
113 }
114};
115
116// =============================================================================
117// Test Environment - manages the singleton TimeoutQueue
118// =============================================================================
119
// GoogleTest global environment that manages the lifetime of the shared
// TimeoutQueue reachable through the file-level pointer g_queue.
// An instance is registered in main() via AddGlobalTestEnvironment().
120class TimeoutQueueEnvironment : public ::testing::Environment
121{
122public:
// Allocates a new TimeoutQueue and stores it in g_queue.
123 void SetUp() override
124 {
125 g_queue = new TimeoutQueue();
126 }
127
// Shuts down and frees the queue held in g_queue, then resets g_queue to
// nullptr. Does nothing when g_queue is already null.
128 void TearDown() override
129 {
130 if (g_queue)
131 {
132 g_queue->shutdown(); // shut down the queue and stop its background thread
// NOTE(review): presumably a grace period so the worker thread can finish
// exiting before the queue is deleted — confirm it is still needed.
133 this_thread::sleep_for(chrono::milliseconds(100));
134 delete g_queue;
135 g_queue = nullptr; // leave no dangling pointer behind
136 }
137 }
138};
139
140// =============================================================================
141// Basic Functionality Tests
142// =============================================================================
143
145{
146 mutex mtx;
148 bool executed = false;
149
150 bool completed = false;
151 bool callback_done = false;
152
153 auto* event = new SignalingEvent(time_from_now_ms(100), mtx, cv, executed);
154 event->set_completion_callback([&](TimeoutQueue::Event*, TimeoutQueue::Event::Execution_Status status) {
155 lock_guard<mutex> lock(mtx);
157 callback_done = true;
158 cv.notify_all();
159 });
161
162 unique_lock<mutex> lock(mtx);
163 ASSERT_TRUE(cv.wait_for(lock, chrono::milliseconds(500),
164 [&]{ return callback_done; }));
165 EXPECT_TRUE(executed);
167
168 delete event;
169}
170
172{
173 auto* event = new TestEvent(time_from_now_ms(50));
174
175 EXPECT_EQ(event->get_execution_status(), TimeoutQueue::Event::Out_Queue);
176
178 EXPECT_EQ(event->get_execution_status(), TimeoutQueue::Event::In_Queue);
179
180 this_thread::sleep_for(chrono::milliseconds(200));
181 EXPECT_TRUE(event->executed);
182
183 delete event;
184}
185
187{
188 auto* event = new TestEvent(time_from_now_ms(1000)); // Will be overridden
190
192
193 this_thread::sleep_for(chrono::milliseconds(200));
194 EXPECT_TRUE(event->executed);
195
196 delete event;
197}
198
200{
201 Time t = time_from_now_ms(100);
202 auto* event = new TestEvent(t);
203
204 Time event_time = event->getAbsoluteTime();
205 EXPECT_EQ(event_time.tv_sec, t.tv_sec);
206 EXPECT_EQ(event_time.tv_nsec, t.tv_nsec);
207
208 delete event;
209}
210
211// =============================================================================
212// Cancellation Tests
213// =============================================================================
214
216{
217 auto* event = new TestEvent(time_from_now_ms(500));
218
220 EXPECT_EQ(event->get_execution_status(), TimeoutQueue::Event::In_Queue);
221
224 EXPECT_EQ(event->get_execution_status(), TimeoutQueue::Event::Canceled);
225
226 this_thread::sleep_for(chrono::milliseconds(100));
227 EXPECT_FALSE(event->executed);
228
229 delete event;
230}
231
233{
234 auto* event = new TestEvent(time_from_now_ms(100));
235
236 // Now throws exception if event is not in registry
237 EXPECT_THROW(g_queue->cancel_event(event), std::invalid_argument);
238
239 delete event;
240}
241
251
258
259// =============================================================================
260// Rescheduling Tests
261// =============================================================================
262
264{
265 auto* event = new TimingEvent(time_from_now_ms(500));
266
268 this_thread::sleep_for(chrono::milliseconds(50));
269
271
272 this_thread::sleep_for(chrono::milliseconds(200));
273 EXPECT_TRUE(event->executed);
274 EXPECT_LT(event->elapsed_ms(), 300);
275
276 delete event;
277}
278
280{
281 auto* event = new TestEvent(time_from_now_ms(100));
282
283 // Now throws exception if event is not in registry
284 EXPECT_THROW(g_queue->reschedule_event(time_from_now_ms(50), event), std::invalid_argument);
285
286 delete event;
287}
288
290{
291 auto* event = new ReschedulingEvent(time_from_now_ms(50), g_queue, 2);
292
294
295 this_thread::sleep_for(chrono::milliseconds(400));
296 EXPECT_EQ(event->execution_count, 3);
297
298 delete event;
299}
300
301// =============================================================================
302// Multiple Events Tests
303// =============================================================================
304
306{
307 vector<int> execution_order;
308 mutex order_mutex;
309
310 auto make_event = [&](int id, int delay_ms) {
311 return new TestEvent(time_from_now_ms(delay_ms), [&, id]() {
313 execution_order.push_back(id);
314 });
315 };
316
317 auto* e1 = make_event(1, 150);
318 auto* e2 = make_event(2, 50);
319 auto* e3 = make_event(3, 100);
320
324
325 this_thread::sleep_for(chrono::milliseconds(400));
326
327 {
333 }
334
335 delete e1;
336 delete e2;
337 delete e3;
338}
339
341{
342 const int num_events = 30;
344 atomic<int> total_executed{0};
345
346 for (int i = 0; i < num_events; ++i)
347 {
348 auto* e = new TestEvent(time_from_now_ms(50 + i * 10), [&]() {
350 });
351 events.push_back(e);
353 }
354
355 this_thread::sleep_for(chrono::milliseconds(600));
356
358
359 for (auto* e : events)
360 delete e;
361}
362
363// =============================================================================
364// Edge Cases and Error Handling
365// =============================================================================
366
368{
370 auto* event = new TestEvent(now);
371
373
374 this_thread::sleep_for(chrono::milliseconds(100));
375 EXPECT_TRUE(event->executed);
376
377 delete event;
378}
379
381{
382 auto* throwing_event = new TestEvent(time_from_now_ms(50), []() {
383 throw runtime_error("Test exception");
384 });
385
386 auto* normal_event = new TestEvent(time_from_now_ms(100));
387
390
391 this_thread::sleep_for(chrono::milliseconds(300));
392
393 EXPECT_TRUE(throwing_event->executed);
394 EXPECT_TRUE(normal_event->executed);
395
396 delete throwing_event;
397 delete normal_event;
398}
399
401{
402 auto* event = new TestEvent(time_from_now_ms(100));
403
404 event->set_for_deletion();
405 EXPECT_EQ(event->get_execution_status(), TimeoutQueue::Event::To_Delete);
406
407 delete event;
408}
409
410// =============================================================================
411// Timing Accuracy Tests
412// =============================================================================
413
415{
416 const int expected_delay = 100;
417 const int tolerance = 50;
418
419 auto* event = new TimingEvent(time_from_now_ms(expected_delay));
420
422
423 this_thread::sleep_for(chrono::milliseconds(expected_delay + 100));
424 ASSERT_TRUE(event->executed);
425
426 int actual_delay = event->elapsed_ms();
429
430 delete event;
431}
432
433// =============================================================================
434// Thread Safety Tests
435// =============================================================================
436
438{
439 atomic<int> executed_count{0};
440 const int num_threads = 4;
441 const int events_per_thread = 5;
442 vector<thread> threads;
444 mutex events_mutex;
445
446 for (int t = 0; t < num_threads; ++t)
447 {
448 threads.emplace_back([&, t]() {
449 for (int i = 0; i < events_per_thread; ++i)
450 {
451 auto* e = new TestEvent(time_from_now_ms(50 + i * 20), [&]() {
452 ++executed_count;
453 });
454 {
456 all_events.push_back(e);
457 }
459 }
460 });
461 }
462
463 for (auto& t : threads)
464 t.join();
465
466 this_thread::sleep_for(chrono::milliseconds(400));
467
468 EXPECT_EQ(executed_count, num_threads * events_per_thread);
469
470 for (auto* e : all_events)
471 delete e;
472}
473
475{
476 const int num_events = 10;
478 atomic<int> canceled_count{0};
479
480 for (int i = 0; i < num_events; ++i)
481 {
482 auto* e = new TestEvent(time_from_now_ms(500));
483 events.push_back(e);
485 }
486
487 vector<thread> threads;
488 for (int t = 0; t < 2; ++t)
489 {
490 threads.emplace_back([&, t]() {
491 for (size_t i = t; i < events.size(); i += 2)
492 {
493 try {
494 if (g_queue->cancel_event(events[i]))
495 ++canceled_count;
496 } catch (const std::invalid_argument&) {
497 // Ignore if already canceled/deleted
498 }
499 }
500 });
501 }
502
503 for (auto& t : threads)
504 t.join();
505
506 EXPECT_EQ(canceled_count, num_events);
507
508 for (auto* e : events)
509 delete e;
510}
511
512// =============================================================================
513// Utility Methods Tests (new features)
514// =============================================================================
515
517{
519 EXPECT_EQ(g_queue->size(), 0u);
520
521 auto* e1 = new TestEvent(time_from_now_ms(500));
522 auto* e2 = new TestEvent(time_from_now_ms(600));
523
526 EXPECT_EQ(g_queue->size(), 1u);
527
529 EXPECT_EQ(g_queue->size(), 2u);
530
532 EXPECT_EQ(g_queue->size(), 1u);
533
536
537 delete e1;
538 delete e2;
539}
540
545
547{
548 auto* event = new TimingEvent(time_from_now_ms(1000)); // Will be overridden
549
551
552 this_thread::sleep_for(chrono::milliseconds(250));
553 ASSERT_TRUE(event->executed);
554 EXPECT_LT(event->elapsed_ms(), 200);
555
556 delete event;
557}
558
560{
562 EXPECT_EQ(empty_time.tv_sec, 0);
563 EXPECT_EQ(empty_time.tv_nsec, 0);
564
565 auto* e1 = new TestEvent(time_from_now_ms(500));
566 auto* e2 = new TestEvent(time_from_now_ms(200));
567
570 EXPECT_EQ(t1.tv_sec, e1->getAbsoluteTime().tv_sec);
571
574 EXPECT_EQ(t2.tv_sec, e2->getAbsoluteTime().tv_sec); // e2 is sooner
575
578
579 delete e1;
580 delete e2;
581}
582
584{
585 auto* event = new TestEvent(time_from_now_ms(50));
586
588 this_thread::sleep_for(chrono::milliseconds(200));
589
590 EXPECT_TRUE(event->executed);
591 EXPECT_EQ(event->get_execution_status(), TimeoutQueue::Event::Executed);
592
593 delete event;
594}
595
597{
599 for (int i = 0; i < 5; ++i)
600 {
601 auto* e = new TestEvent(time_from_now_ms(500 + i * 100));
602 events.push_back(e);
604 }
605
606 EXPECT_EQ(g_queue->size(), 5u);
607
608 size_t cleared = g_queue->clear_all();
609 EXPECT_EQ(cleared, 5u);
611
612 for (auto* e : events)
613 {
614 EXPECT_EQ(e->get_execution_status(), TimeoutQueue::Event::Canceled);
615 delete e;
616 }
617}
618
620{
622
627
628 // Execute some events
629 auto* e1 = new TestEvent(time_from_now_ms(50));
630 auto* e2 = new TestEvent(time_from_now_ms(100));
633
634 this_thread::sleep_for(chrono::milliseconds(250));
635
637
638 // Cancel some events
639 auto* e3 = new TestEvent(time_from_now_ms(500));
640 auto* e4 = new TestEvent(time_from_now_ms(600));
643
646
648
649 delete e1;
650 delete e2;
651 delete e3;
652 delete e4;
653}
654
656{
657 // Ensure some stats exist
658 auto* e = new TestEvent(time_from_now_ms(50));
660 this_thread::sleep_for(chrono::milliseconds(150));
661
662 // Reset and verify
666
667 delete e;
668}
669
670// =============================================================================
671// New Features Tests
672// =============================================================================
673
675{
678
679 auto* e1 = new TestEvent(time_from_now_ms(100));
681
682 // Pause before event triggers
683 g_queue->pause();
685
686 // Wait past trigger time - event should NOT execute
687 this_thread::sleep_for(chrono::milliseconds(200));
688 EXPECT_FALSE(e1->executed);
689
690 // Resume - event should execute now
691 g_queue->resume();
693
694 this_thread::sleep_for(chrono::milliseconds(150));
695 EXPECT_TRUE(e1->executed);
696
697 delete e1;
698}
699
701{
702 auto* e1 = new TestEvent(time_from_now_ms(100));
703 auto* e2 = new TestEvent(time_from_now_ms(150));
704
707
709
710 // Wait for all events to complete
714 EXPECT_TRUE(e1->executed);
715 EXPECT_TRUE(e2->executed);
716
717 delete e1;
718 delete e2;
719}
720
722{
723 auto* e = new TestEvent(time_from_now_ms(500));
725
726 // Wait with short timeout - should timeout
730
731 // Cancel and cleanup
733 delete e;
734}
735
748
760
762{
763 // Create event with name
// Minimal concrete Event local to this test: it only forwards the trigger
// time and the name to the Event(t, name) base constructor so the
// get_name()/set_name() accessors can be exercised.
764 class NamedEvent : public TimeoutQueue::Event
765 {
766 public:
// t: trigger time passed to the base class; name: initial event name.
767 NamedEvent(const Time& t, const string& name) : Event(t, name) {}
// Empty handler: the visible test body never schedules this event.
768 void EventFct() override {}
769 };
770
771 auto* e = new NamedEvent(time_from_now_ms(100), "TestEventName");
772 EXPECT_EQ(e->get_name(), "TestEventName");
773
774 e->set_name("NewName");
775 EXPECT_EQ(e->get_name(), "NewName");
776
777 delete e;
778}
779
780TEST(TimeoutQueueTest, CompletionCallback)
781{
782 atomic<bool> callback_called{false};
783 atomic<int> final_status{-1};
784
785 auto* e = new TestEvent(time_from_now_ms(50));
786 e->set_completion_callback([&](TimeoutQueue::Event* ev, TimeoutQueue::Event::Execution_Status status) {
787 (void) ev;
788 callback_called = true;
789 final_status = static_cast<int>(status);
790 });
791
793 this_thread::sleep_for(chrono::milliseconds(200));
794
797
798 delete e;
799}
800
802{
803 atomic<bool> callback_called{false};
804 atomic<int> final_status{-1};
805
806 auto* e = new TestEvent(time_from_now_ms(500));
807 e->set_completion_callback([&](TimeoutQueue::Event*, TimeoutQueue::Event::Execution_Status status) {
808 callback_called = true;
809 final_status = static_cast<int>(status);
810 });
811
814
815 this_thread::sleep_for(chrono::milliseconds(50));
816
819
820 delete e;
821}
822
824{
825 auto* e1 = new TestEvent(time_from_now_ms(500));
826 auto* e2 = new TestEvent(time_from_now_ms(600));
827
828 // Each event should have a unique ID
831 EXPECT_NE(e1->get_id(), e2->get_id());
832
833 delete e1;
834 delete e2;
835}
836
838{
839 auto* e1 = new TestEvent(time_from_now_ms(500));
840 auto* e2 = new TestEvent(time_from_now_ms(600));
841
842 auto id1 = e1->get_id();
843 auto id2 = e2->get_id();
844
847
848 // Should find scheduled events
851
852 // Invalid ID should return nullptr
854 EXPECT_EQ(g_queue->find_by_id(999999), nullptr);
855
858
859 // After cancel, should not find
860 EXPECT_EQ(g_queue->find_by_id(id1), nullptr);
861
862 delete e1;
863 delete e2;
864}
865
867{
868 auto* e1 = new TestEvent(time_from_now_ms(500));
869 auto* e2 = new TestEvent(time_from_now_ms(600));
870
871 auto id1 = e1->get_id();
872 auto id2 = e2->get_id();
873
876
877 EXPECT_EQ(g_queue->size(), 2u);
878
879 // Cancel by ID
881 EXPECT_EQ(g_queue->size(), 1u);
882 EXPECT_EQ(e1->get_execution_status(), TimeoutQueue::Event::Canceled);
883
884 // Cancel same ID again should fail
886
887 // Cancel invalid ID should fail
890
891 // Cancel second event
894
895 delete e1;
896 delete e2;
897}
898
900{
901 atomic<bool> callback_called{false};
902 auto* e = new TestEvent(time_from_now_ms(500));
903 auto id = e->get_id();
904
905 e->set_completion_callback([&](TimeoutQueue::Event*, TimeoutQueue::Event::Execution_Status status) {
906 callback_called = true;
908 });
909
912
914
915 delete e;
916}
917
918// =============================================================================
919// Regression Tests for Bug Fixes
920// =============================================================================
921
923{
924 // Test that destructor auto-shutdowns if shutdown() wasn't called
925 // In Debug builds, this should print a warning to stderr
926 // In Release builds, it should silently auto-shutdown
927 testing::internal::CaptureStderr();
928
929 TimeoutQueue* queue = new TimeoutQueue();
930 auto* event = new TestEvent(time_from_now_ms(1000));
931 queue->schedule_event(event);
932
933 // Delete without calling shutdown() - should auto-shutdown
934 delete queue;
935
936 string output = testing::internal::GetCapturedStderr();
937# ifndef NDEBUG
938 // In debug builds, expect warning message
939 EXPECT_NE(output.find("WARNING"), string::npos);
940 EXPECT_NE(output.find("shutdown"), string::npos);
941# else
942 // In release builds, no warning should be printed
943 EXPECT_EQ(output.find("WARNING"), string::npos);
944# endif
945
946 delete event;
947}
948
950{
951 // Test that canceling an event before deletion is safe (no error)
952 testing::internal::CaptureStderr();
953
954 auto* event = new TestEvent(time_from_now_ms(1000));
956
957 EXPECT_EQ(event->get_execution_status(), TimeoutQueue::Event::In_Queue);
958
959 // Cancel first to remove from the queue, then delete
961
962 // Now it's safe to delete
963 delete event;
964
965 string output = testing::internal::GetCapturedStderr();
966 // Should not have warning/error since we canceled first
967 EXPECT_EQ(output, "");
968}
969
970// Death test: verify that deleting an In_Queue event aborts the process with a fatal error.
971// Disabled under ThreadSanitizer: TSAN does not support fork() after threads
972// have been created, and ASSERT_DEATH uses fork().
973#if defined(__SANITIZE_THREAD__)
974 // GCC defines __SANITIZE_THREAD__ when compiled with -fsanitize=thread
975# define TSAN_ENABLED 1
976#endif
977
978#ifdef __clang__
979 // Clang uses __has_feature(thread_sanitizer)
980# if __has_feature(thread_sanitizer)
981# define TSAN_ENABLED 1
982# endif
983#endif
984
985#ifdef TSAN_ENABLED
987#else
989#endif
990{
991 // This test verifies fail-fast behavior to prevent use-after-free.
992 // When Event::~Event() aborts from destructor, std::terminate() is called.
994 TimeoutQueue queue;
995 auto* event = new TestEvent(time_from_now_ms(1000));
996 queue.schedule_event(event);
997 // Deleting without cancel should abort, causing process termination
998 delete event;
999 queue.shutdown();
1000 }, "In_Queue.*use-after-free");
1001}
1002
1004{
1005 // Regression test for getMin() bug: event canceled during wait_until
1006 // should not cause the next event to be lost
1008
1009 auto* e1 = new TestEvent(time_from_now_ms(100));
1010 auto* e2 = new TestEvent(time_from_now_ms(200));
1011
1014
1015 // Wait until just before e1 should fire, then cancel it
1016 this_thread::sleep_for(chrono::milliseconds(90));
1019
1020 // e2 should still execute (not be lost due to getMin() bug)
1021 this_thread::sleep_for(chrono::milliseconds(200));
1022 EXPECT_FALSE(e1->executed);
1023 EXPECT_TRUE(e2->executed);
1024
1027
1028 delete e1;
1029 delete e2;
1030}
1031
1033{
1034 // Similar test: reschedule top event while worker is waiting for it
1035 auto* e1 = new TestEvent(time_from_now_ms(100));
1036 auto* e2 = new TestEvent(time_from_now_ms(300));
1037
1040
1041 // Reschedule e1 to much later
1042 this_thread::sleep_for(chrono::milliseconds(50));
1044
1045 // e2 should execute at its original time
1046 this_thread::sleep_for(chrono::milliseconds(350));
1047 EXPECT_TRUE(e2->executed);
1048 EXPECT_FALSE(e1->executed);
1049
1050 // e1 should execute later
1051 this_thread::sleep_for(chrono::milliseconds(300));
1052 EXPECT_TRUE(e1->executed);
1053
1054 delete e1;
1055 delete e2;
1056}
1057
1059{
1060 // Test that passing nullptr throws exception (not assertion failure in Release)
1061 Time t = time_from_now_ms(100);
1062
1063 EXPECT_THROW(g_queue->schedule_event(nullptr), std::invalid_argument);
1064 EXPECT_THROW(g_queue->schedule_event(t, nullptr), std::invalid_argument);
1065 EXPECT_THROW(g_queue->reschedule_event(t, nullptr), std::invalid_argument);
1066}
1067
1069{
1070 // The implementation enforces tv_nsec bounds with assertions inside Event::set_trigger_time()
1071 // and with ah_domain_error_if inside schedule_event(Event*).
1072 // In Debug builds, the assert triggers first (process death).
1073 // In Release builds, assertions are disabled but ah_domain_error_if throws std::domain_error.
1074# ifndef NDEBUG
1076 {
1077 TimeoutQueue q;
1078 auto* e = new TestEvent(time_from_now_ms(1000));
1080 bad.tv_nsec = 2000000000; // Invalid: >= 1e9
1081 q.schedule_event(bad, e);
1082 },
1083 "");
1084# else
1085 TimeoutQueue q;
1086 auto* e = new TestEvent(time_from_now_ms(1000));
1088 bad.tv_nsec = 2000000000; // Invalid: >= 1e9
1089 EXPECT_THROW(q.schedule_event(bad, e), std::domain_error);
1090 delete e; // Clean up since schedule_event threw before taking ownership
1091 q.shutdown();
1092# endif
1093}
1094
1096{
1097 auto* e = new TestEvent(time_from_now_ms(500));
1099 EXPECT_THROW(g_queue->schedule_event(e), std::invalid_argument);
1101 delete e;
1102}
1103
1105{
1106 TimeoutQueue q;
1107
1108 std::mutex m;
1109 std::condition_variable cv;
1110 int callbacks = 0;
1111
1112 auto* e = new TestEvent(time_from_now_ms(1000));
1113 e->set_completion_callback([&](TimeoutQueue::Event* ev, TimeoutQueue::Event::Execution_Status st) {
1114 std::lock_guard<std::mutex> lk(m);
1115 EXPECT_EQ(ev->get_execution_status(), st);
1117 ++callbacks;
1118 cv.notify_all();
1119 });
1120
1121 q.schedule_event(e);
1122 q.shutdown();
1123
1124 std::unique_lock<std::mutex> lk(m);
1125 ASSERT_TRUE(cv.wait_for(lk, std::chrono::milliseconds(500), [&]{ return callbacks == 1; }));
1126
1128
1129 delete e;
1130}
1131
1133{
1134 // Test that cancel_delete_event invokes completion callback
1135 atomic<bool> callback_called{false};
1136 atomic<int> callback_status{-1};
1137
1139
1140 event->set_completion_callback([&](TimeoutQueue::Event* ev,
1142 EXPECT_EQ(ev, nullptr); // Event already destroyed when status is Deleted
1143 callback_called = true;
1144 callback_status = static_cast<int>(status);
1145 });
1146
1149
1152 EXPECT_EQ(event, nullptr);
1153}
1154
1156{
1157 // Test cancel_delete_event on an event that's currently Executing
1158 mutex mtx;
1160 atomic<bool> event_started{false};
1161 atomic<bool> can_finish{false};
1162
// Test-local Event whose handler stays inside EventFct() until the test
// releases it, so the test can act on the event while it is executing.
1163 class BlockingEvent : public TimeoutQueue::Event
1164 {
1165 public:
// Set to true by EventFct() as soon as the handler starts running.
1166 atomic<bool>& started;
// Polled by EventFct(); the handler returns once this becomes true.
1167 atomic<bool>& finish_flag;
1168
// t: trigger time passed to the base class.
// s, f: caller-owned flags bound by reference to started / finish_flag;
// they must outlive this event.
1169 BlockingEvent(const Time& t, atomic<bool>& s, atomic<bool>& f)
1170 : Event(t), started(s), finish_flag(f) {}
1171
// Signals that execution has begun, then polls finish_flag every 10 ms
// until it is set.
1172 void EventFct() override
1173 {
1174 started = true;
1175 while (!finish_flag)
1176 this_thread::sleep_for(chrono::milliseconds(10));
1177 }
1178 };
1179
1182
1183 atomic<bool> callback_called{false};
1184 atomic<TimeoutQueue::Event*> callback_ev{reinterpret_cast<TimeoutQueue::Event*>(0x1)}; // sentinel
1185 event->set_completion_callback([&](auto* ev, auto status) {
1186 callback_ev = ev;
1187 callback_called = true;
1188 EXPECT_EQ(ev, nullptr); // Event already destroyed when status is Deleted
1190 });
1191
1193
1194 // Wait for event to start executing
1195 while (!event_started)
1196 this_thread::sleep_for(chrono::milliseconds(10));
1197
1198 // Try to cancel_delete while it's executing
1199 // Should mark as To_Delete and worker will delete it
1201 EXPECT_EQ(event, nullptr);
1202
1203 // Let event finish
1204 can_finish = true;
1205 this_thread::sleep_for(chrono::milliseconds(100));
1206
1207 // Callback should have been called by worker thread
1209 EXPECT_EQ(callback_ev.load(), nullptr);
1210}
1211
1213{
1214 // Verify that Deleted status yields nullptr, while Executed/Canceled yield
1215 // a valid pointer.
1219
1220 // 1. Executed path: callback must receive non-null pointer
1221 auto* e1 = new TestEvent(time_from_now_ms(50));
1222 e1->set_completion_callback([&](TimeoutQueue::Event* ev, auto status) {
1224 EXPECT_NE(ev, nullptr);
1225 exec_ptr = ev;
1226 });
1228 this_thread::sleep_for(chrono::milliseconds(200));
1229 EXPECT_NE(exec_ptr.load(), nullptr);
1230 delete e1;
1231
1232 // 2. Canceled path (cancel_event): callback must receive non-null pointer
1233 auto* e2 = new TestEvent(time_from_now_ms(500));
1234 e2->set_completion_callback([&](TimeoutQueue::Event* ev, auto status) {
1236 EXPECT_NE(ev, nullptr);
1237 cancel_ptr = ev;
1238 });
1241 EXPECT_NE(cancel_ptr.load(), nullptr);
1242 delete e2;
1243
1244 // 3. Deleted path (cancel_delete_event): callback must receive nullptr
1246 e3->set_completion_callback([&](TimeoutQueue::Event* ev, auto status) {
1248 EXPECT_EQ(ev, nullptr);
1249 delete_ptr = ev;
1250 });
1253 EXPECT_EQ(delete_ptr.load(), nullptr);
1254 EXPECT_EQ(e3, nullptr);
1255}
1256
1258{
1259 // Test that completion callback is called AFTER status is set
1260 // (regression test for callback/status ordering inconsistency)
1261 atomic<bool> callback_called{false};
1264
1265 auto* event = new TestEvent(time_from_now_ms(50));
1266
1267 event->set_completion_callback([&](TimeoutQueue::Event* ev,
1269 // Status should already be set when callback is invoked
1270 status_in_callback = ev->get_execution_status();
1271 EXPECT_EQ(status, ev->get_execution_status());
1272 callback_called = true; // Must be last: main thread deletes after seeing this
1273 });
1274
1276
1277 // Spin until callback completes to avoid racing with delete
1278 while (!callback_called)
1279 this_thread::sleep_for(chrono::milliseconds(10));
1280
1282
1283 delete event;
1284}
1285
1287{
1288 atomic<bool> callback_called{false};
1289
1290 auto* e1 = new TestEvent(time_from_now_ms(50), [&]() { g_queue->clear_all(); });
1291 auto* e2 = new TestEvent(time_from_now_ms(200));
1292
1293 e1->set_completion_callback([&](auto*, auto) { callback_called = true; });
1294
1297
1298 this_thread::sleep_for(chrono::milliseconds(400));
1299
1301 EXPECT_TRUE(e1->executed);
1302 EXPECT_FALSE(e2->executed); // should have been canceled by clear_all()
1304
1305 delete e1;
1306 delete e2;
1307}
1308
1310{
1311 // Test that multiple events scheduled for the same time all execute
1312 const int num_events = 5;
1314 atomic<int> executed_count{0};
1315
1317
1318 for (int i = 0; i < num_events; ++i)
1319 {
1320 auto* e = new TestEvent(same_time, [&]() { ++executed_count; });
1321 events.push_back(e);
1323 }
1324
1325 this_thread::sleep_for(chrono::milliseconds(300));
1326
1327 EXPECT_EQ(executed_count, num_events);
1328
1329 for (auto* e : events)
1330 {
1331 EXPECT_TRUE(e->executed);
1332 delete e;
1333 }
1334}
1335
1336// =============================================================================
1337// Main
1338// =============================================================================
1339
// Test-runner entry point.
// Initializes GoogleTest from the command-line arguments, registers a
// TimeoutQueueEnvironment as a global test environment, and returns the
// result of RUN_ALL_TESTS().
1340int main(int argc, char **argv)
1341{
1342 ::testing::InitGoogleTest(&argc, argv);
// NOTE(review): GoogleTest is expected to take ownership of the environment
// object, which would explain the missing delete — confirm against the
// GoogleTest documentation.
1343 ::testing::AddGlobalTestEnvironment(new TimeoutQueueEnvironment());
1344 return RUN_ALL_TESTS();
1345}
Time time_plus_msec(const Time &current_time, const int &msec)
Definition ah-time.H:112
Time read_current_time()
Definition ah-time.H:103
struct timespec Time
Definition ah-time.H:50
int main()
size_t size() const noexcept
Count the number of elements of the list.
Definition htlist.H:1319
atomic< int > execution_count
void EventFct() override
Event handler function to be overridden.
ReschedulingEvent(const Time &t, TimeoutQueue *q, int max_resc)
condition_variable & cv
void EventFct() override
Event handler function to be overridden.
SignalingEvent(const Time &t, mutex &m, condition_variable &c, bool &f)
function< void()> callback
TestEvent(const Time &t)
void EventFct() override
Event handler function to be overridden.
TestEvent(const Time &t, function< void()> cb)
atomic< bool > executed
atomic< int > execution_count
Base class for scheduled events.
Event(const Time &t, std::string name="")
Execution_Status
Possible states of an event in its lifecycle.
@ Deleted
Memory freed.
@ Out_Queue
Not currently in any queue.
@ Canceled
Removed from queue before execution.
@ Executed
Completed execution.
@ In_Queue
Scheduled and waiting for trigger time.
@ To_Delete
Marked for cleanup.
void set_completion_callback(CompletionCallback cb)
Set the completion callback (called after EventFct completes or the event is canceled)
virtual void EventFct()=0
Event handler function to be overridden.
static constexpr EventId InvalidId
Invalid/null event ID.
Thread-safe priority queue for scheduling timed events.
size_t executed_count() const
Get count of events that have been executed.
void reschedule_event(const Time &trigger_time, Event *event)
Reschedule an event to a new time.
void reset_stats()
Reset statistics counters to zero.
size_t size() const
Get the number of pending events in the queue.
void schedule_event(const Time &trigger_time, Event *)
Schedule an event at a specific time.
bool is_running() const
Check if the queue is running (not shut down)
Time next_event_time() const
Get the trigger time of the next (soonest) event.
bool cancel_by_id(Event::EventId id)
Cancel a scheduled event by its ID.
void resume()
Resume event execution after pause.
size_t canceled_count() const
Get count of events that have been canceled.
void pause()
Pause event execution (events remain scheduled but won't execute)
void cancel_delete_event(Event *&event)
Cancel and delete an event.
void shutdown()
Shut down the queue and stop the background thread.
bool wait_until_empty(int timeout_ms=0)
Block until all pending events have been executed or canceled.
void schedule_after_ms(int ms_from_now, Event *event)
Schedule an event relative to the current time.
bool is_paused() const
Check if the queue is paused.
Event * find_by_id(const Event::EventId id) const
Find a scheduled event by its ID.
bool cancel_event(Event *event)
Cancel a scheduled event.
size_t clear_all()
Cancel all pending events in the queue.
bool is_empty() const
Check if the queue has no pending events.
atomic< bool > executed
chrono::steady_clock::time_point scheduled_at
chrono::steady_clock::time_point executed_at
TimingEvent(const Time &t)
void EventFct() override
Event handler function to be overridden.
int elapsed_ms() const
#define TEST(name)
bool completed() const noexcept
Return true if all underlying iterators are finished.
Definition ah-zip.H:140
DynList< T > maps(const C &c, Op op)
Classic map operation.
STL namespace.
Priority queue for scheduling timed events.
static TimeoutQueue * g_queue
static Time time_from_now_ms(int ms)
ofstream output
Definition writeHeap.C:213