source: Sophya/trunk/SophyaLib/SysTools/parlex.cc@ 3719

Last change on this file since 3719 was 3718, checked in by ansari, 16 years ago

1/ Corrections et modifications mineures ds ZThread, et ajout des classes

ParallelTaskInterface, ParalExThread, ParallelExecutor pour l'execution

de fonctions en parallele dans des threads, Reza 28/12/2009

File size: 6.7 KB
RevLine 
[3718]1#include <stdlib.h>
2#include <unistd.h>
3#include <iostream>
4
5#include "parlex.h"
6
7using namespace std;
8
9namespace SOPHYA {
10
11/*!
12 \class ParallelTaskFunction
13 \ingroup SysTools
14 \brief ParallelTaskInterface implementation for functions
15*/
16//! Constructor with the specification of the function to be executed
17ParallelTaskFunction::ParallelTaskFunction(ParalExFunction f)
18 : parfunc_(f)
19{
20}
21int ParallelTaskFunction::execute(int tid)
22{
23 return parfunc_(tid);
24}
25
26/*!
27 \class ParalExThread
28 \ingroup SysTools
29 \brief Thread objects that provide methods to control their execution.
30*/
31/* --Methode-- */
32/*!
33 \brief Constructor with the specification of the ParallelTask object and the
34 tread-id or rank in a context of parallel execution
35*/
36ParalExThread::ParalExThread(ParallelTaskInterface& ptask, int tid)
37 : ptask_(ptask), tid_(tid), mtx_(false), state_(0)
38{
39}
40
41/* --Methode-- */
42//! specific implementation of the ZThread::run() method
43void ParalExThread::run()
44{
45 bool fgencore=true;
46 while(fgencore) {
47 mtx_.lock();
48 while ((state_!=1)&&(state_!=6)) mtx_.wait();
49 if (state_==6) { // Signal de fin d'execution du thread
50 setRC(0);
51 state_=7;
52 //DEL cout << " *DBG*FF* ParalExThread::run(tid=" << tid_ << ") state->7 " << endl;
53 mtx_.unlock();
54 mtx_.signal();
55 fgencore=false;
56 return;
57 }
58 state_=2;
59 int rcex = 0;
60 try {
61 rcex = ptask_.execute(tid_);
62 }
63 catch (std::exception exc) {
64 fgexcept_ = true;
65 msg_exc_ = "std::exception in ParalExThread::run(): ";
66 msg_exc_ += exc.what();
67 cerr << "ParalExThread::run(): catched std::exception " << exc.what() << endl;
68 rcex = 77;
69 }
70 catch (...) {
71 fgexcept_ = true;
72 msg_exc_ = "Unknown exception (...) in ParalExThread::run(): ";
73 cerr << "ParalExThread::run(): catched unknown (...) exception " << endl;
74 rcex = 78;
75 }
76 setRC(rcex);
77 state_=3;
78 mtx_.unlock();
79 mtx_.signal();
80 }
81 return;
82}
83
84/*!
85 \brief Launches the task execution (call ptask.execute() )
86 The thread should have been previously started using the start() method.
87*/
88/* --Methode-- */
89int ParalExThread::go()
90{
91 // if (!IfStarted()) start();
92 mtx_.lock();
93 if (state_>5) { // condition d'erreur - thread termine
94 mtx_.unlock();
95 return 99;
96 }
97 // Attente conditions pour lancer l'execution
98 while (state_!=0) mtx_.wait();
99
100 // On modifie l'etat pour lancer l'execution
101 state_ = 1;
102 fgexcept_ = false;
103 msg_exc_ = "";
104 mtx_.unlock();
105 mtx_.signal();
106}
107
108/* --Methode-- */
109//! Waits till the end of execution launched by go()
110int ParalExThread::waitEndOfExecution()
111{
112 int rc = 0;
113 mtx_.lock();
114 if (state_>5) rc = 99;
115 else if (state_<1) rc = 97;
116 if (rc !=0) { // condition d'erreur - thread termine ou execution non lance
117 mtx_.unlock();
118 return rc;
119 }
120 while (state_!=3) mtx_.wait();
121 state_ = 0;
122 mtx_.unlock();
123 mtx_.signal();
124 return(getRC());
125}
126
127/* --Methode-- */
128int ParalExThread::setParallelTask(ParallelTaskInterface& ptask)
129{
130 int rc = 0;
131 mtx_.lock();
132 if (state_>5) { // condition d'erreur - thread termine
133 mtx_.unlock();
134 return 99;
135 }
136 while (state_!=0) mtx_.wait();
137 ptask_ = ptask;
138 mtx_.unlock();
139 return(0);
140}
141
142/* --Methode-- */
143/*!
144 \brief Terminates (stops) the thread execution.
145
146 if fgwait==true, function returns only after the effective thread termination.
147*/
148void ParalExThread::terminate(bool fgwait)
149{
150 //DEL cout << " *DBG* ParalExThread::terminate(tid=" << tid_ << ") " << endl;
151 mtx_.lock();
152 state_=6;
153 mtx_.unlock();
154 mtx_.signal();
155 if (!fgwait) return;
156 usleep(200);
157 //DEL cout << " *DBG*EE* ParalExThread::terminate(tid=" << tid_ << ") after usleep " << endl;
158 mtx_.lock();
159 while (state_!=7) mtx_.wait();
160 mtx_.unlock();
161 return;
162}
163
164
165/*!
166 \class ParallelExecutor
167 \ingroup SysTools
168 \brief This class can be used for simultaneous execution of different functions
169*/
170
171/*!
172 \brief Contructor with arguments specifying the number of parallel execution threads
173 and the parallel task object
174*/
175/* --Methode-- */
176ParallelExecutor::ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr)
177 : vpext_(nthr), vrc_(nthr), fgstarted_(false), fgrunning_(false)
178{
179 if (nthr<1) throw ParmError("ParallelExecutor(ptask,nthr)/Error nthr<1");
180 for(size_t i=0; i<vpext_.size(); i++) {
181 vpext_[i] = new ParalExThread(ptask, i);
182 vrc_[i] = 0;
183 }
184}
185
186/*!
187 \brief Contructor with a vector argument containing the list of parallel task objects
188 to be executed.
189*/
190/* --Methode-- */
191ParallelExecutor::ParallelExecutor(vector< ParallelTaskInterface * > vptask)
192 : vpext_(vptask.size()), vrc_(vptask.size()), fgstarted_(false), fgrunning_(false)
193{
194 if (vptask.size()<1) throw ParmError("ParallelExecutor(vector<ptask>/Error vptask.size()<1");
195 for(size_t i=0; i<vpext_.size(); i++) {
196 vpext_[i] = new ParalExThread( (*vptask[i]), i);
197 vrc_[i] = 0;
198 }
199}
200
201/* --Methode-- */
202ParallelExecutor::~ParallelExecutor()
203{
204 //DEL cout << " **DBG*B**~ParallelExecutor() " << endl;
205 for(size_t i=0; i<vpext_.size(); i++) {
206 //DEL cout << " *DBG*C* vpext_[ " << i << " ]->terminate(true) " << endl;
207 vpext_[i]->terminate(true);
208 delete vpext_[i];
209 }
210}
211
212/* --Methode-- */
213void ParallelExecutor::SetParallelTask( ParallelTaskInterface& ptask )
214{
215 if (fgrunning_)
216 throw ZThreadExc("ParallelExecutor::SetParallelTask(ptask)/Error parallel thread are running");
217 for(size_t i=0; i<vpext_.size(); i++)
218 vpext_[i]->setParallelTask(ptask);
219 return;
220}
221
222/* --Methode-- */
223void ParallelExecutor::SetParallelTask( vector< ParallelTaskInterface * > vptask )
224{
225 if (fgrunning_)
226 throw ZThreadExc("ParallelExecutor::SetParallelTask(vector<ptask>)/Error parallel thread are running");
227 if (vptask.size()!=vpext_.size())
228 throw SzMismatchError("ParallelExecutor::SetParallelTask(vector<ptask>) - vptask.size()!=vpext.size()");
229 for(size_t i=0; i<vpext_.size(); i++)
230 vpext_[i]->setParallelTask(*vptask[i]);
231 return;
232}
233
234
235/* --Methode-- */
236void ParallelExecutor::start()
237{
238 if (fgstarted_) return;
239 for(size_t i=0; i<vpext_.size(); i++) vpext_[i]->start();
240 return;
241}
242
243/*!
244 \brief Lauches the ParallelTask::execute() in the parallel threads and waits for their ending
245
246 The return code is zero if all threads have succesfully executed with ParallelTask::execute()
247 return code=0. The methods getRC(i) and IfException(i) should be checked in case of errors.
248*/
249/* --Methode-- */
250int ParallelExecutor::execute()
251{
252 fgrunning_ = true;
253 for(size_t i=0; i<vpext_.size(); i++) vpext_[i]->go();
254 int rc = 0;
255 for(size_t i=0; i<vpext_.size(); i++) {
256 vrc_[i] = vpext_[i]->waitEndOfExecution();
257 if (vrc_[i]!=0) rc++;
258 }
259 fgrunning_ = false;
260 return rc;
261}
262
263} // End namespace SOPHYA
264
Note: See TracBrowser for help on using the repository browser.