Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
q-consumer-threads.H
Go to the documentation of this file.
1
2/*
3 Aleph_w
4
5 Data structures & Algorithms
6 version 2.0.0b
7 https://github.com/lrleon/Aleph-w
8
9 This file is part of Aleph-w library
10
11 Copyright (c) 2002-2026 Leandro Rabindranath Leon
12
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 SOFTWARE.
30*/
31
32
49# ifndef Q_CONSUMER_THREADS_H
50# define Q_CONSUMER_THREADS_H
51
52# include <thread>
53# include <condition_variable>
54# include <mutex>
55# include <atomic>
56# include <tpl_dynListQueue.H>
57# include <ah-errors.H>
58
59template <typename T>
61{
62 enum class Status { Ready, Executing };
63
64 std::atomic<Status> status = { Status::Ready };
65
66public:
67
69
78 bool run() = 0;
79};
80
85template <typename T>
87{
88public:
89
91 {
92 friend class QueueTheadsPool;
93
94 enum class Status { Ready, Executing };
95
97
98 protected:
99
100 std::atomic<size_t> count = 0;
101
102 public:
103
105
114 virtual void run() = 0;
115
116 size_t get_count() const noexcept { return count.load(); }
117
119
121 };
122
124
125private:
126
127 std::mutex lck;
128 std::condition_variable cond;
130 size_t num_threads = 0;
131 size_t num_active = 0;
132 bool suspended = true;
133 bool shutting_down = false;
134 bool shutdown_done = false;
135 std::condition_variable shutdown_cv;
136
138
139public:
140
142 {
143 std::unique_lock<std::mutex> cs(lck);
145 << "~QueueTheadsPool: shutdown not done";
146 }
147
148private:
149
151 ConsumerQueueEvent * event)
152 {
153 threads_pool->run_event(event);
154 }
155
156 inline void run_event(ConsumerQueueEvent * event);
157
158public:
159
161 {
162 std::unique_lock<std::mutex> critical_section(lck);
163
164 auto th = std::thread(handler, this, event);
165 th.detach();
166
167 // check if event is already owned by another thread
168 ah_domain_error_if(event_list.exists([event] (auto e) { return e == event; }))
169 << "Thread creation with an event owned by another";
170
171 event_list.append(event);
172 ++num_threads;
173 }
174
175 bool put(T && item)
176 {
177 try
178 {
179 std::unique_lock<std::mutex> critical_section(lck);
180 if (num_threads == 0)
181 ah_domain_error_if(true) << "there is no any thread created";
182 q.put(std::forward<T>(item));
183 if (not suspended)
184 cond.notify_one(); // for waking up a thread
185 }
186 catch (...)
187 {
188 return false;
189 }
190
191 return true;
192 }
193
194 void resume()
195 {
196 std::unique_lock<std::mutex> critical_section(lck);
197 if (shutting_down)
198 return;
199
200 suspended = false;
201 cond.notify_all();
202 }
203
204 void suspend()
205 {
206 std::unique_lock<std::mutex> critical_section(lck);
207 if (shutting_down)
208 return;
209
210 suspended = true;
211 cond.notify_all();
212 }
213
215 {
216 std::unique_lock<std::mutex> critical_section(lck);
217 return suspended;
218 }
219
221 {
222 std::unique_lock<std::mutex> critical_section(lck);
223 return shutdown_done;
224 }
225
226 void shutdown()
227 {
228 std::unique_lock<std::mutex> critical_section(lck);
229 if (shutting_down)
230 return;
231
232 shutting_down = true;
233 cond.notify_all();
234
235 shutdown_cv.wait(critical_section, [this] { return num_threads == 0; } );
236 shutdown_done = true;
237 }
238};
239
240
241template <typename T>
243{
244 std::unique_lock<std::mutex> critical_section(lck);
245
246 while (true)
247 {
249 [this] { return not ((q.is_empty() and not shutting_down) or
250 suspended); });
251
252 if (shutting_down)
253 break;
254
255 if (suspended or q.is_empty())
256 continue;
257
258 event->item = q.get();
259 ++num_active;
260 critical_section.unlock();
261
262 try
263 {
265 event->run();
266 event->status = ConsumerQueueEvent::Status::Ready;
267
268 critical_section.lock();
269 --num_active;
270 }
271 catch (...)
272 {
273 event->status = ConsumerQueueEvent::Status::Ready;
274 critical_section.lock();
275 --num_active;
276 }
277 }
278
279 --num_threads;
280 shutdown_cv.notify_one();
281}
282
283
284# endif // Q_CONSUMER_THREADS_H
Exception handling system with formatted messages for Aleph-w.
#define ah_domain_error_if(C)
Throws std::domain_error if condition holds.
Definition ah-errors.H:522
Dynamic queue of elements of generic type T based on single linked list.
Dynamic singly linked list with functional programming support.
Definition htlist.H:1155
bool run()=0
This is an item extracted from the queue.
std::atomic< Status > status
virtual void run()=0
This is an item extracted from the queue.
Pool of consumer threads that take items from a queue.
std::condition_variable cond
DynList< ConsumerQueueEvent * > event_list
static void handler(QueueTheadsPool< T > *threads_pool, ConsumerQueueEvent *event)
std::condition_variable shutdown_cv
void run_event(ConsumerQueueEvent *event)
void create_thread(ConsumerQueueEvent *event)
DynListQueue< T > q
and
Check uniqueness with explicit hash + equality functors.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
std::decay_t< typename HeadC::Item_Type > T
Definition ah-zip.H:105
Dynamic queue implementation based on linked lists.