// ArchTOIPipe           (C) CEA/DAPNIA/SPP IN2P3/LAL
// Eric Aubourg
// Christophe Magneville
// Reza Ansari
// $Id: ringpipe.cc,v 1.1 2003-05-19 23:31:29 aubourg Exp $
|
---|
| 6 |
|
---|
| 7 | #include "ringpipe.h"
|
---|
| 8 | #include "ringprocessor.h"
|
---|
| 9 |
|
---|
| 10 |
|
---|
| 11 | #ifdef WITH_SOPHYA
|
---|
| 12 | #include "pexceptions.h"
|
---|
| 13 | #else
|
---|
| 14 | #include "apexceptions.h"
|
---|
| 15 | #endif
|
---|
| 16 |
|
---|
| 17 | RingPipe::RingPipe() {
|
---|
| 18 | i0 = -1;
|
---|
| 19 | producer = NULL;
|
---|
| 20 | pthread_cond_init(&ringReady, NULL);
|
---|
| 21 | pthread_mutexattr_init(&mutattr);
|
---|
| 22 | pthread_mutex_init(&mutex, &mutattr);
|
---|
| 23 | }
|
---|
| 24 |
|
---|
| 25 | RingPipe::~RingPipe() {
|
---|
| 26 | pthread_mutex_destroy(&mutex);
|
---|
| 27 | pthread_cond_destroy(&ringReady);
|
---|
| 28 | }
|
---|
| 29 |
|
---|
| 30 | Ring const* RingPipe::getRing(int i) {
|
---|
| 31 | lock();
|
---|
| 32 | waitForRing(i);
|
---|
| 33 | Ring const* ring = data[i-i0];
|
---|
| 34 | unlock();
|
---|
| 35 | return ring;
|
---|
| 36 | }
|
---|
| 37 |
|
---|
| 38 | void RingPipe::putRing(int i, Ring const* ring) {
|
---|
| 39 | lock();
|
---|
| 40 | if (i0 == -1) {
|
---|
| 41 | data.insert(data.begin(), 1, (Ring const*) NULL);
|
---|
| 42 | i0 = i;
|
---|
| 43 | } else if (i<i0) {
|
---|
| 44 | data.insert(data.begin(), i0-i, (Ring const*) NULL);
|
---|
| 45 | i0 = i;
|
---|
| 46 | } else if (i>=i0+(int)data.size()) {
|
---|
| 47 | data.insert(data.end(), (long) (i-(i0+data.size())+1), (Ring const*)NULL);
|
---|
| 48 | }
|
---|
| 49 | data[i-i0] = ring;
|
---|
| 50 | notify();
|
---|
| 51 | unlock();
|
---|
| 52 | }
|
---|
| 53 |
|
---|
| 54 | void RingPipe::waitForRing(int i) {
|
---|
| 55 | if (producer == NULL) throw NotFoundExc("RingPipe has no producer !");
|
---|
| 56 |
|
---|
| 57 | DataStatus s = isRingAvail(i);
|
---|
| 58 | if (s == DATA_OK) {
|
---|
| 59 | return;
|
---|
| 60 | }
|
---|
| 61 | if (s == DATA_DELETED) {
|
---|
| 62 | throw NotFoundExc("Data has been deleted !");
|
---|
| 63 | }
|
---|
| 64 |
|
---|
| 65 | while (isRingAvail(i) == DATA_NOT_YET) {
|
---|
| 66 | wait();
|
---|
| 67 | }
|
---|
| 68 | return;
|
---|
| 69 | }
|
---|
| 70 |
|
---|
| 71 | void RingPipe::getRingRange(int& min, int& max) {
|
---|
| 72 | producer->getRingRange(min, max);
|
---|
| 73 | }
|
---|
| 74 |
|
---|
| 75 | RingPipe::DataStatus RingPipe::isRingAvail(int i) {
|
---|
| 76 | if (i >= i0 + (long)data.size()) return DATA_NOT_YET;
|
---|
| 77 | if (i < i0) return DATA_DELETED;
|
---|
| 78 | return DATA_OK;
|
---|
| 79 | }
|
---|
| 80 |
|
---|
| 81 | void RingPipe::wontNeedRingBefore(int i) {
|
---|
| 82 | int j=i;
|
---|
| 83 | for (vector<RingProcessor*>::iterator k = consumers.begin();
|
---|
| 84 | k != consumers.end(); k++) {
|
---|
| 85 | if ((*k)->getWontNeedBefore() < j) j = (*k)->getWontNeedBefore();
|
---|
| 86 | }
|
---|
| 87 | lock();
|
---|
| 88 | if (i >= i0 + (long)data.size())
|
---|
| 89 | i = i0 + (long)data.size() -1;
|
---|
| 90 | if (i > i0) {
|
---|
| 91 | data.erase(data.begin(), data.begin()+(i-i0));
|
---|
| 92 | i0=i;
|
---|
| 93 | }
|
---|
| 94 | unlock();
|
---|
| 95 | }
|
---|
| 96 |
|
---|
| 97 | void RingPipe::addConsumer(RingProcessor* cons) {
|
---|
| 98 | consumers.push_back(cons);
|
---|
| 99 | }
|
---|
| 100 |
|
---|
| 101 | void RingPipe::setProducer(RingProcessor* proc) {
|
---|
| 102 | if (producer)
|
---|
| 103 | throw DuplicateIdExc("RingPipe::setProducer : producer already defined");
|
---|
| 104 | producer = proc;
|
---|
| 105 | }
|
---|
| 106 |
|
---|