Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
timeoutQueue.H
Go to the documentation of this file.
1/*
2 Aleph_w
3
4 Data structures & Algorithms
5 version 2.0.0b
6 https://github.com/lrleon/Aleph-w
7
8 This file is part of Aleph-w library
9
10 Copyright (c) 2002-2026 Leandro Rabindranath Leon
11
12 Permission is hereby granted, free of charge, to any person obtaining a copy
13 of this software and associated documentation files (the "Software"), to deal
14 in the Software without restriction, including without limitation the rights
15 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
16 copies of the Software, and to permit persons to whom the Software is
17 furnished to do so, subject to the following conditions:
18
19 The above copyright notice and this permission notice shall be included in all
20 copies or substantial portions of the Software.
21
22 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
23 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
24 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
25 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
26 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
27 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
28 SOFTWARE.
29*/
30
31
103# ifndef TIMEOUTQUEUE_H
104# define TIMEOUTQUEUE_H
105
106# include <unistd.h>
107# include <iostream>
108# include <cassert>
109# include <cstdlib>
110# include <mutex>
111# include <condition_variable>
112# include <thread>
113# include <chrono>
114# include <functional>
115# include <string>
116# include <atomic>
117# include <cstdint>
118# include <ah-errors.H>
119# include <tpl_dynMapTree.H>
120# include <tpl_dynSetTree.H>
121# include <tpl_binHeap.H>
122# include <ah-time.H>
123# include <utility>
124
141{
142public:
152 class Event : private BinHeapVtl<Time>::Node
153 {
154 friend class TimeoutQueue;
155
156 public:
172
179 using CompletionCallback = std::function<void(Event *, Execution_Status)>;
180
183
185 static constexpr EventId InvalidId = 0;
186
187 private:
188 static std::atomic<EventId> nextId;
189
191 std::atomic<Execution_Status> execution_status;
192 std::string event_name;
194
195 Time& time_key() { return this->get_key(); }
196 [[nodiscard]] const Time& time_key() const { return this->get_key(); }
197
199 {
200 assert(_t.tv_nsec >= 0 and _t.tv_nsec < NSEC);
201 time_key() = _t;
202 }
203
205 {
206 execution_status = status;
207 }
208
214
215 protected:
216 Event(const Time & t, std::string name = "")
219 {
221 }
222
223 Event(const long sec, const long nsec, std::string name = "")
225 {
226 assert(nsec >= 0 and nsec < NSEC);
227
228 time_key().tv_sec = sec;
229 time_key().tv_nsec = nsec;
230 }
231
232 public:
233 ~Event() noexcept override
234 {
236 {
237 ah_warning(std::cerr)
238 << "FATAL: Deleting Event " << this << " (ID " << event_id << ", name: '"
239 << event_name << "') that is still In_Queue. "
240 << "This causes use-after-free when the worker thread accesses it. "
241 << "Must call cancel_event() or use cancel_delete_event() before destroying.";
242 abort();
243 }
244 }
245
247
248 [[nodiscard]] Time getAbsoluteTime() const { return time_key(); }
249
251
253 [[nodiscard]] EventId get_id() const { return event_id; }
254
256 [[nodiscard]] const std::string &get_name() const { return event_name; }
257
259 void set_name(const std::string & name) { event_name = name; }
260
263
273 virtual void EventFct() = 0;
274 }; /* end class Event */
275
276private:
278 DynMapTree<Event::EventId, Event *> event_map; // For O(log n) lookup by ID
279 DynSetTree<Event *> event_registry; // Currently managed event pointers
280
281 mutable std::mutex mtx;
282 std::condition_variable cond;
283 std::thread workerThread;
284
286 bool isPaused = false;
287
288 size_t executedCount = 0;
289 size_t canceledCount = 0;
290
291 std::condition_variable emptyCondition;
292
293 void triggerEvent();
294
295 // Set isShutdown and notify; caller must hold mtx
297 {
298 isShutdown = true;
299 cond.notify_all();
300 }
301
302public:
304 TimeoutQueue();
305
308
316 void schedule_event(const Time & trigger_time, Event *);
317
324 void schedule_event(Event * event);
325
333 bool cancel_event(Event * event);
334
342 void reschedule_event(const Time & trigger_time, Event *event);
343
365 void cancel_delete_event(Event *& event);
366
368 void shutdown();
369
371 [[nodiscard]] std::thread::id getThreadId() const { return workerThread.get_id(); }
372
374 [[nodiscard]] size_t size() const;
375
377 [[nodiscard]] bool is_empty() const;
378
380 [[nodiscard]] bool is_running() const;
381
387 void schedule_after_ms(int ms_from_now, Event *event);
388
393 [[nodiscard]] Time next_event_time() const;
394
399 size_t clear_all();
400
402 [[nodiscard]] size_t executed_count() const;
403
405 [[nodiscard]] size_t canceled_count() const;
406
408 void reset_stats();
409
411 void pause();
412
414 void resume();
415
417 [[nodiscard]] bool is_paused() const;
418
424 bool wait_until_empty(int timeout_ms = 0);
425
431 [[nodiscard]] Event * find_by_id(const Event::EventId id) const;
432
439};
440
441
442# endif /* TIMEOUTQUEUE_H */
Exception handling system with formatted messages for Aleph-w.
#define ah_warning(out)
Emits an unconditional warning to a stream.
Definition ah-errors.H:172
Low-level time utilities using POSIX timespec.
#define NSEC
Definition ah-time.H:55
struct timespec Time
Definition ah-time.H:50
WeightedDigraph::Node Node
Generic key-value map implemented on top of a binary search tree.
Dynamic set backed by balanced binary search trees with automatic memory management.
Base class for scheduled events.
Event(const Time &t, std::string name="")
CompletionCallback on_completed
void set_trigger_time(const Time _t)
const std::string & get_name() const
Get the event name (for debugging)
Time getAbsoluteTime() const
void set_execution_status(Execution_Status status)
~Event() noexcept override
Execution_Status
Possible states of an event in its lifecycle.
@ Executing
Currently executing EventFct()
@ Deleted
Memory freed.
@ Out_Queue
Not currently in any queue.
@ Canceled
Removed from queue before execution.
@ Executed
Completed execution.
@ In_Queue
Scheduled and waiting for trigger time.
@ To_Delete
Marked for cleanup.
std::atomic< Execution_Status > execution_status
Execution_Status get_execution_status() const
const Time & time_key() const
static std::atomic< EventId > nextId
std::function< void(Event *, Execution_Status)> CompletionCallback
Optional callback invoked after an event completes, is canceled, or is deleted.
Event(const long sec, const long nsec, std::string name="")
void set_completion_callback(CompletionCallback cb)
Set the completion callback (called after EventFct completes or the event is canceled)
std::string event_name
virtual void EventFct()=0
Event handler function to be overridden.
void invoke_completion_callback()
void set_name(const std::string &name)
Set the event name.
uint64_t EventId
Type for unique event identifiers.
static constexpr EventId InvalidId
Invalid/null event ID.
EventId get_id() const
Get the unique event ID (auto-generated on construction)
Thread-safe priority queue for scheduling timed events.
std::condition_variable cond
size_t executed_count() const
Get count of events that have been executed.
void reschedule_event(const Time &trigger_time, Event *event)
Reschedule an event to a new time.
void shutdown_locked()
void reset_stats()
Reset statistics counters to zero.
DynMapTree< Event::EventId, Event * > event_map
size_t size() const
Get the number of pending events in the queue.
void schedule_event(const Time &trigger_time, Event *)
Schedule an event at a specific time.
~TimeoutQueue()
Destructor - shuts down the queue and joins the thread.
bool is_running() const
Check if the queue is running (not shut down)
std::mutex mtx
Time next_event_time() const
Get the trigger time of the next (soonest) event.
bool cancel_by_id(Event::EventId id)
Cancel a scheduled event by its ID.
std::thread::id getThreadId() const
Get the background thread ID.
void resume()
Resume event execution after pause.
size_t canceled_count() const
Get count of events that have been canceled.
void pause()
Pause event execution (events remain scheduled but won't execute)
std::thread workerThread
size_t executedCount
void cancel_delete_event(Event *&event)
Cancel and delete an event.
void shutdown()
Shut down the queue and stop the background thread.
bool wait_until_empty(int timeout_ms=0)
Block until all pending events have been executed or canceled.
void schedule_after_ms(int ms_from_now, Event *event)
Schedule an event relative to the current time.
void triggerEvent()
bool is_paused() const
Check if the queue is paused.
Event * find_by_id(const Event::EventId id) const
Find a scheduled event by its ID.
BinHeapVtl< Time > prio_queue
bool cancel_event(Event *event)
Cancel a scheduled event.
size_t clear_all()
Cancel all pending events in the queue.
TimeoutQueue()
Default constructor - starts the background thread.
bool is_empty() const
Check if the queue has no pending events.
std::condition_variable emptyCondition
DynSetTree< Event * > event_registry
size_t canceledCount
__gmp_expr< T, __gmp_unary_expr< __gmp_expr< T, U >, __gmp_sec_function > > sec(const __gmp_expr< T, U > &expr)
Definition gmpfrxx.h:4072
DynList< T > maps(const C &c, Op op)
Classic map operation.
STL namespace.
Heap of nodes with a virtual destructor.
Binary heap implementation using tree structure.
Dynamic key-value map based on balanced binary search trees.
Dynamic set implementations based on balanced binary search trees.