source: Sophya/trunk/AddOn/TAcq/brfitsrd.cc@ 3945

Last change on this file since 3945 was 3939, checked in by ansari, 15 years ago

1/ La description s'applique a la revision precedente qui a ete committee
avec un commentaire errone
2/ Mise en place des methodes et mecanismes permettant de traiter et transmettre correctement
le temps d'acquisition et le temps des paquets depuis la lecture fits aux fichiers de
traitement
3/ Ameliorations diverses ds BRMeanSpecCalculator et specmfib.cc

Reza 13/01/2011

File size: 14.8 KB
Line 
1//----------------------------------------------------------------
2// Projet BAORadio - (C) LAL/IRFU 2008-2011
3// Classes de threads pour lecture fichiers fits BAORadio
4//----------------------------------------------------------------
5
6
7#include "brfitsrd.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <exception>
13
14#include "pexceptions.h"
15#include "timestamp.h"
16#include "ctimer.h"
17
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23using namespace SOPHYA;
24
25//---------------------------------------------------------------------
26// Classe thread de lecture de Multi-fibres de fichiers FITS BAORadio
27//---------------------------------------------------------------------
28
29/* --Methode-- */
30BRMultiFitsReader::BRMultiFitsReader(RAcqMemZoneMgr& mem, vector<string>& dirs, bool rdsamefc,
31 uint_4 imin, uint_4 imax, uint_4 istep)
32 : memgr_(mem), dirs_(dirs), stop_(false), rdsamefc_(rdsamefc), imin_(imin), imax_(imax), istep_(istep)
33{
34 SetPrintLevel();
35 totnbytesrd_ = 0;
36 totsamefc_ = 0;
37 if (memgr_.NbFibres() > MAXANAFIB)
38 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres>MAXANAFIB ");
39 if (dirs_.size() != memgr_.NbFibres())
40 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres != Nb Data Directories");
41
42 packsize_=memgr_.PaqSize();
43 mid_=-2;
44 mmbuf_=NULL;
45 max_targ_npaq = memgr_.NbPaquets();
46 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
47
48 char flnm[1024];
49 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
50 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),imin_);
51 mff_[fib].Open(flnm, MF_Read);
52 if (mff_[fib].NAxis1() != memgr_.PaqSize()) {
53 cout << " BRMultiFitsReader::BRMultiFitsReader/ fib=" << fib << " File=" << flnm <<
54 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << memgr_.PaqSize() << endl;
55 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ mff.NAxis1() != memgr_.PaqSize() ");
56 }
57 // Extraction de qques parametres utiles depuis les fichiers FITS
58 string fkvs;
59 cpaqdeltatime_=0.;
60 if (fib==0) {
61 fkvs=mff_[fib].GetKeyValue("DATEOBS");
62 if (fkvs.length()>0) cdateobs_.Set(fkvs);
63 fkvs=mff_[fib].GetKeyValue("TMSTART");
64 if (fkvs.length()>0) {
65 ctmstart_.Set(fkvs);
66 cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << endl;
67 fkvs=mff_[fib].GetKeyValue("TMEND");
68 SOPHYA::TimeStamp tmend_=ctmstart_;
69 if (fkvs.length()>0) tmend_.Set(fkvs);
70 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
71 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
72 }
73 }
74 fkvs=mff_[fib].GetKeyValue("FIBERNUM");
75 memgr_.FiberId(fib) = atoi( fkvs.c_str() );
76
77 vfilenum_.push_back(imin_);
78 vfpos_.push_back(0);
79 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
80 vpchk_.push_back(BRPaqChecker(true,0));
81 curfc_.push_back(0);
82 totnpqrd_.push_back(0);
83 totnpqok_.push_back(0);
84 }
85}
86
87
88/* --Methode-- */
89void BRMultiFitsReader::run()
90{
91 setRC(1);
92 try {
93 TimeStamp ts;
94 Timer tm("BRMultiFitsReader", false);
95 cout << " BRMultiFitsReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
96 << " PaqSize() = " << memgr_.PaqSize() << endl;
97 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets")
98 << " signalII.fits IMin=" << imin_ << " IMax=" << imax_ << " IStep=" << istep_ << endl;
99
100 uint_8 prtcnt=0;
101 Byte* nextpaq=NULL;
102 bool fgok=true;
103 while (fgok) {
104 if (stop_) break;
105 if ( MoveToNextTarget() ) {
106 cout << "BRMultiFitsReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
107 setRC(7); fgok=false; break;
108 }
109 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
110 nextpaq=GetPaquetTarget(fib);
111 if (nextpaq == NULL) { // Cela ne devrait pas arriver
112 cout << "BRMultiFitsReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
113 setRC(9); fgok=false; break;
114 }
115 vpaq_[fib].Set(nextpaq);
116 }
117 if (ReadNextAllFibers()) { fgok=false; break; }
118 prtcnt++;
119 if ((prtlev_>0)&&(prtcnt%prtmodulo_==0)) {
120 cout << "BRMultiFitsReader: NbPaqMFRead=" << prtcnt << " NSameFC="
121 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
122 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
123 }
124 }
125
126 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
127 MZoneManage(true); // Nettoyage final
128
129 cout << " ------------------ BRMultiFitsReader::run() END ----------------- " << endl;
130 ts.SetNow();
131 tm.SplitQ();
132 cout << "BRMultiFitsReader::run(): END reading : " << ts << endl;
133 cout << "... NbPaqMFRead=" << prtcnt << " NSameFC="
134 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
135 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
136
137 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
138 int perc=0;
139 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
140 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
141 << " FracSameFC=" << perc << " %" << endl;
142 if (prtlev_ > 0) vpchk_[fib].Print(cout);
143 }
144 cout << " TotalDiskRead= " << totnbytesrd_/(1024*1024) << " MBytes Disk-Read rate= "
145 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
146 cout << " BRMultiFitsReader::run()/Timing: \n";
147 tm.Print();
148 cout << " ---------------------------------------------------------- " << endl;
149
150 usleep(50000); // Attente de traitement du dernier paquet
151 memgr_.Stop(); // Arret
152
153 } // Fin du bloc try
154 catch (std::exception& exc) {
155 cout << " BRMultiFitsReader::run()/catched execption msg= " << exc.what() << endl;
156 setRC(3);
157 return;
158 }
159 catch(...) {
160 cout << " BRMultiFitsReader::run()/catched unknown ... exception " << endl;
161 setRC(4);
162 return;
163 }
164 setRC(0);
165 return;
166}
167
168/* --Methode-- */
169bool BRMultiFitsReader::ReadNextAllFibers()
170{
171 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
172 if (ReadNext(fib)) return true; // probleme
173 }
174 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
175 uint_8 cfc=curfc_[0];
176 bool fgsamefc=true;
177 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
178 if (curfc_[fib]!=cfc) fgsamefc=false;
179 }
180 if (fgsamefc) totsamefc_++;
181 return false; // c'est OK
182 }
183 // On va essayer de lire jusqu'a avoir same_frame_counter
184 bool echec=true;
185 while (echec) {
186 uint_8 cfc=curfc_[0];
187 bool fgsamefc=true;
188 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
189 if (curfc_[fib]!=cfc) {
190 fgsamefc=false;
191 if (curfc_[fib] > cfc) cfc=curfc_[fib];
192 }
193 }
194 if (fgsamefc) {
195 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
196 }
197 else { // else !fgsame
198 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
199 while (curfc_[fib]<cfc) {
200 if (ReadNext(fib)) return true; // probleme
201 }
202 }
203 } // fin de else !fgsame
204 } // fin de while(echec): lecture jusqu'a same_frame_counter
205
206 return true; // probleme
207}
208
209/* --Methode-- */
210bool BRMultiFitsReader::ReadNext(int fib)
211{
212 if (!mff_[fib].IsOpen()) return true;
213 bool fggood=false;
214 while(!fggood) {
215 if (vfpos_[fib] >= mff_[fib].NAxis2()) {
216 mff_[fib].Close();
217 vfilenum_[fib]++;
218 if (vfilenum_[fib]>imax_) return true;
219 char flnm[1024];
220 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),vfilenum_[fib]);
221 if (prtlev_ > 1)
222 cout << " BRMultiFitsReader::ReadNext() opening" << flnm << endl;
223 mff_[fib].Open(flnm, MF_Read);
224 if (mff_[fib].NAxis1() != packsize_) {
225 cout << " BRMultiFitsReader::ReadNext(fib=" << fib << " File=" << flnm <<
226 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << packsize_ << endl;
227 throw BAORadioException("BRMultiFitsReader::ReadNext()/ mff.NAxis1() != memgr_.PaqSize() ");
228 }
229 if (fib==0) { // updating current date from file (fiber 0)
230 string fkvs=mff_[fib].GetKeyValue("DATEOBS");
231 if (fkvs.length()>0) cdateobs_.Set(fkvs);
232 fkvs=mff_[fib].GetKeyValue("TMSTART");
233 if (fkvs.length()>0) {
234 ctmstart_.Set(fkvs);
235 cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << endl;
236 fkvs=mff_[fib].GetKeyValue("TMEND");
237 SOPHYA::TimeStamp tmend_=ctmstart_;
238 if (fkvs.length()>0) tmend_.Set(fkvs);
239 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
240 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
241 }
242 }
243 vfpos_[fib]=0;
244 }
245 mff_[fib].ReadB(vpaq_[fib].Begin(), packsize_, vfpos_[fib]*packsize_);
246 vfpos_[fib]++;
247 totnbytesrd_+=packsize_;
248 totnpqrd_[fib]++;
249 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
250 }
251 totnpqok_[fib]++;
252 return false;
253}
254
255/* --Methode-- */
256bool BRMultiFitsReader::MZoneManage(bool fgclean) // Retourne true si probleme
257{
258 /* Pour debug
259 cout << " BRMultiFitsReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
260 << " max_targ_npaq=" << max_targ_npaq << endl;
261 */
262 if (mid_ >= 0) {
263 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
264 else memgr_.FreeMemZone(mid_, MemZS_Filled);
265 }
266 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
267 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
268 if (fgclean) return false;
269 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
270 mmbuf_ = memgr_.GetMemZone(mid_);
271 if (mmbuf_==NULL) return true;
272 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
273 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
274 // Definition temps pour la zone a remplir
275 memgr_.GetAuxData(mid_)->FillTime().Set(ctmstart_.ToDays()+cpaqdeltatime_*(double)vfpos_[0]/86400.);
276 return false;
277}
278
279//-------------------------------------------------------
280// Classe thread de lecture de fichiers fits BAORadio
281//-------------------------------------------------------
282
283/* --Methode-- */
284BRFitsReader::BRFitsReader(RAcqMemZoneMgr& mem, vector<string>& infiles, bool fgnotrl)
285 : memgr(mem), infiles_(infiles), fgnotrl_(fgnotrl)
286{
287}
288
289/* --Methode-- */
290void BRFitsReader::run()
291{
292 setRC(1);
293
294 try {
295 TimeStamp ts;
296 Timer tm("BRFitsReader", false);
297 BRPaqChecker pcheck(!fgnotrl_); // Verification/comptage des paquets
298
299 size_t totnbytesrd = 0;
300 cout << " BRFitsReader::run() - Starting " << ts << " NbFiles=" << infiles_.size()
301 << " memgr.PaqSize() = " << memgr.PaqSize() << endl;
302
303 uint_4 nfileok = 0;
304 uint_8 nbytesrd = 0;
305 /* Variables pour la logique des zones memoire et numeros de paquets dans la zone memoire */
306 int mid = -2;
307 Byte* buff = NULL;
308 int kmp = 0;
309 int kmpmax=memgr.NbPaquets();
310
311 int paqsz = 0;
312 for(int ifile=0; ifile<infiles_.size(); ifile++) {
313 string ffname = infiles_[ifile];
314// -------------- Lecture de bytes
315 cout << "BRFitsReader::run() [" << ifile <<"]Ouverture/lecture fichier " << ffname << endl;
316 MiniFITSFile mff(ffname, MF_Read);
317 cout << "... Type=" << mff.DataTypeToString() << " NAxis1=" << mff.NAxis1()
318 << " NAxis2=" << mff.NAxis2() << endl;
319 if (mff.DataType() != MF_Byte) {
320 cout << "BRFitsReader::run() PB : DataType!=MF_Byte --> skipping " << endl;
321 continue;
322 }
323// Les fichier FITS contiennent l'entet (24 bytes), mais pas le trailer (16 bytes) si fgnotrl=true
324 int incpaqsz=0;
325 if (fgnotrl_) {
326 incpaqsz=16;
327 if (ifile==0) cout << " Warning : FITS files without frame trailers ..." << endl;
328 }
329 if (paqsz == 0) { // premier passage, on fixe la taille de paquet et on verifie compatibilite avec memgr
330 paqsz = mff.NAxis1()+incpaqsz;
331 if (paqsz != memgr.PaqSize()) {
332 cout << "BRFitsReader::run() mff.NAxis1() incompatible with memgr.PaqSize() -> exception " << endl;
333 throw SzMismatchError(" fits file size incompatible with memgr.PaqSize()");
334 }
335 }
336 else {
337 if (paqsz != mff.NAxis1()+incpaqsz) {
338 cout << " PB : paqsz=" << paqsz << " != mff.NAxis1()+" << incpaqsz << " --> skipping " << endl;
339 continue;
340 }
341 }
342 if (mid < 0) {
343 mid = memgr.FindMemZoneId(MemZA_Fill);
344 buff = memgr.GetMemZone(mid);
345 if (buff == NULL) {
346 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
347 setRC(2);
348 return;
349 }
350 kmp=0;
351 }
352 size_t sx = mff.NAxis1();
353 size_t sy = mff.NAxis2();
354 int nprt=0;
355 for(int j=0; j<sy; j++) {
356 mff.ReadB(buff+kmp*paqsz, sx, j*sx);
357 BRPaquet paq(NULL, buff+kmp*paqsz, paqsz);
358 bool pqok = pcheck.Check(paq); // Verification du paquet / FrameCounter
359 if (!pqok && (nprt < 10)) {
360 cout << "--BUG-- i=" << ifile << " mid=" << mid << " j=" << j << " kmp=" << kmp
361 << " paqsz=" << paqsz << endl;
362 nprt++;
363 paq.Print();
364 }
365 kmp++;
366 if (kmp >= kmpmax) { // Zone memoire rempli !
367 memgr.FreeMemZone(mid, MemZS_Filled);
368 mid = -2;
369 if (j<sy) {
370 mid = memgr.FindMemZoneId(MemZA_Fill);
371 buff = memgr.GetMemZone(mid);
372 if (buff == NULL) {
373 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
374 setRC(2);
375 return;
376 }
377 kmp=0;
378 }
379 }
380 }
381 nfileok++;
382 size_t nbytesrd = sx*sy;
383 totnbytesrd += nbytesrd;
384 } // Fin de la boucle sur les fichiers
385// Gestion d'une zone partiellement remplie
386 if (mid>=0) {
387 for(int k=kmp;k<kmpmax;k++) {
388 Byte* bp=buff+k*paqsz;
389 for(int l=0;l<paqsz;l++) bp[l]=0;
390 }
391 memgr.FreeMemZone(mid, MemZS_Filled);
392 }
393
394// sprintf(fname,"%s/.log",path_.c_str());
395// ofstream filog(fname);
396// filog << " DataProc::run() - starting log file " << ts << endl;
397// filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
398
399
400
401 cout << " ------------------ BRFitsReader::run() END ----------------- " << endl;
402 ts.SetNow();
403 tm.SplitQ();
404 cout << " END reading " << ts << " NFileOK=" << nfileok << endl;
405 cout << " TotalDiskRead= " << totnbytesrd/(1024*1024) << " MBytes Disk-Read rate= "
406 << (double)(totnbytesrd)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
407 pcheck.Print(cout);
408 cout << " BRFitsReader::run()/Timing: \n";
409 tm.Print();
410 cout << " ---------------------------------------------------------- " << endl;
411
412 } // Fin du bloc try
413 catch (MiniFITSException& exc) {
414 cout << " BRFitsReader::run()/catched MiniFITSException " << exc.Msg() << endl;
415 setRC(3);
416 return;
417 }
418 catch(...) {
419 cout << " BRFitsReader::run()/catched unknown ... exception " << endl;
420 setRC(4);
421 return;
422 }
423 setRC(0);
424 return;
425}
Note: See TracBrowser for help on using the repository browser.