Changeset 3897 in Sophya for trunk/AddOn/TAcq/racqueth.cc
- Timestamp:
- Oct 4, 2010, 4:19:59 PM (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/AddOn/TAcq/racqueth.cc
r3883 r3897 26 26 //---------------------------------------------------------------------------------------------------------- 27 27 28 // Si on veut avoir le protocole de controle (Hand-shake) entre l'emetteur et le recepteur, 29 // decommenter la ligne suivante - Reza , 4 octobre 2010 30 // #define BR_DO_HANDSHAKE 1 28 31 29 32 /* --Methode-- */ … … 47 50 for(size_t i=0; i<vec_pciw_.size(); i++) { 48 51 vector<ClientSocket> vskt; 52 vector<uint_8> verrcnt; 49 53 for(size_t j=0; j<destname_.size(); j++) { 50 54 ClientSocket sok(destname_[j], tcpportid_); … … 58 62 throw SocketException("PCIEToEthernet:ERROR/ Connection to EthernetReader not established "); 59 63 cout << " PCIEToEthernet: Ethernet connection established for DMA/fiber" << i << " with " << destname_[j] << endl; 60 vskt.push_back(sok); 64 vskt.push_back(sok); 65 verrcnt.push_back(0); 61 66 } 62 67 vvec_skt_.push_back(vskt); 68 vvec_errorcnt_.push_back(verrcnt); 63 69 vec_cntpaq_.push_back(0); 64 70 } 65 71 totrdsnd_ = 0; 66 SetPrintLevel( );72 SetPrintLevel(par.prtlevel_,par.prtmodulo_); 67 73 } 68 74 … … 72 78 for(size_t i=0; i<vec_pciw_.size(); i++) { 73 79 vector<ClientSocket>& vskt = vvec_skt_[i]; 74 for(size_t j=0; j<destname_.size(); j++) vskt[j].Close(); 80 vector<uint_8>& verrcnt = vvec_errorcnt_[i]; 81 for(size_t j=0; j<destname_.size(); j++) { 82 cout << " ~PCIEToEthernet() closing socket, fiber/pcieNum=" << i << " ethernet_destNum=" << j 83 << " ErrorCount=" << verrcnt[j] << endl; 84 vskt[j].Close(); 85 } 75 86 } 76 87 } … … 154 165 // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets())); 155 166 uint_4 npaqfaitg = 0; 156 uint_4 prtmod = par_.BlocPerFile()*par_.MMgrNbPaquet();157 if (prtmod < 500) prtmod=500;158 167 159 168 while (npaqfaitg < nmaxpaq_) { // Boucle global G … … 179 188 } 180 189 if (fgbaddma) continue; 181 if ((prtlev_>0)&&(npaqfaitg%prtmod ==0))190 if ((prtlev_>0)&&(npaqfaitg%prtmodulo_==0)) 182 191 cout << " PCIEToEthernet::run()/Info NPaqFait= " << npaqfaitg << endl; 183 192 if (fgdirectsend_) { // Pas de copie / reduction de taille de paquet, on rebalance tel quel ... … … 317 326 //DBG cout << " SendToTargets/DBG" << cnt_prt << " len=" << len << " data=" << hex << (unsigned long)data << dec << endl; cnt_prt++; 318 327 vector<ClientSocket>& vskt = vvec_skt_[fib]; 328 vector<uint_8>& verrcnt = vvec_errorcnt_[fib]; 329 char msg[BRTCPMSGLEN2]; 319 330 size_t rc=0; 320 331 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) { 321 char msg[BRTCPMSGLEN2];322 332 sprintf(msg,"PCIEToEthernet-SendCnt %d",vec_cntpaq_[fib]); 323 333 for(size_t j=0; j<vskt.size(); j++) { 324 334 vskt[j].SendAll(msg, BRTCPMSGLEN2); 325 335 } 326 if (vec_cntpaq_[fib]>0) { 327 for(size_t j=0; j<vskt.size(); j++) { 328 vskt[j].ReceiveAll(msg, BRTCPMSGLEN2); 329 msg[BRTCPMSGLEN2-1]='\0'; 330 if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0) 331 cout << " PCIEToEthernet::SendToTargets/ BAD HandShake message : " << msg << endl; 332 } 333 } 334 } 336 } 337 // Envoi des donnees (paquets) 335 338 for(size_t j=0; j<vskt.size(); j++) { 336 339 rc += vskt[j].SendAll((const char *)data, len); … … 338 341 } 339 342 vec_cntpaq_[fib]++; 343 #ifdef BR_DO_HANDSHAKE 344 // attente message de hand-shake 345 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) { 346 for(size_t j=0; j<vskt.size(); j++) { 347 //DBG cout << "SendToTargets/DEBUG-B attente -RecvCnt fib=" << fib << " npaq=" << vec_cntpaq_[fib] << endl; 348 vskt[j].ReceiveAll(msg, BRTCPMSGLEN2); 349 msg[BRTCPMSGLEN2-1]='\0'; 350 if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0) { 351 cout << " PCIEToEthernet::SendToTargets/ BAD HandShake, Fiber=" << fib 352 << " NumSocket=" << j << " msg: " << msg << endl; 353 verrcnt[j]++; 354 } 355 } 356 } 357 #endif 358 if (vec_cntpaq_[fib]==nmaxpaq_) { // fin de l'envoi pour cette fibre 359 sprintf(msg,"PCIEToEthernet-Send-END %d",vec_cntpaq_[fib]); 360 for(size_t j=0; j<vskt.size(); j++) { 361 vskt[j].SendAll(msg, BRTCPMSGLEN2); 362 } 363 364 } 340 365 return rc; 341 366 } … … 345 370 346 371 /* --Methode-- */ 347 EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid , bool rdsamefc)348 : memgr_(mem), par_(par), stop_(false), rdsamefc_(rdsamefc),tcpportid_(portid)372 EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid) 373 : memgr_(mem), par_(par), stop_(false), tcpportid_(portid) 349 374 { 350 375 totnbytesrd_ = 0; 351 376 totnpaqrd_ = 0; 352 377 totsamefc_ = 0; 378 totnbresync_ = 0; 379 353 380 if (memgr_.NbFibres() > MAXANAFIB) 354 381 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB "); 355 382 if (par_.ethr_nlink != memgr_.NbFibres()) 356 383 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink"); 384 SetPrintLevel(par.prtlevel_,par.prtmodulo_); 385 386 SetReadMode(); // definitition du mode de lecture des liens 357 387 358 388 packsize_=memgr_.PaqSize(); … … 369 399 totnpqok_.push_back(0); 370 400 } 371 ServerSocket srv(tcpportid_, par_.ethr_nlink);401 srv_sokp_ = new ServerSocket(tcpportid_, par_.ethr_nlink); 372 402 char msg[BRTCPMSGLEN]; 373 403 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { 374 Socket sok=srv .WaitClientConnection();404 Socket sok=srv_sokp_->WaitClientConnection(); 375 405 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0'; 376 406 sok.ReceiveAll(msg,BRTCPMSGLEN); … … 379 409 int ia,ib,ic,id,ie; 380 410 sscanf(msg+25,"%d %d %d %d %d",&ia,&ib,&ic,&id,&ie); 381 if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) { 411 // if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) { 412 if (((uint_4)ia!=memgr_.NbPaquets()) || ((uint_4)ib!=memgr_.PaqSize())) { 382 413 strcpy(msg,"BAORadio-EthernetReader-BAD MMgrNbPaquet/MMgrPaquetSize()"); 383 414 sok.SendAll(msg,BRTCPMSGLEN); … … 390 421 cout << " EthernetReader/Info connection/link" << fib << " established." << endl; 391 422 vec_cntpaq_.push_back(0); 392 } 393 394 SetPrintLevel(); 395 } 396 423 vec_fgsokend_.push_back(false); 424 } 425 gl_fgsokend = true; 426 dummybuff_ = new char[packsize_]; 427 } 428 429 /* --Methode-- */ 430 EthernetReader::~EthernetReader() 431 { 432 if (prtlev_>0) cout << " ~EthernetReader() , TotNPaqRd=" << totnpaqrd_ << " TotNbReSync=" << totnbresync_ << endl; 433 for(size_t j=0; j<vsok_.size(); j++) { 434 if (prtlev_>1) 435 cout << " ~EthernetReader() closing sok[" << j << "] CntPaq=" << vec_cntpaq_[j] 436 << " TotNPaqRd=" << totnpqrd_[j] << " TotNPaqOK=" << totnpqok_[j] << endl; 437 vsok_[j].Close(); 438 } 439 srv_sokp_->Close(); 440 delete srv_sokp_; 441 if (dummybuff_) delete[] dummybuff_; 442 } 397 443 398 444 /* --Methode-- */ … … 466 512 usleep(50000); // Attente de traitement du dernier paquet 467 513 memgr_.Stop(); // Arret 468 514 CleanUpAllSockets(); // On lit tous les liens jusqu'a la reception du message END 515 cout << " EthernetReader::run(): done CleanUpAllSockets()" << endl; 469 516 } // Fin du bloc try 470 517 catch (std::exception& exc) { … … 485 532 bool EthernetReader::ReadNextAllFibers() 486 533 { 534 if (SkipAndSyncLinks()>8) return true; 535 487 536 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { 488 537 if (ReadNext(fib)) return true; // probleme … … 499 548 // On va essayer de lire jusqu'a avoir same_frame_counter 500 549 bool echec=true; 550 int nbechec=0; 501 551 while (echec) { 552 if (nbechec>sfc_maxresync_) { 553 if (prtlev_>0) 554 cout << " EthernetReader::ReadNextAllFibers()/ Stopping, reason NbEchec>(NbMax_Resync=" << sfc_maxresync_ << ")" << endl; 555 return true; 556 } 502 557 uint_8 cfc=curfc_[0]; 503 558 bool fgsamefc=true; … … 512 567 } 513 568 else { // else !fgsame 569 if (nbechec>sfc_maxresync_) return true; 570 if (nbechec>0) { 571 int rcres=SkipAndSyncLinks(); 572 if (rcres>8) return true; 573 else if (rcres==1) { // Il faut relire les fibres 574 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { 575 if (ReadNext(fib)) return true; // probleme 576 } 577 fgsamefc=true; cfc=curfc_[0]; 578 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) { 579 if (curfc_[fib]!=cfc) { 580 fgsamefc=false; 581 if (curfc_[fib] > cfc) cfc=curfc_[fib]; 582 } 583 } 584 if (fgsamefc) { 585 totsamefc_++; echec=false; return false; // c'est OK , same framecounter 586 } 587 } // fin de if (rcres==1) 588 } 589 590 if (prtlev_>5) 591 cout << " EthernetReader::ReadNextAllFibers()/DEBUG NbEchec=" << nbechec 592 << " cfc=" << cfc << endl; 593 514 594 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) { 515 while (curfc_[fib]<cfc) { 516 if (ReadNext(fib)) return true; // probleme 595 for(uint_4 ntryrd=0; ntryrd<sfc_maxdpc_; ntryrd++) { // on tente un maximum de 596 if (curfc_[fib]>=cfc) break; 597 if (ReadNext(fib)) return true; // probleme 517 598 } 518 599 } 600 nbechec++; 519 601 } // fin de else !fgsame 520 602 } // fin de while(echec): lecture jusqu'a same_frame_counter … … 529 611 while(!fggood) { 530 612 ReceiveFromSocket(fib, (char *)vpaq_[fib].Begin(), packsize_); 613 if (vec_fgsokend_[fib]) { // fin de reception sur le lien 614 if (prtlev_>0) 615 cout << " EthernetReader::ReadNext(" << fib << ")/ Stopping, reason EndReceive fgsokend_=" << vec_fgsokend_[fib] << endl; 616 return true; 617 } 531 618 totnbytesrd_+=packsize_; 532 619 totnpqrd_[fib]++; … … 538 625 539 626 /* --Methode-- */ 627 void EthernetReader::CleanUpAllSockets() 628 { 629 bool fgallfinished=false; 630 while (!fgallfinished) { 631 fgallfinished=true; 632 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) { 633 if (vec_fgsokend_[fib]>0) continue; 634 ReceiveFromSocket(fib,dummybuff_,packsize_); 635 fgallfinished=false; 636 } 637 } 638 return; 639 } 640 641 /* --Methode-- */ 540 642 size_t EthernetReader::ReceiveFromSocket(int fib, char* data, size_t len) 541 643 { 542 644 size_t rc =0; 543 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) { 645 if (vec_fgsokend_[fib]) { // la lecture est finie 646 memset(data, 0, len); 647 return 0; 648 } 649 650 if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) { 544 651 char msg[BRTCPMSGLEN2]; 545 652 msg[0]='\0'; 546 653 vsok_[fib].ReceiveAll(msg, BRTCPMSGLEN2); 547 if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0) 548 cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl; 549 } 654 // if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0) 655 // cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl; 656 bool fgbadhsm=true; 657 if (strncmp(msg,"PCIEToEthernet-Send",19)==0) { 658 if (strncmp(msg+19,"Cnt",3)==0) fgbadhsm=false; 659 else if (strncmp(msg+19,"-END",4)==0) { 660 fgbadhsm=false; 661 vec_fgsokend_[fib]=1; gl_fgsokend=true; 662 //DBG cout << "ReceiveFromSocket/DEBUG -END msg received " << endl; 663 if (prtlev_>2) 664 cout << " EthernetReader::ReceiveFromSocket/ Receive on LinkNo" << fib << " Ended OK" << endl; 665 } 666 } 667 if (fgbadhsm) { 668 cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake, LinkNo " << fib << " msg: " << msg << endl; 669 vec_fgsokend_[fib]=2; 670 } 671 } 672 673 if (vec_fgsokend_[fib]>0) { // la lecture est finie 674 memset(data, 0, len); 675 return 0; 676 } 677 550 678 rc += vsok_[fib].ReceiveAll(data, len); 551 679 vec_cntpaq_[fib]++; 552 680 553 /* 554 size_t nblk = len/ethr_bsz_; 555 size_t fblk = len%ethr_bsz_; 556 size_t off = 0; 557 if (nblk>0) { 558 for(size_t i=0; i<nblk; i++) { 559 rc += vsok_[fib].ReceiveAll(data+off, ethr_bsz_); 560 off += ethr_bsz_; 561 } 562 } 563 if (fblk>0) { 564 rc += vsok_[fib].ReceiveAll(data+off, fblk); 565 } 566 vec_cntpaq_[fib]++; 567 */ 568 569 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) { 681 #ifdef BR_DO_HANDSHAKE 682 // Envoi de message de hand-shake 683 if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) { 570 684 char msg[BRTCPMSGLEN2]; 571 685 sprintf(msg,"EthernetReader-RecvCnt %d",vec_cntpaq_[fib]); 572 686 vsok_[fib].SendAll(msg, BRTCPMSGLEN2); 573 } 687 //DBG cout << " ReceiveFromSocket/DEBUG-B msg=" << msg << endl; 688 } 689 #endif 574 690 return rc; 691 } 692 693 /* --Methode-- */ 694 int EthernetReader::SkipAndSyncLinks() 695 { 696 uint_8 minnumpaq=vec_cntpaq_[0]; // minimum des nombres de paquets lus sur les differents liens 697 size_t minnp_fid=0; // numero de fibre correspondant au minimum de paquets lus 698 uint_8 maxnumpaq=vec_cntpaq_[0]; // maximum des nombres de paquets lus sur les differents liens 699 size_t maxnp_fid=0; // numero de fibre correspondant au maximum de paquets lus 700 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) { 701 if (vec_cntpaq_[fib]<=minnumpaq) { 702 minnumpaq=vec_cntpaq_[fib]; minnp_fid=fib; 703 } 704 if (vec_cntpaq_[fib]>=maxnumpaq) { 705 maxnumpaq=vec_cntpaq_[fib]; maxnp_fid=fib; 706 } 707 } 708 if (!rdsamefc_ && (minnumpaq!=maxnumpaq)) 709 cout << " EthernetReader::SkipAndSyncLinks()/BUG or PROBLEM - ReadAllOK and min<>maxnumpaq=" 710 << minnumpaq << "," << maxnumpaq << " !!!" << endl; 711 if ((maxnumpaq-minnumpaq)<sfc_maxdpc_) return 0; 712 713 if (prtlev_>4) 714 cout << " EthernetReader::SkipAndSyncLinks() min,maxnumpaq=" << minnumpaq << "," << maxnumpaq 715 << " min,maxnp_fid=" << minnp_fid << "," << maxnp_fid << " sfc_maxdpc_=" << sfc_maxdpc_ << endl; 716 717 for(size_t fib=0; fib<memgr_.NbFibres(); fib++) { 718 while(vec_cntpaq_[fib]<maxnumpaq) { 719 if (vec_fgsokend_[fib]>0) return 9; 720 ReceiveFromSocket(fib,dummybuff_,packsize_); 721 } 722 } 723 totnbresync_++; 724 if (prtlev_>3) { 725 cout << " EthernetReader::SkipAndSyncLinks() NbResync=" << totnbresync_ << " After Sync: vec_cntpaq_[fib]="; 726 for(size_t ii=0; ii<vec_cntpaq_.size(); ii++) cout << vec_cntpaq_[ii] << " , "; 727 cout << endl; 728 } 729 730 return 1; 575 731 } 576 732
Note:
See TracChangeset
for help on using the changeset viewer.