24#ifndef LIBTHREADAR_RATELIER_GATHER_HPP
25#define LIBTHREADAR_RATELIER_GATHER_HPP
116 void worker_push_one(
unsigned int slot, std::unique_ptr<T> & one,
signed int flag = 0);
124 void gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag);
131 static const unsigned int cond_pending_data = 0;
132 static const unsigned int cond_full = 1;
136 std::unique_ptr<T> obj;
141 slot(
signed int val) { empty =
true; flag = val; };
142 slot(
const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
145 unsigned int next_index;
146 std::vector<slot> table;
147 std::map<unsigned int, unsigned int> corres;
148 std::deque<unsigned int> empty_slot;
153 table(size, slot(flag)),
158 for(
unsigned int i = 0; i < size; ++i)
159 empty_slot.push_back(i);
168 while(empty_slot.empty()
169 || ((empty_slot.size() == 1 && slot != next_index)
170 && corres.begin() != corres.end() && (corres.begin())->first != next_index))
171 verrou.wait(cond_full);
173 std::map<unsigned int, unsigned int>::iterator it = corres.find(slot);
176 if(it != corres.end())
177 throw exception_range(
"the ratelier_gather index to fill is already used");
179 index = empty_slot.back();
183 if(index >= table.size())
185 if( ! table[index].empty)
190 corres[slot] = index;
191 table[index].obj = std::move(one);
192 table[index].empty =
false;
193 table[index].index = slot;
194 table[index].flag = flag;
196 empty_slot.pop_back();
198 if(verrou.get_waiting_thread_count(cond_pending_data) > 0)
199 if(corres.find(next_index) != corres.end())
200 verrou.signal(cond_pending_data);
205 verrou.broadcast(cond_pending_data);
206 verrou.broadcast(cond_full);
220 std::map<unsigned int, unsigned int>::iterator it;
221 std::map<unsigned int, unsigned int>::iterator tmp;
227 while(it != corres.end())
229 if(it->first > next_index)
232 if(it->first == next_index)
237 if(it->second >= table.size())
239 if(table[it->second].index != next_index)
241 if(table[it->second].empty)
243 if( ! table[it->second].obj)
248 ones.push_back(std::move(table[it->second].obj));
249 flag.push_back(table[it->second].flag);
251 table[it->second].empty =
true;
252 empty_slot.push_back(it->second);
263 verrou.wait(cond_pending_data);
267 if(verrou.get_waiting_thread_count(cond_full) > 0)
268 verrou.broadcast(cond_full);
273 verrou.broadcast(cond_pending_data);
274 verrou.broadcast(cond_full);
279 if(ones.size() != flag.size())
285 unsigned int size = table.size();
290 for(
unsigned int i = 0; i < size; ++i)
292 table[i].obj.reset();
293 table[i].empty =
true;
294 empty_slot.push_back(i);
298 verrou.broadcast(cond_pending_data);
299 verrou.broadcast(cond_full);
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Exception used to report out or range value or argument.
the class ratelier_gather has a fixed length range of slots of arbitrary defined object type
void worker_push_one(unsigned int slot, std::unique_ptr< T > &one, signed int flag=0)
provides to a worker thread a mean to given data with its associated index to a gathering thread
void gather(std::deque< std::unique_ptr< T > > &ones, std::deque< signed int > &flag)
obtain the lowest continuous filled slots of the ratelier_gather and free them
void reset()
reset the object in its prestine state
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...