// This may look like C code, but it is really -*- C++ -*- // UPS - LAL (Orsay) / IN2P3/CNRS - IRFU/SPP (Saclay) / CEA #ifndef PARLEX_SEEN #define PARLEX_SEEN // Classes pour execution de taches paralleles en utilisant les threads // R. Ansari UPS+LAL IN2P3/CNRS 2009-2010 #include "zthread.h" #include namespace SOPHYA { using namespace std; //-------------------------------------------------------------------- // Interface pour les objets ayant une fonction (Execute()) pouvant // s'executer en parallele //-------------------------------------------------------------------- class ParallelTaskInterface { public: explicit ParallelTaskInterface(); virtual ~ParallelTaskInterface(); /*! This method should perform the actual computation. The parameter tid is used by the ParallelExecutor class to identify the thread calling the method */ virtual int execute(int tid) = 0; //! Define the number of parallel threads running to perform the task (set by ParallelExecutor object) virtual void setNbParallelThreads(int nbparthr); //! Return the number of parallel threads running to perform the task inline int getNbParallelThreads() { return nbparallel_; } protected: int nbparallel_; // total number of parallel threads which will perform the job }; //---- Implementation de ParallelTaskInterface pour l'execution des fonctions //! Prototype for functions which executed by ParallelTaskFunction objects typedef int (* ParalExFunction) (int,int); //---- class ParallelTaskFunction : public ParallelTaskInterface { public: explicit ParallelTaskFunction(ParalExFunction f); virtual int execute(int tid); ParalExFunction parfunc_; }; //-------------------------------------------------------------------- // Classe de thread permettant l'execution controle de la methode // ParallelTaskInterface::Execute() //-------------------------------------------------------------------- class ParalExThread : public ZThread { public: explicit ParalExThread(ParallelTaskInterface& ptask, int tid); virtual void run(); // demarre l'execution (start() doit etre appele avant) //! int go(); // Attend la fin de l'execution int waitEndOfExecution(); // Termine (arrete) l'execution du thread . si fgwait==true, attend l'arret effectif du tread. void terminate(bool fgwait=true); // demarre l'execution et attend sa fin //! Launches the execution of ptask_.execute() and waits for its ending. inline int execute() { int rc=go(); if(rc!=0) return rc; return waitEndOfExecution(); } //! Return the the execution phase, 0: waiting, 1:go called, 2: executing, 3: execution finished inline int getExecutionState() { return state_; } //! Return true if an exception has been catched when ptask_.execute() called, and correspond message inline bool IfException(string& msg) { msg=msg_exc_; return fgexcept_; } //! Return reference to the parallel task object inline ParallelTaskInterface& getParallelTask() { return ptask_; } //! Set (changes) the parallel task object. int setParallelTask(ParallelTaskInterface& ptask); //! Return the rank or thread-id in a parallel execution context inline int Rank() { return tid_; } protected: ParallelTaskInterface& ptask_; int tid_; ZMutex mtx_; int state_; // running state // 0:idle, 1: go, 2: executing, 3: executionfinished , 6:terminatethread, 7:threadfinished bool fgexcept_; // true -> exception lors de l'execution de ptask_.execute() string msg_exc_; // message d'exception le cas echeant }; //-------------------------------------------------------------------- // Classe permettant l'execution simultanee de plusieurs fonctions // ParallelTaskInterface::Execute() //-------------------------------------------------------------------- class ParallelExecutor { public: explicit ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr); explicit ParallelExecutor(vector< ParallelTaskInterface * > vptask); virtual ~ParallelExecutor(); //! Set (changes) the parallel task objects (same object for all threads) void SetParallelTask( ParallelTaskInterface& ptask ); //! Set (changes) the parallel task objects void SetParallelTask( vector< ParallelTaskInterface * > vptask); //! Starts all the parallel threads virtual void start(); // Appel a l'execution parallele virtual int execute(); //! Return the number of parallel threads inline size_t nThreads() { return vpext_.size(); } //! Return RC for the ParalleTask.execute() of thread i inline int getRC(size_t i) { return vrc_[i]; } //! Return the exception condition for the ParalleTask.execute() of thread i inline bool IfException(size_t i, string& msg) { return vpext_[i]->IfException(msg); } protected: vector vpext_; vector vrc_; bool fgstarted_; // true -> les threads tournent (thr[i].start() appeles) bool fgrunning_; // true -> en cours d'execution }; } // namespace SOPHYA #endif