source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1697

Last change on this file since 1697 was 1697, checked in by aubourg, 24 years ago

OSF5

File size: 13.0 KB
#include "toisegment.h"

#include <iostream.h>

#ifndef MAXINT
#define MAXINT 2147483647
#endif

static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
#define LOG(_xxx_)
/*
#define LOG(_xxx_) \
cout_lock(); \
_xxx_; \
cout_unlock();
*/

/******************************/
/******* TOISegmented *********/
/******************************/

TOISegmented::TOISegmented(int bufsz, int maxseg) {
  master = new MasterView(bufsz, maxseg);
  setName("TOISegmented");
}

TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
  master = new MasterView(bufsz, maxseg);
  setName(nm);
}

TOISegmented::~TOISegmented() {
  delete master;
}


void TOISegmented::addConsumer(TOIProcessor* p) {
  TOI::addConsumer(p);
  master->nConsumers = consumers.size();
}


double TOISegmented::getData(int i) { /* reader thread */
  return master->getData(i);
}

void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
  data = master->getData(i);
  flag = master->getFlag(i);
}

void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
  master->putData(i, value, flag);
}

void TOISegmented::putDone() {
  master->putDone();
}

void TOISegmented::wontNeedBefore(int i) { /* reader thread */
  master->getView()->wontNeedBefore(i);
}

TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
  // return master->getView()->isDataAvail(i, j);
  cout << "TOISegmented::isDataAvail unimplemented" << endl;
  throw PError("TOISegmented::isDataAvail unimplemented");
}

TOI::DataStatus TOISegmented::isDataAvail(int i) {
  return isDataAvail(i,i);
}

TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
  return isDataAvail(i,j);
}

void TOISegmented::waitForData(int iStart, int iEnd) {
  // get will wait...
}

void TOISegmented::waitForData(int i) {
  // get will wait...
}

void TOISegmented::waitForAnyData() {
  cout << "TOISegmented::waitForAnyData unimplemented" << endl;
  throw PError("TOISegmented::waitForAnyData unimplemented");
}

int TOISegmented::nextDataAvail(int iAfter) {
  cout << "TOISegmented::nextDataAvail" << endl;
  return iAfter+1;
}

bool TOISegmented::hasSomeData() {
  cout << "TOISegmented::hasSomeData" << endl;
  return true;
}

void TOISegmented::doPutData(int i, double value, uint_8 flag) {
  cout << "TOISegmented::doPutData unimplemented" << endl;
  throw PError("TOISegmented::doPutData unimplemented");
}

void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
  cout << "TOISegmented::doGetData unimplemented" << endl;
  throw PError("TOISegmented::doGetData unimplemented");
}

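/*
  Usage sketch: a minimal illustration of how a writer thread and the
  reader threads of registered consumers could drive a TOISegmented,
  assuming a TOIProcessor instance `proc`, a sample count `nSamples` and
  a helper `rawValue()` (all hypothetical names, not defined here).

    TOISegmented toi("raw", 1024, 8);   // segments of 1024 samples, at most 8 kept
    toi.addConsumer(&proc);             // each consumer reads from its own thread

    // writer thread: samples are assumed to arrive in increasing order
    for (int sn = 0; sn < nSamples; sn++)
      toi.putData(sn, rawValue(sn), 0);
    toi.putDone();

    // in a consumer's reader thread, for some sample number sn:
    double d; uint_8 f;
    toi.getData(sn, d, f);              // blocks until the writer has produced sn
    toi.wontNeedBefore(sn - 100);       // lets old segments be recycled
*/
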
/*******************************/
/******* BufferSegment *********/
/*******************************/

TOISegmented::BufferSegment::BufferSegment(int sz) {
  status = NEW;
  bufferSize = sz;
  sn0 = -1;

  refcount = 0;

  data = new double[sz];
  flags = new uint_8[sz];

  pthread_mutex_init(&refcount_mutex, NULL);
}

TOISegmented::BufferSegment::~BufferSegment() {
  if (refcount > 0) {
    throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
  }
  delete[] data;
  delete[] flags;
  pthread_mutex_destroy(&refcount_mutex);
}

void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
  /* writer thread */
  if (status == NEW) {
    status = WRITE;
    sn0 = sn;
  }
  if (status == COMMITTED) {
    throw(ForbiddenError("TOISegment : putData in committed buffer"));
  }
  checkInRange(sn);
  data[sn-sn0] = d;
  flags[sn-sn0] = f;
}

void TOISegmented::BufferSegment::incRefCount() {
  pthread_mutex_lock(&refcount_mutex);
  refcount++;
  pthread_mutex_unlock(&refcount_mutex);
}

void TOISegmented::BufferSegment::decRefCount() {
  pthread_mutex_lock(&refcount_mutex);
  int nrc = --refcount;
  pthread_mutex_unlock(&refcount_mutex);
  if (nrc<0)
    throw(ForbiddenError("TOISegment : buffer refcount < 0"));
}

int TOISegmented::BufferSegment::getRefCount() {
  pthread_mutex_lock(&refcount_mutex);
  int rc = refcount;
  pthread_mutex_unlock(&refcount_mutex);
  return rc;
}


/*******************************/
/********** BufferView *********/
/*******************************/

TOISegmented::BufferView::BufferView(MasterView* m) {
  master = m;
  sn0 = -1;
  segmentSize = m->segmentSize;
  firstNeeded = -1;
  waiting = false;
}

TOISegmented::BufferView::~BufferView() {
}

double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread */
  ensure(sn);
  int seg = (sn-sn0)/segmentSize;
  return segments[seg]->getData(sn);
}

uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
  ensure(sn);
  int seg = (sn-sn0)/segmentSize;
  return segments[seg]->getFlag(sn);
}

void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
  if (sn < sn0) {
    LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
    LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
    abort();
  }

  if (sn0 < 0 ||
      sn >= sn0 + segmentSize*segments.size()) {
    LOG(cout << "BufferView " << hex << this << dec << ": read fault for " << sn << endl)
    sync();
    while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
      wait();
      LOG(cout << "BufferView " << hex << this << dec << ": waiting for " << sn << endl)
      sync();
    }
    LOG(cout << "BufferView " << hex << this << dec << ": resuming for " << sn
        << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
        << " in " << segments.size() << " segments " << endl)
  }
}

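/*
  Worked example of the indexing above (illustrative numbers): with
  segmentSize = 1000 and a view currently holding 3 segments starting at
  sn0 = 5000, the view covers samples 5000..7999.  getData(7250) computes
  seg = (7250-5000)/1000 = 2 and reads from segments[2].  getData(8200)
  falls outside that window, so ensure() calls sync() to pick up newly
  committed segments from the MasterView and, if they are not there yet,
  wait()s on read_wait_condv until the writer signals new data.
*/
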
void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
  master->updateView(this); // update me !
}

void TOISegmented::BufferView::wait() { /* reader thread */
  pthread_mutex_lock(&(master->read_wait_mutex));
  waiting = true;
  pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
  waiting = false;
  pthread_mutex_unlock(&(master->read_wait_mutex));
}


void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
  if (sn > firstNeeded) {
    firstNeeded = sn;
  }
}


/*******************************/
/********** MasterView *********/
/*******************************/

TOISegmented::MasterView::MasterView(int bufsz, int maxseg) {
  currentSegment = NULL;
  maxSegments = maxseg;
  segmentSize = bufsz;
  sn0 = -1;
  nConsumers = 0;

  pthread_mutex_init(&views_mutex, NULL);
  pthread_mutex_init(&read_wait_mutex, NULL);
  pthread_cond_init(&write_wait_condv, NULL);
  pthread_cond_init(&read_wait_condv, NULL);
  pthread_key_create(&buffer_key, BufferDestroy);

  waitingOnWrite = false;
}

TOISegmented::MasterView::~MasterView() {
  pthread_mutex_destroy(&views_mutex);
  pthread_mutex_destroy(&read_wait_mutex);
  pthread_cond_destroy(&write_wait_condv);
  pthread_cond_destroy(&read_wait_condv);
  pthread_key_delete(buffer_key);

  // There should not be any BufferView left... Check ?

  // decrement count for segments ?
}

void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
  if (sn0<0) {
    LOG(cout << "***MasterView::putData sn0<0" << endl)
    sn0=sn;
  }
  // can fit in current segment ?
  if (!(currentSegment != NULL &&
        sn >= currentSegment->sn0 &&
        sn < currentSegment->sn0 + currentSegment->bufferSize)) {
    LOG(cout << "MasterView::putData, need extend for " << sn << endl)
    nextSegment();
  }
  currentSegment->putData(sn, data, flags);
}

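/*
  Worked example of the write path above (illustrative numbers): with
  segmentSize = 1000, the first putData(0, ...) allocates a segment whose
  sn0 becomes 0 and which accepts samples 0..999.  putData(1000, ...) no
  longer fits, so nextSegment() marks the segment COMMITTED, appends it to
  the master segment list and allocates a fresh one; once maxSegments
  segments are held, the writer blocks in waitForCleaning() until
  updateView() purges segments no reader still needs.
*/
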
double TOISegmented::MasterView::getData(int sn) { /* reader thread */
  return getView()->getData(sn); /* thread-specific */
}

uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
  return getView()->getFlag(sn);
}

TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
  BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
  if (bv == NULL) {
    bv = createView();
    LOG(cout << "creating new view " << hex << bv << dec << endl)
    pthread_setspecific(buffer_key, bv);
  }
  return bv;
}

void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
  pthread_mutex_lock(&read_wait_mutex);
  pthread_cond_broadcast(&read_wait_condv);
  pthread_mutex_unlock(&read_wait_mutex);
}

void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
  if (waitingOnWrite) {
    LOG(cout << "MasterView : signal for wait on write" << endl)
    pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
  }
}

void TOISegmented::MasterView::putDone() {
  nextSegment(); // creates an unneeded segment, to be cleaned up
}

void TOISegmented::MasterView::nextSegment() { /* writer thread */
  // The current segment, if any, is now committed. A new
  // blank buffer is allocated, if any.
  pthread_mutex_lock(&views_mutex);

  LOG(cout << "MasterView::nextSegment "
      << segments.size()+1 << "/" << maxSegments << endl)

  if (currentSegment != NULL) {
    currentSegment->status = BufferSegment::COMMITTED;
    segments.push_back(currentSegment);
  }

  currentSegment = NULL;
  while (segments.size() >= maxSegments) {
    waitForCleaning();
  }

  currentSegment = new BufferSegment(segmentSize);
  signalWaitingViews(); // they can ask to be updated !!
  pthread_mutex_unlock(&views_mutex);
}

void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
  LOG(cout << "MasterView : write wait for clean for " << sn0 << endl)
  waitingOnWrite = true;
  checkDeadLock();
  pthread_cond_wait(&write_wait_condv, &views_mutex);
  LOG(cout << "MasterView : wait done" << endl)
}

TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
  BufferView* bv = new BufferView(this);
  pthread_mutex_lock(&views_mutex);
  allViews.insert(bv);
  pthread_mutex_unlock(&views_mutex);
  updateView(bv);
  return bv;
}

void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
  pthread_mutex_lock(&views_mutex);

  int oldBegin = bv->sn0;
  int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();

  for (vector<BufferSegment*>::iterator i = bv->segments.begin();
       i != bv->segments.end(); i++) {
    (*i)->decRefCount();
  }

  bv->segments.clear();

  // use the firstNeeded of all the views to do our own
  // housekeeping,

  // provided that all the consumers have made themselves known...

  if (nConsumers == allViews.size()) {
    int firstNeeded = MAXINT;
    for (set<BufferView*>::iterator i = allViews.begin();
         i != allViews.end(); i++) {
      if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
    }

    LOG(cout << "MasterView : firstNeeded = " << firstNeeded << endl);

    vector<BufferSegment*>::iterator j = segments.begin();
    bool clean = false;
    for (vector<BufferSegment*>::iterator i = segments.begin();
         i != segments.end(); i++) {
      if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) {
        clean = true;
        j = i;
      }
    }
    if (clean) {
      segments.erase(segments.begin(), j);
      sn0 = (*segments.begin())->sn0;
      LOG(cout << "MasterView : purged until " << sn0 << endl);
    }
  } else {
    LOG(cout << "MasterView : not all consumer threads known yet " << allViews.size()
        << "/" << nConsumers << endl);
  }

  for (vector<BufferSegment*>::iterator i = segments.begin();
       i != segments.end(); i++) {
    if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
      (*i)->incRefCount();
      bv->segments.push_back(*i);
    }
  }

  bv->sn0 = -1;
  int newEnd = -1;
  if (segments.size() > 0) {
    bv->sn0 = bv->segments[0]->sn0;
    newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
  }

  if (bv->sn0 > oldBegin) { // cleanup was done, wake the writer thread if needed
    signalWrite();
  }

  LOG(cout << "sync for " << hex << bv << dec << " : "
      << oldBegin << " - " << oldEnd << " --> "
      << bv->sn0 << " - " << newEnd << endl);

  if (newEnd > oldEnd) { // new data, wake the reader threads if needed
    signalWaitingViews();
  }
  pthread_mutex_unlock(&views_mutex);
}

void TOISegmented::MasterView::checkDeadLock() { /* views locked */
  // There is a possible deadlock if no view can free old segments
  // and we are waiting for write.

  // we need to record "wont need before" for each view, and
  // signal deadlock if any view that needs first segment data is sleeping
  // while we are asleep

  pthread_mutex_lock(&read_wait_mutex);
  if (!waitingOnWrite) {
    pthread_mutex_unlock(&read_wait_mutex);
    return; // no problem, there is an active writer
  }

  // Is any sleeping view needing our first segment ?

  for (set<BufferView*>::iterator i = allViews.begin();
       i != allViews.end(); i++) {
    if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
      cout << "**** DEADLOCK detected ****" << endl;
      cout << "We are waiting on write (buffer is full)" << endl;
      cout << "but a waiting reader still needs our first segment" << endl;
      cout << "restart with bigger buffers" << endl;
      abort();
    }
  }
  pthread_mutex_unlock(&read_wait_mutex);
}

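/*
  Worked example of the deadlock detected above (illustrative numbers):
  with segmentSize = 1000 and maxSegments = 4, the master holds at most
  4000 consecutive samples.  A reader that has only declared
  wontNeedBefore(500) but is already sleeping in wait() for sample 4100
  keeps the first segment (samples 0..999) alive, so updateView() can
  purge nothing, the writer blocks in waitForCleaning(), and neither side
  can make progress; checkDeadLock() then prints the message above and
  abort()s, suggesting a restart with larger bufsz and/or maxseg.
*/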


void TOISegmented::MasterView::BufferDestroy(void* p) {
  BufferView* bv = (BufferView*) p;
  delete bv;
}