source: Sophya/trunk/AddOn/TAcq/brfitsrd.cc@ 4016

Last change on this file since 4016 was 4016, checked in by ansari, 14 years ago

Ajout de commentaires d'autodocumentation Doxygen, Reza 12/08/2011

File size: 15.5 KB
Line 
1//----------------------------------------------------------------
2// Projet BAORadio - (C) LAL/IRFU 2008-2011
3// Classes de threads pour lecture fichiers fits BAORadio
4//----------------------------------------------------------------
5
6
7#include "brfitsrd.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <exception>
13
14#include "pexceptions.h"
15#include "timestamp.h"
16#include "ctimer.h"
17
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23using namespace SOPHYA;
24
25//---------------------------------------------------------------------
26// Classe thread de lecture de Multi-fibres de fichiers FITS BAORadio
27//---------------------------------------------------------------------
28/*!
29 \class BRMultiFitsReader
30 \ingroup TAcq
31
32 \brief Read data from FITS files and fills RAcqMemZoneMgr memory zones.
33
34 This class reads BRPaquet data from several FITS files. Data can be synchronized between
35 different files using the FrameCounter.
36*/
37
38/* --Methode-- */
39BRMultiFitsReader::BRMultiFitsReader(RAcqMemZoneMgr& mem, vector<string>& dirs, bool rdsamefc,
40 uint_4 imin, uint_4 imax, uint_4 istep)
41 : memgr_(mem), dirs_(dirs), stop_(false), rdsamefc_(rdsamefc), imin_(imin), imax_(imax), istep_(istep)
42{
43 SetPrintLevel();
44 totnbytesrd_ = 0;
45 totsamefc_ = 0;
46 if (memgr_.NbFibres() > MAXANAFIB)
47 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres>MAXANAFIB ");
48 if (dirs_.size() != memgr_.NbFibres())
49 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres != Nb Data Directories");
50
51 packsize_=memgr_.PaqSize();
52 mid_=-2;
53 mmbuf_=NULL;
54 max_targ_npaq = memgr_.NbPaquets();
55 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
56
57 cpaqdeltatime_=0.;
58
59 char flnm[1024];
60 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
61 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),imin_);
62 mff_[fib].Open(flnm, MF_Read);
63 cout << " BRMultiFitsReader::BRMultiFitsReader() opening " << flnm << endl;
64 if (mff_[fib].NAxis1() != memgr_.PaqSize()) {
65 cout << " BRMultiFitsReader::BRMultiFitsReader/ fib=" << fib << " File=" << flnm <<
66 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << memgr_.PaqSize() << endl;
67 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ mff.NAxis1() != memgr_.PaqSize() ");
68 }
69 // Extraction de qques parametres utiles depuis les fichiers FITS
70 string fkvs;
71 if (fib==0) {
72 fkvs=mff_[fib].GetKeyValue("DATEOBS");
73 if (fkvs.length()>0) cdateobs_.Set(fkvs);
74 fkvs=mff_[fib].GetKeyValue("TMSTART");
75 if (fkvs.length()>0) {
76 ctmstart_.Set(fkvs);
77 fkvs=mff_[fib].GetKeyValue("TMEND");
78 SOPHYA::TimeStamp tmend_=ctmstart_;
79 if (fkvs.length()>0) tmend_.Set(fkvs);
80 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
81 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
82 cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << " TMEND-START="
83 << cpaqdeltatime_*(double)mff_[fib].NAxis2() << endl;
84 }
85 }
86 fkvs=mff_[fib].GetKeyValue("FIBERNUM");
87 memgr_.FiberId(fib) = atoi( fkvs.c_str() );
88
89 vfilenum_.push_back(imin_);
90 vfpos_.push_back(0);
91 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
92 vpchk_.push_back(BRPaqChecker(true,0));
93 curfc_.push_back(0);
94 totnpqrd_.push_back(0);
95 totnpqok_.push_back(0);
96 }
97}
98
99
100/* --Methode-- */
101void BRMultiFitsReader::run()
102{
103 setRC(1);
104 try {
105 TimeStamp ts;
106 Timer tm("BRMultiFitsReader", false);
107 cout << " BRMultiFitsReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
108 << " PaqSize() = " << memgr_.PaqSize() << endl;
109 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets")
110 << " signalII.fits IMin=" << imin_ << " IMax=" << imax_ << " IStep=" << istep_ << endl;
111
112 uint_8 prtcnt=0;
113 Byte* nextpaq=NULL;
114 bool fgok=true;
115 while (fgok) {
116 if (stop_) break;
117 if ( MoveToNextTarget() ) {
118 cout << "BRMultiFitsReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
119 setRC(7); fgok=false; break;
120 }
121 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
122 nextpaq=GetPaquetTarget(fib);
123 if (nextpaq == NULL) { // Cela ne devrait pas arriver
124 cout << "BRMultiFitsReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
125 setRC(9); fgok=false; break;
126 }
127 vpaq_[fib].Set(nextpaq);
128 }
129 if (ReadNextAllFibers()) { fgok=false; break; }
130 prtcnt++;
131 if ((prtlev_>0)&&(prtcnt%prtmodulo_==0)) {
132 cout << "BRMultiFitsReader: NbPaqMFRead=" << prtcnt << " NSameFC="
133 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
134 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
135 }
136 }
137
138 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
139 MZoneManage(true); // Nettoyage final
140
141 cout << " ------------------ BRMultiFitsReader::run() END ----------------- " << endl;
142 ts.SetNow();
143 tm.SplitQ();
144 cout << "BRMultiFitsReader::run(): END reading : " << ts << endl;
145 cout << "... NbPaqMFRead=" << prtcnt << " NSameFC="
146 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
147 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
148
149 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
150 int perc=0;
151 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
152 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
153 << " FracSameFC=" << perc << " %" << endl;
154 if (prtlev_ > 0) vpchk_[fib].Print(cout);
155 }
156 cout << " TotalDiskRead= " << totnbytesrd_/(1024*1024) << " MBytes Disk-Read rate= "
157 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
158 cout << " BRMultiFitsReader::run()/Timing: \n";
159 tm.Print();
160 cout << " ---------------------------------------------------------- " << endl;
161
162 usleep(250000); // Attente de traitement du dernier paquet
163 memgr_.Stop(); // Arret
164
165 } // Fin du bloc try
166 catch (std::exception& exc) {
167 cout << " BRMultiFitsReader::run()/catched execption msg= " << exc.what() << endl;
168 setRC(3);
169 return;
170 }
171 catch(...) {
172 cout << " BRMultiFitsReader::run()/catched unknown ... exception " << endl;
173 setRC(4);
174 return;
175 }
176 setRC(0);
177 return;
178}
179
180/* --Methode-- */
181bool BRMultiFitsReader::ReadNextAllFibers()
182{
183 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
184 if (ReadNext(fib)) return true; // probleme
185 }
186 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
187 uint_8 cfc=curfc_[0];
188 bool fgsamefc=true;
189 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
190 if (curfc_[fib]!=cfc) fgsamefc=false;
191 }
192 if (fgsamefc) totsamefc_++;
193 return false; // c'est OK
194 }
195 // On va essayer de lire jusqu'a avoir same_frame_counter
196 bool echec=true;
197 while (echec) {
198 uint_8 cfc=curfc_[0];
199 bool fgsamefc=true;
200 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
201 if (curfc_[fib]!=cfc) {
202 fgsamefc=false;
203 if (curfc_[fib] > cfc) cfc=curfc_[fib];
204 }
205 }
206 if (fgsamefc) {
207 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
208 }
209 else { // else !fgsame
210 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
211 while (curfc_[fib]<cfc) {
212 if (ReadNext(fib)) return true; // probleme
213 }
214 }
215 } // fin de else !fgsame
216 } // fin de while(echec): lecture jusqu'a same_frame_counter
217
218 return true; // probleme
219}
220
221/* --Methode-- */
222bool BRMultiFitsReader::ReadNext(int fib)
223{
224 if (!mff_[fib].IsOpen()) return true;
225 bool fggood=false;
226 while(!fggood) {
227 if (vfpos_[fib] >= mff_[fib].NAxis2()) {
228 mff_[fib].Close();
229 vfilenum_[fib]+=istep_;
230 if (vfilenum_[fib]>imax_) return true;
231 char flnm[1024];
232 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),vfilenum_[fib]);
233 if (prtlev_ > 1)
234 cout << " BRMultiFitsReader::ReadNext() opening" << flnm << endl;
235 mff_[fib].Open(flnm, MF_Read);
236 if (mff_[fib].NAxis1() != packsize_) {
237 cout << " BRMultiFitsReader::ReadNext(fib=" << fib << " File=" << flnm <<
238 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << packsize_ << endl;
239 throw BAORadioException("BRMultiFitsReader::ReadNext()/ mff.NAxis1() != memgr_.PaqSize() ");
240 }
241 if (fib==0) { // updating current date from file (fiber 0)
242 string fkvs=mff_[fib].GetKeyValue("DATEOBS");
243 if (fkvs.length()>0) cdateobs_.Set(fkvs);
244 fkvs=mff_[fib].GetKeyValue("TMSTART");
245 if (fkvs.length()>0) {
246 ctmstart_.Set(fkvs);
247 cout << " BRMultiFitsReader::ReadNext TMSTART=" << fkvs << endl;
248 fkvs=mff_[fib].GetKeyValue("TMEND");
249 SOPHYA::TimeStamp tmend_=ctmstart_;
250 if (fkvs.length()>0) tmend_.Set(fkvs);
251 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
252 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
253 }
254 }
255 vfpos_[fib]=0;
256 }
257 mff_[fib].ReadB(vpaq_[fib].Begin(), packsize_, vfpos_[fib]*packsize_);
258 vfpos_[fib]++;
259 totnbytesrd_+=packsize_;
260 totnpqrd_[fib]++;
261 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
262 }
263 totnpqok_[fib]++;
264 return false;
265}
266
267/* --Methode-- */
268bool BRMultiFitsReader::MZoneManage(bool fgclean) // Retourne true si probleme
269{
270 /* Pour debug
271 cout << " BRMultiFitsReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
272 << " max_targ_npaq=" << max_targ_npaq << endl;
273 */
274 if (mid_ >= 0) {
275 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
276 else memgr_.FreeMemZone(mid_, MemZS_Filled);
277 }
278 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
279 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
280 if (fgclean) return false;
281 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
282 mmbuf_ = memgr_.GetMemZone(mid_);
283 if (mmbuf_==NULL) return true;
284 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
285 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
286 // Definition temps pour la zone a remplir et numeros de sequence des fichiers en cours de lecture
287 memgr_.GetAuxData(mid_)->FillTime().Set(ctmstart_.ToDays()+cpaqdeltatime_*(double)vfpos_[0]/86400.);
288 memgr_.GetAuxData(mid_)->FileSequenceNumVec()=vfilenum_;
289 return false;
290}
291
292//-------------------------------------------------------
293// Classe thread de lecture de fichiers fits BAORadio
294//-------------------------------------------------------
295/*!
296 \class BRFitsReader
297 \ingroup TAcq
298
299 \brief (Deprecated) Data reader from a single series of FITS files.
300
301 Use BRMultiFitsReader instead.
302*/
303
304/* --Methode-- */
305BRFitsReader::BRFitsReader(RAcqMemZoneMgr& mem, vector<string>& infiles, bool fgnotrl)
306 : memgr(mem), infiles_(infiles), fgnotrl_(fgnotrl)
307{
308}
309
310/* --Methode-- */
311void BRFitsReader::run()
312{
313 setRC(1);
314
315 try {
316 TimeStamp ts;
317 Timer tm("BRFitsReader", false);
318 BRPaqChecker pcheck(!fgnotrl_); // Verification/comptage des paquets
319
320 size_t totnbytesrd = 0;
321 cout << " BRFitsReader::run() - Starting " << ts << " NbFiles=" << infiles_.size()
322 << " memgr.PaqSize() = " << memgr.PaqSize() << endl;
323
324 uint_4 nfileok = 0;
325 uint_8 nbytesrd = 0;
326 /* Variables pour la logique des zones memoire et numeros de paquets dans la zone memoire */
327 int mid = -2;
328 Byte* buff = NULL;
329 int kmp = 0;
330 int kmpmax=memgr.NbPaquets();
331
332 int paqsz = 0;
333 for(int ifile=0; ifile<infiles_.size(); ifile++) {
334 string ffname = infiles_[ifile];
335// -------------- Lecture de bytes
336 cout << "BRFitsReader::run() [" << ifile <<"]Ouverture/lecture fichier " << ffname << endl;
337 MiniFITSFile mff(ffname, MF_Read);
338 cout << "... Type=" << mff.DataTypeToString() << " NAxis1=" << mff.NAxis1()
339 << " NAxis2=" << mff.NAxis2() << endl;
340 if (mff.DataType() != MF_Byte) {
341 cout << "BRFitsReader::run() PB : DataType!=MF_Byte --> skipping " << endl;
342 continue;
343 }
344// Les fichier FITS contiennent l'entet (24 bytes), mais pas le trailer (16 bytes) si fgnotrl=true
345 int incpaqsz=0;
346 if (fgnotrl_) {
347 incpaqsz=16;
348 if (ifile==0) cout << " Warning : FITS files without frame trailers ..." << endl;
349 }
350 if (paqsz == 0) { // premier passage, on fixe la taille de paquet et on verifie compatibilite avec memgr
351 paqsz = mff.NAxis1()+incpaqsz;
352 if (paqsz != memgr.PaqSize()) {
353 cout << "BRFitsReader::run() mff.NAxis1() incompatible with memgr.PaqSize() -> exception " << endl;
354 throw SzMismatchError(" fits file size incompatible with memgr.PaqSize()");
355 }
356 }
357 else {
358 if (paqsz != mff.NAxis1()+incpaqsz) {
359 cout << " PB : paqsz=" << paqsz << " != mff.NAxis1()+" << incpaqsz << " --> skipping " << endl;
360 continue;
361 }
362 }
363 if (mid < 0) {
364 mid = memgr.FindMemZoneId(MemZA_Fill);
365 buff = memgr.GetMemZone(mid);
366 if (buff == NULL) {
367 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
368 setRC(2);
369 return;
370 }
371 kmp=0;
372 }
373 size_t sx = mff.NAxis1();
374 size_t sy = mff.NAxis2();
375 int nprt=0;
376 for(int j=0; j<sy; j++) {
377 mff.ReadB(buff+kmp*paqsz, sx, j*sx);
378 BRPaquet paq(NULL, buff+kmp*paqsz, paqsz);
379 bool pqok = pcheck.Check(paq); // Verification du paquet / FrameCounter
380 if (!pqok && (nprt < 10)) {
381 cout << "--BUG-- i=" << ifile << " mid=" << mid << " j=" << j << " kmp=" << kmp
382 << " paqsz=" << paqsz << endl;
383 nprt++;
384 paq.Print();
385 }
386 kmp++;
387 if (kmp >= kmpmax) { // Zone memoire rempli !
388 memgr.FreeMemZone(mid, MemZS_Filled);
389 mid = -2;
390 if (j<sy) {
391 mid = memgr.FindMemZoneId(MemZA_Fill);
392 buff = memgr.GetMemZone(mid);
393 if (buff == NULL) {
394 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
395 setRC(2);
396 return;
397 }
398 kmp=0;
399 }
400 }
401 }
402 nfileok++;
403 size_t nbytesrd = sx*sy;
404 totnbytesrd += nbytesrd;
405 } // Fin de la boucle sur les fichiers
406// Gestion d'une zone partiellement remplie
407 if (mid>=0) {
408 for(int k=kmp;k<kmpmax;k++) {
409 Byte* bp=buff+k*paqsz;
410 for(int l=0;l<paqsz;l++) bp[l]=0;
411 }
412 memgr.FreeMemZone(mid, MemZS_Filled);
413 }
414
415// sprintf(fname,"%s/.log",path_.c_str());
416// ofstream filog(fname);
417// filog << " DataProc::run() - starting log file " << ts << endl;
418// filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
419
420
421
422 cout << " ------------------ BRFitsReader::run() END ----------------- " << endl;
423 ts.SetNow();
424 tm.SplitQ();
425 cout << " END reading " << ts << " NFileOK=" << nfileok << endl;
426 cout << " TotalDiskRead= " << totnbytesrd/(1024*1024) << " MBytes Disk-Read rate= "
427 << (double)(totnbytesrd)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
428 pcheck.Print(cout);
429 cout << " BRFitsReader::run()/Timing: \n";
430 tm.Print();
431 cout << " ---------------------------------------------------------- " << endl;
432
433 } // Fin du bloc try
434 catch (MiniFITSException& exc) {
435 cout << " BRFitsReader::run()/catched MiniFITSException " << exc.Msg() << endl;
436 setRC(3);
437 return;
438 }
439 catch(...) {
440 cout << " BRFitsReader::run()/catched unknown ... exception " << endl;
441 setRC(4);
442 return;
443 }
444 setRC(0);
445 return;
446}
Note: See TracBrowser for help on using the repository browser.