source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1711

Last change on this file since 1711 was 1711, checked in by aubourg, 24 years ago

bugs de synchro corriges

File size: 14.5 KB
Line 
1#include "toisegment.h"
2
3#include <iostream.h>
4
5#ifndef MAXINT
6#define MAXINT 2147483647
7#endif
8
// Mutex used to serialize diagnostic output written to cout.
static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;

// Take the cout mutex.
static void cout_lock()
{
  pthread_mutex_lock(&cout_mutex);
}

// Give the cout mutex back.
static void cout_unlock()
{
  pthread_mutex_unlock(&cout_mutex);
}

// LOG(statement) expands to nothing by default, so tracing is compiled out.
// The alternative definition kept in the comment below runs the statement
// between cout_lock() and cout_unlock().
#define LOG(_xxx_)
/*
#define LOG(_xxx_) \
cout_lock(); \
_xxx_; \
cout_unlock();
*/
19
20/******************************/
21/******* TOISegmented *********/
22/******************************/
23
24TOISegmented::TOISegmented(int bufsz, int maxseg) {
25 master = new MasterView(bufsz, maxseg, "");
26 setName("TOISegmented");
27}
28
29TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
30 master = new MasterView(bufsz, maxseg, nm);
31 setName(nm);
32}
33
34TOISegmented::TOISegmented(char* cnm, int bufsz, int maxseg) {
35 string nm = cnm;
36 master = new MasterView(bufsz, maxseg, nm);
37 setName(nm);
38}
39
40TOISegmented::~TOISegmented() {
41 delete master;
42}
43
44
45void TOISegmented::addConsumer(TOIProcessor* p) {
46 TOI::addConsumer(p);
47 master->nConsumers = consumers.size();
48}
49
50
51double TOISegmented::getData(int i) { /* reader thread */
52 return master->getData(i);
53}
54
55void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
56 data = master->getData(i);
57 flag = master->getFlag(i);
58}
59
60void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
61 master->putData(i, value, flag);
62}
63
64void TOISegmented::putDone() {
65 master->putDone();
66}
67
68void TOISegmented::wontNeedBefore(int i) { /* reader thread */
69 master->getView()->wontNeedBefore(i);
70}
71
72TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
73 // return master->getView()->isDataAvail(i, j);
74 cout << "TOISegmented::isDataAvail unimplemented" << endl;
75 throw PError("TOISegmented::isDataAvail unimplemented");
76}
77
78TOI::DataStatus TOISegmented::isDataAvail(int i) {
79 return isDataAvail(i,i);
80}
81
82TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
83 return isDataAvail(i,j);
84}
85
86void TOISegmented::waitForData(int iStart, int iEnd) {
87 // get will wait...
88}
89
90void TOISegmented::waitForData(int i) {
91 // get will wait...
92}
93
94void TOISegmented::waitForAnyData() {
95 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
96 throw PError("TOISegmented::waitForAnyData unimplemented");
97}
98
99int TOISegmented::nextDataAvail(int iAfter) {
100 cout << "TOISegmented::nextDataAvail" << endl;
101 return iAfter+1;
102}
103
104bool TOISegmented::hasSomeData() {
105 cout << "TOISegmented::hasSomeData" << endl;
106 return true;
107}
108
109void TOISegmented::doPutData(int i, double value, uint_8 flag) {
110 cout << "TOISegmented::doPutData unimplemented" << endl;
111 throw PError("TOISegmented::doPutData unimplemented");
112}
113
114void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
115 cout << "TOISegmented::doGetData unimplemented" << endl;
116 throw PError("TOISegmented::doGetData unimplemented");
117}
118
119
120/*******************************/
121/******* BufferSegment *********/
122/*******************************/
123
124TOISegmented::BufferSegment::BufferSegment(int sz) {
125 status = NEW;
126 bufferSize = sz;
127 sn0 = -1;
128
129 refcount = 0;
130
131 data = new double[sz];
132 flags = new uint_8[sz];
133
134 pthread_mutex_init(&refcount_mutex, NULL);
135}
136
137TOISegmented::BufferSegment::~BufferSegment() {
138 if (refcount > 0) {
139 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
140 }
141 LOG(cout << "Destroying buffersegment sn0 "<< sn0 << endl);
142 delete[] data;
143 delete[] flags;
144 pthread_mutex_destroy(&refcount_mutex);
145}
146
147void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
148 /* writer thread*/
149 if (status == NEW) {
150 status = WRITE;
151 sn0 = sn;
152 }
153 if (status == COMMITTED) {
154 throw(ForbiddenError("TOISegment : putData in committed buffer"));
155 }
156 checkInRange(sn);
157 data[sn-sn0] = d;
158 flags[sn-sn0] = f;
159}
160
161void TOISegmented::BufferSegment::incRefCount() {
162 pthread_mutex_lock(&refcount_mutex);
163 refcount++;
164 pthread_mutex_unlock(&refcount_mutex);
165}
166
167void TOISegmented::BufferSegment::decRefCount() {
168 pthread_mutex_lock(&refcount_mutex);
169 int nrc = --refcount;
170 pthread_mutex_unlock(&refcount_mutex);
171 if (nrc<0)
172 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
173}
174
175int TOISegmented::BufferSegment::getRefCount() {
176 pthread_mutex_lock(&refcount_mutex);
177 int rc = refcount;
178 pthread_mutex_unlock(&refcount_mutex);
179 return rc;
180}
181
182
183/*******************************/
184/********** BufferView *********/
185/*******************************/
186
187TOISegmented::BufferView::BufferView(MasterView* m) {
188 master = m;
189 sn0 = -1;
190 segmentSize = m->segmentSize;
191 firstNeeded = -1;
192 waiting = false;
193}
194
195TOISegmented::BufferView::~BufferView() {
196}
197
198double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
199 ensure(sn);
200 int seg = (sn-sn0)/segmentSize;
201 return segments[seg]->getData(sn);
202}
203
204uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
205 ensure(sn);
206 int seg = (sn-sn0)/segmentSize;
207 return segments[seg]->getFlag(sn);
208}
209
210void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
211 if (sn < sn0) {
212 LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
213 LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
214 abort();
215 }
216
217 if (sn0 < 0 ||
218 sn >= sn0 + segmentSize*segments.size()) {
219 LOG(cout << master->name << " BufferView "
220 << hex << this << dec << ": read fault for " << sn << endl)
221 sync();
222 pthread_mutex_lock(&(master->read_wait_mutex));
223 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
224 wait(); // must be atomic with loop test // $CHECK$ est-ce vrai ?
225 pthread_mutex_unlock(&(master->read_wait_mutex));
226 LOG(cout << master->name << " BufferView " << hex << this << dec << ": waiting for " << sn << endl)
227 sync();
228 pthread_mutex_lock(&(master->read_wait_mutex));
229 }
230 pthread_mutex_unlock(&(master->read_wait_mutex));
231
232 LOG(cout << master->name << " BufferView " << hex << this << dec << ": resuming for " << sn
233 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
234 << " in " << segments.size() << " segments " << endl)
235 }
236}
237
238void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
239 master->updateView(this); // update me !
240}
241
242void TOISegmented::BufferView::wait() { /* reader thread, master read wait lock taken */
243 //pthread_mutex_lock(&(master->read_wait_mutex));
244 waiting = true;
245 master->waitingViews++;
246 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
247 waiting = false;
248 master->waitingViews--;
249 //pthread_mutex_unlock(&(master->read_wait_mutex));
250}
251
252
253void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
254 if (sn > firstNeeded) {
255 firstNeeded = sn;
256 // C'est peut-etre le moment de faire unl sync, si on est coince par ailleurs...
257 //pthread_mutex_lock(&(master->read_wait_mutex));
258 if (sn >= sn0 + segmentSize){ // && master->waitingViews>0) {
259// LOG(cout<<master->name<< " sync on wontneed, waitingViews=" << master->waitingViews << endl);
260 LOG(cout<<master->name<< " sync on wontneed, sn = " << sn << " sn0 = " << sn0 << endl);
261 sync();
262 }
263 //pthread_mutex_unlock(&(master->read_wait_mutex));
264 }
265}
266
267
268/*******************************/
269/********** MasterView *********/
270/*******************************/
271
272TOISegmented::MasterView::MasterView(int bufsz, int maxseg, string nm) {
273 currentSegment = NULL;
274 maxSegments = maxseg;
275 segmentSize = bufsz;
276 sn0 = -1;
277 nConsumers = 0;
278 name = nm;
279
280 pthread_mutex_init(&views_mutex, NULL);
281 pthread_mutex_init(&read_wait_mutex, NULL);
282 pthread_cond_init(&write_wait_condv, NULL);
283 pthread_cond_init(&read_wait_condv, NULL);
284 pthread_key_create(&buffer_key, BufferDestroy);
285
286 waitingOnWrite = false;
287 waitingViews = 0;
288}
289
290TOISegmented::MasterView::~MasterView() {
291 pthread_mutex_destroy(&views_mutex);
292 pthread_mutex_destroy(&read_wait_mutex);
293 pthread_cond_destroy(&write_wait_condv);
294 pthread_cond_destroy(&read_wait_condv);
295 pthread_key_delete(buffer_key);
296
297 // There should not be any BufferView left... Check ?
298
299 // decrement count for segments ?
300}
301
302void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
303 if (sn0<0) {
304 LOG(cout << "***MasterView::putData sn0<0" << endl)
305 sn0=sn;
306 }
307 // can fit in current segment ?
308 if (!(currentSegment != NULL &&
309 sn >= currentSegment->sn0 &&
310 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
311 LOG(cout << name << " MasterView::putData, need extend for " << sn << endl)
312 nextSegment();
313 }
314 currentSegment->putData(sn, data, flags);
315}
316
317double TOISegmented::MasterView::getData(int sn) { /* reader thread */
318 return getView()->getData(sn); /* thread-specific */
319}
320
321uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
322 return getView()->getFlag(sn);
323}
324
325TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
326 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
327 if (bv == NULL) {
328 bv = createView();
329 LOG(cout << "creating new view " << hex << bv << dec << endl)
330 pthread_setspecific(buffer_key, bv);
331 }
332 return bv;
333}
334
335void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
336 pthread_mutex_lock(&read_wait_mutex);
337 pthread_cond_broadcast(&read_wait_condv);
338 pthread_mutex_unlock(&read_wait_mutex);
339}
340
341void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
342 if (waitingOnWrite) {
343 LOG(cout << name << " MasterView : signal for wait on write" << endl)
344 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
345 }
346}
347
348void TOISegmented::MasterView::putDone() {
349 nextSegment(); // cree un segment inutile, a nettoyer
350}
351
352void TOISegmented::MasterView::nextSegment() { /* writer thread */
353 // The current segment, if any, is now committed. A new
354 // blank buffer is allocated, if any.
355 pthread_mutex_lock(&views_mutex);
356
357 LOG(cout << "MasterView::nextSegment "
358 << segments.size()+1 << "/" << maxSegments << endl)
359
360 if (currentSegment != NULL) {
361 currentSegment->status = BufferSegment::COMMITTED;
362 segments.push_back(currentSegment);
363 }
364
365 currentSegment = NULL;
366 while (segments.size() >= maxSegments) {
367 waitForCleaning();
368 }
369
370 currentSegment = new BufferSegment(segmentSize);
371 currentSegment->incRefCount();
372 signalWaitingViews(); // they can ask to be updated !!
373 pthread_mutex_unlock(&views_mutex);
374}
375
376void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
377 LOG(cout << name << " MasterView : write wait for clean for " << sn0 << endl)
378 waitingOnWrite = true;
379 checkDeadLock();
380 pthread_cond_wait(&write_wait_condv, &views_mutex);
381 LOG(cout << name << " MasterView : wait done" << endl)
382}
383
384TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
385 BufferView* bv = new BufferView(this);
386 pthread_mutex_lock(&views_mutex);
387 allViews.insert(bv);
388 pthread_mutex_unlock(&views_mutex);
389 updateView(bv);
390 return bv;
391}
392
393void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
394 pthread_mutex_lock(&views_mutex);
395
396// int oldBegin = bv->sn0;
397// int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
398 int oldBegin = sn0;
399 int oldEnd = sn0 + bv->segmentSize * segments.size();
400
401 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
402 i != bv->segments.end(); i++) {
403 (*i)->decRefCount();
404 }
405
406 bv->segments.clear();
407
408 // utiliser firstNeeded de toutes les vues pour faire le menage chez
409 // nous.
410
411 // A condition que tous les consumers se soient fait connaitre...
412
413 if (nConsumers == allViews.size()) {
414 int firstNeeded = MAXINT;
415 for (set<BufferView*>::iterator i = allViews.begin();
416 i != allViews.end(); i++) {
417 LOG(cout << name << " View firstneeded " << (*i)->firstNeeded << endl);
418 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
419 }
420
421 LOG(cout << name << " MasterView : firstNeeded = " << firstNeeded << endl);
422
423 vector<BufferSegment*>::iterator j = segments.begin();
424 bool clean = false;
425 for (vector<BufferSegment*>::iterator i = segments.begin();
426 i != segments.end(); i++) {
427 //LOG(cout << "Updating : rc = " << (*i)->getRefCount() << " sn0 = " << (*i)->sn0 << endl;);
428 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 1)) {
429 clean = true;
430 (*i)->decRefCount();
431 delete (*i);
432 j = i;
433 }
434 }
435 j++;
436 if (clean) {
437 segments.erase(segments.begin(),j);
438 sn0 = (*segments.begin())->sn0;
439 LOG(cout << "MasterView : purged until " << sn0 << endl);
440 }
441 } else {
442 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
443 << "/" << nConsumers << endl);
444 }
445
446 for (vector<BufferSegment*>::iterator i = segments.begin();
447 i != segments.end(); i++) {
448 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
449 (*i)->incRefCount();
450 bv->segments.push_back(*i);
451 }
452 }
453
454 bv->sn0 = -1;
455 int newEnd = -1;
456 if (segments.size() > 0) {
457 bv->sn0 = bv->segments[0]->sn0;
458 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
459 }
460
461 if (sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
462 signalWrite();
463 }
464
465 LOG(cout << name << " sync for " << hex << bv << dec << " : "
466 << oldBegin << " - " << oldEnd << " --> "
467 << sn0 << " - " << newEnd << endl);
468
469 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
470 signalWaitingViews();
471 }
472 pthread_mutex_unlock(&views_mutex);
473}
474
475void TOISegmented::MasterView::checkDeadLock() { /* views locked */
476 // There is a possible deadlock if no view can free old segments
477 // and we are waiting for write.
478
479 // we need to record "wont need before" for each view, and
480 // signal deadlock if any view that needs first segment data is sleeping
481 // while we are asleep
482
483 pthread_mutex_lock(&read_wait_mutex);
484 if (!waitingOnWrite) {
485 pthread_mutex_unlock(&read_wait_mutex);
486 return; // no problem, there is an active writer
487 }
488
489 // Is any sleeping view needing our first segment ?
490
491 for (set<BufferView*>::iterator i=allViews.begin();
492 i != allViews.end(); i++) {
493 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
494 cout << "**** DEADLOCK detected ****" << endl;
495 cout << "We are waiting on write (buffer is full)"<< endl;
496 cout << "but a waiting reader still needs our first segment" << endl;
497 cout << "restart with bigger buffers" << endl;
498 abort();
499 }
500 }
501 pthread_mutex_unlock(&read_wait_mutex);
502}
503
504
505
506void TOISegmented::MasterView::BufferDestroy(void* p) {
507 BufferView* bv = (BufferView*) p;
508 delete bv;
509}
Note: See TracBrowser for help on using the repository browser.