source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1740

Last change on this file since 1740 was 1740, checked in by aubourg, 24 years ago

une optim pour quand on n'a que des segmented

File size: 14.8 KB
RevLine 
[1738]1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
[1740]5// $Id: toisegment.cc,v 1.15 2001-11-08 23:23:58 aubourg Exp $
[1738]6
[1670]7#include "toisegment.h"
[1671]8
[1697]9#include <iostream.h>
10
[1690]11#ifndef MAXINT
12#define MAXINT 2147483647
13#endif
14
// Mutex used to keep log lines written to cout by different threads from
// interleaving. Only the verbose LOG variant (commented out below) uses it.
static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;

// Acquires the console mutex.
static void cout_lock()
{
  pthread_mutex_lock(&cout_mutex);
}

// Releases the console mutex.
static void cout_unlock()
{
  pthread_mutex_unlock(&cout_mutex);
}

// Logging is compiled out: LOG(x) expands to nothing, so x is never evaluated.
// The verbose variant is kept in the comment below for debugging sessions.
#define LOG(_xxx_)
/*
#define LOG(_xxx_) \
cout_lock(); \
_xxx_; \
cout_unlock();
*/
[1692]25
[1689]26/******************************/
27/******* TOISegmented *********/
28/******************************/
29
30TOISegmented::TOISegmented(int bufsz, int maxseg) {
[1710]31 master = new MasterView(bufsz, maxseg, "");
[1689]32 setName("TOISegmented");
[1740]33 syncOldWay = false;
[1689]34}
35
36TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
[1710]37 master = new MasterView(bufsz, maxseg, nm);
[1689]38 setName(nm);
[1740]39 syncOldWay = false;
[1689]40}
41
[1699]42TOISegmented::TOISegmented(char* cnm, int bufsz, int maxseg) {
43 string nm = cnm;
[1710]44 master = new MasterView(bufsz, maxseg, nm);
[1699]45 setName(nm);
[1740]46 syncOldWay = false;
[1699]47}
48
[1689]49TOISegmented::~TOISegmented() {
50 delete master;
51}
52
53
[1692]54void TOISegmented::addConsumer(TOIProcessor* p) {
55 TOI::addConsumer(p);
56 master->nConsumers = consumers.size();
57}
58
59
[1689]60double TOISegmented::getData(int i) { /* reader thread */
61 return master->getData(i);
62}
63
64void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
65 data = master->getData(i);
66 flag = master->getFlag(i);
67}
68
69void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
70 master->putData(i, value, flag);
71}
72
73void TOISegmented::putDone() {
74 master->putDone();
75}
76
77void TOISegmented::wontNeedBefore(int i) { /* reader thread */
78 master->getView()->wontNeedBefore(i);
79}
80
81TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
82 // return master->getView()->isDataAvail(i, j);
83 cout << "TOISegmented::isDataAvail unimplemented" << endl;
84 throw PError("TOISegmented::isDataAvail unimplemented");
85}
86
87TOI::DataStatus TOISegmented::isDataAvail(int i) {
88 return isDataAvail(i,i);
89}
90
91TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
92 return isDataAvail(i,j);
93}
94
95void TOISegmented::waitForData(int iStart, int iEnd) {
96 // get will wait...
97}
98
99void TOISegmented::waitForData(int i) {
100 // get will wait...
101}
102
103void TOISegmented::waitForAnyData() {
104 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
105 throw PError("TOISegmented::waitForAnyData unimplemented");
106}
107
108int TOISegmented::nextDataAvail(int iAfter) {
109 cout << "TOISegmented::nextDataAvail" << endl;
110 return iAfter+1;
111}
112
113bool TOISegmented::hasSomeData() {
114 cout << "TOISegmented::hasSomeData" << endl;
115 return true;
116}
117
[1692]118void TOISegmented::doPutData(int i, double value, uint_8 flag) {
[1689]119 cout << "TOISegmented::doPutData unimplemented" << endl;
120 throw PError("TOISegmented::doPutData unimplemented");
121}
122
123void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
124 cout << "TOISegmented::doGetData unimplemented" << endl;
125 throw PError("TOISegmented::doGetData unimplemented");
126}
127
128
[1686]129/*******************************/
130/******* BufferSegment *********/
131/*******************************/
[1671]132
133TOISegmented::BufferSegment::BufferSegment(int sz) {
134 status = NEW;
135 bufferSize = sz;
136 sn0 = -1;
137
138 refcount = 0;
139
140 data = new double[sz];
141 flags = new uint_8[sz];
142
143 pthread_mutex_init(&refcount_mutex, NULL);
144}
145
146TOISegmented::BufferSegment::~BufferSegment() {
147 if (refcount > 0) {
148 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
149 }
[1700]150 LOG(cout << "Destroying buffersegment sn0 "<< sn0 << endl);
[1671]151 delete[] data;
152 delete[] flags;
153 pthread_mutex_destroy(&refcount_mutex);
154}
155
[1689]156void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
157 /* writer thread*/
[1671]158 if (status == NEW) {
159 status = WRITE;
160 sn0 = sn;
161 }
162 if (status == COMMITTED) {
163 throw(ForbiddenError("TOISegment : putData in committed buffer"));
164 }
165 checkInRange(sn);
166 data[sn-sn0] = d;
167 flags[sn-sn0] = f;
168}
169
170void TOISegmented::BufferSegment::incRefCount() {
171 pthread_mutex_lock(&refcount_mutex);
172 refcount++;
173 pthread_mutex_unlock(&refcount_mutex);
174}
175
176void TOISegmented::BufferSegment::decRefCount() {
177 pthread_mutex_lock(&refcount_mutex);
178 int nrc = --refcount;
179 pthread_mutex_unlock(&refcount_mutex);
180 if (nrc<0)
181 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
182}
183
184int TOISegmented::BufferSegment::getRefCount() {
185 pthread_mutex_lock(&refcount_mutex);
186 int rc = refcount;
187 pthread_mutex_unlock(&refcount_mutex);
188 return rc;
189}
190
[1686]191
192/*******************************/
193/********** BufferView *********/
194/*******************************/
195
196TOISegmented::BufferView::BufferView(MasterView* m) {
197 master = m;
198 sn0 = -1;
199 segmentSize = m->segmentSize;
[1689]200 firstNeeded = -1;
[1692]201 waiting = false;
[1686]202}
203
204TOISegmented::BufferView::~BufferView() {
205}
206
[1692]207double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
[1686]208 ensure(sn);
209 int seg = (sn-sn0)/segmentSize;
210 return segments[seg]->getData(sn);
211}
212
[1692]213uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
[1686]214 ensure(sn);
215 int seg = (sn-sn0)/segmentSize;
216 return segments[seg]->getFlag(sn);
217}
218
[1692]219void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
[1686]220 if (sn < sn0) {
[1692]221 LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
222 LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
223 abort();
[1686]224 }
225
[1692]226 if (sn0 < 0 ||
227 sn >= sn0 + segmentSize*segments.size()) {
[1710]228 LOG(cout << master->name << " BufferView "
229 << hex << this << dec << ": read fault for " << sn << endl)
[1686]230 sync();
[1709]231 pthread_mutex_lock(&(master->read_wait_mutex));
[1692]232 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
[1711]233 wait(); // must be atomic with loop test // $CHECK$ est-ce vrai ?
[1709]234 pthread_mutex_unlock(&(master->read_wait_mutex));
[1710]235 LOG(cout << master->name << " BufferView " << hex << this << dec << ": waiting for " << sn << endl)
[1686]236 sync();
[1709]237 pthread_mutex_lock(&(master->read_wait_mutex));
[1686]238 }
[1709]239 pthread_mutex_unlock(&(master->read_wait_mutex));
240
[1710]241 LOG(cout << master->name << " BufferView " << hex << this << dec << ": resuming for " << sn
[1692]242 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
243 << " in " << segments.size() << " segments " << endl)
[1686]244 }
245}
246
[1692]247void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
[1686]248 master->updateView(this); // update me !
249}
250
[1709]251void TOISegmented::BufferView::wait() { /* reader thread, master read wait lock taken */
252 //pthread_mutex_lock(&(master->read_wait_mutex));
[1692]253 waiting = true;
[1711]254 master->waitingViews++;
[1692]255 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
256 waiting = false;
[1711]257 master->waitingViews--;
[1709]258 //pthread_mutex_unlock(&(master->read_wait_mutex));
[1686]259}
260
261
[1689]262void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
263 if (sn > firstNeeded) {
264 firstNeeded = sn;
[1711]265 // C'est peut-etre le moment de faire unl sync, si on est coince par ailleurs...
266 //pthread_mutex_lock(&(master->read_wait_mutex));
267 if (sn >= sn0 + segmentSize){ // && master->waitingViews>0) {
268// LOG(cout<<master->name<< " sync on wontneed, waitingViews=" << master->waitingViews << endl);
269 LOG(cout<<master->name<< " sync on wontneed, sn = " << sn << " sn0 = " << sn0 << endl);
270 sync();
271 }
272 //pthread_mutex_unlock(&(master->read_wait_mutex));
[1689]273 }
274}
[1686]275
[1689]276
[1686]277/*******************************/
278/********** MasterView *********/
279/*******************************/
280
[1710]281TOISegmented::MasterView::MasterView(int bufsz, int maxseg, string nm) {
[1686]282 currentSegment = NULL;
283 maxSegments = maxseg;
284 segmentSize = bufsz;
285 sn0 = -1;
[1692]286 nConsumers = 0;
[1710]287 name = nm;
[1686]288
289 pthread_mutex_init(&views_mutex, NULL);
[1692]290 pthread_mutex_init(&read_wait_mutex, NULL);
291 pthread_cond_init(&write_wait_condv, NULL);
292 pthread_cond_init(&read_wait_condv, NULL);
[1686]293 pthread_key_create(&buffer_key, BufferDestroy);
294
[1689]295 waitingOnWrite = false;
[1711]296 waitingViews = 0;
[1686]297}
298
299TOISegmented::MasterView::~MasterView() {
300 pthread_mutex_destroy(&views_mutex);
[1692]301 pthread_mutex_destroy(&read_wait_mutex);
302 pthread_cond_destroy(&write_wait_condv);
303 pthread_cond_destroy(&read_wait_condv);
[1686]304 pthread_key_delete(buffer_key);
305
306 // There should not be any BufferView left... Check ?
307
308 // decrement count for segments ?
309}
310
[1692]311void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
312 if (sn0<0) {
313 LOG(cout << "***MasterView::putData sn0<0" << endl)
314 sn0=sn;
315 }
[1686]316 // can fit in current segment ?
317 if (!(currentSegment != NULL &&
318 sn >= currentSegment->sn0 &&
319 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
[1710]320 LOG(cout << name << " MasterView::putData, need extend for " << sn << endl)
[1686]321 nextSegment();
322 }
323 currentSegment->putData(sn, data, flags);
324}
325
[1692]326double TOISegmented::MasterView::getData(int sn) { /* reader thread */
327 return getView()->getData(sn); /* thread-specific */
[1686]328}
329
[1692]330uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
[1686]331 return getView()->getFlag(sn);
332}
333
334TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
335 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
336 if (bv == NULL) {
337 bv = createView();
[1692]338 LOG(cout << "creating new view " << hex << bv << dec << endl)
[1686]339 pthread_setspecific(buffer_key, bv);
340 }
341 return bv;
342}
[1689]343
[1692]344void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
345 pthread_mutex_lock(&read_wait_mutex);
346 pthread_cond_broadcast(&read_wait_condv);
347 pthread_mutex_unlock(&read_wait_mutex);
348}
[1689]349
[1692]350void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
351 if (waitingOnWrite) {
[1710]352 LOG(cout << name << " MasterView : signal for wait on write" << endl)
[1692]353 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
[1689]354 }
355}
356
357void TOISegmented::MasterView::putDone() {
358 nextSegment(); // cree un segment inutile, a nettoyer
359}
360
361void TOISegmented::MasterView::nextSegment() { /* writer thread */
362 // The current segment, if any, is now committed. A new
363 // blank buffer is allocated, if any.
[1692]364 pthread_mutex_lock(&views_mutex);
[1689]365
[1692]366 LOG(cout << "MasterView::nextSegment "
367 << segments.size()+1 << "/" << maxSegments << endl)
[1689]368
369 if (currentSegment != NULL) {
370 currentSegment->status = BufferSegment::COMMITTED;
371 segments.push_back(currentSegment);
372 }
373
374 currentSegment = NULL;
375 while (segments.size() >= maxSegments) {
376 waitForCleaning();
377 }
378
379 currentSegment = new BufferSegment(segmentSize);
[1700]380 currentSegment->incRefCount();
[1689]381 signalWaitingViews(); // they can ask to be updated !!
[1692]382 pthread_mutex_unlock(&views_mutex);
[1689]383}
384
[1692]385void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
[1710]386 LOG(cout << name << " MasterView : write wait for clean for " << sn0 << endl)
[1689]387 waitingOnWrite = true;
388 checkDeadLock();
[1692]389 pthread_cond_wait(&write_wait_condv, &views_mutex);
[1710]390 LOG(cout << name << " MasterView : wait done" << endl)
[1689]391}
392
393TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
394 BufferView* bv = new BufferView(this);
[1692]395 pthread_mutex_lock(&views_mutex);
[1690]396 allViews.insert(bv);
[1692]397 pthread_mutex_unlock(&views_mutex);
[1689]398 updateView(bv);
399 return bv;
400}
401
402void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
403 pthread_mutex_lock(&views_mutex);
404
[1711]405// int oldBegin = bv->sn0;
406// int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
407 int oldBegin = sn0;
408 int oldEnd = sn0 + bv->segmentSize * segments.size();
[1689]409
410 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
411 i != bv->segments.end(); i++) {
412 (*i)->decRefCount();
413 }
414
415 bv->segments.clear();
416
[1690]417 // utiliser firstNeeded de toutes les vues pour faire le menage chez
[1689]418 // nous.
[1690]419
[1692]420 // A condition que tous les consumers se soient fait connaitre...
[1689]421
[1692]422 if (nConsumers == allViews.size()) {
423 int firstNeeded = MAXINT;
424 for (set<BufferView*>::iterator i = allViews.begin();
425 i != allViews.end(); i++) {
[1711]426 LOG(cout << name << " View firstneeded " << (*i)->firstNeeded << endl);
[1692]427 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
428 }
429
[1710]430 LOG(cout << name << " MasterView : firstNeeded = " << firstNeeded << endl);
[1692]431
432 vector<BufferSegment*>::iterator j = segments.begin();
433 bool clean = false;
434 for (vector<BufferSegment*>::iterator i = segments.begin();
435 i != segments.end(); i++) {
[1700]436 //LOG(cout << "Updating : rc = " << (*i)->getRefCount() << " sn0 = " << (*i)->sn0 << endl;);
437 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 1)) {
[1692]438 clean = true;
[1700]439 (*i)->decRefCount();
440 delete (*i);
[1692]441 j = i;
442 }
443 }
[1700]444 j++;
[1690]445 if (clean) {
446 segments.erase(segments.begin(),j);
447 sn0 = (*segments.begin())->sn0;
[1692]448 LOG(cout << "MasterView : purged until " << sn0 << endl);
[1690]449 }
[1692]450 } else {
451 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
452 << "/" << nConsumers << endl);
453 }
[1690]454
455 for (vector<BufferSegment*>::iterator i = segments.begin();
[1689]456 i != segments.end(); i++) {
457 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
458 (*i)->incRefCount();
459 bv->segments.push_back(*i);
460 }
461 }
462
463 bv->sn0 = -1;
464 int newEnd = -1;
465 if (segments.size() > 0) {
466 bv->sn0 = bv->segments[0]->sn0;
467 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
468 }
469
[1711]470 if (sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
[1692]471 signalWrite();
[1689]472 }
473
[1710]474 LOG(cout << name << " sync for " << hex << bv << dec << " : "
[1692]475 << oldBegin << " - " << oldEnd << " --> "
[1711]476 << sn0 << " - " << newEnd << endl);
[1692]477
478 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
[1689]479 signalWaitingViews();
480 }
[1692]481 pthread_mutex_unlock(&views_mutex);
[1689]482}
483
[1692]484void TOISegmented::MasterView::checkDeadLock() { /* views locked */
[1689]485 // There is a possible deadlock if no view can free old segments
486 // and we are waiting for write.
487
488 // we need to record "wont need before" for each view, and
489 // signal deadlock if any view that needs first segment data is sleeping
490 // while we are asleep
491
[1692]492 pthread_mutex_lock(&read_wait_mutex);
493 if (!waitingOnWrite) {
494 pthread_mutex_unlock(&read_wait_mutex);
495 return; // no problem, there is an active writer
496 }
[1689]497
498 // Is any sleeping view needing our first segment ?
499
[1692]500 for (set<BufferView*>::iterator i=allViews.begin();
501 i != allViews.end(); i++) {
502 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
[1689]503 cout << "**** DEADLOCK detected ****" << endl;
504 cout << "We are waiting on write (buffer is full)"<< endl;
505 cout << "but a waiting reader still needs our first segment" << endl;
506 cout << "restart with bigger buffers" << endl;
507 abort();
508 }
509 }
[1692]510 pthread_mutex_unlock(&read_wait_mutex);
[1689]511}
512
513
514
515void TOISegmented::MasterView::BufferDestroy(void* p) {
516 BufferView* bv = (BufferView*) p;
517 delete bv;
518}
Note: See TracBrowser for help on using the repository browser.