// ---------------------------------------------------------------------------
// Reviewer documentation (comments only; the code text below is unchanged,
// French comments have been translated to English in place).
//
// Purpose: implementation of the TOIProcessor member functions visible in
// this file: construction and registration with TOIManager, lookup of input
// and output TOIs by name or index, thread start-up, sample access
// (getData / putData) and sample-limit negotiation (checkSampleLimits).
//
// NOTE(review): this copy of the file looks damaged by a text extraction:
//   - line breaks are collapsed, so every "//" comment swallows the code that
//     follows it on the same physical line;
//   - text between '<' and '>' appears to have been dropped: the #include
//     directives have no header name, "map::iterator" has no template
//     arguments, and several loops/conditions are truncated
//     ("for(i=0; i::iterator", "for(k=0; kgetName()",
//     "for (int i=0; iputDone()", "if (iwontNeedBefore(i)",
//     "for (int i=0; i0)", "if (maxcheckSampleLimits(",
//     "min=minsnMax?snMax:max").
//   Restore the file from a pristine revision before compiling; the missing
//   text must not be guessed.
//
// What the visible code does, function by function:
//   TOIProcessor()        Sets outTOIs/inTOIs to NULL, inited=false,
//                         upExtra=lowExtra=0, minOut/maxOut/forcedMinIn/
//                         forcedMaxIn=-1, neededHistory=1000, lastAWN=0,
//                         wontNeedValue=-1; prints "Creating processor";
//                         registers this with TOIManager::getManager();
//                         sets snBegin=0, snEnd=MAXINT, snMin=MAXINT, snMax=0,
//                         requestedSample=false.
//   ~TOIProcessor()       delete[] on outTOIs and inTOIs. Destruction of the
//                         mutex/condition and pthread_detach are commented out.
//   init(), run()         Empty bodies.
//   afterinit()           Allocates inTOIs with inIx.size() entries; the rest
//                         of the body is truncated in this copy.
//   getOutputTOIIndex()   chkinit(), then returns the index stored in outIx
//                         for the name, or -1 when the name is absent.
//   getInputTOI()         Throws RangeCheckError when toiIndex >= inIx.size(),
//                         NullPtrError when the slot is NULL, else returns it.
//   getOutputTOI()        Throws RangeCheckError when toiIndex >= outIx.size();
//                         returns the slot, which may be NULL (the NULL check
//                         is commented out).
//   checkInputTOIIndex(), checkOutputTOIIndex()
//                         Return false when the index is >= the map size or
//                         the slot is NULL, true otherwise.
//   PrintStatus(os)       chkinit(), then writes name, getMinIn(), getMaxIn()
//                         and the input/output counts; per-TOI loops truncated.
//   addInput(name, toi)   chkinit(); throws NotFoundExc when name is not in
//                         inIx; stores toi in inTOIs and calls
//                         toi->addConsumer(this).
//   addOutput(name, toi)  chkinit(); throws NotFoundExc when name is not in
//                         outIx; calls toi->setProducer(this) and stores toi.
//   getOutName(i), getInName(i)
//                         Linear search of the map for the entry whose value
//                         equals i; throw RangeCheckError otherwise.
//                         NOTE(review): the bound test is "i > size()" while
//                         the other accessors use ">="; i == size() passes it
//                         and ends in the "Not found index" throw. The final
//                         message in getInName says "getOutName" - looks
//                         copy-pasted; confirm.
//   warnPutDone()         Loops over outIx.size() outputs calling putDone();
//                         loop header truncated in this copy.
//   ThreadStart(arg)      Casts arg to TOIProcessor*, calls run() inside a
//                         try block, logs PThrowable / std::exception / other
//                         exceptions to cerr, then calls warnPutDone() and
//                         pthread_exit(NULL).
//                         NOTE(review): pthread_exit() does not return per
//                         POSIX, so the trailing "return NULL" looks
//                         unreachable.
//   start()               Initialises dataReady, mutattr and mutex, creates
//                         the thread running ThreadStart(this) and passes
//                         &thread to TOIManager::addThread().
//                         NOTE(review): pthread return codes are not checked.
//   getData(...)          Four overloads: fetch the input TOI through
//                         getInputTOI(); when toi->needSyncOldWay() is true,
//                         wait for data (index i for the single-sample forms,
//                         i+n for the array forms); read through toi->getData;
//                         call autoWontNeed(i).
//   setNeededHistory(n)   Stores n in neededHistory.
//   wontNeedBefore(i)     Body truncated in this copy.
//   autoWontNeed(iCur)    Returns when neededHistory <= 0 or when
//                         iCur < lastAWN + neededHistory/10; otherwise sets
//                         lastAWN = iCur and calls
//                         wontNeedBefore(iCur - neededHistory).
//   notify()              lock(); pthread_cond_broadcast(&dataReady); unlock().
//   putData(...)          Two overloads: fetch the output TOI, return silently
//                         when it is NULL, forward to toi->putData, then
//                         notify() when toi->needSyncOldWay() is true.
//   setRequestedSample()  Sets requestedSample = true, snBegin, snEnd.
//   checkSampleLimits(int pass)
//                         Calls the three-argument overload with
//                         minTmp=MAXINT, maxTmp=-1.
//                         NOTE(review): the cout placed after the return
//                         statement is never executed.
//   checkSampleLimits(long& min, long& max, int pass)
//                         pass == 3: when snMin < snMax copies snMin/snMax to
//                         snBegin/snEnd, then returns true. Otherwise raises
//                         min to snBegin and lowers max to snEnd, sets
//                         noConst = (min > max), on pass 2 with noConst resets
//                         min/max to snBegin/snEnd, visits the inputs (loop
//                         truncated in this copy), widens snMin/snMax when
//                         !noConst, and finally returns snBegin/snEnd through
//                         min/max when min > max. Returns sample_ok.
//   getRequested()        Returns requestedSample.
//   printLimits()         Prints name, snBegin and snEnd to cout.
//   getOutToi(sortie)     Returns NULL when sortie is not in outIx, else the
//                         matching outTOIs slot.
//
// NOTE(review): the file defines pthread_mutexattr_setkind_np as
// pthread_mutexattr_settype near the top and, under "#ifdef Linux", the
// reverse mapping; the only use (in start()) is commented out - confirm that
// both definitions are intended.
// ---------------------------------------------------------------------------
// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL // Eric Aubourg // Christophe Magneville // Reza Ansari // $Id: toiprocessor.cc,v 1.26 2002-09-09 15:33:15 aubourg Exp $ #include "toiprocessor.h" #include "toimanager.h" #include #include // added for linux #ifdef HAVE_VALUES_H #include #endif #ifndef MAXINT #define MAXINT 2147483647 #endif #ifdef HAVE_STDINT_H #include #endif #ifdef WITH_SOPHYA #include "pexceptions.h" #else #include "apexceptions.h" #endif #define pthread_mutexattr_setkind_np pthread_mutexattr_settype TOIProcessor::TOIProcessor() { //cout << "TOIProcessor::TOIProcessor" << endl; outTOIs = NULL; inTOIs = NULL; inited=false; upExtra = 0; lowExtra = 0; minOut = -1; maxOut = -1; forcedMinIn = -1; forcedMaxIn = -1; neededHistory = 1000; lastAWN = 0; wontNeedValue = -1; // added vf 23/07/2002 cout << "Creating processor" << endl; // To reference a TOIProcessor, the corresponding TOIManager must be retrieved TOIManager::getManager()->registerProcessor(this); // initialise the sample limits (by default the whole file) snBegin = 0; snEnd = MAXINT; snMin=MAXINT; snMax=0; // by default no condition requestedSample = false; } TOIProcessor::~TOIProcessor() { delete[] outTOIs; delete[] inTOIs; //if (mutex) pthread_mutex_destroy(&mutex); //if (dataReady) pthread_cond_destroy(&dataReady); // Apparently pthread_detach must not be called if join was done before // Excerpt from the man page (Reza - May 2002) //man: The pthread_join(3) routine also detaches the target thread after //man: pthread_join(3) returns successfully. 
// pthread_detach(thread); $CHECK$ - Reza May 2002 } void TOIProcessor::init() { //cout << "TOIProcessor::init" << endl; } void TOIProcessor::afterinit() { int i; inTOIs = new (TOI*[inIx.size()]); for(i=0; i::iterator i = inIx.find(toi); if (i == inIx.end()) return -1; return (*i).second; } int TOIProcessor::getOutputTOIIndex(string toi) { chkinit(); map::iterator i = outIx.find(toi); if (i == outIx.end()) return -1; return (*i).second; } // Methods added by Reza 11/3/2001 TOI* TOIProcessor::getInputTOI(int toiIndex) { // chkinit(); if (toiIndex >= inIx.size()) throw RangeCheckError("TOIProcessor::getInputTOI() out of bound toiIndex"); TOI* toi = inTOIs[toiIndex]; if (toi == NULL) throw NullPtrError("TOIProcessor::getInputTOI() - Not assigned TOI !"); return(toi); } TOI* TOIProcessor::getOutputTOI(int toiIndex) { // chkinit(); if (toiIndex >= outIx.size()) throw RangeCheckError("TOIProcessor::getOutputTOI() out of bound toiIndex"); TOI* toi = outTOIs[toiIndex]; // if (toi == NULL) // throw NullPtrError("TOIProcessor::getOutputTOI() - Not assigned TOI !"); return(toi); } bool TOIProcessor::checkInputTOIIndex(int toiIndex) { if (toiIndex >= inIx.size()) return false; if (inTOIs[toiIndex] == NULL) return false; return true; } bool TOIProcessor::checkOutputTOIIndex(int toiIndex) { if (toiIndex >= outIx.size()) return false; if (outTOIs[toiIndex] == NULL) return false; return true; } void TOIProcessor::PrintStatus(::ostream & os) { chkinit(); os << " TOIProcessor::PrintStatus() - Name= " << name << " MinIn=" << getMinIn() << " MaxIn=" << getMaxIn() << endl; os << " --- Inputs N= " << inIx.size() << endl; int k; for(k=0; kgetName() << endl; else os << " NO TOI " << endl; } os << " --- Outputs N= " << outIx.size() << endl; for(k=0; kgetName() << endl; else os << " NO TOI " << endl; } os << endl; return; } // End of additions by Reza 11/3/2001 void TOIProcessor::addInput(string name, TOI* toi) { chkinit(); map::iterator i = inIx.find(name); if (i == inIx.end()) throw 
NotFoundExc("TOIProcessor::addInput "+ name+" not declared"); inTOIs[(*i).second] = toi; toi->addConsumer(this); // $CHECK$ Reza 13/3/2001 } void TOIProcessor::addOutput(string name, TOI* toi) { chkinit(); map::iterator i = outIx.find(name); if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+ name+" not declared"); toi->setProducer(this); outTOIs[(*i).second] = toi; } string TOIProcessor::getOutName(int i) { if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName " " out of bound"); map::iterator j; for(j=outIx.begin(); j!= outIx.end(); j++) if ((*j).second == i) return (*j).first; throw RangeCheckError("TOIProcessor::getOutName Not found index !"); } string TOIProcessor::getInName(int i) { if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName " " out of bound"); map::iterator j; for(j=inIx.begin(); j!= inIx.end(); j++) if ((*j).second == i) return (*j).first; throw RangeCheckError("TOIProcessor::getOutName Not found index !"); } void TOIProcessor::run() { } void TOIProcessor::warnPutDone() { int n = outIx.size(); for (int i=0; iputDone(); } } } void* TOIProcessor::ThreadStart(void* arg) { TOIProcessor* p = (TOIProcessor*) arg; // cout << p->name << " new thread running " << pthread_self() << endl; try { p->run(); } catch (PThrowable & exc) { cerr << "\n TOIProcessor::ThreadStart() Catched Exception TOIProcessor@" << hex << p << dec << "\n" << (string)typeid(exc).name() << " - Msg= " << exc.Msg() << endl; } catch (const std::exception & sex) { cerr << "\n TOIProcessor::ThreadStart() Catched std::exception TOIProcessor@" << hex << p << dec << "\n" << (string)typeid(sex).name() << endl; } catch (...) { cerr << "\n TOIProcessor::ThreadStart() Catched ... 
exception TOIProcessor@" << hex << p << dec << endl; } p->warnPutDone(); pthread_exit(NULL); // cout << p->name << " thread done " << pthread_self() << endl; return NULL; } #ifdef Linux #define pthread_mutexattr_settype pthread_mutexattr_setkind_np #define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP #define pthread_mutex_setname_np(a,b,c) #define pthread_cond_setname_np(a,b,c) #endif void TOIProcessor::start() { pthread_cond_init(&dataReady, NULL); pthread_mutexattr_init(&mutattr); // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK); pthread_mutex_init(&mutex, &mutattr); //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0); //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0); //cout << name << " starting thread " << &thread << endl; pthread_create(&thread, NULL, ThreadStart, this); TOIManager::getManager()->addThread(&thread); } #ifndef NO_SOPHYA /* ---- the interface is going to change, DO NOT USE Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(iStart, iEnd); return toi->getData(iStart, iEnd); } Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(iStart, iEnd); return toi->getError(iStart, iEnd); } TArray TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(iStart, iEnd); return toi->getFlag(iStart, iEnd); } the interface is going to change, DO NOT USE ---- */ #endif double TOIProcessor::getData(int toiIndex, int i) { TOI* toi = getInputTOI(toiIndex); if (toi->needSyncOldWay()) toi->waitForData(i); // only for TOIs other than segmented autoWontNeed(i); return toi->getData(i); } void TOIProcessor::getData(int toiIndex, int i, double &data, uint_8 &flag) { TOI* toi = getInputTOI(toiIndex); if (toi->needSyncOldWay()) toi->waitForData(i); // only for TOIs other than segmented toi->getData(i, data, flag); 
autoWontNeed(i); return; } void TOIProcessor::getData(int toiIndex, int i, int n, double* d) { TOI* toi = getInputTOI(toiIndex); if (toi->needSyncOldWay()) toi->waitForData(i+n); // only for TOIs other than segmented toi->getData(i, n, d); autoWontNeed(i); return; } void TOIProcessor::getData(int toiIndex, int i, int n, double* d, uint_8* f) { TOI* toi = getInputTOI(toiIndex); if (toi->needSyncOldWay()) toi->waitForData(i+n); // only for TOIs other than segmented toi->getData(i, n, d, f); autoWontNeed(i); return; } /*RZCMV double TOIProcessor::getError(int toiIndex, int i) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(i); return toi->getError(i); } int_4 TOIProcessor::getFlag(int toiIndex, int i) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(i); return toi->getFlag(i); } */ void TOIProcessor::setNeededHistory(int nsamples) { neededHistory = nsamples; } void TOIProcessor::wontNeedBefore(int i) { if (iwontNeedBefore(i); } } void TOIProcessor::autoWontNeed(int iCur) { if (neededHistory <=0) return; if (iCur < lastAWN + neededHistory/10) return; lastAWN = iCur; // cout << name << " wontNeedBefore " << iCur-neededHistory << endl; wontNeedBefore(iCur-neededHistory); } void TOIProcessor::notify() { lock(); pthread_cond_broadcast(&dataReady); unlock(); } void TOIProcessor::putData(int toiIndex, int i, double value, uint_8 flg) { TOI* toi = getOutputTOI(toiIndex); if (toi == NULL) return; toi->putData(i, value, flg); // autoWontNeed(i); // now done on getData if (toi->needSyncOldWay()) notify(); // only for non segmented } void TOIProcessor::putData(int toiIndex, int i, int n, double const* val, uint_8 const* flg) { TOI* toi = getOutputTOI(toiIndex); if (toi == NULL) return; toi->putData(i, n, val, flg); if (toi->needSyncOldWay()) notify(); // only for non segmented } /*RZCMV void TOIProcessor::putDataError(int toiIndex, int i, double value, double error, int_4 flg) { TOI* toi = getOutputTOI(toiIndex); if (toi == NULL) throw 
NullPtrError("TOIProcessor::putDataError() - Not assigned TOI !"); toi->putDataError(i, value, error, flg); autoWontNeed(i); notify(); } */ // added vf 29/07/2002 // configure the sample to produce (without verification) void TOIProcessor::setRequestedSample(long begin, long end) { requestedSample = true; snBegin = begin; snEnd = end; // snMin = snBegin; // snMax = snEnd; } bool TOIProcessor::checkSampleLimits(int pass) { long minTmp=MAXINT; long maxTmp=-1; return checkSampleLimits(minTmp, maxTmp, pass); cout << "toiprocessor : limites verifiees : " << snBegin << " , " << snEnd << " : " << endl; } bool TOIProcessor::checkSampleLimits(long& min, long& max, int pass) { bool sample_input_ok=true; bool sample_ok=true; /* cout << "check " << pass << " " << name << " in " << min << " - " << max << " ; " << snMin << " - " << snMax << " ; " << snBegin << " - " << snEnd << endl;*/ if (pass == 3) { if (snMin < snMax) { snBegin = snMin; snEnd = snMax; } return true; } // check that we can actually produce if (min < snBegin) { min = snBegin; } if (max > snEnd) { max = snEnd; } bool noConst = (min>max); if (pass == 2 && noConst) { min = snBegin; max = snEnd; } int n = inIx.size(); // walk through all the inputs and update to the most restrictive for (int i=0; i0) { min_Input = min - lowExtra; } else { min_Input = min; } if (maxcheckSampleLimits(min_Input, max_Input, pass); //Adjust the limits if more restrictive if (min < max) { // We were asked for bounds -> if ((min_Input + lowExtra) > min) { min = min_Input + lowExtra; } if ((max_Input - upExtra) < max) { max = max_Input - upExtra; } } else { // We are asked for everything we can do -> update snBegin if ((min_Input + lowExtra) > snBegin) { snBegin = min_Input + lowExtra; } if ((max_Input - upExtra) < snEnd) { snEnd = max_Input - upExtra; } } if (sample_input_ok == false) { sample_ok = false; } } } //Adjust the limits if the interval is wider if (!noConst) { if (min < snMin) { snMin = min; } if 
(max > snMax) { snMax = max; } } min=minsnMax?snMax:max; // unconstrained case, we return our own bounds if (min>max) { min = snBegin; max = snEnd; } /* cout << "check " << pass << " " << name << " out " << min << " - " << max << " ; " << snMin << " - " << snMax << " ; " << snBegin << " - " << snEnd << endl;*/ return sample_ok; } // to check whether the processor has been configured bool TOIProcessor::getRequested() { return requestedSample; } // print the limits void TOIProcessor::printLimits() { cout << "toiprocessor " << name <<" : limites calculees : " << snBegin << " , " << snEnd << endl; } TOI* TOIProcessor::getOutToi(string sortie) { // look up the output name and check whether the toi already exists map::iterator i = outIx.find(sortie); if (i == outIx.end()) { return NULL; } else { return outTOIs[(*i).second]; } }