Changeset 3897 in Sophya for trunk/AddOn/TAcq/racqueth.cc


Ignore:
Timestamp:
Oct 4, 2010, 4:19:59 PM (15 years ago)
Author:
ansari
Message:

Amelioration des classes PCIEToEthernet et EthernetReader pour eviter des situations de blocage lorsque lecture avec ForceSameFrameCounter, Reza 04/10/2010

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/AddOn/TAcq/racqueth.cc

    r3883 r3897  
    2626//----------------------------------------------------------------------------------------------------------
    2727
     28// Si on veut avoir le protocole de controle (Hand-shake) entre l'emetteur et le recepteur,
     29// decommenter la ligne suivante - Reza , 4 octobre 2010
     30// #define  BR_DO_HANDSHAKE  1
    2831
    2932/* --Methode-- */
     
    4750  for(size_t i=0; i<vec_pciw_.size(); i++) {
    4851    vector<ClientSocket> vskt;
     52    vector<uint_8> verrcnt;
    4953    for(size_t j=0; j<destname_.size(); j++) {
    5054      ClientSocket sok(destname_[j], tcpportid_);
     
    5862        throw SocketException("PCIEToEthernet:ERROR/  Connection to EthernetReader not established ");
    5963      cout << " PCIEToEthernet: Ethernet connection established for DMA/fiber" << i << " with " << destname_[j] << endl;
    60       vskt.push_back(sok);   
     64      vskt.push_back(sok);
     65      verrcnt.push_back(0);
    6166    }
    6267    vvec_skt_.push_back(vskt);
     68    vvec_errorcnt_.push_back(verrcnt);
    6369    vec_cntpaq_.push_back(0);
    6470  }
    6571  totrdsnd_ = 0;
    66   SetPrintLevel();
     72  SetPrintLevel(par.prtlevel_,par.prtmodulo_);
    6773}
    6874
     
    7278  for(size_t i=0; i<vec_pciw_.size(); i++) {
    7379    vector<ClientSocket>& vskt = vvec_skt_[i];
    74     for(size_t j=0; j<destname_.size(); j++) vskt[j].Close();
     80    vector<uint_8>& verrcnt = vvec_errorcnt_[i];
     81    for(size_t j=0; j<destname_.size(); j++) {
     82      cout << " ~PCIEToEthernet() closing socket, fiber/pcieNum=" << i << " ethernet_destNum=" << j
     83           << " ErrorCount=" << verrcnt[j] << endl;
     84      vskt[j].Close();
     85    }
    7586  }
    7687}
     
    154165    // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
    155166  uint_4 npaqfaitg = 0;
    156   uint_4 prtmod = par_.BlocPerFile()*par_.MMgrNbPaquet();
    157   if (prtmod < 500) prtmod=500;
    158167
    159168  while (npaqfaitg < nmaxpaq_) {  // Boucle global G
     
    179188    }
    180189    if (fgbaddma) continue; 
    181     if ((prtlev_>0)&&(npaqfaitg%prtmod==0))
     190    if ((prtlev_>0)&&(npaqfaitg%prtmodulo_==0))
    182191      cout << " PCIEToEthernet::run()/Info NPaqFait= " << npaqfaitg << endl;
    183192    if (fgdirectsend_) {  // Pas de copie / reduction de taille de paquet, on rebalance tel quel ...
     
    317326  //DBG    cout << " SendToTargets/DBG" << cnt_prt << " len=" << len << " data=" << hex << (unsigned long)data << dec << endl; cnt_prt++;
    318327  vector<ClientSocket>& vskt = vvec_skt_[fib];
     328  vector<uint_8>& verrcnt = vvec_errorcnt_[fib];
     329  char msg[BRTCPMSGLEN2];
    319330  size_t rc=0;
    320331  if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
    321     char msg[BRTCPMSGLEN2];
    322332    sprintf(msg,"PCIEToEthernet-SendCnt %d",vec_cntpaq_[fib]);
    323333    for(size_t j=0; j<vskt.size(); j++) {
    324334      vskt[j].SendAll(msg, BRTCPMSGLEN2);
    325335    }
    326     if (vec_cntpaq_[fib]>0) {
    327       for(size_t j=0; j<vskt.size(); j++) {
    328         vskt[j].ReceiveAll(msg, BRTCPMSGLEN2);
    329         msg[BRTCPMSGLEN2-1]='\0';
    330         if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0)
    331           cout << " PCIEToEthernet::SendToTargets/ BAD HandShake message : " << msg << endl;
    332       }
    333     }
    334   }
     336  }
     337  //  Envoi des donnees (paquets)
    335338  for(size_t j=0; j<vskt.size(); j++) {
    336339    rc += vskt[j].SendAll((const char *)data, len);
     
    338341  }
    339342  vec_cntpaq_[fib]++;
     343#ifdef BR_DO_HANDSHAKE
     344  // attente message de hand-shake
     345  if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
     346    for(size_t j=0; j<vskt.size(); j++) {
     347      //DBG      cout << "SendToTargets/DEBUG-B attente -RecvCnt fib=" << fib << " npaq=" << vec_cntpaq_[fib] << endl;
     348      vskt[j].ReceiveAll(msg, BRTCPMSGLEN2);
     349      msg[BRTCPMSGLEN2-1]='\0';
     350      if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0) {
     351        cout << " PCIEToEthernet::SendToTargets/ BAD HandShake, Fiber=" << fib
     352             << " NumSocket=" << j << " msg: " << msg << endl;
     353        verrcnt[j]++;
     354      }
     355    }   
     356  }
     357#endif
     358  if (vec_cntpaq_[fib]==nmaxpaq_) {  // fin de l'envoi pour cette fibre
     359    sprintf(msg,"PCIEToEthernet-Send-END %d",vec_cntpaq_[fib]);
     360    for(size_t j=0; j<vskt.size(); j++) {
     361      vskt[j].SendAll(msg, BRTCPMSGLEN2);
     362    }
     363   
     364  }
    340365  return rc;
    341366}
     
    345370
    346371/* --Methode-- */
    347 EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid, bool rdsamefc)
    348   :  memgr_(mem), par_(par), stop_(false), rdsamefc_(rdsamefc), tcpportid_(portid)
     372EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid)
     373  :  memgr_(mem), par_(par), stop_(false), tcpportid_(portid)
    349374{
    350375  totnbytesrd_ = 0;
    351376  totnpaqrd_ = 0;
    352377  totsamefc_ = 0;
     378  totnbresync_ = 0;
     379
    353380  if (memgr_.NbFibres() > MAXANAFIB)
    354381    throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB ");
    355382  if (par_.ethr_nlink != memgr_.NbFibres())
    356383    throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink");
     384  SetPrintLevel(par.prtlevel_,par.prtmodulo_);
     385
     386  SetReadMode();  // definitition du mode de lecture des liens
    357387
    358388  packsize_=memgr_.PaqSize();
     
    369399    totnpqok_.push_back(0);   
    370400  }
    371   ServerSocket srv(tcpportid_, par_.ethr_nlink);
     401  srv_sokp_ = new ServerSocket(tcpportid_, par_.ethr_nlink);
    372402  char msg[BRTCPMSGLEN];
    373403  for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
    374     Socket sok=srv.WaitClientConnection();
     404    Socket sok=srv_sokp_->WaitClientConnection();
    375405    for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
    376406    sok.ReceiveAll(msg,BRTCPMSGLEN);
     
    379409    int ia,ib,ic,id,ie;
    380410    sscanf(msg+25,"%d %d %d %d %d",&ia,&ib,&ic,&id,&ie);
    381     if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) {
     411    //    if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) {
     412    if (((uint_4)ia!=memgr_.NbPaquets()) || ((uint_4)ib!=memgr_.PaqSize())) {
    382413      strcpy(msg,"BAORadio-EthernetReader-BAD MMgrNbPaquet/MMgrPaquetSize()");
    383414      sok.SendAll(msg,BRTCPMSGLEN);
     
    390421    cout << " EthernetReader/Info connection/link" << fib << " established." << endl;
    391422    vec_cntpaq_.push_back(0);
    392   }
    393 
    394   SetPrintLevel();
    395 }
    396 
     423    vec_fgsokend_.push_back(false);
     424  }
     425  gl_fgsokend = true;
     426  dummybuff_ = new char[packsize_];
     427}
     428
     429/* --Methode-- */
     430EthernetReader::~EthernetReader()
     431{
     432  if (prtlev_>0) cout << " ~EthernetReader() , TotNPaqRd=" << totnpaqrd_ << " TotNbReSync=" << totnbresync_ << endl;
     433  for(size_t j=0; j<vsok_.size(); j++) {
     434    if (prtlev_>1) 
     435      cout << " ~EthernetReader() closing sok[" << j << "] CntPaq=" << vec_cntpaq_[j]
     436           << " TotNPaqRd=" << totnpqrd_[j] << " TotNPaqOK=" << totnpqok_[j] << endl;
     437    vsok_[j].Close();
     438  }
     439  srv_sokp_->Close();
     440  delete srv_sokp_;
     441  if (dummybuff_) delete[] dummybuff_;
     442}
    397443
    398444/* --Methode-- */
     
    466512    usleep(50000);       // Attente de traitement du dernier paquet
    467513    memgr_.Stop();        // Arret
    468 
     514    CleanUpAllSockets();  // On lit tous les liens jusqu'a la reception du message END
     515    cout << " EthernetReader::run(): done CleanUpAllSockets()" << endl;
    469516  }  // Fin du bloc try
    470517  catch (std::exception& exc) {
     
    485532bool EthernetReader::ReadNextAllFibers()
    486533{
     534  if (SkipAndSyncLinks()>8)  return true;
     535 
    487536  for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)  {
    488537    if (ReadNext(fib)) return true;  // probleme
     
    499548  //  On va essayer de lire jusqu'a avoir same_frame_counter
    500549  bool echec=true;
     550  int nbechec=0;
    501551  while (echec) {
     552    if (nbechec>sfc_maxresync_)  {
     553      if (prtlev_>0) 
     554        cout << " EthernetReader::ReadNextAllFibers()/ Stopping, reason NbEchec>(NbMax_Resync=" << sfc_maxresync_ << ")" << endl;
     555      return true;
     556    }
    502557    uint_8 cfc=curfc_[0];
    503558    bool fgsamefc=true;
     
    512567    }
    513568    else {  // else !fgsame
     569      if (nbechec>sfc_maxresync_)  return true;
     570      if (nbechec>0)   {
     571        int rcres=SkipAndSyncLinks();
     572        if (rcres>8)  return true;
     573        else if (rcres==1) {  // Il faut relire les fibres
     574          for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)  {
     575            if (ReadNext(fib)) return true;  // probleme
     576          }
     577          fgsamefc=true;   cfc=curfc_[0];
     578          for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
     579            if (curfc_[fib]!=cfc) {
     580              fgsamefc=false;
     581              if (curfc_[fib] > cfc)  cfc=curfc_[fib];
     582            }
     583          }
     584          if (fgsamefc) {
     585            totsamefc_++;  echec=false; return false;  // c'est OK  , same framecounter
     586          }
     587        }  // fin de if (rcres==1)
     588      }
     589     
     590      if (prtlev_>5)
     591        cout << " EthernetReader::ReadNextAllFibers()/DEBUG NbEchec=" << nbechec 
     592             << " cfc=" << cfc << endl;
     593     
    514594      for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
    515         while (curfc_[fib]<cfc) {
    516           if (ReadNext(fib)) return true;  // probleme
     595        for(uint_4 ntryrd=0; ntryrd<sfc_maxdpc_; ntryrd++) {   // on tente un maximum de
     596          if (curfc_[fib]>=cfc) break;
     597          if (ReadNext(fib)) return true;  // probleme   
    517598        }
    518599      }
     600      nbechec++;
    519601    }   // fin de  else !fgsame
    520602  }  // fin de while(echec): lecture jusqu'a same_frame_counter
     
    529611  while(!fggood) {
    530612    ReceiveFromSocket(fib, (char *)vpaq_[fib].Begin(), packsize_);
     613    if (vec_fgsokend_[fib])   {   // fin de reception sur le lien
     614      if (prtlev_>0) 
     615        cout << " EthernetReader::ReadNext(" << fib << ")/ Stopping, reason EndReceive fgsokend_=" << vec_fgsokend_[fib] << endl;
     616      return true;
     617    }
    531618    totnbytesrd_+=packsize_;
    532619    totnpqrd_[fib]++;
     
    538625
    539626/* --Methode-- */
     627void EthernetReader::CleanUpAllSockets()
     628{
     629  bool fgallfinished=false;
     630  while (!fgallfinished) {
     631    fgallfinished=true;
     632    for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)  {
     633      if (vec_fgsokend_[fib]>0) continue;
     634      ReceiveFromSocket(fib,dummybuff_,packsize_);
     635      fgallfinished=false;
     636    }
     637  }
     638  return;
     639}
     640
     641/* --Methode-- */
    540642size_t EthernetReader::ReceiveFromSocket(int fib, char* data, size_t len)
    541643{
    542644  size_t rc =0;
    543   if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
     645  if (vec_fgsokend_[fib]) { // la lecture est finie
     646    memset(data, 0, len);
     647    return 0;
     648  }
     649
     650  if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
    544651    char msg[BRTCPMSGLEN2];
    545652    msg[0]='\0';
    546653    vsok_[fib].ReceiveAll(msg, BRTCPMSGLEN2);
    547     if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0)
    548       cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl;
    549   } 
     654    //    if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0)
     655    //      cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl;
     656    bool fgbadhsm=true;
     657    if (strncmp(msg,"PCIEToEthernet-Send",19)==0) {
     658      if (strncmp(msg+19,"Cnt",3)==0) fgbadhsm=false;
     659      else if (strncmp(msg+19,"-END",4)==0) {
     660        fgbadhsm=false;
     661        vec_fgsokend_[fib]=1;  gl_fgsokend=true;
     662        //DBG   cout << "ReceiveFromSocket/DEBUG -END msg received  " << endl;
     663        if (prtlev_>2) 
     664          cout << " EthernetReader::ReceiveFromSocket/ Receive on LinkNo" << fib << " Ended OK" << endl;
     665      }
     666    }
     667    if (fgbadhsm) {
     668      cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake, LinkNo " <<  fib << " msg: " << msg << endl;
     669      vec_fgsokend_[fib]=2;
     670    }
     671  }
     672
     673  if (vec_fgsokend_[fib]>0) { // la lecture est finie
     674    memset(data, 0, len);
     675    return 0;
     676  }
     677
    550678  rc += vsok_[fib].ReceiveAll(data, len);
    551679  vec_cntpaq_[fib]++;
    552680
    553   /*     
    554   size_t nblk = len/ethr_bsz_;
    555   size_t fblk = len%ethr_bsz_;
    556   size_t off = 0;
    557   if (nblk>0) {
    558     for(size_t i=0; i<nblk; i++)  {
    559       rc += vsok_[fib].ReceiveAll(data+off, ethr_bsz_);
    560       off += ethr_bsz_; 
    561     }
    562   }
    563   if (fblk>0) {
    564     rc += vsok_[fib].ReceiveAll(data+off, fblk);
    565   }
    566   vec_cntpaq_[fib]++;
    567   */
    568 
    569   if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
     681#ifdef BR_DO_HANDSHAKE
     682  // Envoi de message de hand-shake
     683  if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
    570684    char msg[BRTCPMSGLEN2];
    571685    sprintf(msg,"EthernetReader-RecvCnt %d",vec_cntpaq_[fib]);
    572686    vsok_[fib].SendAll(msg, BRTCPMSGLEN2);
    573   } 
     687    //DBG    cout << " ReceiveFromSocket/DEBUG-B msg=" << msg << endl;
     688  }
     689#endif 
    574690  return rc;
     691}
     692
     693/* --Methode-- */
     694int EthernetReader::SkipAndSyncLinks()
     695{
     696  uint_8 minnumpaq=vec_cntpaq_[0];   // minimum des nombres de paquets lus sur les differents liens
     697  size_t minnp_fid=0;   // numero de fibre correspondant au minimum de paquets lus
     698  uint_8 maxnumpaq=vec_cntpaq_[0];   // maximum des nombres de paquets lus sur les differents liens
     699  size_t maxnp_fid=0;   // numero de fibre correspondant au maximum de paquets lus
     700  for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
     701    if (vec_cntpaq_[fib]<=minnumpaq) {
     702      minnumpaq=vec_cntpaq_[fib];    minnp_fid=fib;
     703    }
     704    if (vec_cntpaq_[fib]>=maxnumpaq) {
     705      maxnumpaq=vec_cntpaq_[fib];    maxnp_fid=fib;
     706    }
     707  }
     708  if (!rdsamefc_ && (minnumpaq!=maxnumpaq))
     709    cout << " EthernetReader::SkipAndSyncLinks()/BUG or PROBLEM - ReadAllOK and min<>maxnumpaq="
     710         << minnumpaq << "," << maxnumpaq << " !!!" << endl;
     711  if ((maxnumpaq-minnumpaq)<sfc_maxdpc_)  return 0;
     712
     713  if (prtlev_>4)
     714    cout << " EthernetReader::SkipAndSyncLinks() min,maxnumpaq=" << minnumpaq << "," << maxnumpaq
     715         << " min,maxnp_fid=" << minnp_fid << "," << maxnp_fid << " sfc_maxdpc_=" << sfc_maxdpc_ << endl;
     716
     717  for(size_t fib=0; fib<memgr_.NbFibres(); fib++)  {
     718    while(vec_cntpaq_[fib]<maxnumpaq) {
     719      if (vec_fgsokend_[fib]>0) return 9;
     720      ReceiveFromSocket(fib,dummybuff_,packsize_);
     721    }
     722  }
     723  totnbresync_++;
     724  if (prtlev_>3) {
     725    cout << " EthernetReader::SkipAndSyncLinks() NbResync=" << totnbresync_ << " After Sync: vec_cntpaq_[fib]=";
     726    for(size_t ii=0; ii<vec_cntpaq_.size(); ii++) cout << vec_cntpaq_[ii] << " , ";
     727    cout << endl;
     728  }
     729
     730  return 1;
    575731}
    576732
Note: See TracChangeset for help on using the changeset viewer.