source: Sophya/trunk/AddOn/TAcq/brfitsrd.cc@ 3966

Last change on this file since 3966 was 3956, checked in by ansari, 15 years ago

Amélioration du processeur de calcul de visibilité (BRVisibilityCalculator) et du programme vismfib.cc pour permettre la prise en charge des données raw-2c pour le calcul des visibilités, et ajout de la possibilité d'écrire les fichiers de sortie (matrices de visibilités) au format FITS, Reza 02/03/2011

File size: 14.8 KB
Line 
1//----------------------------------------------------------------
2// Projet BAORadio - (C) LAL/IRFU 2008-2011
3// Classes de threads pour lecture fichiers fits BAORadio
4//----------------------------------------------------------------
5
6
7#include "brfitsrd.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <exception>
13
14#include "pexceptions.h"
15#include "timestamp.h"
16#include "ctimer.h"
17
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23using namespace SOPHYA;
24
25//---------------------------------------------------------------------
26// Classe thread de lecture de Multi-fibres de fichiers FITS BAORadio
27//---------------------------------------------------------------------
28
29/* --Methode-- */
30BRMultiFitsReader::BRMultiFitsReader(RAcqMemZoneMgr& mem, vector<string>& dirs, bool rdsamefc,
31 uint_4 imin, uint_4 imax, uint_4 istep)
32 : memgr_(mem), dirs_(dirs), stop_(false), rdsamefc_(rdsamefc), imin_(imin), imax_(imax), istep_(istep)
33{
34 SetPrintLevel();
35 totnbytesrd_ = 0;
36 totsamefc_ = 0;
37 if (memgr_.NbFibres() > MAXANAFIB)
38 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres>MAXANAFIB ");
39 if (dirs_.size() != memgr_.NbFibres())
40 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres != Nb Data Directories");
41
42 packsize_=memgr_.PaqSize();
43 mid_=-2;
44 mmbuf_=NULL;
45 max_targ_npaq = memgr_.NbPaquets();
46 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
47
48 char flnm[1024];
49 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
50 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),imin_);
51 mff_[fib].Open(flnm, MF_Read);
52 if (mff_[fib].NAxis1() != memgr_.PaqSize()) {
53 cout << " BRMultiFitsReader::BRMultiFitsReader/ fib=" << fib << " File=" << flnm <<
54 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << memgr_.PaqSize() << endl;
55 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ mff.NAxis1() != memgr_.PaqSize() ");
56 }
57 // Extraction de qques parametres utiles depuis les fichiers FITS
58 string fkvs;
59 cpaqdeltatime_=0.;
60 if (fib==0) {
61 fkvs=mff_[fib].GetKeyValue("DATEOBS");
62 if (fkvs.length()>0) cdateobs_.Set(fkvs);
63 fkvs=mff_[fib].GetKeyValue("TMSTART");
64 if (fkvs.length()>0) {
65 ctmstart_.Set(fkvs);
66 cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << endl;
67 fkvs=mff_[fib].GetKeyValue("TMEND");
68 SOPHYA::TimeStamp tmend_=ctmstart_;
69 if (fkvs.length()>0) tmend_.Set(fkvs);
70 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
71 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
72 }
73 }
74 fkvs=mff_[fib].GetKeyValue("FIBERNUM");
75 memgr_.FiberId(fib) = atoi( fkvs.c_str() );
76
77 vfilenum_.push_back(imin_);
78 vfpos_.push_back(0);
79 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
80 vpchk_.push_back(BRPaqChecker(true,0));
81 curfc_.push_back(0);
82 totnpqrd_.push_back(0);
83 totnpqok_.push_back(0);
84 }
85}
86
87
88/* --Methode-- */
89void BRMultiFitsReader::run()
90{
91 setRC(1);
92 try {
93 TimeStamp ts;
94 Timer tm("BRMultiFitsReader", false);
95 cout << " BRMultiFitsReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
96 << " PaqSize() = " << memgr_.PaqSize() << endl;
97 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets")
98 << " signalII.fits IMin=" << imin_ << " IMax=" << imax_ << " IStep=" << istep_ << endl;
99
100 uint_8 prtcnt=0;
101 Byte* nextpaq=NULL;
102 bool fgok=true;
103 while (fgok) {
104 if (stop_) break;
105 if ( MoveToNextTarget() ) {
106 cout << "BRMultiFitsReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
107 setRC(7); fgok=false; break;
108 }
109 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
110 nextpaq=GetPaquetTarget(fib);
111 if (nextpaq == NULL) { // Cela ne devrait pas arriver
112 cout << "BRMultiFitsReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
113 setRC(9); fgok=false; break;
114 }
115 vpaq_[fib].Set(nextpaq);
116 }
117 if (ReadNextAllFibers()) { fgok=false; break; }
118 prtcnt++;
119 if ((prtlev_>0)&&(prtcnt%prtmodulo_==0)) {
120 cout << "BRMultiFitsReader: NbPaqMFRead=" << prtcnt << " NSameFC="
121 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
122 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
123 }
124 }
125
126 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
127 MZoneManage(true); // Nettoyage final
128
129 cout << " ------------------ BRMultiFitsReader::run() END ----------------- " << endl;
130 ts.SetNow();
131 tm.SplitQ();
132 cout << "BRMultiFitsReader::run(): END reading : " << ts << endl;
133 cout << "... NbPaqMFRead=" << prtcnt << " NSameFC="
134 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
135 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
136
137 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
138 int perc=0;
139 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
140 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
141 << " FracSameFC=" << perc << " %" << endl;
142 if (prtlev_ > 0) vpchk_[fib].Print(cout);
143 }
144 cout << " TotalDiskRead= " << totnbytesrd_/(1024*1024) << " MBytes Disk-Read rate= "
145 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
146 cout << " BRMultiFitsReader::run()/Timing: \n";
147 tm.Print();
148 cout << " ---------------------------------------------------------- " << endl;
149
150 usleep(250000); // Attente de traitement du dernier paquet
151 memgr_.Stop(); // Arret
152
153 } // Fin du bloc try
154 catch (std::exception& exc) {
155 cout << " BRMultiFitsReader::run()/catched execption msg= " << exc.what() << endl;
156 setRC(3);
157 return;
158 }
159 catch(...) {
160 cout << " BRMultiFitsReader::run()/catched unknown ... exception " << endl;
161 setRC(4);
162 return;
163 }
164 setRC(0);
165 return;
166}
167
168/* --Methode-- */
169bool BRMultiFitsReader::ReadNextAllFibers()
170{
171 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
172 if (ReadNext(fib)) return true; // probleme
173 }
174 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
175 uint_8 cfc=curfc_[0];
176 bool fgsamefc=true;
177 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
178 if (curfc_[fib]!=cfc) fgsamefc=false;
179 }
180 if (fgsamefc) totsamefc_++;
181 return false; // c'est OK
182 }
183 // On va essayer de lire jusqu'a avoir same_frame_counter
184 bool echec=true;
185 while (echec) {
186 uint_8 cfc=curfc_[0];
187 bool fgsamefc=true;
188 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
189 if (curfc_[fib]!=cfc) {
190 fgsamefc=false;
191 if (curfc_[fib] > cfc) cfc=curfc_[fib];
192 }
193 }
194 if (fgsamefc) {
195 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
196 }
197 else { // else !fgsame
198 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
199 while (curfc_[fib]<cfc) {
200 if (ReadNext(fib)) return true; // probleme
201 }
202 }
203 } // fin de else !fgsame
204 } // fin de while(echec): lecture jusqu'a same_frame_counter
205
206 return true; // probleme
207}
208
209/* --Methode-- */
210bool BRMultiFitsReader::ReadNext(int fib)
211{
212 if (!mff_[fib].IsOpen()) return true;
213 bool fggood=false;
214 while(!fggood) {
215 if (vfpos_[fib] >= mff_[fib].NAxis2()) {
216 mff_[fib].Close();
217 vfilenum_[fib]++;
218 if (vfilenum_[fib]>imax_) return true;
219 char flnm[1024];
220 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),vfilenum_[fib]);
221 if (prtlev_ > 1)
222 cout << " BRMultiFitsReader::ReadNext() opening" << flnm << endl;
223 mff_[fib].Open(flnm, MF_Read);
224 if (mff_[fib].NAxis1() != packsize_) {
225 cout << " BRMultiFitsReader::ReadNext(fib=" << fib << " File=" << flnm <<
226 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << packsize_ << endl;
227 throw BAORadioException("BRMultiFitsReader::ReadNext()/ mff.NAxis1() != memgr_.PaqSize() ");
228 }
229 if (fib==0) { // updating current date from file (fiber 0)
230 string fkvs=mff_[fib].GetKeyValue("DATEOBS");
231 if (fkvs.length()>0) cdateobs_.Set(fkvs);
232 fkvs=mff_[fib].GetKeyValue("TMSTART");
233 if (fkvs.length()>0) {
234 ctmstart_.Set(fkvs);
235 cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << endl;
236 fkvs=mff_[fib].GetKeyValue("TMEND");
237 SOPHYA::TimeStamp tmend_=ctmstart_;
238 if (fkvs.length()>0) tmend_.Set(fkvs);
239 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
240 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
241 }
242 }
243 vfpos_[fib]=0;
244 }
245 mff_[fib].ReadB(vpaq_[fib].Begin(), packsize_, vfpos_[fib]*packsize_);
246 vfpos_[fib]++;
247 totnbytesrd_+=packsize_;
248 totnpqrd_[fib]++;
249 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
250 }
251 totnpqok_[fib]++;
252 return false;
253}
254
255/* --Methode-- */
256bool BRMultiFitsReader::MZoneManage(bool fgclean) // Retourne true si probleme
257{
258 /* Pour debug
259 cout << " BRMultiFitsReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
260 << " max_targ_npaq=" << max_targ_npaq << endl;
261 */
262 if (mid_ >= 0) {
263 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
264 else memgr_.FreeMemZone(mid_, MemZS_Filled);
265 }
266 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
267 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
268 if (fgclean) return false;
269 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
270 mmbuf_ = memgr_.GetMemZone(mid_);
271 if (mmbuf_==NULL) return true;
272 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
273 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
274 // Definition temps pour la zone a remplir
275 memgr_.GetAuxData(mid_)->FillTime().Set(ctmstart_.ToDays()+cpaqdeltatime_*(double)vfpos_[0]/86400.);
276 return false;
277}
278
279//-------------------------------------------------------
280// Classe thread de lecture de fichiers fits BAORadio
281//-------------------------------------------------------
282
283/* --Methode-- */
284BRFitsReader::BRFitsReader(RAcqMemZoneMgr& mem, vector<string>& infiles, bool fgnotrl)
285 : memgr(mem), infiles_(infiles), fgnotrl_(fgnotrl)
286{
287}
288
289/* --Methode-- */
290void BRFitsReader::run()
291{
292 setRC(1);
293
294 try {
295 TimeStamp ts;
296 Timer tm("BRFitsReader", false);
297 BRPaqChecker pcheck(!fgnotrl_); // Verification/comptage des paquets
298
299 size_t totnbytesrd = 0;
300 cout << " BRFitsReader::run() - Starting " << ts << " NbFiles=" << infiles_.size()
301 << " memgr.PaqSize() = " << memgr.PaqSize() << endl;
302
303 uint_4 nfileok = 0;
304 uint_8 nbytesrd = 0;
305 /* Variables pour la logique des zones memoire et numeros de paquets dans la zone memoire */
306 int mid = -2;
307 Byte* buff = NULL;
308 int kmp = 0;
309 int kmpmax=memgr.NbPaquets();
310
311 int paqsz = 0;
312 for(int ifile=0; ifile<infiles_.size(); ifile++) {
313 string ffname = infiles_[ifile];
314// -------------- Lecture de bytes
315 cout << "BRFitsReader::run() [" << ifile <<"]Ouverture/lecture fichier " << ffname << endl;
316 MiniFITSFile mff(ffname, MF_Read);
317 cout << "... Type=" << mff.DataTypeToString() << " NAxis1=" << mff.NAxis1()
318 << " NAxis2=" << mff.NAxis2() << endl;
319 if (mff.DataType() != MF_Byte) {
320 cout << "BRFitsReader::run() PB : DataType!=MF_Byte --> skipping " << endl;
321 continue;
322 }
323// Les fichier FITS contiennent l'entet (24 bytes), mais pas le trailer (16 bytes) si fgnotrl=true
324 int incpaqsz=0;
325 if (fgnotrl_) {
326 incpaqsz=16;
327 if (ifile==0) cout << " Warning : FITS files without frame trailers ..." << endl;
328 }
329 if (paqsz == 0) { // premier passage, on fixe la taille de paquet et on verifie compatibilite avec memgr
330 paqsz = mff.NAxis1()+incpaqsz;
331 if (paqsz != memgr.PaqSize()) {
332 cout << "BRFitsReader::run() mff.NAxis1() incompatible with memgr.PaqSize() -> exception " << endl;
333 throw SzMismatchError(" fits file size incompatible with memgr.PaqSize()");
334 }
335 }
336 else {
337 if (paqsz != mff.NAxis1()+incpaqsz) {
338 cout << " PB : paqsz=" << paqsz << " != mff.NAxis1()+" << incpaqsz << " --> skipping " << endl;
339 continue;
340 }
341 }
342 if (mid < 0) {
343 mid = memgr.FindMemZoneId(MemZA_Fill);
344 buff = memgr.GetMemZone(mid);
345 if (buff == NULL) {
346 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
347 setRC(2);
348 return;
349 }
350 kmp=0;
351 }
352 size_t sx = mff.NAxis1();
353 size_t sy = mff.NAxis2();
354 int nprt=0;
355 for(int j=0; j<sy; j++) {
356 mff.ReadB(buff+kmp*paqsz, sx, j*sx);
357 BRPaquet paq(NULL, buff+kmp*paqsz, paqsz);
358 bool pqok = pcheck.Check(paq); // Verification du paquet / FrameCounter
359 if (!pqok && (nprt < 10)) {
360 cout << "--BUG-- i=" << ifile << " mid=" << mid << " j=" << j << " kmp=" << kmp
361 << " paqsz=" << paqsz << endl;
362 nprt++;
363 paq.Print();
364 }
365 kmp++;
366 if (kmp >= kmpmax) { // Zone memoire rempli !
367 memgr.FreeMemZone(mid, MemZS_Filled);
368 mid = -2;
369 if (j<sy) {
370 mid = memgr.FindMemZoneId(MemZA_Fill);
371 buff = memgr.GetMemZone(mid);
372 if (buff == NULL) {
373 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
374 setRC(2);
375 return;
376 }
377 kmp=0;
378 }
379 }
380 }
381 nfileok++;
382 size_t nbytesrd = sx*sy;
383 totnbytesrd += nbytesrd;
384 } // Fin de la boucle sur les fichiers
385// Gestion d'une zone partiellement remplie
386 if (mid>=0) {
387 for(int k=kmp;k<kmpmax;k++) {
388 Byte* bp=buff+k*paqsz;
389 for(int l=0;l<paqsz;l++) bp[l]=0;
390 }
391 memgr.FreeMemZone(mid, MemZS_Filled);
392 }
393
394// sprintf(fname,"%s/.log",path_.c_str());
395// ofstream filog(fname);
396// filog << " DataProc::run() - starting log file " << ts << endl;
397// filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
398
399
400
401 cout << " ------------------ BRFitsReader::run() END ----------------- " << endl;
402 ts.SetNow();
403 tm.SplitQ();
404 cout << " END reading " << ts << " NFileOK=" << nfileok << endl;
405 cout << " TotalDiskRead= " << totnbytesrd/(1024*1024) << " MBytes Disk-Read rate= "
406 << (double)(totnbytesrd)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
407 pcheck.Print(cout);
408 cout << " BRFitsReader::run()/Timing: \n";
409 tm.Print();
410 cout << " ---------------------------------------------------------- " << endl;
411
412 } // Fin du bloc try
413 catch (MiniFITSException& exc) {
414 cout << " BRFitsReader::run()/catched MiniFITSException " << exc.Msg() << endl;
415 setRC(3);
416 return;
417 }
418 catch(...) {
419 cout << " BRFitsReader::run()/catched unknown ... exception " << endl;
420 setRC(4);
421 return;
422 }
423 setRC(0);
424 return;
425}
Note: See TracBrowser for help on using the repository browser.