Aleph-w 3.0
A C++ Library for Data Structures and Algorithms
Loading...
Searching...
No Matches
test-con-queue.C
Go to the documentation of this file.
1
2/* Aleph-w
3
4 / \ | | ___ _ __ | |__ __ __
5 / _ \ | |/ _ \ '_ \| '_ \ ____\ \ /\ / / Data structures & Algorithms
6 / ___ \| | __/ |_) | | | |_____\ V V / version 1.9c
7 /_/ \_\_|\___| .__/|_| |_| \_/\_/ https://github.com/lrleon/Aleph-w
8 |_|
9
10 This file is part of Aleph-w library
11
12 Copyright (c) 2002-2018 Leandro Rabindranath Leon
13
14 This program is free software: you can redistribute it and/or modify
15 it under the terms of the GNU General Public License as published by
16 the Free Software Foundation, either version 3 of the License, or
17 (at your option) any later version.
18
19 This program is distributed in the hope that it will be useful, but
20 WITHOUT ANY WARRANTY; without even the implied warranty of
21 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
22 General Public License for more details.
23
24 You should have received a copy of the GNU General Public License
25 along with this program. If not, see <https://www.gnu.org/licenses/>.
26*/
27
28# include <iostream>
29# include <cerrno>
30# include <chrono>
31# include <thread>
32# include <q-consumer-threads.H>
33
34using namespace std;
35
36static constexpr size_t WORK_DELAY_MS = 10;
37
// Event type derived from QueueTheadsPool<int>::Event. Its run() override,
// as visible here, only sleeps for WORK_DELAY_MS milliseconds; main()'s help
// text describes that sleep as simulated work. The trace print is disabled.
// NOTE(review): the embedded listing numbers jump from 43 to 45 inside run(),
// so a statement may be missing from this view -- confirm against the real file.
38struct Event1 : QueueTheadsPool<int>::Event
39{
40 void run() override
41 {
42 // cout << "Event 1 " << item << endl;
 // Block the calling thread for WORK_DELAY_MS milliseconds.
43 this_thread::sleep_for(chrono::milliseconds(WORK_DELAY_MS));
45 }
46};
47
// Second event type derived from QueueTheadsPool<int>::Event. The visible
// run() body matches Event1's: it sleeps for WORK_DELAY_MS milliseconds and
// its trace print (labelled "Event 2") is commented out.
// NOTE(review): the embedded listing numbers jump from 53 to 55 inside run(),
// so a statement may be missing from this view -- confirm against the real file.
48struct Event2 : QueueTheadsPool<int>::Event
49{
50 void run() override
51 {
52 // cout << "Event 2 " << item << endl;
 // Block the calling thread for WORK_DELAY_MS milliseconds.
53 this_thread::sleep_for(chrono::milliseconds(WORK_DELAY_MS));
55 }
56};
57
// Command-line driver for the queue/thread-pool exercise.
//
// Arguments (argv[1..3]): num_threads, num_items, secs, each parsed as a
// base-10 unsigned integer with strtoul.
//
// Returns 0 after printing the help text (no arguments given) or after the
// full run completes; returns 1 on a usage error or an invalid argument,
// after writing a message to cerr.
//
// Sequence performed on valid input: create num_threads workers (alternating
// Event1 / Event2), put num_items values into the pool, resume, sleep `secs`
// seconds, print every event's counter, suspend, sleep 5 seconds, resume,
// sleep `secs` seconds again, print the counters again, and shut down.
//
// NOTE(review): strtoul is declared in <cstdlib>, which is not among the
// includes visible in this listing -- presumably it arrives transitively;
// confirm.
58int main(int argc, char *argv[])
59{
 // No arguments at all: print the long help text and exit successfully.
60 if (argc == 1)
61 {
62 cout << "test-con-queue -- demonstrates QueueTheadsPool concurrent processing\n"
63 << "\n"
64 << "Creates a pool of worker threads backed by a shared queue. Each thread\n"
65 << "runs one of two event types (Event1 / Event2), both of which sleep for\n"
66 << WORK_DELAY_MS << " ms to simulate work. The test enqueues items, lets workers\n"
67 << "run for a while, then suspends, sleeps, resumes, and finally shuts down.\n"
68 << "\n"
69 << "Usage:\n"
70 << " " << argv[0] << " <num_threads> <num_items> <secs>\n"
71 << "\n"
72 << "Arguments:\n"
73 << " num_threads Number of worker threads to create (positive integer)\n"
74 << " num_items Number of integer items to enqueue\n"
75 << " secs Seconds to let workers run in each active phase\n"
76 << "\n"
77 << "Example:\n"
78 << " " << argv[0] << " 4 100 3\n"
79 << " Creates 4 threads, enqueues 100 items, runs for 3 s, suspends,\n"
80 << " sleeps 5 s, resumes for another 3 s, then shuts down.\n";
81 return 0;
82 }
83
 // Some arguments, but not all three: short usage line on cerr, exit code 1.
84 if (argc < 4)
85 {
86 cerr << "Usage: " << argv[0] << " <num_threads> <num_items> <secs>" << endl;
87 return 1;
88 }
89
 // errno is cleared only once. That is enough for all three conversions
 // below, because any conversion that leaves errno non-zero returns at once.
90 char * endptr = nullptr;
91 errno = 0;
 // A leading '-' is rejected up front, before strtoul sees the argument.
 // NOTE(review): only the first character is inspected, while strtoul skips
 // leading whitespace and accepts a sign, so an argument such as " -1" would
 // presumably get past this guard -- confirm whether that matters.
92 if (argv[1][0] == '-')
93 {
94 cerr << "Invalid num_threads: " << argv[1] << endl;
95 return 1;
96 }
97 const size_t num_threads = strtoul(argv[1], &endptr, 10);
 // Reject a conversion error, an argument with no digits, trailing
 // characters after the number, and a thread count of zero.
98 if (errno != 0 or endptr == argv[1] or *endptr != '\0' or num_threads == 0)
99 {
100 cerr << "Invalid num_threads: " << argv[1] << endl;
101 return 1;
102 }
103
 // Same validation for num_items, except that zero is accepted.
104 if (argv[2][0] == '-')
105 {
106 cerr << "Invalid num_items: " << argv[2] << endl;
107 return 1;
108 }
109 const size_t num_items = strtoul(argv[2], &endptr, 10);
110 if (errno != 0 or endptr == argv[2] or *endptr != '\0')
111 {
112 cerr << "Invalid num_items: " << argv[2] << endl;
113 return 1;
114 }
115
 // Same validation for secs; zero is accepted here as well.
116 if (argv[3][0] == '-')
117 {
118 cerr << "Invalid secs: " << argv[3] << endl;
119 return 1;
120 }
121 const size_t secs = strtoul(argv[3], &endptr, 10);
122 if (errno != 0 or endptr == argv[3] or *endptr != '\0')
123 {
124 cerr << "Invalid secs: " << argv[3] << endl;
125 return 1;
126 }
127
 // NOTE(review): qpool and event_list are used from here on, but their
 // declarations are not visible in this listing (the embedded numbering
 // jumps from 127 to 130) -- confirm their types against the real file.
 //
 // Create one worker per requested thread: even indices get an Event1, odd
 // indices an Event2. Each event pointer is also appended to event_list so
 // that its counter can be printed later.
 // NOTE(review): nothing visible in this function deletes the events
 // allocated with new; ownership presumably rests with the pool -- confirm.
130 for (size_t i = 0; i < num_threads; ++i)
131 {
132 QueueTheadsPool<int>::Event * ptr = nullptr;
133 if (i % 2 == 0)
134 qpool.create_thread(ptr = new Event1);
135 else
136 qpool.create_thread(ptr = new Event2);
137 event_list.append(ptr);
138 }
139
 // Put the values 0 .. num_items-1 into the pool, announcing each one.
 // NOTE(review): i is a size_t; the pool presumably stores int (the events
 // derive from QueueTheadsPool<int>::Event), which would make this an
 // implicit conversion -- confirm.
140 cout << "Inserting " << num_items << " to the queue" << endl;
141 for (size_t i = 0; i < num_items; ++i)
142 {
143 cout << "Inserting " << i << endl;
144 qpool.put(i);
145 }
146 cout << "Done" << endl
147 << endl
148 << "Resuming ... " << endl;
149 qpool.resume();
150
 // First active phase: the main thread sleeps for `secs` seconds, then
 // prints get_count() for every event in event_list.
151 cout << endl
152 << "Sleeping for " << secs << " seconds" << endl;
153 this_thread::sleep_for(chrono::seconds(secs));
154 cout << endl
155 << "Done" << endl
156 << endl
157 << "List of event counters" << endl;
158 event_list.for_each([] (auto e)
159 {
160 cout << " Event count = " << e->get_count() << endl;
161 });
162
 // Suspend the pool, then keep the main thread asleep for a fixed 5 seconds.
163 cout << "done" << endl
164 << endl
165 << "Suspending (cooperative: in-flight tasks continue in run())" << endl;
166 qpool.suspend();
167 cout << "Done" << endl
168 << endl
169 << "Sleeping for 5 secs" << endl;
170 this_thread::sleep_for(chrono::seconds(5));
 // Second active phase: resume, sleep `secs` seconds again, and print the
 // counters a second time.
171 cout << "done" << endl
172 << endl
173 << "Resuming while other " << secs << " seconds" << endl
174 << endl;
175 qpool.resume();
176 this_thread::sleep_for(chrono::seconds(secs));
177 cout << endl
178 << "Done" << endl
179 << endl
180 << "List of event counters" << endl;
181 event_list.for_each([] (auto e)
182 {
183 cout << " Event count = " << e->get_count() << endl;
184 });
185
 // Final step: shut the pool down and report success.
186 cout << "done" << endl
187 << endl
188 << "Shutdown" << endl;
189 qpool.shutdown();
190 cout << "Done" << endl;
191
192 return 0;
193}
Dynamic singly linked list with functional programming support.
Definition htlist.H:1155
T & append(const T &item)
Definition htlist.H:1271
void for_each(Operation &operation)
Traverse all the container and performs an operation on each element.
Definition ah-dry.H:779
Pool of consumer threads fed from a queue.
Divide_Conquer_DP_Result< Cost > divide_and_conquer_partition_dp(const size_t groups, const size_t n, Transition_Cost_Fn transition_cost, const Cost inf=dp_optimization_detail::default_inf< Cost >())
Optimize partition DP using divide-and-conquer optimization.
STL namespace.
Queue-based consumer thread pool.
void run() override
This is an item extracted from the queue.
void run() override
This is an item extracted from the queue.
static constexpr size_t WORK_DELAY_MS