source: Sophya/trunk/AddOn/TAcq/brfitsrd.cc@ 3969

Last change on this file since 3969 was 3969, checked in by ansari, 14 years ago

correction erreur sur la prise en compte de istep sur les numeros de fichiers a lire par BRMultiFitsReader, Reza 28/03/2011

File size: 14.9 KB
RevLine 
[3635]1//----------------------------------------------------------------
[3939]2// Projet BAORadio - (C) LAL/IRFU 2008-2011
[3683]3// Classes de threads pour lecture fichiers fits BAORadio
[3635]4//----------------------------------------------------------------
5
6
7#include "brfitsrd.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
[3683]12#include <exception>
13
[3635]14#include "pexceptions.h"
15#include "timestamp.h"
16#include "ctimer.h"
17
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
[3683]23using namespace SOPHYA;
24
25//---------------------------------------------------------------------
26// Classe thread de lecture de Multi-fibres de fichiers FITS BAORadio
27//---------------------------------------------------------------------
28
29/* --Methode-- */
30BRMultiFitsReader::BRMultiFitsReader(RAcqMemZoneMgr& mem, vector<string>& dirs, bool rdsamefc,
31 uint_4 imin, uint_4 imax, uint_4 istep)
[3697]32 : memgr_(mem), dirs_(dirs), stop_(false), rdsamefc_(rdsamefc), imin_(imin), imax_(imax), istep_(istep)
[3683]33{
34 SetPrintLevel();
35 totnbytesrd_ = 0;
36 totsamefc_ = 0;
37 if (memgr_.NbFibres() > MAXANAFIB)
38 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres>MAXANAFIB ");
39 if (dirs_.size() != memgr_.NbFibres())
40 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres != Nb Data Directories");
41
42 packsize_=memgr_.PaqSize();
43 mid_=-2;
44 mmbuf_=NULL;
45 max_targ_npaq = memgr_.NbPaquets();
46 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
47
48 char flnm[1024];
49 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
50 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),imin_);
51 mff_[fib].Open(flnm, MF_Read);
[3969]52 cout << " BRMultiFitsReader::BRMultiFitsReader() opening " << flnm << endl;
[3683]53 if (mff_[fib].NAxis1() != memgr_.PaqSize()) {
54 cout << " BRMultiFitsReader::BRMultiFitsReader/ fib=" << fib << " File=" << flnm <<
55 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << memgr_.PaqSize() << endl;
56 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ mff.NAxis1() != memgr_.PaqSize() ");
57 }
[3909]58 // Extraction de qques parametres utiles depuis les fichiers FITS
59 string fkvs;
[3938]60 cpaqdeltatime_=0.;
[3909]61 if (fib==0) {
62 fkvs=mff_[fib].GetKeyValue("DATEOBS");
[3938]63 if (fkvs.length()>0) cdateobs_.Set(fkvs);
64 fkvs=mff_[fib].GetKeyValue("TMSTART");
65 if (fkvs.length()>0) {
66 ctmstart_.Set(fkvs);
67 cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << endl;
68 fkvs=mff_[fib].GetKeyValue("TMEND");
69 SOPHYA::TimeStamp tmend_=ctmstart_;
70 if (fkvs.length()>0) tmend_.Set(fkvs);
71 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
72 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
[3909]73 }
74 }
[3938]75 fkvs=mff_[fib].GetKeyValue("FIBERNUM");
[3909]76 memgr_.FiberId(fib) = atoi( fkvs.c_str() );
77
[3683]78 vfilenum_.push_back(imin_);
79 vfpos_.push_back(0);
80 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
81 vpchk_.push_back(BRPaqChecker(true,0));
82 curfc_.push_back(0);
83 totnpqrd_.push_back(0);
84 totnpqok_.push_back(0);
85 }
86}
87
88
89/* --Methode-- */
90void BRMultiFitsReader::run()
91{
92 setRC(1);
93 try {
94 TimeStamp ts;
95 Timer tm("BRMultiFitsReader", false);
96 cout << " BRMultiFitsReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
97 << " PaqSize() = " << memgr_.PaqSize() << endl;
98 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets")
99 << " signalII.fits IMin=" << imin_ << " IMax=" << imax_ << " IStep=" << istep_ << endl;
100
[3883]101 uint_8 prtcnt=0;
[3683]102 Byte* nextpaq=NULL;
103 bool fgok=true;
104 while (fgok) {
105 if (stop_) break;
106 if ( MoveToNextTarget() ) {
107 cout << "BRMultiFitsReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
108 setRC(7); fgok=false; break;
109 }
110 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
111 nextpaq=GetPaquetTarget(fib);
112 if (nextpaq == NULL) { // Cela ne devrait pas arriver
113 cout << "BRMultiFitsReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
114 setRC(9); fgok=false; break;
115 }
116 vpaq_[fib].Set(nextpaq);
117 }
118 if (ReadNextAllFibers()) { fgok=false; break; }
[3883]119 prtcnt++;
120 if ((prtlev_>0)&&(prtcnt%prtmodulo_==0)) {
121 cout << "BRMultiFitsReader: NbPaqMFRead=" << prtcnt << " NSameFC="
122 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
123 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
124 }
[3683]125 }
126
127 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
128 MZoneManage(true); // Nettoyage final
129
130 cout << " ------------------ BRMultiFitsReader::run() END ----------------- " << endl;
131 ts.SetNow();
132 tm.SplitQ();
[3883]133 cout << "BRMultiFitsReader::run(): END reading : " << ts << endl;
134 cout << "... NbPaqMFRead=" << prtcnt << " NSameFC="
135 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
136 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
137
[3683]138 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
139 int perc=0;
140 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
141 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
142 << " FracSameFC=" << perc << " %" << endl;
[3699]143 if (prtlev_ > 0) vpchk_[fib].Print(cout);
[3683]144 }
145 cout << " TotalDiskRead= " << totnbytesrd_/(1024*1024) << " MBytes Disk-Read rate= "
146 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
147 cout << " BRMultiFitsReader::run()/Timing: \n";
148 tm.Print();
149 cout << " ---------------------------------------------------------- " << endl;
150
[3956]151 usleep(250000); // Attente de traitement du dernier paquet
[3883]152 memgr_.Stop(); // Arret
153
[3683]154 } // Fin du bloc try
155 catch (std::exception& exc) {
156 cout << " BRMultiFitsReader::run()/catched execption msg= " << exc.what() << endl;
157 setRC(3);
158 return;
159 }
160 catch(...) {
161 cout << " BRMultiFitsReader::run()/catched unknown ... exception " << endl;
162 setRC(4);
163 return;
164 }
165 setRC(0);
166 return;
167}
168
169/* --Methode-- */
170bool BRMultiFitsReader::ReadNextAllFibers()
171{
172 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
173 if (ReadNext(fib)) return true; // probleme
174 }
[3883]175 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
176 uint_8 cfc=curfc_[0];
177 bool fgsamefc=true;
178 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
179 if (curfc_[fib]!=cfc) fgsamefc=false;
[3683]180 }
[3883]181 if (fgsamefc) totsamefc_++;
182 return false; // c'est OK
[3683]183 }
[3883]184 // On va essayer de lire jusqu'a avoir same_frame_counter
185 bool echec=true;
186 while (echec) {
187 uint_8 cfc=curfc_[0];
188 bool fgsamefc=true;
189 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
190 if (curfc_[fib]!=cfc) {
191 fgsamefc=false;
192 if (curfc_[fib] > cfc) cfc=curfc_[fib];
[3683]193 }
194 }
[3883]195 if (fgsamefc) {
196 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
197 }
198 else { // else !fgsame
199 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
200 while (curfc_[fib]<cfc) {
201 if (ReadNext(fib)) return true; // probleme
202 }
203 }
204 } // fin de else !fgsame
205 } // fin de while(echec): lecture jusqu'a same_frame_counter
206
207 return true; // probleme
[3683]208}
209
210/* --Methode-- */
211bool BRMultiFitsReader::ReadNext(int fib)
212{
213 if (!mff_[fib].IsOpen()) return true;
214 bool fggood=false;
215 while(!fggood) {
216 if (vfpos_[fib] >= mff_[fib].NAxis2()) {
217 mff_[fib].Close();
[3969]218 vfilenum_[fib]+=istep_;
[3683]219 if (vfilenum_[fib]>imax_) return true;
220 char flnm[1024];
[3700]221 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),vfilenum_[fib]);
[3699]222 if (prtlev_ > 1)
[3683]223 cout << " BRMultiFitsReader::ReadNext() opening" << flnm << endl;
224 mff_[fib].Open(flnm, MF_Read);
225 if (mff_[fib].NAxis1() != packsize_) {
226 cout << " BRMultiFitsReader::ReadNext(fib=" << fib << " File=" << flnm <<
227 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << packsize_ << endl;
228 throw BAORadioException("BRMultiFitsReader::ReadNext()/ mff.NAxis1() != memgr_.PaqSize() ");
229 }
[3909]230 if (fib==0) { // updating current date from file (fiber 0)
231 string fkvs=mff_[fib].GetKeyValue("DATEOBS");
232 if (fkvs.length()>0) cdateobs_.Set(fkvs);
[3938]233 fkvs=mff_[fib].GetKeyValue("TMSTART");
234 if (fkvs.length()>0) {
235 ctmstart_.Set(fkvs);
236 cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << endl;
237 fkvs=mff_[fib].GetKeyValue("TMEND");
238 SOPHYA::TimeStamp tmend_=ctmstart_;
239 if (fkvs.length()>0) tmend_.Set(fkvs);
240 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
241 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
242 }
[3909]243 }
[3683]244 vfpos_[fib]=0;
245 }
246 mff_[fib].ReadB(vpaq_[fib].Begin(), packsize_, vfpos_[fib]*packsize_);
247 vfpos_[fib]++;
248 totnbytesrd_+=packsize_;
249 totnpqrd_[fib]++;
250 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
251 }
252 totnpqok_[fib]++;
253 return false;
254}
255
256/* --Methode-- */
257bool BRMultiFitsReader::MZoneManage(bool fgclean) // Retourne true si probleme
258{
259 /* Pour debug
260 cout << " BRMultiFitsReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
261 << " max_targ_npaq=" << max_targ_npaq << endl;
262 */
263 if (mid_ >= 0) {
264 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
[3686]265 else memgr_.FreeMemZone(mid_, MemZS_Filled);
[3683]266 }
267 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
268 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
269 if (fgclean) return false;
270 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
271 mmbuf_ = memgr_.GetMemZone(mid_);
272 if (mmbuf_==NULL) return true;
273 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
274 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
[3938]275 // Definition temps pour la zone a remplir
276 memgr_.GetAuxData(mid_)->FillTime().Set(ctmstart_.ToDays()+cpaqdeltatime_*(double)vfpos_[0]/86400.);
[3683]277 return false;
278}
279
[3635]280//-------------------------------------------------------
[3683]281// Classe thread de lecture de fichiers fits BAORadio
[3635]282//-------------------------------------------------------
283
[3683]284/* --Methode-- */
[3635]285BRFitsReader::BRFitsReader(RAcqMemZoneMgr& mem, vector<string>& infiles, bool fgnotrl)
286 : memgr(mem), infiles_(infiles), fgnotrl_(fgnotrl)
287{
288}
[3683]289
290/* --Methode-- */
[3635]291void BRFitsReader::run()
292{
293 setRC(1);
294
295 try {
296 TimeStamp ts;
[3640]297 Timer tm("BRFitsReader", false);
[3646]298 BRPaqChecker pcheck(!fgnotrl_); // Verification/comptage des paquets
[3640]299
[3635]300 size_t totnbytesrd = 0;
301 cout << " BRFitsReader::run() - Starting " << ts << " NbFiles=" << infiles_.size()
302 << " memgr.PaqSize() = " << memgr.PaqSize() << endl;
303
304 uint_4 nfileok = 0;
305 uint_8 nbytesrd = 0;
[3640]306 /* Variables pour la logique des zones memoire et numeros de paquets dans la zone memoire */
307 int mid = -2;
308 Byte* buff = NULL;
309 int kmp = 0;
310 int kmpmax=memgr.NbPaquets();
311
312 int paqsz = 0;
[3635]313 for(int ifile=0; ifile<infiles_.size(); ifile++) {
314 string ffname = infiles_[ifile];
315// -------------- Lecture de bytes
316 cout << "BRFitsReader::run() [" << ifile <<"]Ouverture/lecture fichier " << ffname << endl;
317 MiniFITSFile mff(ffname, MF_Read);
318 cout << "... Type=" << mff.DataTypeToString() << " NAxis1=" << mff.NAxis1()
319 << " NAxis2=" << mff.NAxis2() << endl;
320 if (mff.DataType() != MF_Byte) {
321 cout << "BRFitsReader::run() PB : DataType!=MF_Byte --> skipping " << endl;
322 continue;
323 }
324// Les fichier FITS contiennent l'entet (24 bytes), mais pas le trailer (16 bytes) si fgnotrl=true
325 int incpaqsz=0;
326 if (fgnotrl_) {
327 incpaqsz=16;
[3646]328 if (ifile==0) cout << " Warning : FITS files without frame trailers ..." << endl;
[3635]329 }
[3640]330 if (paqsz == 0) { // premier passage, on fixe la taille de paquet et on verifie compatibilite avec memgr
[3635]331 paqsz = mff.NAxis1()+incpaqsz;
332 if (paqsz != memgr.PaqSize()) {
333 cout << "BRFitsReader::run() mff.NAxis1() incompatible with memgr.PaqSize() -> exception " << endl;
334 throw SzMismatchError(" fits file size incompatible with memgr.PaqSize()");
335 }
336 }
337 else {
338 if (paqsz != mff.NAxis1()+incpaqsz) {
339 cout << " PB : paqsz=" << paqsz << " != mff.NAxis1()+" << incpaqsz << " --> skipping " << endl;
340 continue;
341 }
[3640]342 }
343 if (mid < 0) {
344 mid = memgr.FindMemZoneId(MemZA_Fill);
345 buff = memgr.GetMemZone(mid);
346 if (buff == NULL) {
347 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
348 setRC(2);
349 return;
350 }
351 kmp=0;
[3635]352 }
353 size_t sx = mff.NAxis1();
[3640]354 size_t sy = mff.NAxis2();
355 int nprt=0;
356 for(int j=0; j<sy; j++) {
357 mff.ReadB(buff+kmp*paqsz, sx, j*sx);
358 BRPaquet paq(NULL, buff+kmp*paqsz, paqsz);
359 bool pqok = pcheck.Check(paq); // Verification du paquet / FrameCounter
360 if (!pqok && (nprt < 10)) {
361 cout << "--BUG-- i=" << ifile << " mid=" << mid << " j=" << j << " kmp=" << kmp
362 << " paqsz=" << paqsz << endl;
363 nprt++;
364 paq.Print();
365 }
366 kmp++;
367 if (kmp >= kmpmax) { // Zone memoire rempli !
368 memgr.FreeMemZone(mid, MemZS_Filled);
369 mid = -2;
370 if (j<sy) {
371 mid = memgr.FindMemZoneId(MemZA_Fill);
372 buff = memgr.GetMemZone(mid);
373 if (buff == NULL) {
374 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
375 setRC(2);
376 return;
377 }
378 kmp=0;
379 }
380 }
[3635]381 }
382 nfileok++;
383 size_t nbytesrd = sx*sy;
384 totnbytesrd += nbytesrd;
[3640]385 } // Fin de la boucle sur les fichiers
386// Gestion d'une zone partiellement remplie
387 if (mid>=0) {
388 for(int k=kmp;k<kmpmax;k++) {
389 Byte* bp=buff+k*paqsz;
390 for(int l=0;l<paqsz;l++) bp[l]=0;
391 }
[3635]392 memgr.FreeMemZone(mid, MemZS_Filled);
[3640]393 }
[3635]394
395// sprintf(fname,"%s/.log",path_.c_str());
396// ofstream filog(fname);
397// filog << " DataProc::run() - starting log file " << ts << endl;
398// filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
399
400
401
402 cout << " ------------------ BRFitsReader::run() END ----------------- " << endl;
403 ts.SetNow();
[3640]404 tm.SplitQ();
[3635]405 cout << " END reading " << ts << " NFileOK=" << nfileok << endl;
406 cout << " TotalDiskRead= " << totnbytesrd/(1024*1024) << " MBytes Disk-Read rate= "
407 << (double)(totnbytesrd)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
408 pcheck.Print(cout);
[3640]409 cout << " BRFitsReader::run()/Timing: \n";
410 tm.Print();
[3635]411 cout << " ---------------------------------------------------------- " << endl;
412
413 } // Fin du bloc try
414 catch (MiniFITSException& exc) {
415 cout << " BRFitsReader::run()/catched MiniFITSException " << exc.Msg() << endl;
416 setRC(3);
417 return;
418 }
419 catch(...) {
420 cout << " BRFitsReader::run()/catched unknown ... exception " << endl;
421 setRC(4);
422 return;
423 }
424 setRC(0);
425 return;
426}
Note: See TracBrowser for help on using the repository browser.