Libthreadar 1.5.0
Loading...
Searching...
No Matches
ratelier_gather.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2024 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libhtreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_RATELIER_GATHER_HPP
25#define LIBTHREADAR_RATELIER_GATHER_HPP
26
69
70
71
72#include "config.h"
73
74 // C system headers
75extern "C"
76{
77}
78 // C++ standard headers
79#include <vector>
80#include <map>
81#include <deque>
82#include <memory>
83
84 // libthreadar headers
85#include "mutex.hpp"
86
87
88namespace libthreadar
89{
91
96
97 template <class T> class ratelier_gather
98 {
99 public:
100 ratelier_gather(unsigned int size, signed int flag = 0);
101 ratelier_gather(const ratelier_gather & ref) = delete;
102 ratelier_gather(ratelier_gather && ref) = default;
103 ratelier_gather & operator = (const ratelier_gather & ref) = delete;
104 ratelier_gather & operator = (ratelier_gather && ref) noexcept = default;
105 virtual ~ratelier_gather() = default;
106
108
115 void worker_push_one(unsigned int slot, std::unique_ptr<T> & one, signed int flag = 0);
116
118
122 void gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag);
123
125 void reset();
126
127 private:
128
129 static const unsigned int cond_pending_data = 0;
130 static const unsigned int cond_full = 1;
131
132 struct slot
133 {
134 std::unique_ptr<T> obj;
135 bool empty;
136 unsigned int index;
137 signed int flag;
138
139 slot(signed int val) { empty = true; flag = val; };
140 slot(const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
141 };
142
143 unsigned int next_index;
144 std::vector<slot> table;
145 std::map<unsigned int, unsigned int> corres;
146 std::deque<unsigned int> empty_slot;
148 };
149
150 template <class T> ratelier_gather<T>::ratelier_gather(unsigned int size, signed int flag):
151 table(size, slot(flag)),
152 verrou(2)
153 {
154 next_index = 0;
155
156 for(unsigned int i = 0; i < size; ++i)
157 empty_slot.push_back(i);
158 }
159
160 template <class T> void ratelier_gather<T>::worker_push_one(unsigned int slot, std::unique_ptr<T> & one, signed int flag)
161 {
162 verrou.lock();
163
164 try
165 {
166 while(empty_slot.empty() // no free slot available
167 || ((empty_slot.size() == 1 && slot != next_index) // one slot available and we do not provide the lowest expecting slot num
168 && corres.begin() != corres.end() && (corres.begin())->first != next_index)) // and lowest slot is still not received
169 verrou.wait(cond_full);
170
171 std::map<unsigned int, unsigned int>::iterator it = corres.find(slot);
172 unsigned int index;
173
174 if(it != corres.end())
175 throw exception_range("the ratelier_gather index to fill is already used");
176
177 index = empty_slot.back();
178
179 // sanity checks
180
181 if(index >= table.size())
182 throw THREADAR_BUG;
183 if( ! table[index].empty)
184 throw THREADAR_BUG;
185
186 // recording the change
187
188 corres[slot] = index;
189 table[index].obj = std::move(one);
190 table[index].empty = false;
191 table[index].index = slot;
192 table[index].flag = flag;
193
194 empty_slot.pop_back();
195
196 if(verrou.get_waiting_thread_count(cond_pending_data) > 0)
197 if(corres.find(next_index) != corres.end()) // some data can be gathered
198 verrou.signal(cond_pending_data); // awaking the gathering thread
199 }
200 catch(...)
201 {
202 verrou.unlock(); // unlock first, as broadcast/signal may be the cause of the exception
203 verrou.broadcast(cond_pending_data);
204 verrou.broadcast(cond_full);
205 throw;
206 }
207 verrou.unlock();
208 }
209
210 template <class T> void ratelier_gather<T>::gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag)
211 {
212 ones.clear();
213 flag.clear();
214
215 verrou.lock();
216 try
217 {
218 std::map<unsigned int, unsigned int>::iterator it;
219 std::map<unsigned int, unsigned int>::iterator tmp;
220
221 do
222 {
223 it = corres.begin();
224
225 while(it != corres.end())
226 {
227 if(it->first > next_index) // not continuous sequence
228 break; // exiting the inner while loop
229
230 if(it->first == next_index)
231 {
232
233 // sanity checks
234
235 if(it->second >= table.size())
236 throw THREADAR_BUG;
237 if(table[it->second].index != next_index)
238 throw THREADAR_BUG;
239 if(table[it->second].empty)
240 throw THREADAR_BUG;
241 if( ! table[it->second].obj)
242 throw THREADAR_BUG;
243
244 // recording the change
245
246 ones.push_back(std::move(table[it->second].obj));
247 flag.push_back(table[it->second].flag);
248
249 table[it->second].empty = true;
250 empty_slot.push_back(it->second);
251 tmp = it;
252 ++it;
253 corres.erase(tmp);
254 ++next_index;
255 }
256 else // integer overload occured for the index
257 ++it; // skipping this entry
258 }
259
260 if(ones.empty())
261 verrou.wait(cond_pending_data);
262 }
263 while(ones.empty());
264
265 if(verrou.get_waiting_thread_count(cond_full) > 0)
266 verrou.broadcast(cond_full); // awake all pending workers
267 }
268 catch(...)
269 {
270 verrou.unlock(); // unlock first, as broadcast() may be the cause of the exception
271 verrou.broadcast(cond_pending_data);
272 verrou.broadcast(cond_full);
273 throw;
274 }
275 verrou.unlock();
276
277 if(ones.size() != flag.size())
278 throw THREADAR_BUG;
279 }
280
281 template <class T> void ratelier_gather<T>::reset()
282 {
283 unsigned int size = table.size();
284 next_index = 0;
285 corres.clear();
286 empty_slot.clear();
287
288 for(unsigned int i = 0; i < size; ++i)
289 {
290 table[i].obj.reset();
291 table[i].empty = true;
292 empty_slot.push_back(i);
293 }
294
295 verrou.lock();
296 verrou.broadcast(cond_pending_data);
297 verrou.broadcast(cond_full);
298 verrou.unlock();
299 }
300
301
302} // end of namespace
303
304#endif
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Definition condition.hpp:46
Exception used to report out or range value or argument.
the class ratelier_gather has a fixed length range of slots of arbitrary defined object type
void worker_push_one(unsigned int slot, std::unique_ptr< T > &one, signed int flag=0)
provides to a worker thread a mean to given data with its associated index to a gathering thread
void gather(std::deque< std::unique_ptr< T > > &ones, std::deque< signed int > &flag)
obtain the lowest continuous filled slots of the ratelier_gather and free them
void reset()
reset the object in its prestine state
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition barrier.hpp:46