source: Sophya/trunk/ArchTOIPipe/Kernel/toiprocessor.cc@ 2135

Last change on this file since 2135 was 2133, checked in by vfebvre, 23 years ago

startAll sur TOIManager

File size: 12.1 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toiprocessor.cc,v 1.25 2002-07-26 08:52:43 vfebvre Exp $
6
7#include "toiprocessor.h"
8#include "toimanager.h"
9#include <pthread.h>
10#include <typeinfo> // ajout pour linux
11#ifdef HAVE_VALUES_H
12#include <values.h>
13#endif
14
15#ifndef MAXINT
16#define MAXINT 2147483647
17#endif
18
19#ifdef HAVE_STDINT_H
20#include <stdint.h>
21#endif
22
23#ifdef WITH_SOPHYA
24#include "pexceptions.h"
25#else
26#include "apexceptions.h"
27#endif
28
29#define pthread_mutexattr_setkind_np pthread_mutexattr_settype
30
31
32TOIProcessor::TOIProcessor() {
33 //cout << "TOIProcessor::TOIProcessor" << endl;
34 outTOIs = NULL;
35 inTOIs = NULL;
36 inited=false;
37
38 upExtra = 0;
39 lowExtra = 0;
40 minOut = -1;
41 maxOut = -1;
42 forcedMinIn = -1;
43 forcedMaxIn = -1;
44 neededHistory = 1000;
45 lastAWN = 0;
46 wontNeedValue = -1;
47
48 // Pour referencer un TOIProcessor, il faut recuperer le TOIManager correspondant
49 TOIManager::getManager()->registerProcessor(this);
50
51}
52
53
54
55TOIProcessor::~TOIProcessor() {
56 delete[] outTOIs;
57 delete[] inTOIs;
58 //if (mutex)
59 pthread_mutex_destroy(&mutex);
60 //if (dataReady)
61 pthread_cond_destroy(&dataReady);
62 // Il ne faut apparemment pas appeler pthread_detach si on a fait join avant
63 // Extrait du man (Reza - Mai 2002)
64 //man: The pthread_join(3) routine also detaches the target thread after
65 //man: pthread_join(3) returns successfully.
66 // pthread_detach(thread); $CHECK$ - Reza Mai 2002
67}
68
69
70void TOIProcessor::init() {
71 //cout << "TOIProcessor::init" << endl;
72}
73
74void TOIProcessor::afterinit() {
75 int i;
76 inTOIs = new (TOI*[inIx.size()]);
77 for(i=0; i<inIx.size(); i++)
78 inTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
79 outTOIs = new (TOI*[outIx.size()]);
80 for(i=0; i<outIx.size(); i++)
81 outTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
82}
83
84int TOIProcessor::getMinOut() {
85 //cout << name << "minout" << endl;
86 if (minOut < 0) minOut = calcMinOut();
87 //cout << name << "minout=" << minOut << endl;
88 return minOut;
89}
90
91int TOIProcessor::getMaxOut() {
92 //cout << name << "maxout" << endl;
93 if (maxOut < 0) maxOut = calcMaxOut();
94 //cout << name << "maxout=" << maxOut << endl;
95 return maxOut;
96}
97
98int TOIProcessor::calcMinOut() {
99 return getMinIn() + lowExtra;
100}
101
102int TOIProcessor::calcMaxOut() {
103 return getMaxIn() - upExtra;
104}
105
106int TOIProcessor::getMinIn() {
107 int nIn = inIx.size();
108 int minIn = 0;
109 for (int i=0; i<nIn; i++) {
110 TOI* toi = inTOIs[i];
111 if (toi == NULL) continue; // Protection - Reza 13/3/2001
112 int x = toi->getMinSn();
113 if (x > minIn) minIn = x;
114 }
115 if (forcedMinIn > 0 && forcedMinIn > minIn) minIn = forcedMinIn;
116 return minIn;
117}
118
119int TOIProcessor::getMaxIn() {
120 int_4 nIn = inIx.size();
121 int_4 maxIn = MAXINT;
122 for (int i=0; i<nIn; i++) {
123 TOI* toi = inTOIs[i];
124 if (toi == NULL) continue; // Protection - Reza 13/3/2001
125 int_4 x = toi->getMaxSn();
126 if (x < maxIn) maxIn = x;
127 }
128 if (forcedMaxIn > 0 && forcedMaxIn < maxIn) maxIn = forcedMaxIn;
129 return maxIn;
130}
131
132
133int TOIProcessor::declareInput(string toi) {
134 if (inIx.find(toi) != inIx.end())
135 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
136 int i = inIx.size();
137 inIx[toi] = i;
138 return i;
139}
140
141int TOIProcessor::declareOutput(string toi) {
142 if (outIx.find(toi) != outIx.end())
143 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
144 int i = outIx.size();
145 outIx[toi] = i;
146 return i;
147}
148
149int TOIProcessor::getInputTOIIndex(string toi) {
150 chkinit();
151 map<string, int>::iterator i = inIx.find(toi);
152 if (i == inIx.end()) return -1;
153 return (*i).second;
154}
155
156int TOIProcessor::getOutputTOIIndex(string toi) {
157 chkinit();
158 map<string, int>::iterator i = outIx.find(toi);
159 if (i == outIx.end()) return -1;
160 return (*i).second;
161}
162
163// Methodes rajoutees par Reza 11/3/2001
164TOI* TOIProcessor::getInputTOI(int toiIndex) {
165 // chkinit();
166 if (toiIndex >= inIx.size())
167 throw RangeCheckError("TOIProcessor::getInputTOI() out of bound toiIndex");
168 TOI* toi = inTOIs[toiIndex];
169 if (toi == NULL)
170 throw NullPtrError("TOIProcessor::getInputTOI() - Not assigned TOI !");
171 return(toi);
172}
173
174TOI* TOIProcessor::getOutputTOI(int toiIndex) {
175 // chkinit();
176 if (toiIndex >= outIx.size())
177 throw RangeCheckError("TOIProcessor::getOutputTOI() out of bound toiIndex");
178 TOI* toi = outTOIs[toiIndex];
179 // if (toi == NULL)
180 // throw NullPtrError("TOIProcessor::getOutputTOI() - Not assigned TOI !");
181 return(toi);
182}
183
184bool TOIProcessor::checkInputTOIIndex(int toiIndex) {
185 if (toiIndex >= inIx.size()) return false;
186 if (inTOIs[toiIndex] == NULL) return false;
187 return true;
188}
189
190bool TOIProcessor::checkOutputTOIIndex(int toiIndex) {
191 if (toiIndex >= outIx.size()) return false;
192 if (outTOIs[toiIndex] == NULL) return false;
193 return true;
194}
195
196void TOIProcessor::PrintStatus(::ostream & os)
197{
198 chkinit();
199 os << " TOIProcessor::PrintStatus() - Name= " << name
200 << " MinIn=" << getMinIn() << " MaxIn=" << getMaxIn() << endl;
201 os << " --- Inputs N= " << inIx.size() << endl;
202 int k;
203 for(k=0; k<inIx.size(); k++) {
204 os << "Input[" << k << "] : " << getInName(k) ;
205 if (inTOIs[k] != NULL)
206 os << " Connected TOI " << inTOIs[k]->getName() << endl;
207 else os << " NO TOI " << endl;
208 }
209 os << " --- Outputs N= " << outIx.size() << endl;
210 for(k=0; k<outIx.size(); k++) {
211 os << "Output[" << k << "] : " << getOutName(k) ;
212 if (outTOIs[k] != NULL)
213 os << " Connected TOI " << outTOIs[k]->getName() << endl;
214 else os << " NO TOI " << endl;
215 }
216 os << endl;
217 return;
218}
219
220// Fin rajout Reza 11/3/2001
221
222void TOIProcessor::addInput(string name, TOI* toi) {
223 chkinit();
224 map<string, int>::iterator i = inIx.find(name);
225 if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+
226 name+" not declared");
227 inTOIs[(*i).second] = toi;
228 toi->addConsumer(this); // $CHECK$ Reza 13/3/2001
229}
230
231void TOIProcessor::addOutput(string name, TOI* toi) {
232 chkinit();
233 map<string, int>::iterator i = outIx.find(name);
234 if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+
235 name+" not declared");
236 toi->setProducer(this);
237 outTOIs[(*i).second] = toi;
238}
239
240string TOIProcessor::getOutName(int i) {
241 if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName "
242 " out of bound");
243 map<string, int>::iterator j;
244 for(j=outIx.begin(); j!= outIx.end(); j++)
245 if ((*j).second == i) return (*j).first;
246
247 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
248}
249
250string TOIProcessor::getInName(int i) {
251 if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName "
252 " out of bound");
253 map<string, int>::iterator j;
254 for(j=inIx.begin(); j!= inIx.end(); j++)
255 if ((*j).second == i) return (*j).first;
256
257 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
258}
259
260void TOIProcessor::run() {
261
262}
263
264void TOIProcessor::warnPutDone() {
265 int n = outIx.size();
266 for (int i=0; i<n; i++) {
267 TOI* toi = outTOIs[i];
268 if (toi) {
269 toi->putDone();
270 }
271 }
272}
273
274void* TOIProcessor::ThreadStart(void* arg) {
275
276 TOIProcessor* p = (TOIProcessor*) arg;
277 // cout << p->name << " new thread running " << pthread_self() << endl;
278 try {
279 p->run();
280 }
281 catch (PThrowable & exc) {
282 cerr << "\n TOIProcessor::ThreadStart() Catched Exception TOIProcessor@"
283 << hex << p << dec << "\n"
284 << (string)typeid(exc).name()
285 << " - Msg= " << exc.Msg() << endl;
286 }
287 catch (const std::exception & sex) {
288 cerr << "\n TOIProcessor::ThreadStart() Catched std::exception TOIProcessor@"
289 << hex << p << dec << "\n"
290 << (string)typeid(sex).name() << endl;
291 }
292 catch (...) {
293 cerr << "\n TOIProcessor::ThreadStart() Catched ... exception TOIProcessor@"
294 << hex << p << dec << endl;
295 }
296 p->warnPutDone();
297 pthread_exit(NULL);
298 // cout << p->name << " thread done " << pthread_self() << endl;
299 return NULL;
300}
301
302#ifdef Linux
303#define pthread_mutexattr_settype pthread_mutexattr_setkind_np
304#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
305#define pthread_mutex_setname_np(a,b,c)
306#define pthread_cond_setname_np(a,b,c)
307#endif
308
309void TOIProcessor::start() {
310 pthread_cond_init(&dataReady, NULL);
311 pthread_mutexattr_init(&mutattr);
312 // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK);
313 pthread_mutex_init(&mutex, &mutattr);
314 //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0);
315 //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0);
316 //cout << name << " starting thread " << &thread << endl;
317 pthread_create(&thread, NULL, ThreadStart, this);
318 TOIManager::getManager()->addThread(&thread);
319}
320
321#ifndef NO_SOPHYA
322/* ---- l'interface va etre modifiee, NE PAS UTILISER
323Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) {
324 TOI* toi = getInputTOI(toiIndex);
325 toi->waitForData(iStart, iEnd);
326 return toi->getData(iStart, iEnd);
327}
328
329Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) {
330 TOI* toi = getInputTOI(toiIndex);
331 toi->waitForData(iStart, iEnd);
332 return toi->getError(iStart, iEnd);
333}
334
335TArray<int_4> TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) {
336 TOI* toi = getInputTOI(toiIndex);
337 toi->waitForData(iStart, iEnd);
338 return toi->getFlag(iStart, iEnd);
339}
340 l'interface va etre modifiee, NE PAS UTILISER ---- */
341#endif
342
343double TOIProcessor::getData(int toiIndex, int i) {
344 TOI* toi = getInputTOI(toiIndex);
345 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
346 autoWontNeed(i);
347 return toi->getData(i);
348}
349
350void TOIProcessor::getData(int toiIndex, int i, double &data, uint_8 &flag)
351{
352 TOI* toi = getInputTOI(toiIndex);
353 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
354 toi->getData(i, data, flag);
355 autoWontNeed(i);
356 return;
357}
358
359void TOIProcessor::getData(int toiIndex, int i, int n, double* d)
360{
361 TOI* toi = getInputTOI(toiIndex);
362 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
363 toi->getData(i, n, d);
364 autoWontNeed(i);
365 return;
366}
367
368void TOIProcessor::getData(int toiIndex, int i, int n, double* d, uint_8* f)
369{
370 TOI* toi = getInputTOI(toiIndex);
371 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
372 toi->getData(i, n, d, f);
373 autoWontNeed(i);
374 return;
375}
376
377
378/*RZCMV
379double TOIProcessor::getError(int toiIndex, int i) {
380 TOI* toi = getInputTOI(toiIndex);
381 toi->waitForData(i);
382 return toi->getError(i);
383}
384
385int_4 TOIProcessor::getFlag(int toiIndex, int i) {
386 TOI* toi = getInputTOI(toiIndex);
387 toi->waitForData(i);
388 return toi->getFlag(i);
389}
390*/
391
392void TOIProcessor::setNeededHistory(int nsamples) {
393 neededHistory = nsamples;
394}
395
396void TOIProcessor::wontNeedBefore(int i) {
397 if (i<wontNeedValue) return;
398 wontNeedValue = i;
399 for (int j=0; j< (int) inIx.size(); j++) {
400 // $CHECK$ Reza 6/5/2001 Protection sur non connected TOI
401 if (inTOIs[j]) inTOIs[j]->wontNeedBefore(i);
402 }
403}
404
405void TOIProcessor::autoWontNeed(int iCur) {
406 if (neededHistory <=0) return;
407 if (iCur < lastAWN + neededHistory/10) return;
408 lastAWN = iCur;
409 // cout << name << " wontNeedBefore " << iCur-neededHistory << endl;
410 wontNeedBefore(iCur-neededHistory);
411}
412
413void TOIProcessor::notify() {
414 lock();
415 pthread_cond_broadcast(&dataReady);
416 unlock();
417}
418
419
420void TOIProcessor::putData(int toiIndex, int i, double value, uint_8 flg) {
421 TOI* toi = getOutputTOI(toiIndex);
422 if (toi == NULL) return;
423 toi->putData(i, value, flg);
424 // autoWontNeed(i); // now done on getData
425 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
426}
427
428void TOIProcessor::putData(int toiIndex, int i, int n, double const* val,
429 uint_8 const* flg) {
430 TOI* toi = getOutputTOI(toiIndex);
431 if (toi == NULL) return;
432 toi->putData(i, n, val, flg);
433 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
434}
435
436/*RZCMV
437void TOIProcessor::putDataError(int toiIndex, int i, double value,
438 double error, int_4 flg) {
439 TOI* toi = getOutputTOI(toiIndex);
440 if (toi == NULL)
441 throw NullPtrError("TOIProcessor::putDataError() - Not assigned TOI !");
442 toi->putDataError(i, value, error, flg);
443 autoWontNeed(i);
444 notify();
445}
446*/
447
448
449
450
451
452
453
454
455
Note: See TracBrowser for help on using the repository browser.