Changeset 3658 in Sophya for trunk/AddOn/TAcq/racqurw.cc


Ignore:
Timestamp:
Oct 18, 2009, 11:10:33 AM (16 years ago)
Author:
ansari
Message:

1/ Gestion multi-fibre ds RAcqMemZoneMgr et les DMAReader/DiskWriter (brproc.cc)
2/ Possibilite d'ajout de mot cle ds l'entete FITS par la classe MiniFITSFile

Reza , 18/10/2009

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/AddOn/TAcq/racqurw.cc

    r3644 r3658  
    5959void PCIEReader::run()
    6060{
    61   struct sigaction act;
    62   struct sigaction old_act;
    6361   //Precision insuffisante  ResourceUsage ru; ru.Update();  // Pour recuperer le temps passe
    6462  struct timeval tv1, tv2;
     
    7068  setRC(1);     
    7169
    72   uint_4 cpt=0;
     70 
    7371  // sigaddset(&act.sa_mask,SIGINT);  // pour proteger le transfert DMA
    7472  //sigaction(SIGINT,&act,NULL); 
     
    232230    uint_4 fnum=0;
    233231    uint_4 paqsz = memgr.PaqSize();
     232    cout << " ============================ DataSaver::run() PaqSize " << paqsz <<endl;
    234233    bool fgnulldev = false;
    235234    if (path_ == "/dev/null") {
     
    253252        // Entete correspondant a l'ecriture tout le paquet - trailer compris (modif Mai 2009)
    254253        mff.setDTypeNaxis(MF_Byte, paq0.PaquetSize(), npaqperfile);
     254
    255255        // Sans TRAILER de paquet mff.setDTypeNaxis(MF_Byte, paq0.DataSize()+paq0.HeaderSize(), npaqperfile);
    256256      }
     
    353353  cout << " ... RAcqMemZoneMgr not used - using s fixed memory location for packets decoding ..." << endl;
    354354
    355   uint_4 cpt=0;
     355 
    356356  // sigaddset(&act.sa_mask,SIGINT);  // pour proteger le transfert DMA
    357357  //sigaction(SIGINT,&act,NULL);       
     
    363363
    364364  Byte* Datas = NULL;
    365   Byte* locdata = new Byte[paqsz];
     365  Byte* locdata = new Byte[paqsz*memgr.NbPaquets()*memgr.NbZones()];
    366366  Byte* tampon = new Byte[paqsz];
    367367
     
    375375    if (fgarret) break;
    376376
    377     Byte* nextdma = locdata;
     377    Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
    378378    uint_4 npaqfait = 0;     
    379379    //      for (uint_4 i=0; i<memgr.NbPaquets(); i += pktInDMATr) {  // attention pktInDMATr paquets dans 1 seul DMA
     
    419419          while((curoff+paqsz)<=dmasz) {
    420420            //        BRPaquet paq((Byte*)(Datas)+((paqsz*j)), nextdma+j*paqsz, paqsz, swapall_);
    421             BRPaquet paq(Datas+curoff, locdata, paqsz, swapall_);
     421            //  BRPaquet paq(Datas+curoff, locdata, paqsz, swapall_);
     422            BRPaquet paq(Datas+curoff, nextdma+npaqfait*paqsz, paqsz, swapall_);
    422423            curoff += paqsz;  // On avance l'index dans le buffer du DMA
    423424            npaqfait++;  // Ne pas oublier le compteur de paquets faits
     
    457458
    458459}
     460////////////////////////////////////////////////////////////////////////////////////////////////////////
     461//----------------------------------------------------------------------------------------------------------
     462// Classe thread de lecture PCI-Express + Check pour tests de verification de debit/etc avec un seul thread
     463//----------------------------------------------------------------------------------------------------------
     464
     465/* --Methode-- */
     466PCIEMultiReader::PCIEMultiReader(vector<PCIEWrapperInterface*> vec_pciw,uint_4 sizeFrame,uint_4 packSize ,RAcqMemZoneMgr& mem, uint_4 nmax, BRDataFmtConv swapall,int binMin, int nbBin)
     467  :   memgr(mem) , vec_pciw_ (vec_pciw)
     468{
     469  nmax_ = nmax;
     470  swapall_ = swapall;   // select data swap/format conversion for BRPaquet
     471  stop_ = false;
     472  packSize_ = packSize;
     473  packSizeInMgr_=memgr.PaqSize();
     474  sizeFr_ =sizeFrame;
     475  binMin_= binMin;
     476  nbBin_= nbBin;
     477  if (vec_pciw.size() != memgr.NbFibres()) {
     478    cout << " PCIEMultiReader()PbArgs: vec_pciw.size()= " << vec_pciw.size() << " memgr.NbFibres()=" <<memgr.NbFibres()<< endl;
     479    throw ParmError("PCIEMultiReader:ERROR/ arguments incompatibles vec_pciw.size() != memgr.NbFibres() ");
     480  }
     481  if (vec_pciw.size() > MAXNBFIB)
     482    throw ParmError("PCIEMultiReader:ERROR/  vec_pciw.size() > MAXNBFIB ");
     483  nbDma_= vec_pciw.size();
     484  mid_=-2;
     485  mmbuf_=NULL;
     486  max_targ_npaq = memgr.NbPaquets();
     487  for (int fid=0 ; fid<(int)nbDma_ ;fid++) mmbufib_[fid]=NULL;
     488}
     489
     490/* --Methode-- */
     491void PCIEMultiReader::run()
     492{
     493 
     494  struct timeval tv1,tv2;
     495  gettimeofday(&tv1, NULL);
     496 
     497  cout << " PCIEMultiReader::run() - Starting , NMaxMemZones=" << nmax_
     498       << " memgr.NbPaquets()=" << memgr.NbPaquets() << "Paqsize " << packSize_<< endl;
     499  setRC(1);     
     500
     501  // sigaddset(&act.sa_mask,SIGINT);  // pour proteger le transfert DMA
     502  //sigaction(SIGINT,&act,NULL);       
     503  // uint_4 paqsz = memgr[0]->PaqSize();
     504  uint_4 paqsz =  packSize_;
     505  uint_4 dmasz = vec_pciw_[0]->TransferSize();
     506  vec_pciw_[0]->StartTransfers();
     507
     508  BRPaqChecker pcheck[MAXNBFIB];  // Verification/comptage des paquets 
     509  Byte* Datas[MAXNBFIB];
     510  Byte* tampon[MAXNBFIB] ;
     511  Byte* nextpaq=NULL;
     512  uint_4 off_acheval=0;
     513 
     514  int nerrdma = 0;
     515  int maxerrdma = 10;
     516  bool fgarret = false;
     517 
     518  // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA
     519  for (int i=0;i< (int)nbDma_ ;i++) { 
     520    tampon[i]=   new Byte[paqsz];
     521  }
     522 
     523  ofstream header[MAXNBFIB];
     524  for(uint_4 fib=0; fib<nbDma_; fib++) {
     525    char hfnm[128];
     526    sprintf(hfnm, "./HDRCountPaqs%d.txt", fib);
     527    header[fib].open(hfnm);
     528  }
     529   
     530  uint_4 npaqfait[MAXNBFIB] ;
     531  for (int i=0;i< (int)nbDma_ ;i++) npaqfait[i]=0;
     532    // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
     533  uint_4 npaqfaitg = 0;     
     534  //      for (uint_4 i=0; i<memgr.NbPaquets(); i += pktInDMATr) {  // attention pktInDMATr paquets dans 1 seul DMA
     535  while (npaqfaitg < nmax_*memgr.NbPaquets()) {  // Boucle global G
     536    if (fgarret) break;
     537    if (stop_) break;
     538   
     539    // Lancement des DMA
     540    for (int dma=0; dma < (int)nbDma_ ;dma++) vec_pciw_[dma]->StartTransfers();
     541   
     542    // On pointe vers le debut de la zone a remplir aver le prochain DMA
     543    //-- Zone memoire locale Byte* nextdma = buff+i*paqsz;
     544   
     545    bool fgbaddma=false;
     546    // On boucle sur les nbDma_ en attente de leurs terminaison   
     547    for (int dma=0; dma <(int) nbDma_ ;dma++)  {
     548      Datas[dma]=vec_pciw_[dma]->GetData();
     549      if (Datas[dma] == NULL) { // No data Read in DMA
     550        nerrdma ++;   fgbaddma=true;
     551        cout << "PCIEMultiReaderChecker/Erreur Waiting for datas ..." << endl;
     552        vec_pciw_[dma]->PrintStatus(cout);
     553        if (nerrdma>=maxerrdma) { fgarret = true; break; }
     554      }
     555    }
     556    if (fgbaddma) continue; 
     557    uint_4 curoff=0;
     558    //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres
     559    if (off_acheval  > 0) {  // IF Numero B
     560      if ((paqsz-off_acheval)< dmasz) {  // IF Numero A
     561        for(uint_4 fib=0; fib<nbDma_; fib++)
     562          memcpy((void *)((tampon[fib])+off_acheval), (void *)Datas[fib], paqsz-off_acheval);
     563        curoff = paqsz-off_acheval;  off_acheval = 0;
     564        if ( MoveToNextTarget() ) {
     565          cout << "PCIEMultiReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
     566          setRC(9);  fgarret=true; break;
     567        }
     568        for(uint_4 fib=0; fib<nbDma_; fib++) {
     569          nextpaq=GetPaquetTarget(fib);
     570          if (nextpaq == NULL) { // Cela ne devrait pas arriver
     571            cout << "PCIEReader::run()/Error-A2- GetPaquetTarget(fib)  returned NULL ->STOP 9" << endl;
     572            setRC(9);  fgarret=true; break;
     573          }
     574          BRPaquet paq(tampon[fib], nextpaq, paqsz, swapall_,binMin_,nbBin_);
     575          npaqfait[fib]++;
     576          if (fib==nbDma_-1) npaqfaitg++;  // Ne pas oublier le compteur de paquets faits
     577          pcheck[fib].Check(paq);   // Verification du paquet / FrameCounter
     578          header[fib] << dec << paq.FrameCounter()<< endl; ;
     579        }
     580      }
     581      else {  // se rapporte au IF numero A
     582        for(uint_4 fib=0; fib<nbDma_; fib++)
     583          memcpy((void *)(tampon[fib]+off_acheval), (void *)Datas[fib], dmasz);
     584        curoff =dmasz;
     585        off_acheval = (dmasz+off_acheval);
     586      }
     587    }  // Fin IF Numero B
     588
     589    //2- On traite les paquets complets qui se trouvent dans la zone du DMA
     590    while ((curoff+paqsz)<=dmasz) {  // while numero C
     591      //          if ((dma==nbDma_-1)&&(npaqfait >= nmax_* memgr.NbPaquets())) break;
     592      if ( MoveToNextTarget() ) {
     593        cout << "PCIEMultiReader::run()/Error-B- MoveToNextTarget() returned true ->STOP 9" << endl;
     594        setRC(9);  fgarret=true; break;
     595      }
     596      for(uint_4 fib=0; fib<nbDma_; fib++) {
     597        if (npaqfait[fib] >= nmax_*memgr.NbPaquets())  continue;
     598        nextpaq=GetPaquetTarget(fib);
     599        if (nextpaq == NULL) { // Cela ne devrait pas arriver
     600          cout << "PCIEReader::run()/Error-B2- GetPaquetTarget(fib)  returned NULL ->STOP 9" << endl;
     601          setRC(9);  fgarret=true; break;
     602        }
     603        BRPaquet paq(Datas[fib]+curoff, nextpaq, paqsz, swapall_,binMin_,nbBin_);
     604        npaqfait[fib]++;
     605        if (fib==nbDma_-1) npaqfaitg++;  // Ne pas oublier le compteur de paquets faits
     606        pcheck[fib].Check(paq);   // Verification du paquet / FrameCounter
     607        header[fib] << dec << paq.FrameCounter()<< endl; ;
     608      }
     609      curoff += paqsz;  // On avance l'index dans le buffer du DMA
     610    } // -- FIN traitement des paquets complets ds un DMA - FIN du while numero C
     611      //3- On copie si besoin la fin du DMA dans la zone tampon
     612    if (curoff < dmasz) {  // IF numero D
     613      off_acheval = dmasz-curoff;
     614      for(uint_4 fib=0; fib<nbDma_; fib++)
     615        memcpy(tampon[fib], (void*)(Datas[fib]+curoff), off_acheval);
     616      // ne sert a rien         curoff += off_acheval;
     617    } // FIN du if numero D
     618  }  //   FIN  Boucle global G
     619
     620   
     621  setRC(0);
     622  gettimeofday(&tv2, NULL);
     623  double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
     624  if (tmelaps2<0.1) tmelaps2=0.1;
     625  cout << " ---------- PCIEMultiReader::run()-End summary NPaqFait=" << npaqfaitg << "------------- " << endl;
     626  for (int dma=0; dma < (int)nbDma_ ;dma++)   {
     627    cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] <<  " TotTransfer="
     628         << vec_pciw_[dma]->TotTransferBytes()/1024
     629         << " kb , ElapsTime=" << tmelaps2 << " ms ->"
     630         << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl;     
     631    pcheck[dma].Print(cout);
     632  }
     633  cout << " --------------------------------------------------------------------" << endl;
     634
     635  // //// Nettoyage final
     636  MZoneManage(true);
     637  for(uint_4 fib=0; fib<nbDma_; fib++)  header[fib].close();
     638  for (int i=0;i< (int)nbDma_ ;i++) delete[] tampon[i];
     639  //DBG  cout << " fin thread ========================" <<endl; 
     640  return;
     641}
     642
     643/* --Methode-- */
     644bool PCIEMultiReader::MZoneManage(bool fgclean)    // Retourne true si probleme
     645{
     646  /* Pour debug
     647  cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_ 
     648       << " max_targ_npaq=" << max_targ_npaq << endl;
     649  */
     650  if (mid_ >= 0)  memgr.FreeMemZone(mid_, MemZS_Filled);
     651  mmbuf_ = NULL;  targ_npaq_ = 0;  mid_ = -2;
     652  for (int fid=0;fid<(int)nbDma_ ;fid++) mmbufib_[fid]=NULL;
     653  if (fgclean)  return false;
     654  mid_ = memgr.FindMemZoneId(MemZA_Fill);
     655  mmbuf_ = memgr.GetMemZone(mid_);
     656  if (mmbuf_==NULL)   return true;
     657  for (int fid=0;fid<(int)nbDma_ ;fid++) mmbufib_[fid]=memgr.GetMemZone(mid_,fid);
     658  return false;
     659}
     660
     661/*
     662bool PCIEMultiReader::MZoneManage(int zone,bool fgclean)    // Retourne true si probleme
     663{
     664  // Pour debug
     665  //cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_ 
     666       << " max_targ_npaq=" << max_targ_npaq << endl;
     667  if (mid_[zone] >= 0)  memgr[zone]->FreeMemZone(mid_[zone], MemZS_Filled);
     668  mmbuf_[zone] = NULL;  targ_npaq_[zone] = 0;  mid_[zone] = -2;
     669  if (fgclean)  return false;
     670  mid_[zone] = memgr[zone]->FindMemZoneId(MemZA_Fill);
     671  mmbuf_[zone] = memgr[zone]->GetMemZone(mid_[zone]);
     672  if (mmbuf_[zone]==NULL) return true;
     673  return false;
     674}
     675*/
     676
     677/* --Methode-- */
     678void PCIEMultiReader::Stop()
     679{
     680  // cout << " PCIEReaderChecker::stop()  ........ STOP" <<endl;
     681  stop_ = true;
     682
     683}
     684
     685
     686//--------------------------------------------------------------------
     687// Classe thread de sauvegarde sur fichiers avec gestion multifibres
     688//--------------------------------------------------------------------
     689
     690MultiDataSaver::MultiDataSaver(RAcqMemZoneMgr& mem, string& path, uint_4 nfiles, uint_4 nblocperfile, bool savesig)
     691  :  memgr(mem)
     692{
     693  nfiles_ = nfiles;
     694  nblocperfile_ = nblocperfile;
     695  nmax_ = nblocperfile_*nfiles_;
     696  savesig_ = savesig;  // Si false, pas d'ecriture des fichiers FITS du signal 
     697  stop_ = false;       
     698  path_ = path;
     699}
     700void MultiDataSaver::Stop()
     701{
     702  // cout<< " MultiDataSaver:Stop ........ " << endl;
     703  stop_=true;
     704
     705}
     706void MultiDataSaver::run()
     707{
     708  setRC(1);     
     709  BRPaqChecker pcheck[MAXNBFIB];  // Verification/comptage des paquets
     710
     711  try {
     712    TimeStamp ts;
     713    cout << " MultiDataSaver::run() - Starting " << ts << " \n   NbFiles=" << nfiles_ << " NBloc/File="
     714         << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
     715    char fname[512];
     716
     717    sprintf(fname,"%s/msaver.log",path_.c_str());
     718    ofstream filog(fname);
     719    filog << " MultiDataSaver::run() - starting log file " << ts << " NFibres= " << memgr.NbFibres() << endl;                 
     720    filog << " NbFiles=" << nfiles_ << " NBloc/File="  << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
     721       
     722    // Fichiers entete ascii et signal FITS
     723    ofstream header[MAXNBFIB];
     724    MiniFITSFile mff[MAXNBFIB];
     725
     726    uint_4 fnum=0;
     727    uint_4 paqsz = memgr.PaqSize();
     728    cout << " ============================ MultiDataSaver::run() PaqSize " << paqsz <<endl;
     729    bool fgnulldev = false;
     730    if (path_ == "/dev/null") {
     731      cout << " MultiDataSaver::run()/Warning /dev/null path specified, filenames=/dev/null" << endl;
     732      fgnulldev = true;
     733    }
     734    for (uint_4 nbFile=0;nbFile<nfiles_ ;nbFile++) {
     735      if (stop_ )   break;
     736      if (memgr.GetRunState() == MemZR_Stopped) break;
     737
     738      if (savesig_)
     739        for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
     740          if (fgnulldev) strcpy(fname,"/dev/null");
     741          else sprintf(fname,"%s/Fibre%d/HDRfits%d.txt",path_.c_str(),fib+1,fnum);
     742          header[fib].open(fname);
     743        }
     744      BRPaquet paq0(NULL, NULL, paqsz);
     745      uint_4 npaqperfile = memgr.NbPaquets()*nblocperfile_;  // Nombre de paquets ecrits dans un fichier
     746
     747      if (savesig_) { //Reza - Ouverture conditionnel fichier
     748        for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
     749          if (fgnulldev) strcpy(fname,"/dev/null");
     750          else sprintf(fname,"%s/Fibre%d/signal%d.fits",path_.c_str(),fib+1,(int)fnum);
     751          mff[fib].Open(fname,MF_Write);  //Reza - Ouverture conditionnel fichier
     752          // Entete correspondant a l'ecriture tout le paquet - trailer compris (modif Mai 2009)
     753          mff[fib].setDTypeNaxis(MF_Byte, paq0.PaquetSize(), npaqperfile);
     754        }
     755        fnum++;
     756        // Sans TRAILER de paquet mff.setDTypeNaxis(MF_Byte, paq0.DataSize()+paq0.HeaderSize(), npaqperfile);
     757      }
     758      else sprintf(fname,"MemDataBloc[%d]-NoDataFile",(int)fnum++);
     759
     760      for (uint_4 kmz=0; kmz<nblocperfile_; kmz++) {
     761        if (stop_) break;
     762          //DBG cout << " MultiDataSaver::run()- nbFile=" << nbFile << " kmz=" << kmz << endl; 
     763        int mid = memgr.FindMemZoneId(MemZA_Save); 
     764        Byte* buffg = memgr.GetMemZone(mid);
     765        if (buffg == NULL) {
     766          cout << " MultiDataSaver::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
     767          setRC(2);     
     768          return;       
     769        }
     770        for(uint_4 fib=0; fib<memgr.NbFibres(); fib++)  {  // Boucle sur les fibres
     771          Byte* buff = memgr.GetMemZone(mid,fib);
     772          if (buff == NULL) {  // Ceci ne devrait pas arriver - suite au test buffg ci-dessus
     773            cout << " MultiDataSaver::run()/ERROR memgr.GetMemZone(" << mid << "," << fib << ") -> NULL" << endl;
     774            setRC(2);   
     775            return;     
     776          }
     777          for(uint_4 i=0; i<memgr.NbPaquets(); i++) {  // boucle sur les paquets
     778            BRPaquet paq(NULL, buff+i*paqsz, paqsz);
     779            pcheck[fib].Check(paq);   // Verification du paquet / FrameCounter
     780            if (savesig_)
     781              header[fib] << hex << paq.HDRMarker() << " " << paq.TRLMarker() << " "
     782                          << paq.TimeTag2()<< " "<< paq.TimeTag1()<< " "
     783                          << paq.FrameCounter() << " " << paq.PaqLen()  << endl;
     784            if (savesig_) // Reza - Ecriture conditionnel fichier fits signal
     785              mff[fib].WriteB(paq.Header(),paq.PaquetSize()); // ecriture tout le paquet (modif Mai 2009)
     786          } // Fin de la boucle sur les paquets
     787        }  // Fin de la boucle sur les fibres
     788        memgr.FreeMemZone(mid, MemZS_Saved);
     789      }  // Boucle sur les blocs dans un meme fichier
     790      ts.SetNow();
     791      filog << ts << " : OK data files " << endl;
     792      cout << " MultiDataSaver::run() " << ts << " : OK data files  " << endl;
     793      for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
     794        if (savesig_) {
     795          if (fgnulldev) strcpy(fname,"/dev/null");
     796          else sprintf(fname,"%s/Fibre%d/signal%d.fits",path_.c_str(),fib+1,(int)fnum-1);
     797        }
     798        else sprintf(fname,"MemDataBloc[%d]-NoDataFile",(int)fnum-1);
     799        filog << "    Fib " << fib << " -> " << fname << endl;
     800        cout << "    Fib " << fib << " -> " << fname << endl;
     801      }
     802      if (savesig_)
     803        for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
     804          header[fib].close();
     805          mff[fib].Close();       
     806        }
     807    }  // Fin de boucle sur les fichiers
     808    cout << " --------------------  MultiDataSaver::run() -------------------- " << endl;
     809    for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
     810      cout << " MultiDataSaver/Summary Fib " << fib << endl;
     811      pcheck[fib].Print(cout);
     812      filog << " MultiDataSaver/Summary Fib " << fib << endl;
     813      pcheck[fib].Print(filog);
     814    }
     815    cout << " ---------------------------------------------------------- " << endl;
     816    ts.SetNow();
     817    filog << " MultiDataSaver::run() - End of processing/run() " << ts << endl;               
     818
     819  }
     820  catch (MiniFITSException& exc) {
     821    cout << " MultiDataSaver::run()/catched MiniFITSException " << exc.Msg() << endl;
     822    setRC(3);   
     823    return;
     824  }
     825  catch(...) {
     826    cout << " MultiDataSaver::run()/catched unknown ... exception " << endl;
     827    setRC(4);   
     828    return;
     829  }
     830  setRC(0);
     831  return;
     832}   
     833
Note: See TracChangeset for help on using the changeset viewer.