//---------------------------------------------------------------- // ---- classes de threads pour lecture (transfert DMA) // et ecriture disque pour acquisition BAORadio ----- // LAL - R. Ansari - Juin/Juillet 2008 //---------------------------------------------------------------- #include "racqurw.h" #include #include #include #include #include "pexceptions.h" #include "timestamp.h" #include "pciewrap.h" #include "brpaqu.h" #include "minifits.h" #include "resusage.h" // Pour mesure temps elapsed/CPU ... #include "datatable.h" // Pour sauver les entetes de paquet #include // pour gettimeofday // Si on veut que MultiDataSaver cree des fichiers avec le numero des FrameCounters... // define DEBUGPAQHDR //------------------------------------------------------- // Classe thread de lecture PCI-Express //------------------------------------------------------- PCIEReader::PCIEReader(PCIEWrapperInterface &pciw,uint_4 sizeFrame,uint_4 packSize ,RAcqMemZoneMgr& mem, uint_4 nmax, BRDataFmtConv swapall) : memgr(mem) , pciw_ (pciw) { nmax_ = nmax; swapall_ = swapall; // select data swap/format conversion for BRPaquet stop_ = false; packSize_ = packSize; sizeFr_ =sizeFrame; // Pour la logique de gestion des paquets ds zone memoire mid_ = -2; targ_npaq_ = 0; max_targ_npaq = memgr.NbPaquets(); mmbuf_ = NULL; } bool PCIEReader::MZoneManage(bool fgclean) // Retourne true si probleme { /* Pour debug cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_ << " max_targ_npaq=" << max_targ_npaq << endl; */ if (mid_ >= 0) memgr.FreeMemZone(mid_, MemZS_Filled); mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2; if (fgclean) return false; mid_ = memgr.FindMemZoneId(MemZA_Fill); mmbuf_ = memgr.GetMemZone(mid_); if (mmbuf_==NULL) return true; return false; } void PCIEReader::run() { //Precision insuffisante ResourceUsage ru; ru.Update(); // Pour recuperer le temps passe struct timeval tv1, tv2; gettimeofday(&tv1, NULL); try{ cout << " PCIEReader::run() - Starting , NMaxMemZones=" << nmax_ << "memgr.NbPaquets()=" << memgr.NbPaquets() << endl; setRC(1); // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA //sigaction(SIGINT,&act,NULL); uint_4 paqsz = memgr.PaqSize(); uint_4 dmasz = pciw_.TransferSize(); pciw_.StartTransfers(); Byte* Datas = NULL; Byte* tampon = new Byte[paqsz]; Byte* nextpaq = NULL; uint_4 off_acheval = 0; int nerrdma = 0; int maxerrdma = 10; bool fgarret = false; uint_4 npaqfait = 0; // Nb total de paquets traites (DMA + decode) /// do{ si boucle infini // for (uint_4 kmz=0; kmz=maxerrdma) { fgarret = true; break; } } else { // DMA reussi uint_4 curoff = 0; //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire if (off_acheval > 0) { if ((paqsz-off_acheval)< dmasz) { memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval); curoff = paqsz-off_acheval; off_acheval = 0; if ((nextpaq=NextPaqTarget()) == NULL) { cout << "2 PCIEReader::run()/Error NextPaqTarget() returned NULL ->STOP 9" << endl; setRC(9); fgarret=true; break; } BRPaquet paq(tampon, nextpaq, paqsz, swapall_); npaqfait++; // Ne pas oublier le compteur de paquets faits } else { memcpy((void *)(tampon+off_acheval), (void *)Datas, dmasz); curoff =dmasz; off_acheval = (dmasz+off_acheval); } } //2- On traite les paquets complet qui se trouvent dans la zone du DMA while((curoff+paqsz)<=dmasz) { // BRPaquet paq((Byte*)(Datas)+((paqsz*j)), nextdma+j*paqsz, paqsz, swapall_); if ((nextpaq=NextPaqTarget()) == NULL) { cout << "3 PCIEReader::run()/Error NextPaqTarget() returned NULL ->STOP 9" << endl; setRC(9); fgarret=true; break; } BRPaquet paq(Datas+curoff, nextpaq, paqsz, swapall_); curoff += paqsz; // On avance l'index dans le buffer du DMA npaqfait++; // Ne pas oublier le compteur de paquets faits } // -- FIN traitement des paquets complets ds un DMA //3- On copie si besoin la fin du DMA dans la zone tampon if (curoff < dmasz) { if (fgarret) break; // pour sortir si l'on est passe par un STOP 9 off_acheval = dmasz-curoff; memcpy(tampon, (void*)(Datas+curoff), off_acheval); curoff += off_acheval; } } // Traitement d'un DMA OK } // }while(!stop_); gettimeofday(&tv2, NULL); double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.; if (tmelaps2<0.1) tmelaps2=0.1; cout << " ------------------ PCIEReader::run()-End summary -------------------" << endl; cout << " PCIEReader/Info TotTransfer=" << pciw_.TotTransferBytes()/1024 << " kb , ElapsTime=" << tmelaps2 << " ms ->" << (double)pciw_.TotTransferBytes()/tmelaps2 << " kb/s" << endl; cout << " --------------------------------------------------------------------" << endl; MZoneManage(true); delete [] tampon; }catch (PException& exc) { cout << " PCIEREADER::run()/catched PException " << exc.Msg() << endl; setRC(3); return; } catch(...) { cout << " PCIEREADER::run()/catched unknown ... exception " << endl; setRC(4); return; } setRC(0); return; } void PCIEReader::Stop() { // cout << " PCIEReader::Stop() -------------> STOP" < NULL" << endl; setRC(2); return; } for(uint_4 i=0; i=maxerrdma) { fgarret = true; break; } } else { uint_4 curoff = 0; //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire // if (off_acheval > 0) { // memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval); // curoff = paqsz-off_acheval; off_acheval = 0; // BRPaquet paq(tampon, locdata, paqsz, swapall_); // npaqfait++; // Ne pas oublier le compteur de paquets faits // pcheck.Check(paq); // Verification du paquet / FrameCounter //} if (off_acheval > 0) { if ((paqsz-off_acheval)< dmasz) { memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval); curoff = paqsz-off_acheval; off_acheval = 0; BRPaquet paq(tampon, locdata, paqsz, swapall_); npaqfait++; // Ne pas oublier le compteur de paquets faits pcheck.Check(paq); // Verification du paquet / FrameCounter } else { memcpy((void *)(tampon+off_acheval), (void *)Datas, dmasz); curoff =dmasz; off_acheval = (dmasz+off_acheval); } } //2- On traite les paquets complet qui se trouvent dans la zone du DMA while((curoff+paqsz)<=dmasz) { // BRPaquet paq((Byte*)(Datas)+((paqsz*j)), nextdma+j*paqsz, paqsz, swapall_); // BRPaquet paq(Datas+curoff, locdata, paqsz, swapall_); BRPaquet paq(Datas+curoff, nextdma+npaqfait*paqsz, paqsz, swapall_); curoff += paqsz; // On avance l'index dans le buffer du DMA npaqfait++; // Ne pas oublier le compteur de paquets faits pcheck.Check(paq); // Verification du paquet / FrameCounter } // -- FIN traitement des paquets complets ds un DMA //3- On copie si besoin la fin du DMA dans la zone tampon if (curoff < dmasz) { off_acheval = dmasz-curoff; memcpy(tampon, (void*)(Datas+curoff), off_acheval); curoff += off_acheval; } } // Traitement d'un DMA OK } // Fin boucle de remplissage d'une zone memoire } // Fin boucle sur les zones setRC(0); gettimeofday(&tv2, NULL); double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.; if (tmelaps2<0.1) tmelaps2=0.1; cout << " ------------------ PCIEReaderChecker::run()-End summary -------------------" << endl; cout << " PCIEReaderChecker/Info TotTransfer=" << pciw_.TotTransferBytes()/1024 << " kb , ElapsTime=" << tmelaps2 << " ms ->" << (double)pciw_.TotTransferBytes()/tmelaps2 << " kb/s" << endl; pcheck.Print(cout); cout << " --------------------------------------------------------------------" << endl; delete [] locdata; delete [] tampon; return; } void PCIEReaderChecker::Stop() { // cout << " PCIEReaderChecker::stop() ........ STOP" < vec_pciw, RAcqMemZoneMgr& mem, BRParList const& par) : memgr(mem), par_(par), vec_pciw_ (vec_pciw) { nmax_ = par_.MaxNbBlocs(); swapall_ = par_.GetDataConvFg(); // select data swap/format conversion for BRPaquet stop_ = false; packSize_ = par_.RecvPaquetSize(); packSizeInMgr_=memgr.PaqSize(); sizeFr_=par_.DMASizeBytes(); if (vec_pciw.size() != memgr.NbFibres()) { cout << " PCIEMultiReader()PbArgs: vec_pciw.size()= " << vec_pciw.size() << " memgr.NbFibres()=" < MAXNBFIB) throw ParmError("PCIEMultiReader:ERROR/ vec_pciw.size() > MAXNBFIB "); nbDma_= vec_pciw.size(); mid_=-2; mmbuf_=NULL; max_targ_npaq = memgr.NbPaquets(); for (int fid=0 ; fid<(int)nbDma_ ;fid++) { memgr.FiberId(fid)=vec_pciw[fid]->FiberId(); mmbufib_[fid]=NULL; } stopreason_="??Unknown??"; } /* --Methode-- */ void PCIEMultiReader::run() { struct timeval tv1,tv2; gettimeofday(&tv1, NULL); cout << " PCIEMultiReader::run() - Starting , NMaxMemZones=" << nmax_ << " memgr.NbPaquets(),PaqSz=" << memgr.NbPaquets() << " ," << memgr.PaqSize() << " DMA-Paqsize " << packSize_ << " " << BRPaquet::FmtConvToString(swapall_) << endl; setRC(1); // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA //sigaction(SIGINT,&act,NULL); uint_4 paqszmm = memgr.PaqSize(); uint_4 paqsz = packSize_; uint_4 dmasz = vec_pciw_[0]->TransferSize(); //DEL vec_pciw_[0]->StartTransfers(); BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets Byte* Datas[MAXNBFIB]; Byte* tampon[MAXNBFIB] ; Byte* predtampon=NULL; // tampon de recopie pour la reduction des tailles de paquets Byte* nextpaq=NULL; uint_4 off_acheval=0; bool fgarret = false; // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA for (int i=0;i< (int)nbDma_ ;i++) { tampon[i]= new Byte[paqsz]; vec_pciw_[i]->SetMaxWaitEndDMA(par_.first_maxkwedma_,par_.first_nretrydma_); } bool fgredpaq=par_.fgreducpsize; if (fgredpaq) { cout << " PCIEMultiReader::run() - PaquetSizeReduction - RedSize=" << par_.redpqsize << " Offset=" << par_.reducoffset << " " << ((par_.reducneedcopy)?"NeedCopy":"NOCopy") << " " << BRPaquet::ReducActionToString(par_.pqreducmode) << endl; predtampon = new Byte[paqsz]; } #ifdef DEBUGPAQHDR ofstream header[MAXNBFIB]; for(uint_4 fib=0; fibStartTransfers(); if ((npaqfaitg>1)&&fg_change_timeout) { for (int i=0;i< (int)nbDma_ ;i++) vec_pciw_[i]->SetMaxWaitEndDMA(par_.maxkwedma_,par_.nretrydma_); fg_change_timeout=false; } // On pointe vers le debut de la zone a remplir aver le prochain DMA //-- Zone memoire locale Byte* nextdma = buff+i*paqsz; bool fgbaddma=false; // On boucle sur les nbDma_ en attente de leurs terminaison for (int dma=0; dma <(int) nbDma_ ;dma++) { Datas[dma]=vec_pciw_[dma]->GetData(); if (Datas[dma] == NULL) { // No data Read in DMA fgbaddma=true; cout << "PCIEMultiReaderChecker/ERROR - DMA failed !" << endl; vec_pciw_[dma]->PrintStatus(cout); stopreason_="--Failed DMA--"; fgarret = true; break; } } if (fgbaddma) continue; uint_4 curoff=0; //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres if (off_acheval > 0) { // IF Numero B if ((paqsz-off_acheval)< dmasz) { // IF Numero A for(uint_4 fib=0; fibSTOP 9" << endl; setRC(9); fgarret=true; break; } for(uint_4 fib=0; fibSTOP 9" << endl; setRC(9); fgarret=true; break; } // CHECK S'il faut faire une reduction de taille de paquet if (fgredpaq) { // reduction taille de paquet if (par_.reducneedcopy) { BRPaquet paqc1(tampon[fib], predtampon, paqsz, swapall_); BRPaquet paqc2(nextpaq, par_.redpqsize); paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset); } else { BRPaquet paqc1(tampon[fib], paqsz); BRPaquet paqc2(nextpaq, par_.redpqsize); paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset); } } else { BRPaquet paqc(tampon[fib], nextpaq, paqsz, swapall_); } BRPaquet paq(nextpaq, packSizeInMgr_); npaqfait[fib]++; if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits pcheck[fib].Check(paq); // Verification du paquet / FrameCounter #ifdef DEBUGPAQHDR header[fib] << dec << paq.FrameCounter()<< endl; #endif } } else { // se rapporte au IF numero A for(uint_4 fib=0; fib= nmax_* memgr.NbPaquets())) break; if ( MoveToNextTarget() ) { cout << "PCIEMultiReader::run()/Error-B- MoveToNextTarget() returned true ->STOP 9" << endl; setRC(9); fgarret=true; break; } for(uint_4 fib=0; fib= nmax_*memgr.NbPaquets()) continue; nextpaq=GetPaquetTarget(fib); if (nextpaq == NULL) { // Cela ne devrait pas arriver cout << "PCIEReader::run()/Error-B2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl; setRC(9); fgarret=true; break; } // CHECK S'il faut faire une reduction de taille de paquet if (fgredpaq) { // reduction taille de paquet if (par_.reducneedcopy) { BRPaquet paqc1(Datas[fib]+curoff, predtampon, paqsz, swapall_); BRPaquet paqc2(nextpaq, par_.redpqsize); paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset); } else { BRPaquet paqc1(Datas[fib]+curoff, paqsz); BRPaquet paqc2(nextpaq, par_.redpqsize); paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset); } } else { BRPaquet paqc(Datas[fib]+curoff, nextpaq, paqsz, swapall_); } BRPaquet paq(nextpaq, packSizeInMgr_); npaqfait[fib]++; if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits pcheck[fib].Check(paq); // Verification du paquet / FrameCounter #ifdef DEBUGPAQHDR header[fib] << dec << paq.FrameCounter()<< endl; #endif } curoff += paqsz; // On avance l'index dans le buffer du DMA } // -- FIN traitement des paquets complets ds un DMA - FIN du while numero C //3- On copie si besoin la fin du DMA dans la zone tampon if (curoff < dmasz) { // IF numero D off_acheval = dmasz-curoff; for(uint_4 fib=0; fib= nmax_*memgr.NbPaquets()) stopreason_="--Max Nb paquets reached--"; MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein MZoneManage(true); //---- Nettoyage final setRC(0); gettimeofday(&tv2, NULL); double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.; if (tmelaps2<0.1) tmelaps2=0.1; cout << " ---------- PCIEMultiReader::run()-End StopReason: " << stopreason_ << endl << " Summary NPaqFait=" << npaqfaitg << "------------- " << endl; for (int dma=0; dma < (int)nbDma_ ;dma++) { cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] << " TotTransfer=" << vec_pciw_[dma]->TotTransferBytes()/1024 << " kb , ElapsTime=" << tmelaps2 << " ms ->" << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl; pcheck[dma].Print(cout); } cout << " --------------------------------------------------------------------" << endl; usleep(250000); // Attente de traitement du dernier paquet memgr.Stop(); // Pour arreter les autres threads for (int i=0;i< (int)nbDma_ ;i++) delete[] tampon[i]; if ((fgredpaq)&&predtampon) delete[] predtampon; #ifdef DEBUGPAQHDR for(uint_4 fib=0; fib NULL" << endl; setRC(21); return; } tsmz = memgr.GetAuxData(mid)->filltime_; for(uint_4 fib=0; fib NULL" << endl; setRC(22); return; } for(uint_4 i=0; i " << fname << " Stat:" << pcsum << endl; cout << " Fib " << fib << " -> " << fname << " Stat:" << pcsum << endl; } if (savesig_) { // Ajout mots-cle additionnels a tous les fichiers FITS for(uint_4 fib=0; fib0) hassrc=true; bool fgredpsz = acpar.GetParams().fgreducpsize; for(uint_4 fib=0; fib