source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1750

Last change on this file since 1750 was 1750, checked in by aubourg, 24 years ago

bug i/o bloc avec requested sample

File size: 17.5 KB
RevLine 
[1738]1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
[1750]5// $Id: toisegment.cc,v 1.18 2001-11-12 14:12:54 aubourg Exp $
[1738]6
[1670]7#include "toisegment.h"
[1671]8
[1697]9#include <iostream.h>
10
[1690]11#ifndef MAXINT
12#define MAXINT 2147483647
13#endif
14
[1692]15static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
16static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
17static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
[1711]18#define LOG(_xxx_)
19/*
[1692]20#define LOG(_xxx_) \
21cout_lock(); \
22_xxx_; \
23cout_unlock();
[1711]24*/
[1692]25
[1689]26/******************************/
27/******* TOISegmented *********/
28/******************************/
29
30TOISegmented::TOISegmented(int bufsz, int maxseg) {
[1710]31 master = new MasterView(bufsz, maxseg, "");
[1689]32 setName("TOISegmented");
[1740]33 syncOldWay = false;
[1689]34}
35
36TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
[1710]37 master = new MasterView(bufsz, maxseg, nm);
[1689]38 setName(nm);
[1740]39 syncOldWay = false;
[1689]40}
41
[1699]42TOISegmented::TOISegmented(char* cnm, int bufsz, int maxseg) {
43 string nm = cnm;
[1710]44 master = new MasterView(bufsz, maxseg, nm);
[1699]45 setName(nm);
[1740]46 syncOldWay = false;
[1699]47}
48
[1689]49TOISegmented::~TOISegmented() {
50 delete master;
51}
52
53
[1692]54void TOISegmented::addConsumer(TOIProcessor* p) {
55 TOI::addConsumer(p);
56 master->nConsumers = consumers.size();
57}
58
59
[1689]60double TOISegmented::getData(int i) { /* reader thread */
61 return master->getData(i);
62}
63
64void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
65 data = master->getData(i);
66 flag = master->getFlag(i);
67}
68
[1743]69void TOISegmented::getData(int i, int n, double* data, uint_8* flg) { /* reader thread */
70 master->getData(i, n, data, flg);
71}
72
[1689]73void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
74 master->putData(i, value, flag);
75}
76
[1743]77void TOISegmented::putData(int i, int n, double const* val, uint_8 const* flg) { /* writer thread */
78 master->putData(i, n, val, flg);
79}
80
[1689]81void TOISegmented::putDone() {
82 master->putDone();
83}
84
85void TOISegmented::wontNeedBefore(int i) { /* reader thread */
86 master->getView()->wontNeedBefore(i);
87}
88
89TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
90 // return master->getView()->isDataAvail(i, j);
91 cout << "TOISegmented::isDataAvail unimplemented" << endl;
92 throw PError("TOISegmented::isDataAvail unimplemented");
93}
94
95TOI::DataStatus TOISegmented::isDataAvail(int i) {
96 return isDataAvail(i,i);
97}
98
99TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
100 return isDataAvail(i,j);
101}
102
103void TOISegmented::waitForData(int iStart, int iEnd) {
104 // get will wait...
105}
106
107void TOISegmented::waitForData(int i) {
108 // get will wait...
109}
110
111void TOISegmented::waitForAnyData() {
112 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
113 throw PError("TOISegmented::waitForAnyData unimplemented");
114}
115
116int TOISegmented::nextDataAvail(int iAfter) {
117 cout << "TOISegmented::nextDataAvail" << endl;
118 return iAfter+1;
119}
120
121bool TOISegmented::hasSomeData() {
122 cout << "TOISegmented::hasSomeData" << endl;
123 return true;
124}
125
[1692]126void TOISegmented::doPutData(int i, double value, uint_8 flag) {
[1689]127 cout << "TOISegmented::doPutData unimplemented" << endl;
128 throw PError("TOISegmented::doPutData unimplemented");
129}
130
131void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
132 cout << "TOISegmented::doGetData unimplemented" << endl;
133 throw PError("TOISegmented::doGetData unimplemented");
134}
135
136
[1686]137/*******************************/
138/******* BufferSegment *********/
139/*******************************/
[1671]140
141TOISegmented::BufferSegment::BufferSegment(int sz) {
142 status = NEW;
143 bufferSize = sz;
144 sn0 = -1;
145
146 refcount = 0;
147
148 data = new double[sz];
149 flags = new uint_8[sz];
150
151 pthread_mutex_init(&refcount_mutex, NULL);
152}
153
154TOISegmented::BufferSegment::~BufferSegment() {
155 if (refcount > 0) {
156 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
157 }
[1700]158 LOG(cout << "Destroying buffersegment sn0 "<< sn0 << endl);
[1671]159 delete[] data;
160 delete[] flags;
161 pthread_mutex_destroy(&refcount_mutex);
162}
163
[1743]164void TOISegmented::BufferSegment::getData(int sn, int n, double* d, uint_8* f) {
165 checkCommitted();
166 checkInRange(sn);
167 checkInRange(sn+n-1);
168 memcpy(d, data+(sn-sn0), n*sizeof(double));
169 if (f != NULL) {
170 memcpy(f, flags+(sn-sn0), n*sizeof(uint_8));
171 }
172}
173
[1689]174void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
175 /* writer thread*/
[1671]176 if (status == NEW) {
177 status = WRITE;
178 sn0 = sn;
179 }
180 if (status == COMMITTED) {
181 throw(ForbiddenError("TOISegment : putData in committed buffer"));
182 }
183 checkInRange(sn);
184 data[sn-sn0] = d;
185 flags[sn-sn0] = f;
186}
187
[1743]188void TOISegmented::BufferSegment::putData(int sn, int n, double const* d, uint_8 const* f) {
[1744]189 if (status == NEW) {
190 status = WRITE;
191 sn0 = sn;
192 }
193 if (status == COMMITTED) {
194 throw(ForbiddenError("TOISegment : putData in committed buffer"));
195 }
[1743]196 checkInRange(sn);
197 checkInRange(sn+n-1);
198 memcpy(data+(sn-sn0), d, n*sizeof(double));
199 if (f != NULL) {
200 memcpy(flags+(sn-sn0), f, n*sizeof(uint_8));
201 } else {
202 memset(flags+(sn-sn0), 0, n*sizeof(uint_8));
203 }
204}
205
[1671]206void TOISegmented::BufferSegment::incRefCount() {
207 pthread_mutex_lock(&refcount_mutex);
208 refcount++;
209 pthread_mutex_unlock(&refcount_mutex);
210}
211
212void TOISegmented::BufferSegment::decRefCount() {
213 pthread_mutex_lock(&refcount_mutex);
214 int nrc = --refcount;
215 pthread_mutex_unlock(&refcount_mutex);
216 if (nrc<0)
217 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
218}
219
220int TOISegmented::BufferSegment::getRefCount() {
221 pthread_mutex_lock(&refcount_mutex);
222 int rc = refcount;
223 pthread_mutex_unlock(&refcount_mutex);
224 return rc;
225}
226
[1686]227
228/*******************************/
229/********** BufferView *********/
230/*******************************/
231
232TOISegmented::BufferView::BufferView(MasterView* m) {
233 master = m;
234 sn0 = -1;
235 segmentSize = m->segmentSize;
[1689]236 firstNeeded = -1;
[1692]237 waiting = false;
[1686]238}
239
240TOISegmented::BufferView::~BufferView() {
241}
242
[1692]243double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
[1686]244 ensure(sn);
245 int seg = (sn-sn0)/segmentSize;
246 return segments[seg]->getData(sn);
247}
248
[1692]249uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
[1686]250 ensure(sn);
251 int seg = (sn-sn0)/segmentSize;
252 return segments[seg]->getFlag(sn);
253}
254
[1743]255void TOISegmented::BufferView::getData(int sn, int n, double* dat, uint_8* flg) { /* Single-thread, reader thread */
256 ensure(sn);
257 ensure(sn+n-1);
258
259 int sn1 = sn;
260 int nsam = n;
261 double* pdat = dat;
262 uint_8* pflg = flg;
263
264 while (true) {
265 int seg = (sn1-sn0)/segmentSize;
266 BufferSegment* s = segments[seg];
267 int snmax = s->sn0 + s->bufferSize - 1;
268 int sn2 = snmax > (sn1+nsam-1) ? (sn1+nsam-1) : snmax;
269 int nget = sn2-sn1+1;
270 s->getData(sn1, nget, pdat, pflg);
271 pdat += nget;
272 if (pflg != NULL) pflg += nget;
273 nsam -= nget;
274 sn1 += nget;
275 if (nsam <= 0) break;
276 }
277}
278
[1692]279void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
[1686]280 if (sn < sn0) {
[1750]281 cout << "TOISegmented::BufferView::ensure requested sample before first" << endl;
282 cout << "sn " << sn << " sn0 " << sn0 << endl;
[1692]283 abort();
[1686]284 }
285
[1692]286 if (sn0 < 0 ||
287 sn >= sn0 + segmentSize*segments.size()) {
[1710]288 LOG(cout << master->name << " BufferView "
289 << hex << this << dec << ": read fault for " << sn << endl)
[1686]290 sync();
[1709]291 pthread_mutex_lock(&(master->read_wait_mutex));
[1692]292 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
[1711]293 wait(); // must be atomic with loop test // $CHECK$ est-ce vrai ?
[1709]294 pthread_mutex_unlock(&(master->read_wait_mutex));
[1710]295 LOG(cout << master->name << " BufferView " << hex << this << dec << ": waiting for " << sn << endl)
[1686]296 sync();
[1709]297 pthread_mutex_lock(&(master->read_wait_mutex));
[1686]298 }
[1709]299 pthread_mutex_unlock(&(master->read_wait_mutex));
300
[1710]301 LOG(cout << master->name << " BufferView " << hex << this << dec << ": resuming for " << sn
[1692]302 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
303 << " in " << segments.size() << " segments " << endl)
[1686]304 }
305}
306
[1692]307void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
[1686]308 master->updateView(this); // update me !
309}
310
[1709]311void TOISegmented::BufferView::wait() { /* reader thread, master read wait lock taken */
312 //pthread_mutex_lock(&(master->read_wait_mutex));
[1692]313 waiting = true;
[1711]314 master->waitingViews++;
[1692]315 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
316 waiting = false;
[1711]317 master->waitingViews--;
[1709]318 //pthread_mutex_unlock(&(master->read_wait_mutex));
[1686]319}
320
321
[1689]322void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
323 if (sn > firstNeeded) {
324 firstNeeded = sn;
[1711]325 // C'est peut-etre le moment de faire unl sync, si on est coince par ailleurs...
326 //pthread_mutex_lock(&(master->read_wait_mutex));
327 if (sn >= sn0 + segmentSize){ // && master->waitingViews>0) {
328// LOG(cout<<master->name<< " sync on wontneed, waitingViews=" << master->waitingViews << endl);
329 LOG(cout<<master->name<< " sync on wontneed, sn = " << sn << " sn0 = " << sn0 << endl);
330 sync();
331 }
332 //pthread_mutex_unlock(&(master->read_wait_mutex));
[1689]333 }
334}
[1686]335
[1689]336
[1686]337/*******************************/
338/********** MasterView *********/
339/*******************************/
340
[1710]341TOISegmented::MasterView::MasterView(int bufsz, int maxseg, string nm) {
[1686]342 currentSegment = NULL;
343 maxSegments = maxseg;
344 segmentSize = bufsz;
345 sn0 = -1;
[1692]346 nConsumers = 0;
[1710]347 name = nm;
[1686]348
349 pthread_mutex_init(&views_mutex, NULL);
[1692]350 pthread_mutex_init(&read_wait_mutex, NULL);
351 pthread_cond_init(&write_wait_condv, NULL);
352 pthread_cond_init(&read_wait_condv, NULL);
[1686]353 pthread_key_create(&buffer_key, BufferDestroy);
354
[1689]355 waitingOnWrite = false;
[1711]356 waitingViews = 0;
[1686]357}
358
359TOISegmented::MasterView::~MasterView() {
360 pthread_mutex_destroy(&views_mutex);
[1692]361 pthread_mutex_destroy(&read_wait_mutex);
362 pthread_cond_destroy(&write_wait_condv);
363 pthread_cond_destroy(&read_wait_condv);
[1686]364 pthread_key_delete(buffer_key);
365
366 // There should not be any BufferView left... Check ?
367
368 // decrement count for segments ?
369}
370
[1692]371void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
372 if (sn0<0) {
[1750]373 LOG(cout << "***MasterView::putData sn0<0 -> " << sn << endl);
[1692]374 sn0=sn;
375 }
[1686]376 // can fit in current segment ?
377 if (!(currentSegment != NULL &&
378 sn >= currentSegment->sn0 &&
379 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
[1710]380 LOG(cout << name << " MasterView::putData, need extend for " << sn << endl)
[1686]381 nextSegment();
382 }
383 currentSegment->putData(sn, data, flags);
384}
385
[1743]386void TOISegmented::MasterView::putData(int sn, int n, double const* data, uint_8 const* flags) { /* writer thread */
387 if (sn0<0) {
[1750]388 LOG(cout << "***MasterView::putData sn0<0 -> " << sn << endl);
[1743]389 sn0=sn;
390 }
391 double const* pdat = data;
392 uint_8 const* pflg = flags;
393 int nsam = n;
394 int sn1 = sn;
395 while (true) {
396 // maximum that current segment can take
397 int snmax = -1;
398 if (currentSegment != NULL) {
399 snmax = currentSegment->sn0 + currentSegment->bufferSize-1;
400 }
401 int sn2 = snmax > (sn1+nsam-1) ? (sn1+nsam-1) : snmax;
402 if (snmax>0) {
403 int nput = sn2-sn1+1;
404 currentSegment->putData(sn1, nput, pdat, pflg);
405 pdat += nput;
406 if (pflg != NULL) pflg += nput;
407 nsam -= nput;
408 sn1 += nput;
409 }
410 if (nsam <= 0) break;
411 nextSegment();
412 currentSegment->putData(sn1, 0, 0); // dummy, to initialize sn0 in segment : add method ?
413 }
414}
415
[1692]416double TOISegmented::MasterView::getData(int sn) { /* reader thread */
417 return getView()->getData(sn); /* thread-specific */
[1686]418}
419
[1692]420uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
[1686]421 return getView()->getFlag(sn);
422}
423
[1743]424void TOISegmented::MasterView::getData(int sn, int n, double* dat, uint_8* flg) { /* reader thread */
425 getView()->getData(sn, n, dat, flg);
426}
427
[1686]428TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
429 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
430 if (bv == NULL) {
431 bv = createView();
[1692]432 LOG(cout << "creating new view " << hex << bv << dec << endl)
[1686]433 pthread_setspecific(buffer_key, bv);
434 }
435 return bv;
436}
[1689]437
[1692]438void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
439 pthread_mutex_lock(&read_wait_mutex);
440 pthread_cond_broadcast(&read_wait_condv);
441 pthread_mutex_unlock(&read_wait_mutex);
442}
[1689]443
[1692]444void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
445 if (waitingOnWrite) {
[1710]446 LOG(cout << name << " MasterView : signal for wait on write" << endl)
[1692]447 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
[1689]448 }
449}
450
451void TOISegmented::MasterView::putDone() {
452 nextSegment(); // cree un segment inutile, a nettoyer
453}
454
455void TOISegmented::MasterView::nextSegment() { /* writer thread */
456 // The current segment, if any, is now committed. A new
457 // blank buffer is allocated, if any.
[1692]458 pthread_mutex_lock(&views_mutex);
[1689]459
[1692]460 LOG(cout << "MasterView::nextSegment "
461 << segments.size()+1 << "/" << maxSegments << endl)
[1689]462
463 if (currentSegment != NULL) {
464 currentSegment->status = BufferSegment::COMMITTED;
465 segments.push_back(currentSegment);
466 }
467
468 currentSegment = NULL;
469 while (segments.size() >= maxSegments) {
470 waitForCleaning();
471 }
472
473 currentSegment = new BufferSegment(segmentSize);
[1700]474 currentSegment->incRefCount();
[1689]475 signalWaitingViews(); // they can ask to be updated !!
[1692]476 pthread_mutex_unlock(&views_mutex);
[1689]477}
478
[1692]479void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
[1710]480 LOG(cout << name << " MasterView : write wait for clean for " << sn0 << endl)
[1689]481 waitingOnWrite = true;
482 checkDeadLock();
[1692]483 pthread_cond_wait(&write_wait_condv, &views_mutex);
[1710]484 LOG(cout << name << " MasterView : wait done" << endl)
[1689]485}
486
487TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
488 BufferView* bv = new BufferView(this);
[1692]489 pthread_mutex_lock(&views_mutex);
[1690]490 allViews.insert(bv);
[1692]491 pthread_mutex_unlock(&views_mutex);
[1689]492 updateView(bv);
493 return bv;
494}
495
496void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
497 pthread_mutex_lock(&views_mutex);
498
[1711]499// int oldBegin = bv->sn0;
500// int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
501 int oldBegin = sn0;
502 int oldEnd = sn0 + bv->segmentSize * segments.size();
[1689]503
504 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
505 i != bv->segments.end(); i++) {
506 (*i)->decRefCount();
507 }
508
509 bv->segments.clear();
510
[1690]511 // utiliser firstNeeded de toutes les vues pour faire le menage chez
[1689]512 // nous.
[1690]513
[1692]514 // A condition que tous les consumers se soient fait connaitre...
[1689]515
[1692]516 if (nConsumers == allViews.size()) {
517 int firstNeeded = MAXINT;
518 for (set<BufferView*>::iterator i = allViews.begin();
519 i != allViews.end(); i++) {
[1711]520 LOG(cout << name << " View firstneeded " << (*i)->firstNeeded << endl);
[1692]521 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
522 }
523
[1710]524 LOG(cout << name << " MasterView : firstNeeded = " << firstNeeded << endl);
[1692]525
526 vector<BufferSegment*>::iterator j = segments.begin();
527 bool clean = false;
528 for (vector<BufferSegment*>::iterator i = segments.begin();
529 i != segments.end(); i++) {
[1700]530 //LOG(cout << "Updating : rc = " << (*i)->getRefCount() << " sn0 = " << (*i)->sn0 << endl;);
531 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 1)) {
[1692]532 clean = true;
[1700]533 (*i)->decRefCount();
534 delete (*i);
[1692]535 j = i;
536 }
537 }
[1700]538 j++;
[1690]539 if (clean) {
540 segments.erase(segments.begin(),j);
541 sn0 = (*segments.begin())->sn0;
[1692]542 LOG(cout << "MasterView : purged until " << sn0 << endl);
[1690]543 }
[1692]544 } else {
545 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
546 << "/" << nConsumers << endl);
547 }
[1690]548
549 for (vector<BufferSegment*>::iterator i = segments.begin();
[1689]550 i != segments.end(); i++) {
551 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
552 (*i)->incRefCount();
553 bv->segments.push_back(*i);
554 }
555 }
556
557 bv->sn0 = -1;
558 int newEnd = -1;
559 if (segments.size() > 0) {
560 bv->sn0 = bv->segments[0]->sn0;
561 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
562 }
563
[1711]564 if (sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
[1692]565 signalWrite();
[1689]566 }
567
[1710]568 LOG(cout << name << " sync for " << hex << bv << dec << " : "
[1692]569 << oldBegin << " - " << oldEnd << " --> "
[1711]570 << sn0 << " - " << newEnd << endl);
[1692]571
572 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
[1689]573 signalWaitingViews();
574 }
[1692]575 pthread_mutex_unlock(&views_mutex);
[1689]576}
577
[1692]578void TOISegmented::MasterView::checkDeadLock() { /* views locked */
[1689]579 // There is a possible deadlock if no view can free old segments
580 // and we are waiting for write.
581
582 // we need to record "wont need before" for each view, and
583 // signal deadlock if any view that needs first segment data is sleeping
584 // while we are asleep
585
[1692]586 pthread_mutex_lock(&read_wait_mutex);
587 if (!waitingOnWrite) {
588 pthread_mutex_unlock(&read_wait_mutex);
589 return; // no problem, there is an active writer
590 }
[1689]591
592 // Is any sleeping view needing our first segment ?
593
[1692]594 for (set<BufferView*>::iterator i=allViews.begin();
595 i != allViews.end(); i++) {
596 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
[1689]597 cout << "**** DEADLOCK detected ****" << endl;
598 cout << "We are waiting on write (buffer is full)"<< endl;
599 cout << "but a waiting reader still needs our first segment" << endl;
600 cout << "restart with bigger buffers" << endl;
601 abort();
602 }
603 }
[1692]604 pthread_mutex_unlock(&read_wait_mutex);
[1689]605}
606
607
608
609void TOISegmented::MasterView::BufferDestroy(void* p) {
610 BufferView* bv = (BufferView*) p;
611 delete bv;
612}
Note: See TracBrowser for help on using the repository browser.