source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1692

Last change on this file since 1692 was 1692, checked in by aubourg, 24 years ago

thread debugging

File size: 13.0 KB
RevLine 
[1670]1#include "toisegment.h"
[1671]2
[1690]3#ifndef MAXINT
4#define MAXINT 2147483647
5#endif
6
[1692]7static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
8static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
9static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
10#define LOG(_xxx_) \
11cout_lock(); \
12_xxx_; \
13cout_unlock();
14
15
[1689]16/******************************/
17/******* TOISegmented *********/
18/******************************/
19
20TOISegmented::TOISegmented(int bufsz, int maxseg) {
21 master = new MasterView(bufsz, maxseg);
22 setName("TOISegmented");
23}
24
25TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
26 master = new MasterView(bufsz, maxseg);
27 setName(nm);
28}
29
30TOISegmented::~TOISegmented() {
31 delete master;
32}
33
34
[1692]35void TOISegmented::addConsumer(TOIProcessor* p) {
36 TOI::addConsumer(p);
37 master->nConsumers = consumers.size();
38}
39
40
[1689]41double TOISegmented::getData(int i) { /* reader thread */
42 return master->getData(i);
43}
44
45void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
46 data = master->getData(i);
47 flag = master->getFlag(i);
48}
49
50void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
51 master->putData(i, value, flag);
52}
53
54void TOISegmented::putDone() {
55 master->putDone();
56}
57
58void TOISegmented::wontNeedBefore(int i) { /* reader thread */
59 master->getView()->wontNeedBefore(i);
60}
61
62TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
63 // return master->getView()->isDataAvail(i, j);
64 cout << "TOISegmented::isDataAvail unimplemented" << endl;
65 throw PError("TOISegmented::isDataAvail unimplemented");
66}
67
68TOI::DataStatus TOISegmented::isDataAvail(int i) {
69 return isDataAvail(i,i);
70}
71
72TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
73 return isDataAvail(i,j);
74}
75
76void TOISegmented::waitForData(int iStart, int iEnd) {
77 // get will wait...
78}
79
80void TOISegmented::waitForData(int i) {
81 // get will wait...
82}
83
84void TOISegmented::waitForAnyData() {
85 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
86 throw PError("TOISegmented::waitForAnyData unimplemented");
87}
88
89int TOISegmented::nextDataAvail(int iAfter) {
90 cout << "TOISegmented::nextDataAvail" << endl;
91 return iAfter+1;
92}
93
94bool TOISegmented::hasSomeData() {
95 cout << "TOISegmented::hasSomeData" << endl;
96 return true;
97}
98
[1692]99void TOISegmented::doPutData(int i, double value, uint_8 flag) {
[1689]100 cout << "TOISegmented::doPutData unimplemented" << endl;
101 throw PError("TOISegmented::doPutData unimplemented");
102}
103
104void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
105 cout << "TOISegmented::doGetData unimplemented" << endl;
106 throw PError("TOISegmented::doGetData unimplemented");
107}
108
109
[1686]110/*******************************/
111/******* BufferSegment *********/
112/*******************************/
[1671]113
114TOISegmented::BufferSegment::BufferSegment(int sz) {
115 status = NEW;
116 bufferSize = sz;
117 sn0 = -1;
118
119 refcount = 0;
120
121 data = new double[sz];
122 flags = new uint_8[sz];
123
124 pthread_mutex_init(&refcount_mutex, NULL);
125}
126
127TOISegmented::BufferSegment::~BufferSegment() {
128 if (refcount > 0) {
129 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
130 }
131 delete[] data;
132 delete[] flags;
133 pthread_mutex_destroy(&refcount_mutex);
134}
135
[1689]136void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
137 /* writer thread*/
[1671]138 if (status == NEW) {
139 status = WRITE;
140 sn0 = sn;
141 }
142 if (status == COMMITTED) {
143 throw(ForbiddenError("TOISegment : putData in committed buffer"));
144 }
145 checkInRange(sn);
146 data[sn-sn0] = d;
147 flags[sn-sn0] = f;
148}
149
150void TOISegmented::BufferSegment::incRefCount() {
151 pthread_mutex_lock(&refcount_mutex);
152 refcount++;
153 pthread_mutex_unlock(&refcount_mutex);
154}
155
156void TOISegmented::BufferSegment::decRefCount() {
157 pthread_mutex_lock(&refcount_mutex);
158 int nrc = --refcount;
159 pthread_mutex_unlock(&refcount_mutex);
160 if (nrc<0)
161 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
162}
163
164int TOISegmented::BufferSegment::getRefCount() {
165 pthread_mutex_lock(&refcount_mutex);
166 int rc = refcount;
167 pthread_mutex_unlock(&refcount_mutex);
168 return rc;
169}
170
[1686]171
172/*******************************/
173/********** BufferView *********/
174/*******************************/
175
176TOISegmented::BufferView::BufferView(MasterView* m) {
177 master = m;
178 sn0 = -1;
179 segmentSize = m->segmentSize;
[1689]180 firstNeeded = -1;
[1692]181 waiting = false;
[1686]182}
183
184TOISegmented::BufferView::~BufferView() {
185}
186
[1692]187double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
[1686]188 ensure(sn);
189 int seg = (sn-sn0)/segmentSize;
190 return segments[seg]->getData(sn);
191}
192
[1692]193uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
[1686]194 ensure(sn);
195 int seg = (sn-sn0)/segmentSize;
196 return segments[seg]->getFlag(sn);
197}
198
[1692]199void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
[1686]200 if (sn < sn0) {
[1692]201 LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
202 LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
203 abort();
[1686]204 }
205
[1692]206 if (sn0 < 0 ||
207 sn >= sn0 + segmentSize*segments.size()) {
208 LOG(cout << "BufferView " << hex << this << dec << ": read fault for " << sn << endl)
[1686]209 sync();
[1692]210 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
[1686]211 wait();
[1692]212 LOG(cout << "BufferView " << hex << this << dec << ": waiting for " << sn << endl)
[1686]213 sync();
214 }
[1692]215 LOG(cout << "BufferView " << hex << this << dec << ": resuming for " << sn
216 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
217 << " in " << segments.size() << " segments " << endl)
[1686]218 }
219}
220
[1692]221void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
[1686]222 master->updateView(this); // update me !
223}
224
[1692]225void TOISegmented::BufferView::wait() { /* reader thread */
226 pthread_mutex_lock(&(master->read_wait_mutex));
227 waiting = true;
228 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
229 waiting = false;
230 pthread_mutex_unlock(&(master->read_wait_mutex));
[1686]231}
232
233
[1689]234void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
235 if (sn > firstNeeded) {
236 firstNeeded = sn;
237 }
238}
[1686]239
[1689]240
[1686]241/*******************************/
242/********** MasterView *********/
243/*******************************/
244
245TOISegmented::MasterView::MasterView(int bufsz, int maxseg) {
246 currentSegment = NULL;
247 maxSegments = maxseg;
248 segmentSize = bufsz;
249 sn0 = -1;
[1692]250 nConsumers = 0;
[1686]251
252 pthread_mutex_init(&views_mutex, NULL);
[1692]253 pthread_mutex_init(&read_wait_mutex, NULL);
254 pthread_cond_init(&write_wait_condv, NULL);
255 pthread_cond_init(&read_wait_condv, NULL);
[1686]256 pthread_key_create(&buffer_key, BufferDestroy);
257
[1689]258 waitingOnWrite = false;
[1686]259}
260
261TOISegmented::MasterView::~MasterView() {
262 pthread_mutex_destroy(&views_mutex);
[1692]263 pthread_mutex_destroy(&read_wait_mutex);
264 pthread_cond_destroy(&write_wait_condv);
265 pthread_cond_destroy(&read_wait_condv);
[1686]266 pthread_key_delete(buffer_key);
267
268 // There should not be any BufferView left... Check ?
269
270 // decrement count for segments ?
271}
272
[1692]273void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
274 if (sn0<0) {
275 LOG(cout << "***MasterView::putData sn0<0" << endl)
276 sn0=sn;
277 }
[1686]278 // can fit in current segment ?
279 if (!(currentSegment != NULL &&
280 sn >= currentSegment->sn0 &&
281 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
[1692]282 LOG(cout << "MasterView::putData, need extend for " << sn << endl)
[1686]283 nextSegment();
284 }
285 currentSegment->putData(sn, data, flags);
286}
287
[1692]288double TOISegmented::MasterView::getData(int sn) { /* reader thread */
289 return getView()->getData(sn); /* thread-specific */
[1686]290}
291
[1692]292uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
[1686]293 return getView()->getFlag(sn);
294}
295
296TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
297 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
298 if (bv == NULL) {
299 bv = createView();
[1692]300 LOG(cout << "creating new view " << hex << bv << dec << endl)
[1686]301 pthread_setspecific(buffer_key, bv);
302 }
303 return bv;
304}
[1689]305
[1692]306void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
307 pthread_mutex_lock(&read_wait_mutex);
308 pthread_cond_broadcast(&read_wait_condv);
309 pthread_mutex_unlock(&read_wait_mutex);
310}
[1689]311
[1692]312void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
313 if (waitingOnWrite) {
314 LOG(cout << "MasterView : signal for wait on write" << endl)
315 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
[1689]316 }
317}
318
319void TOISegmented::MasterView::putDone() {
320 nextSegment(); // cree un segment inutile, a nettoyer
321}
322
323void TOISegmented::MasterView::nextSegment() { /* writer thread */
324 // The current segment, if any, is now committed. A new
325 // blank buffer is allocated, if any.
[1692]326 pthread_mutex_lock(&views_mutex);
[1689]327
[1692]328 LOG(cout << "MasterView::nextSegment "
329 << segments.size()+1 << "/" << maxSegments << endl)
[1689]330
331 if (currentSegment != NULL) {
332 currentSegment->status = BufferSegment::COMMITTED;
333 segments.push_back(currentSegment);
334 }
335
336 currentSegment = NULL;
337 while (segments.size() >= maxSegments) {
338 waitForCleaning();
339 }
340
341 currentSegment = new BufferSegment(segmentSize);
342 signalWaitingViews(); // they can ask to be updated !!
[1692]343 pthread_mutex_unlock(&views_mutex);
[1689]344}
345
[1692]346void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
347 LOG(cout << "MasterView : write wait for clean for " << sn0 << endl)
[1689]348 waitingOnWrite = true;
349 checkDeadLock();
[1692]350 pthread_cond_wait(&write_wait_condv, &views_mutex);
351 LOG(cout << "MasterView : wait done" << endl)
[1689]352}
353
354TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
355 BufferView* bv = new BufferView(this);
[1692]356 pthread_mutex_lock(&views_mutex);
[1690]357 allViews.insert(bv);
[1692]358 pthread_mutex_unlock(&views_mutex);
[1689]359 updateView(bv);
360 return bv;
361}
362
363void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
364 pthread_mutex_lock(&views_mutex);
365
366 int oldBegin = bv->sn0;
367 int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
368
369 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
370 i != bv->segments.end(); i++) {
371 (*i)->decRefCount();
372 }
373
374 bv->segments.clear();
375
[1690]376 // utiliser firstNeeded de toutes les vues pour faire le menage chez
[1689]377 // nous.
[1690]378
[1692]379 // A condition que tous les consumers se soient fait connaitre...
[1689]380
[1692]381 if (nConsumers == allViews.size()) {
382 int firstNeeded = MAXINT;
383 for (set<BufferView*>::iterator i = allViews.begin();
384 i != allViews.end(); i++) {
385 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
386 }
387
388 LOG(cout << "MasterView : firstNeeded = " << firstNeeded << endl);
389
390 vector<BufferSegment*>::iterator j = segments.begin();
391 bool clean = false;
392 for (vector<BufferSegment*>::iterator i = segments.begin();
393 i != segments.end(); i++) {
394 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) {
395 clean = true;
396 j = i;
397 }
398 }
[1690]399 if (clean) {
400 segments.erase(segments.begin(),j);
401 sn0 = (*segments.begin())->sn0;
[1692]402 LOG(cout << "MasterView : purged until " << sn0 << endl);
[1690]403 }
[1692]404 } else {
405 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
406 << "/" << nConsumers << endl);
407 }
[1690]408
409 for (vector<BufferSegment*>::iterator i = segments.begin();
[1689]410 i != segments.end(); i++) {
411 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
412 (*i)->incRefCount();
413 bv->segments.push_back(*i);
414 }
415 }
416
417 bv->sn0 = -1;
418 int newEnd = -1;
419 if (segments.size() > 0) {
420 bv->sn0 = bv->segments[0]->sn0;
421 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
422 }
423
[1692]424 if (bv->sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
425 signalWrite();
[1689]426 }
427
[1692]428 LOG(cout << "sync for " << hex << bv << dec << " : "
429 << oldBegin << " - " << oldEnd << " --> "
430 << bv->sn0 << " - " << newEnd << endl);
431
432 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
[1689]433 signalWaitingViews();
434 }
[1692]435 pthread_mutex_unlock(&views_mutex);
[1689]436}
437
[1692]438void TOISegmented::MasterView::checkDeadLock() { /* views locked */
[1689]439 // There is a possible deadlock if no view can free old segments
440 // and we are waiting for write.
441
442 // we need to record "wont need before" for each view, and
443 // signal deadlock if any view that needs first segment data is sleeping
444 // while we are asleep
445
[1692]446 pthread_mutex_lock(&read_wait_mutex);
447 if (!waitingOnWrite) {
448 pthread_mutex_unlock(&read_wait_mutex);
449 return; // no problem, there is an active writer
450 }
[1689]451
452 // Is any sleeping view needing our first segment ?
453
[1692]454 for (set<BufferView*>::iterator i=allViews.begin();
455 i != allViews.end(); i++) {
456 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
[1689]457 cout << "**** DEADLOCK detected ****" << endl;
458 cout << "We are waiting on write (buffer is full)"<< endl;
459 cout << "but a waiting reader still needs our first segment" << endl;
460 cout << "restart with bigger buffers" << endl;
461 abort();
462 }
463 }
[1692]464 pthread_mutex_unlock(&read_wait_mutex);
[1689]465}
466
467
468
469void TOISegmented::MasterView::BufferDestroy(void* p) {
470 BufferView* bv = (BufferView*) p;
471 delete bv;
472}
Note: See TracBrowser for help on using the repository browser.