Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
timeoutQueue.C
Go to the documentation of this file.
1/*
2 Aleph_w
3
4 Data structures & Algorithms
5 version 2.0.0b
6 https://github.com/lrleon/Aleph-w
7
8 This file is part of Aleph-w library
9
10 Copyright (c) 2002-2026 Leandro Rabindranath Leon
11
12 Permission is hereby granted, free of charge, to any person obtaining a copy
13 of this software and associated documentation files (the "Software"), to deal
14 in the Software without restriction, including without limitation the rights
15 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 copies of the Software, and to permit persons to whom the Software is
17 furnished to do so, subject to the following conditions:
18
19 The above copyright notice and this permission notice shall be included in all
20 copies or substantial portions of the Software.
21
22 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 SOFTWARE.
29*/
30
31# include <cstdio>
32# include <typeinfo>
33# include <timeoutQueue.H>
34# include <ah-errors.H>
35
36using namespace std::chrono;
37
38// Initialize static event ID counter
39std::atomic<TimeoutQueue::Event::EventId> TimeoutQueue::Event::nextId{0};
40
41// Convert POSIX timespec to std::chrono::time_point (system_clock)
42static auto timespec_to_timepoint(const Time & t)
43{
44 return system_clock::time_point(duration_cast<system_clock::duration>(
45 seconds(t.tv_sec) + nanoseconds(t.tv_nsec)));
46}
47
49{
50 workerThread = std::thread(&TimeoutQueue::triggerEvent, this);
51}
52
54{ {
55 std::lock_guard<std::mutex> lock(mtx);
56 if (not isShutdown)
57 {
58#ifndef NDEBUG
59 ah_warning(std::cerr)
60 << "TimeoutQueue destructor called without prior shutdown(). "
61 << "Invoking shutdown() automatically." << std::endl;
62#endif
64 }
65 }
66
67 if (workerThread.joinable())
68 workerThread.join();
69}
70
73{
75 << "nullptr event";
76
77 event->set_trigger_time(trigger_time);
79}
80
82{
84 << "nullptr event";
85 ah_domain_error_if(event->time_key().tv_nsec < 0 or event->time_key().tv_nsec >= NSEC)
86 << "event nsec out of range: " << event->time_key().tv_nsec;
87
88 std::lock_guard<std::mutex> lock(mtx);
89
90 event_registry.insert(event);
91
92 ah_invalid_argument_if(event->get_execution_status() == Event::In_Queue)
93 << "Event has already been inserted in timeoutQueue";
94
95 if (isShutdown)
96 return;
97
98 event->set_execution_status(Event::In_Queue);
99
101 event_map[event->get_id()] = event;
102
103 cond.notify_one();
104}
105
107{
108 ah_invalid_argument_if(event == nullptr)
109 << "nullptr event";
110
112 bool became_empty = false; {
113 std::lock_guard<std::mutex> lock(mtx);
114
116 << "Event " << event << " not found in timeoutQueue";
117
118 if (event->get_execution_status() != Event::In_Queue)
119 return false;
120
121 callback = event->on_completed;
122
124 if (event_map.contains(event->get_id()))
125 event_map.remove(event->get_id());
126 event_registry.remove(event);
127
128 event->set_execution_status(Event::Canceled);
130
132 }
133
134 if (callback)
135 callback(event, Event::Canceled);
136
137 if (became_empty)
138 emptyCondition.notify_all();
139
140 cond.notify_one();
141
142 return true;
143}
144
146{
147 if (event == nullptr)
148 return;
149
150 Event *local = event;
153 bool became_empty = false; {
154 bool was_in_queue = false;
155 std::lock_guard<std::mutex> lock(mtx);
156
158 << "Event " << local << " not found in timeoutQueue";
159
160 if (local->get_execution_status() == Event::In_Queue)
161 {
163 if (event_map.contains(local->get_id()))
164 event_map.remove(local->get_id());
165 event_registry.remove(local);
167 was_in_queue = true;
168 }
169
170 if (local->get_execution_status() == Event::Executing)
171 {
172 // Worker thread will invoke callback and delete after EventFct() returns
173 local->set_execution_status(Event::To_Delete);
175
176 // Also remove from event_map to prevent find_by_id() from returning a pointer
177 // that is about to be deleted by the worker thread.
178 if (event_map.contains(local->get_id()))
179 event_map.remove(local->get_id());
180
181 event = nullptr;
182 cond.notify_one();
183 return;
184 }
185
186 callback = local->on_completed;
187
188 if (was_in_queue)
190
191 local->set_execution_status(Event::Deleted);
193 }
194
195 delete local;
196 event = nullptr;
197
198 if (callback)
199 callback(nullptr, final_status);
200
201 if (became_empty)
202 emptyCondition.notify_all();
203
204 cond.notify_one();
205}
206
209{
210 ah_invalid_argument_if(event == nullptr)
211 << "nullptr event";
212
213 std::lock_guard<std::mutex> lock(mtx);
214
216 << "Event " << event << " not found in timeoutQueue";
217
218 if (isShutdown)
219 return;
220
221 if (event->get_execution_status() == Event::In_Queue)
222 prio_queue.remove(event); // eventMap entry stays, ID doesn't change
223
224 event->set_trigger_time(trigger_time);
225
226 event->set_execution_status(Event::In_Queue);
227
229 event_map[event->get_id()] = event; // Re-add in case it wasn't there
230
231 cond.notify_one();
232}
233
235{
236 std::unique_lock<std::mutex> lock(mtx);
237
238 while (true)
239 {
240 // Sleep if there are no events or if paused
241 while ((prio_queue.size() == 0 or isPaused) and not isShutdown)
242 cond.wait(lock);
243
244 if (isShutdown)
245 break;
246
247 // Read the soonest event
248 auto *event_to_schedule = static_cast<Event *>(prio_queue.top());
249
250 // Compute time when the event must be triggered (wall clock)
251 const Time original_trigger_time = event_to_schedule->time_key();
253
254 // Anchor both clocks under the same lock to avoid skew
255 const auto sys_now = system_clock::now();
256 const auto steady_now = steady_clock::now();
257
258 // Compute delta in system_clock domain, clamp negative to zero
259 auto delta = trigger_sys - sys_now;
260 if (delta < system_clock::duration::zero())
261 delta = system_clock::duration::zero();
262
263 // Convert delta to steady_clock and build a steady deadline
264 const auto deadline_steady =
266
267 // Wait until deadline or notification (immune to wall-clock jumps)
268 const auto wait_result = cond.wait_until(lock, deadline_steady);
269
270 if (isShutdown)
271 break;
272
273 // If paused, go back to waiting
274 if (isPaused)
275 continue;
276
277 if (wait_result == std::cv_status::timeout)
278 {
279 if (prio_queue.size() == 0)
280 continue;
281
282 // Peek at the soonest event without extracting it
283 // If the top changed (original was canceled/rescheduled) and
284 // the new top is in the future, go back to wait for it
285 if (const auto *next = static_cast<Event *>(prio_queue.top()); next->time_key() > original_trigger_time)
286 continue;
287
288 // Now extract the event we are going to execute
289 auto *event_to_execute = static_cast<Event *>(prio_queue.getMin());
290
291 event_to_execute->set_execution_status(Event::Executing);
292
293 lock.unlock();
294
295 try { event_to_execute->EventFct(); }
296 catch (...)
297 {
298 ah_warning(std::cerr) << "Uncaught exception in TimeoutQueue event execution (ID "
299 << event_to_execute->get_id() << ", name: '"
300 << event_to_execute->get_name() << "')" << std::endl;
301 }
302
303 lock.lock();
304
306
307 const auto current_status = event_to_execute->get_execution_status();
308 const Event::CompletionCallback callback = event_to_execute->on_completed;
310
312 {
314 event_to_execute->set_execution_status(Event::Deleted);
315 }
317 {
318 // Event was rescheduled during EventFct() - still in queue, don't touch
320 }
321 else
322 {
325 event_to_execute->set_execution_status(Event::Executed);
326 }
327
328 const bool became_empty = prio_queue.size() == 0;
329
330 lock.unlock();
331
333 {
334 delete event_to_execute;
335 if (callback)
336 callback(nullptr, final_status);
337 }
338 else if (callback)
340
341 if (became_empty)
342 emptyCondition.notify_all();
343
344 lock.lock();
345 }
346 }
347
348 // Shutdown requested - cancel all pending events
349 while (prio_queue.size() > 0)
350 {
351 auto *event = static_cast<Event *>(prio_queue.getMin());
352 const Event::CompletionCallback callback = event->on_completed;
353 event_map.remove(event->get_id());
354 event_registry.remove(event);
355 // Set final status BEFORE callback to avoid use-after-free:
356 // User code may delete the event in the callback
357 event->set_execution_status(Event::Canceled);
359 lock.unlock();
360 if (callback)
361 callback(event, Event::Canceled);
362 lock.lock();
363 }
364
365 emptyCondition.notify_all();
366}
367
369{
370 std::lock_guard<std::mutex> lock(mtx);
372}
373
374size_t TimeoutQueue::size() const
375{
376 std::lock_guard<std::mutex> lock(mtx);
377 return prio_queue.size();
378}
379
381{
382 std::lock_guard<std::mutex> lock(mtx);
383 return prio_queue.size() == 0;
384}
385
387{
388 std::lock_guard<std::mutex> lock(mtx);
389 return not isShutdown;
390}
391
397
399{
400 std::lock_guard<std::mutex> lock(mtx);
401 if (prio_queue.size() == 0)
402 return {0, 0};
403 return static_cast<Event *>(prio_queue.top())->time_key();
404}
405
407{
408 std::unique_lock<std::mutex> lock(mtx);
409
410 size_t count = 0;
411 while (prio_queue.size() > 0)
412 {
413 auto *event = static_cast<Event *>(prio_queue.getMin());
414 const Event::CompletionCallback callback = event->on_completed;
415 event_map.remove(event->get_id());
416 event_registry.remove(event);
417 event->set_execution_status(Event::Canceled);
418 ++count;
420 lock.unlock();
421 if (callback)
422 callback(event, Event::Canceled);
423 lock.lock();
424 }
425
426 cond.notify_one();
427 emptyCondition.notify_all();
428
429 return count;
430}
431
433{
434 std::lock_guard<std::mutex> lock(mtx);
435 return executedCount;
436}
437
439{
440 std::lock_guard<std::mutex> lock(mtx);
441 return canceledCount;
442}
443
445{
446 std::lock_guard<std::mutex> lock(mtx);
447 executedCount = 0;
448 canceledCount = 0;
449}
450
452{
453 std::lock_guard<std::mutex> lock(mtx);
454 isPaused = true;
455}
456
458{
459 std::lock_guard<std::mutex> lock(mtx);
460 isPaused = false;
461 cond.notify_one();
462}
463
465{
466 std::lock_guard<std::mutex> lock(mtx);
467 return isPaused;
468}
469
471{
472 std::unique_lock<std::mutex> lock(mtx);
473
474 if (prio_queue.size() == 0)
475 return true;
476
477 if (timeout_ms <= 0)
478 {
479 emptyCondition.wait(lock, [this]() { return prio_queue.size() == 0 or isShutdown; });
480 return prio_queue.size() == 0;
481 }
482
483 return emptyCondition.wait_for(lock, std::chrono::milliseconds(timeout_ms),
484 [this]() { return prio_queue.size() == 0 or isShutdown; });
485}
486
488{
489 if (id == Event::InvalidId)
490 return nullptr;
491
492 std::lock_guard<std::mutex> lock(mtx);
493
494 if (const auto ptr_pair = event_map.search(id);
495 ptr_pair != nullptr and ptr_pair->second->get_execution_status() == Event::In_Queue)
496 return ptr_pair->second;
497
498 return nullptr;
499}
500
502{
503 if (id == Event::InvalidId)
504 return false;
505
507 Event *event = nullptr;
508 bool became_empty = false; {
509 std::lock_guard<std::mutex> lock(mtx);
510
511 const auto event_pair = event_map.search(id);
512 if (event_pair == nullptr)
513 return false;
514
515 event = event_pair->second;
516 if (event->get_execution_status() != Event::In_Queue)
517 return false;
518
519 callback = event->on_completed;
521 event_map.remove(id);
522 event_registry.remove(event);
523 event->set_execution_status(Event::Canceled);
525
527 }
528
529 if (callback)
530 callback(event, Event::Canceled);
531 cond.notify_one();
532
533 if (became_empty)
534 emptyCondition.notify_all();
535
536 return true;
537}
Exception handling system with formatted messages for Aleph-w.
#define ah_warning(out)
Emits an unconditional warning to a stream.
Definition ah-errors.H:172
#define ah_invalid_argument_unless(C)
Throws std::invalid_argument if condition does NOT hold.
Definition ah-errors.H:655
#define ah_domain_error_if(C)
Throws std::domain_error if condition holds.
Definition ah-errors.H:522
#define ah_invalid_argument_if(C)
Throws std::invalid_argument if condition holds.
Definition ah-errors.H:639
Time time_plus_msec(const Time &current_time, const int &msec)
Definition ah-time.H:112
Time read_current_time()
Definition ah-time.H:103
#define NSEC
Definition ah-time.H:55
struct timespec Time
Definition ah-time.H:50
T remove()
Remove the first item of the list.
Definition htlist.H:1611
Node * getMin()
Elimina del heap el nodo de menor prioridad.
Node * remove(Node *node)
Elimina del heap el nodo node.
Node * insert(Node *p) noexcept
Inserta un nodo en un heap.
Node * top()
Retorna el nodo con menor prioridad según el criterio de comparación especificado en la declaración.
const size_t & size() const noexcept
Base class for scheduled events.
CompletionCallback on_completed
Execution_Status
Possible states of an event in its lifecycle.
@ Executing
Currently executing EventFct()
@ Deleted
Memory freed.
@ Canceled
Removed from queue before execution.
@ Executed
Completed execution.
@ In_Queue
Scheduled and waiting for trigger time.
@ To_Delete
Marked for cleanup.
static std::atomic< EventId > nextId
std::function< void(Event *, Execution_Status)> CompletionCallback
Optional callback invoked after an event completes, is canceled, or is deleted.
uint64_t EventId
Type for unique event identifiers.
static constexpr EventId InvalidId
Invalid/null event ID.
std::condition_variable cond
size_t executed_count() const
Get count of events that have been executed.
void reschedule_event(const Time &trigger_time, Event *event)
Reschedule an event to a new time.
void shutdown_locked()
void reset_stats()
Reset statistics counters to zero.
DynMapTree< Event::EventId, Event * > event_map
size_t size() const
Get the number of pending events in the queue.
void schedule_event(const Time &trigger_time, Event *)
Schedule an event at a specific time.
~TimeoutQueue()
Destructor - shuts down the queue and joins the thread.
bool is_running() const
Check if the queue is running (not shut down)
std::mutex mtx
Time next_event_time() const
Get the trigger time of the next (soonest) event.
bool cancel_by_id(Event::EventId id)
Cancel a scheduled event by its ID.
void resume()
Resume event execution after pause.
size_t canceled_count() const
Get count of events that have been canceled.
void pause()
Pause event execution (events remain scheduled but won't execute)
std::thread workerThread
size_t executedCount
void cancel_delete_event(Event *&event)
Cancel and delete an event.
void shutdown()
Shut down the queue and stop the background thread.
bool wait_until_empty(int timeout_ms=0)
Block until all pending events have been executed or canceled.
void schedule_after_ms(int ms_from_now, Event *event)
Schedule an event relative to the current time.
void triggerEvent()
bool is_paused() const
Check if the queue is paused.
Event * find_by_id(const Event::EventId id) const
Find a scheduled event by its ID.
BinHeapVtl< Time > prio_queue
bool cancel_event(Event *event)
Cancel a scheduled event.
size_t clear_all()
Cancel all pending events in the queue.
TimeoutQueue()
Default constructor - starts the background thread.
bool is_empty() const
Check if the queue has no pending events.
std::condition_variable emptyCondition
DynSetTree< Event * > event_registry
size_t canceledCount
void next()
Advance all underlying iterators (bounds-checked).
Definition ah-zip.H:175
DynList< T > maps(const C &c, Op op)
Classic map operation.
Itor::difference_type count(const Itor &beg, const Itor &end, const T &value)
Count elements equal to a value.
Definition ahAlgo.H:127
static auto timespec_to_timepoint(const Time &t)
Priority queue for scheduling timed events.