source: Sophya/trunk/ArchTOIPipe/Kernel/toiprocessor.cc@ 1365

Last change on this file since 1365 was 1365, checked in by aubourg, 25 years ago

pipeline TOI archeops

File size: 6.0 KB
Line 
1#include "toiprocessor.h"
2#include "toimanager.h"
3#include <pthread.h>
4#include <values.h>
5
6#ifdef WITH_SOPHYA
7#include "pexceptions.h"
8#else
9#include "apexceptions.h"
10#endif
11
12#define pthread_mutexattr_setkind_np pthread_mutexattr_settype
13
14TOIProcessor::TOIProcessor() {
15 //cout << "TOIProcessor::TOIProcessor" << endl;
16 outTOIs = NULL;
17 inTOIs = NULL;
18 inited=false;
19
20 upExtra = 0;
21 lowExtra = 0;
22 minOut = -1;
23 maxOut = -1;
24 neededHistory = 1000;
25 lastAWN = 0;
26 wontNeedValue = -1;
27
28}
29
30TOIProcessor::~TOIProcessor() {
31 delete[] outTOIs;
32 delete[] inTOIs;
33 //if (mutex)
34 pthread_mutex_destroy(&mutex);
35 //if (dataReady)
36 pthread_cond_destroy(&dataReady);
37 pthread_detach(thread);
38}
39
40
41void TOIProcessor::init() {
42 //cout << "TOIProcessor::init" << endl;
43}
44
45void TOIProcessor::afterinit() {
46 inTOIs = new (TOI*[inIx.size()]);
47 outTOIs = new (TOI*[outIx.size()]);
48}
49
50int TOIProcessor::getMinOut() {
51 //cout << name << "minout" << endl;
52 if (minOut < 0) minOut = calcMinOut();
53 //cout << name << "minout=" << minOut << endl;
54 return minOut;
55}
56
57int TOIProcessor::getMaxOut() {
58 //cout << name << "maxout" << endl;
59 if (maxOut < 0) maxOut = calcMaxOut();
60 //cout << name << "maxout=" << maxOut << endl;
61 return maxOut;
62}
63
64int TOIProcessor::calcMinOut() {
65 return getMinIn() + lowExtra;
66}
67
68int TOIProcessor::calcMaxOut() {
69 return getMaxIn() - upExtra;
70}
71
72int TOIProcessor::getMinIn() {
73 int nIn = inIx.size();
74 int minIn = 0;
75 for (int i=0; i<nIn; i++) {
76 TOI* toi = inTOIs[i];
77 int x = toi->getMinSn();
78 if (x > minIn) minIn = x;
79 }
80 return minIn;
81}
82
83int TOIProcessor::getMaxIn() {
84 int nIn = inIx.size();
85 int maxIn = MAXINT;
86 for (int i=0; i<nIn; i++) {
87 TOI* toi = inTOIs[i];
88 int x = toi->getMaxSn();
89 if (x < maxIn) maxIn = x;
90 }
91 return maxIn;
92}
93
94
95int TOIProcessor::declareInput(string toi) {
96 if (inIx.find(toi) != inIx.end())
97 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
98 int i = inIx.size();
99 inIx[toi] = i;
100 return i;
101}
102
103int TOIProcessor::declareOutput(string toi) {
104 if (outIx.find(toi) != outIx.end())
105 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
106 int i = outIx.size();
107 outIx[toi] = i;
108 return i;
109}
110
111int TOIProcessor::getInputTOIIndex(string toi) {
112 map<string, int>::iterator i = inIx.find(toi);
113 if (i == inIx.end()) return -1;
114 return (*i).second;
115}
116
117int TOIProcessor::getOutputTOIIndex(string toi) {
118 map<string, int>::iterator i = outIx.find(toi);
119 if (i == outIx.end()) return -1;
120 return (*i).second;
121}
122
123void TOIProcessor::addInput(string name, TOI* toi) {
124 chkinit();
125 map<string, int>::iterator i = inIx.find(name);
126 if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+
127 name+" not declared");
128 inTOIs[(*i).second] = toi;
129}
130
131void TOIProcessor::addOutput(string name, TOI* toi) {
132 chkinit();
133 map<string, int>::iterator i = outIx.find(name);
134 if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+
135 name+" not declared");
136 toi->setProducer(this);
137 outTOIs[(*i).second] = toi;
138}
139
140void TOIProcessor::run() {
141
142}
143
144void* TOIProcessor::ThreadStart(void* arg) {
145 TOIProcessor* p = (TOIProcessor*) arg;
146 // cout << p->name << " new thread running " << pthread_self() << endl;
147 p->run();
148 // cout << p->name << " thread done " << pthread_self() << endl;
149 return NULL;
150}
151
152#ifdef Linux
153#define pthread_mutexattr_settype pthread_mutexattr_setkind_np
154#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
155#define pthread_mutex_setname_np(a,b,c)
156#define pthread_cond_setname_np(a,b,c)
157#endif
158
159void TOIProcessor::start() {
160 pthread_cond_init(&dataReady, NULL);
161 pthread_mutexattr_init(&mutattr);
162 // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK);
163 pthread_mutex_init(&mutex, &mutattr);
164 //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0);
165 //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0);
166 //cout << name << " starting thread " << &thread << endl;
167 pthread_create(&thread, NULL, ThreadStart, this);
168 TOIManager::getManager()->addThread(&thread);
169}
170
171#ifndef NO_SOPHYA
172Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) {
173 TOI* toi = inTOIs[toiIndex];
174 toi->waitForData(iStart, iEnd);
175 return toi->getData(iStart, iEnd);
176}
177
178Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) {
179 TOI* toi = inTOIs[toiIndex];
180 toi->waitForData(iStart, iEnd);
181 return toi->getError(iStart, iEnd);
182}
183
184TArray<int_4> TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) {
185 TOI* toi = inTOIs[toiIndex];
186 toi->waitForData(iStart, iEnd);
187 return toi->getFlag(iStart, iEnd);
188}
189#endif
190
191double TOIProcessor::getData(int toiIndex, int i) {
192 TOI* toi = inTOIs[toiIndex];
193 toi->waitForData(i);
194 return toi->getData(i);
195}
196
197double TOIProcessor::getError(int toiIndex, int i) {
198 TOI* toi = inTOIs[toiIndex];
199 toi->waitForData(i);
200 return toi->getError(i);
201}
202
203int_4 TOIProcessor::getFlag(int toiIndex, int i) {
204 TOI* toi = inTOIs[toiIndex];
205 toi->waitForData(i);
206 return toi->getFlag(i);
207}
208
209void TOIProcessor::setNeededHistory(int nsamples) {
210 neededHistory = nsamples;
211}
212
213void TOIProcessor::wontNeedBefore(int i) {
214 if (i<wontNeedValue) return;
215 wontNeedValue = i;
216 for (int j=0; j< (int) inIx.size(); j++) {
217 inTOIs[j]->wontNeedBefore(i);
218 }
219}
220
221void TOIProcessor::autoWontNeed(int iCur) {
222 if (neededHistory <=0) return;
223 if (iCur < lastAWN + neededHistory) return;
224 lastAWN = iCur;
225 //cout << name << " wontNeedBefore " << iCur-neededHistory << endl;
226 wontNeedBefore(iCur-neededHistory);
227}
228
229void TOIProcessor::notify() {
230 lock();
231/* if (mutex.__m_lock.__status == 0) {
232 cout << "wait without lock" << endl; abort();
233 }*/
234
235 pthread_cond_broadcast(&dataReady);
236 unlock();
237}
238
239
240void TOIProcessor::putData(int toiIndex, int i, double value, int_4 flg) {
241 TOI* toi = outTOIs[toiIndex];
242 toi->putData(i, value, flg);
243 autoWontNeed(i);
244 notify();
245}
246
247
248void TOIProcessor::putDataError(int toiIndex, int i, double value,
249 double error, int_4 flg) {
250 TOI* toi = outTOIs[toiIndex];
251 toi->putDataError(i, value, error, flg);
252 autoWontNeed(i);
253 notify();
254}
255
256
Note: See TracBrowser for help on using the repository browser.