//---------------------------------------------------------------- // ---- classes de threads pour lecture (transfert DMA) // et ecriture disque pour acquisition BAORadio ----- // LAL - R. Ansari - Juin/Juillet 2008 //---------------------------------------------------------------- #include "racqueth.h" #include #include #include #include #include "pexceptions.h" #include "ctimer.h" #include "timestamp.h" #include "pciewrap.h" #include "brpaqu.h" #include "resusage.h" // Pour mesure temps elapsed/CPU ... #include // pour gettimeofday //////////////////////////////////////////////////////////////////////////////////////////////////////// //---------------------------------------------------------------------------------------------------------- // Classe thread de lecture PCI-Express et recopie sur interface reseau (Ethernet) //---------------------------------------------------------------------------------------------------------- /* --Methode-- */ PCIEToEthernet::PCIEToEthernet(vector vec_pciw, vector& destname, BRParList const& par, int portid) : par_(par), vec_pciw_ (vec_pciw), destname_(destname), tcpportid_(portid) { nmaxpaq_ = par_.MaxNbPaquets(); swapall_ = par_.GetDataConvFg(); // select data swap/format conversion for BRPaquet stop_ = false; packSize_ = par_.RecvPaquetSize(); packSizeInMgr_=par_.MMgrPaquetSize(); sizeFr_=par_.DMASizeBytes(); if (vec_pciw.size() > MAXNBFIB) throw ParmError("PCIEToEthernet:ERROR/ vec_pciw.size() > MAXNBFIB "); nbDma_= vec_pciw.size(); char msg[BRTCPMSGLEN]; for(size_t i=0; i vskt; for(size_t j=0; j& vskt = vvec_skt_[i]; for(size_t j=0; jTransferSize(); //DEL vec_pciw_[0]->StartTransfers(); BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets Byte* Datas[MAXNBFIB]; Byte* tampon[MAXNBFIB] ; Byte* nexpaq[MAXNBFIB] ; // Pour recevevoir les paquets apres reduction de taille Byte* predtampon=NULL; // tampon de recopie pour la reduction des tailles de paquets Byte* nextpaq=NULL; uint_4 off_acheval=0; int nerrdma = 0; int maxerrdma = 10; bool fgarret = false; // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA for (int i=0;i< (int)nbDma_ ;i++) { tampon[i] = new Byte[paqsz]; nexpaq[i] = new Byte[paqszmm]; } bool fgdirectsend = (par_.pci2eth_fgdirect && (swapall_==BR_Copy) ) ? true : false; // true -> direct transfer of data to ethernet if (fgdirectsend) cout << " PCIEToEthernet::run() Direct transfer mode DMA to Ethernet ... " << endl; bool fgredpaq=par_.fgreducpsize; if (fgredpaq) { cout << " PCIEToEthernet::run() - PaquetSizeReduction - RedSize=" << par_.redpqsize << " Offset=" << par_.reducoffset << " " << ((par_.reducneedcopy)?"NeedCopy":"NOCopy") << " " << BRPaquet::ReducActionToString(par_.pqreducmode) << endl; predtampon = new Byte[paqsz]; } uint_8 trnbytes[MAXNBFIB]; uint_4 npaqfait[MAXNBFIB]; for (int i=0;i< (int)nbDma_ ;i++) { npaqfait[i]=0; trnbytes[i]=0; } // Attente du message GO des taches lecteurs cout << " PCIEToEthernet::run() Waiting for GO message from EthernetReader's ..." << endl; char msg[BRTCPMSGLEN]; for(size_t i=0; i& vskt=vvec_skt_[i]; for(size_t j=0; jPrintStatus(cout); if (nerrdma>=maxerrdma) { fgarret = true; break; } } } if (fgbaddma) continue; if (fgdirectsend) { // Pas de copie / reduction de taille de paquet, on rebalance tel quel ... for (int fib=0; fib<(int) nbDma_ ;fib++) { vector& vskt = vvec_skt_[fib]; for(size_t j=0; j 0) { // IF Numero B if ((paqsz-off_acheval)< dmasz) { // IF Numero A for(uint_4 fib=0; fib& vskt = vvec_skt_[fib]; for(size_t j=0; j= nmax_* memgr.NbPaquets())) break; // CHECK S'il faut faire une reduction de taille de paquet for(uint_4 fib=0; fib& vskt = vvec_skt_[fib]; for(size_t j=0; j" << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl; vector& vskt = vvec_skt_[dma]; cout << " TotEthTransfer="; for(size_t j=0; j MAXANAFIB) throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB "); if (par_.ethr_nlink != memgr_.NbFibres()) throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink"); packsize_=memgr_.PaqSize(); mid_=-2; mmbuf_=NULL; max_targ_npaq = memgr_.NbPaquets(); for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize())); vpchk_.push_back(BRPaqChecker(true,0)); curfc_.push_back(0); totnpqrd_.push_back(0); totnpqok_.push_back(0); } ServerSocket srv(tcpportid_, par_.ethr_nlink); char msg[BRTCPMSGLEN]; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { Socket sok=srv.WaitClientConnection(); for(int ii=0; iiSTOP 9" << endl; setRC(7); fgok=false; break; } for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { nextpaq=GetPaquetTarget(fib); if (nextpaq == NULL) { // Cela ne devrait pas arriver cout << "EthernetReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl; setRC(9); fgok=false; break; } vpaq_[fib].Set(nextpaq); } if (ReadNextAllFibers()) { fgok=false; break; } totnpaqrd_++; } MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein MZoneManage(true); // Nettoyage final usleep(50000); // Attente de traitement du dernier paquet memgr_.Stop(); // Arret cout << " ------------------ EthernetReader::run() END ----------------- " << endl; ts.SetNow(); tm.SplitQ(); cout << " END reading TotNPaq/Link=" << totnpaqrd_ << " :" << ts ; if (rdsamefc_) cout << " NSameFC=" << totsamefc_ << endl; else cout << endl; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { int perc=0; if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib]; cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib] << " FracSameFC=" << perc << " %" << endl; if (prtlev_ > 0) vpchk_[fib].Print(cout); } cout << " TotalEthernetRead= " << totnbytesrd_/(1024*1024) << " MBytes Ethernet-Read rate= " << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl; cout << " EthernetReader::run()/Timing: \n"; tm.Print(); cout << " ---------------------------------------------------------- " << endl; } // Fin du bloc try catch (std::exception& exc) { cout << " EthernetReader::run()/catched execption msg= " << exc.what() << endl; setRC(3); return; } catch(...) { cout << " EthernetReader::run()/catched unknown ... exception " << endl; setRC(4); return; } setRC(0); return; } /* --Methode-- */ bool EthernetReader::ReadNextAllFibers() { for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { if (ReadNext(fib)) return true; // probleme } if (!rdsamefc_ || (memgr_.NbFibres()<2)) { totsamefc_++; return false; // c'est OK } uint_8 cfc=curfc_[0]; bool fgsamefc=true; for(size_t fib=1; fib cfc) cfc=curfc_[fib]; } } if (fgsamefc) { totsamefc_++; return false; // c'est OK , same framecounter } else { // else !fgsame for(uint_4 fib=0; fib