source: Sophya/trunk/ArchTOIPipe/Kernel/toimanager.cc@ 2396

Last change on this file since 2396 was 2393, checked in by aubourg, 22 years ago

pour magique

File size: 8.7 KB
RevLine 
[1738]1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
[2393]5// $Id: toimanager.cc,v 1.23 2003-05-28 21:03:53 aubourg Exp $
[1738]6
[1365]7#include "toimanager.h"
8#include <limits.h>
9#include <pthread.h>
[1759]10#include <iostream.h>
[1704]11#include <unistd.h>
[2187]12#include <map>
[1365]13
[1702]14#ifndef MAXINT
15#define MAXINT 2147483647
16#endif
17
[1365]18TOIManager::TOIManager() {
19 reqBegin = 0;
[1702]20 reqEnd = MAXINT;
[2187]21
22 // -----------ajout cgt vf 19/08/2002
23 // par defaut TOISegmented
24 selectTOISegmented(1024, 20);
25 // ----------- fin ajout cgt
26
[1365]27}
28
29TOIManager* TOIManager::instance = NULL;
30
31TOIManager* TOIManager::getManager() {
32 if (instance == NULL) instance = new TOIManager();
33 return instance;
34}
35
[2187]36// ajout vf 26/07/2002
37
[2133]38// enregistrement d'un processeur dans la liste des processeurs pour une execution en groupe
39
40void TOIManager::registerProcessor(TOIProcessor* proc) {
41
42 cout << "Adding processor to TOIManager for group execution" << endl;
43 processors.push_back(proc);
44
45}
46
[2386]47void TOIManager::registerProcessor(RingProcessor* proc) {
48
49 cout << "Adding processor to TOIManager for group execution" << endl;
50 ringProcessors.push_back(proc);
51
52}
[2133]53
[2386]54
[2187]55// demarrage de tous les processeurs et verification auto des samplenum pour chaque processeur parametre
[2133]56
57void TOIManager::startAll() {
[2187]58 // verification des samplenum
59 bool samples_ok=checkSamplesLimits(1);
60 if (samples_ok) {
61 cout << "All limits ok" << endl << "Starting processors" << endl;
62 } else {
63 cout << "One or more limits ajusted for execution" << endl << "Starting processors" << endl;
64 }
65
66 // mise a jour des limites apres verification
67 checkSamplesLimits(2);
68 checkSamplesLimits(3);
[2329]69 cout<<"Fin checks"<<endl;
70
[2187]71
72 // debogage affichage des limites apres calcul
[2204]73 {for (vector<TOIProcessor*>::iterator i = processors.begin();
[2133]74 i != processors.end(); i++) {
75 TOIProcessor* proc = *i;
[2187]76 proc->printLimits();
77 }
[2204]78 }
[2187]79
80 // demarrage
[2393]81 {for (vector<TOIProcessor*>::iterator i = processors.begin();
[2187]82 i != processors.end(); i++) {
83 TOIProcessor* proc = *i;
84 cout << "**********************" << endl;
85 cout << "starting processor " << endl;
[2133]86 proc->start();
87 cout << "processor started " << endl;
[2393]88 }}
[2386]89
[2393]90 {for (vector<RingProcessor*>::iterator i = ringProcessors.begin();
[2386]91 i != ringProcessors.end(); i++) {
92 RingProcessor* proc = *i;
93 cout << "**********************" << endl;
94 cout << "starting processor " << endl;
95 proc->start();
96 cout << "processor started " << endl;
[2393]97 }}
[2187]98 cout << "**********************" << endl;
[2133]99}
100
[2386]101void TOIManager::checkRingLimits()
102{
103 int min=0;
104 int max = MAXINT;
105 for (int pass=0; pass<2; pass++) {
106 for (vector<RingProcessor*>::iterator i = ringProcessors.begin();
107 i != ringProcessors.end(); i++) {
108 RingProcessor* proc = *i;
109 proc->getRingRange(min, max);
110 }
111 }
112}
113
[2187]114bool TOIManager::checkSamplesLimits(int pass)
115{
116 bool processor_ok=true;
117 bool samples_ok=true;
118 for (vector<TOIProcessor*>::iterator i = processors.begin();
119 i != processors.end(); i++) {
120 TOIProcessor* proc = *i;
121 cout << "testing processor limits " << endl;
122 // test du processeur
123
124 // test seulement pour les processor cle
125 //if (proc->getRequested()) {
126 processor_ok = proc->checkSampleLimits(pass);
127 //}
128
129 if (processor_ok) {
130 cout << "processor limits ok " << endl;
131 } else {
132 cout << "processor limits ajusted" << endl;
133 samples_ok = false;
134 }
135 }
136 return samples_ok;
137}
[2133]138
[2187]139// fin ajout vf
[2133]140
[1702]141void TOIManager::setRequestedSample(int begin, int end) {
[2385]142 cout << "TOIManager::setRequestedSample should not be called anymore" << endl;
143 cout << " call same method one any toiprocessor" << endl;
144 exit(-1);
[1365]145 reqBegin = begin;
146 reqEnd = end;
147}
148
[1702]149int TOIManager::getRequestedBegin() {
[1365]150 return reqBegin;
151}
152
[1702]153int TOIManager::getRequestedEnd() {
[1365]154 return reqEnd;
155}
156
157void TOIManager::addThread(pthread_t* t) {
158 // cout << "adding thread " << t << endl;
159 threads.push_back(t);
160}
161
162void TOIManager::joinAll() {
[2133]163 waitForAll();
164}
165
166void TOIManager::waitForAll() {
[1365]167 for (vector<pthread_t*>::iterator i = threads.begin();
168 i != threads.end(); i++) {
169 pthread_t* pth = *i;
170 cout << "joining thread " << pth << endl;
171 pthread_join(*pth, NULL);
172 cout << "thread joined " << pth << endl;
173 }
174}
[1687]175
176
[2187]177// -----------ajout cgt vf 19/08/2002
178
179
180void TOIManager::selectTOISegmented(int bufsz, int maxseg)
181{
182 fgSegmented = true;
183 segBuffsz = bufsz;
184 segMaxseg = maxseg;
185}
186
187void TOIManager::selectTOISeqBuffered(int wsz)
188{
189 fgSegmented = false;
190 segBuffsz = wsz;
191}
192
[2386]193RingPipe& TOIManager::connect(RingProcessor& pout, string out,
194 RingProcessor& pin, string in) {
195 RingPipe* pipe;
196 char buff[128];
197 sprintf(buff, "Ring_[%s-%s]", in.c_str(), out.c_str());
198 if ((pipe = pout.getOutRing(out)) == NULL) {
199 pipe = new RingPipe();
200 pout.addRingOutput(out, pipe);
201 }
202
203 pin.addRingInput(in, pipe);
204 return(*pipe);
205}
206
[2187]207// methode connect de cgt simplifiee et corrigee
[2220]208TOI& TOIManager::connect(TOIProcessor& prout, string out,
209 TOIProcessor& prin, string in, string nom, int wbsz, bool withFlag)
[2187]210{
211 TOI* toi;
212 if (nom.length() < 1) {
213 char buff[128];
[2386]214 sprintf(buff, "TOI%s_[%s-%s]", nom.c_str(), in.c_str(), out.c_str());
[2187]215 nom = buff;
216 }
217 if (wbsz < 16) wbsz = segBuffsz;
218
219 // ajout test pour eviter de creer 2 tois en sortie
220 if ((toi=prout.getOutToi(out)) == NULL) {
221 //cout << "toi cree" << endl;
222 if (fgSegmented) toi = new TOISegmented(nom, wbsz, segMaxseg);
223 else toi = new TOISeqBuffered(nom, wbsz);
224 // on ajoute le toi de sortie
225 prout.addOutput(out, toi);
226 } else {
227 //cout << "toi deja cree stop" << endl;
228 }
229
230 if (withFlag) { // Si c'est un FITSTOIWriter
231 FITSTOIWriter* ftw = dynamic_cast< FITSTOIWriter* >(&prin);
232 if (ftw) ftw->addInput(in, toi, withFlag);
233 else prin.addInput(in, toi);
234 }
235 else prin.addInput(in, toi);
236 return(*toi);
237}
238
239
240TOI& TOIManager::connect(TOIProcessor& prout, const char* out,
241 TOIProcessor& prin, const char* in, string nom, int wbsz, bool withFlag)
242{
243 string outs = out;
244 string ins = in;
245 return connect(prout, outs, prin, ins, nom, wbsz, withFlag);
246}
247
248// ----------- fin ajout cgt
249
250
251
[1687]252// -----------------------------------------------------------------
253// Classe pour affichage de l'avancement des TOIProcessors
254// Reza 08/2001
255// -----------------------------------------------------------------
256
257RzProcSampleCounter::RzProcSampleCounter()
258{
259 _msg = "SampleCounter/Info";
260 _rate = 50;
261}
262
263RzProcSampleCounter::~RzProcSampleCounter()
264{
265}
266
267long RzProcSampleCounter::PrintStats()
268{
269 int istart = 0;
270 int iend = 0;
271 long dns_print = 1000;
272 int dns_print_fac = _rate;
273 int nbmax_dns_print = 2;
274
[2386]275 // TOIManager* mgr = TOIManager::getManager();
[1687]276
[1999]277 // istart = mgr->getRequestedBegin();
278 // iend = mgr->getRequestedEnd();
279 istart = SampleBegin();
280 iend = SampleEnd();
[1687]281
282 dns_print = (iend-istart)/dns_print_fac;
283 if (dns_print < 1000) dns_print = ((iend-istart) < 1000) ? (iend-istart) : 1000;
284 if (dns_print < 1) dns_print = 1;
285 nbmax_dns_print = (iend-istart)/dns_print;
286
287 cout << "RzProcSampleCounter::PrintStats() InfoMessage=" << _msg
288 << "\n ... " << _msg << " istart="
289 << istart << " iend= " << iend << " dns_print= " << dns_print
290 << " nbmax_dns_print= " << nbmax_dns_print << endl;
291 // ------------------- Impression continu de stat ------------------------
292 long nb_dns_print = 0;
293 int nb_sleep = 0;
294 long last_sample_count = 0;
295 long processed_samples = 0;
296 long total_sample_count = dns_print*nbmax_dns_print;
297 bool alldone = false;
[2077]298 double fracperc = 0.;
299 int fperc = 0;
[1687]300 while (!alldone) {
301 processed_samples = ProcessedSampleCount();
302 if ( (processed_samples-last_sample_count > dns_print) ||
[1702]303 (processed_samples > total_sample_count-10) ) {
[1687]304 last_sample_count = processed_samples;
305 if (nb_dns_print == 0) cout << "\n";
306 nb_dns_print++;
[2077]307 fracperc = (double)processed_samples*100./(double)total_sample_count;
[2386]308 fperc = (int)(fracperc*100);
[1687]309 cout << ">>> " << _msg << ": ProcessedSampleCount()= " << last_sample_count
[2077]310 << " Frac done = " << (double)fperc/100. << " %" << endl;
[1687]311 if (last_sample_count > total_sample_count-10) alldone = true;
312 nb_sleep = 0;
313 }
[2077]314 else if ((nb_sleep+1)%5 == 0) {
315 fracperc = (double)processed_samples*100./(double)total_sample_count;
[2386]316 fperc = (int)(fracperc*100);
[1687]317 cout << "> " << _msg << ": ProcSamples()= " << processed_samples
[2077]318 << " Done = " << " %" << (double)fperc/100.
[1702]319 << " NbSleep(1) = " << nb_sleep << endl;
[2077]320 }
[1687]321 sleep(1); nb_sleep++;
322 }
323
324 // -----------------------------------------------------------------------
325
326 return last_sample_count;
327
328}
[2187]329
330
331
332
333
334
335
336
337
338
339
340
341
342
Note: See TracBrowser for help on using the repository browser.