Changeset 3658 in Sophya for trunk/AddOn/TAcq/racqurw.cc
- Timestamp:
- Oct 18, 2009, 11:10:33 AM (16 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/AddOn/TAcq/racqurw.cc
r3644 r3658 59 59 void PCIEReader::run() 60 60 { 61 struct sigaction act;62 struct sigaction old_act;63 61 //Precision insuffisante ResourceUsage ru; ru.Update(); // Pour recuperer le temps passe 64 62 struct timeval tv1, tv2; … … 70 68 setRC(1); 71 69 72 uint_4 cpt=0;70 73 71 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA 74 72 //sigaction(SIGINT,&act,NULL); … … 232 230 uint_4 fnum=0; 233 231 uint_4 paqsz = memgr.PaqSize(); 232 cout << " ============================ DataSaver::run() PaqSize " << paqsz <<endl; 234 233 bool fgnulldev = false; 235 234 if (path_ == "/dev/null") { … … 253 252 // Entete correspondant a l'ecriture tout le paquet - trailer compris (modif Mai 2009) 254 253 mff.setDTypeNaxis(MF_Byte, paq0.PaquetSize(), npaqperfile); 254 255 255 // Sans TRAILER de paquet mff.setDTypeNaxis(MF_Byte, paq0.DataSize()+paq0.HeaderSize(), npaqperfile); 256 256 } … … 353 353 cout << " ... RAcqMemZoneMgr not used - using s fixed memory location for packets decoding ..." << endl; 354 354 355 uint_4 cpt=0;355 356 356 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA 357 357 //sigaction(SIGINT,&act,NULL); … … 363 363 364 364 Byte* Datas = NULL; 365 Byte* locdata = new Byte[paqsz ];365 Byte* locdata = new Byte[paqsz*memgr.NbPaquets()*memgr.NbZones()]; 366 366 Byte* tampon = new Byte[paqsz]; 367 367 … … 375 375 if (fgarret) break; 376 376 377 Byte* nextdma = locdata ;377 Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets())); 378 378 uint_4 npaqfait = 0; 379 379 // for (uint_4 i=0; i<memgr.NbPaquets(); i += pktInDMATr) { // attention pktInDMATr paquets dans 1 seul DMA … … 419 419 while((curoff+paqsz)<=dmasz) { 420 420 // BRPaquet paq((Byte*)(Datas)+((paqsz*j)), nextdma+j*paqsz, paqsz, swapall_); 421 BRPaquet paq(Datas+curoff, locdata, paqsz, swapall_); 421 // BRPaquet paq(Datas+curoff, locdata, paqsz, swapall_); 422 BRPaquet paq(Datas+curoff, nextdma+npaqfait*paqsz, paqsz, swapall_); 422 423 curoff += paqsz; // On avance l'index dans le buffer du DMA 423 424 npaqfait++; // Ne pas oublier le compteur de paquets faits … … 457 458 458 459 } 460 //////////////////////////////////////////////////////////////////////////////////////////////////////// 461 //---------------------------------------------------------------------------------------------------------- 462 // Classe thread de lecture PCI-Express + Check pour tests de verification de debit/etc avec un seul thread 463 //---------------------------------------------------------------------------------------------------------- 464 465 /* --Methode-- */ 466 PCIEMultiReader::PCIEMultiReader(vector<PCIEWrapperInterface*> vec_pciw,uint_4 sizeFrame,uint_4 packSize ,RAcqMemZoneMgr& mem, uint_4 nmax, BRDataFmtConv swapall,int binMin, int nbBin) 467 : memgr(mem) , vec_pciw_ (vec_pciw) 468 { 469 nmax_ = nmax; 470 swapall_ = swapall; // select data swap/format conversion for BRPaquet 471 stop_ = false; 472 packSize_ = packSize; 473 packSizeInMgr_=memgr.PaqSize(); 474 sizeFr_ =sizeFrame; 475 binMin_= binMin; 476 nbBin_= nbBin; 477 if (vec_pciw.size() != memgr.NbFibres()) { 478 cout << " PCIEMultiReader()PbArgs: vec_pciw.size()= " << vec_pciw.size() << " memgr.NbFibres()=" <<memgr.NbFibres()<< endl; 479 throw ParmError("PCIEMultiReader:ERROR/ arguments incompatibles vec_pciw.size() != memgr.NbFibres() "); 480 } 481 if (vec_pciw.size() > MAXNBFIB) 482 throw ParmError("PCIEMultiReader:ERROR/ vec_pciw.size() > MAXNBFIB "); 483 nbDma_= vec_pciw.size(); 484 mid_=-2; 485 mmbuf_=NULL; 486 max_targ_npaq = memgr.NbPaquets(); 487 for (int fid=0 ; fid<(int)nbDma_ ;fid++) mmbufib_[fid]=NULL; 488 } 489 490 /* --Methode-- */ 491 void PCIEMultiReader::run() 492 { 493 494 struct timeval tv1,tv2; 495 gettimeofday(&tv1, NULL); 496 497 cout << " PCIEMultiReader::run() - Starting , NMaxMemZones=" << nmax_ 498 << " memgr.NbPaquets()=" << memgr.NbPaquets() << "Paqsize " << packSize_<< endl; 499 setRC(1); 500 501 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA 502 //sigaction(SIGINT,&act,NULL); 503 // uint_4 paqsz = memgr[0]->PaqSize(); 504 uint_4 paqsz = packSize_; 505 uint_4 dmasz = vec_pciw_[0]->TransferSize(); 506 vec_pciw_[0]->StartTransfers(); 507 508 BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets 509 Byte* Datas[MAXNBFIB]; 510 Byte* tampon[MAXNBFIB] ; 511 Byte* nextpaq=NULL; 512 uint_4 off_acheval=0; 513 514 int nerrdma = 0; 515 int maxerrdma = 10; 516 bool fgarret = false; 517 518 // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA 519 for (int i=0;i< (int)nbDma_ ;i++) { 520 tampon[i]= new Byte[paqsz]; 521 } 522 523 ofstream header[MAXNBFIB]; 524 for(uint_4 fib=0; fib<nbDma_; fib++) { 525 char hfnm[128]; 526 sprintf(hfnm, "./HDRCountPaqs%d.txt", fib); 527 header[fib].open(hfnm); 528 } 529 530 uint_4 npaqfait[MAXNBFIB] ; 531 for (int i=0;i< (int)nbDma_ ;i++) npaqfait[i]=0; 532 // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets())); 533 uint_4 npaqfaitg = 0; 534 // for (uint_4 i=0; i<memgr.NbPaquets(); i += pktInDMATr) { // attention pktInDMATr paquets dans 1 seul DMA 535 while (npaqfaitg < nmax_*memgr.NbPaquets()) { // Boucle global G 536 if (fgarret) break; 537 if (stop_) break; 538 539 // Lancement des DMA 540 for (int dma=0; dma < (int)nbDma_ ;dma++) vec_pciw_[dma]->StartTransfers(); 541 542 // On pointe vers le debut de la zone a remplir aver le prochain DMA 543 //-- Zone memoire locale Byte* nextdma = buff+i*paqsz; 544 545 bool fgbaddma=false; 546 // On boucle sur les nbDma_ en attente de leurs terminaison 547 for (int dma=0; dma <(int) nbDma_ ;dma++) { 548 Datas[dma]=vec_pciw_[dma]->GetData(); 549 if (Datas[dma] == NULL) { // No data Read in DMA 550 nerrdma ++; fgbaddma=true; 551 cout << "PCIEMultiReaderChecker/Erreur Waiting for datas ..." << endl; 552 vec_pciw_[dma]->PrintStatus(cout); 553 if (nerrdma>=maxerrdma) { fgarret = true; break; } 554 } 555 } 556 if (fgbaddma) continue; 557 uint_4 curoff=0; 558 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres 559 if (off_acheval > 0) { // IF Numero B 560 if ((paqsz-off_acheval)< dmasz) { // IF Numero A 561 for(uint_4 fib=0; fib<nbDma_; fib++) 562 memcpy((void *)((tampon[fib])+off_acheval), (void *)Datas[fib], paqsz-off_acheval); 563 curoff = paqsz-off_acheval; off_acheval = 0; 564 if ( MoveToNextTarget() ) { 565 cout << "PCIEMultiReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl; 566 setRC(9); fgarret=true; break; 567 } 568 for(uint_4 fib=0; fib<nbDma_; fib++) { 569 nextpaq=GetPaquetTarget(fib); 570 if (nextpaq == NULL) { // Cela ne devrait pas arriver 571 cout << "PCIEReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl; 572 setRC(9); fgarret=true; break; 573 } 574 BRPaquet paq(tampon[fib], nextpaq, paqsz, swapall_,binMin_,nbBin_); 575 npaqfait[fib]++; 576 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits 577 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter 578 header[fib] << dec << paq.FrameCounter()<< endl; ; 579 } 580 } 581 else { // se rapporte au IF numero A 582 for(uint_4 fib=0; fib<nbDma_; fib++) 583 memcpy((void *)(tampon[fib]+off_acheval), (void *)Datas[fib], dmasz); 584 curoff =dmasz; 585 off_acheval = (dmasz+off_acheval); 586 } 587 } // Fin IF Numero B 588 589 //2- On traite les paquets complets qui se trouvent dans la zone du DMA 590 while ((curoff+paqsz)<=dmasz) { // while numero C 591 // if ((dma==nbDma_-1)&&(npaqfait >= nmax_* memgr.NbPaquets())) break; 592 if ( MoveToNextTarget() ) { 593 cout << "PCIEMultiReader::run()/Error-B- MoveToNextTarget() returned true ->STOP 9" << endl; 594 setRC(9); fgarret=true; break; 595 } 596 for(uint_4 fib=0; fib<nbDma_; fib++) { 597 if (npaqfait[fib] >= nmax_*memgr.NbPaquets()) continue; 598 nextpaq=GetPaquetTarget(fib); 599 if (nextpaq == NULL) { // Cela ne devrait pas arriver 600 cout << "PCIEReader::run()/Error-B2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl; 601 setRC(9); fgarret=true; break; 602 } 603 BRPaquet paq(Datas[fib]+curoff, nextpaq, paqsz, swapall_,binMin_,nbBin_); 604 npaqfait[fib]++; 605 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits 606 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter 607 header[fib] << dec << paq.FrameCounter()<< endl; ; 608 } 609 curoff += paqsz; // On avance l'index dans le buffer du DMA 610 } // -- FIN traitement des paquets complets ds un DMA - FIN du while numero C 611 //3- On copie si besoin la fin du DMA dans la zone tampon 612 if (curoff < dmasz) { // IF numero D 613 off_acheval = dmasz-curoff; 614 for(uint_4 fib=0; fib<nbDma_; fib++) 615 memcpy(tampon[fib], (void*)(Datas[fib]+curoff), off_acheval); 616 // ne sert a rien curoff += off_acheval; 617 } // FIN du if numero D 618 } // FIN Boucle global G 619 620 621 setRC(0); 622 gettimeofday(&tv2, NULL); 623 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.; 624 if (tmelaps2<0.1) tmelaps2=0.1; 625 cout << " ---------- PCIEMultiReader::run()-End summary NPaqFait=" << npaqfaitg << "------------- " << endl; 626 for (int dma=0; dma < (int)nbDma_ ;dma++) { 627 cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] << " TotTransfer=" 628 << vec_pciw_[dma]->TotTransferBytes()/1024 629 << " kb , ElapsTime=" << tmelaps2 << " ms ->" 630 << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl; 631 pcheck[dma].Print(cout); 632 } 633 cout << " --------------------------------------------------------------------" << endl; 634 635 // //// Nettoyage final 636 MZoneManage(true); 637 for(uint_4 fib=0; fib<nbDma_; fib++) header[fib].close(); 638 for (int i=0;i< (int)nbDma_ ;i++) delete[] tampon[i]; 639 //DBG cout << " fin thread ========================" <<endl; 640 return; 641 } 642 643 /* --Methode-- */ 644 bool PCIEMultiReader::MZoneManage(bool fgclean) // Retourne true si probleme 645 { 646 /* Pour debug 647 cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_ 648 << " max_targ_npaq=" << max_targ_npaq << endl; 649 */ 650 if (mid_ >= 0) memgr.FreeMemZone(mid_, MemZS_Filled); 651 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2; 652 for (int fid=0;fid<(int)nbDma_ ;fid++) mmbufib_[fid]=NULL; 653 if (fgclean) return false; 654 mid_ = memgr.FindMemZoneId(MemZA_Fill); 655 mmbuf_ = memgr.GetMemZone(mid_); 656 if (mmbuf_==NULL) return true; 657 for (int fid=0;fid<(int)nbDma_ ;fid++) mmbufib_[fid]=memgr.GetMemZone(mid_,fid); 658 return false; 659 } 660 661 /* 662 bool PCIEMultiReader::MZoneManage(int zone,bool fgclean) // Retourne true si probleme 663 { 664 // Pour debug 665 //cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_ 666 << " max_targ_npaq=" << max_targ_npaq << endl; 667 if (mid_[zone] >= 0) memgr[zone]->FreeMemZone(mid_[zone], MemZS_Filled); 668 mmbuf_[zone] = NULL; targ_npaq_[zone] = 0; mid_[zone] = -2; 669 if (fgclean) return false; 670 mid_[zone] = memgr[zone]->FindMemZoneId(MemZA_Fill); 671 mmbuf_[zone] = memgr[zone]->GetMemZone(mid_[zone]); 672 if (mmbuf_[zone]==NULL) return true; 673 return false; 674 } 675 */ 676 677 /* --Methode-- */ 678 void PCIEMultiReader::Stop() 679 { 680 // cout << " PCIEReaderChecker::stop() ........ STOP" <<endl; 681 stop_ = true; 682 683 } 684 685 686 //-------------------------------------------------------------------- 687 // Classe thread de sauvegarde sur fichiers avec gestion multifibres 688 //-------------------------------------------------------------------- 689 690 MultiDataSaver::MultiDataSaver(RAcqMemZoneMgr& mem, string& path, uint_4 nfiles, uint_4 nblocperfile, bool savesig) 691 : memgr(mem) 692 { 693 nfiles_ = nfiles; 694 nblocperfile_ = nblocperfile; 695 nmax_ = nblocperfile_*nfiles_; 696 savesig_ = savesig; // Si false, pas d'ecriture des fichiers FITS du signal 697 stop_ = false; 698 path_ = path; 699 } 700 void MultiDataSaver::Stop() 701 { 702 // cout<< " MultiDataSaver:Stop ........ " << endl; 703 stop_=true; 704 705 } 706 void MultiDataSaver::run() 707 { 708 setRC(1); 709 BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets 710 711 try { 712 TimeStamp ts; 713 cout << " MultiDataSaver::run() - Starting " << ts << " \n NbFiles=" << nfiles_ << " NBloc/File=" 714 << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl; 715 char fname[512]; 716 717 sprintf(fname,"%s/msaver.log",path_.c_str()); 718 ofstream filog(fname); 719 filog << " MultiDataSaver::run() - starting log file " << ts << " NFibres= " << memgr.NbFibres() << endl; 720 filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl; 721 722 // Fichiers entete ascii et signal FITS 723 ofstream header[MAXNBFIB]; 724 MiniFITSFile mff[MAXNBFIB]; 725 726 uint_4 fnum=0; 727 uint_4 paqsz = memgr.PaqSize(); 728 cout << " ============================ MultiDataSaver::run() PaqSize " << paqsz <<endl; 729 bool fgnulldev = false; 730 if (path_ == "/dev/null") { 731 cout << " MultiDataSaver::run()/Warning /dev/null path specified, filenames=/dev/null" << endl; 732 fgnulldev = true; 733 } 734 for (uint_4 nbFile=0;nbFile<nfiles_ ;nbFile++) { 735 if (stop_ ) break; 736 if (memgr.GetRunState() == MemZR_Stopped) break; 737 738 if (savesig_) 739 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) { 740 if (fgnulldev) strcpy(fname,"/dev/null"); 741 else sprintf(fname,"%s/Fibre%d/HDRfits%d.txt",path_.c_str(),fib+1,fnum); 742 header[fib].open(fname); 743 } 744 BRPaquet paq0(NULL, NULL, paqsz); 745 uint_4 npaqperfile = memgr.NbPaquets()*nblocperfile_; // Nombre de paquets ecrits dans un fichier 746 747 if (savesig_) { //Reza - Ouverture conditionnel fichier 748 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) { 749 if (fgnulldev) strcpy(fname,"/dev/null"); 750 else sprintf(fname,"%s/Fibre%d/signal%d.fits",path_.c_str(),fib+1,(int)fnum); 751 mff[fib].Open(fname,MF_Write); //Reza - Ouverture conditionnel fichier 752 // Entete correspondant a l'ecriture tout le paquet - trailer compris (modif Mai 2009) 753 mff[fib].setDTypeNaxis(MF_Byte, paq0.PaquetSize(), npaqperfile); 754 } 755 fnum++; 756 // Sans TRAILER de paquet mff.setDTypeNaxis(MF_Byte, paq0.DataSize()+paq0.HeaderSize(), npaqperfile); 757 } 758 else sprintf(fname,"MemDataBloc[%d]-NoDataFile",(int)fnum++); 759 760 for (uint_4 kmz=0; kmz<nblocperfile_; kmz++) { 761 if (stop_) break; 762 //DBG cout << " MultiDataSaver::run()- nbFile=" << nbFile << " kmz=" << kmz << endl; 763 int mid = memgr.FindMemZoneId(MemZA_Save); 764 Byte* buffg = memgr.GetMemZone(mid); 765 if (buffg == NULL) { 766 cout << " MultiDataSaver::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl; 767 setRC(2); 768 return; 769 } 770 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) { // Boucle sur les fibres 771 Byte* buff = memgr.GetMemZone(mid,fib); 772 if (buff == NULL) { // Ceci ne devrait pas arriver - suite au test buffg ci-dessus 773 cout << " MultiDataSaver::run()/ERROR memgr.GetMemZone(" << mid << "," << fib << ") -> NULL" << endl; 774 setRC(2); 775 return; 776 } 777 for(uint_4 i=0; i<memgr.NbPaquets(); i++) { // boucle sur les paquets 778 BRPaquet paq(NULL, buff+i*paqsz, paqsz); 779 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter 780 if (savesig_) 781 header[fib] << hex << paq.HDRMarker() << " " << paq.TRLMarker() << " " 782 << paq.TimeTag2()<< " "<< paq.TimeTag1()<< " " 783 << paq.FrameCounter() << " " << paq.PaqLen() << endl; 784 if (savesig_) // Reza - Ecriture conditionnel fichier fits signal 785 mff[fib].WriteB(paq.Header(),paq.PaquetSize()); // ecriture tout le paquet (modif Mai 2009) 786 } // Fin de la boucle sur les paquets 787 } // Fin de la boucle sur les fibres 788 memgr.FreeMemZone(mid, MemZS_Saved); 789 } // Boucle sur les blocs dans un meme fichier 790 ts.SetNow(); 791 filog << ts << " : OK data files " << endl; 792 cout << " MultiDataSaver::run() " << ts << " : OK data files " << endl; 793 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) { 794 if (savesig_) { 795 if (fgnulldev) strcpy(fname,"/dev/null"); 796 else sprintf(fname,"%s/Fibre%d/signal%d.fits",path_.c_str(),fib+1,(int)fnum-1); 797 } 798 else sprintf(fname,"MemDataBloc[%d]-NoDataFile",(int)fnum-1); 799 filog << " Fib " << fib << " -> " << fname << endl; 800 cout << " Fib " << fib << " -> " << fname << endl; 801 } 802 if (savesig_) 803 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) { 804 header[fib].close(); 805 mff[fib].Close(); 806 } 807 } // Fin de boucle sur les fichiers 808 cout << " -------------------- MultiDataSaver::run() -------------------- " << endl; 809 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) { 810 cout << " MultiDataSaver/Summary Fib " << fib << endl; 811 pcheck[fib].Print(cout); 812 filog << " MultiDataSaver/Summary Fib " << fib << endl; 813 pcheck[fib].Print(filog); 814 } 815 cout << " ---------------------------------------------------------- " << endl; 816 ts.SetNow(); 817 filog << " MultiDataSaver::run() - End of processing/run() " << ts << endl; 818 819 } 820 catch (MiniFITSException& exc) { 821 cout << " MultiDataSaver::run()/catched MiniFITSException " << exc.Msg() << endl; 822 setRC(3); 823 return; 824 } 825 catch(...) { 826 cout << " MultiDataSaver::run()/catched unknown ... exception " << endl; 827 setRC(4); 828 return; 829 } 830 setRC(0); 831 return; 832 } 833
Note:
See TracChangeset
for help on using the changeset viewer.