source: Sophya/trunk/ArchTOIPipe/Kernel/toisegment.cc@ 1689

Last change on this file since 1689 was 1689, checked in by aubourg, 24 years ago

segmented buffers...

File size: 11.3 KB
Line 
1#include "toisegment.h"
2
3/******************************/
4/******* TOISegmented *********/
5/******************************/
6
7TOISegmented::TOISegmented(int bufsz, int maxseg) {
8 master = new MasterView(bufsz, maxseg);
9 setName("TOISegmented");
10}
11
12TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
13 master = new MasterView(bufsz, maxseg);
14 setName(nm);
15}
16
17TOISegmented::~TOISegmented() {
18 delete master;
19}
20
21
22double TOISegmented::getData(int i) { /* reader thread */
23 return master->getData(i);
24}
25
26void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
27 data = master->getData(i);
28 flag = master->getFlag(i);
29}
30
31void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
32 master->putData(i, value, flag);
33}
34
35void TOISegmented::putDone() {
36 master->putDone();
37}
38
39void TOISegmented::wontNeedBefore(int i) { /* reader thread */
40 master->getView()->wontNeedBefore(i);
41}
42
43TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
44 // return master->getView()->isDataAvail(i, j);
45 cout << "TOISegmented::isDataAvail unimplemented" << endl;
46 throw PError("TOISegmented::isDataAvail unimplemented");
47}
48
49TOI::DataStatus TOISegmented::isDataAvail(int i) {
50 return isDataAvail(i,i);
51}
52
53TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
54 return isDataAvail(i,j);
55}
56
57void TOISegmented::waitForData(int iStart, int iEnd) {
58 // get will wait...
59}
60
61void TOISegmented::waitForData(int i) {
62 // get will wait...
63}
64
65void TOISegmented::waitForAnyData() {
66 cout << "TOISegmented::waitForAnyData unimplemented" << endl;
67 throw PError("TOISegmented::waitForAnyData unimplemented");
68}
69
70int TOISegmented::nextDataAvail(int iAfter) {
71 cout << "TOISegmented::nextDataAvail" << endl;
72 return iAfter+1;
73}
74
75bool TOISegmented::hasSomeData() {
76 cout << "TOISegmented::hasSomeData" << endl;
77 return true;
78}
79
80void TOISegmented::doPutData(int i, double value, uint_8 flag=0) {
81 cout << "TOISegmented::doPutData unimplemented" << endl;
82 throw PError("TOISegmented::doPutData unimplemented");
83}
84
85void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
86 cout << "TOISegmented::doGetData unimplemented" << endl;
87 throw PError("TOISegmented::doGetData unimplemented");
88}
89
90
91/*******************************/
92/******* BufferSegment *********/
93/*******************************/
94
95TOISegmented::BufferSegment::BufferSegment(int sz) {
96 status = NEW;
97 bufferSize = sz;
98 sn0 = -1;
99
100 refcount = 0;
101
102 data = new double[sz];
103 flags = new uint_8[sz];
104
105 pthread_mutex_init(&refcount_mutex, NULL);
106}
107
108TOISegmented::BufferSegment::~BufferSegment() {
109 if (refcount > 0) {
110 throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
111 }
112 delete[] data;
113 delete[] flags;
114 pthread_mutex_destroy(&refcount_mutex);
115}
116
117void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
118 /* writer thread*/
119 if (status == NEW) {
120 status = WRITE;
121 sn0 = sn;
122 }
123 if (status == COMMITTED) {
124 throw(ForbiddenError("TOISegment : putData in committed buffer"));
125 }
126 checkInRange(sn);
127 data[sn-sn0] = d;
128 flags[sn-sn0] = f;
129}
130
131void TOISegmented::BufferSegment::incRefCount() {
132 pthread_mutex_lock(&refcount_mutex);
133 refcount++;
134 pthread_mutex_unlock(&refcount_mutex);
135}
136
137void TOISegmented::BufferSegment::decRefCount() {
138 pthread_mutex_lock(&refcount_mutex);
139 int nrc = --refcount;
140 pthread_mutex_unlock(&refcount_mutex);
141 if (nrc<0)
142 throw(ForbiddenError("TOISegment : buffer refcount < 0"));
143}
144
145int TOISegmented::BufferSegment::getRefCount() {
146 pthread_mutex_lock(&refcount_mutex);
147 int rc = refcount;
148 pthread_mutex_unlock(&refcount_mutex);
149 return rc;
150}
151
152
153/*******************************/
154/********** BufferView *********/
155/*******************************/
156
157TOISegmented::BufferView::BufferView(MasterView* m) {
158 master = m;
159 sn0 = -1;
160 segmentSize = m->segmentSize;
161 firstNeeded = -1;
162 pthread_mutex_init(&mutex, NULL);
163 pthread_cond_init(&condv, NULL);
164}
165
166TOISegmented::BufferView::~BufferView() {
167 pthread_mutex_destroy(&mutex);
168 pthread_cond_destroy(&condv);
169}
170
171double TOISegmented::BufferView::getData(int sn) { /* Single-thread */
172 ensure(sn);
173 int seg = (sn-sn0)/segmentSize;
174 return segments[seg]->getData(sn);
175}
176
177uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread */
178 ensure(sn);
179 int seg = (sn-sn0)/segmentSize;
180 return segments[seg]->getFlag(sn);
181}
182
183void TOISegmented::BufferView::ensure(int sn) { /* Single-thread */
184 if (sn < sn0) {
185 throw RangeCheckError("requested sample before first");
186 }
187
188 if (sn >= sn0 + segmentSize*segments.size()) {
189 cout << "BufferView : read fault for " << sn << endl;
190 sync();
191 while (sn >= sn0 + segmentSize*segments.size()) {
192 wait();
193 cout << "BufferView : waiting for " << sn << endl;
194 sync();
195 }
196 cout << "BufferView : resuming for " << sn << endl;
197 }
198}
199
200void TOISegmented::BufferView::sync() { /* Single-thread */
201 master->updateView(this); // update me !
202}
203
204void TOISegmented::BufferView::wait() { /* From reader thread */
205 pthread_mutex_lock(&mutex);
206 master->addToWaitList(this); // needing wake-up call
207 pthread_cond_wait(&condv, &mutex);
208 pthread_mutex_unlock(&mutex);
209}
210
211void TOISegmented::BufferView::signal() { /* From masterview, writer thread */
212 pthread_mutex_lock(&mutex);
213 pthread_cond_signal(&condv); // only one thread can be sleeping
214 master->removeFromWaitList(this);
215 pthread_mutex_unlock(&mutex);
216}
217
218void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
219 if (sn > firstNeeded) {
220 firstNeeded = sn;
221 }
222}
223
224
225/*******************************/
226/********** MasterView *********/
227/*******************************/
228
229TOISegmented::MasterView::MasterView(int bufsz, int maxseg) {
230 currentSegment = NULL;
231 maxSegments = maxseg;
232 segmentSize = bufsz;
233 sn0 = -1;
234
235 pthread_mutex_init(&views_mutex, NULL);
236 pthread_mutex_init(&write_mutex, NULL);
237 pthread_cond_init(&condv, NULL);
238 pthread_key_create(&buffer_key, BufferDestroy);
239
240 waitingOnWrite = false;
241}
242
243TOISegmented::MasterView::~MasterView() {
244 pthread_mutex_destroy(&views_mutex);
245 pthread_mutex_destroy(&write_mutex);
246 pthread_cond_destroy(&condv);
247 pthread_key_delete(buffer_key);
248
249 // There should not be any BufferView left... Check ?
250
251 // decrement count for segments ?
252}
253
254void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) {
255 if (sn0<0) sn0=sn;
256 // can fit in current segment ?
257 if (!(currentSegment != NULL &&
258 sn >= currentSegment->sn0 &&
259 sn < currentSegment->sn0 + currentSegment->bufferSize)) {
260 cout << "MasterView::putData, need extend for " << sn << endl;
261 nextSegment();
262 }
263 currentSegment->putData(sn, data, flags);
264}
265
266double TOISegmented::MasterView::getData(int sn) {
267 return getView()->getData(sn);
268}
269
270uint_8 TOISegmented::MasterView::getFlag(int sn) {
271 return getView()->getFlag(sn);
272}
273
274void TOISegmented::MasterView::addToWaitList(BufferView* bv) { /* reader thread */
275 // A view needs to wait for new data.
276
277
278 pthread_mutex_lock(&views_mutex);
279 waitingBuffers.insert(bv);
280 pthread_mutex_unlock(&views_mutex);
281 checkDeadLock();
282}
283
284void TOISegmented::MasterView::removeFromWaitList(BufferView* bv) { /* reader thread */
285 pthread_mutex_lock(&views_mutex);
286 waitingBuffers.erase(bv);
287 pthread_mutex_unlock(&views_mutex);
288}
289
290TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
291 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
292 if (bv == NULL) {
293 cout << "creating new view" << endl;
294 bv = createView();
295 pthread_setspecific(buffer_key, bv);
296 }
297 return bv;
298}
299
300void TOISegmented::MasterView::signalWaitingViews() { /* any thread */
301 pthread_mutex_lock(&views_mutex);
302 set<BufferView*> copyset = waitingBuffers;
303 pthread_mutex_unlock(&views_mutex);
304
305 for (set<BufferView*>::iterator i=copyset.begin();
306 i != copyset.end(); i++) {
307 (*i)->signal();
308 }
309}
310
311void TOISegmented::MasterView::signal() { /* reader thread */
312 pthread_mutex_lock(&write_mutex);
313 pthread_cond_signal(&condv); // only one thread can be sleeping
314 pthread_mutex_unlock(&write_mutex);
315}
316
317void TOISegmented::MasterView::putDone() {
318 nextSegment(); // cree un segment inutile, a nettoyer
319}
320
321void TOISegmented::MasterView::nextSegment() { /* writer thread */
322 // The current segment, if any, is now committed. A new
323 // blank buffer is allocated, if any.
324
325 cout << "MasterView::nextSegment "
326 << segments.size()+1 << "/" << maxSegments << endl;
327
328 if (currentSegment != NULL) {
329 currentSegment->status = BufferSegment::COMMITTED;
330 pthread_mutex_lock(&views_mutex);
331 segments.push_back(currentSegment);
332 pthread_mutex_unlock(&views_mutex);
333 }
334
335 currentSegment = NULL;
336 while (segments.size() >= maxSegments) {
337 waitForCleaning();
338 }
339
340 currentSegment = new BufferSegment(segmentSize);
341 signalWaitingViews(); // they can ask to be updated !!
342}
343
344void TOISegmented::MasterView::waitForCleaning() { /* writer thread */
345 cout << "MasterView : write wait for clean for " << sn0 << endl;
346 pthread_mutex_lock(&write_mutex);
347 waitingOnWrite = true;
348 checkDeadLock();
349 pthread_cond_wait(&condv, &write_mutex);
350 pthread_mutex_unlock(&write_mutex);
351 cout << "MasterView : wait done" << endl;
352}
353
354TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
355 BufferView* bv = new BufferView(this);
356 updateView(bv);
357 return bv;
358}
359
360void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
361 pthread_mutex_lock(&views_mutex);
362
363 int oldBegin = bv->sn0;
364 int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
365
366 for (vector<BufferSegment*>::iterator i = bv->segments.begin();
367 i != bv->segments.end(); i++) {
368 (*i)->decRefCount();
369 }
370
371 bv->segments.clear();
372
373 // TODO : utiliser firstNeeded de toute les vues pour faire le menage chez
374 // nous.
375
376
377 for (vector<BufferSegment*>::iterator i = segments.begin();
378 i != segments.end(); i++) {
379 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
380 (*i)->incRefCount();
381 bv->segments.push_back(*i);
382 }
383 }
384
385 bv->sn0 = -1;
386 int newEnd = -1;
387 if (segments.size() > 0) {
388 bv->sn0 = bv->segments[0]->sn0;
389 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
390 }
391
392 if (bv->sn0 > oldBegin && waitingOnWrite) {
393 signal();
394 }
395
396 cout << "sync " << sn0 << " - " << newEnd << endl;
397
398 pthread_mutex_unlock(&views_mutex);
399
400 if (newEnd > oldEnd) {
401 signalWaitingViews();
402 }
403}
404
405void TOISegmented::MasterView::checkDeadLock() {
406 // There is a possible deadlock if no view can free old segments
407 // and we are waiting for write.
408
409 // we need to record "wont need before" for each view, and
410 // signal deadlock if any view that needs first segment data is sleeping
411 // while we are asleep
412
413 if (!waitingOnWrite) return; // no problem, there is an active writer
414
415 // Is any sleeping view needing our first segment ?
416
417 pthread_mutex_lock(&views_mutex);
418 for (set<BufferView*>::iterator i=waitingBuffers.begin();
419 i != waitingBuffers.end(); i++) {
420 if ((*i)->firstNeeded < sn0+segmentSize) {
421 cout << "**** DEADLOCK detected ****" << endl;
422 cout << "We are waiting on write (buffer is full)"<< endl;
423 cout << "but a waiting reader still needs our first segment" << endl;
424 cout << "restart with bigger buffers" << endl;
425 abort();
426 }
427 }
428 pthread_mutex_unlock(&views_mutex);
429}
430
431
432
433void TOISegmented::MasterView::BufferDestroy(void* p) {
434 BufferView* bv = (BufferView*) p;
435 delete bv;
436}
Note: See TracBrowser for help on using the repository browser.