source: Sophya/trunk/ArchTOIPipe/Kernel/ringpipe.cc@ 2447

Last change on this file since 2447 was 2386, checked in by aubourg, 22 years ago

ring pipes

File size: 2.5 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: ringpipe.cc,v 1.2 2003-05-20 10:10:08 aubourg Exp $
6
7#include "ringpipe.h"
8#include "ringprocessor.h"
9
10
11#ifdef WITH_SOPHYA
12#include "pexceptions.h"
13#else
14#include "apexceptions.h"
15#endif
16
17RingPipe::RingPipe(int ws) {
18 i0 = -1;
19 winSize = ws;
20 producer = NULL;
21 pthread_cond_init(&ringReady, NULL);
22 pthread_mutexattr_init(&mutattr);
23 pthread_mutex_init(&mutex, &mutattr);
24}
25
26RingPipe::~RingPipe() {
27 pthread_mutex_destroy(&mutex);
28 pthread_cond_destroy(&ringReady);
29}
30
31Ring const* RingPipe::getRing(int i) {
32 lock();
33 waitForRing(i);
34 Ring const* ring = data[i-i0];
35 unlock();
36 return ring;
37}
38
39void RingPipe::putRing(int i, Ring const* ring) {
40 lock();
41 waitForRoom(i);
42 if (i0 == -1) {
43 data.insert(data.begin(), 1, (Ring const*) NULL);
44 i0 = i;
45 } else if (i<i0) {
46 data.insert(data.begin(), i0-i, (Ring const*) NULL);
47 i0 = i;
48 } else if (i>=i0+(int)data.size()) {
49 data.insert(data.end(), (long) (i-(i0+data.size())+1), (Ring const*)NULL);
50 }
51 data[i-i0] = ring;
52 notify();
53 unlock();
54}
55
56void RingPipe::waitForRoom(int j) {
57 while (j-i0 >= winSize) {
58 wait();
59 }
60}
61
62void RingPipe::waitForRing(int i) {
63 if (producer == NULL) throw NotFoundExc("RingPipe has no producer !");
64
65 DataStatus s = isRingAvail(i);
66 if (s == DATA_OK) {
67 return;
68 }
69 if (s == DATA_DELETED) {
70 throw NotFoundExc("Data has been deleted !");
71 }
72
73 while (isRingAvail(i) == DATA_NOT_YET) {
74 wait();
75 }
76 return;
77}
78
79void RingPipe::getRingRange(int& min, int& max) {
80 producer->getRingRange(min, max);
81}
82
83RingPipe::DataStatus RingPipe::isRingAvail(int i) {
84 if (i >= i0 + (long)data.size()) return DATA_NOT_YET;
85 if (i < i0) return DATA_DELETED;
86 return DATA_OK;
87}
88
89void RingPipe::wontNeedRingBefore(int i) {
90 int j=i;
91 for (vector<RingProcessor*>::iterator k = consumers.begin();
92 k != consumers.end(); k++) {
93 if ((*k)->getWontNeedBefore() < j) j = (*k)->getWontNeedBefore();
94 }
95 lock();
96 if (i >= i0 + (long)data.size())
97 i = i0 + (long)data.size() -1;
98 if (i > i0) {
99 data.erase(data.begin(), data.begin()+(i-i0));
100 i0=i;
101 notify();
102 }
103 unlock();
104}
105
106void RingPipe::addConsumer(RingProcessor* cons) {
107 consumers.push_back(cons);
108}
109
110void RingPipe::setProducer(RingProcessor* proc) {
111 if (producer)
112 throw DuplicateIdExc("RingPipe::setProducer : producer already defined");
113 producer = proc;
114}
115
Note: See TracBrowser for help on using the repository browser.