source: Sophya/trunk/ArchTOIPipe/Kernel/toimanager.cc@ 2385

Last change on this file since 2385 was 2385, checked in by aubourg, 22 years ago

rings

File size: 7.6 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toimanager.cc,v 1.21 2003-05-19 23:31:29 aubourg Exp $
6
7#include "toimanager.h"
8#include <limits.h>
9#include <pthread.h>
10#include <iostream.h>
11#include <unistd.h>
12#include <map>
13
14#ifndef MAXINT
15#define MAXINT 2147483647
16#endif
17
18TOIManager::TOIManager() {
19 reqBegin = 0;
20 reqEnd = MAXINT;
21
22 // -----------ajout cgt vf 19/08/2002
23 // par defaut TOISegmented
24 selectTOISegmented(1024, 20);
25 // ----------- fin ajout cgt
26
27}
28
29TOIManager* TOIManager::instance = NULL;
30
31TOIManager* TOIManager::getManager() {
32 if (instance == NULL) instance = new TOIManager();
33 return instance;
34}
35
36// ajout vf 26/07/2002
37
38// enregistrement d'un processeur dans la liste des processeurs pour une execution en groupe
39
40void TOIManager::registerProcessor(TOIProcessor* proc) {
41
42 cout << "Adding processor to TOIManager for group execution" << endl;
43 processors.push_back(proc);
44
45}
46
47
48// demarrage de tous les processeurs et verification auto des samplenum pour chaque processeur parametre
49
50void TOIManager::startAll() {
51 // verification des samplenum
52 bool samples_ok=checkSamplesLimits(1);
53 if (samples_ok) {
54 cout << "All limits ok" << endl << "Starting processors" << endl;
55 } else {
56 cout << "One or more limits ajusted for execution" << endl << "Starting processors" << endl;
57 }
58
59 // mise a jour des limites apres verification
60 checkSamplesLimits(2);
61 checkSamplesLimits(3);
62 cout<<"Fin checks"<<endl;
63
64
65 // debogage affichage des limites apres calcul
66 {for (vector<TOIProcessor*>::iterator i = processors.begin();
67 i != processors.end(); i++) {
68 TOIProcessor* proc = *i;
69 proc->printLimits();
70 }
71 }
72
73 // demarrage
74 for (vector<TOIProcessor*>::iterator i = processors.begin();
75 i != processors.end(); i++) {
76 TOIProcessor* proc = *i;
77 cout << "**********************" << endl;
78 cout << "starting processor " << endl;
79 proc->start();
80 cout << "processor started " << endl;
81 }
82 cout << "**********************" << endl;
83}
84
85bool TOIManager::checkSamplesLimits(int pass)
86{
87 bool processor_ok=true;
88 bool samples_ok=true;
89 for (vector<TOIProcessor*>::iterator i = processors.begin();
90 i != processors.end(); i++) {
91 TOIProcessor* proc = *i;
92 cout << "testing processor limits " << endl;
93 // test du processeur
94
95 // test seulement pour les processor cle
96 //if (proc->getRequested()) {
97 processor_ok = proc->checkSampleLimits(pass);
98 //}
99
100 if (processor_ok) {
101 cout << "processor limits ok " << endl;
102 } else {
103 cout << "processor limits ajusted" << endl;
104 samples_ok = false;
105 }
106 }
107 return samples_ok;
108}
109
110// fin ajout vf
111
112void TOIManager::setRequestedSample(int begin, int end) {
113 cout << "TOIManager::setRequestedSample should not be called anymore" << endl;
114 cout << " call same method one any toiprocessor" << endl;
115 exit(-1);
116 reqBegin = begin;
117 reqEnd = end;
118}
119
120int TOIManager::getRequestedBegin() {
121 return reqBegin;
122}
123
124int TOIManager::getRequestedEnd() {
125 return reqEnd;
126}
127
128void TOIManager::addThread(pthread_t* t) {
129 // cout << "adding thread " << t << endl;
130 threads.push_back(t);
131}
132
133void TOIManager::joinAll() {
134 waitForAll();
135}
136
137void TOIManager::waitForAll() {
138 for (vector<pthread_t*>::iterator i = threads.begin();
139 i != threads.end(); i++) {
140 pthread_t* pth = *i;
141 cout << "joining thread " << pth << endl;
142 pthread_join(*pth, NULL);
143 cout << "thread joined " << pth << endl;
144 }
145}
146
147
148// -----------ajout cgt vf 19/08/2002
149
150
151void TOIManager::selectTOISegmented(int bufsz, int maxseg)
152{
153 fgSegmented = true;
154 segBuffsz = bufsz;
155 segMaxseg = maxseg;
156}
157
158void TOIManager::selectTOISeqBuffered(int wsz)
159{
160 fgSegmented = false;
161 segBuffsz = wsz;
162}
163
164// methode connect de cgt simplifiee et corrigee
165TOI& TOIManager::connect(TOIProcessor& prout, string out,
166 TOIProcessor& prin, string in, string nom, int wbsz, bool withFlag)
167{
168 TOI* toi;
169 if (nom.length() < 1) {
170 char buff[128];
171 sprintf(buff, "TOI%s_[%s-%s]", nom, in, out);
172 nom = buff;
173 }
174 if (wbsz < 16) wbsz = segBuffsz;
175
176 // ajout test pour eviter de creer 2 tois en sortie
177 if ((toi=prout.getOutToi(out)) == NULL) {
178 //cout << "toi cree" << endl;
179 if (fgSegmented) toi = new TOISegmented(nom, wbsz, segMaxseg);
180 else toi = new TOISeqBuffered(nom, wbsz);
181 // on ajoute le toi de sortie
182 prout.addOutput(out, toi);
183 } else {
184 //cout << "toi deja cree stop" << endl;
185 }
186
187 if (withFlag) { // Si c'est un FITSTOIWriter
188 FITSTOIWriter* ftw = dynamic_cast< FITSTOIWriter* >(&prin);
189 if (ftw) ftw->addInput(in, toi, withFlag);
190 else prin.addInput(in, toi);
191 }
192 else prin.addInput(in, toi);
193 return(*toi);
194}
195
196
197TOI& TOIManager::connect(TOIProcessor& prout, const char* out,
198 TOIProcessor& prin, const char* in, string nom, int wbsz, bool withFlag)
199{
200 string outs = out;
201 string ins = in;
202 return connect(prout, outs, prin, ins, nom, wbsz, withFlag);
203}
204
205// ----------- fin ajout cgt
206
207
208
209// -----------------------------------------------------------------
210// Classe pour affichage de l'avancement des TOIProcessors
211// Reza 08/2001
212// -----------------------------------------------------------------
213
214RzProcSampleCounter::RzProcSampleCounter()
215{
216 _msg = "SampleCounter/Info";
217 _rate = 50;
218}
219
220RzProcSampleCounter::~RzProcSampleCounter()
221{
222}
223
224long RzProcSampleCounter::PrintStats()
225{
226 int istart = 0;
227 int iend = 0;
228 long dns_print = 1000;
229 int dns_print_fac = _rate;
230 int nbmax_dns_print = 2;
231
232 TOIManager* mgr = TOIManager::getManager();
233
234 // istart = mgr->getRequestedBegin();
235 // iend = mgr->getRequestedEnd();
236 istart = SampleBegin();
237 iend = SampleEnd();
238
239 dns_print = (iend-istart)/dns_print_fac;
240 if (dns_print < 1000) dns_print = ((iend-istart) < 1000) ? (iend-istart) : 1000;
241 if (dns_print < 1) dns_print = 1;
242 nbmax_dns_print = (iend-istart)/dns_print;
243
244 cout << "RzProcSampleCounter::PrintStats() InfoMessage=" << _msg
245 << "\n ... " << _msg << " istart="
246 << istart << " iend= " << iend << " dns_print= " << dns_print
247 << " nbmax_dns_print= " << nbmax_dns_print << endl;
248 // ------------------- Impression continu de stat ------------------------
249 long nb_dns_print = 0;
250 int nb_sleep = 0;
251 long last_sample_count = 0;
252 long processed_samples = 0;
253 long total_sample_count = dns_print*nbmax_dns_print;
254 bool alldone = false;
255 double fracperc = 0.;
256 int fperc = 0;
257 while (!alldone) {
258 processed_samples = ProcessedSampleCount();
259 if ( (processed_samples-last_sample_count > dns_print) ||
260 (processed_samples > total_sample_count-10) ) {
261 last_sample_count = processed_samples;
262 if (nb_dns_print == 0) cout << "\n";
263 nb_dns_print++;
264 fracperc = (double)processed_samples*100./(double)total_sample_count;
265 fperc = fracperc*100;
266 cout << ">>> " << _msg << ": ProcessedSampleCount()= " << last_sample_count
267 << " Frac done = " << (double)fperc/100. << " %" << endl;
268 if (last_sample_count > total_sample_count-10) alldone = true;
269 nb_sleep = 0;
270 }
271 else if ((nb_sleep+1)%5 == 0) {
272 fracperc = (double)processed_samples*100./(double)total_sample_count;
273 fperc = fracperc*100;
274 cout << "> " << _msg << ": ProcSamples()= " << processed_samples
275 << " Done = " << " %" << (double)fperc/100.
276 << " NbSleep(1) = " << nb_sleep << endl;
277 }
278 sleep(1); nb_sleep++;
279 }
280
281 // -----------------------------------------------------------------------
282
283 return last_sample_count;
284
285}
286
287
288
289
290
291
292
293
294
295
296
297
298
299
Note: See TracBrowser for help on using the repository browser.