| 1 | // This may look like C code, but it is really -*- C++ -*- | 
|---|
| 2 | // UPS - LAL (Orsay) / IN2P3/CNRS  -  IRFU/SPP (Saclay) / CEA | 
|---|
| 3 | #ifndef PARLEX_SEEN | 
|---|
| 4 | #define PARLEX_SEEN | 
|---|
| 5 |  | 
|---|
| 6 | // Classes pour execution de taches paralleles en utilisant les threads | 
|---|
| 7 | //    R. Ansari       UPS+LAL IN2P3/CNRS  2009-2010 | 
|---|
| 8 |  | 
|---|
| 9 | #include "zthread.h" | 
|---|
| 10 | #include <vector> | 
|---|
| 11 |  | 
|---|
| 12 | namespace SOPHYA { | 
|---|
| 13 | using namespace std; | 
|---|
| 14 |  | 
|---|
| 15 | //-------------------------------------------------------------------- | 
|---|
| 16 | //  Interface pour les objets ayant une fonction (Execute()) pouvant | 
|---|
| 17 | //  s'executer en parallele | 
|---|
| 18 | //-------------------------------------------------------------------- | 
|---|
| 19 | class ParallelTaskInterface { | 
|---|
| 20 | public: | 
|---|
| 21 | explicit       ParallelTaskInterface(); | 
|---|
| 22 | virtual        ~ParallelTaskInterface(); | 
|---|
| 23 | /*! This method should perform the actual computation. The parameter tid | 
|---|
| 24 | is used by the ParallelExecutor class to identify the thread calling the method | 
|---|
| 25 | */ | 
|---|
| 26 | virtual int    execute(int tid) = 0; | 
|---|
| 27 | //! Define the number of parallel threads running to perform the task (set by ParallelExecutor object) | 
|---|
| 28 | virtual void   setNbParallelThreads(int nbparthr); | 
|---|
| 29 | //! Return the number of parallel threads running to perform the task | 
|---|
| 30 | inline  int    getNbParallelThreads() { return nbparallel_; } | 
|---|
| 31 | protected: | 
|---|
| 32 | int nbparallel_;   // total number of parallel threads which will perform the job | 
|---|
| 33 | }; | 
|---|
| 34 |  | 
|---|
| 35 | //---- Implementation de ParallelTaskInterface pour l'execution des fonctions | 
|---|
| 36 | //! Prototype for functions which executed by ParallelTaskFunction objects | 
|---|
| 37 | typedef int (* ParalExFunction) (int,int); | 
|---|
| 38 | //---- | 
|---|
| 39 | class ParallelTaskFunction : public ParallelTaskInterface { | 
|---|
| 40 | public: | 
|---|
| 41 | explicit       ParallelTaskFunction(ParalExFunction f); | 
|---|
| 42 | virtual int    execute(int tid); | 
|---|
| 43 |  | 
|---|
| 44 | ParalExFunction parfunc_; | 
|---|
| 45 | }; | 
|---|
| 46 |  | 
|---|
| 47 | //-------------------------------------------------------------------- | 
|---|
| 48 | //  Classe de thread permettant l'execution controle de la methode | 
|---|
| 49 | //   ParallelTaskInterface::Execute() | 
|---|
| 50 | //-------------------------------------------------------------------- | 
|---|
| 51 | class ParalExThread : public ZThread { | 
|---|
| 52 | public: | 
|---|
| 53 | explicit       ParalExThread(ParallelTaskInterface& ptask, int tid); | 
|---|
| 54 | virtual void   run(); | 
|---|
| 55 |  | 
|---|
| 56 | // demarre l'execution (start() doit etre appele avant) | 
|---|
| 57 | //! | 
|---|
| 58 | int    go(); | 
|---|
| 59 | // Attend la fin  de l'execution | 
|---|
| 60 | int    waitEndOfExecution(); | 
|---|
| 61 | // Termine (arrete) l'execution du thread . si fgwait==true, attend l'arret effectif du tread. | 
|---|
| 62 | void   terminate(bool fgwait=true); | 
|---|
| 63 | // demarre l'execution et attend sa fin | 
|---|
| 64 | //! Launches the execution of ptask_.execute() and waits for its ending. | 
|---|
| 65 | inline int execute() | 
|---|
| 66 | { | 
|---|
| 67 | int rc=go(); | 
|---|
| 68 | if(rc!=0) return rc; | 
|---|
| 69 | return waitEndOfExecution(); | 
|---|
| 70 | } | 
|---|
| 71 | //! Return the the execution phase, 0: waiting, 1:go called, 2: executing, 3: execution finished | 
|---|
| 72 | inline int getExecutionState() { return state_; } | 
|---|
| 73 | //! Return true if an exception has been catched when ptask_.execute() called, and correspond message | 
|---|
| 74 | inline bool IfException(string& msg) | 
|---|
| 75 | {  msg=msg_exc_;  return fgexcept_; } | 
|---|
| 76 |  | 
|---|
| 77 | //! Return reference to the parallel task object | 
|---|
| 78 | inline ParallelTaskInterface& getParallelTask() { return ptask_; } | 
|---|
| 79 | //! Set (changes) the parallel task object. | 
|---|
| 80 | int setParallelTask(ParallelTaskInterface& ptask); | 
|---|
| 81 | //! Return the rank or thread-id in a parallel execution context | 
|---|
| 82 | inline int Rank() { return tid_; } | 
|---|
| 83 |  | 
|---|
| 84 | protected: | 
|---|
| 85 | ParallelTaskInterface& ptask_; | 
|---|
| 86 | int tid_; | 
|---|
| 87 | ZMutex mtx_; | 
|---|
| 88 | int state_;     //  running state | 
|---|
| 89 | // 0:idle, 1: go, 2: executing, 3: executionfinished , 6:terminatethread, 7:threadfinished | 
|---|
| 90 | bool fgexcept_;  // true -> exception lors de l'execution de ptask_.execute() | 
|---|
| 91 | string msg_exc_;  // message d'exception le cas echeant | 
|---|
| 92 | }; | 
|---|
| 93 |  | 
|---|
| 94 | //-------------------------------------------------------------------- | 
|---|
| 95 | //  Classe permettant l'execution simultanee de plusieurs fonctions | 
|---|
| 96 | //   ParallelTaskInterface::Execute() | 
|---|
| 97 | //-------------------------------------------------------------------- | 
|---|
| 98 | class ParallelExecutor { | 
|---|
| 99 | public: | 
|---|
| 100 | explicit      ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr); | 
|---|
| 101 | explicit      ParallelExecutor(vector< ParallelTaskInterface * > vptask); | 
|---|
| 102 | virtual       ~ParallelExecutor(); | 
|---|
| 103 |  | 
|---|
| 104 | //! Set (changes) the parallel task objects (same object for all threads) | 
|---|
| 105 | void SetParallelTask( ParallelTaskInterface& ptask ); | 
|---|
| 106 | //! Set (changes) the parallel task objects | 
|---|
| 107 | void SetParallelTask( vector< ParallelTaskInterface * > vptask); | 
|---|
| 108 |  | 
|---|
| 109 | //! Starts all the parallel threads | 
|---|
| 110 | virtual void  start(); | 
|---|
| 111 | // Appel a l'execution parallele | 
|---|
| 112 | virtual int   execute(); | 
|---|
| 113 | //! Return the number of parallel threads | 
|---|
| 114 | inline  size_t  nThreads()  { return vpext_.size(); } | 
|---|
| 115 | //! Return RC for the ParalleTask.execute() of thread i | 
|---|
| 116 | inline  int     getRC(size_t i) { return vrc_[i]; } | 
|---|
| 117 | //! Return the exception condition for the ParalleTask.execute() of thread i | 
|---|
| 118 | inline  bool    IfException(size_t i, string& msg)  { return vpext_[i]->IfException(msg); } | 
|---|
| 119 |  | 
|---|
| 120 | protected: | 
|---|
| 121 | vector<ParalExThread *> vpext_; | 
|---|
| 122 | vector<int> vrc_; | 
|---|
| 123 | bool fgstarted_;   // true -> les threads tournent (thr[i].start() appeles) | 
|---|
| 124 | bool fgrunning_;   // true -> en cours d'execution | 
|---|
| 125 | }; | 
|---|
| 126 |  | 
|---|
| 127 | } // namespace SOPHYA | 
|---|
| 128 |  | 
|---|
| 129 | #endif | 
|---|