source: Sophya/trunk/AddOn/TAcq/racqurw.cc@ 4016

Last change on this file since 4016 was 4016, checked in by ansari, 14 years ago

Ajout de commentaires d'autodocumentation Doxygen, Reza 12/08/2011

File size: 36.6 KB
Line 
1//----------------------------------------------------------------
2// ---- classes de threads pour lecture (transfert DMA)
3// et ecriture disque pour acquisition BAORadio -----
4// LAL - R. Ansari - Juin/Juillet 2008
5//----------------------------------------------------------------
6
7#include "racqurw.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <signal.h>
13#include "pexceptions.h"
14#include "timestamp.h"
15
16#include "pciewrap.h"
17#include "brpaqu.h"
18#include "minifits.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include "datatable.h" // Pour sauver les entetes de paquet
22#include <sys/time.h> // pour gettimeofday
23
24// Si on veut que MultiDataSaver cree des fichiers avec le numero des FrameCounters...
25// define DEBUGPAQHDR
26
27//-------------------------------------------------------
28// Classe thread de lecture PCI-Express
29//-------------------------------------------------------
30
31/*!
32 \class PCIEReader
33 \ingroup TAcq
34
35 \brief PCI-Express data read-out thread class \b (deprecated)
36
37 \b Deprecated - Use PCIEMultiReader instead.
38 DMA through the PCIEWrapperInterface object interface and transfer to RAcqMemZoneMgr memory
39*/
40
41PCIEReader::PCIEReader(PCIEWrapperInterface &pciw,uint_4 sizeFrame,uint_4 packSize ,RAcqMemZoneMgr& mem,
42 uint_4 nmax, BRDataFmtConv swapall)
43 : memgr(mem) , pciw_ (pciw)
44{
45 nmax_ = nmax;
46 swapall_ = swapall; // select data swap/format conversion for BRPaquet
47 stop_ = false;
48 packSize_ = packSize;
49 sizeFr_ =sizeFrame;
50 // Pour la logique de gestion des paquets ds zone memoire
51 mid_ = -2;
52 targ_npaq_ = 0;
53
54 max_targ_npaq = memgr.NbPaquets();
55 mmbuf_ = NULL;
56}
57
58bool PCIEReader::MZoneManage(bool fgclean) // Retourne true si probleme
59{
60 /* Pour debug
61 cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
62 << " max_targ_npaq=" << max_targ_npaq << endl;
63 */
64 if (mid_ >= 0) memgr.FreeMemZone(mid_, MemZS_Filled);
65 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
66 if (fgclean) return false;
67 mid_ = memgr.FindMemZoneId(MemZA_Fill);
68 mmbuf_ = memgr.GetMemZone(mid_);
69 if (mmbuf_==NULL) return true;
70 return false;
71}
72
73void PCIEReader::run()
74{
75 //Precision insuffisante ResourceUsage ru; ru.Update(); // Pour recuperer le temps passe
76 struct timeval tv1, tv2;
77 gettimeofday(&tv1, NULL);
78
79 try{
80 cout << " PCIEReader::run() - Starting , NMaxMemZones=" << nmax_
81 << "memgr.NbPaquets()=" << memgr.NbPaquets() << endl;
82 setRC(1);
83
84
85 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
86 //sigaction(SIGINT,&act,NULL);
87
88 uint_4 paqsz = memgr.PaqSize();
89 uint_4 dmasz = pciw_.TransferSize();
90 pciw_.StartTransfers();
91
92 Byte* Datas = NULL;
93 Byte* tampon = new Byte[paqsz];
94 Byte* nextpaq = NULL;
95
96 uint_4 off_acheval = 0;
97
98 int nerrdma = 0;
99 int maxerrdma = 10;
100 bool fgarret = false;
101
102 uint_4 npaqfait = 0; // Nb total de paquets traites (DMA + decode)
103
104 /// do{ si boucle infini
105 // for (uint_4 kmz=0; kmz<nmax_; kmz++) {
106 while (npaqfait < nmax_*memgr.NbPaquets()) {
107 if (stop_) break;
108 if (fgarret) break;
109 // On pointe vers le debut de la zone a remplir aver le prochain DMA
110 Datas=pciw_.GetData();
111 if (Datas == NULL) { // No data Read in DMA
112 nerrdma ++;
113 cout << "PCIEReaderChecker/Erreur Waiting for datas ..." << endl;
114 pciw_.PrintStatus(cout);
115 if (nerrdma>=maxerrdma) { fgarret = true; break; }
116 }
117 else { // DMA reussi
118 uint_4 curoff = 0;
119 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire
120 if (off_acheval > 0) {
121 if ((paqsz-off_acheval)< dmasz) {
122 memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval);
123 curoff = paqsz-off_acheval; off_acheval = 0;
124
125 if ((nextpaq=NextPaqTarget()) == NULL) {
126 cout << "2 PCIEReader::run()/Error NextPaqTarget() returned NULL ->STOP 9" << endl;
127 setRC(9); fgarret=true; break;
128 }
129 BRPaquet paq(tampon, nextpaq, paqsz, swapall_);
130 npaqfait++; // Ne pas oublier le compteur de paquets faits
131 }
132 else {
133 memcpy((void *)(tampon+off_acheval), (void *)Datas, dmasz);
134 curoff =dmasz;
135 off_acheval = (dmasz+off_acheval);
136 }
137 }
138 //2- On traite les paquets complet qui se trouvent dans la zone du DMA
139 while((curoff+paqsz)<=dmasz) {
140 // BRPaquet paq((Byte*)(Datas)+((paqsz*j)), nextdma+j*paqsz, paqsz, swapall_);
141 if ((nextpaq=NextPaqTarget()) == NULL) {
142 cout << "3 PCIEReader::run()/Error NextPaqTarget() returned NULL ->STOP 9" << endl;
143 setRC(9); fgarret=true; break;
144 }
145 BRPaquet paq(Datas+curoff, nextpaq, paqsz, swapall_);
146 curoff += paqsz; // On avance l'index dans le buffer du DMA
147 npaqfait++; // Ne pas oublier le compteur de paquets faits
148 } // -- FIN traitement des paquets complets ds un DMA
149 //3- On copie si besoin la fin du DMA dans la zone tampon
150 if (curoff < dmasz) {
151 if (fgarret) break; // pour sortir si l'on est passe par un STOP 9
152 off_acheval = dmasz-curoff;
153 memcpy(tampon, (void*)(Datas+curoff), off_acheval);
154 curoff += off_acheval;
155 }
156 } // Traitement d'un DMA OK
157 }
158// }while(!stop_);
159
160
161 gettimeofday(&tv2, NULL);
162 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
163 if (tmelaps2<0.1) tmelaps2=0.1;
164 cout << " ------------------ PCIEReader::run()-End summary -------------------" << endl;
165 cout << " PCIEReader/Info TotTransfer=" << pciw_.TotTransferBytes()/1024
166 << " kb , ElapsTime=" << tmelaps2 << " ms ->"
167 << (double)pciw_.TotTransferBytes()/tmelaps2 << " kb/s" << endl;
168 cout << " --------------------------------------------------------------------" << endl;
169
170 MZoneManage(true);
171 delete [] tampon;
172
173 }catch (PException& exc) {
174 cout << " PCIEREADER::run()/catched PException " << exc.Msg() << endl;
175 setRC(3);
176 return;
177 }
178 catch(...) {
179 cout << " PCIEREADER::run()/catched unknown ... exception " << endl;
180 setRC(4);
181 return;
182 }
183 setRC(0);
184
185 return;
186}
187
188
189void PCIEReader::Stop()
190{
191 // cout << " PCIEReader::Stop() -------------> STOP" <<endl;
192 stop_ = true;
193 return;
194}
195
196
197//-------------------------------------------------------
198// Classe thread de sauvegarde sur fichiers
199//-------------------------------------------------------
200
201/*!
202 \class DataSaver
203 \ingroup TAcq
204
205 \brief Data saver in FITS format thread class \b (deprecated)
206
207 \b Deprecated - Use MultiDataSaver instead.
208 BRPaquets are dumped to disk in FITS format.
209*/
210
211DataSaver::DataSaver(RAcqMemZoneMgr& mem, string& path, uint_4 nfiles, uint_4 nblocperfile, bool savesig)
212 : memgr(mem)
213{
214 nfiles_ = nfiles;
215 nblocperfile_ = nblocperfile;
216 nmax_ = nblocperfile_*nfiles_;
217 savesig_ = savesig; // Si false, pas d'ecriture des fichiers FITS du signal
218 stop_ = false;
219 path_ = path;
220}
221void DataSaver::Stop()
222{
223 // cout<< " DataSaver:Stop ........ " << endl;
224 stop_=true;
225
226}
227void DataSaver::run()
228{
229 setRC(1);
230 BRPaqChecker pcheck; // Verification/comptage des paquets
231
232 try {
233 TimeStamp ts;
234 cout << " DataSaver::run() - Starting " << ts << " NbFiles=" << nfiles_ << " NBloc/File="
235 << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
236 char fname[512];
237
238 sprintf(fname,"%s/saver.log",path_.c_str());
239 ofstream filog(fname);
240 filog << " DataSaver::run() - starting log file " << ts << endl;
241 filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
242
243 DataTable dt;
244 dt.AddLongColumn("TimeTag");
245 dt.AddIntegerColumn("FrameCounter");
246 dt.AddIntegerColumn("FrameLength");
247 dt.AddIntegerColumn("Num");
248 DataTableRow dtr = dt.EmptyRow();
249 uint_8 timtag = 0;
250 uint_4 numpaq = 0;
251 bool dthead = false; // Mettre a false pour ne pas remplir DataTable Headers
252
253
254 uint_4 fnum=0;
255 uint_4 paqsz = memgr.PaqSize();
256 cout << " ============================ DataSaver::run() PaqSize " << paqsz <<endl;
257 bool fgnulldev = false;
258 if (path_ == "/dev/null") {
259 cout << " DataSaver::run()/Warning /dev/null path specified, filenames=/dev/null" << endl;
260 fgnulldev = true;
261 }
262 for (uint_4 nbFile=0;nbFile<nfiles_ ;nbFile++) {
263 if (stop_ ) break;
264 if (fgnulldev) strcpy(fname,"/dev/null");
265 else sprintf(fname,"%s/HDRfits%d.txt",path_.c_str(),fnum);
266 ofstream header(fname);
267
268 BRPaquet paq0(NULL, NULL, paqsz);
269 uint_4 npaqperfile = memgr.NbPaquets()*nblocperfile_; // Nombre de paquets ecrits dans un fichier
270
271 MiniFITSFile mff;
272 if (savesig_) { //Reza - Ouverture conditionnel fichier
273 if (fgnulldev) strcpy(fname,"/dev/null");
274 else sprintf(fname,"%s/signal%d.fits",path_.c_str(),(int)fnum++);
275 mff.Open(fname,MF_Write); //Reza - Ouverture conditionnel fichier
276 // Entete correspondant a l'ecriture tout le paquet - trailer compris (modif Mai 2009)
277 mff.setDTypeNaxis(MF_Byte, paq0.PaquetSize(), npaqperfile);
278
279 // Sans TRAILER de paquet mff.setDTypeNaxis(MF_Byte, paq0.DataSize()+paq0.HeaderSize(), npaqperfile);
280 }
281 else sprintf(fname,"MemDataBloc[%d]-NoDataFile",(int)fnum++);
282
283 for (uint_4 kmz=0; kmz<nblocperfile_; kmz++) {
284 if (stop_) break;
285 //DBG cout << " DataSaver::run()- nbFile=" << nbFile << " kmz=" << kmz << endl;
286 int mid = memgr.FindMemZoneId(MemZA_Save);
287 Byte* buff = memgr.GetMemZone(mid);
288 if (buff == NULL) {
289 cout << " DataSaver::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
290 setRC(2);
291 return;
292 }
293 for(uint_4 i=0; i<memgr.NbPaquets(); i++) {
294
295 BRPaquet paq(NULL, buff+i*paqsz, paqsz);
296 pcheck.Check(paq); // Verification du paquet / FrameCounter
297 if (savesig_)
298 header << hex << paq.HDRMarker() << " " << paq.TRLMarker() << " "
299 << paq.TimeTag2()<< " "<< paq.TimeTag1()<< " "
300 << paq.FrameCounter() << " " << paq.PaqLen() << endl;
301
302 if (dthead) { // Remplissage DataTable entetes paquets
303 timtag = (uint_8)paq.TimeTag2()*0x100000000ULL+paq.TimeTag1();
304 dtr[0] = timtag;
305 dtr[1] = paq.FrameCounter();
306 dtr[2] = paq.PaqLen();
307 dtr[3] = numpaq++;
308 dt.AddRow(dtr);
309 }
310
311 if (savesig_) // Reza - Ecriture conditionnel fichier fits signal
312 mff.WriteB(paq.Header(),paq.PaquetSize()); // ecriture tout le paquet (modif Mai 2009)
313 // mff.WriteB(paq.Data1(), paq.DataSize());
314 // mff.WriteB(paq.Header(),paq.DataSize()+paq.HeaderSize()); // ecriture datas + header
315 }
316 memgr.FreeMemZone(mid, MemZS_Saved);
317 }
318 ts.SetNow();
319 filog << ts << " : OK data file " << fname << endl;
320 cout << " DataSaver::run() " << ts << " : OK data file " << fname << endl;
321 }
322 if (dthead) {
323 cout << dt;
324 char fname2[256];
325 sprintf(fname2,"%s/headers.ppf",path_.c_str());
326 POutPersist po(fname2);
327 po << dt;
328 }
329 cout << " -------------------- DataSaver::run() -------------------- " << endl;
330 pcheck.Print(cout);
331 cout << " ---------------------------------------------------------- " << endl;
332 ts.SetNow();
333 pcheck.Print(filog);
334 filog << " DataSaver::run() - End of processing/run() " << ts << endl;
335
336 }
337 catch (MiniFITSException& exc) {
338 cout << " DataSaver::run()/catched MiniFITSException " << exc.Msg() << endl;
339 setRC(3);
340 return;
341 }
342 catch(...) {
343 cout << " DataSaver::run()/catched unknown ... exception " << endl;
344 setRC(4);
345 return;
346 }
347 setRC(0);
348 return;
349}
350
351
352
353
354//----------------------------------------------------------------------------------------------------------
355// Classe thread de lecture PCI-Express + Check pour tests de verification de debit/etc avec un seul thread
356//----------------------------------------------------------------------------------------------------------
357
358/*!
359 \class PCIEReaderChecker
360 \ingroup TAcq
361
362 \brief PCI-Express data read-out and BRPaquet structure check
363
364*/
365
366
367PCIEReaderChecker::PCIEReaderChecker(PCIEWrapperInterface &pciw, uint_4 sizeFrame,uint_4 packSize, RAcqMemZoneMgr& mem,
368 uint_4 nmax, BRDataFmtConv swapall)
369 : memgr(mem) , pciw_ (pciw)
370{
371 nmax_ = nmax;
372 swapall_ = swapall; // select data swap/format conversion for BRPaquet
373 stop_ = false;
374 packSize_ = packSize;
375 sizeFr_ =sizeFrame;
376}
377void PCIEReaderChecker::run()
378{
379
380 struct timeval tv1, tv2;
381 gettimeofday(&tv1, NULL);
382
383 cout << " PCIEReaderChecker::run() - Starting , NMaxMemZones=" << nmax_
384 << " memgr.NbPaquets()=" << memgr.NbPaquets() << endl;
385 setRC(1);
386 cout << " ... RAcqMemZoneMgr not used - using s fixed memory location for packets decoding ..." << endl;
387
388
389 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
390 //sigaction(SIGINT,&act,NULL);
391 uint_4 paqsz = memgr.PaqSize();
392 uint_4 dmasz = pciw_.TransferSize();
393 pciw_.StartTransfers();
394
395 BRPaqChecker pcheck; // Verification/comptage des paquets
396
397 Byte* Datas = NULL;
398 Byte* locdata = new Byte[paqsz*memgr.NbPaquets()*memgr.NbZones()];
399 Byte* tampon = new Byte[paqsz];
400
401 uint_4 off_acheval = 0;
402
403 int nerrdma = 0;
404 int maxerrdma = 10;
405 bool fgarret = false;
406 for (uint_4 kmz=0; kmz<nmax_; kmz++) {
407 if (stop_) break;
408 if (fgarret) break;
409
410 Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
411 uint_4 npaqfait = 0;
412 // for (uint_4 i=0; i<memgr.NbPaquets(); i += pktInDMATr) { // attention pktInDMATr paquets dans 1 seul DMA
413 while (npaqfait < memgr.NbPaquets()) {
414 if (fgarret) break;
415 // On pointe vers le debut de la zone a remplir aver le prochain DMA
416 //-- Zone memoire locale Byte* nextdma = buff+i*paqsz;
417 Datas=pciw_.GetData();
418
419 if (Datas == NULL) { // No data Read in DMA
420 nerrdma ++;
421 cout << "PCIEReaderChecker/Erreur Waiting for datas ..." << endl;
422 pciw_.PrintStatus(cout);
423 if (nerrdma>=maxerrdma) { fgarret = true; break; }
424 }
425 else {
426 uint_4 curoff = 0;
427 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire
428 // if (off_acheval > 0) {
429 // memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval);
430 // curoff = paqsz-off_acheval; off_acheval = 0;
431
432 // BRPaquet paq(tampon, locdata, paqsz, swapall_);
433 // npaqfait++; // Ne pas oublier le compteur de paquets faits
434 // pcheck.Check(paq); // Verification du paquet / FrameCounter
435 //}
436 if (off_acheval > 0) {
437 if ((paqsz-off_acheval)< dmasz) {
438 memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval);
439 curoff = paqsz-off_acheval; off_acheval = 0;
440
441 BRPaquet paq(tampon, locdata, paqsz, swapall_);
442 npaqfait++; // Ne pas oublier le compteur de paquets faits
443 pcheck.Check(paq); // Verification du paquet / FrameCounter
444 }
445 else {
446 memcpy((void *)(tampon+off_acheval), (void *)Datas, dmasz);
447 curoff =dmasz;
448 off_acheval = (dmasz+off_acheval);
449 }
450 }
451 //2- On traite les paquets complet qui se trouvent dans la zone du DMA
452 while((curoff+paqsz)<=dmasz) {
453 // BRPaquet paq((Byte*)(Datas)+((paqsz*j)), nextdma+j*paqsz, paqsz, swapall_);
454 // BRPaquet paq(Datas+curoff, locdata, paqsz, swapall_);
455 BRPaquet paq(Datas+curoff, nextdma+npaqfait*paqsz, paqsz, swapall_);
456 curoff += paqsz; // On avance l'index dans le buffer du DMA
457 npaqfait++; // Ne pas oublier le compteur de paquets faits
458 pcheck.Check(paq); // Verification du paquet / FrameCounter
459 } // -- FIN traitement des paquets complets ds un DMA
460 //3- On copie si besoin la fin du DMA dans la zone tampon
461 if (curoff < dmasz) {
462 off_acheval = dmasz-curoff;
463 memcpy(tampon, (void*)(Datas+curoff), off_acheval);
464 curoff += off_acheval;
465 }
466 } // Traitement d'un DMA OK
467
468 } // Fin boucle de remplissage d'une zone memoire
469 } // Fin boucle sur les zones
470
471 setRC(0);
472 gettimeofday(&tv2, NULL);
473 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
474 if (tmelaps2<0.1) tmelaps2=0.1;
475 cout << " ------------------ PCIEReaderChecker::run()-End summary -------------------" << endl;
476 cout << " PCIEReaderChecker/Info TotTransfer=" << pciw_.TotTransferBytes()/1024
477 << " kb , ElapsTime=" << tmelaps2 << " ms ->"
478 << (double)pciw_.TotTransferBytes()/tmelaps2 << " kb/s" << endl;
479 pcheck.Print(cout);
480 cout << " --------------------------------------------------------------------" << endl;
481
482 delete [] locdata;
483 delete [] tampon;
484
485 return;
486}
487void PCIEReaderChecker::Stop()
488{
489 // cout << " PCIEReaderChecker::stop() ........ STOP" <<endl;
490 stop_ = true;
491
492}
493////////////////////////////////////////////////////////////////////////////////////////////////////////
494//----------------------------------------------------------------------------------------------------------
495// PCI-Express read-out thread class for multiple fibres: DMA and transfer of the packets to RAcqMemZoneMgr memory
496//----------------------------------------------------------------------------------------------------------
497/*!
498 \class PCIEMultiReader
499 \ingroup TAcq
500
501 \brief PCI-Express data read-out thread class from multiple fibres (sources)
502
503 Perform DMA through the set of PCIEWrapperInterface object interfaces and transfer data to RAcqMemZoneMgr memory.
504*/
505
506/* --Methode-- */
507PCIEMultiReader::PCIEMultiReader(vector<PCIEWrapperInterface*> vec_pciw, RAcqMemZoneMgr& mem, BRParList const& par)
508 : memgr(mem), par_(par), vec_pciw_ (vec_pciw)
509{
510 nmax_ = par_.MaxNbBlocs();
511 swapall_ = par_.GetDataConvFg(); // select data swap/format conversion for BRPaquet
512 stop_ = false;
513 packSize_ = par_.RecvPaquetSize();
514 packSizeInMgr_=memgr.PaqSize();
515 sizeFr_=par_.DMASizeBytes();
516 if (vec_pciw.size() != memgr.NbFibres()) {
517 cout << " PCIEMultiReader()PbArgs: vec_pciw.size()= " << vec_pciw.size() << " memgr.NbFibres()=" <<memgr.NbFibres()<< endl;
518 throw ParmError("PCIEMultiReader:ERROR/ arguments incompatibles vec_pciw.size() != memgr.NbFibres() ");
519 }
520 if (vec_pciw.size() > MAXNBFIB)
521 throw ParmError("PCIEMultiReader:ERROR/ vec_pciw.size() > MAXNBFIB ");
522 nbDma_= vec_pciw.size();
523 mid_=-2;
524 mmbuf_=NULL;
525 max_targ_npaq = memgr.NbPaquets();
526 for (int fid=0 ; fid<(int)nbDma_ ;fid++) {
527 memgr.FiberId(fid)=vec_pciw[fid]->FiberId();
528 mmbufib_[fid]=NULL;
529 }
530 stopreason_="??Unknown??";
531}
532
533/* --Methode-- */
534void PCIEMultiReader::run()
535{
536
537 struct timeval tv1,tv2;
538 gettimeofday(&tv1, NULL);
539
540 cout << " PCIEMultiReader::run() - Starting , NMaxMemZones=" << nmax_
541 << " memgr.NbPaquets(),PaqSz=" << memgr.NbPaquets() << " ," << memgr.PaqSize()
542 << " DMA-Paqsize " << packSize_ << " " << BRPaquet::FmtConvToString(swapall_) << endl;
543 setRC(1);
544
545 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
546 //sigaction(SIGINT,&act,NULL);
547 uint_4 paqszmm = memgr.PaqSize();
548 uint_4 paqsz = packSize_;
549 uint_4 dmasz = vec_pciw_[0]->TransferSize();
550 //DEL vec_pciw_[0]->StartTransfers();
551
552 BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets
553 Byte* Datas[MAXNBFIB];
554 Byte* tampon[MAXNBFIB] ;
555 Byte* predtampon=NULL; // tampon de recopie pour la reduction des tailles de paquets
556 Byte* nextpaq=NULL;
557 uint_4 off_acheval=0;
558
559 bool fgarret = false;
560
561 // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA
562 for (int i=0;i< (int)nbDma_ ;i++) {
563 tampon[i]= new Byte[paqsz];
564 vec_pciw_[i]->SetMaxWaitEndDMA(par_.first_maxkwedma_,par_.first_nretrydma_);
565 }
566 bool fgredpaq=par_.fgreducpsize;
567 if (fgredpaq) {
568 cout << " PCIEMultiReader::run() - PaquetSizeReduction - RedSize=" << par_.redpqsize
569 << " Offset=" << par_.reducoffset << " " << ((par_.reducneedcopy)?"NeedCopy":"NOCopy")
570 << " " << BRPaquet::ReducActionToString(par_.pqreducmode) << endl;
571 predtampon = new Byte[paqsz];
572 }
573
574#ifdef DEBUGPAQHDR
575 ofstream header[MAXNBFIB];
576 for(uint_4 fib=0; fib<nbDma_; fib++) {
577 char hfnm[128];
578 sprintf(hfnm, "./HDRCountPaqs%d.txt", fib);
579 header[fib].open(hfnm);
580 }
581#endif
582 uint_4 npaqfait[MAXNBFIB] ;
583 for (int i=0;i< (int)nbDma_ ;i++) npaqfait[i]=0;
584 // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
585 uint_4 npaqfaitg = 0;
586 bool fg_change_timeout=true;
587 // for (uint_4 i=0; i<memgr.NbPaquets(); i += pktInDMATr) { // attention pktInDMATr paquets dans 1 seul DMA
588 while (npaqfaitg < nmax_*memgr.NbPaquets()) { // Boucle global G
589 if (fgarret) break;
590 if (stop_) break;
591
592 // Lancement des DMA
593 for (int dma=0; dma < (int)nbDma_ ;dma++) vec_pciw_[dma]->StartTransfers();
594 if ((npaqfaitg>1)&&fg_change_timeout) {
595 for (int i=0;i< (int)nbDma_ ;i++)
596 vec_pciw_[i]->SetMaxWaitEndDMA(par_.maxkwedma_,par_.nretrydma_);
597 fg_change_timeout=false;
598 }
599 // On pointe vers le debut de la zone a remplir aver le prochain DMA
600 //-- Zone memoire locale Byte* nextdma = buff+i*paqsz;
601
602 bool fgbaddma=false;
603 // On boucle sur les nbDma_ en attente de leurs terminaison
604 for (int dma=0; dma <(int) nbDma_ ;dma++) {
605 Datas[dma]=vec_pciw_[dma]->GetData();
606 if (Datas[dma] == NULL) { // No data Read in DMA
607 fgbaddma=true;
608 cout << "PCIEMultiReaderChecker/ERROR - DMA failed !" << endl;
609 vec_pciw_[dma]->PrintStatus(cout);
610 stopreason_="--Failed DMA--";
611 fgarret = true; break;
612 }
613 }
614 if (fgbaddma) continue;
615 uint_4 curoff=0;
616 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres
617 if (off_acheval > 0) { // IF Numero B
618 if ((paqsz-off_acheval)< dmasz) { // IF Numero A
619 for(uint_4 fib=0; fib<nbDma_; fib++)
620 memcpy((void *)((tampon[fib])+off_acheval), (void *)Datas[fib], paqsz-off_acheval);
621 curoff = paqsz-off_acheval; off_acheval = 0;
622 if ( MoveToNextTarget() ) {
623 cout << "PCIEMultiReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
624 setRC(9); fgarret=true; break;
625 }
626 for(uint_4 fib=0; fib<nbDma_; fib++) {
627 nextpaq=GetPaquetTarget(fib);
628 if (nextpaq == NULL) { // Cela ne devrait pas arriver
629 cout << "PCIEReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
630 setRC(9); fgarret=true; break;
631 }
632 // CHECK S'il faut faire une reduction de taille de paquet
633 if (fgredpaq) { // reduction taille de paquet
634 if (par_.reducneedcopy) {
635 BRPaquet paqc1(tampon[fib], predtampon, paqsz, swapall_);
636 BRPaquet paqc2(nextpaq, par_.redpqsize);
637 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
638 }
639 else {
640 BRPaquet paqc1(tampon[fib], paqsz);
641 BRPaquet paqc2(nextpaq, par_.redpqsize);
642 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
643 }
644 }
645 else {
646 BRPaquet paqc(tampon[fib], nextpaq, paqsz, swapall_);
647 }
648 BRPaquet paq(nextpaq, packSizeInMgr_);
649 npaqfait[fib]++;
650 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
651 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
652#ifdef DEBUGPAQHDR
653 header[fib] << dec << paq.FrameCounter()<< endl;
654#endif
655 }
656 }
657 else { // se rapporte au IF numero A
658 for(uint_4 fib=0; fib<nbDma_; fib++)
659 memcpy((void *)(tampon[fib]+off_acheval), (void *)Datas[fib], dmasz);
660 curoff =dmasz;
661 off_acheval = (dmasz+off_acheval);
662 }
663 } // Fin IF Numero B
664
665 //2- On traite les paquets complets qui se trouvent dans la zone du DMA
666 while ((curoff+paqsz)<=dmasz) { // while numero C
667 // if ((dma==nbDma_-1)&&(npaqfait >= nmax_* memgr.NbPaquets())) break;
668 if ( MoveToNextTarget() ) {
669 cout << "PCIEMultiReader::run()/Error-B- MoveToNextTarget() returned true ->STOP 9" << endl;
670 setRC(9); fgarret=true; break;
671 }
672 for(uint_4 fib=0; fib<nbDma_; fib++) {
673 if (npaqfait[fib] >= nmax_*memgr.NbPaquets()) continue;
674 nextpaq=GetPaquetTarget(fib);
675 if (nextpaq == NULL) { // Cela ne devrait pas arriver
676 cout << "PCIEReader::run()/Error-B2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
677 setRC(9); fgarret=true; break;
678 }
679 // CHECK S'il faut faire une reduction de taille de paquet
680 if (fgredpaq) { // reduction taille de paquet
681 if (par_.reducneedcopy) {
682 BRPaquet paqc1(Datas[fib]+curoff, predtampon, paqsz, swapall_);
683 BRPaquet paqc2(nextpaq, par_.redpqsize);
684 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
685 }
686 else {
687 BRPaquet paqc1(Datas[fib]+curoff, paqsz);
688 BRPaquet paqc2(nextpaq, par_.redpqsize);
689 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
690 }
691 }
692 else {
693 BRPaquet paqc(Datas[fib]+curoff, nextpaq, paqsz, swapall_);
694 }
695 BRPaquet paq(nextpaq, packSizeInMgr_);
696 npaqfait[fib]++;
697 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
698 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
699#ifdef DEBUGPAQHDR
700 header[fib] << dec << paq.FrameCounter()<< endl;
701#endif
702 }
703 curoff += paqsz; // On avance l'index dans le buffer du DMA
704 } // -- FIN traitement des paquets complets ds un DMA - FIN du while numero C
705 //3- On copie si besoin la fin du DMA dans la zone tampon
706 if (curoff < dmasz) { // IF numero D
707 off_acheval = dmasz-curoff;
708 for(uint_4 fib=0; fib<nbDma_; fib++)
709 memcpy(tampon[fib], (void*)(Datas[fib]+curoff), off_acheval);
710 // ne sert a rien curoff += off_acheval;
711 } // FIN du if numero D
712 } // FIN Boucle global G
713
714 if (npaqfaitg >= nmax_*memgr.NbPaquets()) stopreason_="--Max Nb paquets reached--";
715
716 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
717 MZoneManage(true); //---- Nettoyage final
718
719 setRC(0);
720 gettimeofday(&tv2, NULL);
721 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
722 if (tmelaps2<0.1) tmelaps2=0.1;
723 cout << " ---------- PCIEMultiReader::run()-End StopReason: " << stopreason_ << endl
724 << " Summary NPaqFait=" << npaqfaitg << "------------- " << endl;
725 for (int dma=0; dma < (int)nbDma_ ;dma++) {
726 cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] << " TotTransfer="
727 << vec_pciw_[dma]->TotTransferBytes()/1024
728 << " kb , ElapsTime=" << tmelaps2 << " ms ->"
729 << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl;
730 pcheck[dma].Print(cout);
731 }
732 cout << " --------------------------------------------------------------------" << endl;
733
734 usleep(250000); // Attente de traitement du dernier paquet
735 memgr.Stop(); // Pour arreter les autres threads
736
737 for (int i=0;i< (int)nbDma_ ;i++) delete[] tampon[i];
738 if ((fgredpaq)&&predtampon) delete[] predtampon;
739#ifdef DEBUGPAQHDR
740 for(uint_4 fib=0; fib<nbDma_; fib++) header[fib].close();
741#endif
742
743 //DBG cout << " fin thread ========================" <<endl;
744 return;
745}
746
747/* --Methode-- */
748bool PCIEMultiReader::MZoneManage(bool fgclean) // Retourne true si probleme
749{
750 /* Pour debug
751 cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
752 << " max_targ_npaq=" << max_targ_npaq << endl;
753 */
754 if (mid_ >= 0) {
755 memgr.GetAuxData(mid_)->filltime_.SetNow();
756 if (fgclean) memgr.FreeMemZone(mid_, MemZS_Free);
757 else memgr.FreeMemZone(mid_, MemZS_Filled);
758 }
759 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
760 for (int fid=0;fid<(int)nbDma_ ;fid++) mmbufib_[fid]=NULL;
761 if (fgclean) return false;
762 mid_ = memgr.FindMemZoneId(MemZA_Fill);
763 mmbuf_ = memgr.GetMemZone(mid_);
764 if (mmbuf_==NULL) return true;
765 for (int fid=0;fid<(int)nbDma_ ;fid++) mmbufib_[fid]=memgr.GetMemZone(mid_,fid);
766 return false;
767}
768
769/*
770bool PCIEMultiReader::MZoneManage(int zone,bool fgclean) // Retourne true si probleme
771{
772 // Pour debug
773 //cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
774 << " max_targ_npaq=" << max_targ_npaq << endl;
775 if (mid_[zone] >= 0) memgr[zone]->FreeMemZone(mid_[zone], MemZS_Filled);
776 mmbuf_[zone] = NULL; targ_npaq_[zone] = 0; mid_[zone] = -2;
777 if (fgclean) return false;
778 mid_[zone] = memgr[zone]->FindMemZoneId(MemZA_Fill);
779 mmbuf_[zone] = memgr[zone]->GetMemZone(mid_[zone]);
780 if (mmbuf_[zone]==NULL) return true;
781 return false;
782}
783*/
784
785/* --Methode-- */
786void PCIEMultiReader::Stop()
787{
788 // cout << " PCIEReaderChecker::stop() ........ STOP" <<endl;
789 stop_ = true;
790
791}
792
793
794//--------------------------------------------------------------------
795// Classe thread de sauvegarde sur fichiers avec gestion multifibres
796//--------------------------------------------------------------------
797/*!
798 \class MultiDataSaver
799 \ingroup TAcq
800
801 \brief Data saver in FITS format thread class for multiple BRPaquet sources (fibres)
802
803 BRPaquets from each source (fibre) are dumped to disk in separate files in FITS format.
804*/
805
806/* --Methode-- */
807MultiDataSaver::MultiDataSaver(RAcqMemZoneMgr& mem)
808 : memgr(mem)
809{
810 BRAcqConfig bpar;
811 nfiles_ = bpar.MaxNbFiles();
812 nblocperfile_ = bpar.BlocPerFile();
813 nmax_ = nblocperfile_*nfiles_;
814 savesig_ = bpar.GetSaveFits(); // Si false, pas d'ecriture des fichiers FITS du signal
815 stop_ = false;
816}
817
818/* --Methode-- */
819void MultiDataSaver::Stop()
820{
821 // cout<< " MultiDataSaver:Stop ........ " << endl;
822 stop_=true;
823}
824
825/* --Methode-- */
826void MultiDataSaver::run()
827{
828 setRC(1);
829 BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets
830 BRAcqConfig bpar;
831 try {
832 TimeStamp ts;
833 cout << " MultiDataSaver::run() - Starting " << ts << " \n NbFiles=" << nfiles_ << " NBloc/File="
834 << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
835 char fname[512];
836
837 sprintf(fname,"%s/msaver.log",bpar.OutputDirectory().c_str());
838 ofstream filog(fname);
839 filog << " MultiDataSaver::run() - starting log file " << ts << " NFibres= " << memgr.NbFibres() << endl;
840 filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
841
842 // Fichiers entete ascii et signal FITS
843 ofstream header[MAXNBFIB];
844 MiniFITSFile mff[MAXNBFIB];
845
846 uint_4 fnum=0;
847 uint_4 paqsz = memgr.PaqSize();
848 cout << " ============================ MultiDataSaver::run() PaqSize " << paqsz <<endl;
849 bool fgfirstfctt=false;
850 bool fgnulldev = bpar.GetFileDevNull();
851 if (fgnulldev) {
852 cout << " MultiDataSaver::run()/Warning /dev/null path specified, filenames=/dev/null" << endl;
853 fgnulldev = true;
854 }
855 BRPaquet paq0(NULL, NULL, paqsz);
856 TimeStamp tsmz;
857 for (uint_4 nbFile=0;nbFile<nfiles_ ;nbFile++) {
858 if (stop_ ) break;
859 if (memgr.GetRunState() == MemZR_Stopped) break;
860
861 if (savesig_)
862 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
863 if (fgnulldev) strcpy(fname,"/dev/null");
864 else sprintf(fname,"%sHDRfits%d.txt",bpar.OutputDirectoryFib(fib).c_str(),fnum);
865 header[fib].open(fname);
866 }
867
868 uint_4 npaqperfile = memgr.NbPaquets()*nblocperfile_; // Nombre de paquets ecrits dans un fichier
869
870 if (savesig_) { //Reza - Ouverture conditionnel fichier
871 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
872 if (fgnulldev) strcpy(fname,"/dev/null");
873 else sprintf(fname,"%ssignal%d.fits",bpar.OutputDirectoryFib(fib).c_str(),(int)fnum);
874 // cout << " ***DBG** Opening file " << fname << endl;
875 mff[fib].Open(fname,MF_Write); //Reza - Ouverture conditionnel fichier
876 // Entete correspondant a l'ecriture tout le paquet - trailer compris (modif Mai 2009)
877 mff[fib].setDTypeNaxis(MF_Byte, paq0.PaquetSize(), npaqperfile);
878 // Sans TRAILER de paquet mff.setDTypeNaxis(MF_Byte, paq0.DataSize()+paq0.HeaderSize(), npaqperfile);
879 }
880 fnum++; fgfirstfctt=true;
881 }
882 else sprintf(fname,"MemDataBloc[%d]-NoDataFile",(int)fnum++);
883
884 for (uint_4 kmz=0; kmz<nblocperfile_; kmz++) {
885 if (stop_) break;
886 //DBG cout << " MultiDataSaver::run()- nbFile=" << nbFile << " kmz=" << kmz << endl;
887 int mid = memgr.FindMemZoneId(MemZA_Save);
888 Byte* buffg = memgr.GetMemZone(mid);
889 if (buffg == NULL) {
890 cout << " MultiDataSaver::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
891 setRC(21);
892 return;
893 }
894 tsmz = memgr.GetAuxData(mid)->filltime_;
895 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) { // Boucle sur les fibres
896 Byte* buff = memgr.GetMemZone(mid,fib);
897 if (buff == NULL) { // Ceci ne devrait pas arriver - suite au test buffg ci-dessus
898 cout << " MultiDataSaver::run()/ERROR memgr.GetMemZone(" << mid << "," << fib << ") -> NULL" << endl;
899 setRC(22);
900 return;
901 }
902 for(uint_4 i=0; i<memgr.NbPaquets(); i++) { // boucle sur les paquets
903 BRPaquet paq(NULL, buff+i*paqsz, paqsz);
904 bool cpaqok=pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
905 if (cpaqok && fgfirstfctt) {
906 framecnt_first_[fib] = pcheck[fib].LastFrameNum();
907 timetag_first_[fib] = paq.TimeTag();
908 AddFitsKWStart(mff,bpar,tsmz);
909 fgfirstfctt=false;
910 }
911 if (savesig_)
912 header[fib] << hex << paq.HDRMarker() << " " << paq.TRLMarker() << " "
913 << paq.TimeTag2()<< " "<< paq.TimeTag1()<< " "
914 << paq.FrameCounter() << " " << paq.PaqLen() << endl;
915 if (savesig_) // Reza - Ecriture conditionnel fichier fits signal
916 mff[fib].WriteB(paq.Header(),paq.PaquetSize()); // ecriture tout le paquet (modif Mai 2009)
917 } // Fin de la boucle sur les paquets
918 } // Fin de la boucle sur les fibres
919 memgr.FreeMemZone(mid, MemZS_Saved);
920 } // Boucle sur les blocs dans un meme fichier
921 ts.SetNow();
922 filog << ts << " : OK data files " << endl;
923 cout << " MultiDataSaver::run() " << ts << " : OK data files " << endl;
924 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
925 if (savesig_) {
926 if (fgnulldev) strcpy(fname,"/dev/null");
927 else sprintf(fname,"%ssignal%d.fits",bpar.OutputDirectoryFib(fib).c_str(),(int)fnum-1);
928 }
929 else sprintf(fname,"MemDataBloc[%d]-NoDataFile",(int)fnum-1);
930 string pcsum = pcheck[fib].Summary();
931 filog << " Fib " << fib << " -> " << fname << " Stat:" << pcsum << endl;
932 cout << " Fib " << fib << " -> " << fname << " Stat:" << pcsum << endl;
933 }
934 if (savesig_) {
935 // Ajout mots-cle additionnels a tous les fichiers FITS
936 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++)
937 framecnt_last_[fib] = pcheck[fib].LastFrameNum();
938
939 AddFitsKWEnd(mff,bpar,tsmz);
940 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
941 header[fib].close();
942 mff[fib].Close();
943 }
944 }
945
946 } // Fin de boucle sur les fichiers
947 cout << " -------------------- MultiDataSaver::run() -------------------- " << endl;
948 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
949 cout << " MultiDataSaver/Summary Fib " << fib << endl;
950 pcheck[fib].Print(cout);
951 filog << " MultiDataSaver/Summary Fib " << fib << endl;
952 pcheck[fib].Print(filog);
953 }
954 cout << " ---------------------------------------------------------- " << endl;
955 ts.SetNow();
956 filog << " MultiDataSaver::run() - End of processing/run() " << ts << endl;
957
958 }
959 catch (MiniFITSException& exc) {
960 cout << " MultiDataSaver::run()/catched MiniFITSException " << exc.Msg() << endl;
961 setRC(3);
962 return;
963 }
964 catch(...) {
965 cout << " MultiDataSaver::run()/catched unknown ... exception " << endl;
966 setRC(4);
967 return;
968 }
969 setRC(0);
970 return;
971}
972
973
974/* --Methode-- */
975int MultiDataSaver::AddFitsKWStart(MiniFITSFile* mff, BRAcqConfig& acpar, TimeStamp& ts)
976{
977 string cdtu=ts.ToString();
978 string& skysrc=acpar.SkySource();
979 bool hassrc=false;
980 if (skysrc.length()>0) hassrc=true;
981 bool fgredpsz = acpar.GetParams().fgreducpsize;
982 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
983 mff[fib].AddKeyS("DATEOBS", cdtu.c_str(), " Observation Time (YYYY-MM-DDThh:mm:ss UT) ");
984 mff[fib].AddKeyS("TMSTART", cdtu.c_str(), " File Acqu. Start Time/Date ");
985 mff[fib].AddKeyD("ACQVER", acpar.AcqVersion(), " BAORadio Acq Software version ") ;
986 mff[fib].AddKeyS("ACQMODE", acpar.GetParams().AcqMode, " BAORadio Acq run mode" );
987 mff[fib].AddKeyS("BRPAQCFMT", BRPaquet::FmtConvToString(acpar.GetParams().GetDataConvFg()),
988 " BAORadio BRPaquet DataFormatConversion" );
989 mff[fib].AddKeyI("FIBERNUM", acpar.FiberNum(fib), " Fiber number") ;
990 mff[fib].AddKeyI("FIBERID",memgr.FiberId(fib), " Fiber identifier (absolute id)");
991 if (hassrc)
992 mff[fib].AddKeyS("SKYSOURC", skysrc, " Source identification" );
993 if (fgredpsz) {
994 mff[fib].AddKeyS("REDPSZMOD", BRPaquet::ReducActionToString(acpar.GetParams().pqreducmode),
995 "PaquetSize Reduction Mode") ;
996 mff[fib].AddKeyI("REDPSZOF", acpar.GetParams().reducoffset," PaquetSize Reduction Offset") ;
997 }
998 }
999 return 0;
1000}
1001
1002/* --Methode-- */
1003int MultiDataSaver::AddFitsKWEnd(MiniFITSFile* mff, BRAcqConfig& acpar, TimeStamp& ts)
1004{
1005 string cdtu=ts.ToString();
1006 for(uint_4 fib=0; fib<memgr.NbFibres(); fib++) {
1007 mff[fib].AddKeyS("TMEND", cdtu.c_str(), " File Acqu. End Time/Date ");
1008 mff[fib].AddKeyI("FCFIRST", framecnt_first_[fib], " First valid frame counter in file") ;
1009 mff[fib].AddKeyI("FCLAST", framecnt_last_[fib], " Last valid frame counter in file") ;
1010 mff[fib].AddKeyI("TTFIRST", timetag_first_[fib], " First valid timetag in file") ;
1011 }
1012 return 0;
1013}
1014
Note: See TracBrowser for help on using the repository browser.