[2385] | 1 | // ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
|
---|
| 2 | // Eric Aubourg
|
---|
| 3 | // Christophe Magneville
|
---|
| 4 | // Reza Ansari
|
---|
[2386] | 5 | // $Id: ringpipe.cc,v 1.2 2003-05-20 10:10:08 aubourg Exp $
|
---|
[2385] | 6 |
|
---|
| 7 | #include "ringpipe.h"
|
---|
| 8 | #include "ringprocessor.h"
|
---|
| 9 |
|
---|
| 10 |
|
---|
| 11 | #ifdef WITH_SOPHYA
|
---|
| 12 | #include "pexceptions.h"
|
---|
| 13 | #else
|
---|
| 14 | #include "apexceptions.h"
|
---|
| 15 | #endif
|
---|
| 16 |
|
---|
[2386] | 17 | RingPipe::RingPipe(int ws) {
|
---|
[2385] | 18 | i0 = -1;
|
---|
[2386] | 19 | winSize = ws;
|
---|
[2385] | 20 | producer = NULL;
|
---|
| 21 | pthread_cond_init(&ringReady, NULL);
|
---|
| 22 | pthread_mutexattr_init(&mutattr);
|
---|
| 23 | pthread_mutex_init(&mutex, &mutattr);
|
---|
| 24 | }
|
---|
| 25 |
|
---|
| 26 | RingPipe::~RingPipe() {
|
---|
| 27 | pthread_mutex_destroy(&mutex);
|
---|
| 28 | pthread_cond_destroy(&ringReady);
|
---|
| 29 | }
|
---|
| 30 |
|
---|
| 31 | Ring const* RingPipe::getRing(int i) {
|
---|
| 32 | lock();
|
---|
| 33 | waitForRing(i);
|
---|
| 34 | Ring const* ring = data[i-i0];
|
---|
| 35 | unlock();
|
---|
| 36 | return ring;
|
---|
| 37 | }
|
---|
| 38 |
|
---|
| 39 | void RingPipe::putRing(int i, Ring const* ring) {
|
---|
| 40 | lock();
|
---|
[2386] | 41 | waitForRoom(i);
|
---|
[2385] | 42 | if (i0 == -1) {
|
---|
| 43 | data.insert(data.begin(), 1, (Ring const*) NULL);
|
---|
| 44 | i0 = i;
|
---|
| 45 | } else if (i<i0) {
|
---|
| 46 | data.insert(data.begin(), i0-i, (Ring const*) NULL);
|
---|
| 47 | i0 = i;
|
---|
| 48 | } else if (i>=i0+(int)data.size()) {
|
---|
| 49 | data.insert(data.end(), (long) (i-(i0+data.size())+1), (Ring const*)NULL);
|
---|
| 50 | }
|
---|
| 51 | data[i-i0] = ring;
|
---|
| 52 | notify();
|
---|
| 53 | unlock();
|
---|
| 54 | }
|
---|
| 55 |
|
---|
[2386] | 56 | void RingPipe::waitForRoom(int j) {
|
---|
| 57 | while (j-i0 >= winSize) {
|
---|
| 58 | wait();
|
---|
| 59 | }
|
---|
| 60 | }
|
---|
| 61 |
|
---|
[2385] | 62 | void RingPipe::waitForRing(int i) {
|
---|
| 63 | if (producer == NULL) throw NotFoundExc("RingPipe has no producer !");
|
---|
| 64 |
|
---|
| 65 | DataStatus s = isRingAvail(i);
|
---|
| 66 | if (s == DATA_OK) {
|
---|
| 67 | return;
|
---|
| 68 | }
|
---|
| 69 | if (s == DATA_DELETED) {
|
---|
| 70 | throw NotFoundExc("Data has been deleted !");
|
---|
| 71 | }
|
---|
| 72 |
|
---|
| 73 | while (isRingAvail(i) == DATA_NOT_YET) {
|
---|
| 74 | wait();
|
---|
| 75 | }
|
---|
| 76 | return;
|
---|
| 77 | }
|
---|
| 78 |
|
---|
| 79 | void RingPipe::getRingRange(int& min, int& max) {
|
---|
| 80 | producer->getRingRange(min, max);
|
---|
| 81 | }
|
---|
| 82 |
|
---|
| 83 | RingPipe::DataStatus RingPipe::isRingAvail(int i) {
|
---|
| 84 | if (i >= i0 + (long)data.size()) return DATA_NOT_YET;
|
---|
| 85 | if (i < i0) return DATA_DELETED;
|
---|
| 86 | return DATA_OK;
|
---|
| 87 | }
|
---|
| 88 |
|
---|
| 89 | void RingPipe::wontNeedRingBefore(int i) {
|
---|
| 90 | int j=i;
|
---|
| 91 | for (vector<RingProcessor*>::iterator k = consumers.begin();
|
---|
| 92 | k != consumers.end(); k++) {
|
---|
| 93 | if ((*k)->getWontNeedBefore() < j) j = (*k)->getWontNeedBefore();
|
---|
| 94 | }
|
---|
| 95 | lock();
|
---|
| 96 | if (i >= i0 + (long)data.size())
|
---|
| 97 | i = i0 + (long)data.size() -1;
|
---|
| 98 | if (i > i0) {
|
---|
| 99 | data.erase(data.begin(), data.begin()+(i-i0));
|
---|
| 100 | i0=i;
|
---|
[2386] | 101 | notify();
|
---|
[2385] | 102 | }
|
---|
| 103 | unlock();
|
---|
| 104 | }
|
---|
| 105 |
|
---|
| 106 | void RingPipe::addConsumer(RingProcessor* cons) {
|
---|
| 107 | consumers.push_back(cons);
|
---|
| 108 | }
|
---|
| 109 |
|
---|
| 110 | void RingPipe::setProducer(RingProcessor* proc) {
|
---|
| 111 | if (producer)
|
---|
| 112 | throw DuplicateIdExc("RingPipe::setProducer : producer already defined");
|
---|
| 113 | producer = proc;
|
---|
| 114 | }
|
---|
| 115 |
|
---|