Changeset 3897 in Sophya for trunk/AddOn


Ignore:
Timestamp:
Oct 4, 2010, 4:19:59 PM (15 years ago)
Author:
ansari
Message:

Amelioration des classes PCIEToEthernet et EthernetReader pour eviter des situations de blocage lorsque lecture avec ForceSameFrameCounter, Reza 04/10/2010

Location:
trunk/AddOn/TAcq
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/AddOn/TAcq/acqparam_exemple.d

    r3883 r3897  
    5353#  En mode reception de paquets sur ethernet,  On peut forcer la lecture
    5454#   de paquets avec meme compteur de paquet (Force SameFrameCounter )
    55 #  @ethrforcesamefc
    56 ethrforcesamefc
     55#  @ethrforcesamefc  MaxDiffNumPaq   MaxNbResync
     56#  Le parametre MaxDiffNumPaq (50 par defaut) est la difference maximum entre nombre de
     57#  paquets lus sur les differents liens et MaxNbResync (def=50) est le nombre maximum de tentatives
     58#  de resynchronisation succesive
     59ethrforcesamefc 50 50
    5760## No de port TCP/IP   
    5861#  @tcpportid 6912
  • trunk/AddOn/TAcq/brparam.cc

    r3883 r3897  
    135135  tcpportid=BRSPORTID;
    136136  pci2eth_fgdirect=false;
    137   ethr_nlink = 0;
    138   ethr_forcesamefc_ = false;
     137  ethr_nlink=0;
     138  ethr_forcesamefc_=false;
     139  ethr_sfc_maxdpc_=50;
     140  ethr_sfc_maxresync_=50;
    139141
    140142  skysource="";
     
    208210  ethr_nlink=p.ethr_nlink;
    209211  ethr_forcesamefc_=p.ethr_forcesamefc_;
     212  ethr_sfc_maxdpc_=p.ethr_sfc_maxdpc_;
     213  ethr_sfc_maxresync_=p.ethr_sfc_maxresync_;
    210214
    211215  skysource=p.skysource;
     
    254258  }
    255259  ethr_nlink=conf.IParam("ethrnlink",0,0);   // Nombre de sources de paquets en reception ethernet
    256   if (conf.HasKey("ethrforcesamefc"))   ethr_forcesamefc_=true;   // force SAME FrameCounter   on all links
     260  if (conf.HasKey("ethrforcesamefc"))  {
     261    ethr_forcesamefc_=true;   // force SAME FrameCounter   on all links
     262    ethr_sfc_maxdpc_=conf.IParam("ethrforcesamefc",0,50);
     263    ethr_sfc_maxresync_=conf.IParam("ethrforcesamefc",1,50);
     264  }
    257265
    258266  //  Parametre de controle du monitoring
     
    387395    cout << endl;
    388396  }
    389   cout << " TCP-PortId=" << tcpportid << " EthernetRead NbSources (=NbLinks)= " << ethr_nlink
    390        << ((ethr_forcesamefc_)?" ForceSameFrameCounter read mode":" AllOKPaquets read mode ") << endl;
     397  cout << " TCP-PortId=" << tcpportid << " EthernetRead NbSources (=NbLinks)= " << ethr_nlink << endl;
     398  if (ethr_forcesamefc_)
     399    cout << "EthernetReader mode: ForceSameFrameCounter read mode with Max_PaquetCounterDiff= "
     400         << ethr_sfc_maxdpc_ << " MaxNbResync=" << ethr_sfc_maxresync_ << endl;
     401  else
     402    cout << "EthernetReader mode: AllOKPaquets read mode " << endl;
    391403
    392404  if (fgdoProc>0)
  • trunk/AddOn/TAcq/brparam.h

    r3883 r3897  
    166166  int ethr_nlink;    // Nombre total de source d'envoi (= nb total de fibre de fibre)
    167167  bool ethr_forcesamefc_;   // true -> on force receptions de paquets avec SAME FrameCounter sur tous les liens
    168 
     168  uint_4 ethr_sfc_maxdpc_;   // valeur maximum de difference tolere entre compteurs de paquets de differentes fibres
     169  uint_4 ethr_sfc_maxresync_;   // Nombre maximum de tentative de resynchronisation avant echec
     170 
    169171  // Identification source observee dans le ciel
    170172  string skysource;
  • trunk/AddOn/TAcq/mfacq.cc

    r3884 r3897  
    300300  cout <<"mfacq[2] Creating PCIEToEthernet  thread object " << endl;
    301301  PCIEToEthernet pci2eth(vec_pciw, bpar.GetParams().GetEthTargets(), bpar.GetParams(), bpar.GetParams().tcpportid);
    302   pci2eth.SetPrintLevel(1);
     302  pci2eth.SetPrintLevel(acpar.prtlevel_, acpar.prtmodulo_);
    303303  //  usleep(200);  attente au cas ou ...
    304304  pPcie2Eth=&pci2eth;
     
    387387
    388388  cout << "mfacq[3] Creating   EthernetReader thread object     " << endl;
    389   EthernetReader ethrdr(mmgr, bpar.GetParams(), bpar.GetParams().tcpportid, acpar.ethr_forcesamefc_);
     389  EthernetReader ethrdr(mmgr, bpar.GetParams(), bpar.GetParams().tcpportid);
     390  ethrdr.SetReadMode(acpar.ethr_forcesamefc_, acpar.ethr_sfc_maxdpc_,acpar.ethr_sfc_maxresync_);
    390391  ethrdr.SetPrintLevel(acpar.prtlevel_, acpar.prtmodulo_);
    391392
  • trunk/AddOn/TAcq/racqueth.cc

    r3883 r3897  
    2626//----------------------------------------------------------------------------------------------------------
    2727
     28// Si on veut avoir le protocole de controle (Hand-shake) entre l'emetteur et le recepteur,
     29// decommenter la ligne suivante - Reza , 4 octobre 2010
     30// #define  BR_DO_HANDSHAKE  1
    2831
    2932/* --Methode-- */
     
    4750  for(size_t i=0; i<vec_pciw_.size(); i++) {
    4851    vector<ClientSocket> vskt;
     52    vector<uint_8> verrcnt;
    4953    for(size_t j=0; j<destname_.size(); j++) {
    5054      ClientSocket sok(destname_[j], tcpportid_);
     
    5862        throw SocketException("PCIEToEthernet:ERROR/  Connection to EthernetReader not established ");
    5963      cout << " PCIEToEthernet: Ethernet connection established for DMA/fiber" << i << " with " << destname_[j] << endl;
    60       vskt.push_back(sok);   
     64      vskt.push_back(sok);
     65      verrcnt.push_back(0);
    6166    }
    6267    vvec_skt_.push_back(vskt);
     68    vvec_errorcnt_.push_back(verrcnt);
    6369    vec_cntpaq_.push_back(0);
    6470  }
    6571  totrdsnd_ = 0;
    66   SetPrintLevel();
     72  SetPrintLevel(par.prtlevel_,par.prtmodulo_);
    6773}
    6874
     
    7278  for(size_t i=0; i<vec_pciw_.size(); i++) {
    7379    vector<ClientSocket>& vskt = vvec_skt_[i];
    74     for(size_t j=0; j<destname_.size(); j++) vskt[j].Close();
     80    vector<uint_8>& verrcnt = vvec_errorcnt_[i];
     81    for(size_t j=0; j<destname_.size(); j++) {
     82      cout << " ~PCIEToEthernet() closing socket, fiber/pcieNum=" << i << " ethernet_destNum=" << j
     83           << " ErrorCount=" << verrcnt[j] << endl;
     84      vskt[j].Close();
     85    }
    7586  }
    7687}
     
    154165    // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
    155166  uint_4 npaqfaitg = 0;
    156   uint_4 prtmod = par_.BlocPerFile()*par_.MMgrNbPaquet();
    157   if (prtmod < 500) prtmod=500;
    158167
    159168  while (npaqfaitg < nmaxpaq_) {  // Boucle global G
     
    179188    }
    180189    if (fgbaddma) continue; 
    181     if ((prtlev_>0)&&(npaqfaitg%prtmod==0))
     190    if ((prtlev_>0)&&(npaqfaitg%prtmodulo_==0))
    182191      cout << " PCIEToEthernet::run()/Info NPaqFait= " << npaqfaitg << endl;
    183192    if (fgdirectsend_) {  // Pas de copie / reduction de taille de paquet, on rebalance tel quel ...
     
    317326  //DBG    cout << " SendToTargets/DBG" << cnt_prt << " len=" << len << " data=" << hex << (unsigned long)data << dec << endl; cnt_prt++;
    318327  vector<ClientSocket>& vskt = vvec_skt_[fib];
     328  vector<uint_8>& verrcnt = vvec_errorcnt_[fib];
     329  char msg[BRTCPMSGLEN2];
    319330  size_t rc=0;
    320331  if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
    321     char msg[BRTCPMSGLEN2];
    322332    sprintf(msg,"PCIEToEthernet-SendCnt %d",vec_cntpaq_[fib]);
    323333    for(size_t j=0; j<vskt.size(); j++) {
    324334      vskt[j].SendAll(msg, BRTCPMSGLEN2);
    325335    }
    326     if (vec_cntpaq_[fib]>0) {
    327       for(size_t j=0; j<vskt.size(); j++) {
    328         vskt[j].ReceiveAll(msg, BRTCPMSGLEN2);
    329         msg[BRTCPMSGLEN2-1]='\0';
    330         if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0)
    331           cout << " PCIEToEthernet::SendToTargets/ BAD HandShake message : " << msg << endl;
    332       }
    333     }
    334   }
     336  }
     337  //  Envoi des donnees (paquets)
    335338  for(size_t j=0; j<vskt.size(); j++) {
    336339    rc += vskt[j].SendAll((const char *)data, len);
     
    338341  }
    339342  vec_cntpaq_[fib]++;
     343#ifdef BR_DO_HANDSHAKE
     344  // attente message de hand-shake
     345  if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
     346    for(size_t j=0; j<vskt.size(); j++) {
     347      //DBG      cout << "SendToTargets/DEBUG-B attente -RecvCnt fib=" << fib << " npaq=" << vec_cntpaq_[fib] << endl;
     348      vskt[j].ReceiveAll(msg, BRTCPMSGLEN2);
     349      msg[BRTCPMSGLEN2-1]='\0';
     350      if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0) {
     351        cout << " PCIEToEthernet::SendToTargets/ BAD HandShake, Fiber=" << fib
     352             << " NumSocket=" << j << " msg: " << msg << endl;
     353        verrcnt[j]++;
     354      }
     355    }   
     356  }
     357#endif
     358  if (vec_cntpaq_[fib]==nmaxpaq_) {  // fin de l'envoi pour cette fibre
     359    sprintf(msg,"PCIEToEthernet-Send-END %d",vec_cntpaq_[fib]);
     360    for(size_t j=0; j<vskt.size(); j++) {
     361      vskt[j].SendAll(msg, BRTCPMSGLEN2);
     362    }
     363   
     364  }
    340365  return rc;
    341366}
     
    345370
    346371/* --Methode-- */
    347 EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid, bool rdsamefc)
    348   :  memgr_(mem), par_(par), stop_(false), rdsamefc_(rdsamefc), tcpportid_(portid)
     372EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid)
     373  :  memgr_(mem), par_(par), stop_(false), tcpportid_(portid)
    349374{
    350375  totnbytesrd_ = 0;
    351376  totnpaqrd_ = 0;
    352377  totsamefc_ = 0;
     378  totnbresync_ = 0;
     379
    353380  if (memgr_.NbFibres() > MAXANAFIB)
    354381    throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB ");
    355382  if (par_.ethr_nlink != memgr_.NbFibres())
    356383    throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink");
     384  SetPrintLevel(par.prtlevel_,par.prtmodulo_);
     385
     386  SetReadMode();  // definitition du mode de lecture des liens
    357387
    358388  packsize_=memgr_.PaqSize();
     
    369399    totnpqok_.push_back(0);   
    370400  }
    371   ServerSocket srv(tcpportid_, par_.ethr_nlink);
     401  srv_sokp_ = new ServerSocket(tcpportid_, par_.ethr_nlink);
    372402  char msg[BRTCPMSGLEN];
    373403  for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
    374     Socket sok=srv.WaitClientConnection();
     404    Socket sok=srv_sokp_->WaitClientConnection();
    375405    for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
    376406    sok.ReceiveAll(msg,BRTCPMSGLEN);
     
    379409    int ia,ib,ic,id,ie;
    380410    sscanf(msg+25,"%d %d %d %d %d",&ia,&ib,&ic,&id,&ie);
    381     if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) {
     411    //    if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) {
     412    if (((uint_4)ia!=memgr_.NbPaquets()) || ((uint_4)ib!=memgr_.PaqSize())) {
    382413      strcpy(msg,"BAORadio-EthernetReader-BAD MMgrNbPaquet/MMgrPaquetSize()");
    383414      sok.SendAll(msg,BRTCPMSGLEN);
     
    390421    cout << " EthernetReader/Info connection/link" << fib << " established." << endl;
    391422    vec_cntpaq_.push_back(0);
    392   }
    393 
    394   SetPrintLevel();
    395 }
    396 
     423    vec_fgsokend_.push_back(false);
     424  }
     425  gl_fgsokend = true;
     426  dummybuff_ = new char[packsize_];
     427}
     428
     429/* --Methode-- */
     430EthernetReader::~EthernetReader()
     431{
     432  if (prtlev_>0) cout << " ~EthernetReader() , TotNPaqRd=" << totnpaqrd_ << " TotNbReSync=" << totnbresync_ << endl;
     433  for(size_t j=0; j<vsok_.size(); j++) {
     434    if (prtlev_>1) 
     435      cout << " ~EthernetReader() closing sok[" << j << "] CntPaq=" << vec_cntpaq_[j]
     436           << " TotNPaqRd=" << totnpqrd_[j] << " TotNPaqOK=" << totnpqok_[j] << endl;
     437    vsok_[j].Close();
     438  }
     439  srv_sokp_->Close();
     440  delete srv_sokp_;
     441  if (dummybuff_) delete[] dummybuff_;
     442}
    397443
    398444/* --Methode-- */
     
    466512    usleep(50000);       // Attente de traitement du dernier paquet
    467513    memgr_.Stop();        // Arret
    468 
     514    CleanUpAllSockets();  // On lit tous les liens jusqu'a la reception du message END
     515    cout << " EthernetReader::run(): done CleanUpAllSockets()" << endl;
    469516  }  // Fin du bloc try
    470517  catch (std::exception& exc) {
     
    485532bool EthernetReader::ReadNextAllFibers()
    486533{
     534  if (SkipAndSyncLinks()>8)  return true;
     535 
    487536  for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)  {
    488537    if (ReadNext(fib)) return true;  // probleme
     
    499548  //  On va essayer de lire jusqu'a avoir same_frame_counter
    500549  bool echec=true;
     550  int nbechec=0;
    501551  while (echec) {
     552    if (nbechec>sfc_maxresync_)  {
     553      if (prtlev_>0) 
     554        cout << " EthernetReader::ReadNextAllFibers()/ Stopping, reason NbEchec>(NbMax_Resync=" << sfc_maxresync_ << ")" << endl;
     555      return true;
     556    }
    502557    uint_8 cfc=curfc_[0];
    503558    bool fgsamefc=true;
     
    512567    }
    513568    else {  // else !fgsame
     569      if (nbechec>sfc_maxresync_)  return true;
     570      if (nbechec>0)   {
     571        int rcres=SkipAndSyncLinks();
     572        if (rcres>8)  return true;
     573        else if (rcres==1) {  // Il faut relire les fibres
     574          for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)  {
     575            if (ReadNext(fib)) return true;  // probleme
     576          }
     577          fgsamefc=true;   cfc=curfc_[0];
     578          for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
     579            if (curfc_[fib]!=cfc) {
     580              fgsamefc=false;
     581              if (curfc_[fib] > cfc)  cfc=curfc_[fib];
     582            }
     583          }
     584          if (fgsamefc) {
     585            totsamefc_++;  echec=false; return false;  // c'est OK  , same framecounter
     586          }
     587        }  // fin de if (rcres==1)
     588      }
     589     
     590      if (prtlev_>5)
     591        cout << " EthernetReader::ReadNextAllFibers()/DEBUG NbEchec=" << nbechec 
     592             << " cfc=" << cfc << endl;
     593     
    514594      for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
    515         while (curfc_[fib]<cfc) {
    516           if (ReadNext(fib)) return true;  // probleme
     595        for(uint_4 ntryrd=0; ntryrd<sfc_maxdpc_; ntryrd++) {   // on tente un maximum de
     596          if (curfc_[fib]>=cfc) break;
     597          if (ReadNext(fib)) return true;  // probleme   
    517598        }
    518599      }
     600      nbechec++;
    519601    }   // fin de  else !fgsame
    520602  }  // fin de while(echec): lecture jusqu'a same_frame_counter
     
    529611  while(!fggood) {
    530612    ReceiveFromSocket(fib, (char *)vpaq_[fib].Begin(), packsize_);
     613    if (vec_fgsokend_[fib])   {   // fin de reception sur le lien
     614      if (prtlev_>0) 
     615        cout << " EthernetReader::ReadNext(" << fib << ")/ Stopping, reason EndReceive fgsokend_=" << vec_fgsokend_[fib] << endl;
     616      return true;
     617    }
    531618    totnbytesrd_+=packsize_;
    532619    totnpqrd_[fib]++;
     
    538625
    539626/* --Methode-- */
     627void EthernetReader::CleanUpAllSockets()
     628{
     629  bool fgallfinished=false;
     630  while (!fgallfinished) {
     631    fgallfinished=true;
     632    for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)  {
     633      if (vec_fgsokend_[fib]>0) continue;
     634      ReceiveFromSocket(fib,dummybuff_,packsize_);
     635      fgallfinished=false;
     636    }
     637  }
     638  return;
     639}
     640
     641/* --Methode-- */
    540642size_t EthernetReader::ReceiveFromSocket(int fib, char* data, size_t len)
    541643{
    542644  size_t rc =0;
    543   if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
     645  if (vec_fgsokend_[fib]) { // la lecture est finie
     646    memset(data, 0, len);
     647    return 0;
     648  }
     649
     650  if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
    544651    char msg[BRTCPMSGLEN2];
    545652    msg[0]='\0';
    546653    vsok_[fib].ReceiveAll(msg, BRTCPMSGLEN2);
    547     if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0)
    548       cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl;
    549   } 
     654    //    if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0)
     655    //      cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl;
     656    bool fgbadhsm=true;
     657    if (strncmp(msg,"PCIEToEthernet-Send",19)==0) {
     658      if (strncmp(msg+19,"Cnt",3)==0) fgbadhsm=false;
     659      else if (strncmp(msg+19,"-END",4)==0) {
     660        fgbadhsm=false;
     661        vec_fgsokend_[fib]=1;  gl_fgsokend=true;
     662        //DBG   cout << "ReceiveFromSocket/DEBUG -END msg received  " << endl;
     663        if (prtlev_>2) 
     664          cout << " EthernetReader::ReceiveFromSocket/ Receive on LinkNo" << fib << " Ended OK" << endl;
     665      }
     666    }
     667    if (fgbadhsm) {
     668      cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake, LinkNo " <<  fib << " msg: " << msg << endl;
     669      vec_fgsokend_[fib]=2;
     670    }
     671  }
     672
     673  if (vec_fgsokend_[fib]>0) { // la lecture est finie
     674    memset(data, 0, len);
     675    return 0;
     676  }
     677
    550678  rc += vsok_[fib].ReceiveAll(data, len);
    551679  vec_cntpaq_[fib]++;
    552680
    553   /*     
    554   size_t nblk = len/ethr_bsz_;
    555   size_t fblk = len%ethr_bsz_;
    556   size_t off = 0;
    557   if (nblk>0) {
    558     for(size_t i=0; i<nblk; i++)  {
    559       rc += vsok_[fib].ReceiveAll(data+off, ethr_bsz_);
    560       off += ethr_bsz_; 
    561     }
    562   }
    563   if (fblk>0) {
    564     rc += vsok_[fib].ReceiveAll(data+off, fblk);
    565   }
    566   vec_cntpaq_[fib]++;
    567   */
    568 
    569   if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
     681#ifdef BR_DO_HANDSHAKE
     682  // Envoi de message de hand-shake
     683  if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
    570684    char msg[BRTCPMSGLEN2];
    571685    sprintf(msg,"EthernetReader-RecvCnt %d",vec_cntpaq_[fib]);
    572686    vsok_[fib].SendAll(msg, BRTCPMSGLEN2);
    573   } 
     687    //DBG    cout << " ReceiveFromSocket/DEBUG-B msg=" << msg << endl;
     688  }
     689#endif 
    574690  return rc;
     691}
     692
     693/* --Methode-- */
     694int EthernetReader::SkipAndSyncLinks()
     695{
     696  uint_8 minnumpaq=vec_cntpaq_[0];   // minimum des nombres de paquets lus sur les differents liens
     697  size_t minnp_fid=0;   // numero de fibre correspondant au minimum de paquets lus
     698  uint_8 maxnumpaq=vec_cntpaq_[0];   // maximum des nombres de paquets lus sur les differents liens
     699  size_t maxnp_fid=0;   // numero de fibre correspondant au maximum de paquets lus
     700  for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
     701    if (vec_cntpaq_[fib]<=minnumpaq) {
     702      minnumpaq=vec_cntpaq_[fib];    minnp_fid=fib;
     703    }
     704    if (vec_cntpaq_[fib]>=maxnumpaq) {
     705      maxnumpaq=vec_cntpaq_[fib];    maxnp_fid=fib;
     706    }
     707  }
     708  if (!rdsamefc_ && (minnumpaq!=maxnumpaq))
     709    cout << " EthernetReader::SkipAndSyncLinks()/BUG or PROBLEM - ReadAllOK and min<>maxnumpaq="
     710         << minnumpaq << "," << maxnumpaq << " !!!" << endl;
     711  if ((maxnumpaq-minnumpaq)<sfc_maxdpc_)  return 0;
     712
     713  if (prtlev_>4)
     714    cout << " EthernetReader::SkipAndSyncLinks() min,maxnumpaq=" << minnumpaq << "," << maxnumpaq
     715         << " min,maxnp_fid=" << minnp_fid << "," << maxnp_fid << " sfc_maxdpc_=" << sfc_maxdpc_ << endl;
     716
     717  for(size_t fib=0; fib<memgr_.NbFibres(); fib++)  {
     718    while(vec_cntpaq_[fib]<maxnumpaq) {
     719      if (vec_fgsokend_[fib]>0) return 9;
     720      ReceiveFromSocket(fib,dummybuff_,packsize_);
     721    }
     722  }
     723  totnbresync_++;
     724  if (prtlev_>3) {
     725    cout << " EthernetReader::SkipAndSyncLinks() NbResync=" << totnbresync_ << " After Sync: vec_cntpaq_[fib]=";
     726    for(size_t ii=0; ii<vec_cntpaq_.size(); ii++) cout << vec_cntpaq_[ii] << " , ";
     727    cout << endl;
     728  }
     729
     730  return 1;
    575731}
    576732
  • trunk/AddOn/TAcq/racqueth.h

    r3883 r3897  
    6565  BRParList par_;   // Parametres divers d'acquisition
    6666
    67   uint_8 nmaxpaq_;  // Nombre maxi de paquets traites
     67  uint_8 nmaxpaq_;  // Nombre maxi de paquets traites - DOIT etre mutiple de mmgr.NbPaquets()
    6868  BRDataFmtConv swapall_;  // select data swap/format conversion for BRPaquet
    6969  bool stop_; 
     
    7373  int tcpportid_;
    7474  vector< vector<ClientSocket> > vvec_skt_;
    75   vector< uint_4 > vec_cntpaq_;
     75  vector< vector<uint_8> > vvec_errorcnt_;
     76  vector< uint_8 > vec_cntpaq_;
    7677  uint_4 packSize_;
    7778  uint_4 packSizeInMgr_;
     
    8990class EthernetReader : public ZThread {
    9091public:
    91   EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid=BRSPORTID, bool rdsamefc=false);
     92  EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid=BRSPORTID);
     93  virtual ~EthernetReader();
     94
     95  // Configuration du mode de lecture des paquets
     96  // Si force_samefc=true, on essaye de lire des paquets avec meme FrameCounter, sans depasser une
     97  // difference maximum de maxdiff_paqnum entre nombre de paquets lus sur chaque fibre
     98  inline void SetReadMode(bool force_samefc=false, uint_4 maxdiff_paqnum=50, uint_4 maxresync=50)
     99  { rdsamefc_=force_samefc; sfc_maxdpc_=maxdiff_paqnum; sfc_maxresync_=maxresync; }
     100
    92101  virtual void run();
    93102  inline void Stop() { stop_ = true; }
     
    100109  bool ReadNextAllFibers();      // Renvoie true si probleme
    101110  bool ReadNext(int fib);   // Renvoie true si probleme
     111  void CleanUpAllSockets();  // pour finir la lecture de toutes les fibres
    102112  size_t ReceiveFromSocket(int fib, char* data, size_t len);
     113
     114// recale les links en cas d'ecart important entre nb de paquets lu sur chaque lien
     115  int SkipAndSyncLinks();   // Renvoie 0 si rien fait, 1 si skip fait, 9 si probleme 
    103116
    104117  // Permet d'avancer d'un paquet dans la zone - renvoie true si probleme
     
    119132  BRParList par_;   // Parametres divers d'acquisition
    120133  int tcpportid_;
    121   vector< Socket > vsok_;  //  Les sockets de lecture pour chaque fibre/lien
    122   vector< uint_4 > vec_cntpaq_;
     134  ServerSocket* srv_sokp_;  // Le socket serveur 
     135  vector< Socket > vsok_;  //  Les sockets de lecture pour chaque lien ethernet (1 lien= 1 fibre a l'envoi)
     136  vector< uint_8 > vec_cntpaq_;   // vecteur de compteurs de paquet ( 1 compteur / lien )
     137  vector< uint_4 > vec_fgsokend_;    // vecteur de flag, >0 : fin de reception (ou ereur) sur le lien
     138  bool gl_fgsokend;          // true -> au moins l'un de liens a termine
    123139  bool stop_;
    124140  bool rdsamefc_;  // if true, read paquets with same value of FrameCounter on different fibers/link
     141  uint_4 sfc_maxdpc_;   // difference maxi entre nombre de paquets lu sur les liens
     142  uint_4 sfc_maxresync_;  // nb maximum de tentatives de resynchronisation
     143
    125144  uint_8 totnbytesrd_;
    126145  uint_8 totnpaqrd_;
    127146  uint_8 totsamefc_;   // nombre total de paquets avec meme framecounter 
     147
     148  uint_8 totnbresync_;   // nombre total de saut+resynchronisation
    128149
    129150  vector<BRPaquet> vpaq_;
     
    140161  Byte* mmbufib_[MAXANAFIB];  // Pointeurs zone memoire de chaque fibre rendu par RAcqMemZoneMgr
    141162
     163  // zone tampon contenant zero si lecture sur fibre est fini
     164  char* dummybuff_;
     165
    142166  int prtlev_;   // print level
    143167  uint_8 prtmodulo_;   // print periodicity (modulo)
  • trunk/AddOn/TAcq/swrapsock.cc

    r3763 r3897  
    214214
    215215  return Socket(s);
     216}
     217
     218
     219/* --Methode-- */
     220int ServerSocket::Close()
     221{
     222  if (skt<0) return 0;
     223  int rc=-1;
     224  /*   il ne faut apparemment pas faire shutdown sur un socket serveur (non connected - ENOTCONN )
     225  rc = shutdown(skt, SHUT_RDWR);
     226  if(rc < 0)
     227    cout << "Socket::Close() Erreur: Pb shutdown() ErrNo="
     228         << errno << endl;
     229  */
     230  rc = close(skt);
     231  if(rc < 0)
     232    cout << "ServerSocket::Close() Erreur: Pb close() ErrNo="
     233         << errno << endl;
     234  skt = -1;
     235  return(0);
    216236}
    217237
  • trunk/AddOn/TAcq/swrapsock.h

    r3763 r3897  
    3737  Socket(int s=-1);
    3838  Socket(Socket const & a);
    39   ~Socket();
     39  virtual ~Socket();
    4040
    4141  size_t Send(const char * buff, size_t len, int flag=0);
     
    4444  size_t ReceiveAll(char * buff, size_t len);
    4545
    46   int Close();
     46  virtual int Close();
    4747 
    4848  inline Socket& operator=(Socket const & a)
     
    8282  ServerSocket(int port, int nconmax);
    8383  Socket WaitClientConnection();
     84  virtual int Close();  // redefinition de la methode de Socket::Close()
    8485
    8586protected:
Note: See TracChangeset for help on using the changeset viewer.