// NOTE(review): this copy of the file appears to have been damaged by a markup
// stripper. The original line breaks are gone (so every `//` comment below now
// runs into the code that used to follow it), and the text between a '<' that
// was directly followed by an identifier and the next '>' is missing: the
// #include targets, the template arguments of the vector parameters, and most
// `i<n` loop headers. Restore the file from version control before changing
// any logic; nothing below compiles as it stands.
//
// What the still-readable code shows:
//
// PCIEToEthernet
//   - Constructors: throw ParmError when vec_pciw_.size() > MAXNBFIB; the
//     overload taking one destination list per wrapper also throws ParmError
//     when destname.size() != vec_pciw.size(), then calls InitConnections().
//   - InitConnections(): caches settings read from the BRParList (max packet
//     count, swap/conversion flag, packet sizes, DMA size), enables direct
//     send only when par_.pci2eth_fgdirect is set and swapall_==BR_Copy, sends
//     a BRTCPMSGLEN-byte "BAORadio-PCIEToEthernet ..." message per destination
//     and throws SocketException unless the reply starts with
//     "BAORadio-EthernetReader-OK".
//   - run(): allocates one paqsz and one paqszmm buffer per DMA, waits for the
//     GO message, and in direct-send mode forwards each DMA buffer unchanged
//     through SendToTargets(); at the end calls CleanUpEndSendAllLinks(),
//     setRC(0) and prints a per-fiber summary using gettimeofday() timing.
//
// EthernetReader
//   - Constructor: throws BAORadioException when par_.ethr_nlink differs from
//     memgr_.NbFibres(); creates a ServerSocket, accepts client connections,
//     replies with a "...-BAD ..." message and throws SocketException when the
//     client's packet count/size differ from memgr_.NbPaquets()/PaqSize(), and
//     swaps neighbouring sockets until vfibid_ is in increasing order.
//   - run(): setRC(1) on entry; setRC(7) when MoveToNextTarget() returns true,
//     setRC(9) when GetPaquetTarget() returns NULL, setRC(3) on std::exception,
//     setRC(4) on any other exception, setRC(0) on normal completion.
//   - ReadNext(fib): reads packsize_ bytes at a time until vpchk_[fib].Check()
//     accepts the packet; returns true as soon as vec_fgsokend_[fib] is set.
//   - CleanUpAllSockets(): keeps reading packsize_ bytes into dummybuff_ from
//     every link whose vec_fgsokend_ entry is still 0.
//   - ReceiveFromSocket(): whenever vec_cntpaq_[fib] is a multiple of
//     memgr_.NbPaquets() it first reads a BRTCPMSGLEN2-byte control message:
//     "PCIEToEthernet-SendCnt" continues, "PCIEToEthernet-Send-END" sets
//     vec_fgsokend_[fib]=1 and gl_fgsokend, anything else sets it to 2. Once a
//     link is marked ended the caller's buffer is zero-filled and 0 returned.
//   - ReadNextAllFibersWithSync(): treats SkipAndSyncLinks() > 8 as failure.
//
// NOTE(review), to fix once the file is restored:
//   - EthernetReader::run() end summary computes 100*totsamefc_/totnpqrd_[0]
//     without the `totnpqrd_[fib]>0` guard used in the per-fiber loop right
//     after it; looks like a division by zero when nothing was read - confirm.
//   - ReceiveFromSocket() streams `msg` to cout on a bad handshake without
//     forcing a terminating '\0', whereas InitConnections() sets
//     msg[BRTCPMSGLEN-1]='\0' before printing its received buffer.
//   - InitConnections() passes the loop index `i` (declared size_t) to a "%d"
//     conversion in sprintf().
//---------------------------------------------------------------- // ---- classes de threads pour lecture (transfert DMA) // et ecriture disque pour acquisition BAORadio ----- // LAL - R. Ansari - Juin/Juillet 2008 //---------------------------------------------------------------- #include "racqueth.h" #include #include #include #include #include "pexceptions.h" #include "ctimer.h" #include "timestamp.h" #include "pciewrap.h" #include "brpaqu.h" #include "resusage.h" // Pour mesure temps elapsed/CPU ... #include // pour gettimeofday //////////////////////////////////////////////////////////////////////////////////////////////////////// //---------------------------------------------------------------------------------------------------------- // Classe thread de lecture PCI-Express et recopie sur interface reseau (Ethernet) //---------------------------------------------------------------------------------------------------------- // Si on veut avoir le protocole de controle (Hand-shake) entre l'emetteur et le recepteur, // decommenter la ligne suivante - Reza , 4 octobre 2010 // #define BR_DO_HANDSHAKE 1 /* --Methode-- */ PCIEToEthernet::PCIEToEthernet(vector vec_pciw, vector& destname, BRParList const& par, int portid) : par_(par), vec_pciw_ (vec_pciw), tcpportid_(portid) { if (vec_pciw_.size() > MAXNBFIB) throw ParmError("PCIEToEthernet:ERROR/ vec_pciw.size() > MAXNBFIB "); for(size_t i=0; i vec_pciw, vector< vector >& destname, BRParList const& par, int portid) : par_(par), vec_pciw_ (vec_pciw), tcpportid_(portid) { if (vec_pciw_.size() > MAXNBFIB) throw ParmError("PCIEToEthernet:ERROR/ vec_pciw.size() > MAXNBFIB "); if (destname.size()!=vec_pciw.size()) throw ParmError("PCIEToEthernet:ERROR/ vec_pciw.size() != destname_.size() "); destname_= destname; InitConnections(); } /* --Methode-- */ void PCIEToEthernet::InitConnections() { nmaxpaq_ = par_.MaxNbPaquets(); swapall_ = par_.GetDataConvFg(); // select data swap/format conversion for BRPaquet stop_ = false; 
packSize_ = par_.RecvPaquetSize(); packSizeInMgr_=par_.MMgrPaquetSize(); sizeFr_=par_.DMASizeBytes(); nbDma_= vec_pciw_.size(); // true -> direct transfer of data to ethernet fgdirectsend_ = (par_.pci2eth_fgdirect && (swapall_==BR_Copy) ) ? true : false; char msg[BRTCPMSGLEN]; cout << "PCIEToEthernet/Info: Establishing TCP/IP connections ... " << endl; for(size_t i=0; i vskt; vector verrcnt; vector& sdestnm=destname_[i]; for(size_t j=0; jTransferSize() : 0; sprintf(msg,"BAORadio-PCIEToEthernet %d %d %d %d %d", par_.MMgrNbPaquet(), par_.MMgrPaquetSize(), vec_pciw_[i]->FiberId(), dmasz, i); // 123456789012345678901234567890 sok.SendAll(msg,BRTCPMSGLEN); sok.ReceiveAll(msg,BRTCPMSGLEN); if (strncmp(msg,"BAORadio-EthernetReader-OK",26)!=0) { msg[BRTCPMSGLEN-1]='\0'; cout << "PCIEToEthernet:ERROR/ bad Init_Acknowledge message from EthernetReader : \n " << msg << endl; usleep(10000); throw SocketException("PCIEToEthernet:ERROR/ Connection to EthernetReader not established "); } cout << " PCIEToEthernet: Ethernet connection established for DMA/fiber" << i << " with " << sdestnm[j] << endl; vskt.push_back(sok); verrcnt.push_back(0); } vvec_skt_.push_back(vskt); vvec_errorcnt_.push_back(verrcnt); vec_cntpaq_.push_back(0); vfgmsgfin_.push_back(false); } totrdsnd_ = 0; stopreason_="??Unknown??"; SetPrintLevel(par_.prtlevel_,par_.prtmodulo_); } /* --Methode-- */ PCIEToEthernet::~PCIEToEthernet() { for(size_t i=0; i& vskt = vvec_skt_[i]; vector& verrcnt = vvec_errorcnt_[i]; for(size_t j=0; jTransferSize(); //DEL vec_pciw_[0]->StartTransfers(); BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets Byte* Datas[MAXNBFIB]; Byte* tampon[MAXNBFIB] ; Byte* nexpaq[MAXNBFIB] ; // Pour recevevoir les paquets apres reduction de taille Byte* predtampon=NULL; // tampon de recopie pour la reduction des tailles de paquets Byte* nextpaq=NULL; uint_4 off_acheval=0; bool fgarret = false; // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA for (int i=0;i< 
(int)nbDma_ ;i++) { // cout << " DEBUG*AB paqsz=" << paqsz << " paqszmm=" << paqszmm << endl; tampon[i]=nexpaq[i]=NULL; tampon[i] = new Byte[paqsz]; nexpaq[i] = new Byte[paqszmm]; } if (fgdirectsend_) cout << " PCIEToEthernet::run() Direct transfer mode DMA to Ethernet ... " << endl; bool fgredpaq=par_.fgreducpsize; if (fgredpaq) { cout << " PCIEToEthernet::run() - PaquetSizeReduction - RedSize=" << par_.redpqsize << " Offset=" << par_.reducoffset << " " << ((par_.reducneedcopy)?"NeedCopy":"NOCopy") << " " << BRPaquet::ReducActionToString(par_.pqreducmode) << endl; predtampon = new Byte[paqsz]; } uint_8 trnbytes[MAXNBFIB]; uint_4 npaqfait[MAXNBFIB]; for (int i=0;i< (int)nbDma_ ;i++) { npaqfait[i]=0; trnbytes[i]=0; } // Attente du message GO des taches lecteurs cout << " PCIEToEthernet::run() Waiting for GO message from EthernetReader's ..." << endl; char msg[BRTCPMSGLEN]; for(size_t i=0; i& vskt=vvec_skt_[i]; for(size_t j=0; jPrintStatus(cout); stopreason_="--Failed DMA--"; fgarret = true; break; } } if (fgbaddma) continue; if (fgdirectsend_) { // Pas de copie / reduction de taille de paquet, on rebalance tel quel ... for (int fib=0; fib<(int) nbDma_ ;fib++) { //DBG cout << " DEBUG*B fib=" << fib << " Datas[fib]=" << hex << Datas[fib] << dec << endl; SendToTargets(fib, Datas[fib], dmasz); trnbytes[fib] += (uint_8)dmasz; npaqfait[fib] += (trnbytes[fib]/(uint_8)paqszmm); if (fib==nbDma_-1) npaqfaitg += (trnbytes[fib]/(uint_8)paqszmm); trnbytes[fib] = trnbytes[fib]%(uint_8)paqszmm; } continue; // On bypasse le reste } // Si on arrive ici, c'est qu'il faut copier / reduire les paquets .... 
uint_4 curoff=0; //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres if (off_acheval > 0) { // IF Numero B if ((paqsz-off_acheval)< dmasz) { // IF Numero A for(uint_4 fib=0; fib= nmax_* memgr.NbPaquets())) break; // CHECK S'il faut faire une reduction de taille de paquet for(uint_4 fib=0; fib= nmaxpaq_) stopreason_="--Max Nb paquets reached--"; // ----- Nettoyage final et impression de resume ------- CleanUpEndSendAllLinks(); setRC(0); gettimeofday(&tv2, NULL); double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.; if (tmelaps2<0.1) tmelaps2=0.1; cout << " ---------- PCIEToEthernet::run()-End StopReason: " << stopreason_ << endl << " Summary NPaqFait=" << npaqfaitg << " TotSend (kb)=" << totrdsnd_/1024 << "------ " << endl; for (int dma=0; dma < (int)nbDma_ ;dma++) { cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] << " TotDMATransfer=" << vec_pciw_[dma]->TotTransferBytes()/1024 << " ElapsTime=" << tmelaps2 << " ms ->" << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl; vector& vskt = vvec_skt_[dma]; cout << " TotEthTransfer="; for(size_t j=0; j MAXANAFIB) throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB "); if (par_.ethr_nlink != memgr_.NbFibres()) throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink"); SetPrintLevel(par.prtlevel_,par.prtmodulo_); SetReadMode(); // definitition du mode de lecture des liens WaitENDMsg4Terminate(); packsize_=memgr_.PaqSize(); mid_=-2; mmbuf_=NULL; max_targ_npaq = memgr_.NbPaquets(); for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize())); vpchk_.push_back(BRPaqChecker(true,0)); curfc_.push_back(0); totnpqrd_.push_back(0); totnpqok_.push_back(0); } srv_sokp_ = new ServerSocket(tcpportid_, par_.ethr_nlink); char msg[BRTCPMSGLEN]; for(size_t 
fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { Socket sok=srv_sokp_->WaitClientConnection(); for(int ii=0; ii throw SocketException" << endl; throw SocketException("EthernetReader:ERROR/ Bad message from PCIEToEthernet client !"); } int ia,ib,ic,id,ie; sscanf(msg+25,"%d %d %d %d %d",&ia,&ib,&ic,&id,&ie); // if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) { if (((uint_4)ia!=memgr_.NbPaquets()) || ((uint_4)ib!=memgr_.PaqSize())) { strcpy(msg,"BAORadio-EthernetReader-BAD MMgrNbPaquet/MMgrPaquetSize()"); sok.SendAll(msg,BRTCPMSGLEN); cout << " EthernetReader:ERROR, Bad client.MMgrNbPaquet/PaquetSize NbPaq=" << ia << " PaqSz=" << ib << " Reader_PaqSz=" << memgr_.PaqSize() << endl; usleep(10000); throw SocketException("EthernetReader:ERROR/ Bad MMgrNbPaquet/MMgrPaquetSize() from PCIEToEthernet client !"); } for(int ii=0; iivfibid_[ii+1]) { tmpskt=vsok_[ii]; tmpid=vfibid_[ii]; vsok_[ii]=vsok_[ii+1]; vfibid_[ii]=vfibid_[ii+1]; vsok_[ii+1]=tmpskt; vfibid_[ii+1]=tmpid; encore=true; } } } } cout << " EthernetReader/Info: Setting FiberIds in RAcqMemZoneMgr \n ... FiberIds= " ; for(size_t ii=0; ii0) cout << " ~EthernetReader() , TotNPaqRd=" << totnpaqrd_ << " TotNbReSync=" << totnbresync_ << endl; for(size_t j=0; j1) cout << " ~EthernetReader() closing sok[" << j << "] CntPaq=" << vec_cntpaq_[j] << " TotNPaqRd=" << totnpqrd_[j] << " TotNPaqOK=" << totnpqok_[j] << endl; vsok_[j].Close(); } srv_sokp_->Close(); delete srv_sokp_; if (dummybuff_) delete[] dummybuff_; } /* --Methode-- */ void EthernetReader::run() { setRC(1); try { TimeStamp ts; Timer tm("EthernetReader", false); cout << " EthernetReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres() << " PaqSize() = " << memgr_.PaqSize() << endl; cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets") << endl; cout << " Sending GO message to all PCIEToEthernet clients ... 
" << endl; char msg[BRTCPMSGLEN]; for(int ii=0; ii0) cout << ">S>S>S> EthernetReader::run() / Stopping, through Stop() method" << endl; stopreason_="EthernetReader::run() / Stopping, through Stop() method"; break; } if ( MoveToNextTarget() ) { cout << "EthernetReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl; setRC(7); fgok=false; break; } for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { nextpaq=GetPaquetTarget(fib); if (nextpaq == NULL) { // Cela ne devrait pas arriver cout << "EthernetReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl; setRC(9); fgok=false; break; } vpaq_[fib].Set(nextpaq); } if (ReadNextAllFibers()) { fgok=false; break; } totnpaqrd_++; if ((prtlev_>0)&&(totnpaqrd_%prtmodulo_==0)) { if (prtlev_>1) { ts.SetNow(); cout << ts; } cout << "EthernetReader: NbPaq/Link=" << totnpaqrd_ << " NSameFC=" << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0] << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl; } if (totnpaqrd_>=par_.MaxNbPaquets()) { if (prtlev_>0) cout << ">S>S>S> EthernetReader::run() / Stopping, totnpaqrd_>=par_.MaxNbPaquets()" << endl; stopreason_="EthernetReader::run() / Stopping, totnpaqrd_>=par_.MaxNbPaquets()"; } } MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein MZoneManage(true); // Nettoyage final cout << " ---------------------------------------------------------------------------- " << endl; cout << "--- EthernetReader::run() END, StopReason: " << stopreason_ << " ---" << endl; ts.SetNow(); tm.SplitQ(); cout << " END reading TotNPaq/Link=" << totnpaqrd_ << " :" << ts << endl; cout << "... 
NbPaq/Link=" << totnpaqrd_ << " NSameFC=" << totsamefc_ << " / NPaqLink[0]_Read=" << totnpqrd_[0] << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { int perc=0; if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib]; cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib] << " FracSameFC=" << perc << " %" << endl; if (prtlev_ > 0) vpchk_[fib].Print(cout); } cout << " TotalEthernetRead= " << totnbytesrd_/(1024*1024) << " MBytes Ethernet-Read rate= " << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl; cout << " EthernetReader::run()/Timing: \n"; tm.Print(); cout << " ---------------------------------------------------------------------------- " << endl; usleep(50000); // Attente de traitement du dernier paquet memgr_.Stop(); // Arret if (waitendmsg_) { CleanUpAllSockets(); // On lit tous les liens jusqu'a la reception du message END cout << " EthernetReader::run(): done CleanUpAllSockets()" << endl; } } // Fin du bloc try catch (std::exception& exc) { cout << " EthernetReader::run()/catched execption msg= " << exc.what() << endl; setRC(3); return; } catch(...) { cout << " EthernetReader::run()/catched unknown ... 
exception " << endl; setRC(4); return; } setRC(0); return; } /* --Methode-- */ bool EthernetReader::ReadNextAllFibers() { if (sfc_maxresync_>0) return ReadNextAllFibersWithSync(); for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { if (ReadNext(fib)) return true; // probleme } if (!rdsamefc_ || (memgr_.NbFibres()<2)) { uint_8 cfc=curfc_[0]; bool fgsamefc=true; for(size_t fib=1; fib cfc) cfc=curfc_[fib]; } } if (fgsamefc) { totsamefc_++; return false; // c'est OK , same framecounter } else { // else !fgsame bool encorerd=true; uint_4 ntrd=0; while(encorerd) { encorerd=false; for(uint_4 fib=0; fib=cfc) continue; if (ReadNext(fib)) return true; // probleme } ntrd++; bool fgsamefc2=true; cfc=curfc_[0]; for(size_t fib=1; fibcfc) cfc=curfc_[fib]; } } if (fgsamefc2) { totsamefc_++; return false; // c'est OK , same framecounter } encorerd=true; if ((sfc_maxdpc_>0)&&(ntrd>sfc_maxdpc_)) { if (prtlev_>0) cout << ">S>S>S> EthernetReader::ReadNextAllFibers()/ Stopping, Nb_ReadNext=" << ntrd << " > sfc_maxdpc_" << endl; stopreason_="ReadNextAllFibers() StopReason: failed to get SameFC Nb_ReadNext>sfc_maxdpc_"; return true; } } // Fin de while encorerd } // fin de else !fgsame return true; // probleme } /* --Methode-- */ bool EthernetReader::ReadNext(int fib) { bool fggood=false; while(!fggood) { ReceiveFromSocket(fib, (char *)vpaq_[fib].Begin(), packsize_); if (vec_fgsokend_[fib]) { // fin de reception sur le lien if (prtlev_>0) cout << ">S>S>S> EthernetReader::ReadNext(" << fib << ")/ Stopping, reason EndReceive fgsokend_=" << vec_fgsokend_[fib] << endl; stopreason_="ReadNext() StopReason: EndReceive signaled on link"; return true; } totnbytesrd_+=packsize_; totnpqrd_[fib]++; fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]); } totnpqok_[fib]++; return false; } /* --Methode-- */ void EthernetReader::CleanUpAllSockets() { bool fgallfinished=false; while (!fgallfinished) { fgallfinished=true; for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { if 
(vec_fgsokend_[fib]>0) continue; ReceiveFromSocket(fib,dummybuff_,packsize_); fgallfinished=false; } } return; } /* --Methode-- */ size_t EthernetReader::ReceiveFromSocket(int fib, char* data, size_t len) { size_t rc =0; if (vec_fgsokend_[fib]) { // la lecture est finie memset(data, 0, len); return 0; } if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) { char msg[BRTCPMSGLEN2]; msg[0]='\0'; vsok_[fib].ReceiveAll(msg, BRTCPMSGLEN2); // if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0) // cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl; bool fgbadhsm=true; if (strncmp(msg,"PCIEToEthernet-Send",19)==0) { if (strncmp(msg+19,"Cnt",3)==0) fgbadhsm=false; else if (strncmp(msg+19,"-END",4)==0) { fgbadhsm=false; vec_fgsokend_[fib]=1; gl_fgsokend=true; //DBG cout << "ReceiveFromSocket/DEBUG -END msg received " << endl; if (prtlev_>2) cout << " EthernetReader::ReceiveFromSocket/ Receive on LinkNo" << fib << " Ended OK" << endl; } } if (fgbadhsm) { cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake, LinkNo " << fib << " msg: " << msg << endl; vec_fgsokend_[fib]=2; } } if (vec_fgsokend_[fib]>0) { // la lecture est finie memset(data, 0, len); return 0; } rc += vsok_[fib].ReceiveAll(data, len); vec_cntpaq_[fib]++; #ifdef BR_DO_HANDSHAKE // Envoi de message de hand-shake if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) { char msg[BRTCPMSGLEN2]; sprintf(msg,"EthernetReader-RecvCnt %d",vec_cntpaq_[fib]); vsok_[fib].SendAll(msg, BRTCPMSGLEN2); //DBG cout << " ReceiveFromSocket/DEBUG-B msg=" << msg << endl; } #endif return rc; } /* --Methode-- */ bool EthernetReader::ReadNextAllFibersWithSync() { if (rdsamefc_&&(sfc_maxresync_>0)) { if (SkipAndSyncLinks()>8) return true; } for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { if (ReadNext(fib)) return true; // probleme } if (!rdsamefc_ || (memgr_.NbFibres()<2)) { uint_8 cfc=curfc_[0]; bool fgsamefc=true; for(size_t fib=1; fibsfc_maxresync_) { if (prtlev_>0) cout << ">S>S>S> 
EthernetReader::ReadNextAllFibersWithSync()/ Stopping, reason NbEchec>(NbMax_Resync=" << sfc_maxresync_ << ")" << endl; stopreason_="ReadNextAllFibersWithSync() , StopReason: NbEchec>NbMax_Resyn"; return true; } uint_8 cfc=curfc_[0]; bool fgsamefc=true; for(size_t fib=1; fib cfc) cfc=curfc_[fib]; } } if (fgsamefc) { totsamefc_++; echec=false; return false; // c'est OK , same framecounter } else { // else !fgsame if (nbechec>0) { int rcres=SkipAndSyncLinks(); if (rcres>8) return true; else if (rcres==1) { // Il faut relire les fibres for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { if (ReadNext(fib)) return true; // probleme } fgsamefc=true; cfc=curfc_[0]; for(size_t fib=1; fib cfc) cfc=curfc_[fib]; } } if (fgsamefc) { totsamefc_++; echec=false; return false; // c'est OK , same framecounter } } // fin de if (rcres==1) } if (prtlev_>5) cout << " EthernetReader::ReadNextAllFibersWithSync()/DEBUG NbEchec=" << nbechec << " cfc=" << cfc << endl; for(uint_4 fib=0; fib=cfc) break; if (ReadNext(fib)) return true; // probleme } } nbechec++; } // fin de else !fgsame } // fin de while(echec): lecture jusqu'a same_frame_counter return true; // probleme } /* --Methode-- */ int EthernetReader::SkipAndSyncLinks() { uint_8 minnumpaq=vec_cntpaq_[0]; // minimum des nombres de paquets lus sur les differents liens size_t minnp_fid=0; // numero de fibre correspondant au minimum de paquets lus uint_8 maxnumpaq=vec_cntpaq_[0]; // maximum des nombres de paquets lus sur les differents liens size_t maxnp_fid=0; // numero de fibre correspondant au maximum de paquets lus for(size_t fib=1; fib=maxnumpaq) { maxnumpaq=vec_cntpaq_[fib]; maxnp_fid=fib; } } if (!rdsamefc_ && (minnumpaq!=maxnumpaq)) cout << " EthernetReader::SkipAndSyncLinks()/BUG or PROBLEM - ReadAllOK and min<>maxnumpaq=" << minnumpaq << "," << maxnumpaq << " !!!" 
<< endl; if ((maxnumpaq-minnumpaq)4) cout << " EthernetReader::SkipAndSyncLinks() min,maxnumpaq=" << minnumpaq << "," << maxnumpaq << " min,maxnp_fid=" << minnp_fid << "," << maxnp_fid << " sfc_maxdpc_=" << sfc_maxdpc_ << endl; for(size_t fib=0; fib0) return 9; ReceiveFromSocket(fib,dummybuff_,packsize_); } } totnbresync_++; if (prtlev_>3) { cout << " EthernetReader::SkipAndSyncLinks() NbResync=" << totnbresync_ << " After Sync: vec_cntpaq_[fib]="; for(size_t ii=0; ii