//---------------------------------------------------------------- // Projet BAORadio - (C) LAL/IRFU 2008-2011 // Classes de threads pour lecture fichiers fits BAORadio //---------------------------------------------------------------- #include "brfitsrd.h" #include #include #include #include #include "pexceptions.h" #include "timestamp.h" #include "ctimer.h" #include "brpaqu.h" #include "resusage.h" // Pour mesure temps elapsed/CPU ... #include // pour gettimeofday using namespace SOPHYA; //--------------------------------------------------------------------- // Classe thread de lecture de Multi-fibres de fichiers FITS BAORadio //--------------------------------------------------------------------- /*! \class BRMultiFitsReader \ingroup TAcq \brief Read data from FITS files and fills RAcqMemZoneMgr memory zones. This class read BRPaquet data from several FITS files. Data can be synchronized between different files using the FrameCounter. */ /* --Methode-- */ BRMultiFitsReader::BRMultiFitsReader(RAcqMemZoneMgr& mem, vector& dirs, bool rdsamefc, uint_4 imin, uint_4 imax, uint_4 istep) : memgr_(mem), dirs_(dirs), stop_(false), rdsamefc_(rdsamefc), imin_(imin), imax_(imax), istep_(istep) { SetPrintLevel(); totnbytesrd_ = 0; totsamefc_ = 0; if (memgr_.NbFibres() > MAXANAFIB) throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres>MAXANAFIB "); if (dirs_.size() != memgr_.NbFibres()) throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres != Nb Data Directories"); packsize_=memgr_.PaqSize(); mid_=-2; mmbuf_=NULL; max_targ_npaq = memgr_.NbPaquets(); for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL; cpaqdeltatime_=0.; char flnm[1024]; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),imin_); mff_[fib].Open(flnm, MF_Read); cout << " BRMultiFitsReader::BRMultiFitsReader() opening " << flnm << endl; if (mff_[fib].NAxis1() != memgr_.PaqSize()) { cout << " BRMultiFitsReader::BRMultiFitsReader/ fib=" << fib << " File=" << flnm << " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << memgr_.PaqSize() << endl; throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ mff.NAxis1() != memgr_.PaqSize() "); } // Extraction de qques parametres utiles depuis les fichiers FITS string fkvs; if (fib==0) { fkvs=mff_[fib].GetKeyValue("DATEOBS"); if (fkvs.length()>0) cdateobs_.Set(fkvs); fkvs=mff_[fib].GetKeyValue("TMSTART"); if (fkvs.length()>0) { ctmstart_.Set(fkvs); fkvs=mff_[fib].GetKeyValue("TMEND"); SOPHYA::TimeStamp tmend_=ctmstart_; if (fkvs.length()>0) tmend_.Set(fkvs); cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+ (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2(); cout << " BRMultiFitsReader/First file (for fiber 0) TMSTART=" << fkvs << " TMEND-START=" << cpaqdeltatime_*(double)mff_[fib].NAxis2() << endl; } } fkvs=mff_[fib].GetKeyValue("FIBERNUM"); memgr_.FiberId(fib) = atoi( fkvs.c_str() ); vfilenum_.push_back(imin_); vfpos_.push_back(0); vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize())); vpchk_.push_back(BRPaqChecker(true,0)); curfc_.push_back(0); totnpqrd_.push_back(0); totnpqok_.push_back(0); } } /* --Methode-- */ void BRMultiFitsReader::run() { setRC(1); try { TimeStamp ts; Timer tm("BRMultiFitsReader", false); cout << " BRMultiFitsReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres() << " PaqSize() = " << memgr_.PaqSize() << endl; cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets") << " signalII.fits IMin=" << imin_ << " IMax=" << imax_ << " IStep=" << istep_ << endl; uint_8 prtcnt=0; Byte* nextpaq=NULL; bool fgok=true; while (fgok) { if (stop_) break; if ( MoveToNextTarget() ) { cout << "BRMultiFitsReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl; setRC(7); fgok=false; break; } for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { nextpaq=GetPaquetTarget(fib); if (nextpaq == NULL) { // Cela ne devrait pas arriver cout << "BRMultiFitsReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl; setRC(9); fgok=false; break; } vpaq_[fib].Set(nextpaq); } if (ReadNextAllFibers()) { fgok=false; break; } prtcnt++; if ((prtlev_>0)&&(prtcnt%prtmodulo_==0)) { cout << "BRMultiFitsReader: NbPaqMFRead=" << prtcnt << " NSameFC=" << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0] << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl; } } MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein MZoneManage(true); // Nettoyage final cout << " ------------------ BRMultiFitsReader::run() END ----------------- " << endl; ts.SetNow(); tm.SplitQ(); cout << "BRMultiFitsReader::run(): END reading : " << ts << endl; cout << "... NbPaqMFRead=" << prtcnt << " NSameFC=" << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0] << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { int perc=0; if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib]; cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib] << " FracSameFC=" << perc << " %" << endl; if (prtlev_ > 0) vpchk_[fib].Print(cout); } cout << " TotalDiskRead= " << totnbytesrd_/(1024*1024) << " MBytes Disk-Read rate= " << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl; cout << " BRMultiFitsReader::run()/Timing: \n"; tm.Print(); cout << " ---------------------------------------------------------- " << endl; usleep(250000); // Attente de traitement du dernier paquet memgr_.Stop(); // Arret } // Fin du bloc try catch (std::exception& exc) { cout << " BRMultiFitsReader::run()/catched execption msg= " << exc.what() << endl; setRC(3); return; } catch(...) { cout << " BRMultiFitsReader::run()/catched unknown ... exception " << endl; setRC(4); return; } setRC(0); return; } /* --Methode-- */ bool BRMultiFitsReader::ReadNextAllFibers() { for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { if (ReadNext(fib)) return true; // probleme } if (!rdsamefc_ || (memgr_.NbFibres()<2)) { uint_8 cfc=curfc_[0]; bool fgsamefc=true; for(size_t fib=1; fib cfc) cfc=curfc_[fib]; } } if (fgsamefc) { totsamefc_++; echec=false; return false; // c'est OK , same framecounter } else { // else !fgsame for(uint_4 fib=0; fib= mff_[fib].NAxis2()) { mff_[fib].Close(); vfilenum_[fib]+=istep_; if (vfilenum_[fib]>imax_) return true; char flnm[1024]; sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),vfilenum_[fib]); if (prtlev_ > 1) cout << " BRMultiFitsReader::ReadNext() opening" << flnm << endl; mff_[fib].Open(flnm, MF_Read); if (mff_[fib].NAxis1() != packsize_) { cout << " BRMultiFitsReader::ReadNext(fib=" << fib << " File=" << flnm << " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << packsize_ << endl; throw BAORadioException("BRMultiFitsReader::ReadNext()/ mff.NAxis1() != memgr_.PaqSize() "); } if (fib==0) { // updating current date from file (fiber 0) string fkvs=mff_[fib].GetKeyValue("DATEOBS"); if (fkvs.length()>0) cdateobs_.Set(fkvs); fkvs=mff_[fib].GetKeyValue("TMSTART"); if (fkvs.length()>0) { ctmstart_.Set(fkvs); cout << " BRMultiFitsReader::ReadNext TMSTART=" << fkvs << endl; fkvs=mff_[fib].GetKeyValue("TMEND"); SOPHYA::TimeStamp tmend_=ctmstart_; if (fkvs.length()>0) tmend_.Set(fkvs); cpaqdeltatime_=((double)(tmend_.DaysPart()-cdateobs_.DaysPart())*86400.+ (tmend_.SecondsPart()-cdateobs_.SecondsPart()))/(double)mff_[fib].NAxis2(); } } vfpos_[fib]=0; } mff_[fib].ReadB(vpaq_[fib].Begin(), packsize_, vfpos_[fib]*packsize_); vfpos_[fib]++; totnbytesrd_+=packsize_; totnpqrd_[fib]++; fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]); } totnpqok_[fib]++; return false; } /* --Methode-- */ bool BRMultiFitsReader::MZoneManage(bool fgclean) // Retourne true si probleme { /* Pour debug cout << " BRMultiFitsReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_ << " max_targ_npaq=" << max_targ_npaq << endl; */ if (mid_ >= 0) { if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free); else memgr_.FreeMemZone(mid_, MemZS_Filled); } mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2; for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL; if (fgclean) return false; mid_ = memgr_.FindMemZoneId(MemZA_Fill); mmbuf_ = memgr_.GetMemZone(mid_); if (mmbuf_==NULL) return true; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=memgr_.GetMemZone(mid_,fib); // Definition temps pour la zone a remplir et numeros de sequence des fichiers en cours de lecture memgr_.GetAuxData(mid_)->FillTime().Set(ctmstart_.ToDays()+cpaqdeltatime_*(double)vfpos_[0]/86400.); memgr_.GetAuxData(mid_)->FileSequenceNumVec()=vfilenum_; return false; } //------------------------------------------------------- // Classe thread de lecture de fichiers fits BAORadio //------------------------------------------------------- /*! \class BRFitsReader \ingroup TAcq \brief (Deprecated) Data reader from a single serie of fits files. Use BRMultiFitsReader instead. */ /* --Methode-- */ BRFitsReader::BRFitsReader(RAcqMemZoneMgr& mem, vector& infiles, bool fgnotrl) : memgr(mem), infiles_(infiles), fgnotrl_(fgnotrl) { } /* --Methode-- */ void BRFitsReader::run() { setRC(1); try { TimeStamp ts; Timer tm("BRFitsReader", false); BRPaqChecker pcheck(!fgnotrl_); // Verification/comptage des paquets size_t totnbytesrd = 0; cout << " BRFitsReader::run() - Starting " << ts << " NbFiles=" << infiles_.size() << " memgr.PaqSize() = " << memgr.PaqSize() << endl; uint_4 nfileok = 0; uint_8 nbytesrd = 0; /* Variables pour la logique des zones memoire et numeros de paquets dans la zone memoire */ int mid = -2; Byte* buff = NULL; int kmp = 0; int kmpmax=memgr.NbPaquets(); int paqsz = 0; for(int ifile=0; ifile skipping " << endl; continue; } // Les fichier FITS contiennent l'entet (24 bytes), mais pas le trailer (16 bytes) si fgnotrl=true int incpaqsz=0; if (fgnotrl_) { incpaqsz=16; if (ifile==0) cout << " Warning : FITS files without frame trailers ..." << endl; } if (paqsz == 0) { // premier passage, on fixe la taille de paquet et on verifie compatibilite avec memgr paqsz = mff.NAxis1()+incpaqsz; if (paqsz != memgr.PaqSize()) { cout << "BRFitsReader::run() mff.NAxis1() incompatible with memgr.PaqSize() -> exception " << endl; throw SzMismatchError(" fits file size incompatible with memgr.PaqSize()"); } } else { if (paqsz != mff.NAxis1()+incpaqsz) { cout << " PB : paqsz=" << paqsz << " != mff.NAxis1()+" << incpaqsz << " --> skipping " << endl; continue; } } if (mid < 0) { mid = memgr.FindMemZoneId(MemZA_Fill); buff = memgr.GetMemZone(mid); if (buff == NULL) { cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl; setRC(2); return; } kmp=0; } size_t sx = mff.NAxis1(); size_t sy = mff.NAxis2(); int nprt=0; for(int j=0; j NULL" << endl; setRC(2); return; } kmp=0; } } } nfileok++; size_t nbytesrd = sx*sy; totnbytesrd += nbytesrd; } // Fin de la boucle sur les fichiers // Gestion d'une zone partiellement remplie if (mid>=0) { for(int k=kmp;k