source: Sophya/trunk/ArchTOIPipe/Kernel/toiprocessor.cc@ 2010

Last change on this file since 2010 was 1963, checked in by aubourg, 23 years ago

new output for wiener, unconnected outs ok in processor

File size: 11.1 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toiprocessor.cc,v 1.22 2002-04-11 16:12:17 aubourg Exp $
6
7#include "toiprocessor.h"
8#include "toimanager.h"
9#include <pthread.h>
10
11#ifdef HAVE_VALUES_H
12#include <values.h>
13#endif
14
15#ifndef MAXINT
16#define MAXINT 2147483647
17#endif
18
19#ifdef HAVE_STDINT_H
20#include <stdint.h>
21#endif
22
23#ifdef WITH_SOPHYA
24#include "pexceptions.h"
25#else
26#include "apexceptions.h"
27#endif
28
29#define pthread_mutexattr_setkind_np pthread_mutexattr_settype
30
31TOIProcessor::TOIProcessor() {
32 //cout << "TOIProcessor::TOIProcessor" << endl;
33 outTOIs = NULL;
34 inTOIs = NULL;
35 inited=false;
36
37 upExtra = 0;
38 lowExtra = 0;
39 minOut = -1;
40 maxOut = -1;
41 forcedMinIn = -1;
42 forcedMaxIn = -1;
43 neededHistory = 1000;
44 lastAWN = 0;
45 wontNeedValue = -1;
46
47}
48
49TOIProcessor::~TOIProcessor() {
50 delete[] outTOIs;
51 delete[] inTOIs;
52 //if (mutex)
53 pthread_mutex_destroy(&mutex);
54 //if (dataReady)
55 pthread_cond_destroy(&dataReady);
56 pthread_detach(thread);
57}
58
59
60void TOIProcessor::init() {
61 //cout << "TOIProcessor::init" << endl;
62}
63
64void TOIProcessor::afterinit() {
65 int i;
66 inTOIs = new (TOI*[inIx.size()]);
67 for(i=0; i<inIx.size(); i++)
68 inTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
69 outTOIs = new (TOI*[outIx.size()]);
70 for(i=0; i<outIx.size(); i++)
71 outTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
72}
73
74int TOIProcessor::getMinOut() {
75 //cout << name << "minout" << endl;
76 if (minOut < 0) minOut = calcMinOut();
77 //cout << name << "minout=" << minOut << endl;
78 return minOut;
79}
80
81int TOIProcessor::getMaxOut() {
82 //cout << name << "maxout" << endl;
83 if (maxOut < 0) maxOut = calcMaxOut();
84 //cout << name << "maxout=" << maxOut << endl;
85 return maxOut;
86}
87
88int TOIProcessor::calcMinOut() {
89 return getMinIn() + lowExtra;
90}
91
92int TOIProcessor::calcMaxOut() {
93 return getMaxIn() - upExtra;
94}
95
96int TOIProcessor::getMinIn() {
97 int nIn = inIx.size();
98 int minIn = 0;
99 for (int i=0; i<nIn; i++) {
100 TOI* toi = inTOIs[i];
101 if (toi == NULL) continue; // Protection - Reza 13/3/2001
102 int x = toi->getMinSn();
103 if (x > minIn) minIn = x;
104 }
105 if (forcedMinIn > 0 && forcedMinIn > minIn) minIn = forcedMinIn;
106 return minIn;
107}
108
109int TOIProcessor::getMaxIn() {
110 int_4 nIn = inIx.size();
111 int_4 maxIn = MAXINT;
112 for (int i=0; i<nIn; i++) {
113 TOI* toi = inTOIs[i];
114 if (toi == NULL) continue; // Protection - Reza 13/3/2001
115 int_4 x = toi->getMaxSn();
116 if (x < maxIn) maxIn = x;
117 }
118 if (forcedMaxIn > 0 && forcedMaxIn < maxIn) maxIn = forcedMaxIn;
119 return maxIn;
120}
121
122
123int TOIProcessor::declareInput(string toi) {
124 if (inIx.find(toi) != inIx.end())
125 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
126 int i = inIx.size();
127 inIx[toi] = i;
128 return i;
129}
130
131int TOIProcessor::declareOutput(string toi) {
132 if (outIx.find(toi) != outIx.end())
133 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
134 int i = outIx.size();
135 outIx[toi] = i;
136 return i;
137}
138
139int TOIProcessor::getInputTOIIndex(string toi) {
140 chkinit();
141 map<string, int>::iterator i = inIx.find(toi);
142 if (i == inIx.end()) return -1;
143 return (*i).second;
144}
145
146int TOIProcessor::getOutputTOIIndex(string toi) {
147 chkinit();
148 map<string, int>::iterator i = outIx.find(toi);
149 if (i == outIx.end()) return -1;
150 return (*i).second;
151}
152
// Methods added by Reza 11/3/2001
154TOI* TOIProcessor::getInputTOI(int toiIndex) {
155 // chkinit();
156 if (toiIndex >= inIx.size())
157 throw RangeCheckError("TOIProcessor::getInputTOI() out of bound toiIndex");
158 TOI* toi = inTOIs[toiIndex];
159 if (toi == NULL)
160 throw NullPtrError("TOIProcessor::getInputTOI() - Not assigned TOI !");
161 return(toi);
162}
163
164TOI* TOIProcessor::getOutputTOI(int toiIndex) {
165 // chkinit();
166 if (toiIndex >= outIx.size())
167 throw RangeCheckError("TOIProcessor::getOutputTOI() out of bound toiIndex");
168 TOI* toi = outTOIs[toiIndex];
169 // if (toi == NULL)
170 // throw NullPtrError("TOIProcessor::getOutputTOI() - Not assigned TOI !");
171 return(toi);
172}
173
174bool TOIProcessor::checkInputTOIIndex(int toiIndex) {
175 if (toiIndex >= inIx.size()) return false;
176 if (inTOIs[toiIndex] == NULL) return false;
177 return true;
178}
179
180bool TOIProcessor::checkOutputTOIIndex(int toiIndex) {
181 if (toiIndex >= outIx.size()) return false;
182 if (outTOIs[toiIndex] == NULL) return false;
183 return true;
184}
185
186void TOIProcessor::PrintStatus(::ostream & os)
187{
188 chkinit();
189 os << " TOIProcessor::PrintStatus() - Name= " << name
190 << " MinIn=" << getMinIn() << " MaxIn=" << getMaxIn() << endl;
191 os << " --- Inputs N= " << inIx.size() << endl;
192 int k;
193 for(k=0; k<inIx.size(); k++) {
194 os << "Input[" << k << "] : " << getInName(k) ;
195 if (inTOIs[k] != NULL)
196 os << " Connected TOI " << inTOIs[k]->getName() << endl;
197 else os << " NO TOI " << endl;
198 }
199 os << " --- Outputs N= " << outIx.size() << endl;
200 for(k=0; k<outIx.size(); k++) {
201 os << "Output[" << k << "] : " << getOutName(k) ;
202 if (outTOIs[k] != NULL)
203 os << " Connected TOI " << outTOIs[k]->getName() << endl;
204 else os << " NO TOI " << endl;
205 }
206 os << endl;
207 return;
208}
209
// End of methods added by Reza 11/3/2001
211
212void TOIProcessor::addInput(string name, TOI* toi) {
213 chkinit();
214 map<string, int>::iterator i = inIx.find(name);
215 if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+
216 name+" not declared");
217 inTOIs[(*i).second] = toi;
218 toi->addConsumer(this); // $CHECK$ Reza 13/3/2001
219}
220
221void TOIProcessor::addOutput(string name, TOI* toi) {
222 chkinit();
223 map<string, int>::iterator i = outIx.find(name);
224 if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+
225 name+" not declared");
226 toi->setProducer(this);
227 outTOIs[(*i).second] = toi;
228}
229
230string TOIProcessor::getOutName(int i) {
231 if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName "
232 " out of bound");
233 map<string, int>::iterator j;
234 for(j=outIx.begin(); j!= outIx.end(); j++)
235 if ((*j).second == i) return (*j).first;
236
237 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
238}
239
240string TOIProcessor::getInName(int i) {
241 if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName "
242 " out of bound");
243 map<string, int>::iterator j;
244 for(j=inIx.begin(); j!= inIx.end(); j++)
245 if ((*j).second == i) return (*j).first;
246
247 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
248}
249
250void TOIProcessor::run() {
251
252}
253
254void TOIProcessor::warnPutDone() {
255 int n = outIx.size();
256 for (int i=0; i<n; i++) {
257 TOI* toi = outTOIs[i];
258 if (toi) {
259 toi->putDone();
260 }
261 }
262}
263
264void* TOIProcessor::ThreadStart(void* arg) {
265 TOIProcessor* p = (TOIProcessor*) arg;
266 // cout << p->name << " new thread running " << pthread_self() << endl;
267 p->run();
268 p->warnPutDone();
269 pthread_exit(NULL);
270 // cout << p->name << " thread done " << pthread_self() << endl;
271 return NULL;
272}
273
274#ifdef Linux
275#define pthread_mutexattr_settype pthread_mutexattr_setkind_np
276#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
277#define pthread_mutex_setname_np(a,b,c)
278#define pthread_cond_setname_np(a,b,c)
279#endif
280
281void TOIProcessor::start() {
282 pthread_cond_init(&dataReady, NULL);
283 pthread_mutexattr_init(&mutattr);
284 // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK);
285 pthread_mutex_init(&mutex, &mutattr);
286 //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0);
287 //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0);
288 //cout << name << " starting thread " << &thread << endl;
289 pthread_create(&thread, NULL, ThreadStart, this);
290 TOIManager::getManager()->addThread(&thread);
291}
292
293#ifndef NO_SOPHYA
294/* ---- l'interface va etre modifiee, NE PAS UTILISER
295Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) {
296 TOI* toi = getInputTOI(toiIndex);
297 toi->waitForData(iStart, iEnd);
298 return toi->getData(iStart, iEnd);
299}
300
301Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) {
302 TOI* toi = getInputTOI(toiIndex);
303 toi->waitForData(iStart, iEnd);
304 return toi->getError(iStart, iEnd);
305}
306
307TArray<int_4> TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) {
308 TOI* toi = getInputTOI(toiIndex);
309 toi->waitForData(iStart, iEnd);
310 return toi->getFlag(iStart, iEnd);
311}
312 l'interface va etre modifiee, NE PAS UTILISER ---- */
313#endif
314
315double TOIProcessor::getData(int toiIndex, int i) {
316 TOI* toi = getInputTOI(toiIndex);
317 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
318 autoWontNeed(i);
319 return toi->getData(i);
320}
321
322void TOIProcessor::getData(int toiIndex, int i, double &data, uint_8 &flag)
323{
324 TOI* toi = getInputTOI(toiIndex);
325 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
326 toi->getData(i, data, flag);
327 autoWontNeed(i);
328 return;
329}
330
331void TOIProcessor::getData(int toiIndex, int i, int n, double* d)
332{
333 TOI* toi = getInputTOI(toiIndex);
334 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
335 toi->getData(i, n, d);
336 autoWontNeed(i);
337 return;
338}
339
340void TOIProcessor::getData(int toiIndex, int i, int n, double* d, uint_8* f)
341{
342 TOI* toi = getInputTOI(toiIndex);
343 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
344 toi->getData(i, n, d, f);
345 autoWontNeed(i);
346 return;
347}
348
349
350/*RZCMV
351double TOIProcessor::getError(int toiIndex, int i) {
352 TOI* toi = getInputTOI(toiIndex);
353 toi->waitForData(i);
354 return toi->getError(i);
355}
356
357int_4 TOIProcessor::getFlag(int toiIndex, int i) {
358 TOI* toi = getInputTOI(toiIndex);
359 toi->waitForData(i);
360 return toi->getFlag(i);
361}
362*/
363
364void TOIProcessor::setNeededHistory(int nsamples) {
365 neededHistory = nsamples;
366}
367
368void TOIProcessor::wontNeedBefore(int i) {
369 if (i<wontNeedValue) return;
370 wontNeedValue = i;
371 for (int j=0; j< (int) inIx.size(); j++) {
372 // $CHECK$ Reza 6/5/2001 Protection sur non connected TOI
373 if (inTOIs[j]) inTOIs[j]->wontNeedBefore(i);
374 }
375}
376
377void TOIProcessor::autoWontNeed(int iCur) {
378 if (neededHistory <=0) return;
379 if (iCur < lastAWN + neededHistory/10) return;
380 lastAWN = iCur;
381 // cout << name << " wontNeedBefore " << iCur-neededHistory << endl;
382 wontNeedBefore(iCur-neededHistory);
383}
384
385void TOIProcessor::notify() {
386 lock();
387 pthread_cond_broadcast(&dataReady);
388 unlock();
389}
390
391
392void TOIProcessor::putData(int toiIndex, int i, double value, uint_8 flg) {
393 TOI* toi = getOutputTOI(toiIndex);
394 if (toi == NULL) return;
395 toi->putData(i, value, flg);
396 // autoWontNeed(i); // now done on getData
397 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
398}
399
400void TOIProcessor::putData(int toiIndex, int i, int n, double const* val,
401 uint_8 const* flg) {
402 TOI* toi = getOutputTOI(toiIndex);
403 if (toi == NULL) return;
404 toi->putData(i, n, val, flg);
405 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
406}
407
408/*RZCMV
409void TOIProcessor::putDataError(int toiIndex, int i, double value,
410 double error, int_4 flg) {
411 TOI* toi = getOutputTOI(toiIndex);
412 if (toi == NULL)
413 throw NullPtrError("TOIProcessor::putDataError() - Not assigned TOI !");
414 toi->putDataError(i, value, error, flg);
415 autoWontNeed(i);
416 notify();
417}
418*/
419
Note: See TracBrowser for help on using the repository browser.