// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL // Eric Aubourg // Christophe Magneville // Reza Ansari // $Id: ringpipe.cc,v 1.1 2003-05-19 23:31:29 aubourg Exp $ #include "ringpipe.h" #include "ringprocessor.h" #ifdef WITH_SOPHYA #include "pexceptions.h" #else #include "apexceptions.h" #endif RingPipe::RingPipe() { i0 = -1; producer = NULL; pthread_cond_init(&ringReady, NULL); pthread_mutexattr_init(&mutattr); pthread_mutex_init(&mutex, &mutattr); } RingPipe::~RingPipe() { pthread_mutex_destroy(&mutex); pthread_cond_destroy(&ringReady); } Ring const* RingPipe::getRing(int i) { lock(); waitForRing(i); Ring const* ring = data[i-i0]; unlock(); return ring; } void RingPipe::putRing(int i, Ring const* ring) { lock(); if (i0 == -1) { data.insert(data.begin(), 1, (Ring const*) NULL); i0 = i; } else if (i=i0+(int)data.size()) { data.insert(data.end(), (long) (i-(i0+data.size())+1), (Ring const*)NULL); } data[i-i0] = ring; notify(); unlock(); } void RingPipe::waitForRing(int i) { if (producer == NULL) throw NotFoundExc("RingPipe has no producer !"); DataStatus s = isRingAvail(i); if (s == DATA_OK) { return; } if (s == DATA_DELETED) { throw NotFoundExc("Data has been deleted !"); } while (isRingAvail(i) == DATA_NOT_YET) { wait(); } return; } void RingPipe::getRingRange(int& min, int& max) { producer->getRingRange(min, max); } RingPipe::DataStatus RingPipe::isRingAvail(int i) { if (i >= i0 + (long)data.size()) return DATA_NOT_YET; if (i < i0) return DATA_DELETED; return DATA_OK; } void RingPipe::wontNeedRingBefore(int i) { int j=i; for (vector::iterator k = consumers.begin(); k != consumers.end(); k++) { if ((*k)->getWontNeedBefore() < j) j = (*k)->getWontNeedBefore(); } lock(); if (i >= i0 + (long)data.size()) i = i0 + (long)data.size() -1; if (i > i0) { data.erase(data.begin(), data.begin()+(i-i0)); i0=i; } unlock(); } void RingPipe::addConsumer(RingProcessor* cons) { consumers.push_back(cons); } void RingPipe::setProducer(RingProcessor* proc) { if (producer) throw DuplicateIdExc("RingPipe::setProducer : producer already defined"); producer = proc; }