Ignore:
Timestamp:
Oct 14, 2001, 11:15:01 PM (24 years ago)
Author:
aubourg
Message:

thread debugging

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/ArchTOIPipe/Kernel/toisegment.cc

    r1690 r1692  
    55#endif
    66
     7static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
     8static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
     9static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
     10#define LOG(_xxx_) \
     11cout_lock(); \
     12_xxx_; \
     13cout_unlock();
     14
     15
    716/******************************/
    817/******* TOISegmented *********/
     
    2130TOISegmented::~TOISegmented() {
    2231  delete master;
     32}
     33
     34
     35void TOISegmented::addConsumer(TOIProcessor* p) {
     36  TOI::addConsumer(p);
     37  master->nConsumers = consumers.size();
    2338}
    2439
     
    8297}
    8398
    84 void TOISegmented::doPutData(int i, double value, uint_8 flag=0) {
     99void TOISegmented::doPutData(int i, double value, uint_8 flag) {
    85100  cout << "TOISegmented::doPutData unimplemented" << endl;
    86101  throw PError("TOISegmented::doPutData unimplemented");
     
    164179  segmentSize = m->segmentSize;
    165180  firstNeeded = -1;
    166   pthread_mutex_init(&mutex, NULL);
    167   pthread_cond_init(&condv, NULL);
     181  waiting = false;
    168182}
    169183
    170184TOISegmented::BufferView::~BufferView() {
    171   pthread_mutex_destroy(&mutex);
    172   pthread_cond_destroy(&condv);
    173 }
    174 
    175 double TOISegmented::BufferView::getData(int sn) { /* Single-thread */
     185}
     186
     187double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
    176188  ensure(sn);
    177189  int seg = (sn-sn0)/segmentSize;
     
    179191}
    180192
    181 uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread */
     193uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
    182194  ensure(sn);
    183195  int seg = (sn-sn0)/segmentSize;
     
    185197}
    186198
    187 void  TOISegmented::BufferView::ensure(int sn) { /* Single-thread */
     199void  TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
    188200  if (sn < sn0) {
    189     throw RangeCheckError("requested sample before first");
    190   }
    191 
    192   if (sn >= sn0 + segmentSize*segments.size()) {
    193     cout << "BufferView : read fault for " << sn << endl;
     201    LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl);
     202    LOG(cout << "sn " << sn << " sn0 " << sn0 << endl);
     203    abort();
     204  }
     205
     206  if (sn0 < 0 ||
     207      sn >= sn0 + segmentSize*segments.size()) {
     208    LOG(cout << "BufferView " << hex << this << dec << ": read fault for " << sn << endl)
    194209    sync();
    195     while (sn >= sn0 + segmentSize*segments.size()) {
     210    while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
    196211      wait();
    197       cout << "BufferView : waiting for " << sn << endl;
     212      LOG(cout << "BufferView " << hex << this << dec << ": waiting for " << sn << endl)
    198213      sync();
    199214    }
    200     cout << "BufferView : resuming for " << sn << endl;   
    201   }
    202 }
    203 
    204 void TOISegmented::BufferView::sync() { /* Single-thread */
     215    LOG(cout << "BufferView " << hex << this << dec << ": resuming for " << sn
     216        << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size()
     217        << " in " << segments.size() << " segments " << endl)
     218  }
     219}
     220
     221void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
    205222  master->updateView(this); // update me !
    206223}
    207224
    208 void TOISegmented::BufferView::wait() { /* From reader thread */
    209   pthread_mutex_lock(&mutex);
    210   master->addToWaitList(this); // needing wake-up call
    211   pthread_cond_wait(&condv, &mutex);
    212   pthread_mutex_unlock(&mutex);
    213 }
    214 
    215 void TOISegmented::BufferView::signal() { /* From masterview, writer thread */
    216   pthread_mutex_lock(&mutex);
    217   pthread_cond_signal(&condv); // only one thread can be sleeping
    218   master->removeFromWaitList(this);
    219   pthread_mutex_unlock(&mutex); 
    220 }
     225void TOISegmented::BufferView::wait() { /* reader thread */
     226  pthread_mutex_lock(&(master->read_wait_mutex));
     227  waiting = true;
     228  pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex));
     229  waiting = false;
     230  pthread_mutex_unlock(&(master->read_wait_mutex));
     231}
     232
    221233
    222234void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
     
    236248  segmentSize    = bufsz;
    237249  sn0            = -1;
     250  nConsumers = 0;
    238251 
    239252  pthread_mutex_init(&views_mutex, NULL);
    240   pthread_mutex_init(&write_mutex, NULL);
    241   pthread_cond_init(&condv, NULL);
     253  pthread_mutex_init(&read_wait_mutex, NULL);
     254  pthread_cond_init(&write_wait_condv, NULL);
     255  pthread_cond_init(&read_wait_condv, NULL);
    242256  pthread_key_create(&buffer_key, BufferDestroy);
    243257
     
    247261TOISegmented::MasterView::~MasterView() {
    248262  pthread_mutex_destroy(&views_mutex);
    249   pthread_mutex_destroy(&write_mutex);
    250   pthread_cond_destroy(&condv);
     263  pthread_mutex_destroy(&read_wait_mutex);
     264  pthread_cond_destroy(&write_wait_condv);
     265  pthread_cond_destroy(&read_wait_condv);
    251266  pthread_key_delete(buffer_key);
    252267
     
    256271}
    257272
    258 void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) {
    259   if (sn0<0) sn0=sn;
     273void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
     274  if (sn0<0) {
     275    LOG(cout << "***MasterView::putData sn0<0" << endl)
     276    sn0=sn;
     277  }
    260278  // can fit in current segment ?
    261279  if (!(currentSegment != NULL &&
    262280        sn >= currentSegment->sn0 &&
    263281        sn < currentSegment->sn0 + currentSegment->bufferSize)) {
    264     cout << "MasterView::putData, need extend for " << sn << endl;
     282    LOG(cout << "MasterView::putData, need extend for " << sn << endl)
    265283    nextSegment();
    266284  }
     
    268286}
    269287
    270 double TOISegmented::MasterView::getData(int sn) {
    271   return getView()->getData(sn);
    272 }
    273 
    274 uint_8 TOISegmented::MasterView::getFlag(int sn) {
     288double TOISegmented::MasterView::getData(int sn) { /* reader thread */
     289  return getView()->getData(sn); /* thread-specific */
     290}
     291
     292uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
    275293  return getView()->getFlag(sn);
    276 }
    277 
    278 void TOISegmented::MasterView::addToWaitList(BufferView* bv) { /* reader thread */
    279   // A view needs to wait for new data.
    280 
    281 
    282   pthread_mutex_lock(&views_mutex);
    283   waitingBuffers.insert(bv);
    284   pthread_mutex_unlock(&views_mutex);
    285   checkDeadLock();
    286 }
    287 
    288 void TOISegmented::MasterView::removeFromWaitList(BufferView* bv) { /* reader thread */
    289   pthread_mutex_lock(&views_mutex);
    290   waitingBuffers.erase(bv);
    291   pthread_mutex_unlock(&views_mutex); 
    292294}
    293295
     
    295297  BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
    296298  if (bv == NULL) {
    297     cout << "creating new view" << endl;
    298299    bv = createView();
     300    LOG(cout << "creating new view " << hex << bv << dec << endl)
    299301    pthread_setspecific(buffer_key, bv);
    300302  }
     
    302304}
    303305
    304 void TOISegmented::MasterView::signalWaitingViews() { /* any thread */
    305   pthread_mutex_lock(&views_mutex);
    306   set<BufferView*> copyset = waitingBuffers;
    307   pthread_mutex_unlock(&views_mutex);
    308 
    309   for (set<BufferView*>::iterator i=copyset.begin();
    310        i != copyset.end(); i++) {
    311     (*i)->signal();
    312   }
    313 }
    314 
    315 void TOISegmented::MasterView::signal() { /* reader thread */
    316   pthread_mutex_lock(&write_mutex);
    317   pthread_cond_signal(&condv); // only one thread can be sleeping
    318   pthread_mutex_unlock(&write_mutex); 
     306void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
     307  pthread_mutex_lock(&read_wait_mutex);
     308  pthread_cond_broadcast(&read_wait_condv);
     309  pthread_mutex_unlock(&read_wait_mutex);
     310}
     311
     312void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
     313  if (waitingOnWrite) {
     314    LOG(cout << "MasterView : signal for wait on write" << endl)
     315    pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
     316  }
    319317}
    320318
     
    326324  // The current segment, if any, is now committed. A new
    327325  // blank buffer is allocated, if any.
    328 
    329   cout << "MasterView::nextSegment "
    330        << segments.size()+1 << "/" << maxSegments << endl;
     326  pthread_mutex_lock(&views_mutex);
     327
     328  LOG(cout << "MasterView::nextSegment "
     329      << segments.size()+1 << "/" << maxSegments << endl)
    331330
    332331  if (currentSegment != NULL) {
    333332    currentSegment->status = BufferSegment::COMMITTED;
    334     pthread_mutex_lock(&views_mutex);
    335333    segments.push_back(currentSegment);
    336     pthread_mutex_unlock(&views_mutex);
    337334  }
    338335
     
    344341  currentSegment = new BufferSegment(segmentSize);
    345342  signalWaitingViews(); // they can ask to be updated !!
    346 }
    347 
    348 void TOISegmented::MasterView::waitForCleaning() { /* writer thread */
    349   cout << "MasterView : write wait for clean for " << sn0 << endl;
    350   pthread_mutex_lock(&write_mutex);
     343  pthread_mutex_unlock(&views_mutex);
     344}
     345
     346void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
     347  LOG(cout << "MasterView : write wait for clean for " << sn0 << endl)
    351348  waitingOnWrite = true;
    352349  checkDeadLock();
    353   pthread_cond_wait(&condv, &write_mutex);
    354   pthread_mutex_unlock(&write_mutex);
    355   cout << "MasterView : wait done" << endl;
     350  pthread_cond_wait(&write_wait_condv, &views_mutex);
     351  LOG(cout << "MasterView : wait done" << endl)
    356352}
    357353
    358354TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
    359355  BufferView* bv =  new BufferView(this);
     356  pthread_mutex_lock(&views_mutex);
    360357  allViews.insert(bv);
     358  pthread_mutex_unlock(&views_mutex);
    361359  updateView(bv);
    362360  return bv;
     
    379377  // nous.
    380378 
    381   int firstNeeded = MAXINT;
    382   for (set<BufferView*>::iterator i = allViews.begin();
    383        i != allViews.end(); i++) {
    384     if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
    385   }
    386 
    387   cout << "firstNeeded = " << firstNeeded << endl;
    388 
    389   vector<BufferSegment*>::iterator j = segments.begin();
    390   bool clean = false;
    391   for (vector<BufferSegment*>::iterator i = segments.begin();
    392        i != segments.end(); i++) {
    393     if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) {
    394       clean = true;
    395       j = i;
    396     } else {}
     379  // A condition que tous les consumers se soient fait connaitre...
     380
     381  if (nConsumers == allViews.size()) {
     382    int firstNeeded = MAXINT;
     383    for (set<BufferView*>::iterator i = allViews.begin();
     384         i != allViews.end(); i++) {
     385      if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
     386    }
     387   
     388    LOG(cout << "MasterView : firstNeeded = " << firstNeeded << endl);
     389     
     390    vector<BufferSegment*>::iterator j = segments.begin();
     391    bool clean = false;
     392    for (vector<BufferSegment*>::iterator i = segments.begin();
     393         i != segments.end(); i++) {
     394      if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) {
     395        clean = true;
     396        j = i;
     397      }
     398    }
    397399    if (clean) {
    398400      segments.erase(segments.begin(),j);
    399401      sn0 = (*segments.begin())->sn0;
    400     }
    401   }
     402      LOG(cout << "MasterView : purged until " << sn0 << endl);
     403    }
     404  } else {
     405    LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
     406        << "/" << nConsumers  << endl);
     407  }
    402408
    403409  for (vector<BufferSegment*>::iterator i = segments.begin();
     
    416422  }
    417423
    418   if (bv->sn0 > oldBegin && waitingOnWrite) {
    419     signal();
    420   }
    421 
    422   cout << "sync " << sn0 << " - " << newEnd << endl;
    423 
     424  if (bv->sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
     425    signalWrite();
     426  }
     427
     428  LOG(cout << "sync for " << hex << bv << dec << " : "
     429      << oldBegin << " - " << oldEnd << "  -->  "
     430      << bv->sn0 << " - " << newEnd << endl);
     431 
     432  if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
     433    signalWaitingViews();
     434  }
    424435  pthread_mutex_unlock(&views_mutex);
    425 
    426   if (newEnd > oldEnd) {
    427     signalWaitingViews();
    428   }
    429 }
    430 
    431 void TOISegmented::MasterView::checkDeadLock() {
     436}
     437
     438void TOISegmented::MasterView::checkDeadLock() { /* views locked */
    432439   // There is a possible deadlock if no view can free old segments
    433440  // and we are waiting for write.
     
    437444  // while we are asleep
    438445
    439   if (!waitingOnWrite) return; // no problem, there is an active writer
     446  pthread_mutex_lock(&read_wait_mutex);
     447  if (!waitingOnWrite) {
     448    pthread_mutex_unlock(&read_wait_mutex);
     449    return; // no problem, there is an active writer
     450  }
    440451
    441452  // Is any sleeping view needing our first segment ?
    442453
    443   pthread_mutex_lock(&views_mutex);
    444   for (set<BufferView*>::iterator i=waitingBuffers.begin();
    445        i != waitingBuffers.end(); i++) {
    446     if ((*i)->firstNeeded < sn0+segmentSize) {
     454  for (set<BufferView*>::iterator i=allViews.begin();
     455       i != allViews.end(); i++) {
     456    if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
    447457      cout << "**** DEADLOCK detected ****" << endl;
    448458      cout << "We are waiting on write (buffer is full)"<< endl;
     
    452462    }
    453463  }
    454   pthread_mutex_unlock(&views_mutex);
     464  pthread_mutex_unlock(&read_wait_mutex);
    455465}
    456466
Note: See TracChangeset for help on using the changeset viewer.