#include "toisegment.h" /******************************/ /******* TOISegmented *********/ /******************************/ TOISegmented::TOISegmented(int bufsz, int maxseg) { master = new MasterView(bufsz, maxseg); setName("TOISegmented"); } TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) { master = new MasterView(bufsz, maxseg); setName(nm); } TOISegmented::~TOISegmented() { delete master; } double TOISegmented::getData(int i) { /* reader thread */ return master->getData(i); } void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */ data = master->getData(i); flag = master->getFlag(i); } void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */ master->putData(i, value, flag); } void TOISegmented::putDone() { master->putDone(); } void TOISegmented::wontNeedBefore(int i) { /* reader thread */ master->getView()->wontNeedBefore(i); } TOI::DataStatus TOISegmented::isDataAvail(int i, int j) { // return master->getView()->isDataAvail(i, j); cout << "TOISegmented::isDataAvail unimplemented" << endl; throw PError("TOISegmented::isDataAvail unimplemented"); } TOI::DataStatus TOISegmented::isDataAvail(int i) { return isDataAvail(i,i); } TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) { return isDataAvail(i,j); } void TOISegmented::waitForData(int iStart, int iEnd) { // get will wait... } void TOISegmented::waitForData(int i) { // get will wait... } void TOISegmented::waitForAnyData() { cout << "TOISegmented::waitForAnyData unimplemented" << endl; throw PError("TOISegmented::waitForAnyData unimplemented"); } int TOISegmented::nextDataAvail(int iAfter) { cout << "TOISegmented::nextDataAvail" << endl; return iAfter+1; } bool TOISegmented::hasSomeData() { cout << "TOISegmented::hasSomeData" << endl; return true; } void TOISegmented::doPutData(int i, double value, uint_8 flag=0) { cout << "TOISegmented::doPutData unimplemented" << endl; throw PError("TOISegmented::doPutData unimplemented"); } void TOISegmented::doGetData(int i, double& value, uint_8& flag) { cout << "TOISegmented::doGetData unimplemented" << endl; throw PError("TOISegmented::doGetData unimplemented"); } /*******************************/ /******* BufferSegment *********/ /*******************************/ TOISegmented::BufferSegment::BufferSegment(int sz) { status = NEW; bufferSize = sz; sn0 = -1; refcount = 0; data = new double[sz]; flags = new uint_8[sz]; pthread_mutex_init(&refcount_mutex, NULL); } TOISegmented::BufferSegment::~BufferSegment() { if (refcount > 0) { throw(ForbiddenError("TOISegment : delete Buffer with refcount>0")); } delete[] data; delete[] flags; pthread_mutex_destroy(&refcount_mutex); } void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) { /* writer thread*/ if (status == NEW) { status = WRITE; sn0 = sn; } if (status == COMMITTED) { throw(ForbiddenError("TOISegment : putData in committed buffer")); } checkInRange(sn); data[sn-sn0] = d; flags[sn-sn0] = f; } void TOISegmented::BufferSegment::incRefCount() { pthread_mutex_lock(&refcount_mutex); refcount++; pthread_mutex_unlock(&refcount_mutex); } void TOISegmented::BufferSegment::decRefCount() { pthread_mutex_lock(&refcount_mutex); int nrc = --refcount; pthread_mutex_unlock(&refcount_mutex); if (nrc<0) throw(ForbiddenError("TOISegment : buffer refcount < 0")); } int TOISegmented::BufferSegment::getRefCount() { pthread_mutex_lock(&refcount_mutex); int rc = refcount; pthread_mutex_unlock(&refcount_mutex); return rc; } /*******************************/ /********** BufferView *********/ /*******************************/ TOISegmented::BufferView::BufferView(MasterView* m) { master = m; sn0 = -1; segmentSize = m->segmentSize; firstNeeded = -1; pthread_mutex_init(&mutex, NULL); pthread_cond_init(&condv, NULL); } TOISegmented::BufferView::~BufferView() { pthread_mutex_destroy(&mutex); pthread_cond_destroy(&condv); } double TOISegmented::BufferView::getData(int sn) { /* Single-thread */ ensure(sn); int seg = (sn-sn0)/segmentSize; return segments[seg]->getData(sn); } uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread */ ensure(sn); int seg = (sn-sn0)/segmentSize; return segments[seg]->getFlag(sn); } void TOISegmented::BufferView::ensure(int sn) { /* Single-thread */ if (sn < sn0) { throw RangeCheckError("requested sample before first"); } if (sn >= sn0 + segmentSize*segments.size()) { cout << "BufferView : read fault for " << sn << endl; sync(); while (sn >= sn0 + segmentSize*segments.size()) { wait(); cout << "BufferView : waiting for " << sn << endl; sync(); } cout << "BufferView : resuming for " << sn << endl; } } void TOISegmented::BufferView::sync() { /* Single-thread */ master->updateView(this); // update me ! } void TOISegmented::BufferView::wait() { /* From reader thread */ pthread_mutex_lock(&mutex); master->addToWaitList(this); // needing wake-up call pthread_cond_wait(&condv, &mutex); pthread_mutex_unlock(&mutex); } void TOISegmented::BufferView::signal() { /* From masterview, writer thread */ pthread_mutex_lock(&mutex); pthread_cond_signal(&condv); // only one thread can be sleeping master->removeFromWaitList(this); pthread_mutex_unlock(&mutex); } void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */ if (sn > firstNeeded) { firstNeeded = sn; } } /*******************************/ /********** MasterView *********/ /*******************************/ TOISegmented::MasterView::MasterView(int bufsz, int maxseg) { currentSegment = NULL; maxSegments = maxseg; segmentSize = bufsz; sn0 = -1; pthread_mutex_init(&views_mutex, NULL); pthread_mutex_init(&write_mutex, NULL); pthread_cond_init(&condv, NULL); pthread_key_create(&buffer_key, BufferDestroy); waitingOnWrite = false; } TOISegmented::MasterView::~MasterView() { pthread_mutex_destroy(&views_mutex); pthread_mutex_destroy(&write_mutex); pthread_cond_destroy(&condv); pthread_key_delete(buffer_key); // There should not be any BufferView left... Check ? // decrement count for segments ? } void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { if (sn0<0) sn0=sn; // can fit in current segment ? if (!(currentSegment != NULL && sn >= currentSegment->sn0 && sn < currentSegment->sn0 + currentSegment->bufferSize)) { cout << "MasterView::putData, need extend for " << sn << endl; nextSegment(); } currentSegment->putData(sn, data, flags); } double TOISegmented::MasterView::getData(int sn) { return getView()->getData(sn); } uint_8 TOISegmented::MasterView::getFlag(int sn) { return getView()->getFlag(sn); } void TOISegmented::MasterView::addToWaitList(BufferView* bv) { /* reader thread */ // A view needs to wait for new data. pthread_mutex_lock(&views_mutex); waitingBuffers.insert(bv); pthread_mutex_unlock(&views_mutex); checkDeadLock(); } void TOISegmented::MasterView::removeFromWaitList(BufferView* bv) { /* reader thread */ pthread_mutex_lock(&views_mutex); waitingBuffers.erase(bv); pthread_mutex_unlock(&views_mutex); } TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */ BufferView* bv = (BufferView*) pthread_getspecific(buffer_key); if (bv == NULL) { cout << "creating new view" << endl; bv = createView(); pthread_setspecific(buffer_key, bv); } return bv; } void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ pthread_mutex_lock(&views_mutex); set copyset = waitingBuffers; pthread_mutex_unlock(&views_mutex); for (set::iterator i=copyset.begin(); i != copyset.end(); i++) { (*i)->signal(); } } void TOISegmented::MasterView::signal() { /* reader thread */ pthread_mutex_lock(&write_mutex); pthread_cond_signal(&condv); // only one thread can be sleeping pthread_mutex_unlock(&write_mutex); } void TOISegmented::MasterView::putDone() { nextSegment(); // cree un segment inutile, a nettoyer } void TOISegmented::MasterView::nextSegment() { /* writer thread */ // The current segment, if any, is now committed. A new // blank buffer is allocated, if any. cout << "MasterView::nextSegment " << segments.size()+1 << "/" << maxSegments << endl; if (currentSegment != NULL) { currentSegment->status = BufferSegment::COMMITTED; pthread_mutex_lock(&views_mutex); segments.push_back(currentSegment); pthread_mutex_unlock(&views_mutex); } currentSegment = NULL; while (segments.size() >= maxSegments) { waitForCleaning(); } currentSegment = new BufferSegment(segmentSize); signalWaitingViews(); // they can ask to be updated !! } void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ cout << "MasterView : write wait for clean for " << sn0 << endl; pthread_mutex_lock(&write_mutex); waitingOnWrite = true; checkDeadLock(); pthread_cond_wait(&condv, &write_mutex); pthread_mutex_unlock(&write_mutex); cout << "MasterView : wait done" << endl; } TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */ BufferView* bv = new BufferView(this); updateView(bv); return bv; } void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */ pthread_mutex_lock(&views_mutex); int oldBegin = bv->sn0; int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size(); for (vector::iterator i = bv->segments.begin(); i != bv->segments.end(); i++) { (*i)->decRefCount(); } bv->segments.clear(); // TODO : utiliser firstNeeded de toute les vues pour faire le menage chez // nous. for (vector::iterator i = segments.begin(); i != segments.end(); i++) { if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) { (*i)->incRefCount(); bv->segments.push_back(*i); } } bv->sn0 = -1; int newEnd = -1; if (segments.size() > 0) { bv->sn0 = bv->segments[0]->sn0; newEnd = bv->sn0 + bv->segmentSize * bv->segments.size(); } if (bv->sn0 > oldBegin && waitingOnWrite) { signal(); } cout << "sync " << sn0 << " - " << newEnd << endl; pthread_mutex_unlock(&views_mutex); if (newEnd > oldEnd) { signalWaitingViews(); } } void TOISegmented::MasterView::checkDeadLock() { // There is a possible deadlock if no view can free old segments // and we are waiting for write. // we need to record "wont need before" for each view, and // signal deadlock if any view that needs first segment data is sleeping // while we are asleep if (!waitingOnWrite) return; // no problem, there is an active writer // Is any sleeping view needing our first segment ? pthread_mutex_lock(&views_mutex); for (set::iterator i=waitingBuffers.begin(); i != waitingBuffers.end(); i++) { if ((*i)->firstNeeded < sn0+segmentSize) { cout << "**** DEADLOCK detected ****" << endl; cout << "We are waiting on write (buffer is full)"<< endl; cout << "but a waiting reader still needs our first segment" << endl; cout << "restart with bigger buffers" << endl; abort(); } } pthread_mutex_unlock(&views_mutex); } void TOISegmented::MasterView::BufferDestroy(void* p) { BufferView* bv = (BufferView*) p; delete bv; }