source: Sophya/trunk/AddOn/TAcq/racqurw.cc@ 3643

Last change on this file since 3643 was 3643, checked in by ansari, 16 years ago

Importation des modifs effectuees sur pc-sitr2 pour l'acquisition - Reza 27/05/2009

File size: 15.3 KB
RevLine 
[3537]1//----------------------------------------------------------------
2// ---- classes de threads pour lecture (transfert DMA)
3// et ecriture disque pour acquisition BAORadio -----
4// LAL - R. Ansari - Juin/Juillet 2008
5//----------------------------------------------------------------
6
7#include "racqurw.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
[3623]12#include <signal.h>
[3537]13#include "pexceptions.h"
14#include "timestamp.h"
15
16#include "pciewrap.h"
17#include "brpaqu.h"
18#include "minifits.h"
19
[3623]20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include "datatable.h" // Pour sauver les entetes de paquet
22#include <sys/time.h> // pour gettimeofday
[3537]23//-------------------------------------------------------
24// Classe thread de lecture PCI-Express
25//-------------------------------------------------------
26
[3628]27PCIEReader::PCIEReader(PCIEWrapperInterface &pciw,uint_4 sizeFrame,uint_4 packSize ,RAcqMemZoneMgr& mem,
[3623]28 uint_4 nmax, BRDataFmtConv swapall)
29 : memgr(mem) , pciw_ (pciw)
[3537]30{
[3623]31 nmax_ = nmax;
32 swapall_ = swapall; // select data swap/format conversion for BRPaquet
33 stop_ = false;
34 packSize_ = packSize;
35 sizeFr_ =sizeFrame;
36 // Pour la logique de gestion des paquets ds zone memoire
37 mid_ = -2;
38 targ_npaq_ = 0;
39
40 max_targ_npaq = memgr.NbPaquets();
41 mmbuf_ = NULL;
[3537]42}
43
[3623]44bool PCIEReader::MZoneManage(bool fgclean) // Retourne true si probleme
45{
46 /* Pour debug
47 cout << " PCIEReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
48 << " max_targ_npaq=" << max_targ_npaq << endl;
49 */
50 if (mid_ >= 0) memgr.FreeMemZone(mid_, MemZS_Filled);
51 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
52 if (fgclean) return false;
53 mid_ = memgr.FindMemZoneId(MemZA_Fill);
54 mmbuf_ = memgr.GetMemZone(mid_);
55 if (mmbuf_==NULL) return true;
56 return false;
57}
58
[3537]59void PCIEReader::run()
60{
[3623]61 struct sigaction act;
62 struct sigaction old_act;
63 //Precision insuffisante ResourceUsage ru; ru.Update(); // Pour recuperer le temps passe
64 struct timeval tv1, tv2;
65 gettimeofday(&tv1, NULL);
66
67 try{
68 cout << " PCIEReader::run() - Starting , NMaxMemZones=" << nmax_
69 << "memgr.NbPaquets()=" << memgr.NbPaquets() << endl;
[3537]70 setRC(1);
[3623]71
72 uint_4 cpt=0;
73 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
74 //sigaction(SIGINT,&act,NULL);
75
[3537]76 uint_4 paqsz = memgr.PaqSize();
[3623]77 uint_4 dmasz = pciw_.TransferSize();
[3643]78 pciw_.StartTransfers();
[3623]79
80 Byte* Datas = NULL;
81 Byte* tampon = new Byte[paqsz];
82 Byte* nextpaq = NULL;
83
84 uint_4 off_acheval = 0;
85
86 int nerrdma = 0;
87 int maxerrdma = 10;
88 bool fgarret = false;
89
90 uint_4 npaqfait = 0; // Nb total de paquets traites (DMA + decode)
91
92 /// do{ si boucle infini
93 // for (uint_4 kmz=0; kmz<nmax_; kmz++) {
94 while (npaqfait < nmax_*memgr.NbPaquets()) {
95 if (stop_) break;
96 if (fgarret) break;
97 // On pointe vers le debut de la zone a remplir aver le prochain DMA
[3643]98 Datas=pciw_.GetData();
[3623]99 if (Datas == NULL) { // No data Read in DMA
100 nerrdma ++;
101 cout << "PCIEReaderChecker/Erreur Waiting for datas ..." << endl;
102 pciw_.PrintStatus(cout);
103 if (nerrdma>=maxerrdma) { fgarret = true; break; }
104 }
105 else { // DMA reussi
106 uint_4 curoff = 0;
107 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire
108 if (off_acheval > 0) {
109 if ((paqsz-off_acheval)< dmasz) {
110 memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval);
111 curoff = paqsz-off_acheval; off_acheval = 0;
112
113 if ((nextpaq=NextPaqTarget()) == NULL) {
114 cout << "2 PCIEReader::run()/Error NextPaqTarget() returned NULL ->STOP 9" << endl;
115 setRC(9); fgarret=true; break;
116 }
117 BRPaquet paq(tampon, nextpaq, paqsz, swapall_);
118 npaqfait++; // Ne pas oublier le compteur de paquets faits
119 }
120 else {
121 memcpy((void *)(tampon+off_acheval), (void *)Datas, dmasz);
122 curoff =dmasz;
123 off_acheval = (dmasz+off_acheval);
124 }
125 }
126 //2- On traite les paquets complet qui se trouvent dans la zone du DMA
127 while((curoff+paqsz)<=dmasz) {
128 // BRPaquet paq((Byte*)(Datas)+((paqsz*j)), nextdma+j*paqsz, paqsz, swapall_);
129 if ((nextpaq=NextPaqTarget()) == NULL) {
130 cout << "3 PCIEReader::run()/Error NextPaqTarget() returned NULL ->STOP 9" << endl;
131 setRC(9); fgarret=true; break;
132 }
133 BRPaquet paq(Datas+curoff, nextpaq, paqsz, swapall_);
134 curoff += paqsz; // On avance l'index dans le buffer du DMA
135 npaqfait++; // Ne pas oublier le compteur de paquets faits
136 } // -- FIN traitement des paquets complets ds un DMA
137 //3- On copie si besoin la fin du DMA dans la zone tampon
138 if (curoff < dmasz) {
139 if (fgarret) break; // pour sortir si l'on est passe par un STOP 9
140 off_acheval = dmasz-curoff;
141 memcpy(tampon, (void*)(Datas+curoff), off_acheval);
142 curoff += off_acheval;
143 }
144 } // Traitement d'un DMA OK
145 }
146// }while(!stop_);
147
148
149 gettimeofday(&tv2, NULL);
150 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
151 if (tmelaps2<0.1) tmelaps2=0.1;
152 cout << " ------------------ PCIEReader::run()-End summary -------------------" << endl;
153 cout << " PCIEReader/Info TotTransfer=" << pciw_.TotTransferBytes()/1024
154 << " kb , ElapsTime=" << tmelaps2 << " ms ->"
155 << (double)pciw_.TotTransferBytes()/tmelaps2 << " kb/s" << endl;
156 cout << " --------------------------------------------------------------------" << endl;
157
158 MZoneManage(true);
159 delete [] tampon;
160
161 }catch (PException& exc) {
162 cout << " PCIEREADER::run()/catched PException " << exc.Msg() << endl;
163 setRC(3);
164 return;
165 }
166 catch(...) {
167 cout << " PCIEREADER::run()/catched unknown ... exception " << endl;
168 setRC(4);
169 return;
170 }
[3537]171 setRC(0);
[3623]172
[3537]173 return;
174}
175
176
[3623]177void PCIEReader::Stop()
178{
179 // cout << " PCIEReader::Stop() -------------> STOP" <<endl;
180 stop_ = true;
181 return;
182}
[3537]183
[3623]184
[3537]185//-------------------------------------------------------
186// Classe thread de sauvegarde sur fichiers
187//-------------------------------------------------------
188
[3623]189DataSaver::DataSaver(RAcqMemZoneMgr& mem, string& path, uint_4 nfiles, uint_4 nblocperfile, bool savesig)
[3537]190 : memgr(mem)
191{
[3623]192 nfiles_ = nfiles;
193 nblocperfile_ = nblocperfile;
194 nmax_ = nblocperfile_*nfiles_;
195 savesig_ = savesig; // Si false, pas d'ecriture des fichiers FITS du signal
[3537]196 stop_ = false;
197 path_ = path;
198}
[3623]199void DataSaver::Stop()
200{
201 // cout<< " DataSaver:Stop ........ " << endl;
202 stop_=true;
[3537]203
[3623]204}
[3537]205void DataSaver::run()
206{
207 setRC(1);
[3623]208 BRPaqChecker pcheck; // Verification/comptage des paquets
209
[3537]210 try {
211 TimeStamp ts;
[3623]212 cout << " DataSaver::run() - Starting " << ts << " NbFiles=" << nfiles_ << " NBloc/File="
213 << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
[3537]214 char fname[512];
[3623]215
[3537]216 sprintf(fname,"%s/saver.log",path_.c_str());
217 ofstream filog(fname);
218 filog << " DataProc::run() - starting log file " << ts << endl;
[3623]219 filog << " NbFiles=" << nfiles_ << " NBloc/File=" << nblocperfile_ << " NMaxMemZones=" << nmax_ << endl;
220
221 DataTable dt;
222 dt.AddLongColumn("TimeTag");
223 dt.AddIntegerColumn("FrameCounter");
224 dt.AddIntegerColumn("FrameLength");
225 dt.AddIntegerColumn("Num");
226 DataTableRow dtr = dt.EmptyRow();
227 uint_8 timtag = 0;
228 uint_4 numpaq = 0;
229 bool dthead = false; // Mettre a false pour ne pas remplir DataTable Headers
[3537]230
[3623]231
232 uint_4 fnum=0;
[3537]233 uint_4 paqsz = memgr.PaqSize();
[3623]234 for (uint_4 nbFile=0;nbFile<nfiles_ ;nbFile++) {
235 if (stop_ ) break;
236 sprintf(fname,"%s/HDRfits%d.txt",path_.c_str(),fnum);
237 ofstream header(fname);
238
239 BRPaquet paq0(NULL, NULL, paqsz);
240 uint_4 npaqperfile = memgr.NbPaquets()*nblocperfile_; // Nombre de paquets ecrits dans un fichier
241
242 MiniFITSFile mff;
243 if (savesig_) { //Reza - Ouverture conditionnel fichier
244 sprintf(fname,"%s/signal%d.fits",path_.c_str(),(int)fnum++);
245 mff.Open(fname,MF_Write); //Reza - Ouverture conditionnel fichier
[3640]246 // Entete correspondant a l'ecriture tout le paquet - trailer compris (modif Mai 2009)
247 mff.setDTypeNaxis(MF_Byte, paq0.PaquetSize(), npaqperfile);
248 // Sans TRAILER de paquet mff.setDTypeNaxis(MF_Byte, paq0.DataSize()+paq0.HeaderSize(), npaqperfile);
[3537]249 }
[3623]250 else sprintf(fname,"MemDataBloc[%d]-NoDataFile",(int)fnum++);
251
252 for (uint_4 kmz=0; kmz<nblocperfile_; kmz++) {
253 if (stop_) break;
254 //DBG cout << " DataSaver::run()- nbFile=" << nbFile << " kmz=" << kmz << endl;
255 int mid = memgr.FindMemZoneId(MemZA_Save);
256 Byte* buff = memgr.GetMemZone(mid);
257 if (buff == NULL) {
258 cout << " DataSaver::run()/ERROR memgr.GetMemZone(" << mid << ") -> NULL" << endl;
259 setRC(2);
260 return;
261 }
262 for(uint_4 i=0; i<memgr.NbPaquets(); i++) {
263
264 BRPaquet paq(NULL, buff+i*paqsz, paqsz);
265 pcheck.Check(paq); // Verification du paquet / FrameCounter
[3643]266 if (savesig_)
267 header << hex << paq.HDRMarker() << " " << paq.TRLMarker() << " "
268 << paq.TimeTag2()<< " "<< paq.TimeTag1()<< " "
269 << paq.FrameCounter() << " " << paq.PaqLen() << endl;
[3623]270
271 if (dthead) { // Remplissage DataTable entetes paquets
272 timtag = (uint_8)paq.TimeTag2()*0x100000000ULL+paq.TimeTag1();
273 dtr[0] = timtag;
274 dtr[1] = paq.FrameCounter();
275 dtr[2] = paq.PaqLen();
276 dtr[3] = numpaq++;
277 dt.AddRow(dtr);
278 }
279
280 if (savesig_) // Reza - Ecriture conditionnel fichier fits signal
[3640]281 mff.WriteB(paq.Header(),paq.PaquetSize()); // ecriture tout le paquet (modif Mai 2009)
282 // mff.WriteB(paq.Data1(), paq.DataSize());
283 // mff.WriteB(paq.Header(),paq.DataSize()+paq.HeaderSize()); // ecriture datas + header
[3623]284 }
285 memgr.FreeMemZone(mid, MemZS_Saved);
[3537]286 }
[3623]287 ts.SetNow();
288 filog << ts << " : OK data file " << fname << endl;
289 cout << " DataSaver::run() " << ts << " : OK data file " << fname << endl;
[3537]290 }
[3623]291 if (dthead) {
292 cout << dt;
293 char fname2[256];
294 sprintf(fname2,"%s/headers.ppf",path_.c_str());
295 POutPersist po(fname2);
296 po << dt;
297 }
298 cout << " -------------------- DataSaver::run() -------------------- " << endl;
299 pcheck.Print(cout);
300 cout << " ---------------------------------------------------------- " << endl;
301 ts.SetNow();
302 pcheck.Print(filog);
303 filog << " DataSaver::run() - End of processing/run() " << ts << endl;
304
[3537]305 }
306 catch (MiniFITSException& exc) {
307 cout << " DataSaver::run()/catched MiniFITSException " << exc.Msg() << endl;
308 setRC(3);
309 return;
310 }
311 catch(...) {
312 cout << " DataSaver::run()/catched unknown ... exception " << endl;
313 setRC(4);
314 return;
315 }
316 setRC(0);
317 return;
318}
319
320
[3623]321
322
323//----------------------------------------------------------------------------------------------------------
324// Classe thread de lecture PCI-Express + Check pour tests de verification de debit/etc avec un seul thread
325//----------------------------------------------------------------------------------------------------------
326
[3628]327PCIEReaderChecker::PCIEReaderChecker(PCIEWrapperInterface &pciw,uint_4 sizeFrame,uint_4 packSize ,RAcqMemZoneMgr& mem,
[3623]328 uint_4 nmax, BRDataFmtConv swapall)
329 : memgr(mem) , pciw_ (pciw)
330{
331 nmax_ = nmax;
332 swapall_ = swapall; // select data swap/format conversion for BRPaquet
333 stop_ = false;
334 packSize_ = packSize;
335 sizeFr_ =sizeFrame;
336}
337void PCIEReaderChecker::run()
338{
339
340 struct timeval tv1, tv2;
341 gettimeofday(&tv1, NULL);
342
343 cout << " PCIEReaderChecker::run() - Starting , NMaxMemZones=" << nmax_
344 << " memgr.NbPaquets()=" << memgr.NbPaquets() << endl;
345 setRC(1);
346 cout << " ... RAcqMemZoneMgr not used - using s fixed memory location for packets decoding ..." << endl;
347
348 uint_4 cpt=0;
349 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
350 //sigaction(SIGINT,&act,NULL);
351 uint_4 paqsz = memgr.PaqSize();
352 uint_4 dmasz = pciw_.TransferSize();
[3643]353 pciw_.StartTransfers();
[3623]354
355 BRPaqChecker pcheck; // Verification/comptage des paquets
356
357 Byte* Datas = NULL;
358 Byte* locdata = new Byte[paqsz];
359 Byte* tampon = new Byte[paqsz];
360
361 uint_4 off_acheval = 0;
362
363 int nerrdma = 0;
364 int maxerrdma = 10;
365 bool fgarret = false;
366 for (uint_4 kmz=0; kmz<nmax_; kmz++) {
367 if (stop_) break;
368 if (fgarret) break;
369
370 Byte* nextdma = locdata;
371 uint_4 npaqfait = 0;
372 // for (uint_4 i=0; i<memgr.NbPaquets(); i += pktInDMATr) { // attention pktInDMATr paquets dans 1 seul DMA
373 while (npaqfait < memgr.NbPaquets()) {
374 if (fgarret) break;
375 // On pointe vers le debut de la zone a remplir aver le prochain DMA
376 //-- Zone memoire locale Byte* nextdma = buff+i*paqsz;
[3643]377 Datas=pciw_.GetData();
[3623]378
379 if (Datas == NULL) { // No data Read in DMA
380 nerrdma ++;
381 cout << "PCIEReaderChecker/Erreur Waiting for datas ..." << endl;
382 pciw_.PrintStatus(cout);
383 if (nerrdma>=maxerrdma) { fgarret = true; break; }
384 }
385 else {
386 uint_4 curoff = 0;
387 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire
388 // if (off_acheval > 0) {
389 // memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval);
390 // curoff = paqsz-off_acheval; off_acheval = 0;
391
392 // BRPaquet paq(tampon, locdata, paqsz, swapall_);
393 // npaqfait++; // Ne pas oublier le compteur de paquets faits
394 // pcheck.Check(paq); // Verification du paquet / FrameCounter
395 //}
396 if (off_acheval > 0) {
397 if ((paqsz-off_acheval)< dmasz) {
398 memcpy((void *)(tampon+off_acheval), (void *)Datas, paqsz-off_acheval);
399 curoff = paqsz-off_acheval; off_acheval = 0;
400
401 BRPaquet paq(tampon, locdata, paqsz, swapall_);
402 npaqfait++; // Ne pas oublier le compteur de paquets faits
403 pcheck.Check(paq); // Verification du paquet / FrameCounter
404 }
405 else {
406 memcpy((void *)(tampon+off_acheval), (void *)Datas, dmasz);
407 curoff =dmasz;
408 off_acheval = (dmasz+off_acheval);
409 }
410 }
411 //2- On traite les paquets complet qui se trouvent dans la zone du DMA
412 while((curoff+paqsz)<=dmasz) {
413 // BRPaquet paq((Byte*)(Datas)+((paqsz*j)), nextdma+j*paqsz, paqsz, swapall_);
414 BRPaquet paq(Datas+curoff, locdata, paqsz, swapall_);
415 curoff += paqsz; // On avance l'index dans le buffer du DMA
416 npaqfait++; // Ne pas oublier le compteur de paquets faits
417 pcheck.Check(paq); // Verification du paquet / FrameCounter
418 } // -- FIN traitement des paquets complets ds un DMA
419 //3- On copie si besoin la fin du DMA dans la zone tampon
420 if (curoff < dmasz) {
421 off_acheval = dmasz-curoff;
422 memcpy(tampon, (void*)(Datas+curoff), off_acheval);
423 curoff += off_acheval;
424 }
425 } // Traitement d'un DMA OK
426
427 } // Fin boucle de remplissage d'une zone memoire
428 } // Fin boucle sur les zones
429
430 setRC(0);
431 gettimeofday(&tv2, NULL);
432 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
433 if (tmelaps2<0.1) tmelaps2=0.1;
434 cout << " ------------------ PCIEReaderChecker::run()-End summary -------------------" << endl;
435 cout << " PCIEReaderChecker/Info TotTransfer=" << pciw_.TotTransferBytes()/1024
436 << " kb , ElapsTime=" << tmelaps2 << " ms ->"
437 << (double)pciw_.TotTransferBytes()/tmelaps2 << " kb/s" << endl;
438 pcheck.Print(cout);
439 cout << " --------------------------------------------------------------------" << endl;
440
441 delete [] locdata;
442 delete [] tampon;
443
444 return;
445}
446void PCIEReaderChecker::Stop()
447{
448 // cout << " PCIEReaderChecker::stop() ........ STOP" <<endl;
449 stop_ = true;
450
451}
Note: See TracBrowser for help on using the repository browser.