#include "toisegment.h" #ifndef MAXINT #define MAXINT 2147483647 #endif static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER; static void cout_lock() {pthread_mutex_lock(&cout_mutex);} static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);} #define LOG(_xxx_) \ cout_lock(); \ _xxx_; \ cout_unlock(); /******************************/ /******* TOISegmented *********/ /******************************/ TOISegmented::TOISegmented(int bufsz, int maxseg) { master = new MasterView(bufsz, maxseg); setName("TOISegmented"); } TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) { master = new MasterView(bufsz, maxseg); setName(nm); } TOISegmented::~TOISegmented() { delete master; } void TOISegmented::addConsumer(TOIProcessor* p) { TOI::addConsumer(p); master->nConsumers = consumers.size(); } double TOISegmented::getData(int i) { /* reader thread */ return master->getData(i); } void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */ data = master->getData(i); flag = master->getFlag(i); } void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */ master->putData(i, value, flag); } void TOISegmented::putDone() { master->putDone(); } void TOISegmented::wontNeedBefore(int i) { /* reader thread */ master->getView()->wontNeedBefore(i); } TOI::DataStatus TOISegmented::isDataAvail(int i, int j) { // return master->getView()->isDataAvail(i, j); cout << "TOISegmented::isDataAvail unimplemented" << endl; throw PError("TOISegmented::isDataAvail unimplemented"); } TOI::DataStatus TOISegmented::isDataAvail(int i) { return isDataAvail(i,i); } TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) { return isDataAvail(i,j); } void TOISegmented::waitForData(int iStart, int iEnd) { // get will wait... } void TOISegmented::waitForData(int i) { // get will wait... } void TOISegmented::waitForAnyData() { cout << "TOISegmented::waitForAnyData unimplemented" << endl; throw PError("TOISegmented::waitForAnyData unimplemented"); } int TOISegmented::nextDataAvail(int iAfter) { cout << "TOISegmented::nextDataAvail" << endl; return iAfter+1; } bool TOISegmented::hasSomeData() { cout << "TOISegmented::hasSomeData" << endl; return true; } void TOISegmented::doPutData(int i, double value, uint_8 flag) { cout << "TOISegmented::doPutData unimplemented" << endl; throw PError("TOISegmented::doPutData unimplemented"); } void TOISegmented::doGetData(int i, double& value, uint_8& flag) { cout << "TOISegmented::doGetData unimplemented" << endl; throw PError("TOISegmented::doGetData unimplemented"); } /*******************************/ /******* BufferSegment *********/ /*******************************/ TOISegmented::BufferSegment::BufferSegment(int sz) { status = NEW; bufferSize = sz; sn0 = -1; refcount = 0; data = new double[sz]; flags = new uint_8[sz]; pthread_mutex_init(&refcount_mutex, NULL); } TOISegmented::BufferSegment::~BufferSegment() { if (refcount > 0) { throw(ForbiddenError("TOISegment : delete Buffer with refcount>0")); } delete[] data; delete[] flags; pthread_mutex_destroy(&refcount_mutex); } void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) { /* writer thread*/ if (status == NEW) { status = WRITE; sn0 = sn; } if (status == COMMITTED) { throw(ForbiddenError("TOISegment : putData in committed buffer")); } checkInRange(sn); data[sn-sn0] = d; flags[sn-sn0] = f; } void TOISegmented::BufferSegment::incRefCount() { pthread_mutex_lock(&refcount_mutex); refcount++; pthread_mutex_unlock(&refcount_mutex); } void TOISegmented::BufferSegment::decRefCount() { pthread_mutex_lock(&refcount_mutex); int nrc = --refcount; pthread_mutex_unlock(&refcount_mutex); if (nrc<0) throw(ForbiddenError("TOISegment : buffer refcount < 0")); } int TOISegmented::BufferSegment::getRefCount() { pthread_mutex_lock(&refcount_mutex); int rc = refcount; pthread_mutex_unlock(&refcount_mutex); return rc; } /*******************************/ /********** BufferView *********/ /*******************************/ TOISegmented::BufferView::BufferView(MasterView* m) { master = m; sn0 = -1; segmentSize = m->segmentSize; firstNeeded = -1; waiting = false; } TOISegmented::BufferView::~BufferView() { } double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/ ensure(sn); int seg = (sn-sn0)/segmentSize; return segments[seg]->getData(sn); } uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */ ensure(sn); int seg = (sn-sn0)/segmentSize; return segments[seg]->getFlag(sn); } void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */ if (sn < sn0) { LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl); LOG(cout << "sn " << sn << " sn0 " << sn0 << endl); abort(); } if (sn0 < 0 || sn >= sn0 + segmentSize*segments.size()) { LOG(cout << "BufferView " << hex << this << dec << ": read fault for " << sn << endl) sync(); while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) { wait(); LOG(cout << "BufferView " << hex << this << dec << ": waiting for " << sn << endl) sync(); } LOG(cout << "BufferView " << hex << this << dec << ": resuming for " << sn << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size() << " in " << segments.size() << " segments " << endl) } } void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */ master->updateView(this); // update me ! } void TOISegmented::BufferView::wait() { /* reader thread */ pthread_mutex_lock(&(master->read_wait_mutex)); waiting = true; pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex)); waiting = false; pthread_mutex_unlock(&(master->read_wait_mutex)); } void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */ if (sn > firstNeeded) { firstNeeded = sn; } } /*******************************/ /********** MasterView *********/ /*******************************/ TOISegmented::MasterView::MasterView(int bufsz, int maxseg) { currentSegment = NULL; maxSegments = maxseg; segmentSize = bufsz; sn0 = -1; nConsumers = 0; pthread_mutex_init(&views_mutex, NULL); pthread_mutex_init(&read_wait_mutex, NULL); pthread_cond_init(&write_wait_condv, NULL); pthread_cond_init(&read_wait_condv, NULL); pthread_key_create(&buffer_key, BufferDestroy); waitingOnWrite = false; } TOISegmented::MasterView::~MasterView() { pthread_mutex_destroy(&views_mutex); pthread_mutex_destroy(&read_wait_mutex); pthread_cond_destroy(&write_wait_condv); pthread_cond_destroy(&read_wait_condv); pthread_key_delete(buffer_key); // There should not be any BufferView left... Check ? // decrement count for segments ? } void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */ if (sn0<0) { LOG(cout << "***MasterView::putData sn0<0" << endl) sn0=sn; } // can fit in current segment ? if (!(currentSegment != NULL && sn >= currentSegment->sn0 && sn < currentSegment->sn0 + currentSegment->bufferSize)) { LOG(cout << "MasterView::putData, need extend for " << sn << endl) nextSegment(); } currentSegment->putData(sn, data, flags); } double TOISegmented::MasterView::getData(int sn) { /* reader thread */ return getView()->getData(sn); /* thread-specific */ } uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */ return getView()->getFlag(sn); } TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */ BufferView* bv = (BufferView*) pthread_getspecific(buffer_key); if (bv == NULL) { bv = createView(); LOG(cout << "creating new view " << hex << bv << dec << endl) pthread_setspecific(buffer_key, bv); } return bv; } void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */ pthread_mutex_lock(&read_wait_mutex); pthread_cond_broadcast(&read_wait_condv); pthread_mutex_unlock(&read_wait_mutex); } void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */ if (waitingOnWrite) { LOG(cout << "MasterView : signal for wait on write" << endl) pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping } } void TOISegmented::MasterView::putDone() { nextSegment(); // cree un segment inutile, a nettoyer } void TOISegmented::MasterView::nextSegment() { /* writer thread */ // The current segment, if any, is now committed. A new // blank buffer is allocated, if any. pthread_mutex_lock(&views_mutex); LOG(cout << "MasterView::nextSegment " << segments.size()+1 << "/" << maxSegments << endl) if (currentSegment != NULL) { currentSegment->status = BufferSegment::COMMITTED; segments.push_back(currentSegment); } currentSegment = NULL; while (segments.size() >= maxSegments) { waitForCleaning(); } currentSegment = new BufferSegment(segmentSize); signalWaitingViews(); // they can ask to be updated !! pthread_mutex_unlock(&views_mutex); } void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */ LOG(cout << "MasterView : write wait for clean for " << sn0 << endl) waitingOnWrite = true; checkDeadLock(); pthread_cond_wait(&write_wait_condv, &views_mutex); LOG(cout << "MasterView : wait done" << endl) } TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */ BufferView* bv = new BufferView(this); pthread_mutex_lock(&views_mutex); allViews.insert(bv); pthread_mutex_unlock(&views_mutex); updateView(bv); return bv; } void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */ pthread_mutex_lock(&views_mutex); int oldBegin = bv->sn0; int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size(); for (vector::iterator i = bv->segments.begin(); i != bv->segments.end(); i++) { (*i)->decRefCount(); } bv->segments.clear(); // utiliser firstNeeded de toutes les vues pour faire le menage chez // nous. // A condition que tous les consumers se soient fait connaitre... if (nConsumers == allViews.size()) { int firstNeeded = MAXINT; for (set::iterator i = allViews.begin(); i != allViews.end(); i++) { if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded; } LOG(cout << "MasterView : firstNeeded = " << firstNeeded << endl); vector::iterator j = segments.begin(); bool clean = false; for (vector::iterator i = segments.begin(); i != segments.end(); i++) { if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) { clean = true; j = i; } } if (clean) { segments.erase(segments.begin(),j); sn0 = (*segments.begin())->sn0; LOG(cout << "MasterView : purged until " << sn0 << endl); } } else { LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size() << "/" << nConsumers << endl); } for (vector::iterator i = segments.begin(); i != segments.end(); i++) { if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) { (*i)->incRefCount(); bv->segments.push_back(*i); } } bv->sn0 = -1; int newEnd = -1; if (segments.size() > 0) { bv->sn0 = bv->segments[0]->sn0; newEnd = bv->sn0 + bv->segmentSize * bv->segments.size(); } if (bv->sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin signalWrite(); } LOG(cout << "sync for " << hex << bv << dec << " : " << oldBegin << " - " << oldEnd << " --> " << bv->sn0 << " - " << newEnd << endl); if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin signalWaitingViews(); } pthread_mutex_unlock(&views_mutex); } void TOISegmented::MasterView::checkDeadLock() { /* views locked */ // There is a possible deadlock if no view can free old segments // and we are waiting for write. // we need to record "wont need before" for each view, and // signal deadlock if any view that needs first segment data is sleeping // while we are asleep pthread_mutex_lock(&read_wait_mutex); if (!waitingOnWrite) { pthread_mutex_unlock(&read_wait_mutex); return; // no problem, there is an active writer } // Is any sleeping view needing our first segment ? for (set::iterator i=allViews.begin(); i != allViews.end(); i++) { if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) { cout << "**** DEADLOCK detected ****" << endl; cout << "We are waiting on write (buffer is full)"<< endl; cout << "but a waiting reader still needs our first segment" << endl; cout << "restart with bigger buffers" << endl; abort(); } } pthread_mutex_unlock(&read_wait_mutex); } void TOISegmented::MasterView::BufferDestroy(void* p) { BufferView* bv = (BufferView*) p; delete bv; }