source: Sophya/trunk/ArchTOIPipe/Kernel/toiprocessor.cc@ 2187

Last change on this file since 2187 was 2187, checked in by aubourg, 23 years ago

Vivien, limites processors et connect

File size: 15.9 KB
RevLine 
[1738]1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
[2187]5// $Id: toiprocessor.cc,v 1.26 2002-09-09 15:33:15 aubourg Exp $
[1738]6
[1365]7#include "toiprocessor.h"
8#include "toimanager.h"
9#include <pthread.h>
[2133]10#include <typeinfo> // ajout pour linux
[1663]11#ifdef HAVE_VALUES_H
[1365]12#include <values.h>
[1663]13#endif
[1365]14
[1663]15#ifndef MAXINT
16#define MAXINT 2147483647
17#endif
18
19#ifdef HAVE_STDINT_H
20#include <stdint.h>
21#endif
22
[1365]23#ifdef WITH_SOPHYA
24#include "pexceptions.h"
25#else
26#include "apexceptions.h"
27#endif
28
29#define pthread_mutexattr_setkind_np pthread_mutexattr_settype
30
[2133]31
[1365]32TOIProcessor::TOIProcessor() {
33 //cout << "TOIProcessor::TOIProcessor" << endl;
34 outTOIs = NULL;
35 inTOIs = NULL;
36 inited=false;
37
38 upExtra = 0;
39 lowExtra = 0;
40 minOut = -1;
41 maxOut = -1;
[1787]42 forcedMinIn = -1;
43 forcedMaxIn = -1;
[1365]44 neededHistory = 1000;
45 lastAWN = 0;
46 wontNeedValue = -1;
47
[2187]48
49 // ajout vf 23/07/2002
50 cout << "Creating processor" << endl;
[2133]51 // Pour referencer un TOIProcessor, il faut recuperer le TOIManager correspondant
52 TOIManager::getManager()->registerProcessor(this);
[2187]53
54 // initialisation des limites du sample (par defaut tout le fichier)
55 snBegin = 0;
56 snEnd = MAXINT;
57 snMin=MAXINT;
58 snMax=0;
59 // par defaut aucune condition
60 requestedSample = false;
[2133]61
[1365]62}
63
[2133]64
65
[1365]66TOIProcessor::~TOIProcessor() {
67 delete[] outTOIs;
68 delete[] inTOIs;
69 //if (mutex)
70 pthread_mutex_destroy(&mutex);
71 //if (dataReady)
72 pthread_cond_destroy(&dataReady);
[2021]73 // Il ne faut apparemment pas appeler pthread_detach si on a fait join avant
74 // Extrait du man (Reza - Mai 2002)
75 //man: The pthread_join(3) routine also detaches the target thread after
76 //man: pthread_join(3) returns successfully.
77 // pthread_detach(thread); $CHECK$ - Reza Mai 2002
[1365]78}
79
80
81void TOIProcessor::init() {
82 //cout << "TOIProcessor::init" << endl;
83}
84
85void TOIProcessor::afterinit() {
[1437]86 int i;
[1365]87 inTOIs = new (TOI*[inIx.size()]);
[1437]88 for(i=0; i<inIx.size(); i++)
[1439]89 inTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
[1365]90 outTOIs = new (TOI*[outIx.size()]);
[1437]91 for(i=0; i<outIx.size(); i++)
[1439]92 outTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
[1365]93}
94
95int TOIProcessor::getMinOut() {
[2187]96 /*
[1365]97 //cout << name << "minout" << endl;
[1787]98 if (minOut < 0) minOut = calcMinOut();
[1365]99 //cout << name << "minout=" << minOut << endl;
100 return minOut;
[2187]101 */
102 return snBegin + lowExtra;
[1365]103}
104
105int TOIProcessor::getMaxOut() {
[2187]106 /*
[1365]107 //cout << name << "maxout" << endl;
108 if (maxOut < 0) maxOut = calcMaxOut();
109 //cout << name << "maxout=" << maxOut << endl;
110 return maxOut;
[2187]111 */
112 return snEnd - upExtra;
[1365]113}
114
115int TOIProcessor::calcMinOut() {
116 return getMinIn() + lowExtra;
117}
118
119int TOIProcessor::calcMaxOut() {
120 return getMaxIn() - upExtra;
121}
122
123int TOIProcessor::getMinIn() {
[2187]124 /*
[1365]125 int nIn = inIx.size();
126 int minIn = 0;
127 for (int i=0; i<nIn; i++) {
128 TOI* toi = inTOIs[i];
[1439]129 if (toi == NULL) continue; // Protection - Reza 13/3/2001
[1365]130 int x = toi->getMinSn();
131 if (x > minIn) minIn = x;
132 }
[1787]133 if (forcedMinIn > 0 && forcedMinIn > minIn) minIn = forcedMinIn;
[1365]134 return minIn;
[2187]135 */
136 return snBegin;
[1365]137}
138
139int TOIProcessor::getMaxIn() {
[2187]140 /*
[1663]141 int_4 nIn = inIx.size();
142 int_4 maxIn = MAXINT;
[1365]143 for (int i=0; i<nIn; i++) {
144 TOI* toi = inTOIs[i];
[1439]145 if (toi == NULL) continue; // Protection - Reza 13/3/2001
[1663]146 int_4 x = toi->getMaxSn();
[1365]147 if (x < maxIn) maxIn = x;
148 }
[1787]149 if (forcedMaxIn > 0 && forcedMaxIn < maxIn) maxIn = forcedMaxIn;
[1365]150 return maxIn;
[2187]151 */
152 return snEnd;
[1365]153}
154
155
156int TOIProcessor::declareInput(string toi) {
157 if (inIx.find(toi) != inIx.end())
158 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
159 int i = inIx.size();
160 inIx[toi] = i;
161 return i;
162}
163
164int TOIProcessor::declareOutput(string toi) {
165 if (outIx.find(toi) != outIx.end())
166 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
167 int i = outIx.size();
168 outIx[toi] = i;
169 return i;
170}
171
172int TOIProcessor::getInputTOIIndex(string toi) {
[1367]173 chkinit();
[1365]174 map<string, int>::iterator i = inIx.find(toi);
175 if (i == inIx.end()) return -1;
176 return (*i).second;
177}
178
179int TOIProcessor::getOutputTOIIndex(string toi) {
[1367]180 chkinit();
[1365]181 map<string, int>::iterator i = outIx.find(toi);
182 if (i == outIx.end()) return -1;
183 return (*i).second;
184}
185
[1437]186// Methodes rajoutees par Reza 11/3/2001
187TOI* TOIProcessor::getInputTOI(int toiIndex) {
188 // chkinit();
189 if (toiIndex >= inIx.size())
190 throw RangeCheckError("TOIProcessor::getInputTOI() out of bound toiIndex");
191 TOI* toi = inTOIs[toiIndex];
192 if (toi == NULL)
193 throw NullPtrError("TOIProcessor::getInputTOI() - Not assigned TOI !");
194 return(toi);
195}
196
197TOI* TOIProcessor::getOutputTOI(int toiIndex) {
198 // chkinit();
199 if (toiIndex >= outIx.size())
200 throw RangeCheckError("TOIProcessor::getOutputTOI() out of bound toiIndex");
201 TOI* toi = outTOIs[toiIndex];
[1963]202 // if (toi == NULL)
203 // throw NullPtrError("TOIProcessor::getOutputTOI() - Not assigned TOI !");
[1437]204 return(toi);
205}
206
207bool TOIProcessor::checkInputTOIIndex(int toiIndex) {
208 if (toiIndex >= inIx.size()) return false;
209 if (inTOIs[toiIndex] == NULL) return false;
210 return true;
211}
212
213bool TOIProcessor::checkOutputTOIIndex(int toiIndex) {
214 if (toiIndex >= outIx.size()) return false;
215 if (outTOIs[toiIndex] == NULL) return false;
216 return true;
217}
218
[1762]219void TOIProcessor::PrintStatus(::ostream & os)
[1437]220{
[1439]221 chkinit();
[1437]222 os << " TOIProcessor::PrintStatus() - Name= " << name
223 << " MinIn=" << getMinIn() << " MaxIn=" << getMaxIn() << endl;
224 os << " --- Inputs N= " << inIx.size() << endl;
225 int k;
226 for(k=0; k<inIx.size(); k++) {
227 os << "Input[" << k << "] : " << getInName(k) ;
228 if (inTOIs[k] != NULL)
229 os << " Connected TOI " << inTOIs[k]->getName() << endl;
230 else os << " NO TOI " << endl;
231 }
232 os << " --- Outputs N= " << outIx.size() << endl;
233 for(k=0; k<outIx.size(); k++) {
234 os << "Output[" << k << "] : " << getOutName(k) ;
235 if (outTOIs[k] != NULL)
236 os << " Connected TOI " << outTOIs[k]->getName() << endl;
237 else os << " NO TOI " << endl;
238 }
239 os << endl;
240 return;
241}
242
243// Fin rajout Reza 11/3/2001
244
[1365]245void TOIProcessor::addInput(string name, TOI* toi) {
246 chkinit();
247 map<string, int>::iterator i = inIx.find(name);
248 if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+
249 name+" not declared");
[1439]250 inTOIs[(*i).second] = toi;
251 toi->addConsumer(this); // $CHECK$ Reza 13/3/2001
[1365]252}
253
254void TOIProcessor::addOutput(string name, TOI* toi) {
255 chkinit();
256 map<string, int>::iterator i = outIx.find(name);
257 if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+
258 name+" not declared");
259 toi->setProducer(this);
260 outTOIs[(*i).second] = toi;
261}
262
[1367]263string TOIProcessor::getOutName(int i) {
264 if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName "
265 " out of bound");
[1437]266 map<string, int>::iterator j;
267 for(j=outIx.begin(); j!= outIx.end(); j++)
268 if ((*j).second == i) return (*j).first;
269
270 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
[1367]271}
272
273string TOIProcessor::getInName(int i) {
274 if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName "
275 " out of bound");
[1437]276 map<string, int>::iterator j;
277 for(j=inIx.begin(); j!= inIx.end(); j++)
278 if ((*j).second == i) return (*j).first;
279
280 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
[1367]281}
282
[1365]283void TOIProcessor::run() {
284
285}
286
[1689]287void TOIProcessor::warnPutDone() {
288 int n = outIx.size();
289 for (int i=0; i<n; i++) {
290 TOI* toi = outTOIs[i];
[1725]291 if (toi) {
292 toi->putDone();
293 }
[1689]294 }
295}
296
[1365]297void* TOIProcessor::ThreadStart(void* arg) {
[2026]298
[1365]299 TOIProcessor* p = (TOIProcessor*) arg;
300 // cout << p->name << " new thread running " << pthread_self() << endl;
[2026]301 try {
302 p->run();
303 }
304 catch (PThrowable & exc) {
305 cerr << "\n TOIProcessor::ThreadStart() Catched Exception TOIProcessor@"
306 << hex << p << dec << "\n"
307 << (string)typeid(exc).name()
308 << " - Msg= " << exc.Msg() << endl;
309 }
310 catch (const std::exception & sex) {
311 cerr << "\n TOIProcessor::ThreadStart() Catched std::exception TOIProcessor@"
312 << hex << p << dec << "\n"
313 << (string)typeid(sex).name() << endl;
314 }
315 catch (...) {
316 cerr << "\n TOIProcessor::ThreadStart() Catched ... exception TOIProcessor@"
317 << hex << p << dec << endl;
318 }
[1689]319 p->warnPutDone();
[1629]320 pthread_exit(NULL);
[1365]321 // cout << p->name << " thread done " << pthread_self() << endl;
322 return NULL;
323}
324
325#ifdef Linux
326#define pthread_mutexattr_settype pthread_mutexattr_setkind_np
327#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
328#define pthread_mutex_setname_np(a,b,c)
329#define pthread_cond_setname_np(a,b,c)
330#endif
331
332void TOIProcessor::start() {
333 pthread_cond_init(&dataReady, NULL);
334 pthread_mutexattr_init(&mutattr);
335 // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK);
336 pthread_mutex_init(&mutex, &mutattr);
337 //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0);
338 //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0);
339 //cout << name << " starting thread " << &thread << endl;
340 pthread_create(&thread, NULL, ThreadStart, this);
341 TOIManager::getManager()->addThread(&thread);
342}
343
344#ifndef NO_SOPHYA
[1464]345/* ---- l'interface va etre modifiee, NE PAS UTILISER
[1365]346Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) {
[1437]347 TOI* toi = getInputTOI(toiIndex);
[1365]348 toi->waitForData(iStart, iEnd);
349 return toi->getData(iStart, iEnd);
350}
351
352Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) {
[1437]353 TOI* toi = getInputTOI(toiIndex);
[1365]354 toi->waitForData(iStart, iEnd);
355 return toi->getError(iStart, iEnd);
356}
357
358TArray<int_4> TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) {
[1437]359 TOI* toi = getInputTOI(toiIndex);
[1365]360 toi->waitForData(iStart, iEnd);
361 return toi->getFlag(iStart, iEnd);
362}
[1464]363 l'interface va etre modifiee, NE PAS UTILISER ---- */
[1365]364#endif
365
366double TOIProcessor::getData(int toiIndex, int i) {
[1437]367 TOI* toi = getInputTOI(toiIndex);
[1740]368 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
369 autoWontNeed(i);
[1365]370 return toi->getData(i);
371}
372
[1532]373void TOIProcessor::getData(int toiIndex, int i, double &data, uint_8 &flag)
[1462]374{
375 TOI* toi = getInputTOI(toiIndex);
[1740]376 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
[1462]377 toi->getData(i, data, flag);
[1742]378 autoWontNeed(i);
[1462]379 return;
380}
381
[1742]382void TOIProcessor::getData(int toiIndex, int i, int n, double* d)
383{
384 TOI* toi = getInputTOI(toiIndex);
385 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
386 toi->getData(i, n, d);
387 autoWontNeed(i);
388 return;
389}
390
391void TOIProcessor::getData(int toiIndex, int i, int n, double* d, uint_8* f)
392{
393 TOI* toi = getInputTOI(toiIndex);
394 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
395 toi->getData(i, n, d, f);
396 autoWontNeed(i);
397 return;
398}
399
400
[1462]401/*RZCMV
[1365]402double TOIProcessor::getError(int toiIndex, int i) {
[1437]403 TOI* toi = getInputTOI(toiIndex);
[1365]404 toi->waitForData(i);
405 return toi->getError(i);
406}
407
408int_4 TOIProcessor::getFlag(int toiIndex, int i) {
[1437]409 TOI* toi = getInputTOI(toiIndex);
[1365]410 toi->waitForData(i);
411 return toi->getFlag(i);
412}
[1462]413*/
[1365]414
415void TOIProcessor::setNeededHistory(int nsamples) {
416 neededHistory = nsamples;
417}
418
419void TOIProcessor::wontNeedBefore(int i) {
420 if (i<wontNeedValue) return;
421 wontNeedValue = i;
422 for (int j=0; j< (int) inIx.size(); j++) {
[1490]423 // $CHECK$ Reza 6/5/2001 Protection sur non connected TOI
424 if (inTOIs[j]) inTOIs[j]->wontNeedBefore(i);
[1365]425 }
426}
427
428void TOIProcessor::autoWontNeed(int iCur) {
429 if (neededHistory <=0) return;
[1773]430 if (iCur < lastAWN + neededHistory/10) return;
[1365]431 lastAWN = iCur;
[1750]432 // cout << name << " wontNeedBefore " << iCur-neededHistory << endl;
[1365]433 wontNeedBefore(iCur-neededHistory);
434}
435
436void TOIProcessor::notify() {
[1740]437 lock();
[1365]438 pthread_cond_broadcast(&dataReady);
439 unlock();
440}
441
442
[1532]443void TOIProcessor::putData(int toiIndex, int i, double value, uint_8 flg) {
[1437]444 TOI* toi = getOutputTOI(toiIndex);
[1963]445 if (toi == NULL) return;
[1365]446 toi->putData(i, value, flg);
[1742]447 // autoWontNeed(i); // now done on getData
[1740]448 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
[1365]449}
450
[1742]451void TOIProcessor::putData(int toiIndex, int i, int n, double const* val,
[1743]452 uint_8 const* flg) {
[1742]453 TOI* toi = getOutputTOI(toiIndex);
[1963]454 if (toi == NULL) return;
[1742]455 toi->putData(i, n, val, flg);
456 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
457}
458
[1462]459/*RZCMV
[1365]460void TOIProcessor::putDataError(int toiIndex, int i, double value,
461 double error, int_4 flg) {
[1437]462 TOI* toi = getOutputTOI(toiIndex);
463 if (toi == NULL)
464 throw NullPtrError("TOIProcessor::putDataError() - Not assigned TOI !");
[1365]465 toi->putDataError(i, value, error, flg);
466 autoWontNeed(i);
467 notify();
468}
[1462]469*/
[1365]470
[2133]471
[2187]472// ajout vf 29/07/2002
[2133]473
[2187]474// parametrage de l'echantillon a produire (sans verification)
475void TOIProcessor::setRequestedSample(long begin, long end) {
476 requestedSample = true;
477 snBegin = begin;
478 snEnd = end;
479 // snMin = snBegin;
480 // snMax = snEnd;
481}
[2133]482
483
[2187]484bool TOIProcessor::checkSampleLimits(int pass)
485{
486 long minTmp=MAXINT;
487 long maxTmp=-1;
[2133]488
[2187]489 return checkSampleLimits(minTmp, maxTmp, pass);
490
491 cout << "toiprocessor : limites verifiees : " << snBegin << " , " << snEnd << " : " << endl;
492}
[2133]493
494
495
[2187]496
497bool TOIProcessor::checkSampleLimits(long& min, long& max, int pass)
498{
499 bool sample_input_ok=true;
500 bool sample_ok=true;
501
502 /* cout << "check " << pass << " " << name << " in " << min << " - " << max << " ; "
503 << snMin << " - " << snMax << " ; "
504 << snBegin << " - " << snEnd << endl;*/
505
506 if (pass == 3) {
507 if (snMin < snMax) {
508 snBegin = snMin;
509 snEnd = snMax;
510 }
511 return true;
512 }
513
514 // on verifie qu'on peut effectivement produire
515
516 if (min < snBegin) {
517 min = snBegin;
518 }
519
520 if (max > snEnd) {
521 max = snEnd;
522 }
523
524 bool noConst = (min>max);
525
526 if (pass == 2 && noConst) {
527 min = snBegin;
528 max = snEnd;
529 }
530
531
532 int n = inIx.size();
533 // parcours de toutes les entrees et mise a jour au plus restrictif
534 for (int i=0; i<n; i++) {
535 TOI* toi = inTOIs[i];
536 if (toi) {
537 // mise a jour des limites avec les marges si definies
538 long min_Input;
539 long max_Input;
540 if (min>0) {
541 min_Input = min - lowExtra;
542 } else {
543 min_Input = min;
544 }
545 if (max<MAXINT) {
546 max_Input = max + upExtra;
547 } else {
548 max_Input = max;
549 }
550 // propagation des limites
551 sample_input_ok = toi->checkSampleLimits(min_Input, max_Input, pass);
552
553 //Ajustement des limites si plus restrictif
554 if (min < max) {
555 // On nous a demande des bornes ->
556 if ((min_Input + lowExtra) > min) {
557 min = min_Input + lowExtra;
558 }
559 if ((max_Input - upExtra) < max) {
560 max = max_Input - upExtra;
561 }
562 } else {
563 // On nous demande tout ce qu'on peut faire -> MAJ snBegin
564 if ((min_Input + lowExtra) > snBegin) {
565 snBegin = min_Input + lowExtra;
566 }
567 if ((max_Input - upExtra) < snEnd) {
568 snEnd = max_Input - upExtra;
569 }
570 }
571 if (sample_input_ok == false) {
572 sample_ok = false;
573 }
574 }
575 }
576
577
578
579
580 //Ajustement des limites si intervalle plus large
581 if (!noConst) {
582 if (min < snMin) {
583 snMin = min;
584 }
585 if (max > snMax) {
586 snMax = max;
587 }
588 }
589
590 min=min<snMin?snMin:min;
591 max=max>snMax?snMax:max;
592
593
594 // cas sans contraintes, on retourne nos bornes
595 if (min>max) {
596 min = snBegin;
597 max = snEnd;
598 }
599
600 /* cout << "check " << pass << " " << name << " out " << min << " - " << max << " ; "
601 << snMin << " - " << snMax << " ; "
602 << snBegin << " - " << snEnd << endl;*/
603 return sample_ok;
604}
605
606
607
608
609// pour verification si le processeur est parametre
610bool TOIProcessor::getRequested()
611{
612 return requestedSample;
613}
614
615// affichage des limites
616void TOIProcessor::printLimits()
617{
618 cout << "toiprocessor " << name <<" : limites calculees : " << snBegin << " , " << snEnd << endl;
619}
620
621TOI* TOIProcessor::getOutToi(string sortie)
622{
623 // recherche du nom de la sortie et verification si le toi existe deja
624 map<string, int>::iterator i = outIx.find(sortie);
625 if (i == outIx.end()) {
626 return NULL;
627 } else {
628 return outTOIs[(*i).second];
629 }
630}
631
632
633
634
635
636
637
638
639
Note: See TracBrowser for help on using the repository browser.