source: Sophya/trunk/SophyaLib/SysTools/parlex.cc@ 3728

Last change on this file since 3728 was 3728, checked in by ansari, 16 years ago

Ajout return(s) manquants siute compil AIX, Reza 04 Jan 2010

File size: 6.7 KB
Line 
1#include <stdlib.h>
2#include <unistd.h>
3#include <iostream>
4
5#include "parlex.h"
6
7using namespace std;
8
9namespace SOPHYA {
10
11/*!
12 \class ParallelTaskFunction
13 \ingroup SysTools
14 \brief ParallelTaskInterface implementation for functions
15*/
16//! Constructor with the specification of the function to be executed
17ParallelTaskFunction::ParallelTaskFunction(ParalExFunction f)
18 : parfunc_(f)
19{
20}
21int ParallelTaskFunction::execute(int tid)
22{
23 return parfunc_(tid);
24}
25
26/*!
27 \class ParalExThread
28 \ingroup SysTools
29 \brief Thread objects that provide methods to control their execution.
30*/
31/* --Methode-- */
32/*!
33 \brief Constructor with the specification of the ParallelTask object and the
34 tread-id or rank in a context of parallel execution
35*/
36ParalExThread::ParalExThread(ParallelTaskInterface& ptask, int tid)
37 : ptask_(ptask), tid_(tid), mtx_(false), state_(0)
38{
39}
40
41/* --Methode-- */
42//! specific implementation of the ZThread::run() method
43void ParalExThread::run()
44{
45 bool fgencore=true;
46 while(fgencore) {
47 mtx_.lock();
48 while ((state_!=1)&&(state_!=6)) mtx_.wait();
49 if (state_==6) { // Signal de fin d'execution du thread
50 setRC(0);
51 state_=7;
52 //DEL cout << " *DBG*FF* ParalExThread::run(tid=" << tid_ << ") state->7 " << endl;
53 mtx_.unlock();
54 mtx_.signal();
55 fgencore=false;
56 return;
57 }
58 state_=2;
59 int rcex = 0;
60 try {
61 rcex = ptask_.execute(tid_);
62 }
63 catch (std::exception exc) {
64 fgexcept_ = true;
65 msg_exc_ = "std::exception in ParalExThread::run(): ";
66 msg_exc_ += exc.what();
67 cerr << "ParalExThread::run(): catched std::exception " << exc.what() << endl;
68 rcex = 77;
69 }
70 catch (...) {
71 fgexcept_ = true;
72 msg_exc_ = "Unknown exception (...) in ParalExThread::run(): ";
73 cerr << "ParalExThread::run(): catched unknown (...) exception " << endl;
74 rcex = 78;
75 }
76 setRC(rcex);
77 state_=3;
78 mtx_.unlock();
79 mtx_.signal();
80 }
81 return;
82}
83
84/*!
85 \brief Launches the task execution (call ptask.execute() )
86 The thread should have been previously started using the start() method.
87*/
88/* --Methode-- */
89int ParalExThread::go()
90{
91 // if (!IfStarted()) start();
92 mtx_.lock();
93 if (state_>5) { // condition d'erreur - thread termine
94 mtx_.unlock();
95 return 99;
96 }
97 // Attente conditions pour lancer l'execution
98 while (state_!=0) mtx_.wait();
99
100 // On modifie l'etat pour lancer l'execution
101 state_ = 1;
102 fgexcept_ = false;
103 msg_exc_ = "";
104 mtx_.unlock();
105 mtx_.signal();
106 return 0;
107}
108
109/* --Methode-- */
110//! Waits till the end of execution launched by go()
111int ParalExThread::waitEndOfExecution()
112{
113 int rc = 0;
114 mtx_.lock();
115 if (state_>5) rc = 99;
116 else if (state_<1) rc = 97;
117 if (rc !=0) { // condition d'erreur - thread termine ou execution non lance
118 mtx_.unlock();
119 return rc;
120 }
121 while (state_!=3) mtx_.wait();
122 state_ = 0;
123 mtx_.unlock();
124 mtx_.signal();
125 return(getRC());
126}
127
128/* --Methode-- */
129int ParalExThread::setParallelTask(ParallelTaskInterface& ptask)
130{
131 int rc = 0;
132 mtx_.lock();
133 if (state_>5) { // condition d'erreur - thread termine
134 mtx_.unlock();
135 return 99;
136 }
137 while (state_!=0) mtx_.wait();
138 ptask_ = ptask;
139 mtx_.unlock();
140 return(0);
141}
142
143/* --Methode-- */
144/*!
145 \brief Terminates (stops) the thread execution.
146
147 if fgwait==true, function returns only after the effective thread termination.
148*/
149void ParalExThread::terminate(bool fgwait)
150{
151 //DEL cout << " *DBG* ParalExThread::terminate(tid=" << tid_ << ") " << endl;
152 mtx_.lock();
153 state_=6;
154 mtx_.unlock();
155 mtx_.signal();
156 if (!fgwait) return;
157 usleep(200);
158 //DEL cout << " *DBG*EE* ParalExThread::terminate(tid=" << tid_ << ") after usleep " << endl;
159 mtx_.lock();
160 while (state_!=7) mtx_.wait();
161 mtx_.unlock();
162 return;
163}
164
165
166/*!
167 \class ParallelExecutor
168 \ingroup SysTools
169 \brief This class can be used for simultaneous execution of different functions
170*/
171
172/*!
173 \brief Contructor with arguments specifying the number of parallel execution threads
174 and the parallel task object
175*/
176/* --Methode-- */
177ParallelExecutor::ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr)
178 : vpext_(nthr), vrc_(nthr), fgstarted_(false), fgrunning_(false)
179{
180 if (nthr<1) throw ParmError("ParallelExecutor(ptask,nthr)/Error nthr<1");
181 for(size_t i=0; i<vpext_.size(); i++) {
182 vpext_[i] = new ParalExThread(ptask, i);
183 vrc_[i] = 0;
184 }
185}
186
187/*!
188 \brief Contructor with a vector argument containing the list of parallel task objects
189 to be executed.
190*/
191/* --Methode-- */
192ParallelExecutor::ParallelExecutor(vector< ParallelTaskInterface * > vptask)
193 : vpext_(vptask.size()), vrc_(vptask.size()), fgstarted_(false), fgrunning_(false)
194{
195 if (vptask.size()<1) throw ParmError("ParallelExecutor(vector<ptask>/Error vptask.size()<1");
196 for(size_t i=0; i<vpext_.size(); i++) {
197 vpext_[i] = new ParalExThread( (*vptask[i]), i);
198 vrc_[i] = 0;
199 }
200}
201
202/* --Methode-- */
203ParallelExecutor::~ParallelExecutor()
204{
205 //DEL cout << " **DBG*B**~ParallelExecutor() " << endl;
206 for(size_t i=0; i<vpext_.size(); i++) {
207 //DEL cout << " *DBG*C* vpext_[ " << i << " ]->terminate(true) " << endl;
208 if (fgstarted_) vpext_[i]->terminate(true);
209 delete vpext_[i];
210 }
211}
212
213/* --Methode-- */
214void ParallelExecutor::SetParallelTask( ParallelTaskInterface& ptask )
215{
216 if (fgrunning_)
217 throw ZThreadExc("ParallelExecutor::SetParallelTask(ptask)/Error parallel thread are running");
218 for(size_t i=0; i<vpext_.size(); i++)
219 vpext_[i]->setParallelTask(ptask);
220 return;
221}
222
223/* --Methode-- */
224void ParallelExecutor::SetParallelTask( vector< ParallelTaskInterface * > vptask )
225{
226 if (fgrunning_)
227 throw ZThreadExc("ParallelExecutor::SetParallelTask(vector<ptask>)/Error parallel thread are running");
228 if (vptask.size()!=vpext_.size())
229 throw SzMismatchError("ParallelExecutor::SetParallelTask(vector<ptask>) - vptask.size()!=vpext.size()");
230 for(size_t i=0; i<vpext_.size(); i++)
231 vpext_[i]->setParallelTask(*vptask[i]);
232 return;
233}
234
235
236/* --Methode-- */
237void ParallelExecutor::start()
238{
239 if (fgstarted_) return;
240 fgstarted_=true;
241 for(size_t i=0; i<vpext_.size(); i++) vpext_[i]->start();
242 return;
243}
244
245/*!
246 \brief Lauches the ParallelTask::execute() in the parallel threads and waits for their ending
247
248 The return code is zero if all threads have succesfully executed with ParallelTask::execute()
249 return code=0. The methods getRC(i) and IfException(i) should be checked in case of errors.
250*/
251/* --Methode-- */
252int ParallelExecutor::execute()
253{
254 fgrunning_ = true;
255 for(size_t i=0; i<vpext_.size(); i++) vpext_[i]->go();
256 int rc = 0;
257 for(size_t i=0; i<vpext_.size(); i++) {
258 vrc_[i] = vpext_[i]->waitEndOfExecution();
259 if (vrc_[i]!=0) rc++;
260 }
261 fgrunning_ = false;
262 return rc;
263}
264
265} // End namespace SOPHYA
266
Note: See TracBrowser for help on using the repository browser.