//---------------------------------------------------------------- // BAORadio project - (C) LAL/IRFU 2008-2010 // Thread classes for reading BAORadio FITS files //---------------------------------------------------------------- #include "brfitsrd.h" #include #include #include #include #include "pexceptions.h" #include "timestamp.h" #include "ctimer.h" #include "brpaqu.h" #include "resusage.h" // For elapsed/CPU time measurement ... #include // for gettimeofday using namespace SOPHYA; //--------------------------------------------------------------------- // Thread class for multi-fiber reading of BAORadio FITS files //--------------------------------------------------------------------- /* --Method-- Constructor. Throws BAORadioException if the memory manager reports more than MAXANAFIB fibers, or if the number of data directories differs from the number of fibers. For each fiber it opens signalN.fits (N = imin) in that fiber's directory and throws BAORadioException when the file's NAxis1() differs from memgr_.PaqSize(). */ BRMultiFitsReader::BRMultiFitsReader(RAcqMemZoneMgr& mem, vector& dirs, bool rdsamefc, uint_4 imin, uint_4 imax, uint_4 istep) : memgr_(mem), dirs_(dirs), stop_(false), rdsamefc_(rdsamefc), imin_(imin), imax_(imax), istep_(istep) { SetPrintLevel(); totnbytesrd_ = 0; totsamefc_ = 0; if (memgr_.NbFibres() > MAXANAFIB) throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres>MAXANAFIB "); if (dirs_.size() != memgr_.NbFibres()) throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ NbFibres != Nb Data Directories"); packsize_=memgr_.PaqSize(); mid_=-2; mmbuf_=NULL; max_targ_npaq = memgr_.NbPaquets(); for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL; char flnm[1024]; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),imin_); mff_[fib].Open(flnm, MF_Read); if (mff_[fib].NAxis1() != memgr_.PaqSize()) { cout << " BRMultiFitsReader::BRMultiFitsReader/ fib=" << fib << " File=" << flnm << " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << memgr_.PaqSize() << endl; throw BAORadioException("BRMultiFitsReader::BRMultiFitsReader/ mff.NAxis1() != memgr_.PaqSize() "); } // Extract a few useful parameters from the FITS files string fkvs; if 
(fib==0) { fkvs=mff_[fib].GetKeyValue("DATEOBS"); if (fkvs.length()>0) { cdateobs_.Set(fkvs); cout << " BRMultiFitsReader/First file (for fiber 0) DATEOBS=" << fkvs << endl; } } /* NOTE(review): the fiber id below is parsed with atoi() from the "DATEOBS" key, the same key used for the observation date just above - confirm this is the intended keyword */ fkvs=mff_[fib].GetKeyValue("DATEOBS"); memgr_.FiberId(fib) = atoi( fkvs.c_str() ); vfilenum_.push_back(imin_); vfpos_.push_back(0); vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize())); vpchk_.push_back(BRPaqChecker(true,0)); curfc_.push_back(0); totnpqrd_.push_back(0); totnpqok_.push_back(0); } } /* --Method-- Thread body. Sets the return code to 1 on entry, then loops until stop_ is set or one of the steps reports a problem: MoveToNextTarget(), GetPaquetTarget() for each fiber, ReadNextAllFibers(). After the loop it calls MoveToNextTarget() once more and MZoneManage(true), prints read statistics and calls memgr_.Stop(). The return code is set to 3 or 4 when an exception is caught. NOTE(review): the codes 7 and 9 set inside the loop look like they are overwritten by the final setRC(0), since those paths fall through to the end of the function - confirm. */ void BRMultiFitsReader::run() { setRC(1); try { TimeStamp ts; Timer tm("BRMultiFitsReader", false); cout << " BRMultiFitsReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres() << " PaqSize() = " << memgr_.PaqSize() << endl; cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets") << " signalII.fits IMin=" << imin_ << " IMax=" << imax_ << " IStep=" << istep_ << endl; uint_8 prtcnt=0; Byte* nextpaq=NULL; bool fgok=true; while (fgok) { if (stop_) break; /* NOTE(review): the message below says STOP 9 while the return code set is 7 */ if ( MoveToNextTarget() ) { cout << "BRMultiFitsReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl; setRC(7); fgok=false; break; } for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { nextpaq=GetPaquetTarget(fib); if (nextpaq == NULL) { // This should not happen cout << "BRMultiFitsReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl; setRC(9); fgok=false; break; } vpaq_[fib].Set(nextpaq); } /* NOTE(review): the break above only leaves the for loop, so ReadNextAllFibers() is still called once after a NULL target */ if (ReadNextAllFibers()) { fgok=false; break; } prtcnt++; if ((prtlev_>0)&&(prtcnt%prtmodulo_==0)) { cout << "BRMultiFitsReader: NbPaqMFRead=" << prtcnt << " NSameFC=" << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0] << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl; } } MoveToNextTarget(); // So that the last packet gets processed if the zone is full MZoneManage(true); // Final cleanup cout << " ------------------ BRMultiFitsReader::run() END ----------------- " << endl; ts.SetNow(); tm.SplitQ(); cout << "BRMultiFitsReader::run(): END 
reading : " << ts << endl; /* NOTE(review): unlike the per-fiber loop below, this summary divides by totnpqrd_[0] without checking that it is non-zero */ cout << "... NbPaqMFRead=" << prtcnt << " NSameFC=" << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0] << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { int perc=0; if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib]; cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib] << " FracSameFC=" << perc << " %" << endl; if (prtlev_ > 0) vpchk_[fib].Print(cout); } cout << " TotalDiskRead= " << totnbytesrd_/(1024*1024) << " MBytes Disk-Read rate= " << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl; cout << " BRMultiFitsReader::run()/Timing: \n"; tm.Print(); cout << " ---------------------------------------------------------- " << endl; usleep(50000); // Wait for the last packet to be processed memgr_.Stop(); // Stop } // End of the try block catch (std::exception& exc) { cout << " BRMultiFitsReader::run()/catched execption msg= " << exc.what() << endl; setRC(3); return; } catch(...) { cout << " BRMultiFitsReader::run()/catched unknown ... 
exception " << endl; setRC(4); return; } setRC(0); return; } /* --Method-- Calls ReadNext() for each fiber and returns true as soon as one of them reports a problem. */ bool BRMultiFitsReader::ReadNextAllFibers() { for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { if (ReadNext(fib)) return true; // problem } if (!rdsamefc_ || (memgr_.NbFibres()<2)) { uint_8 cfc=curfc_[0]; bool fgsamefc=true; for(size_t fib=1; fib cfc) cfc=curfc_[fib]; } } if (fgsamefc) { totsamefc_++; echec=false; return false; // OK, same framecounter } else { // else !fgsame for(uint_4 fib=0; fib= mff_[fib].NAxis2()) { mff_[fib].Close(); vfilenum_[fib]++; if (vfilenum_[fib]>imax_) return true; char flnm[1024]; sprintf(flnm,"%s/signal%d.fits",dirs_[fib].c_str(),vfilenum_[fib]); if (prtlev_ > 1) cout << " BRMultiFitsReader::ReadNext() opening" << flnm << endl; mff_[fib].Open(flnm, MF_Read); if (mff_[fib].NAxis1() != packsize_) { cout << " BRMultiFitsReader::ReadNext(fib=" << fib << " File=" << flnm << " NAxis1()= " << mff_[fib].NAxis1() << " <> PaqSize()=" << packsize_ << endl; throw BAORadioException("BRMultiFitsReader::ReadNext()/ mff.NAxis1() != memgr_.PaqSize() "); } if (fib==0) { // updating current date from file (fiber 0) string fkvs=mff_[fib].GetKeyValue("DATEOBS"); if (fkvs.length()>0) cdateobs_.Set(fkvs); } vfpos_[fib]=0; } mff_[fib].ReadB(vpaq_[fib].Begin(), packsize_, vfpos_[fib]*packsize_); vfpos_[fib]++; totnbytesrd_+=packsize_; totnpqrd_[fib]++; fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]); } totnpqok_[fib]++; return false; } /* --Method-- If a zone is currently held (mid_ >= 0), hands it back to the memory manager, marked MemZS_Free when fgclean is true and MemZS_Filled otherwise, then resets the zone id, the packet counter and the buffer pointers. Unless fgclean is true, it then requests a new zone with MemZA_Fill and caches the zone buffer and the per-fiber buffers. Returns true when GetMemZone() yields NULL, false otherwise. */ bool BRMultiFitsReader::MZoneManage(bool fgclean) // Returns true on problem { /* For debugging cout << " BRMultiFitsReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_ << " max_targ_npaq=" << max_targ_npaq << endl; */ if (mid_ >= 0) { if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free); else memgr_.FreeMemZone(mid_, MemZS_Filled); } mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2; for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL; if (fgclean) return false; mid_ = 
memgr_.FindMemZoneId(MemZA_Fill); mmbuf_ = memgr_.GetMemZone(mid_); if (mmbuf_==NULL) return true; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=memgr_.GetMemZone(mid_,fib); return false; } //------------------------------------------------------- // Thread class for reading BAORadio FITS files //------------------------------------------------------- /* --Method-- Constructor: initializes the members memgr, infiles_ and fgnotrl_ from the arguments and does nothing else. */ BRFitsReader::BRFitsReader(RAcqMemZoneMgr& mem, vector& infiles, bool fgnotrl) : memgr(mem), infiles_(infiles), fgnotrl_(fgnotrl) { } /* --Method-- Thread body: sets the return code to 1 on entry and iterates over the input files (loop index ifile). */ void BRFitsReader::run() { setRC(1); try { TimeStamp ts; Timer tm("BRFitsReader", false); BRPaqChecker pcheck(!fgnotrl_); // Packet checking/counting size_t totnbytesrd = 0; cout << " BRFitsReader::run() - Starting " << ts << " NbFiles=" << infiles_.size() << " memgr.PaqSize() = " << memgr.PaqSize() << endl; uint_4 nfileok = 0; uint_8 nbytesrd = 0; /* Variables for the memory-zone logic and the packet numbers within the memory zone */ int mid = -2; Byte* buff = NULL; int kmp = 0; int kmpmax=memgr.NbPaquets(); int paqsz = 0; for(int ifile=0; ifile skipping " << endl; continue; } // The FITS files contain the header (24 bytes), but not the trailer (16 bytes) if fgnotrl=true int incpaqsz=0; if (fgnotrl_) { incpaqsz=16; if (ifile==0) cout << " Warning : FITS files without frame trailers ..." 
<< endl; } if (paqsz == 0) { // premier passage, on fixe la taille de paquet et on verifie compatibilite avec memgr paqsz = mff.NAxis1()+incpaqsz; if (paqsz != memgr.PaqSize()) { cout << "BRFitsReader::run() mff.NAxis1() incompatible with memgr.PaqSize() -> exception " << endl; throw SzMismatchError(" fits file size incompatible with memgr.PaqSize()"); } } else { if (paqsz != mff.NAxis1()+incpaqsz) { cout << " PB : paqsz=" << paqsz << " != mff.NAxis1()+" << incpaqsz << " --> skipping " << endl; continue; } } if (mid < 0) { mid = memgr.FindMemZoneId(MemZA_Fill); buff = memgr.GetMemZone(mid); if (buff == NULL) { cout << " BRFitsReader::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl; setRC(2); return; } kmp=0; } size_t sx = mff.NAxis1(); size_t sy = mff.NAxis2(); int nprt=0; for(int j=0; j NULL" << endl; setRC(2); return; } kmp=0; } } } nfileok++; size_t nbytesrd = sx*sy; totnbytesrd += nbytesrd; } // Fin de la boucle sur les fichiers // Gestion d'une zone partiellement remplie if (mid>=0) { for(int k=kmp;k