source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1743

Last change on this file since 1743 was 1743, checked in by aubourg, 24 years ago

optim lecture/ecriture en bloc

File size: 17.3 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toisegment.cc,v 1.16 2001-11-09 23:13:15 aubourg Exp $
6
7#include "toisegment.h"
8
9#include <iostream.h>
10
11#ifndef MAXINT
12#define MAXINT 2147483647
13#endif
14
15static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
16static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
17static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
18#define LOG(_xxx_)
19/*
20#define LOG(_xxx_) \
21cout_lock(); \
22_xxx_; \
23cout_unlock();
24*/
25
26/******************************/
27/******* TOISegmented *********/
28/******************************/
29
30TOISegmented::TOISegmented(int bufsz, int maxseg) {
31 master = new MasterView(bufsz, maxseg, "");
32 setName("TOISegmented");
33 syncOldWay = false;
34}
35
36TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
37 master = new MasterView(bufsz, maxseg, nm);
38 setName(nm);
39 syncOldWay = false;
40}
41
42TOISegmented::TOISegmented(char* cnm, int bufsz, int maxseg) {
43 string nm = cnm;
44 master = new MasterView(bufsz, maxseg, nm);
45 setName(nm);
46 syncOldWay = false;
47}
48
49TOISegmented::~TOISegmented() {
50 delete master;
51}
52
53
54void TOISegmented::addConsumer(TOIProcessor* p) {
55 TOI::addConsumer(p);
56 master->nConsumers = consumers.size();
57}
58
59
60double TOISegmented::getData(int i) { /* reader thread */
61 return master->getData(i);
62}
63
64void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
65 data = master->getData(i);
66 flag = master->getFlag(i);
67}
68
69void TOISegmented::getData(int i, int n, double* data, uint_8* flg) { /* reader thread */
70 master->getData(i, n, data, flg);
71}
72
73void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
74 master->putData(i, value, flag);
75}
76
77void TOISegmented::putData(int i, int n, double const* val, uint_8 const* flg) { /* writer thread */
78 master->putData(i, n, val, flg);
79}
80
81void TOISegmented::putDone() {
82 master->putDone();
83}
84
85void TOISegmented::wontNeedBefore(int i) { /* reader thread */
86 master->getView()->wontNeedBefore(i);
87}
88
89TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
90 // return master->getView()->isDataAvail(i, j);
91 cout << "TOISegmented::isDataAvail unimplemented" << endl;
92 throw PError("TOISegmented::isDataAvail unimplemented");
93}
94
95TOI::DataStatus TOISegmented::isDataAvail(int i) {
96 return isDataAvail(i,i);
97}
98
99TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
100 return isDataAvail(i,j);
101}
102
103void TOISegmented::waitForData(int iStart, int iEnd) {
104 // get will wait...
105}
106
107void TOISegmented::waitForData(int i) {
108 // get will wait...
109}
110
111void TOISegmented::waitForAnyData() {
112 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
113 throw PError("TOISegmented::waitForAnyData unimplemented");
114}
115
116int TOISegmented::nextDataAvail(int iAfter) {
117 cout << "TOISegmented::nextDataAvail" << endl;
118 return iAfter+1;
119}
120
121bool TOISegmented::hasSomeData() {
122 cout << "TOISegmented::hasSomeData" << endl;
123 return true;
124}
125
126void TOISegmented::doPutData(int i, double value, uint_8 flag) {
127 cout << "TOISegmented::doPutData unimplemented" << endl;
128 throw PError("TOISegmented::doPutData unimplemented");
129}
130
131void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
132 cout << "TOISegmented::doGetData unimplemented" << endl;
133 throw PError("TOISegmented::doGetData unimplemented");
134}
135
136
137/*******************************/
138/******* BufferSegment *********/
139/*******************************/
140
141TOISegmented::BufferSegment::BufferSegment(int sz) {
142 status = NEW;
143 bufferSize = sz;
144 sn0 = -1;
145
146 refcount = 0;
147
148 data = new double[sz];
149 flags = new uint_8[sz];
150
151 pthread_mutex_init(&refcount_mutex, NULL);
152}
153
154TOISegmented::BufferSegment::~BufferSegment() {
155 if (refcount > 0) {
156 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
157 }
158 LOG(cout << "Destroying buffersegment sn0 "<< sn0 << endl);
159 delete[] data;
160 delete[] flags;
161 pthread_mutex_destroy(&refcount_mutex);
162}
163
164void TOISegmented::BufferSegment::getData(int sn, int n, double* d, uint_8* f) {
165 checkCommitted();
166 checkInRange(sn);
167 checkInRange(sn+n-1);
168 memcpy(d, data+(sn-sn0), n*sizeof(double));
169 if (f != NULL) {
170 memcpy(f, flags+(sn-sn0), n*sizeof(uint_8));
171 }
172}
173
174void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
175 /* writer thread*/
176 if (status == NEW) {
177 status = WRITE;
178 sn0 = sn;
179 }
180 if (status == COMMITTED) {
181 throw(ForbiddenError("TOISegment : putData in committed buffer"));
182 }
183 checkInRange(sn);
184 data[sn-sn0] = d;
185 flags[sn-sn0] = f;
186}
187
188void TOISegmented::BufferSegment::putData(int sn, int n, double const* d, uint_8 const* f) {
189 checkCommitted();
190 checkInRange(sn);
191 checkInRange(sn+n-1);
192 memcpy(data+(sn-sn0), d, n*sizeof(double));
193 if (f != NULL) {
194 memcpy(flags+(sn-sn0), f, n*sizeof(uint_8));
195 } else {
196 memset(flags+(sn-sn0), 0, n*sizeof(uint_8));
197 }
198}
199
200void TOISegmented::BufferSegment::incRefCount() {
201 pthread_mutex_lock(&refcount_mutex);
202 refcount++;
203 pthread_mutex_unlock(&refcount_mutex);
204}
205
206void TOISegmented::BufferSegment::decRefCount() {
207 pthread_mutex_lock(&refcount_mutex);
208 int nrc = --refcount;
209 pthread_mutex_unlock(&refcount_mutex);
210 if (nrc<0)
211 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
212}
213
214int TOISegmented::BufferSegment::getRefCount() {
215 pthread_mutex_lock(&refcount_mutex);
216 int rc = refcount;
217 pthread_mutex_unlock(&refcount_mutex);
218 return rc;
219}
220
221
222/*******************************/
223/********** BufferView *********/
224/*******************************/
225
226TOISegmented::BufferView::BufferView(MasterView* m) {
227 master = m;
228 sn0 = -1;
229 segmentSize = m->segmentSize;
230 firstNeeded = -1;
231 waiting = false;
232}
233
234TOISegmented::BufferView::~BufferView() {
235}
236
237double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
238 ensure(sn);
239 int seg = (sn-sn0)/segmentSize;
240 return segments[seg]->getData(sn);
241}
242
243uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
244 ensure(sn);
245 int seg = (sn-sn0)/segmentSize;
246 return segments[seg]->getFlag(sn);
247}
248
249void TOISegmented::BufferView::getData(int sn, int n, double* dat, uint_8* flg) { /* Single-thread, reader thread */
250 ensure(sn);
251 ensure(sn+n-1);
252
253 int sn1 = sn;
254 int nsam = n;
255 double* pdat = dat;
256 uint_8* pflg = flg;
257
258 while (true) {
259 int seg = (sn1-sn0)/segmentSize;
260 BufferSegment* s = segments[seg];
261 int snmax = s->sn0 + s->bufferSize - 1;
262 int sn2 = snmax > (sn1+nsam-1) ? (sn1+nsam-1) : snmax;
263 int nget = sn2-sn1+1;
264 s->getData(sn1, nget, pdat, pflg);
265 pdat += nget;
266 if (pflg != NULL) pflg += nget;
267 nsam -= nget;
268 sn1 += nget;
269 if (nsam <= 0) break;
270 }
271}
272
273void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
274 if (sn < sn0) {
275 LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
276 LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
277 abort();
278 }
279
280 if (sn0 < 0 ||
281 sn >= sn0 + segmentSize*segments.size()) {
282 LOG(cout << master->name << " BufferView "
283 << hex << this << dec << ": read fault for " << sn << endl)
284 sync();
285 pthread_mutex_lock(&(master->read_wait_mutex));
286 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
287 wait(); // must be atomic with loop test // $CHECK$ est-ce vrai ?
288 pthread_mutex_unlock(&(master->read_wait_mutex));
289 LOG(cout << master->name << " BufferView " << hex << this << dec << ": waiting for " << sn << endl)
290 sync();
291 pthread_mutex_lock(&(master->read_wait_mutex));
292 }
293 pthread_mutex_unlock(&(master->read_wait_mutex));
294
295 LOG(cout << master->name << " BufferView " << hex << this << dec << ": resuming for " << sn
296 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
297 << " in " << segments.size() << " segments " << endl)
298 }
299}
300
301void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
302 master->updateView(this); // update me !
303}
304
305void TOISegmented::BufferView::wait() { /* reader thread, master read wait lock taken */
306 //pthread_mutex_lock(&(master->read_wait_mutex));
307 waiting = true;
308 master->waitingViews++;
309 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
310 waiting = false;
311 master->waitingViews--;
312 //pthread_mutex_unlock(&(master->read_wait_mutex));
313}
314
315
316void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
317 if (sn > firstNeeded) {
318 firstNeeded = sn;
319 // C'est peut-etre le moment de faire unl sync, si on est coince par ailleurs...
320 //pthread_mutex_lock(&(master->read_wait_mutex));
321 if (sn >= sn0 + segmentSize){ // && master->waitingViews>0) {
322// LOG(cout<<master->name<< " sync on wontneed, waitingViews=" << master->waitingViews << endl);
323 LOG(cout<<master->name<< " sync on wontneed, sn = " << sn << " sn0 = " << sn0 << endl);
324 sync();
325 }
326 //pthread_mutex_unlock(&(master->read_wait_mutex));
327 }
328}
329
330
331/*******************************/
332/********** MasterView *********/
333/*******************************/
334
335TOISegmented::MasterView::MasterView(int bufsz, int maxseg, string nm) {
336 currentSegment = NULL;
337 maxSegments = maxseg;
338 segmentSize = bufsz;
339 sn0 = -1;
340 nConsumers = 0;
341 name = nm;
342
343 pthread_mutex_init(&views_mutex, NULL);
344 pthread_mutex_init(&read_wait_mutex, NULL);
345 pthread_cond_init(&write_wait_condv, NULL);
346 pthread_cond_init(&read_wait_condv, NULL);
347 pthread_key_create(&buffer_key, BufferDestroy);
348
349 waitingOnWrite = false;
350 waitingViews = 0;
351}
352
353TOISegmented::MasterView::~MasterView() {
354 pthread_mutex_destroy(&views_mutex);
355 pthread_mutex_destroy(&read_wait_mutex);
356 pthread_cond_destroy(&write_wait_condv);
357 pthread_cond_destroy(&read_wait_condv);
358 pthread_key_delete(buffer_key);
359
360 // There should not be any BufferView left... Check ?
361
362 // decrement count for segments ?
363}
364
365void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
366 if (sn0<0) {
367 LOG(cout << "***MasterView::putData sn0<0" << endl)
368 sn0=sn;
369 }
370 // can fit in current segment ?
371 if (!(currentSegment != NULL &&
372 sn >= currentSegment->sn0 &&
373 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
374 LOG(cout << name << " MasterView::putData, need extend for " << sn << endl)
375 nextSegment();
376 }
377 currentSegment->putData(sn, data, flags);
378}
379
380void TOISegmented::MasterView::putData(int sn, int n, double const* data, uint_8 const* flags) { /* writer thread */
381 if (sn0<0) {
382 LOG(cout << "***MasterView::putData sn0<0" << endl);
383 sn0=sn;
384 }
385 double const* pdat = data;
386 uint_8 const* pflg = flags;
387 int nsam = n;
388 int sn1 = sn;
389 while (true) {
390 // maximum that current segment can take
391 int snmax = -1;
392 if (currentSegment != NULL) {
393 snmax = currentSegment->sn0 + currentSegment->bufferSize-1;
394 }
395 int sn2 = snmax > (sn1+nsam-1) ? (sn1+nsam-1) : snmax;
396 if (snmax>0) {
397 int nput = sn2-sn1+1;
398 currentSegment->putData(sn1, nput, pdat, pflg);
399 pdat += nput;
400 if (pflg != NULL) pflg += nput;
401 nsam -= nput;
402 sn1 += nput;
403 }
404 if (nsam <= 0) break;
405 nextSegment();
406 currentSegment->putData(sn1, 0, 0); // dummy, to initialize sn0 in segment : add method ?
407 }
408}
409
410double TOISegmented::MasterView::getData(int sn) { /* reader thread */
411 return getView()->getData(sn); /* thread-specific */
412}
413
414uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
415 return getView()->getFlag(sn);
416}
417
418void TOISegmented::MasterView::getData(int sn, int n, double* dat, uint_8* flg) { /* reader thread */
419 getView()->getData(sn, n, dat, flg);
420}
421
422TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
423 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
424 if (bv == NULL) {
425 bv = createView();
426 LOG(cout << "creating new view " << hex << bv << dec << endl)
427 pthread_setspecific(buffer_key, bv);
428 }
429 return bv;
430}
431
432void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
433 pthread_mutex_lock(&read_wait_mutex);
434 pthread_cond_broadcast(&read_wait_condv);
435 pthread_mutex_unlock(&read_wait_mutex);
436}
437
438void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
439 if (waitingOnWrite) {
440 LOG(cout << name << " MasterView : signal for wait on write" << endl)
441 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
442 }
443}
444
445void TOISegmented::MasterView::putDone() {
446 nextSegment(); // cree un segment inutile, a nettoyer
447}
448
449void TOISegmented::MasterView::nextSegment() { /* writer thread */
450 // The current segment, if any, is now committed. A new
451 // blank buffer is allocated, if any.
452 pthread_mutex_lock(&views_mutex);
453
454 LOG(cout << "MasterView::nextSegment "
455 << segments.size()+1 << "/" << maxSegments << endl)
456
457 if (currentSegment != NULL) {
458 currentSegment->status = BufferSegment::COMMITTED;
459 segments.push_back(currentSegment);
460 }
461
462 currentSegment = NULL;
463 while (segments.size() >= maxSegments) {
464 waitForCleaning();
465 }
466
467 currentSegment = new BufferSegment(segmentSize);
468 currentSegment->incRefCount();
469 signalWaitingViews(); // they can ask to be updated !!
470 pthread_mutex_unlock(&views_mutex);
471}
472
473void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
474 LOG(cout << name << " MasterView : write wait for clean for " << sn0 << endl)
475 waitingOnWrite = true;
476 checkDeadLock();
477 pthread_cond_wait(&write_wait_condv, &views_mutex);
478 LOG(cout << name << " MasterView : wait done" << endl)
479}
480
481TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
482 BufferView* bv = new BufferView(this);
483 pthread_mutex_lock(&views_mutex);
484 allViews.insert(bv);
485 pthread_mutex_unlock(&views_mutex);
486 updateView(bv);
487 return bv;
488}
489
490void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
491 pthread_mutex_lock(&views_mutex);
492
493// int oldBegin = bv->sn0;
494// int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
495 int oldBegin = sn0;
496 int oldEnd = sn0 + bv->segmentSize * segments.size();
497
498 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
499 i != bv->segments.end(); i++) {
500 (*i)->decRefCount();
501 }
502
503 bv->segments.clear();
504
505 // utiliser firstNeeded de toutes les vues pour faire le menage chez
506 // nous.
507
508 // A condition que tous les consumers se soient fait connaitre...
509
510 if (nConsumers == allViews.size()) {
511 int firstNeeded = MAXINT;
512 for (set<BufferView*>::iterator i = allViews.begin();
513 i != allViews.end(); i++) {
514 LOG(cout << name << " View firstneeded " << (*i)->firstNeeded << endl);
515 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
516 }
517
518 LOG(cout << name << " MasterView : firstNeeded = " << firstNeeded << endl);
519
520 vector<BufferSegment*>::iterator j = segments.begin();
521 bool clean = false;
522 for (vector<BufferSegment*>::iterator i = segments.begin();
523 i != segments.end(); i++) {
524 //LOG(cout << "Updating : rc = " << (*i)->getRefCount() << " sn0 = " << (*i)->sn0 << endl;);
525 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 1)) {
526 clean = true;
527 (*i)->decRefCount();
528 delete (*i);
529 j = i;
530 }
531 }
532 j++;
533 if (clean) {
534 segments.erase(segments.begin(),j);
535 sn0 = (*segments.begin())->sn0;
536 LOG(cout << "MasterView : purged until " << sn0 << endl);
537 }
538 } else {
539 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
540 << "/" << nConsumers << endl);
541 }
542
543 for (vector<BufferSegment*>::iterator i = segments.begin();
544 i != segments.end(); i++) {
545 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
546 (*i)->incRefCount();
547 bv->segments.push_back(*i);
548 }
549 }
550
551 bv->sn0 = -1;
552 int newEnd = -1;
553 if (segments.size() > 0) {
554 bv->sn0 = bv->segments[0]->sn0;
555 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
556 }
557
558 if (sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
559 signalWrite();
560 }
561
562 LOG(cout << name << " sync for " << hex << bv << dec << " : "
563 << oldBegin << " - " << oldEnd << " --> "
564 << sn0 << " - " << newEnd << endl);
565
566 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
567 signalWaitingViews();
568 }
569 pthread_mutex_unlock(&views_mutex);
570}
571
572void TOISegmented::MasterView::checkDeadLock() { /* views locked */
573 // There is a possible deadlock if no view can free old segments
574 // and we are waiting for write.
575
576 // we need to record "wont need before" for each view, and
577 // signal deadlock if any view that needs first segment data is sleeping
578 // while we are asleep
579
580 pthread_mutex_lock(&read_wait_mutex);
581 if (!waitingOnWrite) {
582 pthread_mutex_unlock(&read_wait_mutex);
583 return; // no problem, there is an active writer
584 }
585
586 // Is any sleeping view needing our first segment ?
587
588 for (set<BufferView*>::iterator i=allViews.begin();
589 i != allViews.end(); i++) {
590 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
591 cout << "**** DEADLOCK detected ****" << endl;
592 cout << "We are waiting on write (buffer is full)"<< endl;
593 cout << "but a waiting reader still needs our first segment" << endl;
594 cout << "restart with bigger buffers" << endl;
595 abort();
596 }
597 }
598 pthread_mutex_unlock(&read_wait_mutex);
599}
600
601
602
603void TOISegmented::MasterView::BufferDestroy(void* p) {
604 BufferView* bv = (BufferView*) p;
605 delete bv;
606}
Note: See TracBrowser for help on using the repository browser.