Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
worker_pool.H
Go to the documentation of this file.
1
2/*
3 Aleph_w
4
5 Data structures & Algorithms
6 version 2.0.0b
7 https://github.com/lrleon/Aleph-w
8
9 This file is part of Aleph-w library
10
11 Copyright (c) 2002-2026 Leandro Rabindranath Leon
12
13 Permission is hereby granted, free of charge, to any person obtaining a copy
14 of this software and associated documentation files (the "Software"), to deal
15 in the Software without restriction, including without limitation the rights
16 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
17 copies of the Software, and to permit persons to whom the Software is
18 furnished to do so, subject to the following conditions:
19
20 The above copyright notice and this permission notice shall be included in all
21 copies or substantial portions of the Software.
22
23 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
28 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
29 SOFTWARE.
30*/
31
32
73# ifndef WORKER_POOL_H
74# define WORKER_POOL_H
75
76# include <thread>
77# include <mutex>
78# include <condition_variable>
79# include <memory>
80
81# include <ah-errors.H>
82# include <tpl_arrayQueue.H>
83# include <iostream>
84
85using namespace Aleph;
86
106template <class WorkerFct>
108{
109private:
110
112 size_t num_threads = 0;
113 std::mutex m;
114 std::condition_variable cond;
115
116 std::atomic<size_t> num_workers = 0;
117
120 bool shut_down = false;
121 bool job_done = false;
122
123 std::condition_variable job_done_cond;
124 std::mutex job_done_mutex;
125
127 {
128 std::unique_lock<std::mutex> lock(m);
129 while (true)
130 {
131 cond.wait(lock, [this] { return num_workers > 0 or shut_down; });
132 if (shut_down)
133 return;
134
135 while (num_workers > 0)
136 {
137 cond.wait(lock, [this]
138 {
139 return (not q.is_empty()) or shut_down;
140 });
141
142 if (shut_down)
143 return;
144
145 if (not q.is_empty())
146 {
147 void * pars_ptr = q.get();
148
149 if (not job_done)
150 {
151 lock.unlock();
152 try
153 {
154 job_done = (*worker_fct)(pars_ptr);
155 }
156 catch (std::exception & e)
157 {
158 std::cout << "Warning: workers exception "
159 << e.what() << std::endl;
160 }
161 lock.lock();
162 free(pars_ptr);
163 }
164 --num_workers;
165 }
166 }
167
168 assert(q.is_empty());
169 job_done_cond.notify_one();
170 continue;
171 }
172 }
173
174public:
175
184 const size_t qsize, const size_t n = 16)
186 {
187 for (size_t i = 0; i < n; ++i)
188 threads.append(std::thread(&WorkersSet::worker_handler, this));
189 }
190
196 void shutdown()
197 {
198 std::unique_lock<std::mutex> lock(m);
199 shut_down = true;
200 cond.notify_all();
201 }
202
205 {
206 shutdown();
207 threads.mutable_for_each([] (std::thread & th) { th.join(); });
208 }
209
215 void prepare_num_workers(size_t n)
216 {
217 std::unique_lock<std::mutex> lock(m);
218 num_workers = n;
219 }
220
232 {
233 std::unique_lock<std::mutex> lock(m);
234 q.put(pars_ptr);
235 cond.notify_one();
236 }
237
243 bool is_jobs_done() const
244 {
245 std::unique_lock<std::mutex> lock(m);
246 return job_done;
247 }
248
257 {
258 std::unique_lock<std::mutex> lock(job_done_mutex);
259 job_done_cond.wait(lock);
260 }
261};
262
263
264# endif
Exception handling system with formatted messages for Aleph-w.
Dynamic singly linked list with functional programming support.
Definition htlist.H:1423
T & append(const T &item)
Append a new item by copy.
Definition htlist.H:1562
Very simple queue implemented with a contiguous array.
T & put(const T &item) noexcept
Put an item into the queue by copy.
constexpr bool is_empty() const noexcept
Return true if the queue is empty.
T get() noexcept
Remove the oldest item of the queue.
void mutable_for_each(Operation &operation)
Definition ah-dry.H:787
Thread pool for parallel worker function execution.
bool is_jobs_done() const
Check if a worker has signaled job completion.
std::condition_variable cond
void wait_until_all_workers_finished_or_job_is_done()
Block until all work items processed or job done signaled.
~WorkersSet()
Destructor shuts down and joins all worker threads.
void shutdown()
Signal all workers to shut down.
void prepare_num_workers(size_t n)
Set the expected number of work items.
std::atomic< size_t > num_workers
std::mutex m
size_t num_threads
std::mutex job_done_mutex
WorkerFct worker_fct
std::condition_variable job_done_cond
FixedQueue< void * > q
void worker_handler()
DynList< std::thread > threads
void schedule_call(void *pars_ptr)
Schedule a work item for execution.
WorkersSet(WorkerFct worker_fct, const size_t qsize, const size_t n=16)
Construct worker pool with specified thread count.
Main namespace for Aleph-w library functions.
Definition ah-arena.H:89
DynList< T > maps(const C &c, Op op)
Classic map operation.
Circular queue implementations backed by arrays.