Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
q-consumer-threads.H
Go to the documentation of this file.
1
2/*
3 Aleph_w
4
5 Data structures & Algorithms
6 version 2.0.0b
7 https://github.com/lrleon/Aleph-w
8
9 This file is part of Aleph-w library
10
11 Copyright (c) 2002-2026 Leandro Rabindranath Leon
12
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 SOFTWARE.
30*/
31
32
49# ifndef Q_CONSUMER_THREADS_H
50# define Q_CONSUMER_THREADS_H
51
52# include <thread>
53# include <condition_variable>
54# include <mutex>
55# include <tpl_dynListQueue.H>
56# include <ah-errors.H>
57
58template <typename T>
60{
61 enum class Status { Ready, Executing };
62
63 std::atomic<Status> status = { Status::Ready };
64
65public:
66
68
77 bool run() = 0;
78};
79
84template <typename T>
86{
87public:
88
90 {
91 friend class QueueTheadsPool;
92
93 enum class Status { Ready, Executing };
94
96
97 public:
98
100
109 virtual void run() = 0;
110
112 };
113
115
116private:
117
118 std::mutex lck;
119 std::condition_variable cond;
121 size_t num_threads = 0;
122 size_t num_active = 0;
123 bool suspended = true;
124 bool shutting_down = false;
125 bool shutdown_done = false;
126 std::condition_variable shutdown_cv;
127
129
130public:
131
133 {
134 std::unique_lock<std::mutex> cs(lck);
136 << "~QueueTheadsPool: shutdown not done";
137 }
138
139private:
140
143 {
144 threads_pool->run_event(event);
145 }
146
147 inline void run_event(ConsumerQueueEvent * event);
148
149public:
150
152 {
153 std::unique_lock<std::mutex> critical_section(lck);
154
155 auto th = std::thread(handler, this, event);
156 th.detach();
157
158 // check if event is already owned by another thread
159 ah_domain_error_if(event_list.exists([event] (auto e) { return e == event; }))
160 << "Thread creation with an event owned by another";
161
163 ++num_threads;
164 }
165
166 bool put(T && item)
167 {
168 try
169 {
170 std::unique_lock<std::mutex> critical_section(lck);
171 if (num_threads == 0)
172 ah_domain_error_if(true) << "there is no any thread created";
173 q.put(std::forward<T>(item));
174 if (not suspended)
175 cond.notify_one(); // for waking up a thread
176 }
177 catch (...)
178 {
179 return false;
180 }
181
182 return true;
183 }
184
185 void resume()
186 {
187 std::unique_lock<std::mutex> critical_section(lck);
188 if (shutting_down)
189 return;
190
191 suspended = false;
192 cond.notify_all();
193 }
194
195 void suspend()
196 {
197 std::unique_lock<std::mutex> critical_section(lck);
198 if (shutting_down)
199 return;
200
201 suspended = true;
202 cond.notify_all();
203 }
204
206 {
207 std::unique_lock<std::mutex> critical_section(lck);
208 return suspended;
209 }
210
212 {
213 std::unique_lock<std::mutex> critical_section(lck);
214 return shutdown_done;
215 }
216
217 void shutdown()
218 {
219 std::unique_lock<std::mutex> critical_section(lck);
220 if (shutting_down)
221 return;
222
223 shutting_down = true;
224 cond.notify_all();
225
226 shutdown_cv.wait(critical_section, [this] { return num_threads == 0; } );
227 shutdown_done = true;
228 }
229};
230
231
232template <typename T>
234{
235 std::unique_lock<std::mutex> critical_section(lck);
236
237 while (true)
238 {
240 [this] { return not ((q.is_empty() and not shutting_down) or
241 suspended); });
242
243 if (shutting_down)
244 break;
245
246 if (suspended or q.is_empty())
247 continue;
248
249 event->item = q.get();
250 ++num_active;
251 critical_section.unlock();
252
253 try
254 {
256 event->run();
257 event->status = ConsumerQueueEvent::Status::Ready;
258
259 critical_section.lock();
260 --num_active;
261 }
262 catch (...)
263 {
264 event->status = ConsumerQueueEvent::Status::Ready;
265 critical_section.lock();
266 --num_active;
267 }
268 }
269
270 --num_threads;
271 shutdown_cv.notify_one();
272}
273
274
275# endif // Q_CONSUMER_THREADS_H
Exception handling system with formatted messages for Aleph-w.
#define ah_domain_error_if(C)
Throws std::domain_error if condition holds.
Definition ah-errors.H:522
Dynamic queue of elements of generic type T based on single linked list.
Dynamic singly linked list with functional programming support.
Definition htlist.H:1423
T & append(const T &item)
Append a new item by copy.
Definition htlist.H:1562
constexpr bool is_empty() const noexcept
Return true if list is empty.
Definition htlist.H:523
bool run()=0
this is a item extracted from the queue
std::atomic< Status > status
virtual void run()=0
this is a item extracted from the queue
Pool of consumer threads form a queue.
std::condition_variable cond
DynList< ConsumerQueueEvent * > event_list
static void handler(QueueTheadsPool< T > *threads_pool, ConsumerQueueEvent *event)
std::condition_variable shutdown_cv
void run_event(ConsumerQueueEvent *event)
void create_thread(ConsumerQueueEvent *event)
DynListQueue< T > q
std::decay_t< typename HeadC::Item_Type > T
Definition ah-zip.H:107
DynList< T > maps(const C &c, Op op)
Classic map operation.
Dynamic queue implementation based on linked lists.