source: Sophya/trunk/ArchTOIPipe/Kernel/toiprocessor.cc@ 2061

Last change on this file since 2061 was 2026, checked in by ansari, 23 years ago

Correction de pb divers (3) - Reza 30/5/2002

File size: 12.0 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toiprocessor.cc,v 1.24 2002-05-30 11:51:26 ansari Exp $
6
7#include "toiprocessor.h"
8#include "toimanager.h"
9#include <pthread.h>
10
11#ifdef HAVE_VALUES_H
12#include <values.h>
13#endif
14
15#ifndef MAXINT
16#define MAXINT 2147483647
17#endif
18
19#ifdef HAVE_STDINT_H
20#include <stdint.h>
21#endif
22
23#ifdef WITH_SOPHYA
24#include "pexceptions.h"
25#else
26#include "apexceptions.h"
27#endif
28
29#define pthread_mutexattr_setkind_np pthread_mutexattr_settype
30
31TOIProcessor::TOIProcessor() {
32 //cout << "TOIProcessor::TOIProcessor" << endl;
33 outTOIs = NULL;
34 inTOIs = NULL;
35 inited=false;
36
37 upExtra = 0;
38 lowExtra = 0;
39 minOut = -1;
40 maxOut = -1;
41 forcedMinIn = -1;
42 forcedMaxIn = -1;
43 neededHistory = 1000;
44 lastAWN = 0;
45 wontNeedValue = -1;
46
47}
48
49TOIProcessor::~TOIProcessor() {
50 delete[] outTOIs;
51 delete[] inTOIs;
52 //if (mutex)
53 pthread_mutex_destroy(&mutex);
54 //if (dataReady)
55 pthread_cond_destroy(&dataReady);
56 // Il ne faut apparemment pas appeler pthread_detach si on a fait join avant
57 // Extrait du man (Reza - Mai 2002)
58 //man: The pthread_join(3) routine also detaches the target thread after
59 //man: pthread_join(3) returns successfully.
60 // pthread_detach(thread); $CHECK$ - Reza Mai 2002
61}
62
63
64void TOIProcessor::init() {
65 //cout << "TOIProcessor::init" << endl;
66}
67
68void TOIProcessor::afterinit() {
69 int i;
70 inTOIs = new (TOI*[inIx.size()]);
71 for(i=0; i<inIx.size(); i++)
72 inTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
73 outTOIs = new (TOI*[outIx.size()]);
74 for(i=0; i<outIx.size(); i++)
75 outTOIs[i] = NULL; // Protection-Initialisation - Reza 11/3/2001
76}
77
78int TOIProcessor::getMinOut() {
79 //cout << name << "minout" << endl;
80 if (minOut < 0) minOut = calcMinOut();
81 //cout << name << "minout=" << minOut << endl;
82 return minOut;
83}
84
85int TOIProcessor::getMaxOut() {
86 //cout << name << "maxout" << endl;
87 if (maxOut < 0) maxOut = calcMaxOut();
88 //cout << name << "maxout=" << maxOut << endl;
89 return maxOut;
90}
91
92int TOIProcessor::calcMinOut() {
93 return getMinIn() + lowExtra;
94}
95
96int TOIProcessor::calcMaxOut() {
97 return getMaxIn() - upExtra;
98}
99
100int TOIProcessor::getMinIn() {
101 int nIn = inIx.size();
102 int minIn = 0;
103 for (int i=0; i<nIn; i++) {
104 TOI* toi = inTOIs[i];
105 if (toi == NULL) continue; // Protection - Reza 13/3/2001
106 int x = toi->getMinSn();
107 if (x > minIn) minIn = x;
108 }
109 if (forcedMinIn > 0 && forcedMinIn > minIn) minIn = forcedMinIn;
110 return minIn;
111}
112
113int TOIProcessor::getMaxIn() {
114 int_4 nIn = inIx.size();
115 int_4 maxIn = MAXINT;
116 for (int i=0; i<nIn; i++) {
117 TOI* toi = inTOIs[i];
118 if (toi == NULL) continue; // Protection - Reza 13/3/2001
119 int_4 x = toi->getMaxSn();
120 if (x < maxIn) maxIn = x;
121 }
122 if (forcedMaxIn > 0 && forcedMaxIn < maxIn) maxIn = forcedMaxIn;
123 return maxIn;
124}
125
126
127int TOIProcessor::declareInput(string toi) {
128 if (inIx.find(toi) != inIx.end())
129 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
130 int i = inIx.size();
131 inIx[toi] = i;
132 return i;
133}
134
135int TOIProcessor::declareOutput(string toi) {
136 if (outIx.find(toi) != outIx.end())
137 throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared");
138 int i = outIx.size();
139 outIx[toi] = i;
140 return i;
141}
142
143int TOIProcessor::getInputTOIIndex(string toi) {
144 chkinit();
145 map<string, int>::iterator i = inIx.find(toi);
146 if (i == inIx.end()) return -1;
147 return (*i).second;
148}
149
150int TOIProcessor::getOutputTOIIndex(string toi) {
151 chkinit();
152 map<string, int>::iterator i = outIx.find(toi);
153 if (i == outIx.end()) return -1;
154 return (*i).second;
155}
156
// Methods added by Reza 11/3/2001
158TOI* TOIProcessor::getInputTOI(int toiIndex) {
159 // chkinit();
160 if (toiIndex >= inIx.size())
161 throw RangeCheckError("TOIProcessor::getInputTOI() out of bound toiIndex");
162 TOI* toi = inTOIs[toiIndex];
163 if (toi == NULL)
164 throw NullPtrError("TOIProcessor::getInputTOI() - Not assigned TOI !");
165 return(toi);
166}
167
168TOI* TOIProcessor::getOutputTOI(int toiIndex) {
169 // chkinit();
170 if (toiIndex >= outIx.size())
171 throw RangeCheckError("TOIProcessor::getOutputTOI() out of bound toiIndex");
172 TOI* toi = outTOIs[toiIndex];
173 // if (toi == NULL)
174 // throw NullPtrError("TOIProcessor::getOutputTOI() - Not assigned TOI !");
175 return(toi);
176}
177
178bool TOIProcessor::checkInputTOIIndex(int toiIndex) {
179 if (toiIndex >= inIx.size()) return false;
180 if (inTOIs[toiIndex] == NULL) return false;
181 return true;
182}
183
184bool TOIProcessor::checkOutputTOIIndex(int toiIndex) {
185 if (toiIndex >= outIx.size()) return false;
186 if (outTOIs[toiIndex] == NULL) return false;
187 return true;
188}
189
190void TOIProcessor::PrintStatus(::ostream & os)
191{
192 chkinit();
193 os << " TOIProcessor::PrintStatus() - Name= " << name
194 << " MinIn=" << getMinIn() << " MaxIn=" << getMaxIn() << endl;
195 os << " --- Inputs N= " << inIx.size() << endl;
196 int k;
197 for(k=0; k<inIx.size(); k++) {
198 os << "Input[" << k << "] : " << getInName(k) ;
199 if (inTOIs[k] != NULL)
200 os << " Connected TOI " << inTOIs[k]->getName() << endl;
201 else os << " NO TOI " << endl;
202 }
203 os << " --- Outputs N= " << outIx.size() << endl;
204 for(k=0; k<outIx.size(); k++) {
205 os << "Output[" << k << "] : " << getOutName(k) ;
206 if (outTOIs[k] != NULL)
207 os << " Connected TOI " << outTOIs[k]->getName() << endl;
208 else os << " NO TOI " << endl;
209 }
210 os << endl;
211 return;
212}
213
// End of the methods added by Reza 11/3/2001
215
216void TOIProcessor::addInput(string name, TOI* toi) {
217 chkinit();
218 map<string, int>::iterator i = inIx.find(name);
219 if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+
220 name+" not declared");
221 inTOIs[(*i).second] = toi;
222 toi->addConsumer(this); // $CHECK$ Reza 13/3/2001
223}
224
225void TOIProcessor::addOutput(string name, TOI* toi) {
226 chkinit();
227 map<string, int>::iterator i = outIx.find(name);
228 if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+
229 name+" not declared");
230 toi->setProducer(this);
231 outTOIs[(*i).second] = toi;
232}
233
234string TOIProcessor::getOutName(int i) {
235 if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName "
236 " out of bound");
237 map<string, int>::iterator j;
238 for(j=outIx.begin(); j!= outIx.end(); j++)
239 if ((*j).second == i) return (*j).first;
240
241 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
242}
243
244string TOIProcessor::getInName(int i) {
245 if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName "
246 " out of bound");
247 map<string, int>::iterator j;
248 for(j=inIx.begin(); j!= inIx.end(); j++)
249 if ((*j).second == i) return (*j).first;
250
251 throw RangeCheckError("TOIProcessor::getOutName Not found index !");
252}
253
254void TOIProcessor::run() {
255
256}
257
258void TOIProcessor::warnPutDone() {
259 int n = outIx.size();
260 for (int i=0; i<n; i++) {
261 TOI* toi = outTOIs[i];
262 if (toi) {
263 toi->putDone();
264 }
265 }
266}
267
268void* TOIProcessor::ThreadStart(void* arg) {
269
270 TOIProcessor* p = (TOIProcessor*) arg;
271 // cout << p->name << " new thread running " << pthread_self() << endl;
272 try {
273 p->run();
274 }
275 catch (PThrowable & exc) {
276 cerr << "\n TOIProcessor::ThreadStart() Catched Exception TOIProcessor@"
277 << hex << p << dec << "\n"
278 << (string)typeid(exc).name()
279 << " - Msg= " << exc.Msg() << endl;
280 }
281 catch (const std::exception & sex) {
282 cerr << "\n TOIProcessor::ThreadStart() Catched std::exception TOIProcessor@"
283 << hex << p << dec << "\n"
284 << (string)typeid(sex).name() << endl;
285 }
286 catch (...) {
287 cerr << "\n TOIProcessor::ThreadStart() Catched ... exception TOIProcessor@"
288 << hex << p << dec << endl;
289 }
290 p->warnPutDone();
291 pthread_exit(NULL);
292 // cout << p->name << " thread done " << pthread_self() << endl;
293 return NULL;
294}
295
296#ifdef Linux
297#define pthread_mutexattr_settype pthread_mutexattr_setkind_np
298#define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP
299#define pthread_mutex_setname_np(a,b,c)
300#define pthread_cond_setname_np(a,b,c)
301#endif
302
303void TOIProcessor::start() {
304 pthread_cond_init(&dataReady, NULL);
305 pthread_mutexattr_init(&mutattr);
306 // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK);
307 pthread_mutex_init(&mutex, &mutattr);
308 //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0);
309 //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0);
310 //cout << name << " starting thread " << &thread << endl;
311 pthread_create(&thread, NULL, ThreadStart, this);
312 TOIManager::getManager()->addThread(&thread);
313}
314
315#ifndef NO_SOPHYA
316/* ---- l'interface va etre modifiee, NE PAS UTILISER
317Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) {
318 TOI* toi = getInputTOI(toiIndex);
319 toi->waitForData(iStart, iEnd);
320 return toi->getData(iStart, iEnd);
321}
322
323Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) {
324 TOI* toi = getInputTOI(toiIndex);
325 toi->waitForData(iStart, iEnd);
326 return toi->getError(iStart, iEnd);
327}
328
329TArray<int_4> TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) {
330 TOI* toi = getInputTOI(toiIndex);
331 toi->waitForData(iStart, iEnd);
332 return toi->getFlag(iStart, iEnd);
333}
334 l'interface va etre modifiee, NE PAS UTILISER ---- */
335#endif
336
337double TOIProcessor::getData(int toiIndex, int i) {
338 TOI* toi = getInputTOI(toiIndex);
339 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
340 autoWontNeed(i);
341 return toi->getData(i);
342}
343
344void TOIProcessor::getData(int toiIndex, int i, double &data, uint_8 &flag)
345{
346 TOI* toi = getInputTOI(toiIndex);
347 if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented
348 toi->getData(i, data, flag);
349 autoWontNeed(i);
350 return;
351}
352
353void TOIProcessor::getData(int toiIndex, int i, int n, double* d)
354{
355 TOI* toi = getInputTOI(toiIndex);
356 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
357 toi->getData(i, n, d);
358 autoWontNeed(i);
359 return;
360}
361
362void TOIProcessor::getData(int toiIndex, int i, int n, double* d, uint_8* f)
363{
364 TOI* toi = getInputTOI(toiIndex);
365 if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented
366 toi->getData(i, n, d, f);
367 autoWontNeed(i);
368 return;
369}
370
371
372/*RZCMV
373double TOIProcessor::getError(int toiIndex, int i) {
374 TOI* toi = getInputTOI(toiIndex);
375 toi->waitForData(i);
376 return toi->getError(i);
377}
378
379int_4 TOIProcessor::getFlag(int toiIndex, int i) {
380 TOI* toi = getInputTOI(toiIndex);
381 toi->waitForData(i);
382 return toi->getFlag(i);
383}
384*/
385
386void TOIProcessor::setNeededHistory(int nsamples) {
387 neededHistory = nsamples;
388}
389
390void TOIProcessor::wontNeedBefore(int i) {
391 if (i<wontNeedValue) return;
392 wontNeedValue = i;
393 for (int j=0; j< (int) inIx.size(); j++) {
394 // $CHECK$ Reza 6/5/2001 Protection sur non connected TOI
395 if (inTOIs[j]) inTOIs[j]->wontNeedBefore(i);
396 }
397}
398
399void TOIProcessor::autoWontNeed(int iCur) {
400 if (neededHistory <=0) return;
401 if (iCur < lastAWN + neededHistory/10) return;
402 lastAWN = iCur;
403 // cout << name << " wontNeedBefore " << iCur-neededHistory << endl;
404 wontNeedBefore(iCur-neededHistory);
405}
406
407void TOIProcessor::notify() {
408 lock();
409 pthread_cond_broadcast(&dataReady);
410 unlock();
411}
412
413
414void TOIProcessor::putData(int toiIndex, int i, double value, uint_8 flg) {
415 TOI* toi = getOutputTOI(toiIndex);
416 if (toi == NULL) return;
417 toi->putData(i, value, flg);
418 // autoWontNeed(i); // now done on getData
419 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
420}
421
422void TOIProcessor::putData(int toiIndex, int i, int n, double const* val,
423 uint_8 const* flg) {
424 TOI* toi = getOutputTOI(toiIndex);
425 if (toi == NULL) return;
426 toi->putData(i, n, val, flg);
427 if (toi->needSyncOldWay()) notify(); // seulement pour non segmented
428}
429
430/*RZCMV
431void TOIProcessor::putDataError(int toiIndex, int i, double value,
432 double error, int_4 flg) {
433 TOI* toi = getOutputTOI(toiIndex);
434 if (toi == NULL)
435 throw NullPtrError("TOIProcessor::putDataError() - Not assigned TOI !");
436 toi->putDataError(i, value, error, flg);
437 autoWontNeed(i);
438 notify();
439}
440*/
441
Note: See TracBrowser for help on using the repository browser.