source: Sophya/trunk/AddOn/TAcq/brfitsrd.cc@ 4006

Last change on this file since 4006 was 3979, checked in by ansari, 14 years ago

Ajout definition de fenetre en temps de traitement (appel a Process()) ds BRBaseProcessor et BRAnaParam, FileSequenceNumber (numero signalXXX.fits) pour chaque zone memoire dans RAcqMZAuxData / RAcqMemZoneMgr, Reza 04/05/2011

File size: 15.1 KB
Line 
1//----------------------------------------------------------------
2// Projet BAORadio - (C) LAL/IRFU 2008-2011
3// Classes de threads pour lecture fichiers fits BAORadio
4//----------------------------------------------------------------
5
6
7#include "brfitsrd.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <exception>
13
14#include "pexceptions.h"
15#include "timestamp.h"
16#include "ctimer.h"
17
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23using namespace SOPHYA;
24
25//---------------------------------------------------------------------
26// Classe thread de lecture de Multi-fibres de fichiers FITS BAORadio
27//---------------------------------------------------------------------
28
29/* --Methode-- */
30BRMultiFitsReader::BRMultiFitsReader(RAcqMemZoneMgr& mem, vector<string>& dirs, bool rdsamefc,
31 uint_4 imin, uint_4 imax, uint_4 istep)
32 : memgr_(mem), dirs_(dirs), stop_(false), rdsamefc_(rdsamefc), imin_(imin), imax_(imax), istep_(istep)
33{
34 SetPrintLevel();
35 totnbytesrd_ = 0;
36 totsamefc_ = 0;
37 if (memgr_.NbFibres() > MAXANAFIB)
38 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres>MAXANAFIB ");
39 if (dirs_.size() != memgr_.NbFibres())
40 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres != Nb Data Directories");
41
42 packsize_=memgr_.PaqSize();
43 mid_=-2;
44 mmbuf_=NULL;
45 max_targ_npaq = memgr_.NbPaquets();
46 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
47
48 cpaqdeltatime_=0.;
49
50 char flnm[1024];
51 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
52 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),imin_);
53 mff_[fib].Open(flnm, MF_Read);
54 cout << " BRMultiFitsReader::BRMultiFitsReader() opening " << flnm << endl;
55 if (mff_[fib].NAxis1() != memgr_.PaqSize()) {
56 cout << " BRMultiFitsReader::BRMultiFitsReader/ fib=" << fib << " File=" << flnm <<
57 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << memgr_.PaqSize() << endl;
58 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ mff.NAxis1() != memgr_.PaqSize() ");
59 }
60 // Extraction de qques parametres utiles depuis les fichiers FITS
61 string fkvs;
62 if (fib==0) {
63 fkvs=mff_[fib].GetKeyValue("DATEOBS");
64 if (fkvs.length()>0) cdateobs_.Set(fkvs);
65 fkvs=mff_[fib].GetKeyValue("TMSTART");
66 if (fkvs.length()>0) {
67 ctmstart_.Set(fkvs);
68 fkvs=mff_[fib].GetKeyValue("TMEND");
69 SOPHYA::TimeStamp tmend_=ctmstart_;
70 if (fkvs.length()>0) tmend_.Set(fkvs);
71 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
72 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
73 cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << " TMEND-START="
74 << cpaqdeltatime_*(double)mff_[fib].NAxis2() << endl;
75 }
76 }
77 fkvs=mff_[fib].GetKeyValue("FIBERNUM");
78 memgr_.FiberId(fib) = atoi( fkvs.c_str() );
79
80 vfilenum_.push_back(imin_);
81 vfpos_.push_back(0);
82 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
83 vpchk_.push_back(BRPaqChecker(true,0));
84 curfc_.push_back(0);
85 totnpqrd_.push_back(0);
86 totnpqok_.push_back(0);
87 }
88}
89
90
91/* --Methode-- */
92void BRMultiFitsReader::run()
93{
94 setRC(1);
95 try {
96 TimeStamp ts;
97 Timer tm("BRMultiFitsReader", false);
98 cout << " BRMultiFitsReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
99 << " PaqSize() = " << memgr_.PaqSize() << endl;
100 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets")
101 << " signalII.fits IMin=" << imin_ << " IMax=" << imax_ << " IStep=" << istep_ << endl;
102
103 uint_8 prtcnt=0;
104 Byte* nextpaq=NULL;
105 bool fgok=true;
106 while (fgok) {
107 if (stop_) break;
108 if ( MoveToNextTarget() ) {
109 cout << "BRMultiFitsReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
110 setRC(7); fgok=false; break;
111 }
112 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
113 nextpaq=GetPaquetTarget(fib);
114 if (nextpaq == NULL) { // Cela ne devrait pas arriver
115 cout << "BRMultiFitsReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
116 setRC(9); fgok=false; break;
117 }
118 vpaq_[fib].Set(nextpaq);
119 }
120 if (ReadNextAllFibers()) { fgok=false; break; }
121 prtcnt++;
122 if ((prtlev_>0)&&(prtcnt%prtmodulo_==0)) {
123 cout << "BRMultiFitsReader: NbPaqMFRead=" << prtcnt << " NSameFC="
124 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
125 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
126 }
127 }
128
129 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
130 MZoneManage(true); // Nettoyage final
131
132 cout << " ------------------ BRMultiFitsReader::run() END ----------------- " << endl;
133 ts.SetNow();
134 tm.SplitQ();
135 cout << "BRMultiFitsReader::run(): END reading : " << ts << endl;
136 cout << "... NbPaqMFRead=" << prtcnt << " NSameFC="
137 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
138 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
139
140 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
141 int perc=0;
142 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
143 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
144 << " FracSameFC=" << perc << " %" << endl;
145 if (prtlev_ > 0) vpchk_[fib].Print(cout);
146 }
147 cout << " TotalDiskRead= " << totnbytesrd_/(1024*1024) << " MBytes Disk-Read rate= "
148 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
149 cout << " BRMultiFitsReader::run()/Timing: \n";
150 tm.Print();
151 cout << " ---------------------------------------------------------- " << endl;
152
153 usleep(250000); // Attente de traitement du dernier paquet
154 memgr_.Stop(); // Arret
155
156 } // Fin du bloc try
157 catch (std::exception& exc) {
158 cout << " BRMultiFitsReader::run()/catched execption msg= " << exc.what() << endl;
159 setRC(3);
160 return;
161 }
162 catch(...) {
163 cout << " BRMultiFitsReader::run()/catched unknown ... exception " << endl;
164 setRC(4);
165 return;
166 }
167 setRC(0);
168 return;
169}
170
171/* --Methode-- */
172bool BRMultiFitsReader::ReadNextAllFibers()
173{
174 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
175 if (ReadNext(fib)) return true; // probleme
176 }
177 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
178 uint_8 cfc=curfc_[0];
179 bool fgsamefc=true;
180 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
181 if (curfc_[fib]!=cfc) fgsamefc=false;
182 }
183 if (fgsamefc) totsamefc_++;
184 return false; // c'est OK
185 }
186 // On va essayer de lire jusqu'a avoir same_frame_counter
187 bool echec=true;
188 while (echec) {
189 uint_8 cfc=curfc_[0];
190 bool fgsamefc=true;
191 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
192 if (curfc_[fib]!=cfc) {
193 fgsamefc=false;
194 if (curfc_[fib] > cfc) cfc=curfc_[fib];
195 }
196 }
197 if (fgsamefc) {
198 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
199 }
200 else { // else !fgsame
201 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
202 while (curfc_[fib]<cfc) {
203 if (ReadNext(fib)) return true; // probleme
204 }
205 }
206 } // fin de else !fgsame
207 } // fin de while(echec): lecture jusqu'a same_frame_counter
208
209 return true; // probleme
210}
211
212/* --Methode-- */
213bool BRMultiFitsReader::ReadNext(int fib)
214{
215 if (!mff_[fib].IsOpen()) return true;
216 bool fggood=false;
217 while(!fggood) {
218 if (vfpos_[fib] >= mff_[fib].NAxis2()) {
219 mff_[fib].Close();
220 vfilenum_[fib]+=istep_;
221 if (vfilenum_[fib]>imax_) return true;
222 char flnm[1024];
223 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),vfilenum_[fib]);
224 if (prtlev_ > 1)
225 cout << " BRMultiFitsReader::ReadNext() opening" << flnm << endl;
226 mff_[fib].Open(flnm, MF_Read);
227 if (mff_[fib].NAxis1() != packsize_) {
228 cout << " BRMultiFitsReader::ReadNext(fib=" << fib << " File=" << flnm <<
229 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << packsize_ << endl;
230 throw BAORadioException("BRMultiFitsReader::ReadNext()/ mff.NAxis1() != memgr_.PaqSize() ");
231 }
232 if (fib==0) { // updating current date from file (fiber 0)
233 string fkvs=mff_[fib].GetKeyValue("DATEOBS");
234 if (fkvs.length()>0) cdateobs_.Set(fkvs);
235 fkvs=mff_[fib].GetKeyValue("TMSTART");
236 if (fkvs.length()>0) {
237 ctmstart_.Set(fkvs);
238 cout << " BRMultiFitsReader::ReadNext TMSTART=" << fkvs << endl;
239 fkvs=mff_[fib].GetKeyValue("TMEND");
240 SOPHYA::TimeStamp tmend_=ctmstart_;
241 if (fkvs.length()>0) tmend_.Set(fkvs);
242 cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+
243 (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2();
244 }
245 }
246 vfpos_[fib]=0;
247 }
248 mff_[fib].ReadB(vpaq_[fib].Begin(), packsize_, vfpos_[fib]*packsize_);
249 vfpos_[fib]++;
250 totnbytesrd_+=packsize_;
251 totnpqrd_[fib]++;
252 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
253 }
254 totnpqok_[fib]++;
255 return false;
256}
257
258/* --Methode-- */
259bool BRMultiFitsReader::MZoneManage(bool fgclean) // Retourne true si probleme
260{
261 /* Pour debug
262 cout << " BRMultiFitsReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
263 << " max_targ_npaq=" << max_targ_npaq << endl;
264 */
265 if (mid_ >= 0) {
266 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
267 else memgr_.FreeMemZone(mid_, MemZS_Filled);
268 }
269 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
270 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
271 if (fgclean) return false;
272 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
273 mmbuf_ = memgr_.GetMemZone(mid_);
274 if (mmbuf_==NULL) return true;
275 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
276 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
277 // Definition temps pour la zone a remplir et numeros de sequence des fichiers en cours de lecture
278 memgr_.GetAuxData(mid_)->FillTime().Set(ctmstart_.ToDays()+cpaqdeltatime_*(double)vfpos_[0]/86400.);
279 memgr_.GetAuxData(mid_)->FileSequenceNumVec()=vfilenum_;
280 return false;
281}
282
283//-------------------------------------------------------
284// Classe thread de lecture de fichiers fits BAORadio
285//-------------------------------------------------------
286
287/* --Methode-- */
288BRFitsReader::BRFitsReader(RAcqMemZoneMgr& mem, vector<string>& infiles, bool fgnotrl)
289 : memgr(mem), infiles_(infiles), fgnotrl_(fgnotrl)
290{
291}
292
293/* --Methode-- */
294void BRFitsReader::run()
295{
296 setRC(1);
297
298 try {
299 TimeStamp ts;
300 Timer tm("BRFitsReader", false);
301 BRPaqChecker pcheck(!fgnotrl_); // Verification/comptage des paquets
302
303 size_t totnbytesrd = 0;
304 cout << " BRFitsReader::run() - Starting " << ts << " NbFiles=" << infiles_.size()
305 << " memgr.PaqSize() = " << memgr.PaqSize() << endl;
306
307 uint_4 nfileok = 0;
308 uint_8 nbytesrd = 0;
309 /* Variables pour la logique des zones memoire et numeros de paquets dans la zone memoire */
310 int mid = -2;
311 Byte* buff = NULL;
312 int kmp = 0;
313 int kmpmax=memgr.NbPaquets();
314
315 int paqsz = 0;
316 for(int ifile=0; ifile<infiles_.size(); ifile++) {
317 string ffname = infiles_[ifile];
318// -------------- Lecture de bytes
319 cout << "BRFitsReader::run() [" << ifile <<"]Ouverture/lecture fichier " << ffname << endl;
320 MiniFITSFile mff(ffname, MF_Read);
321 cout << "... Type=" << mff.DataTypeToString() << " NAxis1=" << mff.NAxis1()
322 << " NAxis2=" << mff.NAxis2() << endl;
323 if (mff.DataType() != MF_Byte) {
324 cout << "BRFitsReader::run() PB : DataType!=MF_Byte --> skipping " << endl;
325 continue;
326 }
327// Les fichier FITS contiennent l'entet (24 bytes), mais pas le trailer (16 bytes) si fgnotrl=true
328 int incpaqsz=0;
329 if (fgnotrl_) {
330 incpaqsz=16;
331 if (ifile==0) cout << " Warning : FITS files without frame trailers ..." << endl;
332 }
333 if (paqsz == 0) { // premier passage, on fixe la taille de paquet et on verifie compatibilite avec memgr
334 paqsz = mff.NAxis1()+incpaqsz;
335 if (paqsz != memgr.PaqSize()) {
336 cout << "BRFitsReader::run() mff.NAxis1() incompatible with memgr.PaqSize() -> exception " << endl;
337 throw SzMismatchError(" fits file size incompatible with memgr.PaqSize()");
338 }
339 }
340 else {
341 if (paqsz != mff.NAxis1()+incpaqsz) {
342 cout << " PB : paqsz=" << paqsz << " != mff.NAxis1()+" << incpaqsz << " --> skipping " << endl;
343 continue;
344 }
345 }
346 if (mid < 0) {
347 mid = memgr.FindMemZoneId(MemZA_Fill);
348 buff = memgr.GetMemZone(mid);
349 if (buff == NULL) {
350 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
351 setRC(2);
352 return;
353 }
354 kmp=0;
355 }
356 size_t sx = mff.NAxis1();
357 size_t sy = mff.NAxis2();
358 int nprt=0;
359 for(int j=0; j<sy; j++) {
360 mff.ReadB(buff+kmp*paqsz, sx, j*sx);
361 BRPaquet paq(NULL, buff+kmp*paqsz, paqsz);
362 bool pqok = pcheck.Check(paq); // Verification du paquet / FrameCounter
363 if (!pqok && (nprt < 10)) {
364 cout << "--BUG-- i=" << ifile << " mid=" << mid << " j=" << j << " kmp=" << kmp
365 << " paqsz=" << paqsz << endl;
366 nprt++;
367 paq.Print();
368 }
369 kmp++;
370 if (kmp >= kmpmax) { // Zone memoire rempli !
371 memgr.FreeMemZone(mid, MemZS_Filled);
372 mid = -2;
373 if (j<sy) {
374 mid = memgr.FindMemZoneId(MemZA_Fill);
375 buff = memgr.GetMemZone(mid);
376 if (buff == NULL) {
377 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
378 setRC(2);
379 return;
380 }
381 kmp=0;
382 }
383 }
384 }
385 nfileok++;
386 size_t nbytesrd = sx*sy;
387 totnbytesrd += nbytesrd;
388 } // Fin de la boucle sur les fichiers
389// Gestion d'une zone partiellement remplie
390 if (mid>=0) {
391 for(int k=kmp;k<kmpmax;k++) {
392 Byte* bp=buff+k*paqsz;
393 for(int l=0;l<paqsz;l++) bp[l]=0;
394 }
395 memgr.FreeMemZone(mid, MemZS_Filled);
396 }
397
398// sprintf(fname,"%s/.log",path_.c_str());
399// ofstream filog(fname);
400// filog << " DataProc::run() - starting log file " << ts << endl;
401// filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
402
403
404
405 cout << " ------------------ BRFitsReader::run() END ----------------- " << endl;
406 ts.SetNow();
407 tm.SplitQ();
408 cout << " END reading " << ts << " NFileOK=" << nfileok << endl;
409 cout << " TotalDiskRead= " << totnbytesrd/(1024*1024) << " MBytes Disk-Read rate= "
410 << (double)(totnbytesrd)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
411 pcheck.Print(cout);
412 cout << " BRFitsReader::run()/Timing: \n";
413 tm.Print();
414 cout << " ---------------------------------------------------------- " << endl;
415
416 } // Fin du bloc try
417 catch (MiniFITSException& exc) {
418 cout << " BRFitsReader::run()/catched MiniFITSException " << exc.Msg() << endl;
419 setRC(3);
420 return;
421 }
422 catch(...) {
423 cout << " BRFitsReader::run()/catched unknown ... exception " << endl;
424 setRC(4);
425 return;
426 }
427 setRC(0);
428 return;
429}
Note: See TracBrowser for help on using the repository browser.