source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1738

Last change on this file since 1738 was 1738, checked in by aubourg, 24 years ago

copyright

File size: 14.8 KB
RevLine 
[1738]1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toisegment.cc,v 1.14 2001-11-08 15:47:46 aubourg Exp $
6
[1670]7#include "toisegment.h"
[1671]8
[1697]9#include <iostream.h>
10
[1690]11#ifndef MAXINT
12#define MAXINT 2147483647
13#endif
14
[1692]15static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
16static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
17static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
[1711]18#define LOG(_xxx_)
19/*
[1692]20#define LOG(_xxx_) \
21cout_lock(); \
22_xxx_; \
23cout_unlock();
[1711]24*/
[1692]25
[1689]26/******************************/
27/******* TOISegmented *********/
28/******************************/
29
30TOISegmented::TOISegmented(int bufsz, int maxseg) {
[1710]31 master = new MasterView(bufsz, maxseg, "");
[1689]32 setName("TOISegmented");
33}
34
35TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
[1710]36 master = new MasterView(bufsz, maxseg, nm);
[1689]37 setName(nm);
38}
39
[1699]40TOISegmented::TOISegmented(char* cnm, int bufsz, int maxseg) {
41 string nm = cnm;
[1710]42 master = new MasterView(bufsz, maxseg, nm);
[1699]43 setName(nm);
44}
45
[1689]46TOISegmented::~TOISegmented() {
47 delete master;
48}
49
50
[1692]51void TOISegmented::addConsumer(TOIProcessor* p) {
52 TOI::addConsumer(p);
53 master->nConsumers = consumers.size();
54}
55
56
[1689]57double TOISegmented::getData(int i) { /* reader thread */
58 return master->getData(i);
59}
60
61void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
62 data = master->getData(i);
63 flag = master->getFlag(i);
64}
65
66void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
67 master->putData(i, value, flag);
68}
69
70void TOISegmented::putDone() {
71 master->putDone();
72}
73
74void TOISegmented::wontNeedBefore(int i) { /* reader thread */
75 master->getView()->wontNeedBefore(i);
76}
77
78TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
79 // return master->getView()->isDataAvail(i, j);
80 cout << "TOISegmented::isDataAvail unimplemented" << endl;
81 throw PError("TOISegmented::isDataAvail unimplemented");
82}
83
84TOI::DataStatus TOISegmented::isDataAvail(int i) {
85 return isDataAvail(i,i);
86}
87
88TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
89 return isDataAvail(i,j);
90}
91
92void TOISegmented::waitForData(int iStart, int iEnd) {
93 // get will wait...
94}
95
96void TOISegmented::waitForData(int i) {
97 // get will wait...
98}
99
100void TOISegmented::waitForAnyData() {
101 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
102 throw PError("TOISegmented::waitForAnyData unimplemented");
103}
104
105int TOISegmented::nextDataAvail(int iAfter) {
106 cout << "TOISegmented::nextDataAvail" << endl;
107 return iAfter+1;
108}
109
110bool TOISegmented::hasSomeData() {
111 cout << "TOISegmented::hasSomeData" << endl;
112 return true;
113}
114
[1692]115void TOISegmented::doPutData(int i, double value, uint_8 flag) {
[1689]116 cout << "TOISegmented::doPutData unimplemented" << endl;
117 throw PError("TOISegmented::doPutData unimplemented");
118}
119
120void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
121 cout << "TOISegmented::doGetData unimplemented" << endl;
122 throw PError("TOISegmented::doGetData unimplemented");
123}
124
125
[1686]126/*******************************/
127/******* BufferSegment *********/
128/*******************************/
[1671]129
130TOISegmented::BufferSegment::BufferSegment(int sz) {
131 status = NEW;
132 bufferSize = sz;
133 sn0 = -1;
134
135 refcount = 0;
136
137 data = new double[sz];
138 flags = new uint_8[sz];
139
140 pthread_mutex_init(&refcount_mutex, NULL);
141}
142
143TOISegmented::BufferSegment::~BufferSegment() {
144 if (refcount > 0) {
145 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
146 }
[1700]147 LOG(cout << "Destroying buffersegment sn0 "<< sn0 << endl);
[1671]148 delete[] data;
149 delete[] flags;
150 pthread_mutex_destroy(&refcount_mutex);
151}
152
[1689]153void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
154 /* writer thread*/
[1671]155 if (status == NEW) {
156 status = WRITE;
157 sn0 = sn;
158 }
159 if (status == COMMITTED) {
160 throw(ForbiddenError("TOISegment : putData in committed buffer"));
161 }
162 checkInRange(sn);
163 data[sn-sn0] = d;
164 flags[sn-sn0] = f;
165}
166
167void TOISegmented::BufferSegment::incRefCount() {
168 pthread_mutex_lock(&refcount_mutex);
169 refcount++;
170 pthread_mutex_unlock(&refcount_mutex);
171}
172
173void TOISegmented::BufferSegment::decRefCount() {
174 pthread_mutex_lock(&refcount_mutex);
175 int nrc = --refcount;
176 pthread_mutex_unlock(&refcount_mutex);
177 if (nrc<0)
178 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
179}
180
181int TOISegmented::BufferSegment::getRefCount() {
182 pthread_mutex_lock(&refcount_mutex);
183 int rc = refcount;
184 pthread_mutex_unlock(&refcount_mutex);
185 return rc;
186}
187
[1686]188
189/*******************************/
190/********** BufferView *********/
191/*******************************/
192
193TOISegmented::BufferView::BufferView(MasterView* m) {
194 master = m;
195 sn0 = -1;
196 segmentSize = m->segmentSize;
[1689]197 firstNeeded = -1;
[1692]198 waiting = false;
[1686]199}
200
201TOISegmented::BufferView::~BufferView() {
202}
203
[1692]204double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
[1686]205 ensure(sn);
206 int seg = (sn-sn0)/segmentSize;
207 return segments[seg]->getData(sn);
208}
209
[1692]210uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
[1686]211 ensure(sn);
212 int seg = (sn-sn0)/segmentSize;
213 return segments[seg]->getFlag(sn);
214}
215
[1692]216void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
[1686]217 if (sn < sn0) {
[1692]218 LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
219 LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
220 abort();
[1686]221 }
222
[1692]223 if (sn0 < 0 ||
224 sn >= sn0 + segmentSize*segments.size()) {
[1710]225 LOG(cout << master->name << " BufferView "
226 << hex << this << dec << ": read fault for " << sn << endl)
[1686]227 sync();
[1709]228 pthread_mutex_lock(&(master->read_wait_mutex));
[1692]229 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
[1711]230 wait(); // must be atomic with loop test // $CHECK$ est-ce vrai ?
[1709]231 pthread_mutex_unlock(&(master->read_wait_mutex));
[1710]232 LOG(cout << master->name << " BufferView " << hex << this << dec << ": waiting for " << sn << endl)
[1686]233 sync();
[1709]234 pthread_mutex_lock(&(master->read_wait_mutex));
[1686]235 }
[1709]236 pthread_mutex_unlock(&(master->read_wait_mutex));
237
[1710]238 LOG(cout << master->name << " BufferView " << hex << this << dec << ": resuming for " << sn
[1692]239 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
240 << " in " << segments.size() << " segments " << endl)
[1686]241 }
242}
243
[1692]244void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
[1686]245 master->updateView(this); // update me !
246}
247
[1709]248void TOISegmented::BufferView::wait() { /* reader thread, master read wait lock taken */
249 //pthread_mutex_lock(&(master->read_wait_mutex));
[1692]250 waiting = true;
[1711]251 master->waitingViews++;
[1692]252 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
253 waiting = false;
[1711]254 master->waitingViews--;
[1709]255 //pthread_mutex_unlock(&(master->read_wait_mutex));
[1686]256}
257
258
[1689]259void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
260 if (sn > firstNeeded) {
261 firstNeeded = sn;
[1711]262 // C'est peut-etre le moment de faire unl sync, si on est coince par ailleurs...
263 //pthread_mutex_lock(&(master->read_wait_mutex));
264 if (sn >= sn0 + segmentSize){ // && master->waitingViews>0) {
265// LOG(cout<<master->name<< " sync on wontneed, waitingViews=" << master->waitingViews << endl);
266 LOG(cout<<master->name<< " sync on wontneed, sn = " << sn << " sn0 = " << sn0 << endl);
267 sync();
268 }
269 //pthread_mutex_unlock(&(master->read_wait_mutex));
[1689]270 }
271}
[1686]272
[1689]273
[1686]274/*******************************/
275/********** MasterView *********/
276/*******************************/
277
[1710]278TOISegmented::MasterView::MasterView(int bufsz, int maxseg, string nm) {
[1686]279 currentSegment = NULL;
280 maxSegments = maxseg;
281 segmentSize = bufsz;
282 sn0 = -1;
[1692]283 nConsumers = 0;
[1710]284 name = nm;
[1686]285
286 pthread_mutex_init(&views_mutex, NULL);
[1692]287 pthread_mutex_init(&read_wait_mutex, NULL);
288 pthread_cond_init(&write_wait_condv, NULL);
289 pthread_cond_init(&read_wait_condv, NULL);
[1686]290 pthread_key_create(&buffer_key, BufferDestroy);
291
[1689]292 waitingOnWrite = false;
[1711]293 waitingViews = 0;
[1686]294}
295
296TOISegmented::MasterView::~MasterView() {
297 pthread_mutex_destroy(&views_mutex);
[1692]298 pthread_mutex_destroy(&read_wait_mutex);
299 pthread_cond_destroy(&write_wait_condv);
300 pthread_cond_destroy(&read_wait_condv);
[1686]301 pthread_key_delete(buffer_key);
302
303 // There should not be any BufferView left... Check ?
304
305 // decrement count for segments ?
306}
307
[1692]308void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
309 if (sn0<0) {
310 LOG(cout << "***MasterView::putData sn0<0" << endl)
311 sn0=sn;
312 }
[1686]313 // can fit in current segment ?
314 if (!(currentSegment != NULL &&
315 sn >= currentSegment->sn0 &&
316 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
[1710]317 LOG(cout << name << " MasterView::putData, need extend for " << sn << endl)
[1686]318 nextSegment();
319 }
320 currentSegment->putData(sn, data, flags);
321}
322
[1692]323double TOISegmented::MasterView::getData(int sn) { /* reader thread */
324 return getView()->getData(sn); /* thread-specific */
[1686]325}
326
[1692]327uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
[1686]328 return getView()->getFlag(sn);
329}
330
331TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
332 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
333 if (bv == NULL) {
334 bv = createView();
[1692]335 LOG(cout << "creating new view " << hex << bv << dec << endl)
[1686]336 pthread_setspecific(buffer_key, bv);
337 }
338 return bv;
339}
[1689]340
[1692]341void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
342 pthread_mutex_lock(&read_wait_mutex);
343 pthread_cond_broadcast(&read_wait_condv);
344 pthread_mutex_unlock(&read_wait_mutex);
345}
[1689]346
[1692]347void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
348 if (waitingOnWrite) {
[1710]349 LOG(cout << name << " MasterView : signal for wait on write" << endl)
[1692]350 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
[1689]351 }
352}
353
354void TOISegmented::MasterView::putDone() {
355 nextSegment(); // cree un segment inutile, a nettoyer
356}
357
358void TOISegmented::MasterView::nextSegment() { /* writer thread */
359 // The current segment, if any, is now committed. A new
360 // blank buffer is allocated, if any.
[1692]361 pthread_mutex_lock(&views_mutex);
[1689]362
[1692]363 LOG(cout << "MasterView::nextSegment "
364 << segments.size()+1 << "/" << maxSegments << endl)
[1689]365
366 if (currentSegment != NULL) {
367 currentSegment->status = BufferSegment::COMMITTED;
368 segments.push_back(currentSegment);
369 }
370
371 currentSegment = NULL;
372 while (segments.size() >= maxSegments) {
373 waitForCleaning();
374 }
375
376 currentSegment = new BufferSegment(segmentSize);
[1700]377 currentSegment->incRefCount();
[1689]378 signalWaitingViews(); // they can ask to be updated !!
[1692]379 pthread_mutex_unlock(&views_mutex);
[1689]380}
381
[1692]382void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
[1710]383 LOG(cout << name << " MasterView : write wait for clean for " << sn0 << endl)
[1689]384 waitingOnWrite = true;
385 checkDeadLock();
[1692]386 pthread_cond_wait(&write_wait_condv, &views_mutex);
[1710]387 LOG(cout << name << " MasterView : wait done" << endl)
[1689]388}
389
390TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
391 BufferView* bv = new BufferView(this);
[1692]392 pthread_mutex_lock(&views_mutex);
[1690]393 allViews.insert(bv);
[1692]394 pthread_mutex_unlock(&views_mutex);
[1689]395 updateView(bv);
396 return bv;
397}
398
399void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
400 pthread_mutex_lock(&views_mutex);
401
[1711]402// int oldBegin = bv->sn0;
403// int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
404 int oldBegin = sn0;
405 int oldEnd = sn0 + bv->segmentSize * segments.size();
[1689]406
407 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
408 i != bv->segments.end(); i++) {
409 (*i)->decRefCount();
410 }
411
412 bv->segments.clear();
413
[1690]414 // utiliser firstNeeded de toutes les vues pour faire le menage chez
[1689]415 // nous.
[1690]416
[1692]417 // A condition que tous les consumers se soient fait connaitre...
[1689]418
[1692]419 if (nConsumers == allViews.size()) {
420 int firstNeeded = MAXINT;
421 for (set<BufferView*>::iterator i = allViews.begin();
422 i != allViews.end(); i++) {
[1711]423 LOG(cout << name << " View firstneeded " << (*i)->firstNeeded << endl);
[1692]424 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
425 }
426
[1710]427 LOG(cout << name << " MasterView : firstNeeded = " << firstNeeded << endl);
[1692]428
429 vector<BufferSegment*>::iterator j = segments.begin();
430 bool clean = false;
431 for (vector<BufferSegment*>::iterator i = segments.begin();
432 i != segments.end(); i++) {
[1700]433 //LOG(cout << "Updating : rc = " << (*i)->getRefCount() << " sn0 = " << (*i)->sn0 << endl;);
434 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 1)) {
[1692]435 clean = true;
[1700]436 (*i)->decRefCount();
437 delete (*i);
[1692]438 j = i;
439 }
440 }
[1700]441 j++;
[1690]442 if (clean) {
443 segments.erase(segments.begin(),j);
444 sn0 = (*segments.begin())->sn0;
[1692]445 LOG(cout << "MasterView : purged until " << sn0 << endl);
[1690]446 }
[1692]447 } else {
448 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
449 << "/" << nConsumers << endl);
450 }
[1690]451
452 for (vector<BufferSegment*>::iterator i = segments.begin();
[1689]453 i != segments.end(); i++) {
454 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
455 (*i)->incRefCount();
456 bv->segments.push_back(*i);
457 }
458 }
459
460 bv->sn0 = -1;
461 int newEnd = -1;
462 if (segments.size() > 0) {
463 bv->sn0 = bv->segments[0]->sn0;
464 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
465 }
466
[1711]467 if (sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
[1692]468 signalWrite();
[1689]469 }
470
[1710]471 LOG(cout << name << " sync for " << hex << bv << dec << " : "
[1692]472 << oldBegin << " - " << oldEnd << " --> "
[1711]473 << sn0 << " - " << newEnd << endl);
[1692]474
475 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
[1689]476 signalWaitingViews();
477 }
[1692]478 pthread_mutex_unlock(&views_mutex);
[1689]479}
480
[1692]481void TOISegmented::MasterView::checkDeadLock() { /* views locked */
[1689]482 // There is a possible deadlock if no view can free old segments
483 // and we are waiting for write.
484
485 // we need to record "wont need before" for each view, and
486 // signal deadlock if any view that needs first segment data is sleeping
487 // while we are asleep
488
[1692]489 pthread_mutex_lock(&read_wait_mutex);
490 if (!waitingOnWrite) {
491 pthread_mutex_unlock(&read_wait_mutex);
492 return; // no problem, there is an active writer
493 }
[1689]494
495 // Is any sleeping view needing our first segment ?
496
[1692]497 for (set<BufferView*>::iterator i=allViews.begin();
498 i != allViews.end(); i++) {
499 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
[1689]500 cout << "**** DEADLOCK detected ****" << endl;
501 cout << "We are waiting on write (buffer is full)"<< endl;
502 cout << "but a waiting reader still needs our first segment" << endl;
503 cout << "restart with bigger buffers" << endl;
504 abort();
505 }
506 }
[1692]507 pthread_mutex_unlock(&read_wait_mutex);
[1689]508}
509
510
511
512void TOISegmented::MasterView::BufferDestroy(void* p) {
513 BufferView* bv = (BufferView*) p;
514 delete bv;
515}
Note: See TracBrowser for help on using the repository browser.