source: Sophya/trunk/ArchTOIPipe/Kernel/toiprocessor.cc@ 1437

Last change on this file since 1437 was 1437, checked in by ansari, 25 years ago

Protections diverses dans TOIProcessor et FITSTOIReader/Writer
Ajout d'un TOI (TOISeqBuffered) avec gestion d'un buffer entre put/get
Ajout de processeurs de test (RZTOIProc...) , programme test associe
et SMakefile (pour compil avec SOPHYA)

Reza 12/3/2001

File size: 8.7 KB
Line 
1#include "toiprocessor.h"
2#include "toimanager.h"
3#include <pthread.h>
4#include <values.h>
5
6#ifdef WITH_SOPHYA
7#include "pexceptions.h"
8#else
9#include "apexceptions.h"
10#endif
11
12#define pthread_mutexattr_setkind_np pthread_mutexattr_settype
13
14TOIProcessor::TOIProcessor() {
15 //cout << "TOIProcessor::TOIProcessor" << endl;
16 outTOIs = NULL;
17 inTOIs = NULL;
18 inited=false;
19
20 upExtra = 0;
21 lowExtra = 0;
22 minOut = -1;
23 maxOut = -1;
24 neededHistory = 1000;
25 lastAWN = 0;
26 wontNeedValue = -1;
27
28}
29
30TOIProcessor::~TOIProcessor() {
31 delete[] outTOIs;
32 delete[] inTOIs;
33 //if (mutex)
34 pthread_mutex_destroy(&mutex);
35 //if (dataReady)
36 pthread_cond_destroy(&dataReady);
37 pthread_detach(thread);
38}
39
40
41void TOIProcessor::init() {
42 //cout << "TOIProcessor::init" << endl;
43}
44
45void TOIProcessor::afterinit() {
46 int i;
47 inTOIs = new (TOI*[inIx.size()]);
48 for(i=0; i<inIx.size(); i++)
49 inTOIs[i] = NULL;
50 outTOIs = new (TOI*[outIx.size()]);
51 for(i=0; i<outIx.size(); i++)
52 outTOIs[i] = NULL;
53}
54
55int TOIProcessor::getMinOut() {
56 //cout << name << "minout" << endl;
57 if (minOut < 0) minOut = calcMinOut();
58 //cout << name << "minout=" << minOut << endl;
59 return minOut;
60}
61
62int TOIProcessor::getMaxOut() {
63 //cout << name << "maxout" << endl;
64 if (maxOut < 0) maxOut = calcMaxOut();
65 //cout << name << "maxout=" << maxOut << endl;
66 return maxOut;
67}
68
69int TOIProcessor::calcMinOut() {
70 return getMinIn() + lowExtra;
71}
72
73int TOIProcessor::calcMaxOut() {
74 return getMaxIn() - upExtra;
75}
76
77int TOIProcessor::getMinIn() {
78 int nIn = inIx.size();
79 int minIn = 0;
80 for (int i=0; i<nIn; i++) {
81 TOI* toi = inTOIs[i];
82 int x = toi->getMinSn();
83 if (x > minIn) minIn = x;
84 }
85 return minIn;
86}
87
88int TOIProcessor::getMaxIn() {
89 int nIn = inIx.size();
90 int maxIn = MAXINT;
91 for (int i=0; i<nIn; i++) {
92 TOI* toi = inTOIs[i];
93 int x = toi->getMaxSn();
94 if (x < maxIn) maxIn = x;
95 }
96 return maxIn;
97}
98
99
100int TOIProcessor::declareInput(string toi) {
101 if (inIx.find(toi) != inIx.end())
102 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
103 int i = inIx.size();
104 inIx[toi] = i;
105 return i;
106}
107
108int TOIProcessor::declareOutput(string toi) {
109 if (outIx.find(toi) != outIx.end())
110 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
111 int i = outIx.size();
112 outIx[toi] = i;
113 return i;
114}
115
116int TOIProcessor::getInputTOIIndex(string toi) {
117 chkinit();
118 map<string, int>::iterator i = inIx.find(toi);
119 if (i == inIx.end()) return -1;
120 return (*i).second;
121}
122
123int TOIProcessor::getOutputTOIIndex(string toi) {
124 chkinit();
125 map<string, int>::iterator i = outIx.find(toi);
126 if (i == outIx.end()) return -1;
127 return (*i).second;
128}
129
130// Methodes rajoutees par Reza 11/3/2001
131TOI* TOIProcessor::getInputTOI(int toiIndex) {
132 // chkinit();
133 if (toiIndex >= inIx.size())
134 throw RangeCheckError("TOIProcessor::getInputTOI() out of bound toiIndex");
135 TOI* toi = inTOIs[toiIndex];
136 if (toi == NULL)
137 throw NullPtrError("TOIProcessor::getInputTOI() - Not assigned TOI !");
138 return(toi);
139}
140
141TOI* TOIProcessor::getOutputTOI(int toiIndex) {
142 // chkinit();
143 if (toiIndex >= outIx.size())
144 throw RangeCheckError("TOIProcessor::getOutputTOI() out of bound toiIndex");
145 TOI* toi = outTOIs[toiIndex];
146 if (toi == NULL)
147 throw NullPtrError("TOIProcessor::getOutputTOI() - Not assigned TOI !");
148 return(toi);
149}
150
151bool TOIProcessor::checkInputTOIIndex(int toiIndex) {
152 if (toiIndex >= inIx.size()) return false;
153 if (inTOIs[toiIndex] == NULL) return false;
154 return true;
155}
156
157bool TOIProcessor::checkOutputTOIIndex(int toiIndex) {
158 if (toiIndex >= outIx.size()) return false;
159 if (outTOIs[toiIndex] == NULL) return false;
160 return true;
161}
162
163void TOIProcessor::PrintStatus(ostream & os)
164{
165 os << " TOIProcessor::PrintStatus() - Name= " << name
166 << " MinIn=" << getMinIn() << " MaxIn=" << getMaxIn() << endl;
167 os << " --- Inputs N= " << inIx.size() << endl;
168 int k;
169 for(k=0; k<inIx.size(); k++) {
170 os << "Input[" << k << "] : " << getInName(k) ;
171 if (inTOIs[k] != NULL)
172 os << " Connected TOI " << inTOIs[k]->getName() << endl;
173 else os << " NO TOI " << endl;
174 }
175 os << " --- Outputs N= " << outIx.size() << endl;
176 for(k=0; k<outIx.size(); k++) {
177 os << "Output[" << k << "] : " << getOutName(k) ;
178 if (outTOIs[k] != NULL)
179 os << " Connected TOI " << outTOIs[k]->getName() << endl;
180 else os << " NO TOI " << endl;
181 }
182 os << endl;
183 return;
184}
185
186// Fin rajout Reza 11/3/2001
187
188void TOIProcessor::addInput(string name, TOI* toi) {
189 chkinit();
190 map<string, int>::iterator i = inIx.find(name);
191 if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+
192 name+" not declared");
193 inTOIs[(*i).second] = toi;
194}
195
196void TOIProcessor::addOutput(string name, TOI* toi) {
197 chkinit();
198 map<string, int>::iterator i = outIx.find(name);
199 if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+
200 name+" not declared");
201 toi->setProducer(this);
202 outTOIs[(*i).second] = toi;
203}
204
205string TOIProcessor::getOutName(int i) {
206 if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName "
207 " out of bound");
208 map<string, int>::iterator j;
209 for(j=outIx.begin(); j!= outIx.end(); j++)
210 if ((*j).second == i) return (*j).first;
211
212 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
213}
214
215string TOIProcessor::getInName(int i) {
216 if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName "
217 " out of bound");
218 map<string, int>::iterator j;
219 for(j=inIx.begin(); j!= inIx.end(); j++)
220 if ((*j).second == i) return (*j).first;
221
222 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
223}
224
225void TOIProcessor::run() {
226
227}
228
229void* TOIProcessor::ThreadStart(void* arg) {
230 TOIProcessor* p = (TOIProcessor*) arg;
231 // cout << p->name << " new thread running " << pthread_self() << endl;
232 p->run();
233 // cout << p->name << " thread done " << pthread_self() << endl;
234 return NULL;
235}
236
237#ifdef Linux
238#define pthread_mutexattr_settype pthread_mutexattr_setkind_np
239#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
240#define pthread_mutex_setname_np(a,b,c)
241#define pthread_cond_setname_np(a,b,c)
242#endif
243
244void TOIProcessor::start() {
245 pthread_cond_init(&dataReady, NULL);
246 pthread_mutexattr_init(&mutattr);
247 // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK);
248 pthread_mutex_init(&mutex, &mutattr);
249 //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0);
250 //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0);
251 //cout << name << " starting thread " << &thread << endl;
252 pthread_create(&thread, NULL, ThreadStart, this);
253 TOIManager::getManager()->addThread(&thread);
254}
255
256#ifndef NO_SOPHYA
257Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) {
258 TOI* toi = getInputTOI(toiIndex);
259 toi->waitForData(iStart, iEnd);
260 return toi->getData(iStart, iEnd);
261}
262
263Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) {
264 TOI* toi = getInputTOI(toiIndex);
265 toi->waitForData(iStart, iEnd);
266 return toi->getError(iStart, iEnd);
267}
268
269TArray<int_4> TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) {
270 TOI* toi = getInputTOI(toiIndex);
271 toi->waitForData(iStart, iEnd);
272 return toi->getFlag(iStart, iEnd);
273}
274#endif
275
276double TOIProcessor::getData(int toiIndex, int i) {
277 TOI* toi = getInputTOI(toiIndex);
278 toi->waitForData(i);
279 return toi->getData(i);
280}
281
282double TOIProcessor::getError(int toiIndex, int i) {
283 TOI* toi = getInputTOI(toiIndex);
284 toi->waitForData(i);
285 return toi->getError(i);
286}
287
288int_4 TOIProcessor::getFlag(int toiIndex, int i) {
289 TOI* toi = getInputTOI(toiIndex);
290 toi->waitForData(i);
291 return toi->getFlag(i);
292}
293
294void TOIProcessor::setNeededHistory(int nsamples) {
295 neededHistory = nsamples;
296}
297
298void TOIProcessor::wontNeedBefore(int i) {
299 if (i<wontNeedValue) return;
300 wontNeedValue = i;
301 for (int j=0; j< (int) inIx.size(); j++) {
302 inTOIs[j]->wontNeedBefore(i);
303 }
304}
305
306void TOIProcessor::autoWontNeed(int iCur) {
307 if (neededHistory <=0) return;
308 if (iCur < lastAWN + neededHistory) return;
309 lastAWN = iCur;
310 //cout << name << " wontNeedBefore " << iCur-neededHistory << endl;
311 wontNeedBefore(iCur-neededHistory);
312}
313
314void TOIProcessor::notify() {
315 lock();
316/* if (mutex.__m_lock.__status == 0) {
317 cout << "wait without lock" << endl; abort();
318 }*/
319
320 pthread_cond_broadcast(&dataReady);
321 unlock();
322}
323
324
325void TOIProcessor::putData(int toiIndex, int i, double value, int_4 flg) {
326 TOI* toi = getOutputTOI(toiIndex);
327 toi->putData(i, value, flg);
328 autoWontNeed(i);
329 notify();
330}
331
332
333void TOIProcessor::putDataError(int toiIndex, int i, double value,
334 double error, int_4 flg) {
335 TOI* toi = getOutputTOI(toiIndex);
336 if (toi == NULL)
337 throw NullPtrError("TOIProcessor::putDataError() - Not assigned TOI !");
338 toi->putDataError(i, value, error, flg);
339 autoWontNeed(i);
340 notify();
341}
342
343
Note: See TracBrowser for help on using the repository browser.