source: Sophya/trunk/ArchTOIPipe/Kernel/toimanager.cc@ 3720

Last change on this file since 3720 was 2448, checked in by aubourg, 22 years ago

pb long/int

File size: 8.9 KB
RevLine 
[1738]1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
[2448]5// $Id: toimanager.cc,v 1.24 2003-10-13 20:48:37 aubourg Exp $
[1738]6
[1365]7#include "toimanager.h"
8#include <limits.h>
9#include <pthread.h>
[1759]10#include <iostream.h>
[1704]11#include <unistd.h>
[2187]12#include <map>
[1365]13
[2448]14//#ifndef NOCFITSIO
15#include "fitstoiwtr.h"
16//#endif
17
[1702]18#ifndef MAXINT
19#define MAXINT 2147483647
20#endif
21
[1365]22TOIManager::TOIManager() {
23 reqBegin = 0;
[1702]24 reqEnd = MAXINT;
[2187]25
26 // -----------ajout cgt vf 19/08/2002
27 // par defaut TOISegmented
28 selectTOISegmented(1024, 20);
29 // ----------- fin ajout cgt
30
[1365]31}
32
33TOIManager* TOIManager::instance = NULL;
34
35TOIManager* TOIManager::getManager() {
36 if (instance == NULL) instance = new TOIManager();
37 return instance;
38}
39
[2187]40// ajout vf 26/07/2002
41
[2133]42// enregistrement d'un processeur dans la liste des processeurs pour une execution en groupe
43
44void TOIManager::registerProcessor(TOIProcessor* proc) {
45
46 cout << "Adding processor to TOIManager for group execution" << endl;
47 processors.push_back(proc);
48
49}
50
[2386]51void TOIManager::registerProcessor(RingProcessor* proc) {
52
53 cout << "Adding processor to TOIManager for group execution" << endl;
54 ringProcessors.push_back(proc);
55
56}
[2133]57
[2386]58
[2187]59// demarrage de tous les processeurs et verification auto des samplenum pour chaque processeur parametre
[2133]60
61void TOIManager::startAll() {
[2187]62 // verification des samplenum
63 bool samples_ok=checkSamplesLimits(1);
64 if (samples_ok) {
65 cout << "All limits ok" << endl << "Starting processors" << endl;
66 } else {
67 cout << "One or more limits ajusted for execution" << endl << "Starting processors" << endl;
68 }
69
70 // mise a jour des limites apres verification
71 checkSamplesLimits(2);
72 checkSamplesLimits(3);
[2329]73 cout<<"Fin checks"<<endl;
74
[2187]75
76 // debogage affichage des limites apres calcul
[2204]77 {for (vector<TOIProcessor*>::iterator i = processors.begin();
[2133]78 i != processors.end(); i++) {
79 TOIProcessor* proc = *i;
[2187]80 proc->printLimits();
81 }
[2204]82 }
[2187]83
84 // demarrage
[2393]85 {for (vector<TOIProcessor*>::iterator i = processors.begin();
[2187]86 i != processors.end(); i++) {
87 TOIProcessor* proc = *i;
88 cout << "**********************" << endl;
89 cout << "starting processor " << endl;
[2133]90 proc->start();
91 cout << "processor started " << endl;
[2393]92 }}
[2386]93
[2393]94 {for (vector<RingProcessor*>::iterator i = ringProcessors.begin();
[2386]95 i != ringProcessors.end(); i++) {
96 RingProcessor* proc = *i;
97 cout << "**********************" << endl;
98 cout << "starting processor " << endl;
99 proc->start();
100 cout << "processor started " << endl;
[2393]101 }}
[2187]102 cout << "**********************" << endl;
[2133]103}
104
[2386]105void TOIManager::checkRingLimits()
106{
107 int min=0;
108 int max = MAXINT;
109 for (int pass=0; pass<2; pass++) {
110 for (vector<RingProcessor*>::iterator i = ringProcessors.begin();
111 i != ringProcessors.end(); i++) {
112 RingProcessor* proc = *i;
113 proc->getRingRange(min, max);
114 }
115 }
116}
117
[2187]118bool TOIManager::checkSamplesLimits(int pass)
119{
120 bool processor_ok=true;
121 bool samples_ok=true;
122 for (vector<TOIProcessor*>::iterator i = processors.begin();
123 i != processors.end(); i++) {
124 TOIProcessor* proc = *i;
125 cout << "testing processor limits " << endl;
126 // test du processeur
127
128 // test seulement pour les processor cle
129 //if (proc->getRequested()) {
130 processor_ok = proc->checkSampleLimits(pass);
131 //}
132
133 if (processor_ok) {
134 cout << "processor limits ok " << endl;
135 } else {
136 cout << "processor limits ajusted" << endl;
137 samples_ok = false;
138 }
139 }
140 return samples_ok;
141}
[2133]142
[2187]143// fin ajout vf
[2133]144
[2448]145void TOIManager::setRequestedSample(long begin, long end) {
[2385]146 cout << "TOIManager::setRequestedSample should not be called anymore" << endl;
147 cout << " call same method one any toiprocessor" << endl;
148 exit(-1);
[1365]149 reqBegin = begin;
150 reqEnd = end;
151}
152
[1702]153int TOIManager::getRequestedBegin() {
[1365]154 return reqBegin;
155}
156
[1702]157int TOIManager::getRequestedEnd() {
[1365]158 return reqEnd;
159}
160
161void TOIManager::addThread(pthread_t* t) {
162 // cout << "adding thread " << t << endl;
163 threads.push_back(t);
164}
165
166void TOIManager::joinAll() {
[2133]167 waitForAll();
168}
169
170void TOIManager::waitForAll() {
[1365]171 for (vector<pthread_t*>::iterator i = threads.begin();
172 i != threads.end(); i++) {
173 pthread_t* pth = *i;
174 cout << "joining thread " << pth << endl;
175 pthread_join(*pth, NULL);
176 cout << "thread joined " << pth << endl;
177 }
178}
[1687]179
180
[2187]181// -----------ajout cgt vf 19/08/2002
182
183
184void TOIManager::selectTOISegmented(int bufsz, int maxseg)
185{
186 fgSegmented = true;
187 segBuffsz = bufsz;
188 segMaxseg = maxseg;
189}
190
191void TOIManager::selectTOISeqBuffered(int wsz)
192{
193 fgSegmented = false;
194 segBuffsz = wsz;
195}
196
[2386]197RingPipe& TOIManager::connect(RingProcessor& pout, string out,
198 RingProcessor& pin, string in) {
199 RingPipe* pipe;
200 char buff[128];
201 sprintf(buff, "Ring_[%s-%s]", in.c_str(), out.c_str());
202 if ((pipe = pout.getOutRing(out)) == NULL) {
203 pipe = new RingPipe();
204 pout.addRingOutput(out, pipe);
205 }
206
207 pin.addRingInput(in, pipe);
208 return(*pipe);
209}
210
[2187]211// methode connect de cgt simplifiee et corrigee
[2220]212TOI& TOIManager::connect(TOIProcessor& prout, string out,
213 TOIProcessor& prin, string in, string nom, int wbsz, bool withFlag)
[2187]214{
215 TOI* toi;
216 if (nom.length() < 1) {
217 char buff[128];
[2386]218 sprintf(buff, "TOI%s_[%s-%s]", nom.c_str(), in.c_str(), out.c_str());
[2187]219 nom = buff;
220 }
221 if (wbsz < 16) wbsz = segBuffsz;
222
223 // ajout test pour eviter de creer 2 tois en sortie
224 if ((toi=prout.getOutToi(out)) == NULL) {
225 //cout << "toi cree" << endl;
226 if (fgSegmented) toi = new TOISegmented(nom, wbsz, segMaxseg);
227 else toi = new TOISeqBuffered(nom, wbsz);
228 // on ajoute le toi de sortie
229 prout.addOutput(out, toi);
230 } else {
231 //cout << "toi deja cree stop" << endl;
232 }
233
234 if (withFlag) { // Si c'est un FITSTOIWriter
[2448]235 //#ifndef NOCFITSIO
[2187]236 FITSTOIWriter* ftw = dynamic_cast< FITSTOIWriter* >(&prin);
237 if (ftw) ftw->addInput(in, toi, withFlag);
238 else prin.addInput(in, toi);
[2448]239 //#else
240 // prin.addInput(in, toi);
241 //#endif
[2187]242 }
243 else prin.addInput(in, toi);
244 return(*toi);
245}
246
247
248TOI& TOIManager::connect(TOIProcessor& prout, const char* out,
249 TOIProcessor& prin, const char* in, string nom, int wbsz, bool withFlag)
250{
251 string outs = out;
252 string ins = in;
253 return connect(prout, outs, prin, ins, nom, wbsz, withFlag);
254}
255
256// ----------- fin ajout cgt
257
258
259
[1687]260// -----------------------------------------------------------------
261// Classe pour affichage de l'avancement des TOIProcessors
262// Reza 08/2001
263// -----------------------------------------------------------------
264
265RzProcSampleCounter::RzProcSampleCounter()
266{
267 _msg = "SampleCounter/Info";
268 _rate = 50;
269}
270
271RzProcSampleCounter::~RzProcSampleCounter()
272{
273}
274
275long RzProcSampleCounter::PrintStats()
276{
277 int istart = 0;
278 int iend = 0;
279 long dns_print = 1000;
280 int dns_print_fac = _rate;
281 int nbmax_dns_print = 2;
282
[2386]283 // TOIManager* mgr = TOIManager::getManager();
[1687]284
[1999]285 // istart = mgr->getRequestedBegin();
286 // iend = mgr->getRequestedEnd();
287 istart = SampleBegin();
288 iend = SampleEnd();
[1687]289
290 dns_print = (iend-istart)/dns_print_fac;
291 if (dns_print < 1000) dns_print = ((iend-istart) < 1000) ? (iend-istart) : 1000;
292 if (dns_print < 1) dns_print = 1;
293 nbmax_dns_print = (iend-istart)/dns_print;
294
295 cout << "RzProcSampleCounter::PrintStats() InfoMessage=" << _msg
296 << "\n ... " << _msg << " istart="
297 << istart << " iend= " << iend << " dns_print= " << dns_print
298 << " nbmax_dns_print= " << nbmax_dns_print << endl;
299 // ------------------- Impression continu de stat ------------------------
300 long nb_dns_print = 0;
301 int nb_sleep = 0;
302 long last_sample_count = 0;
303 long processed_samples = 0;
304 long total_sample_count = dns_print*nbmax_dns_print;
305 bool alldone = false;
[2077]306 double fracperc = 0.;
307 int fperc = 0;
[1687]308 while (!alldone) {
309 processed_samples = ProcessedSampleCount();
310 if ( (processed_samples-last_sample_count > dns_print) ||
[1702]311 (processed_samples > total_sample_count-10) ) {
[1687]312 last_sample_count = processed_samples;
313 if (nb_dns_print == 0) cout << "\n";
314 nb_dns_print++;
[2077]315 fracperc = (double)processed_samples*100./(double)total_sample_count;
[2386]316 fperc = (int)(fracperc*100);
[1687]317 cout << ">>> " << _msg << ": ProcessedSampleCount()= " << last_sample_count
[2077]318 << " Frac done = " << (double)fperc/100. << " %" << endl;
[1687]319 if (last_sample_count > total_sample_count-10) alldone = true;
320 nb_sleep = 0;
321 }
[2077]322 else if ((nb_sleep+1)%5 == 0) {
323 fracperc = (double)processed_samples*100./(double)total_sample_count;
[2386]324 fperc = (int)(fracperc*100);
[1687]325 cout << "> " << _msg << ": ProcSamples()= " << processed_samples
[2077]326 << " Done = " << " %" << (double)fperc/100.
[1702]327 << " NbSleep(1) = " << nb_sleep << endl;
[2077]328 }
[1687]329 sleep(1); nb_sleep++;
330 }
331
332 // -----------------------------------------------------------------------
333
334 return last_sample_count;
335
336}
[2187]337
338
339
340
341
342
343
344
345
346
347
348
349
350
Note: See TracBrowser for help on using the repository browser.