#include "toiprocessor.h" #include "toimanager.h" #include #include #ifdef WITH_SOPHYA #include "pexceptions.h" #else #include "apexceptions.h" #endif #define pthread_mutexattr_setkind_np pthread_mutexattr_settype TOIProcessor::TOIProcessor() { //cout << "TOIProcessor::TOIProcessor" << endl; outTOIs = NULL; inTOIs = NULL; inited=false; upExtra = 0; lowExtra = 0; minOut = -1; maxOut = -1; neededHistory = 1000; lastAWN = 0; wontNeedValue = -1; } TOIProcessor::~TOIProcessor() { delete[] outTOIs; delete[] inTOIs; //if (mutex) pthread_mutex_destroy(&mutex); //if (dataReady) pthread_cond_destroy(&dataReady); pthread_detach(thread); } void TOIProcessor::init() { //cout << "TOIProcessor::init" << endl; } void TOIProcessor::afterinit() { inTOIs = new (TOI*[inIx.size()]); outTOIs = new (TOI*[outIx.size()]); } int TOIProcessor::getMinOut() { //cout << name << "minout" << endl; if (minOut < 0) minOut = calcMinOut(); //cout << name << "minout=" << minOut << endl; return minOut; } int TOIProcessor::getMaxOut() { //cout << name << "maxout" << endl; if (maxOut < 0) maxOut = calcMaxOut(); //cout << name << "maxout=" << maxOut << endl; return maxOut; } int TOIProcessor::calcMinOut() { return getMinIn() + lowExtra; } int TOIProcessor::calcMaxOut() { return getMaxIn() - upExtra; } int TOIProcessor::getMinIn() { int nIn = inIx.size(); int minIn = 0; for (int i=0; igetMinSn(); if (x > minIn) minIn = x; } return minIn; } int TOIProcessor::getMaxIn() { int nIn = inIx.size(); int maxIn = MAXINT; for (int i=0; igetMaxSn(); if (x < maxIn) maxIn = x; } return maxIn; } int TOIProcessor::declareInput(string toi) { if (inIx.find(toi) != inIx.end()) throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared"); int i = inIx.size(); inIx[toi] = i; return i; } int TOIProcessor::declareOutput(string toi) { if (outIx.find(toi) != outIx.end()) throw DuplicateIdExc("TOIProcessor::declareInput : "+toi+" already declared"); int i = outIx.size(); outIx[toi] = i; return i; } int TOIProcessor::getInputTOIIndex(string toi) { chkinit(); map::iterator i = inIx.find(toi); if (i == inIx.end()) return -1; return (*i).second; } int TOIProcessor::getOutputTOIIndex(string toi) { chkinit(); map::iterator i = outIx.find(toi); if (i == outIx.end()) return -1; return (*i).second; } void TOIProcessor::addInput(string name, TOI* toi) { chkinit(); map::iterator i = inIx.find(name); if (i == inIx.end()) throw NotFoundExc("TOIProcessor::addInput "+ name+" not declared"); inTOIs[(*i).second] = toi; } void TOIProcessor::addOutput(string name, TOI* toi) { chkinit(); map::iterator i = outIx.find(name); if (i == outIx.end()) throw NotFoundExc("TOIProcessor::addOutput "+ name+" not declared"); toi->setProducer(this); outTOIs[(*i).second] = toi; } string TOIProcessor::getOutName(int i) { if (i > outIx.size()) throw RangeCheckError("TOIProcessor::getOutName " " out of bound"); map::iterator j = outIx.begin(); while (i) {i--; j++;}; return (*j).first; } string TOIProcessor::getInName(int i) { if (i > inIx.size()) throw RangeCheckError("TOIProcessor::getInName " " out of bound"); map::iterator j = inIx.begin(); while (i) {i--; j++;}; return (*j).first; } void TOIProcessor::run() { } void* TOIProcessor::ThreadStart(void* arg) { TOIProcessor* p = (TOIProcessor*) arg; // cout << p->name << " new thread running " << pthread_self() << endl; p->run(); // cout << p->name << " thread done " << pthread_self() << endl; return NULL; } #ifdef Linux #define pthread_mutexattr_settype pthread_mutexattr_setkind_np #define PTHREAD_MUTEX_ERRORCHECK PTHREAD_MUTEX_ERRORCHECK_NP #define pthread_mutex_setname_np(a,b,c) #define pthread_cond_setname_np(a,b,c) #endif void TOIProcessor::start() { pthread_cond_init(&dataReady, NULL); pthread_mutexattr_init(&mutattr); // pthread_mutexattr_settype(&mutattr, PTHREAD_MUTEX_ERRORCHECK); pthread_mutex_init(&mutex, &mutattr); //pthread_mutex_setname_np(&mutex, (name + "_proc_mutex").c_str(), 0); //pthread_cond_setname_np(&dataReady, (name + "_proc_cond").c_str(), 0); //cout << name << " starting thread " << &thread << endl; pthread_create(&thread, NULL, ThreadStart, this); TOIManager::getManager()->addThread(&thread); } #ifndef NO_SOPHYA Array TOIProcessor::getData(int toiIndex, int iStart, int iEnd) { TOI* toi = inTOIs[toiIndex]; toi->waitForData(iStart, iEnd); return toi->getData(iStart, iEnd); } Array TOIProcessor::getError(int toiIndex, int iStart, int iEnd) { TOI* toi = inTOIs[toiIndex]; toi->waitForData(iStart, iEnd); return toi->getError(iStart, iEnd); } TArray TOIProcessor::getFlag(int toiIndex, int iStart, int iEnd) { TOI* toi = inTOIs[toiIndex]; toi->waitForData(iStart, iEnd); return toi->getFlag(iStart, iEnd); } #endif double TOIProcessor::getData(int toiIndex, int i) { TOI* toi = inTOIs[toiIndex]; toi->waitForData(i); return toi->getData(i); } double TOIProcessor::getError(int toiIndex, int i) { TOI* toi = inTOIs[toiIndex]; toi->waitForData(i); return toi->getError(i); } int_4 TOIProcessor::getFlag(int toiIndex, int i) { TOI* toi = inTOIs[toiIndex]; toi->waitForData(i); return toi->getFlag(i); } void TOIProcessor::setNeededHistory(int nsamples) { neededHistory = nsamples; } void TOIProcessor::wontNeedBefore(int i) { if (iwontNeedBefore(i); } } void TOIProcessor::autoWontNeed(int iCur) { if (neededHistory <=0) return; if (iCur < lastAWN + neededHistory) return; lastAWN = iCur; //cout << name << " wontNeedBefore " << iCur-neededHistory << endl; wontNeedBefore(iCur-neededHistory); } void TOIProcessor::notify() { lock(); /* if (mutex.__m_lock.__status == 0) { cout << "wait without lock" << endl; abort(); }*/ pthread_cond_broadcast(&dataReady); unlock(); } void TOIProcessor::putData(int toiIndex, int i, double value, int_4 flg) { TOI* toi = outTOIs[toiIndex]; toi->putData(i, value, flg); autoWontNeed(i); notify(); } void TOIProcessor::putDataError(int toiIndex, int i, double value, double error, int_4 flg) { TOI* toi = outTOIs[toiIndex]; toi->putDataError(i, value, error, flg); autoWontNeed(i); notify(); }