// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL // Eric Aubourg // Christophe Magneville // Reza Ansari // $Id: toiprocessor.cc,v 1.20 2001-11-16 14:10:42 aubourg Exp $ #include "toiprocessor.h" #include "toimanager.h" #include #ifdef HAVE_VALUES_H #include #endif #ifndef MAXINT #define MAXINT 2147483647 #endif #ifdef HAVE_STDINT_H #include #endif #ifdef WITH_SOPHYA #include "pexceptions.h" #else #include "apexceptions.h" #endif #define pthread_mutexattr_setkind_np pthread_mutexattr_settype TOIProcessor::TOIProcessor() { //cout << "TOIProcessor::TOIProcessor" << endl; outTOIs = NULL; inTOIs = NULL; inited=false; upExtra = 0; lowExtra = 0; minOut = -1; maxOut = -1; neededHistory = 1000; lastAWN = 0; wontNeedValue = -1; } TOIProcessor::~TOIProcessor() { delete[] outTOIs; delete[] inTOIs; //if (mutex) pthread_mutex_destroy(&mutex); //if (dataReady) pthread_cond_destroy(&dataReady); pthread_detach(thread); } void TOIProcessor::init() { //cout << "TOIProcessor::init" << endl; } void TOIProcessor::afterinit() { int i; inTOIs = new (TOI*[inIx.size()]); for(i=0; i::iterator i = inIx.find(toi); if (i == inIx.end()) return -1; return (*i).second; } int TOIProcessor::getOutputTOIIndex(string toi) { chkinit(); map::iterator i = outIx.find(toi); if (i == outIx.end()) return -1; return (*i).second; } // Methodes rajoutees par Reza 11/3/2001 TOI* TOIProcessor::getInputTOI(int toiIndex) { // chkinit(); if (toiIndex >= inIx.size()) throw RangeCheckError("TOIProcessor::getInputTOI() out of bound toiIndex"); TOI* toi = inTOIs[toiIndex]; if (toi == NULL) throw NullPtrError("TOIProcessor::getInputTOI() - Not assigned TOI !"); return(toi); } TOI* TOIProcessor::getOutputTOI(int toiIndex) { // chkinit(); if (toiIndex >= outIx.size()) throw RangeCheckError("TOIProcessor::getOutputTOI() out of bound toiIndex"); TOI* toi = outTOIs[toiIndex]; if (toi == NULL) throw NullPtrError("TOIProcessor::getOutputTOI() - Not assigned TOI !"); return(toi); } bool TOIProcessor::checkInputTOIIndex(int toiIndex) { if (toiIndex >= inIx.size()) return false; if (inTOIs[toiIndex] == NULL) return false; return true; } bool TOIProcessor::checkOutputTOIIndex(int toiIndex) { if (toiIndex >= outIx.size()) return false; if (outTOIs[toiIndex] == NULL) return false; return true; } void TOIProcessor::PrintStatus(::ostream & os) { chkinit(); os << " TOIProcessor::PrintStatus() - Name= " << name << " MinIn=" << getMinIn() << " MaxIn=" << getMaxIn() << endl; os << " --- Inputs N= " << inIx.size() << endl; int k; for(k=0; kgetName() << endl; else os << " NO TOI " << endl; } os << " --- Outputs N= " << outIx.size() << endl; for(k=0; kgetName() << endl; else os << " NO TOI " << endl; } os << endl; return; } // Fin rajout Reza 11/3/2001 void TOIProcessor::addInput(string name, TOI* toi) { chkinit(); map::iterator i = inIx.find(name); if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+ name+" not declared"); inTOIs[(*i).second] = toi; toi->addConsumer(this); // $CHECK$ Reza 13/3/2001 } void TOIProcessor::addOutput(string name, TOI* toi) { chkinit(); map::iterator i = outIx.find(name); if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+ name+" not declared"); toi->setProducer(this); outTOIs[(*i).second] = toi; } string TOIProcessor::getOutName(int i) { if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName " " out of bound"); map::iterator j; for(j=outIx.begin(); j!= outIx.end(); j++) if ((*j).second == i) return (*j).first; throw RangeCheckError("TOIProcessor::getOutName Not found index !"); } string TOIProcessor::getInName(int i) { if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName " " out of bound"); map::iterator j; for(j=inIx.begin(); j!= inIx.end(); j++) if ((*j).second == i) return (*j).first; throw RangeCheckError("TOIProcessor::getOutName Not found index !"); } void TOIProcessor::run() { } void TOIProcessor::warnPutDone() { int n = outIx.size(); for (int i=0; iputDone(); } } } void* TOIProcessor::ThreadStart(void* arg) { TOIProcessor* p = (TOIProcessor*) arg; // cout << p->name << " new thread running " << pthread_self() << endl; p->run(); p->warnPutDone(); pthread_exit(NULL); // cout << p->name << " thread done " << pthread_self() << endl; return NULL; } #ifdef Linux #define pthread_mutexattr_settype pthread_mutexattr_setkind_np #define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP #define pthread_mutex_setname_np(a,b,c) #define pthread_cond_setname_np(a,b,c) #endif void TOIProcessor::start() { pthread_cond_init(&dataReady, NULL); pthread_mutexattr_init(&mutattr); // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK); pthread_mutex_init(&mutex, &mutattr); //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0); //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0); //cout << name << " starting thread " << &thread << endl; pthread_create(&thread, NULL, ThreadStart, this); TOIManager::getManager()->addThread(&thread); } #ifndef NO_SOPHYA /* ---- l'interface va etre modifiee, NE PAS UTILISER Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(iStart, iEnd); return toi->getData(iStart, iEnd); } Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(iStart, iEnd); return toi->getError(iStart, iEnd); } TArray TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(iStart, iEnd); return toi->getFlag(iStart, iEnd); } l'interface va etre modifiee, NE PAS UTILISER ---- */ #endif double TOIProcessor::getData(int toiIndex, int i) { TOI* toi = getInputTOI(toiIndex); if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented autoWontNeed(i); return toi->getData(i); } void TOIProcessor::getData(int toiIndex, int i, double &data, uint_8 &flag) { TOI* toi = getInputTOI(toiIndex); if (toi->needSyncOldWay()) toi->waitForData(i); // seulement pour autre que segmented toi->getData(i, data, flag); autoWontNeed(i); return; } void TOIProcessor::getData(int toiIndex, int i, int n, double* d) { TOI* toi = getInputTOI(toiIndex); if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented toi->getData(i, n, d); autoWontNeed(i); return; } void TOIProcessor::getData(int toiIndex, int i, int n, double* d, uint_8* f) { TOI* toi = getInputTOI(toiIndex); if (toi->needSyncOldWay()) toi->waitForData(i+n); // seulement pour autre que segmented toi->getData(i, n, d, f); autoWontNeed(i); return; } /*RZCMV double TOIProcessor::getError(int toiIndex, int i) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(i); return toi->getError(i); } int_4 TOIProcessor::getFlag(int toiIndex, int i) { TOI* toi = getInputTOI(toiIndex); toi->waitForData(i); return toi->getFlag(i); } */ void TOIProcessor::setNeededHistory(int nsamples) { neededHistory = nsamples; } void TOIProcessor::wontNeedBefore(int i) { if (iwontNeedBefore(i); } } void TOIProcessor::autoWontNeed(int iCur) { if (neededHistory <=0) return; if (iCur < lastAWN + neededHistory/10) return; lastAWN = iCur; // cout << name << " wontNeedBefore " << iCur-neededHistory << endl; wontNeedBefore(iCur-neededHistory); } void TOIProcessor::notify() { lock(); pthread_cond_broadcast(&dataReady); unlock(); } void TOIProcessor::putData(int toiIndex, int i, double value, uint_8 flg) { TOI* toi = getOutputTOI(toiIndex); toi->putData(i, value, flg); // autoWontNeed(i); // now done on getData if (toi->needSyncOldWay()) notify(); // seulement pour non segmented } void TOIProcessor::putData(int toiIndex, int i, int n, double const* val, uint_8 const* flg) { TOI* toi = getOutputTOI(toiIndex); toi->putData(i, n, val, flg); if (toi->needSyncOldWay()) notify(); // seulement pour non segmented } /*RZCMV void TOIProcessor::putDataError(int toiIndex, int i, double value, double error, int_4 flg) { TOI* toi = getOutputTOI(toiIndex); if (toi == NULL) throw NullPtrError("TOIProcessor::putDataError() - Not assigned TOI !"); toi->putDataError(i, value, error, flg); autoWontNeed(i); notify(); } */