#include #include #include #include "parlex.h" using namespace std; namespace SOPHYA { /*! \class ParallelTaskFunction \ingroup SysTools \brief ParallelTaskInterface implementation for functions */ //! Constructor with the specification of the function to be executed ParallelTaskFunction::ParallelTaskFunction(ParalExFunction f) : parfunc_(f) { } int ParallelTaskFunction::execute(int tid) { return parfunc_(tid); } /*! \class ParalExThread \ingroup SysTools \brief Thread objects that provide methods to control their execution. */ /* --Methode-- */ /*! \brief Constructor with the specification of the ParallelTask object and the tread-id or rank in a context of parallel execution */ ParalExThread::ParalExThread(ParallelTaskInterface& ptask, int tid) : ptask_(ptask), tid_(tid), mtx_(false), state_(0) { } /* --Methode-- */ //! specific implementation of the ZThread::run() method void ParalExThread::run() { bool fgencore=true; while(fgencore) { mtx_.lock(); while ((state_!=1)&&(state_!=6)) mtx_.wait(); if (state_==6) { // Signal de fin d'execution du thread setRC(0); state_=7; //DEL cout << " *DBG*FF* ParalExThread::run(tid=" << tid_ << ") state->7 " << endl; mtx_.unlock(); mtx_.signal(); fgencore=false; return; } state_=2; int rcex = 0; try { rcex = ptask_.execute(tid_); } catch (std::exception exc) { fgexcept_ = true; msg_exc_ = "std::exception in ParalExThread::run(): "; msg_exc_ += exc.what(); cerr << "ParalExThread::run(): catched std::exception " << exc.what() << endl; rcex = 77; } catch (...) { fgexcept_ = true; msg_exc_ = "Unknown exception (...) in ParalExThread::run(): "; cerr << "ParalExThread::run(): catched unknown (...) exception " << endl; rcex = 78; } setRC(rcex); state_=3; mtx_.unlock(); mtx_.signal(); } return; } /*! \brief Launches the task execution (call ptask.execute() ) The thread should have been previously started using the start() method. */ /* --Methode-- */ int ParalExThread::go() { // if (!IfStarted()) start(); mtx_.lock(); if (state_>5) { // condition d'erreur - thread termine mtx_.unlock(); return 99; } // Attente conditions pour lancer l'execution while (state_!=0) mtx_.wait(); // On modifie l'etat pour lancer l'execution state_ = 1; fgexcept_ = false; msg_exc_ = ""; mtx_.unlock(); mtx_.signal(); } /* --Methode-- */ //! Waits till the end of execution launched by go() int ParalExThread::waitEndOfExecution() { int rc = 0; mtx_.lock(); if (state_>5) rc = 99; else if (state_<1) rc = 97; if (rc !=0) { // condition d'erreur - thread termine ou execution non lance mtx_.unlock(); return rc; } while (state_!=3) mtx_.wait(); state_ = 0; mtx_.unlock(); mtx_.signal(); return(getRC()); } /* --Methode-- */ int ParalExThread::setParallelTask(ParallelTaskInterface& ptask) { int rc = 0; mtx_.lock(); if (state_>5) { // condition d'erreur - thread termine mtx_.unlock(); return 99; } while (state_!=0) mtx_.wait(); ptask_ = ptask; mtx_.unlock(); return(0); } /* --Methode-- */ /*! \brief Terminates (stops) the thread execution. if fgwait==true, function returns only after the effective thread termination. */ void ParalExThread::terminate(bool fgwait) { //DEL cout << " *DBG* ParalExThread::terminate(tid=" << tid_ << ") " << endl; mtx_.lock(); state_=6; mtx_.unlock(); mtx_.signal(); if (!fgwait) return; usleep(200); //DEL cout << " *DBG*EE* ParalExThread::terminate(tid=" << tid_ << ") after usleep " << endl; mtx_.lock(); while (state_!=7) mtx_.wait(); mtx_.unlock(); return; } /*! \class ParallelExecutor \ingroup SysTools \brief This class can be used for simultaneous execution of different functions */ /*! \brief Contructor with arguments specifying the number of parallel execution threads and the parallel task object */ /* --Methode-- */ ParallelExecutor::ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr) : vpext_(nthr), vrc_(nthr), fgstarted_(false), fgrunning_(false) { if (nthr<1) throw ParmError("ParallelExecutor(ptask,nthr)/Error nthr<1"); for(size_t i=0; i vptask) : vpext_(vptask.size()), vrc_(vptask.size()), fgstarted_(false), fgrunning_(false) { if (vptask.size()<1) throw ParmError("ParallelExecutor(vector/Error vptask.size()<1"); for(size_t i=0; iterminate(true) " << endl; vpext_[i]->terminate(true); delete vpext_[i]; } } /* --Methode-- */ void ParallelExecutor::SetParallelTask( ParallelTaskInterface& ptask ) { if (fgrunning_) throw ZThreadExc("ParallelExecutor::SetParallelTask(ptask)/Error parallel thread are running"); for(size_t i=0; isetParallelTask(ptask); return; } /* --Methode-- */ void ParallelExecutor::SetParallelTask( vector< ParallelTaskInterface * > vptask ) { if (fgrunning_) throw ZThreadExc("ParallelExecutor::SetParallelTask(vector)/Error parallel thread are running"); if (vptask.size()!=vpext_.size()) throw SzMismatchError("ParallelExecutor::SetParallelTask(vector) - vptask.size()!=vpext.size()"); for(size_t i=0; isetParallelTask(*vptask[i]); return; } /* --Methode-- */ void ParallelExecutor::start() { if (fgstarted_) return; for(size_t i=0; istart(); return; } /*! \brief Lauches the ParallelTask::execute() in the parallel threads and waits for their ending The return code is zero if all threads have succesfully executed with ParallelTask::execute() return code=0. The methods getRC(i) and IfException(i) should be checked in case of errors. */ /* --Methode-- */ int ParallelExecutor::execute() { fgrunning_ = true; for(size_t i=0; igo(); int rc = 0; for(size_t i=0; iwaitEndOfExecution(); if (vrc_[i]!=0) rc++; } fgrunning_ = false; return rc; } } // End namespace SOPHYA