Changeset 3681 in Sophya for trunk/AddOn/TAcq/racquproc.cc


Ignore:
Timestamp:
Nov 20, 2009, 12:46:54 PM (16 years ago)
Author:
ansari
Message:

Ajout classe de traitement en ligne (monitoring) MonitorProc , Reza 20/11/2009

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/AddOn/TAcq/racquproc.cc

    r3671 r3681  
    11#include "racquproc.h"
     2
     3/* ----------------------------------------
     4     Projet BAORadio --- LAL
     5     2008 - 2010
     6-------------------------------------------  */
    27
    38#include <stdlib.h>
     
    1924#include "brpaqu.h"
    2025#include "minifits.h"
    21 struct sigaction act;
    22 
     26
     27/* Fonction, module^2 des nombres complexes */
     28static inline r_4 Zmod2(complex<r_4> z)
     29{ return (z.real()*z.real()+z.imag()*z.imag()); }
     30static inline r_8 Zmod2(complex<r_8> z)
     31{ return (z.real()*z.real()+z.imag()*z.imag()); }
     32static inline r_4 Zmod2(TwoByteComplex z)
     33{ return ((r_4)z.realB()*(r_4)z.realB()+(r_4)z.imagB()*(r_4)z.imagB()); }
     34
     35//---------------------------------------------------------------------
     36// Classe thread de traitement/monitoring multifibre Raw/FFT
     37//---------------------------------------------------------------------
     38/* --Methode-- */
     39MonitorProc::MonitorProc(RAcqMemZoneMgr& mem)
     40  :  memgr(mem)
     41{
     42  BRAcqConfig bpar;
     43  par_.Set(bpar.GetParams());
     44  nmax_ = par_.nmaxProc;
     45  if (nmax_==0) nmax_=par_.MaxNbBlocs();
     46  nmean_ = par_.nmeanProc;
     47  step_ = par_.stepProc;
     48  stop_ = false;       
     49  path_ = bpar.OutputDirectory();
     50  nfiles_ = 0;
     51  nblocproc_ = 0;
     52  nprocpaq_=0;
     53  npaqsamefc_=0;
     54  totnprocpaq_=0;
     55  totnpaqsamefc_=0;
     56  curfc_.SetSize(memgr.NbFibres(), memgr.NbPaquets());
     57  cpaqok_.SetSize(memgr.NbFibres(), memgr.NbPaquets());
     58}
     59
     60/* --Methode-- */
     61MonitorProc::~MonitorProc()
     62{
     63  //  cout << " **** DBG ***** MonitorProc::~MonitorProc() " << endl;
     64}
     65
     66/* --Methode-- */
     67void MonitorProc::Stop()
     68{
     69 stop_=true;
     70}
     71
     72/* --Methode-- */
     73void MonitorProc::run()
     74{
     75  setRC(1);     
     76  int rc=0;
     77  try {
     78    TimeStamp ts;
     79    cout << " MonitorProc::run() - Starting " << ts << " NMaxMemZones=" << nmax_
     80         << " NMean=" << nmean_ << " Step=" << step_ << endl;   
     81    cout << " MonitorProc::run()... - Output Data Path: " << path_ << endl;
     82    char fname[512];
     83    sprintf(fname,"%s/monproc.log",path_.c_str());
     84    ofstream filog(fname);
     85    filog << " MonitorProc::run() - starting log file " << ts << endl;                 
     86    filog << " ... NMaxMemZones=" << nmax_ << " NMean=" << nmean_ << " Step=" << step_ << endl;
     87    uint_4 paqsz = memgr.PaqSize();
     88    BRPaquet pq(paqsz);
     89    if (par_.fgsinglechannel) {
     90      spectre_.SetSize(memgr.NbFibres(), pq.DataSize()/2);
     91      for(int kc=0; kc<memgr.NbFibres(); kc++)  nzm_.push_back(0);
     92      rc=procData1C(filog);
     93    }
     94    else {
     95      spectre_.SetSize(2*memgr.NbFibres(), pq.DataSize()/4);
     96      for(int kc=0; kc<2*memgr.NbFibres(); kc++)  nzm_.push_back(0);
     97      rc=procData2C(filog);
     98    }
     99    cout << " ---- MonitorProc::run()/End NBlocProcessed=" << nblocproc_
     100          << " NFiles=" << nfiles_ << " Rc=" << rc << endl;
     101    ts.SetNow();
     102    filog << " ---- MonitorProc::run()/End " << ts << endl;
     103    filog << " --------- MonitorProc::run()/End NBlocProcessed=" << nblocproc_
     104          << " NFiles=" << nfiles_ << " Rc=" << rc << endl;
     105  }
     106  catch (std::exception& exc) {
     107    cout << " MonitorProc::run()/catched std::exception " << exc.what() << endl;
     108    setRC(98); 
     109    return;
     110  }
     111  catch(...) {
     112    cout << " MonitorProc::run()/catched unknown ... exception " << endl;
     113    setRC(99); 
     114    return;
     115  }
     116
     117  setRC(rc);
     118  return;
     119}   
     120
     121/* --Methode-- */
     122int MonitorProc::procData1C(ofstream& logf)
     123{
     124  cout << " MonitorProc::procData1C() - NOT IMPLEMENTED -> Rc=67" << endl;
     125  logf << " MonitorProc::procData1C() - NOT IMPLEMENTED -> Rc=67" << endl;
     126  return 67; 
     127}
     128
     129/* --Methode-- */
     130int MonitorProc::procData2C(ofstream& filog)
     131{
     132  BRPaqChecker pcheck[MAXNBFIB];  // Verification/comptage des paquets
     133// Initialisation pour calcul FFT
     134  uint_4 paqsz = memgr.PaqSize();
     135  BRPaquet pq(paqsz);
     136  TVector<r_4> vx(pq.DataSize()/2);
     137  vx = (r_4)(0.);
     138  TVector< complex<r_4> > cfour(pq.DataSize()/4+1);  // composant TF
     139 
     140  fftwf_plan plan = fftwf_plan_dft_r2c_1d(vx.Size(), vx.Data(),
     141                                          (fftwf_complex *)cfour.Data(), FFTW_ESTIMATE);
     142  TimeStamp ts;   
     143  char fname[512];
     144  nfiles_ = 0;                                 
     145  for (uint_4 kmz=0; kmz<nmax_; kmz++) {
     146    if (stop_) break;
     147    if (memgr.GetRunState() == MemZR_Stopped) break;
     148
     149    int mid = memgr.FindMemZoneId(MemZA_Proc);
     150    Byte* buffg = memgr.GetMemZone(mid);
     151    if (buffg == NULL) {
     152      cout << " MonitorProc::procData2C()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
     153      break;   
     154    }
     155    if ((step_>1)&&(kmz%step_ != 0))  {
     156      memgr.FreeMemZone(mid, MemZS_Proc);
     157      continue;
     158    }
     159    sa_size_t lc=0;
     160    Byte* fbuff[MAXNBFIB];
     161    for(uint_4 fib=0; fib<memgr.NbFibres(); fib++)  {  // Boucle sur les fibres
     162      fbuff[fib] = memgr.GetMemZone(mid,fib);
     163      if (fbuff[fib] == NULL) { // cela ne devrait pas arriver
     164        cout << " MonitorProc::procData2C()/ERROR memgr.GetMemZone(" << mid << "," << fib << ") -> NULL" << endl;
     165        return 9;       
     166      }
     167    }
     168   
     169    cpaqok_ = (uint_1)0;
     170    curfc_ = (uint_8)0;
     171    for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
     172      for(uint_4 i=0; i<memgr.NbPaquets(); i++) {
     173        BRPaquet paq(fbuff[fib]+i*paqsz, paqsz);
     174        bool pqok=pcheck[fib].Check(paq,curfc_(fib,i));   // Verification du paquet / FrameCounter
     175        if (!pqok)  continue;
     176        cpaqok_(fib,i) = 1;
     177        sa_size_t lc=2*fib;
     178        if (par_.fgdatafft) {    // Traitement data de type FFT
     179          TwoByteComplex* tbcp=paq.Data1C();
     180          for(sa_size_t j=1; j<spectre_.NCols(); j++)
     181            spectre_(lc,j) += Zmod2(tbcp[j]);
     182          nzm_[lc]++;   
     183          tbcp=paq.Data2C();
     184          for(sa_size_t j=1; j<spectre_.NCols(); j++)
     185            spectre_(lc,j) += Zmod2(tbcp[j]);
     186          nzm_[lc+1]++;
     187        }
     188        else {   // Traitement RawData
     189          for(sa_size_t j=0; j<vx.Size(); j++)
     190            vx(j) = (r_4)(*(paq.Data1()+j))-127.5;
     191          fftwf_execute(plan);
     192          //   ffts_.FFTForward(vx, cfour_);
     193          for(sa_size_t j=0; j<spectre_.NCols(); j++)
     194            spectre_(lc,j) += Zmod2(cfour(j+1));
     195          nzm_[lc]++;
     196          for(sa_size_t j=0; j<vx.Size(); j++)
     197            vx(j) = (r_4)(*(paq.Data2()+j))-127.5;
     198          fftwf_execute(plan);
     199          //    ffts_.FFTForward(vx, cfour_);
     200          for(sa_size_t j=0; j<spectre_.NCols(); j++)
     201            spectre_(lc,j) += Zmod2(cfour(j+1));
     202          nzm_[lc+1]++;
     203        }       
     204      }  // FIN de la boucle sur les paquets
     205    }   // Boucle sur les fibres
     206    memgr.FreeMemZone(mid, MemZS_Proc);
     207    CheckFrameCounters();
     208
     209    nblocproc_ ++;   
     210    totnprocpaq_ += memgr.NbPaquets();  nprocpaq_ += memgr.NbPaquets();
     211    bool fgnzm=true;
     212    for(int lc=0; lc<2*memgr.NbFibres(); lc++) 
     213      if (nzm_[lc]<nmean_) fgnzm=false;
     214   
     215    if (fgnzm) {
     216      char buff[32];
     217      for(sa_size_t lc=0; lc<2*memgr.NbFibres(); lc++)  {
     218        spectre_.Row(lc) /= (r_4)nzm_[lc];
     219        sprintf(buff,"NPaqMoy%d",(int)lc);
     220        spectre_.Info()[buff] = nzm_[lc];
     221      }
     222      sprintf(fname,"%s/meanspec%d.ppf",path_.c_str(),(int)nfiles_);
     223      nfiles_++;
     224      POutPersist po(fname);
     225      po << PPFNameTag("spectre") << spectre_;       
     226      spectre_ = (r_4)(0.);
     227      for(int lc=0; lc<2*memgr.NbFibres(); lc++)  nzm_[lc]=0;
     228      ts.SetNow();
     229      // Calcul / impression fraction des paquets avec same-framecounter
     230      int fracsame=0;
     231      if (nprocpaq_>0)  fracsame=100*npaqsamefc_/nprocpaq_;
     232      int totfracsame=0;
     233      if (totnprocpaq_>0)  totfracsame=100*totnpaqsamefc_/totnprocpaq_;
     234      filog << ts << " :  proc file (meanspectra) " << fname << endl;
     235      filog << "  NBlocProcessed=" << nblocproc_ << " NSameFC=" << totnpaqsamefc_ << " / " << totnprocpaq_
     236            << " -> " << totfracsame << " % (LastPqs: " << npaqsamefc_ <<  " / " << nprocpaq_
     237            << " -> " << fracsame << " % )" << endl;
     238      cout << " MonitorProc::procData2C() " << ts << " : created file  " << fname << endl;
     239      cout << "  NBlocProcessed=" << nblocproc_ << " NSameFC=" << totnpaqsamefc_ << " / " << totnprocpaq_
     240           << " -> " << totfracsame << " % (LastPqs: " << npaqsamefc_ <<  " / " << nprocpaq_
     241           << " -> " << fracsame << " % )" << endl;
     242      nprocpaq_=npaqsamefc_=0;
     243    }
     244  }   // Fin de boucle sur les kmz ( bloc MemZoneMgr  a traiter )
     245
     246  bool fgnzm=false;
     247  for(int lc=0; lc<2*memgr.NbFibres(); lc++) 
     248    if (nzm_[lc]>0) fgnzm=true;
     249 
     250  if (fgnzm) {
     251    char buff[32];
     252    for(sa_size_t lc=0; lc<2*memgr.NbFibres(); lc++)  {
     253      if (nzm_[lc]>0)  spectre_.Row(lc) /= (r_4)nzm_[lc];
     254      sprintf(buff,"NPaqMoy%d",(int)lc);
     255      spectre_.Info()[buff] = nzm_[lc];
     256    }
     257    sprintf(fname,"%s/meanspec%d.ppf",path_.c_str(),(int)nfiles_);
     258    POutPersist po(fname);
     259    po << PPFNameTag("spectre") << spectre_;       
     260    spectre_ = (r_4)(0.);
     261    for(int lc=0; lc<2*memgr.NbFibres(); lc++)  nzm_[lc]=0;
     262    ts.SetNow();
     263    // Calcul / impression fraction des paquets avec same-framecounter
     264    int fracsame=0;
     265    if (nprocpaq_>0)  fracsame=100*npaqsamefc_/nprocpaq_;
     266    int totfracsame=0;
     267    if (totnprocpaq_>0)  totfracsame=100*totnpaqsamefc_/totnprocpaq_;
     268    filog << ts << " :  proc file (meanspectra) " << fname << endl;                   
     269    filog << "  NBlocProcessed=" << nblocproc_ << " NSameFC=" << totnpaqsamefc_ << " / " << totnprocpaq_
     270          << " -> " << totfracsame << " % (LastPqs: " << npaqsamefc_ <<  " / " << nprocpaq_
     271          << " -> " << fracsame << " % )" << endl;
     272    cout << " MonitorProc::procData2C() " << ts << " : created file  " << fname << endl;
     273    cout << "  NBlocProcessed=" << nblocproc_ << " NSameFC=" << totnpaqsamefc_ << " / " << totnprocpaq_
     274         << " -> " << totfracsame << " % (LastPqs: " << npaqsamefc_ <<  " / " << nprocpaq_
     275         << " -> " << fracsame << " % )" << endl;
     276  }
     277  return 0;
     278}
     279
     280/* --Methode-- */
     281int MonitorProc::CheckFrameCounters()
     282{
     283  if (memgr.NbFibres()<2)  {
     284    npaqsamefc_++;  totnpaqsamefc_++;
     285    return 99;
     286  }
     287  sa_size_t  pidx[MAXNBFIB];
     288  sa_size_t maxidx=curfc_.NCols();
     289  uint_8 cfc=0;
     290  for(uint_4 fib=0; fib<curfc_.NRows(); fib++) {
     291    pidx[fib]=0;
     292    while((pidx[fib]<maxidx)&&(cpaqok_(fib,pidx[fib])==0)) pidx[fib]++;
     293  }
     294
     295  bool fgsuite=true;
     296  while (fgsuite) {   // Boucle sur l'ensemble des paquets
     297    for(uint_4 fib=0; fib<curfc_.NRows(); fib++)  {
     298      if ((pidx[fib]>=maxidx)||(cpaqok_(fib,pidx[fib])==0))  { fgsuite=false; break; }
     299    }
     300    if (!fgsuite) break;
     301    cfc=curfc_(0,pidx[0]);
     302    bool fgsame=true;
     303    for(uint_4 fib=1; fib<curfc_.NRows(); fib++) {
     304      if (curfc_(fib,pidx[fib])!=cfc) {
     305        fgsame=false;
     306        if (curfc_(fib,pidx[fib]) > cfc)  cfc=curfc_(fib,pidx[fib]);
     307      }
     308    }
     309    if (fgsame) {
     310      npaqsamefc_++;  totnpaqsamefc_++;
     311      for(uint_4 fib=0; fib<curfc_.NRows(); fib++) {
     312        pidx[fib]++;
     313        while((pidx[fib]<maxidx)&&(cpaqok_(fib,pidx[fib])==0)) pidx[fib]++;
     314      }
     315    }   // fin if (fgsame)
     316    else {  // else !fgsame
     317      for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
     318        if (curfc_(fib,pidx[fib])<cfc) {
     319          pidx[fib]++;
     320          while((pidx[fib]<maxidx)&&(cpaqok_(fib,pidx[fib])==0)) pidx[fib]++;
     321        }
     322      }
     323    }   // fin de  else !fgsame
     324  }  // Fin de while sur l'ensemble des paquets
     325  return 0;
     326}
     327
     328
     329static struct sigaction act;
    23330//-------------------------------------------------------
    24331// Classe thread de traitement  avec 1 voie par frame
     
    42349}
    43350
    44 static inline r_4 Zmod2(complex<r_4> z)
    45 { return (z.real()*z.real()+z.imag()*z.imag()); }
    46351
    47352void DataProc::Stop()
Note: See TracChangeset for help on using the changeset viewer.