source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1695

Last change on this file since 1695 was 1693, checked in by aubourg, 24 years ago

final pour toisegment

File size: 13.0 KB
Line 
1#include "toisegment.h"
2
3#ifndef MAXINT
4#define MAXINT 2147483647
5#endif
6
7static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
8static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
9static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
10#define LOG(_xxx_)
11/*
12#define LOG(_xxx_) \
13cout_lock(); \
14_xxx_; \
15cout_unlock();
16*/
17
18/******************************/
19/******* TOISegmented *********/
20/******************************/
21
22TOISegmented::TOISegmented(int bufsz, int maxseg) {
23 master = new MasterView(bufsz, maxseg);
24 setName("TOISegmented");
25}
26
27TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
28 master = new MasterView(bufsz, maxseg);
29 setName(nm);
30}
31
32TOISegmented::~TOISegmented() {
33 delete master;
34}
35
36
37void TOISegmented::addConsumer(TOIProcessor* p) {
38 TOI::addConsumer(p);
39 master->nConsumers = consumers.size();
40}
41
42
43double TOISegmented::getData(int i) { /* reader thread */
44 return master->getData(i);
45}
46
47void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
48 data = master->getData(i);
49 flag = master->getFlag(i);
50}
51
52void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
53 master->putData(i, value, flag);
54}
55
56void TOISegmented::putDone() {
57 master->putDone();
58}
59
60void TOISegmented::wontNeedBefore(int i) { /* reader thread */
61 master->getView()->wontNeedBefore(i);
62}
63
64TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
65 // return master->getView()->isDataAvail(i, j);
66 cout << "TOISegmented::isDataAvail unimplemented" << endl;
67 throw PError("TOISegmented::isDataAvail unimplemented");
68}
69
70TOI::DataStatus TOISegmented::isDataAvail(int i) {
71 return isDataAvail(i,i);
72}
73
74TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
75 return isDataAvail(i,j);
76}
77
78void TOISegmented::waitForData(int iStart, int iEnd) {
79 // get will wait...
80}
81
82void TOISegmented::waitForData(int i) {
83 // get will wait...
84}
85
86void TOISegmented::waitForAnyData() {
87 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
88 throw PError("TOISegmented::waitForAnyData unimplemented");
89}
90
91int TOISegmented::nextDataAvail(int iAfter) {
92 cout << "TOISegmented::nextDataAvail" << endl;
93 return iAfter+1;
94}
95
96bool TOISegmented::hasSomeData() {
97 cout << "TOISegmented::hasSomeData" << endl;
98 return true;
99}
100
101void TOISegmented::doPutData(int i, double value, uint_8 flag) {
102 cout << "TOISegmented::doPutData unimplemented" << endl;
103 throw PError("TOISegmented::doPutData unimplemented");
104}
105
106void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
107 cout << "TOISegmented::doGetData unimplemented" << endl;
108 throw PError("TOISegmented::doGetData unimplemented");
109}
110
111
112/*******************************/
113/******* BufferSegment *********/
114/*******************************/
115
116TOISegmented::BufferSegment::BufferSegment(int sz) {
117 status = NEW;
118 bufferSize = sz;
119 sn0 = -1;
120
121 refcount = 0;
122
123 data = new double[sz];
124 flags = new uint_8[sz];
125
126 pthread_mutex_init(&refcount_mutex, NULL);
127}
128
129TOISegmented::BufferSegment::~BufferSegment() {
130 if (refcount > 0) {
131 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
132 }
133 delete[] data;
134 delete[] flags;
135 pthread_mutex_destroy(&refcount_mutex);
136}
137
138void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
139 /* writer thread*/
140 if (status == NEW) {
141 status = WRITE;
142 sn0 = sn;
143 }
144 if (status == COMMITTED) {
145 throw(ForbiddenError("TOISegment : putData in committed buffer"));
146 }
147 checkInRange(sn);
148 data[sn-sn0] = d;
149 flags[sn-sn0] = f;
150}
151
152void TOISegmented::BufferSegment::incRefCount() {
153 pthread_mutex_lock(&refcount_mutex);
154 refcount++;
155 pthread_mutex_unlock(&refcount_mutex);
156}
157
158void TOISegmented::BufferSegment::decRefCount() {
159 pthread_mutex_lock(&refcount_mutex);
160 int nrc = --refcount;
161 pthread_mutex_unlock(&refcount_mutex);
162 if (nrc<0)
163 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
164}
165
166int TOISegmented::BufferSegment::getRefCount() {
167 pthread_mutex_lock(&refcount_mutex);
168 int rc = refcount;
169 pthread_mutex_unlock(&refcount_mutex);
170 return rc;
171}
172
173
174/*******************************/
175/********** BufferView *********/
176/*******************************/
177
178TOISegmented::BufferView::BufferView(MasterView* m) {
179 master = m;
180 sn0 = -1;
181 segmentSize = m->segmentSize;
182 firstNeeded = -1;
183 waiting = false;
184}
185
186TOISegmented::BufferView::~BufferView() {
187}
188
189double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
190 ensure(sn);
191 int seg = (sn-sn0)/segmentSize;
192 return segments[seg]->getData(sn);
193}
194
195uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
196 ensure(sn);
197 int seg = (sn-sn0)/segmentSize;
198 return segments[seg]->getFlag(sn);
199}
200
201void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
202 if (sn < sn0) {
203 LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
204 LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
205 abort();
206 }
207
208 if (sn0 < 0 ||
209 sn >= sn0 + segmentSize*segments.size()) {
210 LOG(cout << "BufferView " << hex << this << dec << ": read fault for " << sn << endl)
211 sync();
212 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
213 wait();
214 LOG(cout << "BufferView " << hex << this << dec << ": waiting for " << sn << endl)
215 sync();
216 }
217 LOG(cout << "BufferView " << hex << this << dec << ": resuming for " << sn
218 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
219 << " in " << segments.size() << " segments " << endl)
220 }
221}
222
223void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
224 master->updateView(this); // update me !
225}
226
227void TOISegmented::BufferView::wait() { /* reader thread */
228 pthread_mutex_lock(&(master->read_wait_mutex));
229 waiting = true;
230 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
231 waiting = false;
232 pthread_mutex_unlock(&(master->read_wait_mutex));
233}
234
235
236void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
237 if (sn > firstNeeded) {
238 firstNeeded = sn;
239 }
240}
241
242
243/*******************************/
244/********** MasterView *********/
245/*******************************/
246
247TOISegmented::MasterView::MasterView(int bufsz, int maxseg) {
248 currentSegment = NULL;
249 maxSegments = maxseg;
250 segmentSize = bufsz;
251 sn0 = -1;
252 nConsumers = 0;
253
254 pthread_mutex_init(&views_mutex, NULL);
255 pthread_mutex_init(&read_wait_mutex, NULL);
256 pthread_cond_init(&write_wait_condv, NULL);
257 pthread_cond_init(&read_wait_condv, NULL);
258 pthread_key_create(&buffer_key, BufferDestroy);
259
260 waitingOnWrite = false;
261}
262
263TOISegmented::MasterView::~MasterView() {
264 pthread_mutex_destroy(&views_mutex);
265 pthread_mutex_destroy(&read_wait_mutex);
266 pthread_cond_destroy(&write_wait_condv);
267 pthread_cond_destroy(&read_wait_condv);
268 pthread_key_delete(buffer_key);
269
270 // There should not be any BufferView left... Check ?
271
272 // decrement count for segments ?
273}
274
275void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
276 if (sn0<0) {
277 LOG(cout << "***MasterView::putData sn0<0" << endl)
278 sn0=sn;
279 }
280 // can fit in current segment ?
281 if (!(currentSegment != NULL &&
282 sn >= currentSegment->sn0 &&
283 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
284 LOG(cout << "MasterView::putData, need extend for " << sn << endl)
285 nextSegment();
286 }
287 currentSegment->putData(sn, data, flags);
288}
289
290double TOISegmented::MasterView::getData(int sn) { /* reader thread */
291 return getView()->getData(sn); /* thread-specific */
292}
293
294uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
295 return getView()->getFlag(sn);
296}
297
298TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
299 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
300 if (bv == NULL) {
301 bv = createView();
302 LOG(cout << "creating new view " << hex << bv << dec << endl)
303 pthread_setspecific(buffer_key, bv);
304 }
305 return bv;
306}
307
308void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
309 pthread_mutex_lock(&read_wait_mutex);
310 pthread_cond_broadcast(&read_wait_condv);
311 pthread_mutex_unlock(&read_wait_mutex);
312}
313
314void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
315 if (waitingOnWrite) {
316 LOG(cout << "MasterView : signal for wait on write" << endl)
317 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
318 }
319}
320
321void TOISegmented::MasterView::putDone() {
322 nextSegment(); // cree un segment inutile, a nettoyer
323}
324
325void TOISegmented::MasterView::nextSegment() { /* writer thread */
326 // The current segment, if any, is now committed. A new
327 // blank buffer is allocated, if any.
328 pthread_mutex_lock(&views_mutex);
329
330 LOG(cout << "MasterView::nextSegment "
331 << segments.size()+1 << "/" << maxSegments << endl)
332
333 if (currentSegment != NULL) {
334 currentSegment->status = BufferSegment::COMMITTED;
335 segments.push_back(currentSegment);
336 }
337
338 currentSegment = NULL;
339 while (segments.size() >= maxSegments) {
340 waitForCleaning();
341 }
342
343 currentSegment = new BufferSegment(segmentSize);
344 signalWaitingViews(); // they can ask to be updated !!
345 pthread_mutex_unlock(&views_mutex);
346}
347
348void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
349 LOG(cout << "MasterView : write wait for clean for " << sn0 << endl)
350 waitingOnWrite = true;
351 checkDeadLock();
352 pthread_cond_wait(&write_wait_condv, &views_mutex);
353 LOG(cout << "MasterView : wait done" << endl)
354}
355
356TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
357 BufferView* bv = new BufferView(this);
358 pthread_mutex_lock(&views_mutex);
359 allViews.insert(bv);
360 pthread_mutex_unlock(&views_mutex);
361 updateView(bv);
362 return bv;
363}
364
365void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
366 pthread_mutex_lock(&views_mutex);
367
368 int oldBegin = bv->sn0;
369 int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
370
371 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
372 i != bv->segments.end(); i++) {
373 (*i)->decRefCount();
374 }
375
376 bv->segments.clear();
377
378 // utiliser firstNeeded de toutes les vues pour faire le menage chez
379 // nous.
380
381 // A condition que tous les consumers se soient fait connaitre...
382
383 if (nConsumers == allViews.size()) {
384 int firstNeeded = MAXINT;
385 for (set<BufferView*>::iterator i = allViews.begin();
386 i != allViews.end(); i++) {
387 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
388 }
389
390 LOG(cout << "MasterView : firstNeeded = " << firstNeeded << endl);
391
392 vector<BufferSegment*>::iterator j = segments.begin();
393 bool clean = false;
394 for (vector<BufferSegment*>::iterator i = segments.begin();
395 i != segments.end(); i++) {
396 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) {
397 clean = true;
398 j = i;
399 }
400 }
401 if (clean) {
402 segments.erase(segments.begin(),j);
403 sn0 = (*segments.begin())->sn0;
404 LOG(cout << "MasterView : purged until " << sn0 << endl);
405 }
406 } else {
407 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
408 << "/" << nConsumers << endl);
409 }
410
411 for (vector<BufferSegment*>::iterator i = segments.begin();
412 i != segments.end(); i++) {
413 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
414 (*i)->incRefCount();
415 bv->segments.push_back(*i);
416 }
417 }
418
419 bv->sn0 = -1;
420 int newEnd = -1;
421 if (segments.size() > 0) {
422 bv->sn0 = bv->segments[0]->sn0;
423 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
424 }
425
426 if (bv->sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
427 signalWrite();
428 }
429
430 LOG(cout << "sync for " << hex << bv << dec << " : "
431 << oldBegin << " - " << oldEnd << " --> "
432 << bv->sn0 << " - " << newEnd << endl);
433
434 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
435 signalWaitingViews();
436 }
437 pthread_mutex_unlock(&views_mutex);
438}
439
440void TOISegmented::MasterView::checkDeadLock() { /* views locked */
441 // There is a possible deadlock if no view can free old segments
442 // and we are waiting for write.
443
444 // we need to record "wont need before" for each view, and
445 // signal deadlock if any view that needs first segment data is sleeping
446 // while we are asleep
447
448 pthread_mutex_lock(&read_wait_mutex);
449 if (!waitingOnWrite) {
450 pthread_mutex_unlock(&read_wait_mutex);
451 return; // no problem, there is an active writer
452 }
453
454 // Is any sleeping view needing our first segment ?
455
456 for (set<BufferView*>::iterator i=allViews.begin();
457 i != allViews.end(); i++) {
458 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
459 cout << "**** DEADLOCK detected ****" << endl;
460 cout << "We are waiting on write (buffer is full)"<< endl;
461 cout << "but a waiting reader still needs our first segment" << endl;
462 cout << "restart with bigger buffers" << endl;
463 abort();
464 }
465 }
466 pthread_mutex_unlock(&read_wait_mutex);
467}
468
469
470
471void TOISegmented::MasterView::BufferDestroy(void* p) {
472 BufferView* bv = (BufferView*) p;
473 delete bv;
474}
Note: See TracBrowser for help on using the repository browser.