source: Sophya/trunk/ArchTOIPipe/Kernel/toimanager.cc@ 2192

Last change on this file since 2192 was 2187, checked in by aubourg, 23 years ago

Vivien, limites processors et connect

File size: 7.4 KB
RevLine 
[1738]1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
[2187]5// $Id: toimanager.cc,v 1.17 2002-09-09 15:33:15 aubourg Exp $
[1738]6
[1365]7#include "toimanager.h"
8#include <limits.h>
9#include <pthread.h>
[1759]10#include <iostream.h>
[1704]11#include <unistd.h>
[2187]12#include <map>
[1365]13
[1702]14#ifndef MAXINT
15#define MAXINT 2147483647
16#endif
17
[1365]18TOIManager::TOIManager() {
19 reqBegin = 0;
[1702]20 reqEnd = MAXINT;
[2187]21
22 // -----------ajout cgt vf 19/08/2002
23 // par defaut TOISegmented
24 selectTOISegmented(1024, 20);
25 // ----------- fin ajout cgt
26
[1365]27}
28
29TOIManager* TOIManager::instance = NULL;
30
31TOIManager* TOIManager::getManager() {
32 if (instance == NULL) instance = new TOIManager();
33 return instance;
34}
35
[2187]36// ajout vf 26/07/2002
37
[2133]38// enregistrement d'un processeur dans la liste des processeurs pour une execution en groupe
39
40void TOIManager::registerProcessor(TOIProcessor* proc) {
41
42 cout << "Adding processor to TOIManager for group execution" << endl;
43 processors.push_back(proc);
44
45}
46
47
[2187]48// demarrage de tous les processeurs et verification auto des samplenum pour chaque processeur parametre
[2133]49
50void TOIManager::startAll() {
[2187]51 // verification des samplenum
52 bool samples_ok=checkSamplesLimits(1);
53 if (samples_ok) {
54 cout << "All limits ok" << endl << "Starting processors" << endl;
55 } else {
56 cout << "One or more limits ajusted for execution" << endl << "Starting processors" << endl;
57 }
58
59 // mise a jour des limites apres verification
60 checkSamplesLimits(2);
61 checkSamplesLimits(3);
62
63 // debogage affichage des limites apres calcul
[2133]64 for (vector<TOIProcessor*>::iterator i = processors.begin();
65 i != processors.end(); i++) {
66 TOIProcessor* proc = *i;
[2187]67 proc->printLimits();
68 }
69
70 // demarrage
71 for (vector<TOIProcessor*>::iterator i = processors.begin();
72 i != processors.end(); i++) {
73 TOIProcessor* proc = *i;
74 cout << "**********************" << endl;
75 cout << "starting processor " << endl;
[2133]76 proc->start();
77 cout << "processor started " << endl;
78 }
[2187]79 cout << "**********************" << endl;
[2133]80}
81
[2187]82bool TOIManager::checkSamplesLimits(int pass)
83{
84 bool processor_ok=true;
85 bool samples_ok=true;
86 for (vector<TOIProcessor*>::iterator i = processors.begin();
87 i != processors.end(); i++) {
88 TOIProcessor* proc = *i;
89 cout << "testing processor limits " << endl;
90 // test du processeur
91
92 // test seulement pour les processor cle
93 //if (proc->getRequested()) {
94 processor_ok = proc->checkSampleLimits(pass);
95 //}
96
97 if (processor_ok) {
98 cout << "processor limits ok " << endl;
99 } else {
100 cout << "processor limits ajusted" << endl;
101 samples_ok = false;
102 }
103 }
104 return samples_ok;
105}
[2133]106
[2187]107// fin ajout vf
[2133]108
[1702]109void TOIManager::setRequestedSample(int begin, int end) {
[1365]110 reqBegin = begin;
111 reqEnd = end;
112}
113
[1702]114int TOIManager::getRequestedBegin() {
[1365]115 return reqBegin;
116}
117
[1702]118int TOIManager::getRequestedEnd() {
[1365]119 return reqEnd;
120}
121
122void TOIManager::addThread(pthread_t* t) {
123 // cout << "adding thread " << t << endl;
124 threads.push_back(t);
125}
126
127void TOIManager::joinAll() {
[2133]128 waitForAll();
129}
130
131void TOIManager::waitForAll() {
[1365]132 for (vector<pthread_t*>::iterator i = threads.begin();
133 i != threads.end(); i++) {
134 pthread_t* pth = *i;
135 cout << "joining thread " << pth << endl;
136 pthread_join(*pth, NULL);
137 cout << "thread joined " << pth << endl;
138 }
139}
[1687]140
141
[2187]142// -----------ajout cgt vf 19/08/2002
143
144
145void TOIManager::selectTOISegmented(int bufsz, int maxseg)
146{
147 fgSegmented = true;
148 segBuffsz = bufsz;
149 segMaxseg = maxseg;
150}
151
152void TOIManager::selectTOISeqBuffered(int wsz)
153{
154 fgSegmented = false;
155 segBuffsz = wsz;
156}
157
158// methode connect de cgt simplifiee et corrigee
159TOI& TOIManager::connect(TOIProcessor& prout, string& out,
160 TOIProcessor& prin, string& in, string nom, int wbsz, bool withFlag)
161{
162 TOI* toi;
163 if (nom.length() < 1) {
164 char buff[128];
165 sprintf(buff, "TOI%s_[%s-%s]", nom, in, out);
166 nom = buff;
167 }
168 if (wbsz < 16) wbsz = segBuffsz;
169
170 // ajout test pour eviter de creer 2 tois en sortie
171 if ((toi=prout.getOutToi(out)) == NULL) {
172 //cout << "toi cree" << endl;
173 if (fgSegmented) toi = new TOISegmented(nom, wbsz, segMaxseg);
174 else toi = new TOISeqBuffered(nom, wbsz);
175 // on ajoute le toi de sortie
176 prout.addOutput(out, toi);
177 } else {
178 //cout << "toi deja cree stop" << endl;
179 }
180
181 if (withFlag) { // Si c'est un FITSTOIWriter
182 FITSTOIWriter* ftw = dynamic_cast< FITSTOIWriter* >(&prin);
183 if (ftw) ftw->addInput(in, toi, withFlag);
184 else prin.addInput(in, toi);
185 }
186 else prin.addInput(in, toi);
187 return(*toi);
188}
189
190
191TOI& TOIManager::connect(TOIProcessor& prout, const char* out,
192 TOIProcessor& prin, const char* in, string nom, int wbsz, bool withFlag)
193{
194 string outs = out;
195 string ins = in;
196 return connect(prout, outs, prin, ins, nom, wbsz, withFlag);
197}
198
199// ----------- fin ajout cgt
200
201
202
[1687]203// -----------------------------------------------------------------
204// Classe pour affichage de l'avancement des TOIProcessors
205// Reza 08/2001
206// -----------------------------------------------------------------
207
208RzProcSampleCounter::RzProcSampleCounter()
209{
210 _msg = "SampleCounter/Info";
211 _rate = 50;
212}
213
214RzProcSampleCounter::~RzProcSampleCounter()
215{
216}
217
218long RzProcSampleCounter::PrintStats()
219{
220 int istart = 0;
221 int iend = 0;
222 long dns_print = 1000;
223 int dns_print_fac = _rate;
224 int nbmax_dns_print = 2;
225
226 TOIManager* mgr = TOIManager::getManager();
227
[1999]228 // istart = mgr->getRequestedBegin();
229 // iend = mgr->getRequestedEnd();
230 istart = SampleBegin();
231 iend = SampleEnd();
[1687]232
233 dns_print = (iend-istart)/dns_print_fac;
234 if (dns_print < 1000) dns_print = ((iend-istart) < 1000) ? (iend-istart) : 1000;
235 if (dns_print < 1) dns_print = 1;
236 nbmax_dns_print = (iend-istart)/dns_print;
237
238 cout << "RzProcSampleCounter::PrintStats() InfoMessage=" << _msg
239 << "\n ... " << _msg << " istart="
240 << istart << " iend= " << iend << " dns_print= " << dns_print
241 << " nbmax_dns_print= " << nbmax_dns_print << endl;
242 // ------------------- Impression continu de stat ------------------------
243 long nb_dns_print = 0;
244 int nb_sleep = 0;
245 long last_sample_count = 0;
246 long processed_samples = 0;
247 long total_sample_count = dns_print*nbmax_dns_print;
248 bool alldone = false;
[2077]249 double fracperc = 0.;
250 int fperc = 0;
[1687]251 while (!alldone) {
252 processed_samples = ProcessedSampleCount();
253 if ( (processed_samples-last_sample_count > dns_print) ||
[1702]254 (processed_samples > total_sample_count-10) ) {
[1687]255 last_sample_count = processed_samples;
256 if (nb_dns_print == 0) cout << "\n";
257 nb_dns_print++;
[2077]258 fracperc = (double)processed_samples*100./(double)total_sample_count;
259 fperc = fracperc*100;
[1687]260 cout << ">>> " << _msg << ": ProcessedSampleCount()= " << last_sample_count
[2077]261 << " Frac done = " << (double)fperc/100. << " %" << endl;
[1687]262 if (last_sample_count > total_sample_count-10) alldone = true;
263 nb_sleep = 0;
264 }
[2077]265 else if ((nb_sleep+1)%5 == 0) {
266 fracperc = (double)processed_samples*100./(double)total_sample_count;
267 fperc = fracperc*100;
[1687]268 cout << "> " << _msg << ": ProcSamples()= " << processed_samples
[2077]269 << " Done = " << " %" << (double)fperc/100.
[1702]270 << " NbSleep(1) = " << nb_sleep << endl;
[2077]271 }
[1687]272 sleep(1); nb_sleep++;
273 }
274
275 // -----------------------------------------------------------------------
276
277 return last_sample_count;
278
279}
[2187]280
281
282
283
284
285
286
287
288
289
290
291
292
293
Note: See TracBrowser for help on using the repository browser.