source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1963

Last change on this file since 1963 was 1944, checked in by aubourg, 24 years ago

decorrelateur wiener

File size: 18.3 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toisegment.cc,v 1.26 2002-03-23 23:05:22 aubourg Exp $
6
7#include "toisegment.h"
8
9#include <iostream.h>
10
11#ifndef MAXINT
12#define MAXINT 2147483647
13#endif
14
15static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
16static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
17static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
18#define LOG(_xxx_)
19/*
20#define LOG(_xxx_) \
21cout_lock(); \
22_xxx_; \
23cout_unlock();
24*/
25
26/******************************/
27/******* TOISegmented *********/
28/******************************/
29
30TOISegmented::TOISegmented(int bufsz, int maxseg) {
31 master = new MasterView(bufsz, maxseg, "");
32 setName("TOISegmented");
33 syncOldWay = false;
34}
35
36TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
37 master = new MasterView(bufsz, maxseg, nm);
38 setName(nm);
39 syncOldWay = false;
40}
41
42TOISegmented::TOISegmented(char* cnm, int bufsz, int maxseg) {
43 string nm = cnm;
44 master = new MasterView(bufsz, maxseg, nm);
45 setName(nm);
46 syncOldWay = false;
47}
48
49TOISegmented::~TOISegmented() {
50 delete master;
51}
52
53
54void TOISegmented::addConsumer(TOIProcessor* p) {
55 TOI::addConsumer(p);
56 master->nConsumers = consumers.size();
57}
58
59
60double TOISegmented::getData(int i) { /* reader thread */
61 return master->getData(i);
62}
63
64void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
65 data = master->getData(i);
66 flag = master->getFlag(i);
67}
68
69void TOISegmented::getData(int i, int n, double* data, uint_8* flg) { /* reader thread */
70 master->getData(i, n, data, flg);
71}
72
73void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
74 master->putData(i, value, flag);
75}
76
77void TOISegmented::putData(int i, int n, double const* val, uint_8 const* flg) { /* writer thread */
78 master->putData(i, n, val, flg);
79}
80
81void TOISegmented::putDone() {
82 master->putDone();
83}
84
85void TOISegmented::wontNeedBefore(int i) { /* reader thread */
86 master->getView()->wontNeedBefore(i);
87}
88
89TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
90 // return master->getView()->isDataAvail(i, j);
91 cout << "TOISegmented::isDataAvail unimplemented" << endl;
92 throw PError("TOISegmented::isDataAvail unimplemented");
93}
94
95TOI::DataStatus TOISegmented::isDataAvail(int i) {
96 return isDataAvail(i,i);
97}
98
99TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
100 return isDataAvail(i,j);
101}
102
103TOI::DataStatus TOISegmented::isDataAvailNL(int i) {
104 return isDataAvail(i);
105}
106
107void TOISegmented::waitForData(int iStart, int iEnd) {
108 // get will wait...
109}
110
111void TOISegmented::waitForData(int i) {
112 // get will wait...
113}
114
115void TOISegmented::waitForAnyData() {
116 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
117 throw PError("TOISegmented::waitForAnyData unimplemented");
118}
119
120int TOISegmented::nextDataAvail(int iAfter) {
121 cout << "TOISegmented::nextDataAvail" << endl;
122 return iAfter+1;
123}
124
125bool TOISegmented::hasSomeData() {
126 cout << "TOISegmented::hasSomeData" << endl;
127 return true;
128}
129
130void TOISegmented::doPutData(int i, double value, uint_8 flag) {
131 cout << "TOISegmented::doPutData unimplemented" << endl;
132 throw PError("TOISegmented::doPutData unimplemented");
133}
134
135void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
136 cout << "TOISegmented::doGetData unimplemented" << endl;
137 throw PError("TOISegmented::doGetData unimplemented");
138}
139
140
141/*******************************/
142/******* BufferSegment *********/
143/*******************************/
144
145TOISegmented::BufferSegment::BufferSegment(int sz) {
146 status = NEW;
147 bufferSize = sz;
148 sn0 = -1;
149
150 refcount = 0;
151
152 data = new double[sz];
153 flags = new uint_8[sz];
154
155 pthread_mutex_init(&refcount_mutex, NULL);
156}
157
158TOISegmented::BufferSegment::~BufferSegment() {
159 if (refcount > 0) {
160 cerr << "TOISegment : delete Buffer with refcount>0" << endl;
161 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
162 }
163 LOG(cout << "Destroying buffersegment sn0 "<< sn0 << endl);
164 delete[] data;
165 delete[] flags;
166 pthread_mutex_destroy(&refcount_mutex);
167}
168
169void TOISegmented::BufferSegment::getData(int sn, int n, double* d, uint_8* f) {
170 checkCommitted();
171 checkInRange(sn);
172 checkInRange(sn+n-1);
173 memcpy(d, data+(sn-sn0), n*sizeof(double));
174 if (f != NULL) {
175 memcpy(f, flags+(sn-sn0), n*sizeof(uint_8));
176 }
177}
178
179void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
180 /* writer thread*/
181 if (status == NEW) {
182 status = WRITE;
183 sn0 = sn;
184 }
185 if (status == COMMITTED) {
186 cerr << "TOISegment : putData in committed buffer" << endl;
187 throw(ForbiddenError("TOISegment : putData in committed buffer"));
188 }
189 checkInRange(sn);
190 data[sn-sn0] = d;
191 flags[sn-sn0] = f;
192}
193
194void TOISegmented::BufferSegment::putData(int sn, int n, double const* d, uint_8 const* f) {
195 if (status == NEW) {
196 status = WRITE;
197 sn0 = sn;
198 }
199 if (status == COMMITTED) {
200 cerr << "TOISegment : putData in committed buffer" << endl;
201 throw(ForbiddenError("TOISegment : putData in committed buffer"));
202 }
203 checkInRange(sn);
204 checkInRange(sn+n-1);
205 memcpy(data+(sn-sn0), d, n*sizeof(double));
206 if (f != NULL) {
207 memcpy(flags+(sn-sn0), f, n*sizeof(uint_8));
208 } else {
209 memset(flags+(sn-sn0), 0, n*sizeof(uint_8));
210 }
211}
212
213void TOISegmented::BufferSegment::incRefCount() {
214 pthread_mutex_lock(&refcount_mutex);
215 refcount++;
216 pthread_mutex_unlock(&refcount_mutex);
217}
218
219void TOISegmented::BufferSegment::decRefCount() {
220 pthread_mutex_lock(&refcount_mutex);
221 int nrc = --refcount;
222 pthread_mutex_unlock(&refcount_mutex);
223 if (nrc<0) {
224 cerr << "TOISegment : buffer refcount < 0" << endl;
225 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
226 }
227}
228
229int TOISegmented::BufferSegment::getRefCount() {
230 pthread_mutex_lock(&refcount_mutex);
231 int rc = refcount;
232 pthread_mutex_unlock(&refcount_mutex);
233 return rc;
234}
235
236
237/*******************************/
238/********** BufferView *********/
239/*******************************/
240
241TOISegmented::BufferView::BufferView(MasterView* m) {
242 master = m;
243 sn0 = -1;
244 segmentSize = m->segmentSize;
245 firstNeeded = -1;
246 waiting = false;
247 waitingFor = -1;
248}
249
250TOISegmented::BufferView::~BufferView() {
251}
252
253double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
254 ensure(sn);
255 int seg = (sn-sn0)/segmentSize;
256 return segments[seg]->getData(sn);
257}
258
259uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
260 ensure(sn);
261 int seg = (sn-sn0)/segmentSize;
262 return segments[seg]->getFlag(sn);
263}
264
265void TOISegmented::BufferView::getData(int sn, int n, double* dat, uint_8* flg) { /* Single-thread, reader thread */
266 ensure(sn);
267 ensure(sn+n-1);
268
269 int sn1 = sn;
270 int nsam = n;
271 double* pdat = dat;
272 uint_8* pflg = flg;
273
274 while (true) {
275 int seg = (sn1-sn0)/segmentSize;
276 BufferSegment* s = segments[seg];
277 int snmax = s->sn0 + s->bufferSize - 1;
278 int sn2 = snmax > (sn1+nsam-1) ? (sn1+nsam-1) : snmax;
279 int nget = sn2-sn1+1;
280 s->getData(sn1, nget, pdat, pflg);
281 pdat += nget;
282 if (pflg != NULL) pflg += nget;
283 nsam -= nget;
284 sn1 += nget;
285 if (nsam <= 0) break;
286 }
287}
288
289void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
290 if (sn < sn0) {
291 cout << "TOISegmented::BufferView::ensure requested sample before first" << endl;
292 cout << "sn " << sn << " sn0 " << sn0 << endl;
293 abort();
294 }
295
296 if (sn0 < 0 ||
297 sn >= sn0 + segmentSize*segments.size()) {
298 LOG(cout << master->name << " BufferView "
299 << hex << this << dec << ": read fault for " << sn << endl)
300 sync();
301 pthread_mutex_lock(&(master->read_wait_mutex));
302 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
303 wait(sn); // must be atomic with loop test // $CHECK$ est-ce vrai ?
304 pthread_mutex_unlock(&(master->read_wait_mutex));
305 LOG(cout << master->name << " BufferView " << hex << this << dec << ": waiting for " << sn << endl)
306 sync();
307 pthread_mutex_lock(&(master->read_wait_mutex));
308 }
309 pthread_mutex_unlock(&(master->read_wait_mutex));
310
311 LOG(cout << master->name << " BufferView " << hex << this << dec << ": resuming for " << sn
312 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
313 << " in " << segments.size() << " segments " << endl)
314 }
315}
316
317void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
318 master->updateView(this); // update me !
319}
320
321void TOISegmented::BufferView::wait(int sn) { /* reader thread, master read wait lock taken */
322 //pthread_mutex_lock(&(master->read_wait_mutex));
323 waiting = true;
324 waitingFor = sn;
325 master->waitingViews++;
326 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
327 waiting = false;
328 waitingFor = -1;
329 master->waitingViews--;
330 //pthread_mutex_unlock(&(master->read_wait_mutex));
331}
332
333
334void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
335 if (sn > firstNeeded) {
336 firstNeeded = sn;
337 // C'est peut-etre le moment de faire unl sync, si on est coince par ailleurs...
338 //pthread_mutex_lock(&(master->read_wait_mutex));
339 if (sn >= sn0 + segmentSize){ // && master->waitingViews>0) {
340// LOG(cout<<master->name<< " sync on wontneed, waitingViews=" << master->waitingViews << endl);
341 LOG(cout<<master->name<< " sync on wontneed, sn = " << sn << " sn0 = " << sn0 << endl);
342 sync();
343 }
344 //pthread_mutex_unlock(&(master->read_wait_mutex));
345 }
346}
347
348
349/*******************************/
350/********** MasterView *********/
351/*******************************/
352
353TOISegmented::MasterView::MasterView(int bufsz, int maxseg, string nm) {
354 currentSegment = NULL;
355 maxSegments = maxseg;
356 segmentSize = bufsz;
357 sn0 = -1;
358 nConsumers = 0;
359 name = nm;
360
361 pthread_mutex_init(&views_mutex, NULL);
362 pthread_mutex_init(&read_wait_mutex, NULL);
363 pthread_cond_init(&write_wait_condv, NULL);
364 pthread_cond_init(&read_wait_condv, NULL);
365 pthread_key_create(&buffer_key, BufferDestroy);
366
367 waitingOnWrite = false;
368 waitingViews = 0;
369}
370
371TOISegmented::MasterView::~MasterView() {
372 pthread_mutex_destroy(&views_mutex);
373 pthread_mutex_destroy(&read_wait_mutex);
374 pthread_cond_destroy(&write_wait_condv);
375 pthread_cond_destroy(&read_wait_condv);
376 pthread_key_delete(buffer_key);
377
378 // There should not be any BufferView left... Check ?
379
380 // decrement count for segments ?
381}
382
383void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
384 if (sn0<0) {
385 LOG(cout << "***MasterView::putData sn0<0 -> " << sn << endl);
386 sn0=sn;
387 }
388 // can fit in current segment ?
389 if (!(currentSegment != NULL &&
390 sn >= currentSegment->sn0 &&
391 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
392 LOG(cout << name << " MasterView::putData, need extend for " << sn << endl)
393 nextSegment();
394 }
395 currentSegment->putData(sn, data, flags);
396}
397
398void TOISegmented::MasterView::putData(int sn, int n, double const* data, uint_8 const* flags) { /* writer thread */
399 if (sn0<0) {
400 LOG(cout << "***MasterView::putData sn0<0 -> " << sn << endl);
401 sn0=sn;
402 }
403 double const* pdat = data;
404 uint_8 const* pflg = flags;
405 int nsam = n;
406 int sn1 = sn;
407 while (true) {
408 // maximum that current segment can take
409 int snmax = -1;
410 if (currentSegment != NULL) {
411 snmax = currentSegment->sn0 + currentSegment->bufferSize-1;
412 }
413 int sn2 = snmax > (sn1+nsam-1) ? (sn1+nsam-1) : snmax;
414 if (snmax>0) {
415 int nput = sn2-sn1+1;
416 currentSegment->putData(sn1, nput, pdat, pflg);
417 pdat += nput;
418 if (pflg != NULL) pflg += nput;
419 nsam -= nput;
420 sn1 += nput;
421 }
422 if (nsam <= 0) break;
423 nextSegment();
424 currentSegment->putData(sn1, 0, 0); // dummy, to initialize sn0 in segment : add method ?
425 }
426}
427
428double TOISegmented::MasterView::getData(int sn) { /* reader thread */
429 return getView()->getData(sn); /* thread-specific */
430}
431
432uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
433 return getView()->getFlag(sn);
434}
435
436void TOISegmented::MasterView::getData(int sn, int n, double* dat, uint_8* flg) { /* reader thread */
437 getView()->getData(sn, n, dat, flg);
438}
439
440TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
441 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
442 if (bv == NULL) {
443 bv = createView();
444 LOG(cout << "creating new view " << hex << bv << dec << endl)
445 pthread_setspecific(buffer_key, bv);
446 }
447 return bv;
448}
449
450void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
451 pthread_mutex_lock(&read_wait_mutex);
452 pthread_cond_broadcast(&read_wait_condv);
453 pthread_mutex_unlock(&read_wait_mutex);
454}
455
456void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
457 if (waitingOnWrite) {
458 LOG(cout << name << " MasterView : signal for wait on write" << endl)
459 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
460 }
461}
462
463void TOISegmented::MasterView::putDone() {
464 nextSegment(); // cree un segment inutile, a nettoyer
465}
466
467void TOISegmented::MasterView::nextSegment() { /* writer thread */
468 // The current segment, if any, is now committed. A new
469 // blank buffer is allocated, if any.
470 pthread_mutex_lock(&views_mutex);
471
472 LOG(cout << "MasterView::nextSegment "
473 << segments.size()+1 << "/" << maxSegments << endl)
474
475 if (currentSegment != NULL) {
476 currentSegment->status = BufferSegment::COMMITTED;
477 segments.push_back(currentSegment);
478 }
479
480 currentSegment = NULL;
481 while (segments.size() >= maxSegments) {
482 waitForCleaning();
483 }
484
485 currentSegment = new BufferSegment(segmentSize);
486 currentSegment->incRefCount();
487 signalWaitingViews(); // they can ask to be updated !!
488 pthread_mutex_unlock(&views_mutex);
489}
490
491void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
492 LOG(cout << name << " MasterView : write wait for clean for " << sn0 << endl)
493 waitingOnWrite = true;
494 checkDeadLock();
495 pthread_cond_wait(&write_wait_condv, &views_mutex);
496 LOG(cout << name << " MasterView : wait done" << endl)
497}
498
499TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
500 BufferView* bv = new BufferView(this);
501 pthread_mutex_lock(&views_mutex);
502 allViews.insert(bv);
503 pthread_mutex_unlock(&views_mutex);
504 updateView(bv);
505 return bv;
506}
507
508void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
509 pthread_mutex_lock(&views_mutex);
510
511// int oldBegin = bv->sn0;
512// int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
513 int oldBegin = sn0;
514 int oldEnd = sn0 + bv->segmentSize * segments.size();
515
516 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
517 i != bv->segments.end(); i++) {
518 (*i)->decRefCount();
519 }
520
521 bv->segments.clear();
522
523 // utiliser firstNeeded de toutes les vues pour faire le menage chez
524 // nous.
525
526 // A condition que tous les consumers se soient fait connaitre...
527
528 if (nConsumers == allViews.size()) {
529 int firstNeeded = MAXINT;
530 for (set<BufferView*>::iterator i = allViews.begin();
531 i != allViews.end(); i++) {
532 LOG(cout << name << " View firstneeded " << (*i)->firstNeeded << endl);
533 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
534 }
535
536 LOG(cout << name << " MasterView : firstNeeded = " << firstNeeded << endl);
537
538 vector<BufferSegment*>::iterator j = segments.begin();
539 bool clean = false;
540 {for (vector<BufferSegment*>::iterator i = segments.begin();
541 i != segments.end(); i++) {
542 //LOG(cout << "Updating : rc = " << (*i)->getRefCount() << " sn0 = " << (*i)->sn0 << endl;);
543 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 1)) {
544 clean = true;
545 (*i)->decRefCount();
546 delete (*i);
547 j = i;
548 }
549 }}
550 j++;
551 if (clean) {
552 segments.erase(segments.begin(),j);
553 sn0 = (*segments.begin())->sn0;
554 LOG(cout << "MasterView : purged until " << sn0 << endl);
555 }
556 } else {
557 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
558 << "/" << nConsumers << endl);
559 }
560
561 {for (vector<BufferSegment*>::iterator i = segments.begin();
562 i != segments.end(); i++) {
563 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
564 (*i)->incRefCount();
565 bv->segments.push_back(*i);
566 }
567 }}
568
569 bv->sn0 = -1;
570 int newEnd = -1;
571 if (segments.size() > 0) {
572 bv->sn0 = bv->segments[0]->sn0;
573 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
574 }
575
576 if (sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
577 signalWrite();
578 }
579
580 LOG(cout << name << " sync for " << hex << bv << dec << " : "
581 << oldBegin << " - " << oldEnd << " --> "
582 << sn0 << " - " << newEnd << endl);
583
584 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
585 signalWaitingViews();
586 }
587 pthread_mutex_unlock(&views_mutex);
588}
589
590void TOISegmented::MasterView::checkDeadLock() { /* views locked */
591 // There is a possible deadlock if no view can free old segments
592 // and we are waiting for write.
593
594 // we need to record "wont need before" for each view, and
595 // signal deadlock if any view that needs first segment data is sleeping
596 // while we are asleep
597
598 pthread_mutex_lock(&read_wait_mutex);
599 if (!waitingOnWrite) {
600 pthread_mutex_unlock(&read_wait_mutex);
601 return; // no problem, there is an active writer
602 }
603
604 // Is any sleeping view needing our first segment ?
605
606 for (set<BufferView*>::iterator i=allViews.begin();
607 i != allViews.end(); i++) {
608 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
609 cout << "**** DEADLOCK detected ****" << endl;
610 cout << "We are waiting on write (buffer is full)"<< endl;
611 cout << "but a waiting reader still needs our first segment" << endl;
612 cout << "restart with bigger buffers" << endl;
613
614 cout << "master has range " << sn0 << " - "
615 << sn0+segments.size()*segmentSize-1 <<endl;
616 cout << "in " << segments.size() << " segments" << endl;
617 cout << "waiting bufferview is waiting for " << (*i)->waitingFor
618 << " and still needs " << (*i)->firstNeeded << endl;
619 cout << "it has sn0= " << (*i)->sn0 << " nseg = " << (*i)->segments.size()
620 << " snlast = " << (*i)->sn0+(*i)->segments.size()*(*i)->segmentSize -1
621 << endl;
622
623 abort();
624 }
625 }
626 pthread_mutex_unlock(&read_wait_mutex);
627}
628
629
630
631void TOISegmented::MasterView::BufferDestroy(void* p) {
632 BufferView* bv = (BufferView*) p;
633 delete bv;
634}
Note: See TracBrowser for help on using the repository browser.