#include "toisegment.h"

/*******************************/
/******* BufferSegment *********/
/*******************************/

// Allocates storage for `sz` samples (one double value and one uint_8 flag
// per sample). The segment starts in the NEW state, with no first sample
// number yet (sn0 = -1) and a reference count of zero.
TOISegmented::BufferSegment::BufferSegment(int sz) {
  status     = NEW;
  bufferSize = sz;
  sn0        = -1;
  refcount   = 0;
  data       = new double[sz];
  flags      = new uint_8[sz];
  pthread_mutex_init(&refcount_mutex, NULL);
}

// Releases the sample storage and the refcount mutex.
// Throws ForbiddenError if the reference count is still positive; in that
// case nothing is released.
// NOTE(review): refcount is read here without taking refcount_mutex, and a
// destructor that throws terminates the program when destructors are
// implicitly noexcept (C++11 and later) -- confirm the intended language level.
TOISegmented::BufferSegment::~BufferSegment() {
  if (refcount > 0) {
    throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
  }
  delete[] data;
  delete[] flags;
  pthread_mutex_destroy(&refcount_mutex);
}

// Stores value `d` and flag `f` for sample number `sn`.
// The first write to a NEW segment switches it to WRITE and fixes sn0 to
// `sn`; later samples are stored at offset (sn - sn0).
// Throws ForbiddenError if the segment is COMMITTED. The range validation is
// delegated to checkInRange().
void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
  if (status == NEW) {
    status = WRITE;
    sn0 = sn;
  }
  if (status == COMMITTED) {
    throw(ForbiddenError("TOISegment : putData in committed buffer"));
  }
  checkInRange(sn);
  const int offset = sn - sn0;
  data[offset]  = d;
  flags[offset] = f;
}

// Increments the reference count under refcount_mutex.
void TOISegmented::BufferSegment::incRefCount() {
  pthread_mutex_lock(&refcount_mutex);
  refcount++;
  pthread_mutex_unlock(&refcount_mutex);
}

// Decrements the reference count under refcount_mutex.
// Throws ForbiddenError (after the mutex has been released) if the count
// went below zero.
void TOISegmented::BufferSegment::decRefCount() {
  pthread_mutex_lock(&refcount_mutex);
  int nrc = --refcount;
  pthread_mutex_unlock(&refcount_mutex);
  if (nrc < 0) {
    throw(ForbiddenError("TOISegment : buffer refcount < 0"));
  }
}

// Returns a snapshot of the reference count, read under refcount_mutex.
int TOISegmented::BufferSegment::getRefCount() {
  pthread_mutex_lock(&refcount_mutex);
  int rc = refcount;
  pthread_mutex_unlock(&refcount_mutex);
  return rc;
}

/*******************************/
/********** BufferView *********/
/*******************************/

// Creates a view attached to master `m`. The view copies the master's
// segment size, starts with sn0 = -1, and owns a mutex / condition variable
// pair used by wait() and signal().
TOISegmented::BufferView::BufferView(MasterView* m) {
  master = m;
  sn0 = -1;
  segmentSize = m->segmentSize;
  pthread_mutex_init(&mutex, NULL);
  pthread_cond_init(&condv, NULL);
}

// Destroys the view's mutex and condition variable.
TOISegmented::BufferView::~BufferView() {
  pthread_mutex_destroy(&mutex);
  pthread_cond_destroy(&condv);
}

// Returns the value of sample `sn`, blocking in ensure() until the view
// covers it.
double TOISegmented::BufferView::getData(int sn) { /* Single-thread */
  ensure(sn);
  int seg = (sn - sn0) / segmentSize;
  return segments[seg]->getData(sn);
}

// Returns the flag of sample `sn`, blocking in ensure() until the view
// covers it.
uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread */
  ensure(sn);
  int seg = (sn - sn0) / segmentSize;
  return segments[seg]->getFlag(sn);
}

// Makes sure sample `sn` is covered by this view's segments.
// Throws RangeCheckError if `sn` lies before the first sample of the view.
// If `sn` lies past the last segment, the view is refreshed from the master
// and, while that is not enough, the calling thread sleeps in wait() and
// refreshes again after each wake-up.
void TOISegmented::BufferView::ensure(int sn) { /* Single-thread */
  if (sn < sn0) {
    throw RangeCheckError("requested sample before first");
  }

  // The segment count is cast to int so the bound is evaluated in signed
  // arithmetic. With an unsigned size() (as for standard containers), a view
  // that has sn0 == -1 and no segments yet would otherwise get a bound that
  // wraps to the largest unsigned value, the test would be false for every
  // sn >= 0, and the caller would index an empty `segments`.
  if (sn >= sn0 + segmentSize * static_cast<int>(segments.size())) {
    cout << "BufferView : read fault for " << sn << endl;
    sync();
    while (sn >= sn0 + segmentSize * static_cast<int>(segments.size())) {
      wait();
      cout << "BufferView : waiting for " << sn << endl;
      sync();
    }
    cout << "BufferView : resuming for " << sn << endl;
  }
}

// Asks the master to update this view.
void TOISegmented::BufferView::sync() { /* Single-thread */
  master->updateView(this); // update me !
}

// Registers this view on the master's wait list and sleeps on the view's
// condition variable. The registration is done while holding the view mutex,
// which signal() also takes before signalling.
// The wake-up condition is not re-tested here; ensure() re-checks it in its
// loop after every return from wait().
void TOISegmented::BufferView::wait() { /* From reader thread */
  pthread_mutex_lock(&mutex);
  master->addToWaitList(this); // needing wake-up call
  pthread_cond_wait(&condv, &mutex);
  pthread_mutex_unlock(&mutex);
}

// Wakes the thread sleeping in wait() and removes this view from the
// master's wait list, all under the view mutex.
void TOISegmented::BufferView::signal() { /* From masterview, writer thread */
  pthread_mutex_lock(&mutex);
  pthread_cond_signal(&condv); // only one thread can be sleeping
  master->removeFromWaitList(this);
  pthread_mutex_unlock(&mutex);
}

/*******************************/
/********** MasterView *********/
/*******************************/

// Creates a master view using segments of `bufsz` samples and a limit of
// `maxseg` segments. No segment is current yet, sn0 is -1, and the wait
// status is NO_WAIT. Also creates the thread-specific key under which each
// thread's BufferView is stored (see getView()), with BufferDestroy
// registered as the key's destructor.
TOISegmented::MasterView::MasterView(int bufsz, int maxseg) {
  currentSegment = NULL;
  maxSegments = maxseg;
  segmentSize = bufsz;
  sn0 = -1;

  pthread_mutex_init(&views_mutex, NULL);
  pthread_mutex_init(&write_mutex, NULL);
  pthread_cond_init(&condv, NULL);
  pthread_key_create(&buffer_key, BufferDestroy);

  waitStatus = NO_WAIT;
}

// Destroys the mutexes, the condition variable and the thread-specific key.
TOISegmented::MasterView::~MasterView() {
  pthread_mutex_destroy(&views_mutex);
  pthread_mutex_destroy(&write_mutex);
  pthread_cond_destroy(&condv);
  pthread_key_delete(buffer_key);

  // There should not be any BufferView left... Check ?
  // decrement count for segments ?
}

// Writes one sample. If there is no current segment, or `sn` falls outside
// [currentSegment->sn0, currentSegment->sn0 + bufferSize), nextSegment() is
// called first; the sample is then stored in currentSegment.
void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) {
  // can fit in current segment ?
  if (!(currentSegment != NULL &&
        sn >= currentSegment->sn0 &&
        sn < currentSegment->sn0 + currentSegment->bufferSize)) {
    nextSegment();
  }
  currentSegment->putData(sn, data, flags);
}

// Reads the value of sample `sn` through the calling thread's view.
double TOISegmented::MasterView::getData(int sn) {
  return getView()->getData(sn);
}

// Reads the flag of sample `sn` through the calling thread's view.
uint_8 TOISegmented::MasterView::getFlag(int sn) {
  return getView()->getFlag(sn);
}

// Records `bv` as waiting for new data (under views_mutex), then runs the
// deadlock check once views_mutex has been released.
void TOISegmented::MasterView::addToWaitList(BufferView* bv) { /* reader thread */
  // A view needs to wait for new data.

  // There is a possible deadlock if no view can free old segments
  // and we are waiting for write.

  // we need to record "wont need before" for each view, and
  // signal deadlock if any view that needs first segment data is sleeping
  // while we are asleep

  pthread_mutex_lock(&views_mutex);
  waitingBuffers.insert(bv);
  pthread_mutex_unlock(&views_mutex);
  checkDeadLock();
}

// Removes `bv` from the set of waiting views, under views_mutex.
// NOTE(review): the original comment says "reader thread", but the only
// caller visible in this file is BufferView::signal(), which is annotated as
// running on the writer thread -- confirm which is intended.
void TOISegmented::MasterView::removeFromWaitList(BufferView* bv) { /* reader thread */
  pthread_mutex_lock(&views_mutex);
  waitingBuffers.erase(bv);
  pthread_mutex_unlock(&views_mutex);
}

// Returns the BufferView stored under buffer_key for the calling thread,
// creating it with createView() and storing it on first use.
TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
  BufferView* bv = static_cast<BufferView*>(pthread_getspecific(buffer_key));
  if (bv == NULL) {
    bv = createView();
    pthread_setspecific(buffer_key, bv);
  }
  return bv;
}