source: Sophya/trunk/SophyaLib/SysTools/parlex.cc@ 4009

Last change on this file since 4009 was 3855, checked in by ansari, 15 years ago

Ajout attribut nombre de threads ds ParallelTaskInterface, Reza 12/08/2010

File size: 8.0 KB
Line 
1#include <stdlib.h>
2#include <string.h>
3#include <unistd.h>
4#include <iostream>
5
6#include "parlex.h"
7
8using namespace std;
9
10namespace SOPHYA {
11
12/*!
13 \class ParallelTaskInterface
14 \ingroup SysTools
15 \brief Interface definition for parallel task object.
16
17 The pure virtual method execute() should be redefined by the classes
18 inheriting from ParallelTaskInterface
19*/
20ParallelTaskInterface::ParallelTaskInterface()
21{
22 setNbParallelThreads(0);
23}
24ParallelTaskInterface::~ParallelTaskInterface()
25{
26}
27/*!
28 \brief Define the number of parallel threads running to perform the task
29
30 This method is called by ParallelExecutor object. If subclasses overwrite this method,
31 base class method should be called to set the value of nbparallel_ attribute.
32*/
33void ParallelTaskInterface::setNbParallelThreads(int nbparthr)
34{
35 nbparallel_=nbparthr;
36}
37
/*!
  \class ParallelTaskFunction
  \ingroup SysTools
  \brief ParallelTaskInterface implementation for functions

  The functions executed by the ParallelTaskFunction execute() method should
  conform to the prototype: int func(int tid, int nparthr)

  tid (counting from zero) is the thread rank, and nparthr is the number of parallel
  threads executing in the context
*/
49//! Constructor with the specification of the function to be executed
50ParallelTaskFunction::ParallelTaskFunction(ParalExFunction f)
51 : parfunc_(f)
52{
53}
54int ParallelTaskFunction::execute(int tid)
55{
56 return parfunc_(tid, getNbParallelThreads());
57}
58
59/*!
60 \class ParalExThread
61 \ingroup SysTools
62 \brief Thread objects that provide methods to control their execution.
63*/
64/* --Methode-- */
65/*!
66 \brief Constructor with the specification of the ParallelTask object and the
67 tread-id or rank in a context of parallel execution
68*/
69ParalExThread::ParalExThread(ParallelTaskInterface& ptask, int tid)
70 : ptask_(ptask), tid_(tid), mtx_(false), state_(0)
71{
72}
73
74/* --Methode-- */
75//! specific implementation of the ZThread::run() method
76void ParalExThread::run()
77{
78 bool fgencore=true;
79 while(fgencore) {
80 mtx_.lock();
81 while ((state_!=1)&&(state_!=6)) mtx_.wait();
82 if (state_==6) { // Signal de fin d'execution du thread
83 setRC(0);
84 state_=7;
85 //DEL cout << " *DBG*FF* ParalExThread::run(tid=" << tid_ << ") state->7 " << endl;
86 mtx_.unlock();
87 mtx_.signal();
88 fgencore=false;
89 return;
90 }
91 state_=2;
92 int rcex = 0;
93 try {
94 rcex = ptask_.execute(tid_);
95 }
96 catch (std::exception exc) {
97 fgexcept_ = true;
98 msg_exc_ = "std::exception in ParalExThread::run(): ";
99 msg_exc_ += exc.what();
100 cerr << "ParalExThread::run(): catched std::exception " << exc.what() << endl;
101 rcex = 77;
102 }
103 catch (...) {
104 fgexcept_ = true;
105 msg_exc_ = "Unknown exception (...) in ParalExThread::run(): ";
106 cerr << "ParalExThread::run(): catched unknown (...) exception " << endl;
107 rcex = 78;
108 }
109 setRC(rcex);
110 state_=3;
111 mtx_.unlock();
112 mtx_.signal();
113 }
114 return;
115}
116
117/*!
118 \brief Launches the task execution (call ptask.execute() )
119 The thread should have been previously started using the start() method.
120*/
121/* --Methode-- */
122int ParalExThread::go()
123{
124 // if (!IfStarted()) start();
125 mtx_.lock();
126 if (state_>5) { // condition d'erreur - thread termine
127 mtx_.unlock();
128 return 99;
129 }
130 // Attente conditions pour lancer l'execution
131 while (state_!=0) mtx_.wait();
132
133 // On modifie l'etat pour lancer l'execution
134 state_ = 1;
135 fgexcept_ = false;
136 msg_exc_ = "";
137 mtx_.unlock();
138 mtx_.signal();
139 return 0;
140}
141
142/* --Methode-- */
143//! Waits till the end of execution launched by go()
144int ParalExThread::waitEndOfExecution()
145{
146 int rc = 0;
147 mtx_.lock();
148 if (state_>5) rc = 99;
149 else if (state_<1) rc = 97;
150 if (rc !=0) { // condition d'erreur - thread termine ou execution non lance
151 mtx_.unlock();
152 return rc;
153 }
154 while (state_!=3) mtx_.wait();
155 state_ = 0;
156 mtx_.unlock();
157 mtx_.signal();
158 return(getRC());
159}
160
161/* --Methode-- */
162int ParalExThread::setParallelTask(ParallelTaskInterface& ptask)
163{
164 int rc = 0;
165 mtx_.lock();
166 if (state_>5) { // condition d'erreur - thread termine
167 mtx_.unlock();
168 return 99;
169 }
170 while (state_!=0) mtx_.wait();
171 ptask_ = ptask;
172 mtx_.unlock();
173 return(0);
174}
175
176/* --Methode-- */
177/*!
178 \brief Terminates (stops) the thread execution.
179
180 if fgwait==true, function returns only after the effective thread termination.
181*/
182void ParalExThread::terminate(bool fgwait)
183{
184 //DEL cout << " *DBG* ParalExThread::terminate(tid=" << tid_ << ") " << endl;
185 mtx_.lock();
186 state_=6;
187 mtx_.unlock();
188 mtx_.signal();
189 if (!fgwait) return;
190 usleep(200);
191 //DEL cout << " *DBG*EE* ParalExThread::terminate(tid=" << tid_ << ") after usleep " << endl;
192 mtx_.lock();
193 while (state_!=7) mtx_.wait();
194 mtx_.unlock();
195 return;
196}
197
198
199/*!
200 \class ParallelExecutor
201 \ingroup SysTools
202 \brief This class can be used for simultaneous execution of different functions
203*/
204
205/*!
206 \brief Contructor with arguments specifying the number of parallel execution threads
207 and the parallel task object
208*/
209/* --Methode-- */
210ParallelExecutor::ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr)
211 : vpext_(nthr), vrc_(nthr), fgstarted_(false), fgrunning_(false)
212{
213 if (nthr<1) throw ParmError("ParallelExecutor(ptask,nthr)/Error nthr<1");
214 for(size_t i=0; i<vpext_.size(); i++) {
215 vpext_[i] = new ParalExThread(ptask, i);
216 vrc_[i] = 0;
217 }
218 ptask.setNbParallelThreads( nthr );
219}
220
221/*!
222 \brief Contructor with a vector argument containing the list of parallel task objects
223 to be executed.
224*/
225/* --Methode-- */
226ParallelExecutor::ParallelExecutor(vector< ParallelTaskInterface * > vptask)
227 : vpext_(vptask.size()), vrc_(vptask.size()), fgstarted_(false), fgrunning_(false)
228{
229 if (vptask.size()<1) throw ParmError("ParallelExecutor(vector<ptask>/Error vptask.size()<1");
230 int nthr= (int)nThreads();
231 for(size_t i=0; i<vpext_.size(); i++) {
232 vpext_[i] = new ParalExThread( (*vptask[i]), i);
233 vptask[i]->setNbParallelThreads(nthr);
234 vrc_[i] = 0;
235 }
236}
237
238/* --Methode-- */
239ParallelExecutor::~ParallelExecutor()
240{
241 //DEL cout << " **DBG*B**~ParallelExecutor() " << endl;
242 for(size_t i=0; i<vpext_.size(); i++) {
243 //DEL cout << " *DBG*C* vpext_[ " << i << " ]->terminate(true) " << endl;
244 if (fgstarted_) vpext_[i]->terminate(true);
245 delete vpext_[i];
246 }
247}
248
249/* --Methode-- */
250void ParallelExecutor::SetParallelTask( ParallelTaskInterface& ptask )
251{
252 if (fgrunning_)
253 throw ZThreadExc("ParallelExecutor::SetParallelTask(ptask)/Error parallel thread are running");
254 for(size_t i=0; i<vpext_.size(); i++)
255 vpext_[i]->setParallelTask(ptask);
256 ptask.setNbParallelThreads( (int)nThreads() );
257 return;
258}
259
260/* --Methode-- */
261void ParallelExecutor::SetParallelTask( vector< ParallelTaskInterface * > vptask )
262{
263 if (fgrunning_)
264 throw ZThreadExc("ParallelExecutor::SetParallelTask(vector<ptask>)/Error parallel thread are running");
265 if (vptask.size()!=vpext_.size())
266 throw SzMismatchError("ParallelExecutor::SetParallelTask(vector<ptask>) - vptask.size()!=vpext.size()");
267 int nthr= (int)nThreads();
268 for(size_t i=0; i<vpext_.size(); i++) {
269 vpext_[i]->setParallelTask(*vptask[i]);
270 vptask[i]->setNbParallelThreads(nthr);
271 }
272 return;
273}
274
275
276/* --Methode-- */
277void ParallelExecutor::start()
278{
279 if (fgstarted_) return;
280 fgstarted_=true;
281 for(size_t i=0; i<vpext_.size(); i++) vpext_[i]->start();
282 return;
283}
284
285/*!
286 \brief Lauches the ParallelTask::execute() in the parallel threads and waits for their ending
287
288 The return code is zero if all threads have succesfully executed with ParallelTask::execute()
289 return code=0. The methods getRC(i) and IfException(i) should be checked in case of errors.
290*/
291/* --Methode-- */
292int ParallelExecutor::execute()
293{
294 fgrunning_ = true;
295 for(size_t i=0; i<vpext_.size(); i++) vpext_[i]->go();
296 int rc = 0;
297 for(size_t i=0; i<vpext_.size(); i++) {
298 vrc_[i] = vpext_[i]->waitEndOfExecution();
299 if (vrc_[i]!=0) rc++;
300 }
301 fgrunning_ = false;
302 return rc;
303}
304
305} // End namespace SOPHYA
306
Note: See TracBrowser for help on using the repository browser.