1 | // ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
|
---|
2 | // Eric Aubourg
|
---|
3 | // Christophe Magneville
|
---|
4 | // Reza Ansari
|
---|
5 | // $Id: ringpipe.cc,v 1.1 2003-05-19 23:31:29 aubourg Exp $
|
---|
6 |
|
---|
7 | #include "ringpipe.h"
|
---|
8 | #include "ringprocessor.h"
|
---|
9 |
|
---|
10 |
|
---|
11 | #ifdef WITH_SOPHYA
|
---|
12 | #include "pexceptions.h"
|
---|
13 | #else
|
---|
14 | #include "apexceptions.h"
|
---|
15 | #endif
|
---|
16 |
|
---|
17 | RingPipe::RingPipe() {
|
---|
18 | i0 = -1;
|
---|
19 | producer = NULL;
|
---|
20 | pthread_cond_init(&ringReady, NULL);
|
---|
21 | pthread_mutexattr_init(&mutattr);
|
---|
22 | pthread_mutex_init(&mutex, &mutattr);
|
---|
23 | }
|
---|
24 |
|
---|
25 | RingPipe::~RingPipe() {
|
---|
26 | pthread_mutex_destroy(&mutex);
|
---|
27 | pthread_cond_destroy(&ringReady);
|
---|
28 | }
|
---|
29 |
|
---|
30 | Ring const* RingPipe::getRing(int i) {
|
---|
31 | lock();
|
---|
32 | waitForRing(i);
|
---|
33 | Ring const* ring = data[i-i0];
|
---|
34 | unlock();
|
---|
35 | return ring;
|
---|
36 | }
|
---|
37 |
|
---|
38 | void RingPipe::putRing(int i, Ring const* ring) {
|
---|
39 | lock();
|
---|
40 | if (i0 == -1) {
|
---|
41 | data.insert(data.begin(), 1, (Ring const*) NULL);
|
---|
42 | i0 = i;
|
---|
43 | } else if (i<i0) {
|
---|
44 | data.insert(data.begin(), i0-i, (Ring const*) NULL);
|
---|
45 | i0 = i;
|
---|
46 | } else if (i>=i0+(int)data.size()) {
|
---|
47 | data.insert(data.end(), (long) (i-(i0+data.size())+1), (Ring const*)NULL);
|
---|
48 | }
|
---|
49 | data[i-i0] = ring;
|
---|
50 | notify();
|
---|
51 | unlock();
|
---|
52 | }
|
---|
53 |
|
---|
54 | void RingPipe::waitForRing(int i) {
|
---|
55 | if (producer == NULL) throw NotFoundExc("RingPipe has no producer !");
|
---|
56 |
|
---|
57 | DataStatus s = isRingAvail(i);
|
---|
58 | if (s == DATA_OK) {
|
---|
59 | return;
|
---|
60 | }
|
---|
61 | if (s == DATA_DELETED) {
|
---|
62 | throw NotFoundExc("Data has been deleted !");
|
---|
63 | }
|
---|
64 |
|
---|
65 | while (isRingAvail(i) == DATA_NOT_YET) {
|
---|
66 | wait();
|
---|
67 | }
|
---|
68 | return;
|
---|
69 | }
|
---|
70 |
|
---|
71 | void RingPipe::getRingRange(int& min, int& max) {
|
---|
72 | producer->getRingRange(min, max);
|
---|
73 | }
|
---|
74 |
|
---|
75 | RingPipe::DataStatus RingPipe::isRingAvail(int i) {
|
---|
76 | if (i >= i0 + (long)data.size()) return DATA_NOT_YET;
|
---|
77 | if (i < i0) return DATA_DELETED;
|
---|
78 | return DATA_OK;
|
---|
79 | }
|
---|
80 |
|
---|
81 | void RingPipe::wontNeedRingBefore(int i) {
|
---|
82 | int j=i;
|
---|
83 | for (vector<RingProcessor*>::iterator k = consumers.begin();
|
---|
84 | k != consumers.end(); k++) {
|
---|
85 | if ((*k)->getWontNeedBefore() < j) j = (*k)->getWontNeedBefore();
|
---|
86 | }
|
---|
87 | lock();
|
---|
88 | if (i >= i0 + (long)data.size())
|
---|
89 | i = i0 + (long)data.size() -1;
|
---|
90 | if (i > i0) {
|
---|
91 | data.erase(data.begin(), data.begin()+(i-i0));
|
---|
92 | i0=i;
|
---|
93 | }
|
---|
94 | unlock();
|
---|
95 | }
|
---|
96 |
|
---|
97 | void RingPipe::addConsumer(RingProcessor* cons) {
|
---|
98 | consumers.push_back(cons);
|
---|
99 | }
|
---|
100 |
|
---|
101 | void RingPipe::setProducer(RingProcessor* proc) {
|
---|
102 | if (producer)
|
---|
103 | throw DuplicateIdExc("RingPipe::setProducer : producer already defined");
|
---|
104 | producer = proc;
|
---|
105 | }
|
---|
106 |
|
---|