source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1690

Last change on this file since 1690 was 1690, checked in by aubourg, 24 years ago

segmentes

File size: 12.0 KB
Line 
1#include "toisegment.h"
2
3#ifndef MAXINT
4#define MAXINT 2147483647
5#endif
6
7/******************************/
8/******* TOISegmented *********/
9/******************************/
10
11TOISegmented::TOISegmented(int bufsz, int maxseg) {
12 master = new MasterView(bufsz, maxseg);
13 setName("TOISegmented");
14}
15
16TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
17 master = new MasterView(bufsz, maxseg);
18 setName(nm);
19}
20
21TOISegmented::~TOISegmented() {
22 delete master;
23}
24
25
26double TOISegmented::getData(int i) { /* reader thread */
27 return master->getData(i);
28}
29
30void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
31 data = master->getData(i);
32 flag = master->getFlag(i);
33}
34
35void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
36 master->putData(i, value, flag);
37}
38
39void TOISegmented::putDone() {
40 master->putDone();
41}
42
43void TOISegmented::wontNeedBefore(int i) { /* reader thread */
44 master->getView()->wontNeedBefore(i);
45}
46
47TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
48 // return master->getView()->isDataAvail(i, j);
49 cout << "TOISegmented::isDataAvail unimplemented" << endl;
50 throw PError("TOISegmented::isDataAvail unimplemented");
51}
52
53TOI::DataStatus TOISegmented::isDataAvail(int i) {
54 return isDataAvail(i,i);
55}
56
57TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
58 return isDataAvail(i,j);
59}
60
61void TOISegmented::waitForData(int iStart, int iEnd) {
62 // get will wait...
63}
64
65void TOISegmented::waitForData(int i) {
66 // get will wait...
67}
68
69void TOISegmented::waitForAnyData() {
70 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
71 throw PError("TOISegmented::waitForAnyData unimplemented");
72}
73
74int TOISegmented::nextDataAvail(int iAfter) {
75 cout << "TOISegmented::nextDataAvail" << endl;
76 return iAfter+1;
77}
78
79bool TOISegmented::hasSomeData() {
80 cout << "TOISegmented::hasSomeData" << endl;
81 return true;
82}
83
84void TOISegmented::doPutData(int i, double value, uint_8 flag=0) {
85 cout << "TOISegmented::doPutData unimplemented" << endl;
86 throw PError("TOISegmented::doPutData unimplemented");
87}
88
89void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
90 cout << "TOISegmented::doGetData unimplemented" << endl;
91 throw PError("TOISegmented::doGetData unimplemented");
92}
93
94
95/*******************************/
96/******* BufferSegment *********/
97/*******************************/
98
99TOISegmented::BufferSegment::BufferSegment(int sz) {
100 status = NEW;
101 bufferSize = sz;
102 sn0 = -1;
103
104 refcount = 0;
105
106 data = new double[sz];
107 flags = new uint_8[sz];
108
109 pthread_mutex_init(&refcount_mutex, NULL);
110}
111
112TOISegmented::BufferSegment::~BufferSegment() {
113 if (refcount > 0) {
114 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
115 }
116 delete[] data;
117 delete[] flags;
118 pthread_mutex_destroy(&refcount_mutex);
119}
120
121void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
122 /* writer thread*/
123 if (status == NEW) {
124 status = WRITE;
125 sn0 = sn;
126 }
127 if (status == COMMITTED) {
128 throw(ForbiddenError("TOISegment : putData in committed buffer"));
129 }
130 checkInRange(sn);
131 data[sn-sn0] = d;
132 flags[sn-sn0] = f;
133}
134
135void TOISegmented::BufferSegment::incRefCount() {
136 pthread_mutex_lock(&refcount_mutex);
137 refcount++;
138 pthread_mutex_unlock(&refcount_mutex);
139}
140
141void TOISegmented::BufferSegment::decRefCount() {
142 pthread_mutex_lock(&refcount_mutex);
143 int nrc = --refcount;
144 pthread_mutex_unlock(&refcount_mutex);
145 if (nrc<0)
146 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
147}
148
149int TOISegmented::BufferSegment::getRefCount() {
150 pthread_mutex_lock(&refcount_mutex);
151 int rc = refcount;
152 pthread_mutex_unlock(&refcount_mutex);
153 return rc;
154}
155
156
157/*******************************/
158/********** BufferView *********/
159/*******************************/
160
161TOISegmented::BufferView::BufferView(MasterView* m) {
162 master = m;
163 sn0 = -1;
164 segmentSize = m->segmentSize;
165 firstNeeded = -1;
166 pthread_mutex_init(&mutex, NULL);
167 pthread_cond_init(&condv, NULL);
168}
169
170TOISegmented::BufferView::~BufferView() {
171 pthread_mutex_destroy(&mutex);
172 pthread_cond_destroy(&condv);
173}
174
175double TOISegmented::BufferView::getData(int sn) { /* Single-thread */
176 ensure(sn);
177 int seg = (sn-sn0)/segmentSize;
178 return segments[seg]->getData(sn);
179}
180
181uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread */
182 ensure(sn);
183 int seg = (sn-sn0)/segmentSize;
184 return segments[seg]->getFlag(sn);
185}
186
187void TOISegmented::BufferView::ensure(int sn) { /* Single-thread */
188 if (sn < sn0) {
189 throw RangeCheckError("requested sample before first");
190 }
191
192 if (sn >= sn0 + segmentSize*segments.size()) {
193 cout << "BufferView : read fault for " << sn << endl;
194 sync();
195 while (sn >= sn0 + segmentSize*segments.size()) {
196 wait();
197 cout << "BufferView : waiting for " << sn << endl;
198 sync();
199 }
200 cout << "BufferView : resuming for " << sn << endl;
201 }
202}
203
204void TOISegmented::BufferView::sync() { /* Single-thread */
205 master->updateView(this); // update me !
206}
207
208void TOISegmented::BufferView::wait() { /* From reader thread */
209 pthread_mutex_lock(&mutex);
210 master->addToWaitList(this); // needing wake-up call
211 pthread_cond_wait(&condv, &mutex);
212 pthread_mutex_unlock(&mutex);
213}
214
215void TOISegmented::BufferView::signal() { /* From masterview, writer thread */
216 pthread_mutex_lock(&mutex);
217 pthread_cond_signal(&condv); // only one thread can be sleeping
218 master->removeFromWaitList(this);
219 pthread_mutex_unlock(&mutex);
220}
221
222void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
223 if (sn > firstNeeded) {
224 firstNeeded = sn;
225 }
226}
227
228
229/*******************************/
230/********** MasterView *********/
231/*******************************/
232
233TOISegmented::MasterView::MasterView(int bufsz, int maxseg) {
234 currentSegment = NULL;
235 maxSegments = maxseg;
236 segmentSize = bufsz;
237 sn0 = -1;
238
239 pthread_mutex_init(&views_mutex, NULL);
240 pthread_mutex_init(&write_mutex, NULL);
241 pthread_cond_init(&condv, NULL);
242 pthread_key_create(&buffer_key, BufferDestroy);
243
244 waitingOnWrite = false;
245}
246
247TOISegmented::MasterView::~MasterView() {
248 pthread_mutex_destroy(&views_mutex);
249 pthread_mutex_destroy(&write_mutex);
250 pthread_cond_destroy(&condv);
251 pthread_key_delete(buffer_key);
252
253 // There should not be any BufferView left... Check ?
254
255 // decrement count for segments ?
256}
257
258void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) {
259 if (sn0<0) sn0=sn;
260 // can fit in current segment ?
261 if (!(currentSegment != NULL &&
262 sn >= currentSegment->sn0 &&
263 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
264 cout << "MasterView::putData, need extend for " << sn << endl;
265 nextSegment();
266 }
267 currentSegment->putData(sn, data, flags);
268}
269
270double TOISegmented::MasterView::getData(int sn) {
271 return getView()->getData(sn);
272}
273
274uint_8 TOISegmented::MasterView::getFlag(int sn) {
275 return getView()->getFlag(sn);
276}
277
278void TOISegmented::MasterView::addToWaitList(BufferView* bv) { /* reader thread */
279 // A view needs to wait for new data.
280
281
282 pthread_mutex_lock(&views_mutex);
283 waitingBuffers.insert(bv);
284 pthread_mutex_unlock(&views_mutex);
285 checkDeadLock();
286}
287
288void TOISegmented::MasterView::removeFromWaitList(BufferView* bv) { /* reader thread */
289 pthread_mutex_lock(&views_mutex);
290 waitingBuffers.erase(bv);
291 pthread_mutex_unlock(&views_mutex);
292}
293
294TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
295 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
296 if (bv == NULL) {
297 cout << "creating new view" << endl;
298 bv = createView();
299 pthread_setspecific(buffer_key, bv);
300 }
301 return bv;
302}
303
304void TOISegmented::MasterView::signalWaitingViews() { /* any thread */
305 pthread_mutex_lock(&views_mutex);
306 set<BufferView*> copyset = waitingBuffers;
307 pthread_mutex_unlock(&views_mutex);
308
309 for (set<BufferView*>::iterator i=copyset.begin();
310 i != copyset.end(); i++) {
311 (*i)->signal();
312 }
313}
314
315void TOISegmented::MasterView::signal() { /* reader thread */
316 pthread_mutex_lock(&write_mutex);
317 pthread_cond_signal(&condv); // only one thread can be sleeping
318 pthread_mutex_unlock(&write_mutex);
319}
320
321void TOISegmented::MasterView::putDone() {
322 nextSegment(); // cree un segment inutile, a nettoyer
323}
324
325void TOISegmented::MasterView::nextSegment() { /* writer thread */
326 // The current segment, if any, is now committed. A new
327 // blank buffer is allocated, if any.
328
329 cout << "MasterView::nextSegment "
330 << segments.size()+1 << "/" << maxSegments << endl;
331
332 if (currentSegment != NULL) {
333 currentSegment->status = BufferSegment::COMMITTED;
334 pthread_mutex_lock(&views_mutex);
335 segments.push_back(currentSegment);
336 pthread_mutex_unlock(&views_mutex);
337 }
338
339 currentSegment = NULL;
340 while (segments.size() >= maxSegments) {
341 waitForCleaning();
342 }
343
344 currentSegment = new BufferSegment(segmentSize);
345 signalWaitingViews(); // they can ask to be updated !!
346}
347
348void TOISegmented::MasterView::waitForCleaning() { /* writer thread */
349 cout << "MasterView : write wait for clean for " << sn0 << endl;
350 pthread_mutex_lock(&write_mutex);
351 waitingOnWrite = true;
352 checkDeadLock();
353 pthread_cond_wait(&condv, &write_mutex);
354 pthread_mutex_unlock(&write_mutex);
355 cout << "MasterView : wait done" << endl;
356}
357
358TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
359 BufferView* bv = new BufferView(this);
360 allViews.insert(bv);
361 updateView(bv);
362 return bv;
363}
364
365void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
366 pthread_mutex_lock(&views_mutex);
367
368 int oldBegin = bv->sn0;
369 int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
370
371 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
372 i != bv->segments.end(); i++) {
373 (*i)->decRefCount();
374 }
375
376 bv->segments.clear();
377
378 // utiliser firstNeeded de toutes les vues pour faire le menage chez
379 // nous.
380
381 int firstNeeded = MAXINT;
382 for (set<BufferView*>::iterator i = allViews.begin();
383 i != allViews.end(); i++) {
384 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
385 }
386
387 cout << "firstNeeded = " << firstNeeded << endl;
388
389 vector<BufferSegment*>::iterator j = segments.begin();
390 bool clean = false;
391 for (vector<BufferSegment*>::iterator i = segments.begin();
392 i != segments.end(); i++) {
393 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) {
394 clean = true;
395 j = i;
396 } else {}
397 if (clean) {
398 segments.erase(segments.begin(),j);
399 sn0 = (*segments.begin())->sn0;
400 }
401 }
402
403 for (vector<BufferSegment*>::iterator i = segments.begin();
404 i != segments.end(); i++) {
405 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
406 (*i)->incRefCount();
407 bv->segments.push_back(*i);
408 }
409 }
410
411 bv->sn0 = -1;
412 int newEnd = -1;
413 if (segments.size() > 0) {
414 bv->sn0 = bv->segments[0]->sn0;
415 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
416 }
417
418 if (bv->sn0 > oldBegin && waitingOnWrite) {
419 signal();
420 }
421
422 cout << "sync " << sn0 << " - " << newEnd << endl;
423
424 pthread_mutex_unlock(&views_mutex);
425
426 if (newEnd > oldEnd) {
427 signalWaitingViews();
428 }
429}
430
431void TOISegmented::MasterView::checkDeadLock() {
432 // There is a possible deadlock if no view can free old segments
433 // and we are waiting for write.
434
435 // we need to record "wont need before" for each view, and
436 // signal deadlock if any view that needs first segment data is sleeping
437 // while we are asleep
438
439 if (!waitingOnWrite) return; // no problem, there is an active writer
440
441 // Is any sleeping view needing our first segment ?
442
443 pthread_mutex_lock(&views_mutex);
444 for (set<BufferView*>::iterator i=waitingBuffers.begin();
445 i != waitingBuffers.end(); i++) {
446 if ((*i)->firstNeeded < sn0+segmentSize) {
447 cout << "**** DEADLOCK detected ****" << endl;
448 cout << "We are waiting on write (buffer is full)"<< endl;
449 cout << "but a waiting reader still needs our first segment" << endl;
450 cout << "restart with bigger buffers" << endl;
451 abort();
452 }
453 }
454 pthread_mutex_unlock(&views_mutex);
455}
456
457
458
459void TOISegmented::MasterView::BufferDestroy(void* p) {
460 BufferView* bv = (BufferView*) p;
461 delete bv;
462}
Note: See TracBrowser for help on using the repository browser.