#include #include #include #include #include "parlex.h" using namespace std; namespace SOPHYA { /*! \class ParallelTaskInterface \ingroup SysTools \brief Interface definition for parallel task object. The pure virtual method execute() should be redefined by the classes inheriting from ParallelTaskInterface */ ParallelTaskInterface::ParallelTaskInterface() { setNbParallelThreads(0); } ParallelTaskInterface::~ParallelTaskInterface() { } /*! \brief Define the number of parallel threads running to perform the task This method is called by ParallelExecutor object. If subclasses overwrite this method, base class method should be called to set the value of nbparallel_ attribute. */ void ParallelTaskInterface::setNbParallelThreads(int nbparthr) { nbparallel_=nbparthr; } /*! \class ParallelTaskFunction \ingroup SysTools \brief ParallelTaskInterface implementation for functions The function which are executed by the ParallelTaskFunction execute() method should conform to the prototype: int func(int tid, int nparthr) tid (counting from zero) is the thread rank, and nparthr the number of parallel threads executing in the context */ //! Constructor with the specification of the function to be executed ParallelTaskFunction::ParallelTaskFunction(ParalExFunction f) : parfunc_(f) { } int ParallelTaskFunction::execute(int tid) { return parfunc_(tid, getNbParallelThreads()); } /*! \class ParalExThread \ingroup SysTools \brief Thread objects that provide methods to control their execution. */ /* --Methode-- */ /*! \brief Constructor with the specification of the ParallelTask object and the tread-id or rank in a context of parallel execution */ ParalExThread::ParalExThread(ParallelTaskInterface& ptask, int tid) : ptask_(ptask), tid_(tid), mtx_(false), state_(0) { } /* --Methode-- */ //! specific implementation of the ZThread::run() method void ParalExThread::run() { bool fgencore=true; while(fgencore) { mtx_.lock(); while ((state_!=1)&&(state_!=6)) mtx_.wait(); if (state_==6) { // Signal de fin d'execution du thread setRC(0); state_=7; //DEL cout << " *DBG*FF* ParalExThread::run(tid=" << tid_ << ") state->7 " << endl; mtx_.unlock(); mtx_.signal(); fgencore=false; return; } state_=2; int rcex = 0; try { rcex = ptask_.execute(tid_); } catch (std::exception exc) { fgexcept_ = true; msg_exc_ = "std::exception in ParalExThread::run(): "; msg_exc_ += exc.what(); cerr << "ParalExThread::run(): catched std::exception " << exc.what() << endl; rcex = 77; } catch (...) { fgexcept_ = true; msg_exc_ = "Unknown exception (...) in ParalExThread::run(): "; cerr << "ParalExThread::run(): catched unknown (...) exception " << endl; rcex = 78; } setRC(rcex); state_=3; mtx_.unlock(); mtx_.signal(); } return; } /*! \brief Launches the task execution (call ptask.execute() ) The thread should have been previously started using the start() method. */ /* --Methode-- */ int ParalExThread::go() { // if (!IfStarted()) start(); mtx_.lock(); if (state_>5) { // condition d'erreur - thread termine mtx_.unlock(); return 99; } // Attente conditions pour lancer l'execution while (state_!=0) mtx_.wait(); // On modifie l'etat pour lancer l'execution state_ = 1; fgexcept_ = false; msg_exc_ = ""; mtx_.unlock(); mtx_.signal(); return 0; } /* --Methode-- */ //! Waits till the end of execution launched by go() int ParalExThread::waitEndOfExecution() { int rc = 0; mtx_.lock(); if (state_>5) rc = 99; else if (state_<1) rc = 97; if (rc !=0) { // condition d'erreur - thread termine ou execution non lance mtx_.unlock(); return rc; } while (state_!=3) mtx_.wait(); state_ = 0; mtx_.unlock(); mtx_.signal(); return(getRC()); } /* --Methode-- */ int ParalExThread::setParallelTask(ParallelTaskInterface& ptask) { int rc = 0; mtx_.lock(); if (state_>5) { // condition d'erreur - thread termine mtx_.unlock(); return 99; } while (state_!=0) mtx_.wait(); ptask_ = ptask; mtx_.unlock(); return(0); } /* --Methode-- */ /*! \brief Terminates (stops) the thread execution. if fgwait==true, function returns only after the effective thread termination. */ void ParalExThread::terminate(bool fgwait) { //DEL cout << " *DBG* ParalExThread::terminate(tid=" << tid_ << ") " << endl; mtx_.lock(); state_=6; mtx_.unlock(); mtx_.signal(); if (!fgwait) return; usleep(200); //DEL cout << " *DBG*EE* ParalExThread::terminate(tid=" << tid_ << ") after usleep " << endl; mtx_.lock(); while (state_!=7) mtx_.wait(); mtx_.unlock(); return; } /*! \class ParallelExecutor \ingroup SysTools \brief This class can be used for simultaneous execution of different functions */ /*! \brief Contructor with arguments specifying the number of parallel execution threads and the parallel task object */ /* --Methode-- */ ParallelExecutor::ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr) : vpext_(nthr), vrc_(nthr), fgstarted_(false), fgrunning_(false) { if (nthr<1) throw ParmError("ParallelExecutor(ptask,nthr)/Error nthr<1"); for(size_t i=0; i vptask) : vpext_(vptask.size()), vrc_(vptask.size()), fgstarted_(false), fgrunning_(false) { if (vptask.size()<1) throw ParmError("ParallelExecutor(vector/Error vptask.size()<1"); int nthr= (int)nThreads(); for(size_t i=0; isetNbParallelThreads(nthr); vrc_[i] = 0; } } /* --Methode-- */ ParallelExecutor::~ParallelExecutor() { //DEL cout << " **DBG*B**~ParallelExecutor() " << endl; for(size_t i=0; iterminate(true) " << endl; if (fgstarted_) vpext_[i]->terminate(true); delete vpext_[i]; } } /* --Methode-- */ void ParallelExecutor::SetParallelTask( ParallelTaskInterface& ptask ) { if (fgrunning_) throw ZThreadExc("ParallelExecutor::SetParallelTask(ptask)/Error parallel thread are running"); for(size_t i=0; isetParallelTask(ptask); ptask.setNbParallelThreads( (int)nThreads() ); return; } /* --Methode-- */ void ParallelExecutor::SetParallelTask( vector< ParallelTaskInterface * > vptask ) { if (fgrunning_) throw ZThreadExc("ParallelExecutor::SetParallelTask(vector)/Error parallel thread are running"); if (vptask.size()!=vpext_.size()) throw SzMismatchError("ParallelExecutor::SetParallelTask(vector) - vptask.size()!=vpext.size()"); int nthr= (int)nThreads(); for(size_t i=0; isetParallelTask(*vptask[i]); vptask[i]->setNbParallelThreads(nthr); } return; } /* --Methode-- */ void ParallelExecutor::start() { if (fgstarted_) return; fgstarted_=true; for(size_t i=0; istart(); return; } /*! \brief Lauches the ParallelTask::execute() in the parallel threads and waits for their ending The return code is zero if all threads have succesfully executed with ParallelTask::execute() return code=0. The methods getRC(i) and IfException(i) should be checked in case of errors. */ /* --Methode-- */ int ParallelExecutor::execute() { fgrunning_ = true; for(size_t i=0; igo(); int rc = 0; for(size_t i=0; iwaitEndOfExecution(); if (vrc_[i]!=0) rc++; } fgrunning_ = false; return rc; } } // End namespace SOPHYA