Changeset 1689 in Sophya for trunk/ArchTOIPipe/Kernel/toisegment.cc
- Timestamp:
- Oct 14, 2001, 1:56:14 AM (24 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/ArchTOIPipe/Kernel/toisegment.cc
r1686 r1689 1 1 #include "toisegment.h" 2 3 /******************************/ 4 /******* TOISegmented *********/ 5 /******************************/ 6 7 TOISegmented::TOISegmented(int bufsz, int maxseg) { 8 master = new MasterView(bufsz, maxseg); 9 setName("TOISegmented"); 10 } 11 12 TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) { 13 master = new MasterView(bufsz, maxseg); 14 setName(nm); 15 } 16 17 TOISegmented::~TOISegmented() { 18 delete master; 19 } 20 21 22 double TOISegmented::getData(int i) { /* reader thread */ 23 return master->getData(i); 24 } 25 26 void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */ 27 data = master->getData(i); 28 flag = master->getFlag(i); 29 } 30 31 void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */ 32 master->putData(i, value, flag); 33 } 34 35 void TOISegmented::putDone() { 36 master->putDone(); 37 } 38 39 void TOISegmented::wontNeedBefore(int i) { /* reader thread */ 40 master->getView()->wontNeedBefore(i); 41 } 42 43 TOI::DataStatus TOISegmented::isDataAvail(int i, int j) { 44 // return master->getView()->isDataAvail(i, j); 45 cout << "TOISegmented::isDataAvail unimplemented" << endl; 46 throw PError("TOISegmented::isDataAvail unimplemented"); 47 } 48 49 TOI::DataStatus TOISegmented::isDataAvail(int i) { 50 return isDataAvail(i,i); 51 } 52 53 TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) { 54 return isDataAvail(i,j); 55 } 56 57 void TOISegmented::waitForData(int iStart, int iEnd) { 58 // get will wait... 59 } 60 61 void TOISegmented::waitForData(int i) { 62 // get will wait... 63 } 64 65 void TOISegmented::waitForAnyData() { 66 cout << "TOISegmented::waitForAnyData unimplemented" << endl; 67 throw PError("TOISegmented::waitForAnyData unimplemented"); 68 } 69 70 int TOISegmented::nextDataAvail(int iAfter) { 71 cout << "TOISegmented::nextDataAvail" << endl; 72 return iAfter+1; 73 } 74 75 bool TOISegmented::hasSomeData() { 76 cout << "TOISegmented::hasSomeData" << endl; 77 return true; 78 } 79 80 void TOISegmented::doPutData(int i, double value, uint_8 flag=0) { 81 cout << "TOISegmented::doPutData unimplemented" << endl; 82 throw PError("TOISegmented::doPutData unimplemented"); 83 } 84 85 void TOISegmented::doGetData(int i, double& value, uint_8& flag) { 86 cout << "TOISegmented::doGetData unimplemented" << endl; 87 throw PError("TOISegmented::doGetData unimplemented"); 88 } 89 2 90 3 91 /*******************************/ … … 27 115 } 28 116 29 void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) { 117 void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) { 118 /* writer thread*/ 30 119 if (status == NEW) { 31 120 status = WRITE; … … 70 159 sn0 = -1; 71 160 segmentSize = m->segmentSize; 161 firstNeeded = -1; 72 162 pthread_mutex_init(&mutex, NULL); 73 163 pthread_cond_init(&condv, NULL); … … 124 214 master->removeFromWaitList(this); 125 215 pthread_mutex_unlock(&mutex); 216 } 217 218 void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */ 219 if (sn > firstNeeded) { 220 firstNeeded = sn; 221 } 126 222 } 127 223 … … 142 238 pthread_key_create(&buffer_key, BufferDestroy); 143 239 144 wait Status = NO_WAIT;240 waitingOnWrite = false; 145 241 } 146 242 … … 157 253 158 254 void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { 255 if (sn0<0) sn0=sn; 159 256 // can fit in current segment ? 160 257 if (!(currentSegment != NULL && 161 258 sn >= currentSegment->sn0 && 162 259 sn < currentSegment->sn0 + currentSegment->bufferSize)) { 260 cout << "MasterView::putData, need extend for " << sn << endl; 163 261 nextSegment(); 164 262 } … … 177 275 // A view needs to wait for new data. 178 276 179 // There is a possible deadlock if no view can free old segments180 // and we are waiting for write.181 182 // we need to record "wont need before" for each view, and183 // signal deadlock if any view that needs first segment data is sleeping184 // while we are asleep185 277 186 278 pthread_mutex_lock(&views_mutex); … … 199 291 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key); 200 292 if (bv == NULL) { 293 cout << "creating new view" << endl; 201 294 bv = createView(); 202 295 pthread_setspecific(buffer_key, bv); … … 204 297 return bv; 205 298 } 299 300 void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ 301 pthread_mutex_lock(&views_mutex); 302 set<BufferView*> copyset = waitingBuffers; 303 pthread_mutex_unlock(&views_mutex); 304 305 for (set<BufferView*>::iterator i=copyset.begin(); 306 i != copyset.end(); i++) { 307 (*i)->signal(); 308 } 309 } 310 311 void TOISegmented::MasterView::signal() { /* reader thread */ 312 pthread_mutex_lock(&write_mutex); 313 pthread_cond_signal(&condv); // only one thread can be sleeping 314 pthread_mutex_unlock(&write_mutex); 315 } 316 317 void TOISegmented::MasterView::putDone() { 318 nextSegment(); // cree un segment inutile, a nettoyer 319 } 320 321 void TOISegmented::MasterView::nextSegment() { /* writer thread */ 322 // The current segment, if any, is now committed. A new 323 // blank buffer is allocated, if any. 324 325 cout << "MasterView::nextSegment " 326 << segments.size()+1 << "/" << maxSegments << endl; 327 328 if (currentSegment != NULL) { 329 currentSegment->status = BufferSegment::COMMITTED; 330 pthread_mutex_lock(&views_mutex); 331 segments.push_back(currentSegment); 332 pthread_mutex_unlock(&views_mutex); 333 } 334 335 currentSegment = NULL; 336 while (segments.size() >= maxSegments) { 337 waitForCleaning(); 338 } 339 340 currentSegment = new BufferSegment(segmentSize); 341 signalWaitingViews(); // they can ask to be updated !! 342 } 343 344 void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ 345 cout << "MasterView : write wait for clean for " << sn0 << endl; 346 pthread_mutex_lock(&write_mutex); 347 waitingOnWrite = true; 348 checkDeadLock(); 349 pthread_cond_wait(&condv, &write_mutex); 350 pthread_mutex_unlock(&write_mutex); 351 cout << "MasterView : wait done" << endl; 352 } 353 354 TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */ 355 BufferView* bv = new BufferView(this); 356 updateView(bv); 357 return bv; 358 } 359 360 void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */ 361 pthread_mutex_lock(&views_mutex); 362 363 int oldBegin = bv->sn0; 364 int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size(); 365 366 for (vector<BufferSegment*>::iterator i = bv->segments.begin(); 367 i != bv->segments.end(); i++) { 368 (*i)->decRefCount(); 369 } 370 371 bv->segments.clear(); 372 373 // TODO : utiliser firstNeeded de toute les vues pour faire le menage chez 374 // nous. 375 376 377 for (vector<BufferSegment*>::iterator i = segments.begin(); 378 i != segments.end(); i++) { 379 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) { 380 (*i)->incRefCount(); 381 bv->segments.push_back(*i); 382 } 383 } 384 385 bv->sn0 = -1; 386 int newEnd = -1; 387 if (segments.size() > 0) { 388 bv->sn0 = bv->segments[0]->sn0; 389 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size(); 390 } 391 392 if (bv->sn0 > oldBegin && waitingOnWrite) { 393 signal(); 394 } 395 396 cout << "sync " << sn0 << " - " << newEnd << endl; 397 398 pthread_mutex_unlock(&views_mutex); 399 400 if (newEnd > oldEnd) { 401 signalWaitingViews(); 402 } 403 } 404 405 void TOISegmented::MasterView::checkDeadLock() { 406 // There is a possible deadlock if no view can free old segments 407 // and we are waiting for write. 408 409 // we need to record "wont need before" for each view, and 410 // signal deadlock if any view that needs first segment data is sleeping 411 // while we are asleep 412 413 if (!waitingOnWrite) return; // no problem, there is an active writer 414 415 // Is any sleeping view needing our first segment ? 416 417 pthread_mutex_lock(&views_mutex); 418 for (set<BufferView*>::iterator i=waitingBuffers.begin(); 419 i != waitingBuffers.end(); i++) { 420 if ((*i)->firstNeeded < sn0+segmentSize) { 421 cout << "**** DEADLOCK detected ****" << endl; 422 cout << "We are waiting on write (buffer is full)"<< endl; 423 cout << "but a waiting reader still needs our first segment" << endl; 424 cout << "restart with bigger buffers" << endl; 425 abort(); 426 } 427 } 428 pthread_mutex_unlock(&views_mutex); 429 } 430 431 432 433 void TOISegmented::MasterView::BufferDestroy(void* p) { 434 BufferView* bv = (BufferView*) p; 435 delete bv; 436 }
Note:
See TracChangeset
for help on using the changeset viewer.