source: Sophya/trunk/ArchTOIPipe/Kernel/toiprocessor.cc@ 2436

Last change on this file since 2436 was 2282, checked in by aubourg, 23 years ago

deadlock work

File size: 15.1 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toiprocessor.cc,v 1.27 2002-11-28 15:49:21 aubourg Exp $
6
7#include "toiprocessor.h"
8#include "toimanager.h"
9#include <pthread.h>
10#include <typeinfo> // ajout pour linux
11#ifdef HAVE_VALUES_H
12#include <values.h>
13#endif
14
15#ifndef MAXINT
16#define MAXINT 2147483647
17#endif
18
19#ifdef HAVE_STDINT_H
20#include <stdint.h>
21#endif
22
23#ifdef WITH_SOPHYA
24#include "pexceptions.h"
25#else
26#include "apexceptions.h"
27#endif
28
29#define pthread_mutexattr_setkind_np pthread_mutexattr_settype
30
31
32TOIProcessor::TOIProcessor() {
33 //cout << "TOIProcessor::TOIProcessor" << endl;
34 outTOIs = NULL;
35 inTOIs = NULL;
36 inited=false;
37
38 upExtra = 0;
39 lowExtra = 0;
40 minOut = -1;
41 maxOut = -1;
42 forcedMinIn = -1;
43 forcedMaxIn = -1;
44 neededHistory = 1000;
45 lastAWN = 0;
46 wontNeedValue = -1;
47
48
49 // ajout vf 23/07/2002
50 cout << "Creating processor" << endl;
51 // Pour referencer un TOIProcessor, il faut recuperer le TOIManager correspondant
52 TOIManager::getManager()->registerProcessor(this);
53
54 // initialisation des limites du sample (par defaut tout le fichier)
55 snBegin = 0;
56 snEnd = MAXINT;
57 snMin=MAXINT;
58 snMax=0;
59 // par defaut aucune condition
60 requestedSample = false;
61
62}
63
64
65
66TOIProcessor::~TOIProcessor() {
67 delete[] outTOIs;
68 delete[] inTOIs;
69 //if (mutex)
70 pthread_mutex_destroy(&mutex);
71 //if (dataReady)
72 pthread_cond_destroy(&dataReady);
73 // Il ne faut apparemment pas appeler pthread_detach si on a fait join avant
74 // Extrait du man (Reza - Mai 2002)
75 //man: The pthread_join(3) routine also detaches the target thread after
76 //man: pthread_join(3) returns successfully.
77 // pthread_detach(thread); $CHECK$ - Reza Mai 2002
78}
79
80
// Initialisation step; this implementation is empty.
void TOIProcessor::init() {
  //cout << "TOIProcessor::init" << endl;
}
84
85void TOIProcessor::afterinit() {
86 int i;
87 inTOIs = new (TOI*[inIx.size()]);
88 for(i=0; i<inIx.size(); i++)
89 inTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
90 outTOIs = new (TOI*[outIx.size()]);
91 for(i=0; i<outIx.size(); i++)
92 outTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
93}
94
95int TOIProcessor::getMinOut() {
96 // return snBegin + lowExtra;
97 return snBegin;
98}
99
100int TOIProcessor::getMaxOut() {
101 // return snEnd - upExtra;
102 return snEnd;
103}
104
105int TOIProcessor::calcMinOut() {
106 return getMinIn() + lowExtra;
107}
108
109int TOIProcessor::calcMaxOut() {
110 return getMaxIn() - upExtra;
111}
112
113int TOIProcessor::getMinIn() {
114 // return snBegin;
115 return snBegin - lowExtra;
116}
117
118int TOIProcessor::getMaxIn() {
119 // return snEnd;
120 return snEnd + upExtra;
121}
122
123
124int TOIProcessor::declareInput(string toi) {
125 if (inIx.find(toi) != inIx.end())
126 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
127 int i = inIx.size();
128 inIx[toi] = i;
129 return i;
130}
131
132int TOIProcessor::declareOutput(string toi) {
133 if (outIx.find(toi) != outIx.end())
134 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
135 int i = outIx.size();
136 outIx[toi] = i;
137 return i;
138}
139
140int TOIProcessor::getInputTOIIndex(string toi) {
141 chkinit();
142 map<string, int>::iterator i = inIx.find(toi);
143 if (i == inIx.end()) return -1;
144 return (*i).second;
145}
146
147int TOIProcessor::getOutputTOIIndex(string toi) {
148 chkinit();
149 map<string, int>::iterator i = outIx.find(toi);
150 if (i == outIx.end()) return -1;
151 return (*i).second;
152}
153
154// Methodes rajoutees par Reza 11/3/2001
155TOI* TOIProcessor::getInputTOI(int toiIndex) {
156 // chkinit();
157 if (toiIndex >= inIx.size())
158 throw RangeCheckError("TOIProcessor::getInputTOI() out of bound toiIndex");
159 TOI* toi = inTOIs[toiIndex];
160 if (toi == NULL)
161 throw NullPtrError("TOIProcessor::getInputTOI() - Not assigned TOI !");
162 return(toi);
163}
164
165TOI* TOIProcessor::getOutputTOI(int toiIndex) {
166 // chkinit();
167 if (toiIndex >= outIx.size())
168 throw RangeCheckError("TOIProcessor::getOutputTOI() out of bound toiIndex");
169 TOI* toi = outTOIs[toiIndex];
170 // if (toi == NULL)
171 // throw NullPtrError("TOIProcessor::getOutputTOI() - Not assigned TOI !");
172 return(toi);
173}
174
175bool TOIProcessor::checkInputTOIIndex(int toiIndex) {
176 if (toiIndex >= inIx.size()) return false;
177 if (inTOIs[toiIndex] == NULL) return false;
178 return true;
179}
180
181bool TOIProcessor::checkOutputTOIIndex(int toiIndex) {
182 if (toiIndex >= outIx.size()) return false;
183 if (outTOIs[toiIndex] == NULL) return false;
184 return true;
185}
186
187void TOIProcessor::PrintStatus(::ostream & os)
188{
189 chkinit();
190 os << " TOIProcessor::PrintStatus() - Name= " << name
191 << " MinIn=" << getMinIn() << " MaxIn=" << getMaxIn() << endl;
192 os << " --- Inputs N= " << inIx.size() << endl;
193 int k;
194 for(k=0; k<inIx.size(); k++) {
195 os << "Input[" << k << "] : " << getInName(k) ;
196 if (inTOIs[k] != NULL)
197 os << " Connected TOI " << inTOIs[k]->getName() << endl;
198 else os << " NO TOI " << endl;
199 }
200 os << " --- Outputs N= " << outIx.size() << endl;
201 for(k=0; k<outIx.size(); k++) {
202 os << "Output[" << k << "] : " << getOutName(k) ;
203 if (outTOIs[k] != NULL)
204 os << " Connected TOI " << outTOIs[k]->getName() << endl;
205 else os << " NO TOI " << endl;
206 }
207 os << endl;
208 return;
209}
210
211// Fin rajout Reza 11/3/2001
212
213void TOIProcessor::addInput(string name, TOI* toi) {
214 chkinit();
215 map<string, int>::iterator i = inIx.find(name);
216 if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+
217 name+" not declared");
218 inTOIs[(*i).second] = toi;
219 toi->addConsumer(this); // $CHECK$ Reza 13/3/2001
220}
221
222void TOIProcessor::addOutput(string name, TOI* toi) {
223 chkinit();
224 map<string, int>::iterator i = outIx.find(name);
225 if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+
226 name+" not declared");
227 toi->setProducer(this);
228 outTOIs[(*i).second] = toi;
229}
230
231string TOIProcessor::getOutName(int i) {
232 if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName "
233 " out of bound");
234 map<string, int>::iterator j;
235 for(j=outIx.begin(); j!= outIx.end(); j++)
236 if ((*j).second == i) return (*j).first;
237
238 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
239}
240
241string TOIProcessor::getInName(int i) {
242 if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName "
243 " out of bound");
244 map<string, int>::iterator j;
245 for(j=inIx.begin(); j!= inIx.end(); j++)
246 if ((*j).second == i) return (*j).first;
247
248 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
249}
250
// Body executed by ThreadStart() on the processor's thread; this
// implementation is empty.
void TOIProcessor::run() {

}
254
255void TOIProcessor::warnPutDone() {
256 int n = outIx.size();
257 for (int i=0; i<n; i++) {
258 TOI* toi = outTOIs[i];
259 if (toi) {
260 toi->putDone();
261 }
262 }
263}
264
265void* TOIProcessor::ThreadStart(void* arg) {
266
267 TOIProcessor* p = (TOIProcessor*) arg;
268 // cout << p->name << " new thread running " << pthread_self() << endl;
269 try {
270 p->run();
271 }
272 catch (PThrowable & exc) {
273 cerr << "\n TOIProcessor::ThreadStart() Catched Exception TOIProcessor@"
274 << hex << p << dec << "\n"
275 << (string)typeid(exc).name()
276 << " - Msg= " << exc.Msg() << endl;
277 }
278 catch (const std::exception & sex) {
279 cerr << "\n TOIProcessor::ThreadStart() Catched std::exception TOIProcessor@"
280 << hex << p << dec << "\n"
281 << (string)typeid(sex).name() << endl;
282 }
283 catch (...) {
284 cerr << "\n TOIProcessor::ThreadStart() Catched ... exception TOIProcessor@"
285 << hex << p << dec << endl;
286 }
287 p->warnPutDone();
288 pthread_exit(NULL);
289 // cout << p->name << " thread done " << pthread_self() << endl;
290 return NULL;
291}
292
293#ifdef Linux
294#define pthread_mutexattr_settype pthread_mutexattr_setkind_np
295#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
296#define pthread_mutex_setname_np(a,b,c)
297#define pthread_cond_setname_np(a,b,c)
298#endif
299
300void TOIProcessor::start() {
301 pthread_cond_init(&dataReady, NULL);
302 pthread_mutexattr_init(&mutattr);
303 // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK);
304 pthread_mutex_init(&mutex, &mutattr);
305 //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0);
306 //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0);
307 //cout << name << " starting thread " << &thread << endl;
308 pthread_create(&thread, NULL, ThreadStart, this);
309 TOIManager::getManager()->addThread(&thread);
310}
311
312#ifndef NO_SOPHYA
313/* ---- l'interface va etre modifiee, NE PAS UTILISER
314Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) {
315 TOI* toi = getInputTOI(toiIndex);
316 toi->waitForData(iStart, iEnd);
317 return toi->getData(iStart, iEnd);
318}
319
320Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) {
321 TOI* toi = getInputTOI(toiIndex);
322 toi->waitForData(iStart, iEnd);
323 return toi->getError(iStart, iEnd);
324}
325
326TArray<int_4> TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) {
327 TOI* toi = getInputTOI(toiIndex);
328 toi->waitForData(iStart, iEnd);
329 return toi->getFlag(iStart, iEnd);
330}
331 l'interface va etre modifiee, NE PAS UTILISER ---- */
332#endif
333
334double TOIProcessor::getData(int toiIndex, int i) {
335 TOI* toi = getInputTOI(toiIndex);
336 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
337 autoWontNeed(i);
338 return toi->getData(i);
339}
340
341void TOIProcessor::getData(int toiIndex, int i, double &data, uint_8 &flag)
342{
343 TOI* toi = getInputTOI(toiIndex);
344 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
345 toi->getData(i, data, flag);
346 autoWontNeed(i);
347 return;
348}
349
350void TOIProcessor::getData(int toiIndex, int i, int n, double* d)
351{
352 TOI* toi = getInputTOI(toiIndex);
353 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
354 toi->getData(i, n, d);
355 autoWontNeed(i);
356 return;
357}
358
359void TOIProcessor::getData(int toiIndex, int i, int n, double* d, uint_8* f)
360{
361 TOI* toi = getInputTOI(toiIndex);
362 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
363 toi->getData(i, n, d, f);
364 autoWontNeed(i);
365 return;
366}
367
368
369/*RZCMV
370double TOIProcessor::getError(int toiIndex, int i) {
371 TOI* toi = getInputTOI(toiIndex);
372 toi->waitForData(i);
373 return toi->getError(i);
374}
375
376int_4 TOIProcessor::getFlag(int toiIndex, int i) {
377 TOI* toi = getInputTOI(toiIndex);
378 toi->waitForData(i);
379 return toi->getFlag(i);
380}
381*/
382
// Sets the number of past samples kept available by autoWontNeed().
// A value <= 0 makes autoWontNeed() return without doing anything.
void TOIProcessor::setNeededHistory(int nsamples) {
  neededHistory = nsamples;
}
386
387void TOIProcessor::wontNeedBefore(int i) {
388 if (i<wontNeedValue) return;
389 wontNeedValue = i;
390 for (int j=0; j< (int) inIx.size(); j++) {
391 // $CHECK$ Reza 6/5/2001 Protection sur non connected TOI
392 if (inTOIs[j]) inTOIs[j]->wontNeedBefore(i);
393 }
394}
395
396void TOIProcessor::autoWontNeed(int iCur) {
397 if (neededHistory <=0) return;
398 if (iCur < lastAWN + neededHistory/10) return;
399 lastAWN = iCur;
400 // cout << name << " wontNeedBefore " << iCur-neededHistory << endl;
401 wontNeedBefore(iCur-neededHistory);
402}
403
// Broadcasts on the dataReady condition variable, between lock() and
// unlock().
void TOIProcessor::notify() {
  lock();
  pthread_cond_broadcast(&dataReady);
  unlock();
}
409
410
411void TOIProcessor::putData(int toiIndex, int i, double value, uint_8 flg) {
412 TOI* toi = getOutputTOI(toiIndex);
413 if (toi == NULL) return;
414 toi->putData(i, value, flg);
415 // autoWontNeed(i); // now done on getData
416 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
417}
418
419void TOIProcessor::putData(int toiIndex, int i, int n, double const* val,
420 uint_8 const* flg) {
421 TOI* toi = getOutputTOI(toiIndex);
422 if (toi == NULL) return;
423 toi->putData(i, n, val, flg);
424 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
425}
426
427/*RZCMV
428void TOIProcessor::putDataError(int toiIndex, int i, double value,
429 double error, int_4 flg) {
430 TOI* toi = getOutputTOI(toiIndex);
431 if (toi == NULL)
432 throw NullPtrError("TOIProcessor::putDataError() - Not assigned TOI !");
433 toi->putDataError(i, value, error, flg);
434 autoWontNeed(i);
435 notify();
436}
437*/
438
439
440// ajout vf 29/07/2002
441
442// parametrage de l'echantillon a produire (sans verification)
443void TOIProcessor::setRequestedSample(long begin, long end) {
444 requestedSample = true;
445 snBegin = begin;
446 snEnd = end;
447 // snMin = snBegin;
448 // snMax = snEnd;
449}
450
451
452bool TOIProcessor::checkSampleLimits(int pass)
453{
454 long minTmp=MAXINT;
455 long maxTmp=-1;
456
457 return checkSampleLimits(minTmp, maxTmp, pass);
458
459 cout << "toiprocessor : limites verifiees : " << snBegin << " , " << snEnd << " : " << endl;
460}
461
462
463
464
// Negotiates the sample range between this processor and its input TOIs.
//
// min / max are in-out: on entry, the range requested from this processor
// (min > max is treated as "no constraint"); on return, the range this
// processor reports back.
// pass selects the step:
//   - pass 3: if snMin < snMax, copy snMin/snMax into snBegin/snEnd; always
//     returns true without looking at min/max or the inputs.
//   - other passes: clamp the request to [snBegin, snEnd], forward it to
//     every connected input with the lowExtra/upExtra margins applied,
//     tighten the limits with what the inputs answered, then update
//     snMin/snMax.
//   - pass 2 additionally replaces an unconstrained request by
//     [snBegin, snEnd] before it is forwarded.
// Returns false if any input's checkSampleLimits() returned false, true
// otherwise.
bool TOIProcessor::checkSampleLimits(long& min, long& max, int pass)
{
  bool sample_input_ok=true;
  bool sample_ok=true;

  /* cout << "check " << pass << " " << name << " in " << min << " - " << max << " ; "
       << snMin << " - " << snMax << " ; "
       << snBegin << " - " << snEnd << endl;*/

  if (pass == 3) {
    // Adopt the accumulated [snMin, snMax] interval, if it is not empty.
    if (snMin < snMax) {
      snBegin = snMin;
      snEnd = snMax;
    }
    return true;
  }

  // check that we can actually produce what is requested

  if (min < snBegin) {
    min = snBegin;
  }

  if (max > snEnd) {
    max = snEnd;
  }

  // "No constraint" is decided here, before the pass-2 reset below, and is
  // not re-evaluated afterwards.
  bool noConst = (min>max);

  if (pass == 2 && noConst) {
    min = snBegin;
    max = snEnd;
  }


  int n = inIx.size();
  // walk through all inputs and update to the most restrictive limits
  for (int i=0; i<n; i++) {
    TOI* toi = inTOIs[i];
    if (toi) {
      // apply the margins to the limits, where defined
      long min_Input;
      long max_Input;
      // The lower margin is applied only when min > 0.
      if (min>0) {
        min_Input = min - lowExtra;
      } else {
        min_Input = min;
      }
      // The upper margin is applied only when max is below MAXINT.
      if (max<MAXINT) {
        max_Input = max + upExtra;
      } else {
        max_Input = max;
      }
      // propagate the limits; min_Input / max_Input are passed by reference
      // and read back below
      sample_input_ok = toi->checkSampleLimits(min_Input, max_Input, pass);

      // tighten the limits if the input is more restrictive
      if (min < max) {
        // bounds were requested from us -> tighten min / max
        if ((min_Input + lowExtra) > min) {
          min = min_Input + lowExtra;
        }
        if ((max_Input - upExtra) < max) {
          max = max_Input - upExtra;
        }
      } else {
        // we are asked for everything we can do -> update snBegin / snEnd
        if ((min_Input + lowExtra) > snBegin) {
          snBegin = min_Input + lowExtra;
        }
        if ((max_Input - upExtra) < snEnd) {
          snEnd = max_Input - upExtra;
        }
      }
      // A single failing input makes the overall result false.
      if (sample_input_ok == false) {
        sample_ok = false;
      }
    }
  }




  // widen the [snMin, snMax] interval if the request extends beyond it
  if (!noConst) {
    if (min < snMin) {
      snMin = min;
    }
    if (max > snMax) {
      snMax = max;
    }
  }

  // Keep the reported range inside [snMin, snMax].
  min=min<snMin?snMin:min;
  max=max>snMax?snMax:max;


  // unconstrained case: report our own bounds
  if (min>max) {
    min = snBegin;
    max = snEnd;
  }

  /* cout << "check " << pass << " " << name << " out " << min << " - " << max << " ; "
       << snMin << " - " << snMax << " ; "
       << snBegin << " - " << snEnd << endl;*/
  return sample_ok;
}
573
574
575
576
577// pour verification si le processeur est parametre
578bool TOIProcessor::getRequested()
579{
580 return requestedSample;
581}
582
583// affichage des limites
584void TOIProcessor::printLimits()
585{
586 cout << "toiprocessor " << name <<" : limites calculees : " << snBegin << " , " << snEnd << endl;
587}
588
589TOI* TOIProcessor::getOutToi(string sortie)
590{
591 // recherche du nom de la sortie et verification si le toi existe deja
592 map<string, int>::iterator i = outIx.find(sortie);
593 if (i == outIx.end()) {
594 return NULL;
595 } else {
596 return outTOIs[(*i).second];
597 }
598}
599
600
601
602
603
604
605
606
607
Note: See TracBrowser for help on using the repository browser.