source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1699

Last change on this file since 1699 was 1699, checked in by cmv, 24 years ago

intro des toisegmented + constructeur avec char* pour compat cmv 15/10/01

File size: 13.1 KB
RevLine 
[1670]1#include "toisegment.h"
[1671]2
[1697]3#include <iostream.h>
4
[1690]5#ifndef MAXINT
6#define MAXINT 2147483647
7#endif
8
[1692]9static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
10static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
11static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
[1693]12#define LOG(_xxx_)
13/*
[1692]14#define LOG(_xxx_) \
15cout_lock(); \
16_xxx_; \
17cout_unlock();
[1693]18*/
[1692]19
[1689]20/******************************/
21/******* TOISegmented *********/
22/******************************/
23
24TOISegmented::TOISegmented(int bufsz, int maxseg) {
25 master = new MasterView(bufsz, maxseg);
26 setName("TOISegmented");
27}
28
29TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
30 master = new MasterView(bufsz, maxseg);
31 setName(nm);
32}
33
[1699]34TOISegmented::TOISegmented(char* cnm, int bufsz, int maxseg) {
35 string nm = cnm;
36 master = new MasterView(bufsz, maxseg);
37 setName(nm);
38}
39
[1689]40TOISegmented::~TOISegmented() {
41 delete master;
42}
43
44
[1692]45void TOISegmented::addConsumer(TOIProcessor* p) {
46 TOI::addConsumer(p);
47 master->nConsumers = consumers.size();
48}
49
50
[1689]51double TOISegmented::getData(int i) { /* reader thread */
52 return master->getData(i);
53}
54
55void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
56 data = master->getData(i);
57 flag = master->getFlag(i);
58}
59
60void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
61 master->putData(i, value, flag);
62}
63
64void TOISegmented::putDone() {
65 master->putDone();
66}
67
68void TOISegmented::wontNeedBefore(int i) { /* reader thread */
69 master->getView()->wontNeedBefore(i);
70}
71
72TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
73 // return master->getView()->isDataAvail(i, j);
74 cout << "TOISegmented::isDataAvail unimplemented" << endl;
75 throw PError("TOISegmented::isDataAvail unimplemented");
76}
77
78TOI::DataStatus TOISegmented::isDataAvail(int i) {
79 return isDataAvail(i,i);
80}
81
82TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
83 return isDataAvail(i,j);
84}
85
86void TOISegmented::waitForData(int iStart, int iEnd) {
87 // get will wait...
88}
89
90void TOISegmented::waitForData(int i) {
91 // get will wait...
92}
93
94void TOISegmented::waitForAnyData() {
95 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
96 throw PError("TOISegmented::waitForAnyData unimplemented");
97}
98
99int TOISegmented::nextDataAvail(int iAfter) {
100 cout << "TOISegmented::nextDataAvail" << endl;
101 return iAfter+1;
102}
103
104bool TOISegmented::hasSomeData() {
105 cout << "TOISegmented::hasSomeData" << endl;
106 return true;
107}
108
[1692]109void TOISegmented::doPutData(int i, double value, uint_8 flag) {
[1689]110 cout << "TOISegmented::doPutData unimplemented" << endl;
111 throw PError("TOISegmented::doPutData unimplemented");
112}
113
114void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
115 cout << "TOISegmented::doGetData unimplemented" << endl;
116 throw PError("TOISegmented::doGetData unimplemented");
117}
118
119
[1686]120/*******************************/
121/******* BufferSegment *********/
122/*******************************/
[1671]123
124TOISegmented::BufferSegment::BufferSegment(int sz) {
125 status = NEW;
126 bufferSize = sz;
127 sn0 = -1;
128
129 refcount = 0;
130
131 data = new double[sz];
132 flags = new uint_8[sz];
133
134 pthread_mutex_init(&refcount_mutex, NULL);
135}
136
137TOISegmented::BufferSegment::~BufferSegment() {
138 if (refcount > 0) {
139 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
140 }
141 delete[] data;
142 delete[] flags;
143 pthread_mutex_destroy(&refcount_mutex);
144}
145
[1689]146void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
147 /* writer thread*/
[1671]148 if (status == NEW) {
149 status = WRITE;
150 sn0 = sn;
151 }
152 if (status == COMMITTED) {
153 throw(ForbiddenError("TOISegment : putData in committed buffer"));
154 }
155 checkInRange(sn);
156 data[sn-sn0] = d;
157 flags[sn-sn0] = f;
158}
159
160void TOISegmented::BufferSegment::incRefCount() {
161 pthread_mutex_lock(&refcount_mutex);
162 refcount++;
163 pthread_mutex_unlock(&refcount_mutex);
164}
165
166void TOISegmented::BufferSegment::decRefCount() {
167 pthread_mutex_lock(&refcount_mutex);
168 int nrc = --refcount;
169 pthread_mutex_unlock(&refcount_mutex);
170 if (nrc<0)
171 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
172}
173
174int TOISegmented::BufferSegment::getRefCount() {
175 pthread_mutex_lock(&refcount_mutex);
176 int rc = refcount;
177 pthread_mutex_unlock(&refcount_mutex);
178 return rc;
179}
180
[1686]181
182/*******************************/
183/********** BufferView *********/
184/*******************************/
185
186TOISegmented::BufferView::BufferView(MasterView* m) {
187 master = m;
188 sn0 = -1;
189 segmentSize = m->segmentSize;
[1689]190 firstNeeded = -1;
[1692]191 waiting = false;
[1686]192}
193
194TOISegmented::BufferView::~BufferView() {
195}
196
[1692]197double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
[1686]198 ensure(sn);
199 int seg = (sn-sn0)/segmentSize;
200 return segments[seg]->getData(sn);
201}
202
[1692]203uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
[1686]204 ensure(sn);
205 int seg = (sn-sn0)/segmentSize;
206 return segments[seg]->getFlag(sn);
207}
208
[1692]209void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
[1686]210 if (sn < sn0) {
[1692]211 LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
212 LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
213 abort();
[1686]214 }
215
[1692]216 if (sn0 < 0 ||
217 sn >= sn0 + segmentSize*segments.size()) {
218 LOG(cout << "BufferView " << hex << this << dec << ": read fault for " << sn << endl)
[1686]219 sync();
[1692]220 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
[1686]221 wait();
[1692]222 LOG(cout << "BufferView " << hex << this << dec << ": waiting for " << sn << endl)
[1686]223 sync();
224 }
[1692]225 LOG(cout << "BufferView " << hex << this << dec << ": resuming for " << sn
226 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
227 << " in " << segments.size() << " segments " << endl)
[1686]228 }
229}
230
[1692]231void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
[1686]232 master->updateView(this); // update me !
233}
234
[1692]235void TOISegmented::BufferView::wait() { /* reader thread */
236 pthread_mutex_lock(&(master->read_wait_mutex));
237 waiting = true;
238 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
239 waiting = false;
240 pthread_mutex_unlock(&(master->read_wait_mutex));
[1686]241}
242
243
[1689]244void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
245 if (sn > firstNeeded) {
246 firstNeeded = sn;
247 }
248}
[1686]249
[1689]250
[1686]251/*******************************/
252/********** MasterView *********/
253/*******************************/
254
255TOISegmented::MasterView::MasterView(int bufsz, int maxseg) {
256 currentSegment = NULL;
257 maxSegments = maxseg;
258 segmentSize = bufsz;
259 sn0 = -1;
[1692]260 nConsumers = 0;
[1686]261
262 pthread_mutex_init(&views_mutex, NULL);
[1692]263 pthread_mutex_init(&read_wait_mutex, NULL);
264 pthread_cond_init(&write_wait_condv, NULL);
265 pthread_cond_init(&read_wait_condv, NULL);
[1686]266 pthread_key_create(&buffer_key, BufferDestroy);
267
[1689]268 waitingOnWrite = false;
[1686]269}
270
271TOISegmented::MasterView::~MasterView() {
272 pthread_mutex_destroy(&views_mutex);
[1692]273 pthread_mutex_destroy(&read_wait_mutex);
274 pthread_cond_destroy(&write_wait_condv);
275 pthread_cond_destroy(&read_wait_condv);
[1686]276 pthread_key_delete(buffer_key);
277
278 // There should not be any BufferView left... Check ?
279
280 // decrement count for segments ?
281}
282
[1692]283void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
284 if (sn0<0) {
285 LOG(cout << "***MasterView::putData sn0<0" << endl)
286 sn0=sn;
287 }
[1686]288 // can fit in current segment ?
289 if (!(currentSegment != NULL &&
290 sn >= currentSegment->sn0 &&
291 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
[1692]292 LOG(cout << "MasterView::putData, need extend for " << sn << endl)
[1686]293 nextSegment();
294 }
295 currentSegment->putData(sn, data, flags);
296}
297
[1692]298double TOISegmented::MasterView::getData(int sn) { /* reader thread */
299 return getView()->getData(sn); /* thread-specific */
[1686]300}
301
[1692]302uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
[1686]303 return getView()->getFlag(sn);
304}
305
306TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
307 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
308 if (bv == NULL) {
309 bv = createView();
[1692]310 LOG(cout << "creating new view " << hex << bv << dec << endl)
[1686]311 pthread_setspecific(buffer_key, bv);
312 }
313 return bv;
314}
[1689]315
[1692]316void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
317 pthread_mutex_lock(&read_wait_mutex);
318 pthread_cond_broadcast(&read_wait_condv);
319 pthread_mutex_unlock(&read_wait_mutex);
320}
[1689]321
[1692]322void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
323 if (waitingOnWrite) {
324 LOG(cout << "MasterView : signal for wait on write" << endl)
325 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
[1689]326 }
327}
328
329void TOISegmented::MasterView::putDone() {
330 nextSegment(); // cree un segment inutile, a nettoyer
331}
332
333void TOISegmented::MasterView::nextSegment() { /* writer thread */
334 // The current segment, if any, is now committed. A new
335 // blank buffer is allocated, if any.
[1692]336 pthread_mutex_lock(&views_mutex);
[1689]337
[1692]338 LOG(cout << "MasterView::nextSegment "
339 << segments.size()+1 << "/" << maxSegments << endl)
[1689]340
341 if (currentSegment != NULL) {
342 currentSegment->status = BufferSegment::COMMITTED;
343 segments.push_back(currentSegment);
344 }
345
346 currentSegment = NULL;
347 while (segments.size() >= maxSegments) {
348 waitForCleaning();
349 }
350
351 currentSegment = new BufferSegment(segmentSize);
352 signalWaitingViews(); // they can ask to be updated !!
[1692]353 pthread_mutex_unlock(&views_mutex);
[1689]354}
355
[1692]356void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
357 LOG(cout << "MasterView : write wait for clean for " << sn0 << endl)
[1689]358 waitingOnWrite = true;
359 checkDeadLock();
[1692]360 pthread_cond_wait(&write_wait_condv, &views_mutex);
361 LOG(cout << "MasterView : wait done" << endl)
[1689]362}
363
364TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
365 BufferView* bv = new BufferView(this);
[1692]366 pthread_mutex_lock(&views_mutex);
[1690]367 allViews.insert(bv);
[1692]368 pthread_mutex_unlock(&views_mutex);
[1689]369 updateView(bv);
370 return bv;
371}
372
373void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
374 pthread_mutex_lock(&views_mutex);
375
376 int oldBegin = bv->sn0;
377 int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
378
379 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
380 i != bv->segments.end(); i++) {
381 (*i)->decRefCount();
382 }
383
384 bv->segments.clear();
385
[1690]386 // utiliser firstNeeded de toutes les vues pour faire le menage chez
[1689]387 // nous.
[1690]388
[1692]389 // A condition que tous les consumers se soient fait connaitre...
[1689]390
[1692]391 if (nConsumers == allViews.size()) {
392 int firstNeeded = MAXINT;
393 for (set<BufferView*>::iterator i = allViews.begin();
394 i != allViews.end(); i++) {
395 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
396 }
397
398 LOG(cout << "MasterView : firstNeeded = " << firstNeeded << endl);
399
400 vector<BufferSegment*>::iterator j = segments.begin();
401 bool clean = false;
402 for (vector<BufferSegment*>::iterator i = segments.begin();
403 i != segments.end(); i++) {
404 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) {
405 clean = true;
406 j = i;
407 }
408 }
[1690]409 if (clean) {
410 segments.erase(segments.begin(),j);
411 sn0 = (*segments.begin())->sn0;
[1692]412 LOG(cout << "MasterView : purged until " << sn0 << endl);
[1690]413 }
[1692]414 } else {
415 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
416 << "/" << nConsumers << endl);
417 }
[1690]418
419 for (vector<BufferSegment*>::iterator i = segments.begin();
[1689]420 i != segments.end(); i++) {
421 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
422 (*i)->incRefCount();
423 bv->segments.push_back(*i);
424 }
425 }
426
427 bv->sn0 = -1;
428 int newEnd = -1;
429 if (segments.size() > 0) {
430 bv->sn0 = bv->segments[0]->sn0;
431 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
432 }
433
[1692]434 if (bv->sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
435 signalWrite();
[1689]436 }
437
[1692]438 LOG(cout << "sync for " << hex << bv << dec << " : "
439 << oldBegin << " - " << oldEnd << " --> "
440 << bv->sn0 << " - " << newEnd << endl);
441
442 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
[1689]443 signalWaitingViews();
444 }
[1692]445 pthread_mutex_unlock(&views_mutex);
[1689]446}
447
[1692]448void TOISegmented::MasterView::checkDeadLock() { /* views locked */
[1689]449 // There is a possible deadlock if no view can free old segments
450 // and we are waiting for write.
451
452 // we need to record "wont need before" for each view, and
453 // signal deadlock if any view that needs first segment data is sleeping
454 // while we are asleep
455
[1692]456 pthread_mutex_lock(&read_wait_mutex);
457 if (!waitingOnWrite) {
458 pthread_mutex_unlock(&read_wait_mutex);
459 return; // no problem, there is an active writer
460 }
[1689]461
462 // Is any sleeping view needing our first segment ?
463
[1692]464 for (set<BufferView*>::iterator i=allViews.begin();
465 i != allViews.end(); i++) {
466 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
[1689]467 cout << "**** DEADLOCK detected ****" << endl;
468 cout << "We are waiting on write (buffer is full)"<< endl;
469 cout << "but a waiting reader still needs our first segment" << endl;
470 cout << "restart with bigger buffers" << endl;
471 abort();
472 }
473 }
[1692]474 pthread_mutex_unlock(&read_wait_mutex);
[1689]475}
476
477
478
479void TOISegmented::MasterView::BufferDestroy(void* p) {
480 BufferView* bv = (BufferView*) p;
481 delete bv;
482}
Note: See TracBrowser for help on using the repository browser.