source: Sophya/trunk/AddOn/TAcq/brfitsrd.cc@ 3902

Last change on this file since 3902 was 3883, checked in by ansari, 15 years ago

1/ Correction bug de lecture ds BRMultiFitsReader et EthernetReader lorsque lecture avec SameFrameCounter etait demande
2/ Amelioration des impressions ds ces deux classes et ajout possibilite de controle du flag "SameFC" avec les classes de parametres BRParList et BRAnaParam
3/ Nouveaux datacard @ethrforcesamefc et @prtlev pour acquisition

Reza, 23/09/2010

File size: 13.3 KB
Line 
1//----------------------------------------------------------------
2// Projet BAORadio - (C) LAL/IRFU 2008-2010
3// Classes de threads pour lecture fichiers fits BAORadio
4//----------------------------------------------------------------
5
6
7#include "brfitsrd.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <exception>
13
14#include "pexceptions.h"
15#include "timestamp.h"
16#include "ctimer.h"
17
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23using namespace SOPHYA;
24
25//---------------------------------------------------------------------
26// Classe thread de lecture de Multi-fibres de fichiers FITS BAORadio
27//---------------------------------------------------------------------
28
29/* --Methode-- */
30BRMultiFitsReader::BRMultiFitsReader(RAcqMemZoneMgr& mem, vector<string>& dirs, bool rdsamefc,
31 uint_4 imin, uint_4 imax, uint_4 istep)
32 : memgr_(mem), dirs_(dirs), stop_(false), rdsamefc_(rdsamefc), imin_(imin), imax_(imax), istep_(istep)
33{
34 SetPrintLevel();
35 totnbytesrd_ = 0;
36 totsamefc_ = 0;
37 if (memgr_.NbFibres() > MAXANAFIB)
38 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres>MAXANAFIB ");
39 if (dirs_.size() != memgr_.NbFibres())
40 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres != Nb Data Directories");
41
42 packsize_=memgr_.PaqSize();
43 mid_=-2;
44 mmbuf_=NULL;
45 max_targ_npaq = memgr_.NbPaquets();
46 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
47
48 char flnm[1024];
49 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
50 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),imin_);
51 mff_[fib].Open(flnm, MF_Read);
52 if (mff_[fib].NAxis1() != memgr_.PaqSize()) {
53 cout << " BRMultiFitsReader::BRMultiFitsReader/ fib=" << fib << " File=" << flnm <<
54 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << memgr_.PaqSize() << endl;
55 throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ mff.NAxis1() != memgr_.PaqSize() ");
56 }
57 vfilenum_.push_back(imin_);
58 vfpos_.push_back(0);
59 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
60 vpchk_.push_back(BRPaqChecker(true,0));
61 curfc_.push_back(0);
62 totnpqrd_.push_back(0);
63 totnpqok_.push_back(0);
64 }
65}
66
67
68/* --Methode-- */
69void BRMultiFitsReader::run()
70{
71 setRC(1);
72 try {
73 TimeStamp ts;
74 Timer tm("BRMultiFitsReader", false);
75 cout << " BRMultiFitsReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
76 << " PaqSize() = " << memgr_.PaqSize() << endl;
77 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets")
78 << " signalII.fits IMin=" << imin_ << " IMax=" << imax_ << " IStep=" << istep_ << endl;
79
80 uint_8 prtcnt=0;
81 Byte* nextpaq=NULL;
82 bool fgok=true;
83 while (fgok) {
84 if (stop_) break;
85 if ( MoveToNextTarget() ) {
86 cout << "BRMultiFitsReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
87 setRC(7); fgok=false; break;
88 }
89 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
90 nextpaq=GetPaquetTarget(fib);
91 if (nextpaq == NULL) { // Cela ne devrait pas arriver
92 cout << "BRMultiFitsReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
93 setRC(9); fgok=false; break;
94 }
95 vpaq_[fib].Set(nextpaq);
96 }
97 if (ReadNextAllFibers()) { fgok=false; break; }
98 prtcnt++;
99 if ((prtlev_>0)&&(prtcnt%prtmodulo_==0)) {
100 cout << "BRMultiFitsReader: NbPaqMFRead=" << prtcnt << " NSameFC="
101 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
102 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
103 }
104 }
105
106 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
107 MZoneManage(true); // Nettoyage final
108
109 cout << " ------------------ BRMultiFitsReader::run() END ----------------- " << endl;
110 ts.SetNow();
111 tm.SplitQ();
112 cout << "BRMultiFitsReader::run(): END reading : " << ts << endl;
113 cout << "... NbPaqMFRead=" << prtcnt << " NSameFC="
114 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
115 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
116
117 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
118 int perc=0;
119 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
120 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
121 << " FracSameFC=" << perc << " %" << endl;
122 if (prtlev_ > 0) vpchk_[fib].Print(cout);
123 }
124 cout << " TotalDiskRead= " << totnbytesrd_/(1024*1024) << " MBytes Disk-Read rate= "
125 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
126 cout << " BRMultiFitsReader::run()/Timing: \n";
127 tm.Print();
128 cout << " ---------------------------------------------------------- " << endl;
129
130 usleep(50000); // Attente de traitement du dernier paquet
131 memgr_.Stop(); // Arret
132
133 } // Fin du bloc try
134 catch (std::exception& exc) {
135 cout << " BRMultiFitsReader::run()/catched execption msg= " << exc.what() << endl;
136 setRC(3);
137 return;
138 }
139 catch(...) {
140 cout << " BRMultiFitsReader::run()/catched unknown ... exception " << endl;
141 setRC(4);
142 return;
143 }
144 setRC(0);
145 return;
146}
147
148/* --Methode-- */
149bool BRMultiFitsReader::ReadNextAllFibers()
150{
151 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
152 if (ReadNext(fib)) return true; // probleme
153 }
154 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
155 uint_8 cfc=curfc_[0];
156 bool fgsamefc=true;
157 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
158 if (curfc_[fib]!=cfc) fgsamefc=false;
159 }
160 if (fgsamefc) totsamefc_++;
161 return false; // c'est OK
162 }
163 // On va essayer de lire jusqu'a avoir same_frame_counter
164 bool echec=true;
165 while (echec) {
166 uint_8 cfc=curfc_[0];
167 bool fgsamefc=true;
168 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
169 if (curfc_[fib]!=cfc) {
170 fgsamefc=false;
171 if (curfc_[fib] > cfc) cfc=curfc_[fib];
172 }
173 }
174 if (fgsamefc) {
175 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
176 }
177 else { // else !fgsame
178 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
179 while (curfc_[fib]<cfc) {
180 if (ReadNext(fib)) return true; // probleme
181 }
182 }
183 } // fin de else !fgsame
184 } // fin de while(echec): lecture jusqu'a same_frame_counter
185
186 return true; // probleme
187}
188
189/* --Methode-- */
190bool BRMultiFitsReader::ReadNext(int fib)
191{
192 if (!mff_[fib].IsOpen()) return true;
193 bool fggood=false;
194 while(!fggood) {
195 if (vfpos_[fib] >= mff_[fib].NAxis2()) {
196 mff_[fib].Close();
197 vfilenum_[fib]++;
198 if (vfilenum_[fib]>imax_) return true;
199 char flnm[1024];
200 sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),vfilenum_[fib]);
201 if (prtlev_ > 1)
202 cout << " BRMultiFitsReader::ReadNext() opening" << flnm << endl;
203 mff_[fib].Open(flnm, MF_Read);
204 if (mff_[fib].NAxis1() != packsize_) {
205 cout << " BRMultiFitsReader::ReadNext(fib=" << fib << " File=" << flnm <<
206 " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << packsize_ << endl;
207 throw BAORadioException("BRMultiFitsReader::ReadNext()/ mff.NAxis1() != memgr_.PaqSize() ");
208 }
209 vfpos_[fib]=0;
210 }
211 mff_[fib].ReadB(vpaq_[fib].Begin(), packsize_, vfpos_[fib]*packsize_);
212 vfpos_[fib]++;
213 totnbytesrd_+=packsize_;
214 totnpqrd_[fib]++;
215 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
216 }
217 totnpqok_[fib]++;
218 return false;
219}
220
221/* --Methode-- */
222bool BRMultiFitsReader::MZoneManage(bool fgclean) // Retourne true si probleme
223{
224 /* Pour debug
225 cout << " BRMultiFitsReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
226 << " max_targ_npaq=" << max_targ_npaq << endl;
227 */
228 if (mid_ >= 0) {
229 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
230 else memgr_.FreeMemZone(mid_, MemZS_Filled);
231 }
232 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
233 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
234 if (fgclean) return false;
235 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
236 mmbuf_ = memgr_.GetMemZone(mid_);
237 if (mmbuf_==NULL) return true;
238 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
239 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
240 return false;
241}
242
243//-------------------------------------------------------
244// Classe thread de lecture de fichiers fits BAORadio
245//-------------------------------------------------------
246
247/* --Methode-- */
248BRFitsReader::BRFitsReader(RAcqMemZoneMgr& mem, vector<string>& infiles, bool fgnotrl)
249 : memgr(mem), infiles_(infiles), fgnotrl_(fgnotrl)
250{
251}
252
253/* --Methode-- */
254void BRFitsReader::run()
255{
256 setRC(1);
257
258 try {
259 TimeStamp ts;
260 Timer tm("BRFitsReader", false);
261 BRPaqChecker pcheck(!fgnotrl_); // Verification/comptage des paquets
262
263 size_t totnbytesrd = 0;
264 cout << " BRFitsReader::run() - Starting " << ts << " NbFiles=" << infiles_.size()
265 << " memgr.PaqSize() = " << memgr.PaqSize() << endl;
266
267 uint_4 nfileok = 0;
268 uint_8 nbytesrd = 0;
269 /* Variables pour la logique des zones memoire et numeros de paquets dans la zone memoire */
270 int mid = -2;
271 Byte* buff = NULL;
272 int kmp = 0;
273 int kmpmax=memgr.NbPaquets();
274
275 int paqsz = 0;
276 for(int ifile=0; ifile<infiles_.size(); ifile++) {
277 string ffname = infiles_[ifile];
278// -------------- Lecture de bytes
279 cout << "BRFitsReader::run() [" << ifile <<"]Ouverture/lecture fichier " << ffname << endl;
280 MiniFITSFile mff(ffname, MF_Read);
281 cout << "... Type=" << mff.DataTypeToString() << " NAxis1=" << mff.NAxis1()
282 << " NAxis2=" << mff.NAxis2() << endl;
283 if (mff.DataType() != MF_Byte) {
284 cout << "BRFitsReader::run() PB : DataType!=MF_Byte --> skipping " << endl;
285 continue;
286 }
287// Les fichier FITS contiennent l'entet (24 bytes), mais pas le trailer (16 bytes) si fgnotrl=true
288 int incpaqsz=0;
289 if (fgnotrl_) {
290 incpaqsz=16;
291 if (ifile==0) cout << " Warning : FITS files without frame trailers ..." << endl;
292 }
293 if (paqsz == 0) { // premier passage, on fixe la taille de paquet et on verifie compatibilite avec memgr
294 paqsz = mff.NAxis1()+incpaqsz;
295 if (paqsz != memgr.PaqSize()) {
296 cout << "BRFitsReader::run() mff.NAxis1() incompatible with memgr.PaqSize() -> exception " << endl;
297 throw SzMismatchError(" fits file size incompatible with memgr.PaqSize()");
298 }
299 }
300 else {
301 if (paqsz != mff.NAxis1()+incpaqsz) {
302 cout << " PB : paqsz=" << paqsz << " != mff.NAxis1()+" << incpaqsz << " --> skipping " << endl;
303 continue;
304 }
305 }
306 if (mid < 0) {
307 mid = memgr.FindMemZoneId(MemZA_Fill);
308 buff = memgr.GetMemZone(mid);
309 if (buff == NULL) {
310 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
311 setRC(2);
312 return;
313 }
314 kmp=0;
315 }
316 size_t sx = mff.NAxis1();
317 size_t sy = mff.NAxis2();
318 int nprt=0;
319 for(int j=0; j<sy; j++) {
320 mff.ReadB(buff+kmp*paqsz, sx, j*sx);
321 BRPaquet paq(NULL, buff+kmp*paqsz, paqsz);
322 bool pqok = pcheck.Check(paq); // Verification du paquet / FrameCounter
323 if (!pqok && (nprt < 10)) {
324 cout << "--BUG-- i=" << ifile << " mid=" << mid << " j=" << j << " kmp=" << kmp
325 << " paqsz=" << paqsz << endl;
326 nprt++;
327 paq.Print();
328 }
329 kmp++;
330 if (kmp >= kmpmax) { // Zone memoire rempli !
331 memgr.FreeMemZone(mid, MemZS_Filled);
332 mid = -2;
333 if (j<sy) {
334 mid = memgr.FindMemZoneId(MemZA_Fill);
335 buff = memgr.GetMemZone(mid);
336 if (buff == NULL) {
337 cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
338 setRC(2);
339 return;
340 }
341 kmp=0;
342 }
343 }
344 }
345 nfileok++;
346 size_t nbytesrd = sx*sy;
347 totnbytesrd += nbytesrd;
348 } // Fin de la boucle sur les fichiers
349// Gestion d'une zone partiellement remplie
350 if (mid>=0) {
351 for(int k=kmp;k<kmpmax;k++) {
352 Byte* bp=buff+k*paqsz;
353 for(int l=0;l<paqsz;l++) bp[l]=0;
354 }
355 memgr.FreeMemZone(mid, MemZS_Filled);
356 }
357
358// sprintf(fname,"%s/.log",path_.c_str());
359// ofstream filog(fname);
360// filog << " DataProc::run() - starting log file " << ts << endl;
361// filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
362
363
364
365 cout << " ------------------ BRFitsReader::run() END ----------------- " << endl;
366 ts.SetNow();
367 tm.SplitQ();
368 cout << " END reading " << ts << " NFileOK=" << nfileok << endl;
369 cout << " TotalDiskRead= " << totnbytesrd/(1024*1024) << " MBytes Disk-Read rate= "
370 << (double)(totnbytesrd)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
371 pcheck.Print(cout);
372 cout << " BRFitsReader::run()/Timing: \n";
373 tm.Print();
374 cout << " ---------------------------------------------------------- " << endl;
375
376 } // Fin du bloc try
377 catch (MiniFITSException& exc) {
378 cout << " BRFitsReader::run()/catched MiniFITSException " << exc.Msg() << endl;
379 setRC(3);
380 return;
381 }
382 catch(...) {
383 cout << " BRFitsReader::run()/catched unknown ... exception " << endl;
384 setRC(4);
385 return;
386 }
387 setRC(0);
388 return;
389}
Note: See TracBrowser for help on using the repository browser.