source: Sophya/trunk/SophyaLib/SysTools/parlex.cc@ 3835

Last change on this file since 3835 was 3730, checked in by cmv, 16 years ago

strcmp a besoin de <string.h>, cmv 06/01/2020

File size: 6.8 KB
Line 
1#include <stdlib.h>
2#include <string.h>
3#include <unistd.h>
4#include <iostream>
5
6#include "parlex.h"
7
8using namespace std;
9
10namespace SOPHYA {
11
12/*!
13 \class ParallelTaskFunction
14 \ingroup SysTools
15 \brief ParallelTaskInterface implementation for functions
16*/
17//! Constructor with the specification of the function to be executed
18ParallelTaskFunction::ParallelTaskFunction(ParalExFunction f)
19 : parfunc_(f)
20{
21}
22int ParallelTaskFunction::execute(int tid)
23{
24 return parfunc_(tid);
25}
26
27/*!
28 \class ParalExThread
29 \ingroup SysTools
30 \brief Thread objects that provide methods to control their execution.
31*/
32/* --Methode-- */
33/*!
34 \brief Constructor with the specification of the ParallelTask object and the
35 tread-id or rank in a context of parallel execution
36*/
37ParalExThread::ParalExThread(ParallelTaskInterface& ptask, int tid)
38 : ptask_(ptask), tid_(tid), mtx_(false), state_(0)
39{
40}
41
42/* --Methode-- */
43//! specific implementation of the ZThread::run() method
44void ParalExThread::run()
45{
46 bool fgencore=true;
47 while(fgencore) {
48 mtx_.lock();
49 while ((state_!=1)&&(state_!=6)) mtx_.wait();
50 if (state_==6) { // Signal de fin d'execution du thread
51 setRC(0);
52 state_=7;
53 //DEL cout << " *DBG*FF* ParalExThread::run(tid=" << tid_ << ") state->7 " << endl;
54 mtx_.unlock();
55 mtx_.signal();
56 fgencore=false;
57 return;
58 }
59 state_=2;
60 int rcex = 0;
61 try {
62 rcex = ptask_.execute(tid_);
63 }
64 catch (std::exception exc) {
65 fgexcept_ = true;
66 msg_exc_ = "std::exception in ParalExThread::run(): ";
67 msg_exc_ += exc.what();
68 cerr << "ParalExThread::run(): catched std::exception " << exc.what() << endl;
69 rcex = 77;
70 }
71 catch (...) {
72 fgexcept_ = true;
73 msg_exc_ = "Unknown exception (...) in ParalExThread::run(): ";
74 cerr << "ParalExThread::run(): catched unknown (...) exception " << endl;
75 rcex = 78;
76 }
77 setRC(rcex);
78 state_=3;
79 mtx_.unlock();
80 mtx_.signal();
81 }
82 return;
83}
84
85/*!
86 \brief Launches the task execution (call ptask.execute() )
87 The thread should have been previously started using the start() method.
88*/
89/* --Methode-- */
90int ParalExThread::go()
91{
92 // if (!IfStarted()) start();
93 mtx_.lock();
94 if (state_>5) { // condition d'erreur - thread termine
95 mtx_.unlock();
96 return 99;
97 }
98 // Attente conditions pour lancer l'execution
99 while (state_!=0) mtx_.wait();
100
101 // On modifie l'etat pour lancer l'execution
102 state_ = 1;
103 fgexcept_ = false;
104 msg_exc_ = "";
105 mtx_.unlock();
106 mtx_.signal();
107 return 0;
108}
109
110/* --Methode-- */
111//! Waits till the end of execution launched by go()
112int ParalExThread::waitEndOfExecution()
113{
114 int rc = 0;
115 mtx_.lock();
116 if (state_>5) rc = 99;
117 else if (state_<1) rc = 97;
118 if (rc !=0) { // condition d'erreur - thread termine ou execution non lance
119 mtx_.unlock();
120 return rc;
121 }
122 while (state_!=3) mtx_.wait();
123 state_ = 0;
124 mtx_.unlock();
125 mtx_.signal();
126 return(getRC());
127}
128
129/* --Methode-- */
130int ParalExThread::setParallelTask(ParallelTaskInterface& ptask)
131{
132 int rc = 0;
133 mtx_.lock();
134 if (state_>5) { // condition d'erreur - thread termine
135 mtx_.unlock();
136 return 99;
137 }
138 while (state_!=0) mtx_.wait();
139 ptask_ = ptask;
140 mtx_.unlock();
141 return(0);
142}
143
144/* --Methode-- */
145/*!
146 \brief Terminates (stops) the thread execution.
147
148 if fgwait==true, function returns only after the effective thread termination.
149*/
150void ParalExThread::terminate(bool fgwait)
151{
152 //DEL cout << " *DBG* ParalExThread::terminate(tid=" << tid_ << ") " << endl;
153 mtx_.lock();
154 state_=6;
155 mtx_.unlock();
156 mtx_.signal();
157 if (!fgwait) return;
158 usleep(200);
159 //DEL cout << " *DBG*EE* ParalExThread::terminate(tid=" << tid_ << ") after usleep " << endl;
160 mtx_.lock();
161 while (state_!=7) mtx_.wait();
162 mtx_.unlock();
163 return;
164}
165
166
167/*!
168 \class ParallelExecutor
169 \ingroup SysTools
170 \brief This class can be used for simultaneous execution of different functions
171*/
172
173/*!
174 \brief Contructor with arguments specifying the number of parallel execution threads
175 and the parallel task object
176*/
177/* --Methode-- */
178ParallelExecutor::ParallelExecutor(ParallelTaskInterface& ptask, size_t nthr)
179 : vpext_(nthr), vrc_(nthr), fgstarted_(false), fgrunning_(false)
180{
181 if (nthr<1) throw ParmError("ParallelExecutor(ptask,nthr)/Error nthr<1");
182 for(size_t i=0; i<vpext_.size(); i++) {
183 vpext_[i] = new ParalExThread(ptask, i);
184 vrc_[i] = 0;
185 }
186}
187
188/*!
189 \brief Contructor with a vector argument containing the list of parallel task objects
190 to be executed.
191*/
192/* --Methode-- */
193ParallelExecutor::ParallelExecutor(vector< ParallelTaskInterface * > vptask)
194 : vpext_(vptask.size()), vrc_(vptask.size()), fgstarted_(false), fgrunning_(false)
195{
196 if (vptask.size()<1) throw ParmError("ParallelExecutor(vector<ptask>/Error vptask.size()<1");
197 for(size_t i=0; i<vpext_.size(); i++) {
198 vpext_[i] = new ParalExThread( (*vptask[i]), i);
199 vrc_[i] = 0;
200 }
201}
202
203/* --Methode-- */
204ParallelExecutor::~ParallelExecutor()
205{
206 //DEL cout << " **DBG*B**~ParallelExecutor() " << endl;
207 for(size_t i=0; i<vpext_.size(); i++) {
208 //DEL cout << " *DBG*C* vpext_[ " << i << " ]->terminate(true) " << endl;
209 if (fgstarted_) vpext_[i]->terminate(true);
210 delete vpext_[i];
211 }
212}
213
214/* --Methode-- */
215void ParallelExecutor::SetParallelTask( ParallelTaskInterface& ptask )
216{
217 if (fgrunning_)
218 throw ZThreadExc("ParallelExecutor::SetParallelTask(ptask)/Error parallel thread are running");
219 for(size_t i=0; i<vpext_.size(); i++)
220 vpext_[i]->setParallelTask(ptask);
221 return;
222}
223
224/* --Methode-- */
225void ParallelExecutor::SetParallelTask( vector< ParallelTaskInterface * > vptask )
226{
227 if (fgrunning_)
228 throw ZThreadExc("ParallelExecutor::SetParallelTask(vector<ptask>)/Error parallel thread are running");
229 if (vptask.size()!=vpext_.size())
230 throw SzMismatchError("ParallelExecutor::SetParallelTask(vector<ptask>) - vptask.size()!=vpext.size()");
231 for(size_t i=0; i<vpext_.size(); i++)
232 vpext_[i]->setParallelTask(*vptask[i]);
233 return;
234}
235
236
237/* --Methode-- */
238void ParallelExecutor::start()
239{
240 if (fgstarted_) return;
241 fgstarted_=true;
242 for(size_t i=0; i<vpext_.size(); i++) vpext_[i]->start();
243 return;
244}
245
246/*!
247 \brief Lauches the ParallelTask::execute() in the parallel threads and waits for their ending
248
249 The return code is zero if all threads have succesfully executed with ParallelTask::execute()
250 return code=0. The methods getRC(i) and IfException(i) should be checked in case of errors.
251*/
252/* --Methode-- */
253int ParallelExecutor::execute()
254{
255 fgrunning_ = true;
256 for(size_t i=0; i<vpext_.size(); i++) vpext_[i]->go();
257 int rc = 0;
258 for(size_t i=0; i<vpext_.size(); i++) {
259 vrc_[i] = vpext_[i]->waitEndOfExecution();
260 if (vrc_[i]!=0) rc++;
261 }
262 fgrunning_ = false;
263 return rc;
264}
265
266} // End namespace SOPHYA
267
Note: See TracBrowser for help on using the repository browser.