// This may look like C code, but it is really -*- C++ -*- // UPS - LAL (Orsay) / IN2P3/CNRS - IRFU/SPP (Saclay) / CEA #ifndef PARLEX_SEEN #define PARLEX_SEEN // Classes pour execution de taches paralleles en utilisant les threads // R. Ansari UPS+LAL IN2P3/CNRS 2005 #include "zthread.h" #include namespace SOPHYA { using namespace std; //-------------------------------------------------------------------- // Interface pour les objets ayant une fonction (Execute()) pouvant // s'executer en parallele //-------------------------------------------------------------------- /*! \class ParallelTaskInterface \ingroup SysTools \brief Interface definition for parallel task object. The pure virtual method execute() should be redefined by the classes inheriting from ParallelTaskInterface */ class ParallelTaskInterface { public: explicit ParallelTaskInterface() { } virtual ~ParallelTaskInterface() { } /*! This method should perform the actual computation. The parameter tid is used by the ParallelExecutor class to identify the thread calling the method */ virtual int execute(int tid) = 0; }; //---- Implementation de ParallelTaskInterface pour l'execution des fonctions typedef int (* ParalExFunction) (int); //---- class ParallelTaskFunction { public: explicit ParallelTaskFunction(ParalExFunction f); virtual int execute(int tid); ParalExFunction parfunc_; }; //-------------------------------------------------------------------- // Classe de thread permettant l'execution controle de la methode // ParallelTaskInterface::Execute() //-------------------------------------------------------------------- class ParalExThread : public ZThread { public: explicit ParalExThread(ParallelTaskInterface& ptask, int tid); virtual void run(); // demarre l'execution (start() doit etre appele avant) //! int go(); // Attend la fin de l'execution int waitEndOfExecution(); // Termine (arrete) l'execution du thread . si fgwait==true, attend l'arret effectif du tread. void terminate(bool fgwait=true); // demarre l'execution et attend sa fin //! Launches the execution of ptask_.execute() and waits for its ending. inline int execute() { int rc=go(); if(rc!=0) return rc; return waitEndOfExecution(); } //! Return the the execution phase, 0: waiting, 1:go called, 2: executing, 3: execution finished inline int getExecutionState() { return state_; } //! Return true if an exception has been catched when ptask_.execute() called, and correspond message inline bool IfException(string& msg) { msg=msg_exc_; return fgexcept_; } //! Return reference to the parallel task object inline ParallelTaskInterface& getParallelTask() { return ptask_; } //! Set (changes) the parallel task object. int setParallelTask(ParallelTaskInterface& ptask); //! Return the rank or thread-id in a parallel execution context inline int Rank() { return tid_; } protected: ParallelTaskInterface& ptask_; int tid_; ZMutex mtx_; int state_; // running state // 0:idle, 1: go, 2: executing, 3: executionfinished , 6:terminatethread, 7:threadfinished bool fgexcept_; // true -> exception lors de l'execution de ptask_.execute() string msg_exc_; // message d'exception le cas echeant }; //-------------------------------------------------------------------- // Classe permettant l'execution simultanee de plusieurs fonctions // ParallelTaskInterface::Execute() //-------------------------------------------------------------------- class ParallelExecutor { public: explicit ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr); explicit ParallelExecutor(vector< ParallelTaskInterface * > vptask); virtual ~ParallelExecutor(); //! Set (changes) the parallel task objects (same object for all threads) void SetParallelTask( ParallelTaskInterface& ptask ); //! Set (changes) the parallel task objects void SetParallelTask( vector< ParallelTaskInterface * > vptask); //! Starts all the parallel threads virtual void start(); // Appel a l'execution parallele virtual int execute(); //! Return the number of parallel threads inline size_t nThreads() { return vpext_.size(); } //! Return RC for the ParalleTask.execute() of thread i inline int getRC(size_t i) { return vrc_[i]; } //! Return the execption condition for the ParalleTask.execute() of thread i inline bool IfException(size_t i, string& msg) { return vpext_[i]->IfException(msg); } protected: vector vpext_; vector vrc_; bool fgstarted_; // true -> les threads tournent (thr[i].start() appeles) bool fgrunning_; // true -> en cours d'execution }; } // namespace SOPHYA #endif