source: Sophya/trunk/AddOn/TAcq/mfacq.cc@ 3899

Last change on this file since 3899 was 3899, checked in by ansari, 15 years ago

Ajout flag waitendmsg (Wait for END message) pour le controle de la fin d'execution de EthernetReader , Reza 04/10/2010

File size: 16.2 KB
RevLine 
[3671]1#include "mfacq.h"
2
3//---------------------------------------------------------------
4// Programme d'acquisition BAORadio multi-fibres/multi-threads
5// LAL - 2009 - 2010
6// R. Ansari, M.Taurigna
7//---------------------------------------------------------------
8
9static RAcqMemZoneMgr* pMmgr=NULL;
10static PCIEMultiReader* pPcierThr=NULL;
[3757]11static PCIEToEthernet* pPcie2Eth=NULL;
12static EthernetReader* pEthRdr=NULL;
[3671]13
14void Stop(int s)
15{
16 if (s == 9765) cout << " Stop after exception ..." << endl;
17 else printf("............. MAIN ... receive signal %d \n",s);
18 if (pMmgr != NULL) pMmgr->Stop();
[3757]19 if (pPcie2Eth !=NULL) pPcie2Eth->Stop();
[3671]20 if (pPcierThr !=NULL) pPcierThr->Stop();
[3757]21 if (pEthRdr !=NULL) pEthRdr->Stop();
[3671]22}
23
24
25//-----------------------------------------------------------------------
26//-------------------- le programme principal ---------------------------
27//-----------------------------------------------------------------------
28int main(int narg, char* arg[])
29{
30
31 int rc = 0;
32 cout << " ============= BAORadio / Acquisition : mfacq =================" << endl;
33 cout << " ===============================================================" << endl;
34 cout << " ========= " <<BAOR_ACQ_VER_STR <<BAOR_ACQ_VER << " ===========" << endl;
35 cout << " ===============================================================" << endl;
36
37
38 InitTim();
39
40 // Initialisation
41 TArrayInitiator _arri;
42 FitsIOServerInitiator _fiosi;
43
44
[3757]45 if ((narg > 1)&&(strcmp(arg[1],"-h"))==0) {
[3671]46 Usage(false);
47 return 1;
48 }
[3757]49 if (narg < 3) {
50 Usage(true);
51 return 3;
52 }
[3671]53
[3876]54 const char* desact[4] = {"PCIE_To_Ethernet", "Ethernet_To_Disk", "Ethernet_To_Visibilities","PCIE_DMA_To_Disk"};
[3757]55 string action=arg[1];
56 int act = 0;
[3876]57 if ((action != "-pci2eth")&&(action != "-pci2disk")&&(action != "-eth2disk")&&(action != "-eth2visib")) {
[3757]58 cout << " mfacq/Error , Bad action argument : " << action << endl;
59 return 2;
60 }
61 if (action == "-pci2eth") act=0;
[3876]62 else if (action == "-eth2disk") act=1;
63 else if (action == "-eth2visib") act=2;
64 else if (action == "-pci2disk") act=3;
[3757]65
66 string pardcfile=arg[2];
[3674]67#ifndef NOPCIECARD
68 string basedir="/Raid";
69#else
70 string basedir="./";
71#endif
[3757]72 vector<string> oargs;
73 if (narg>3) {
74 basedir=arg[2];
[3758]75 for(int jj=3; jj<narg; jj++) oargs.push_back(arg[jj]);
[3757]76 }
[3671]77 try {
78 // Creation/initialisation parametres Acq
79 BRAcqConfig acpar;
[3757]80 acpar.ReadParamFile(pardcfile);
[3674]81 acpar.GetConfig().SetBaseDirectory(basedir);
[3757]82 if ((act==0)&&(oargs.size()>0))
83 acpar.GetParams().SetEthTargets(oargs);
[3671]84 // Creation des repertoires
[3757]85 if (act > 0)
86 if (acpar.CreateOutputDirectories()!=0) return 9;
[3876]87 acpar.GetParams().fgdoVisiC=false;
88 if (act == 2) acpar.GetParams().fgdoVisiC=true;
89
[3671]90 acpar.Print(cout);
[3757]91 struct sigaction siact;
[3671]92 if (!acpar.GetParams().fg_hard_ctrlc) {
[3757]93 siact.sa_handler=Stop;
94 sigaction(SIGINT,&siact,NULL);
[3671]95 }
[3757]96 switch (act) {
97 case 0:
98 rc = PCIEToEthernetTransfer();
99 break;
[3876]100 case 1: // ethernet --> disk
101 rc = EthernetToMemoryAcq(false);
[3757]102 break;
[3876]103 case 2: // ethernet --> visibility calculation
104 rc = EthernetToMemoryAcq(true);
105 break;
106 case 3:
[3757]107 rc = MultiFibreAcq();
108 break;
109 }
[3671]110 }
111 catch (MiniFITSException& exc) {
112 cerr << " mfacq.cc catched MiniFITSException " << exc.Msg() << endl;
113 rc = 77;
114 }
115 catch (PCIEWException& exc) {
[3683]116 cerr << " mfacq.cc catched PCIEWException " << exc.Msg() << endl;
[3671]117 rc = 75;
118 }
119 catch (PThrowable& exc) {
120 cerr << " mfacq.cc catched Exception " << exc.Msg() << endl;
121 rc = 76;
122 }
123 catch (std::exception& sex) {
124 cerr << "\n mfacq.cc std::exception :"
125 << (string)typeid(sex).name() << "\n msg= "
126 << sex.what() << endl;
127 rc = 78;
128 }
129 catch (...) {
130 cerr << " mfacq.cc : Catched ... exception " << endl;
131 rc = 79;
132 }
133 cout << "mfacq[9] ----- END --- stopping acq program " << endl;
134 return rc;
135}
136
137
138/* --Nouvelle-Fonction-- */
139void Usage(bool fgshort)
140{
[3767]141 cout << " Usage: mfacq Action DataCardFile [BaseDirectory / TargetMachines]" << endl;
[3757]142 if (fgshort) return;
[3876]143 cout << " o Action = -pci2disk , -pci2eth , -eth2disk , -eth2visib \n "
[3766]144 << " DataCardFile : File name for parameters list (DataCards) : \n"
145 << " fibres outpathname skysource paqsize dmasizekb nbfiles \n"
146 << " nblocperfile acqmode memmgr monitor reducpaqsz ... "<< endl;
147 cout << " o BaseDirectory (default= /Raid ) for -pci2disk , -eth2disk" << endl;
[3767]148 cout << " o TargetMachines : List of target machines for PCIe2Ethernet (-pci2eth) \n " << endl;
[3671]149 return;
150}
151
152
153/* --Nouvelle-Fonction-- */
154int MultiFibreAcq()
155{
156 Timer tm("mfacq/MultiFibre");
157 cout << " ---------- mfacq/ MultiFibreAcq() ------------- " << endl;
[3757]158 PCIEWrapperInterface* pciwp[16]={NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,
159 NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL};
[3671]160
161 BRAcqConfig bpar;
162 BRParList& acpar=bpar.GetParams();
163
164 cout << " mfacq[1]/Info: Creating RAcqMemZoneMgr for" << acpar.NbFibers << " fibers , nZones="
165 << acpar.nZones << " NbPaquet/Zone=" << acpar.nPaqZone
[3674]166 << " MmgrPaqSize=" << acpar.MMgrPaquetSize() << endl;
[3671]167 RAcqMemZoneMgr mmgr(acpar.nZones, bpar.NFibers(), acpar.nPaqZone, acpar.MMgrPaquetSize());
168 pMmgr=&mmgr;
[3681]169 if (acpar.fgdoProc && (acpar.stepProc>0)) {
170 // Dans ce cas, toutes les zones doivent passer ds le thread de monitoring
171 // pour etre au moins marque comme traite - seul 1/acpar.stepProc sont effectivement traite
172 mmgr.SetFinalizedMask(((uint_4)MemZS_Saved) | ((uint_4)MemZS_Proc));
173 cout << " mfacq[1.b]/Info: Mmgr.SetFinalizedMask( MemZS_Saved | MemZS_Proc )" << endl;
[3758]174 //mmgr.SetFinalizedMask( (uint_4)MemZS_Saved );
175 //cout << " mfacq[1.b]/Info: Mmgr.SetFinalizedMask( MemZS_Saved )" << endl;
[3681]176 }
177
[3671]178#ifndef NOPCIECARD
179 for (int i=0 ;i <acpar.NFibers() ;i++) {
180 UINT32 card=(acpar.FiberNum[i]-1)/2+1;
[3680]181 UINT32 cardfiber=(acpar.FiberNum[i]-1)%2;
[3671]182 cout <<"mfacq[2] CreatePCIEWrapperV6- indice " << i << "INIT card " << card
183 << " fibre " << cardfiber << endl;
[3683]184 pciwp[i] = CreatePCIEWrapperV6(card,acpar.PatternSize(),acpar.DMASizeBytes(),acpar.activate_pattern,cardfiber);
[3671]185 }
186#else
187 TestPCIWrapperNODMA pciw1(bpar.RecvPaquetSize(), acpar.nopciLossRate);
188 TestPCIWrapperNODMA pciw2(bpar.RecvPaquetSize(), acpar.nopciLossRate);
189 TestPCIWrapperNODMA pciw3(bpar.RecvPaquetSize(), acpar.nopciLossRate);
190 TestPCIWrapperNODMA pciw4(bpar.RecvPaquetSize(), acpar.nopciLossRate);
[3757]191 TestPCIWrapperNODMA pciw5(bpar.RecvPaquetSize(), acpar.nopciLossRate);
192 TestPCIWrapperNODMA pciw6(bpar.RecvPaquetSize(), acpar.nopciLossRate);
193 TestPCIWrapperNODMA pciw7(bpar.RecvPaquetSize(), acpar.nopciLossRate);
194 TestPCIWrapperNODMA pciw8(bpar.RecvPaquetSize(), acpar.nopciLossRate);
[3671]195 pciwp[0] = &pciw1;
196 pciwp[1] = &pciw2;
197 pciwp[2] = &pciw3;
[3757]198 pciwp[3] = &pciw4;
199 pciwp[4] = &pciw5;
200 pciwp[5] = &pciw6;
201 pciwp[6] = &pciw7;
202 pciwp[7] = &pciw8;
[3671]203#endif
204
[3681]205 cout <<"mfacq[2] Creating MultiDataSaver and MonitorProc thread objects ... " << endl;
[3672]206 MultiDataSaver DsThr(mmgr); // Utilise les parametres globaux BRAcqConfig
[3671]207 string ppath=bpar.OutputDirectory();
[3681]208 MonitorProc PrThr(mmgr);
[3758]209 cout << "mfacq[3] Creating PCIEMultiReader thread object " << endl;
[3671]210 vector<PCIEWrapperInterface*> vec_pciw;
[3758]211 for (int i=0 ;i<bpar.NFibers();i++) {
212 vec_pciw.push_back( pciwp[i]);
213 // cout << " mfacq[3.b]/Debug - pciwp[" << i << "] " << hex << pciwp[i] << dec << endl;
214 }
[3672]215 PCIEMultiReader PcierThr(vec_pciw, mmgr, bpar.GetParams());
[3681]216 // usleep(200); attente au cas ou ...
[3671]217 pPcierThr=&PcierThr;
218 tm.Split("Threads created");
[3681]219 if (acpar.fgdoProc>0)
220 cout << "mfacq[4] - starting three threads: PCIEMultiReader, MultiDataSaver, MonitorProc ... " << endl;
221 else
222 cout << "mfacq[4] - starting two threads: PCIEMultiReader, MultiDataSaver ... " << endl;
223
[3671]224 PcierThr.start();
225 DsThr.start();
[3681]226 if (acpar.fgdoProc>0) { // On ne demarre que si au moins NMaxProc>0
[3671]227 PrThr.start();
228 }
229
230 // On attend avant de declencher la terminaison des threads
[3681]231 usleep(200000);
[3671]232
233 cout << "mfacq[5] - Waiting for threads to finish ... " << endl;
234 PcierThr.join();
235 DsThr.join();
236 mmgr.Stop();
[3681]237 if (acpar.fgdoProc) { // On n'attend la fin que si le thread a ete demarre (NMaxProc>0)
[3671]238 PrThr.join();
239 }
240 pMmgr=NULL;
241 cout << "mfacq[6] ---------- threads finished ---------------- " << endl;
242 tm.Split("Threads Finished");
243
[3681]244 mmgr.Print(cout);
[3671]245#ifndef NOPCIECARD
246 for (int i=0 ;i <acpar.NbFibers ;i++) {
247 UINT32 card=(acpar.FiberNum[i]-1)/2+1;
248 UINT32 cardfiber=(acpar.FiberNum[i]-1)%2;
249 DeletePCIEWrapperV6(card,cardfiber);
250 }
251#endif
252return 0;
253
254}
[3757]255
256/* --Nouvelle-Fonction-- */
257int PCIEToEthernetTransfer()
258{
259 Timer tm("mfacq/PCIEToEthernetTransfer");
260 cout << " ---------- mfacq/ PCIEToEthernetTransfer() ------------- " << endl;
261 PCIEWrapperInterface* pciwp[16]={NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,
262 NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL};
263
264 pMmgr=NULL;
265 BRAcqConfig bpar;
266 BRParList& acpar=bpar.GetParams();
267
268 cout << " mfacq[1]/Info: PCIEToEthernet for" << acpar.NbFibers << " fibers , nZones="
269 << acpar.nZones << " NbPaquet/Zone=" << acpar.nPaqZone
270 << " MmgrPaqSize=" << acpar.MMgrPaquetSize() << endl;
271#ifndef NOPCIECARD
272 for (int i=0 ;i <acpar.NFibers() ;i++) {
273 UINT32 card=(acpar.FiberNum[i]-1)/2+1;
274 UINT32 cardfiber=(acpar.FiberNum[i]-1)%2;
275 cout <<"mfacq[2] CreatePCIEWrapperV6- indice " << i << "INIT card " << card
276 << " fibre " << cardfiber << endl;
277 pciwp[i] = CreatePCIEWrapperV6(card,acpar.PatternSize(),acpar.DMASizeBytes(),acpar.activate_pattern,cardfiber);
278 }
279#else
280 TestPCIWrapperNODMA pciw1(bpar.RecvPaquetSize(), acpar.nopciLossRate);
281 TestPCIWrapperNODMA pciw2(bpar.RecvPaquetSize(), acpar.nopciLossRate);
282 TestPCIWrapperNODMA pciw3(bpar.RecvPaquetSize(), acpar.nopciLossRate);
283 TestPCIWrapperNODMA pciw4(bpar.RecvPaquetSize(), acpar.nopciLossRate);
284 TestPCIWrapperNODMA pciw5(bpar.RecvPaquetSize(), acpar.nopciLossRate);
285 TestPCIWrapperNODMA pciw6(bpar.RecvPaquetSize(), acpar.nopciLossRate);
286 TestPCIWrapperNODMA pciw7(bpar.RecvPaquetSize(), acpar.nopciLossRate);
287 TestPCIWrapperNODMA pciw8(bpar.RecvPaquetSize(), acpar.nopciLossRate);
288 pciwp[0] = &pciw1;
289 pciwp[1] = &pciw2;
290 pciwp[2] = &pciw3;
291 pciwp[3] = &pciw4;
292 pciwp[4] = &pciw5;
293 pciwp[5] = &pciw6;
294 pciwp[6] = &pciw7;
295 pciwp[7] = &pciw8;
296#endif
297
298 vector<PCIEWrapperInterface*> vec_pciw;
299 for (int i=0 ;i<bpar.NFibers();i++) vec_pciw.push_back( pciwp[i]);
300 cout <<"mfacq[2] Creating PCIEToEthernet thread object " << endl;
[3761]301 PCIEToEthernet pci2eth(vec_pciw, bpar.GetParams().GetEthTargets(), bpar.GetParams(), bpar.GetParams().tcpportid);
[3897]302 pci2eth.SetPrintLevel(acpar.prtlevel_, acpar.prtmodulo_);
[3757]303 // usleep(200); attente au cas ou ...
304 pPcie2Eth=&pci2eth;
305 tm.Split("Threads created");
306 cout << "mfacq[3] - starting one threads: PCIEToEthernet... " << endl;
307 pci2eth.start();
308
309 // On attend avant de declencher la terminaison des threads
310 usleep(200000);
311
312 cout << "mfacq[5] - Waiting for threads to finish ... " << endl;
313 pci2eth.join();
314 cout << "mfacq[6] ---------- threads finished ---------------- " << endl;
315 tm.Split("Threads Finished");
316
317#ifndef NOPCIECARD
318 for (int i=0 ;i <acpar.NbFibers ;i++) {
319 UINT32 card=(acpar.FiberNum[i]-1)/2+1;
320 UINT32 cardfiber=(acpar.FiberNum[i]-1)%2;
321 DeletePCIEWrapperV6(card,cardfiber);
322 }
323#endif
324return 0;
325}
326
[3876]327
328/* Fonction-Utilitaire : Voir en fin de fichier */
329MemZaction ConvertMemZ_Status2Action( MemZStatus st );
330
[3757]331/* --Nouvelle-Fonction-- */
[3876]332int EthernetToMemoryAcq(bool fgviscal)
[3757]333{
334 Timer tm("mfacq/EthernetToMemoryAcq");
335 cout << " ---------- mfacq/ EthernetToMemoryAcq() ------------- " << endl;
336
337 BRAcqConfig bpar;
338 BRParList& acpar=bpar.GetParams();
[3876]339
340 if (fgviscal) cout << "mfacq[0]/EthernetToMemoryAcq() : On the fly Visibility Calculation " << endl;
341 else cout << "mfacq[0]/EthernetToMemoryAcq() : data dumped to disk " << endl;
[3757]342 cout << " mfacq[1]/Info: Creating RAcqMemZoneMgr for" << acpar.NbEthLinks() << " Eth-links , nZones="
343 << acpar.nZones << " NbPaquet/Zone=" << acpar.nPaqZone
344 << " MmgrPaqSize=" << acpar.MMgrPaquetSize() << endl;
345 RAcqMemZoneMgr mmgr(acpar.nZones, acpar.NbEthLinks(), acpar.nPaqZone, acpar.MMgrPaquetSize());
346 pMmgr=&mmgr;
[3876]347
348
349 string strfmask;
350 uint_4 fmask=0;
351 MemZStatus mskmon=MemZS_Proc;
352 if (fgviscal) { // gestion de finalized_mask lors du calcul de visibilites
353 if (acpar.nbcalgrpVisiC<1) acpar.nbcalgrpVisiC=1;
354 if (acpar.nbcalgrpVisiC>4) acpar.nbcalgrpVisiC=4;
355 cout << " mfacq[1.b]/Info: NbCalGrpVisiC = " << acpar.nbcalgrpVisiC << " (1<=NbCalGrpVisiC<=4)" << endl;
356 fmask=(uint_4)MemZS_ProcA; strfmask="MemZS_ProcA"; mskmon=MemZS_ProcB;
357 if (acpar.nbcalgrpVisiC>1)
358 { fmask |= (uint_4)MemZS_ProcB; strfmask+=" | MemZS_ProcB"; mskmon=MemZS_ProcC; }
359 if (acpar.nbcalgrpVisiC>2)
360 { fmask |= (uint_4)MemZS_ProcC; strfmask+=" | MemZS_ProcC"; mskmon=MemZS_ProcD; }
361 if (acpar.nbcalgrpVisiC>3)
362 { fmask |= (uint_4)MemZS_ProcD; strfmask+=" | MemZS_ProcD"; mskmon=MemZS_ProcE; }
363 }
364 else {
365 fmask=(uint_4)MemZS_Saved; strfmask="MemZS_Saved"; mskmon=MemZS_Proc;
366 }
367 if (acpar.fgdoProc)
368 { fmask |= (uint_4)mskmon; strfmask+=" | MemZS_Proc[Monotoring]"; }
[3757]369 // Dans ce cas, toutes les zones doivent passer ds le thread de monitoring
370 // pour etre au moins marque comme traite - seul 1/acpar.stepProc sont effectivement traite
[3876]371
372 mmgr.SetFinalizedMask(fmask);
373 cout << " mfacq[1.b]/Info: Mmgr.SetFinalizedMask( " << strfmask << " )" << endl;
[3757]374
[3876]375 if (fgviscal)
376 cout <<"mfacq[2] Creating BRVisCalcGroup, NbVisibCalculator=" << acpar.nbcalgrpVisiC
377 << " and MonitorProc " << endl;
378 else
379 cout <<"mfacq[2] Creating MultiDataSaver and MonitorProc thread objects ... " << endl;
[3757]380 MultiDataSaver DsThr(mmgr); // Utilise les parametres globaux BRAcqConfig
381 string ppath=bpar.OutputDirectory();
382 MonitorProc PrThr(mmgr);
[3876]383 PrThr.SetMemZAction( ConvertMemZ_Status2Action( mskmon ) );
384 BRVisCalcGroup VCGThr(acpar.nbcalgrpVisiC, mmgr, bpar.OutputDirectory(), acpar.nmeanVisiC, acpar.nthrVisiC);
385 VCGThr.SelectFreqBinning(acpar.freqminVisiC, acpar.freqmaxVisiC, acpar.nbinfreqVisiC);
386
387
[3757]388 cout << "mfacq[3] Creating EthernetReader thread object " << endl;
[3897]389 EthernetReader ethrdr(mmgr, bpar.GetParams(), bpar.GetParams().tcpportid);
390 ethrdr.SetReadMode(acpar.ethr_forcesamefc_, acpar.ethr_sfc_maxdpc_,acpar.ethr_sfc_maxresync_);
[3899]391 ethrdr.WaitENDMsg4Terminate(acpar.ethr_waitendmsg_);
[3884]392 ethrdr.SetPrintLevel(acpar.prtlevel_, acpar.prtmodulo_);
[3760]393
[3757]394 // usleep(200); attente au cas ou ...
395 pEthRdr=&ethrdr;
[3876]396 tm.Split("Threads created");
397 cout << "mfacq[4] - starting EthernetReader thread object ..." << endl;
[3757]398 ethrdr.start();
[3876]399 if (fgviscal) {
400 cout << "mfacq[4.b] - starting Visibility calculator threads " << endl;
401 VCGThr.start();
402 }
403 else {
404 cout << "mfacq[4.b] - starting MultiDataSaver " << endl;
405 DsThr.start();
406 }
407 if (acpar.fgdoProc) { // demarrage (optionnel) du thread de monitoring
408 cout << " mfacq[4.c] - starting MonitorProc thread object " << endl;
[3757]409 PrThr.start();
410 }
411
412 // On attend avant de declencher la terminaison des threads
413 usleep(200000);
414
415 cout << "mfacq[5] - Waiting for threads to finish ... " << endl;
416 ethrdr.join();
[3876]417 if (fgviscal) VCGThr.join();
418 else DsThr.join();
[3757]419 mmgr.Stop();
420 if (acpar.fgdoProc) { // On n'attend la fin que si le thread a ete demarre (NMaxProc>0)
421 PrThr.join();
422 }
423 pMmgr=NULL;
424 cout << "mfacq[6] ---------- threads finished ---------------- " << endl;
425 tm.Split("Threads Finished");
426
427 mmgr.Print(cout);
428
429return 0;
430
431}
[3876]432
433
434/* --Nouvelle-Fonction-Utilitaire */
435MemZaction ConvertMemZ_Status2Action( MemZStatus st )
436{
437 MemZaction ra=MemZA_None;
438 switch (st) {
439 case MemZS_Filled:
440 ra=MemZA_Fill;
441 break;
442 case MemZS_Saved:
443 ra=MemZA_Save;
444 break;
445 case MemZS_Proc:
446 ra=MemZA_Proc;
447 break;
448 case MemZS_ProcA:
449 ra=MemZA_ProcA;
450 break;
451 case MemZS_ProcB:
452 ra=MemZA_ProcB;
453 break;
454 case MemZS_ProcC:
455 ra=MemZA_ProcC;
456 break;
457 case MemZS_ProcD:
458 ra=MemZA_ProcD;
459 break;
460 case MemZS_ProcE:
461 ra=MemZA_ProcE;
462 break;
463 case MemZS_ProcF:
464 ra=MemZA_ProcF;
465 break;
466 default:
467 ra=MemZA_None;
468 break;
469 }
470 return ra;
471}
Note: See TracBrowser for help on using the repository browser.