source: Sophya/trunk/AddOn/TAcq/mfacq.cc@ 3897

Last change on this file since 3897 was 3897, checked in by ansari, 15 years ago

Amelioration des classes PCIEToEthernet et EthernetReader pour eviter des situations de blocage lorsque lecture avec ForceSameFrameCounter, Reza 04/10/2010

File size: 16.1 KB
Line 
1#include "mfacq.h"
2
3//---------------------------------------------------------------
4// Programme d'acquisition BAORadio multi-fibres/multi-threads
5// LAL - 2009 - 2010
6// R. Ansari, M.Taurigna
7//---------------------------------------------------------------
8
9static RAcqMemZoneMgr* pMmgr=NULL;
10static PCIEMultiReader* pPcierThr=NULL;
11static PCIEToEthernet* pPcie2Eth=NULL;
12static EthernetReader* pEthRdr=NULL;
13
14void Stop(int s)
15{
16 if (s == 9765) cout << " Stop after exception ..." << endl;
17 else printf("............. MAIN ... receive signal %d \n",s);
18 if (pMmgr != NULL) pMmgr->Stop();
19 if (pPcie2Eth !=NULL) pPcie2Eth->Stop();
20 if (pPcierThr !=NULL) pPcierThr->Stop();
21 if (pEthRdr !=NULL) pEthRdr->Stop();
22}
23
24
25//-----------------------------------------------------------------------
26//-------------------- le programme principal ---------------------------
27//-----------------------------------------------------------------------
28int main(int narg, char* arg[])
29{
30
31 int rc = 0;
32 cout << " ============= BAORadio / Acquisition : mfacq =================" << endl;
33 cout << " ===============================================================" << endl;
34 cout << " ========= " <<BAOR_ACQ_VER_STR <<BAOR_ACQ_VER << " ===========" << endl;
35 cout << " ===============================================================" << endl;
36
37
38 InitTim();
39
40 // Initialisation
41 TArrayInitiator _arri;
42 FitsIOServerInitiator _fiosi;
43
44
45 if ((narg > 1)&&(strcmp(arg[1],"-h"))==0) {
46 Usage(false);
47 return 1;
48 }
49 if (narg < 3) {
50 Usage(true);
51 return 3;
52 }
53
54 const char* desact[4] = {"PCIE_To_Ethernet", "Ethernet_To_Disk", "Ethernet_To_Visibilities","PCIE_DMA_To_Disk"};
55 string action=arg[1];
56 int act = 0;
57 if ((action != "-pci2eth")&&(action != "-pci2disk")&&(action != "-eth2disk")&&(action != "-eth2visib")) {
58 cout << " mfacq/Error , Bad action argument : " << action << endl;
59 return 2;
60 }
61 if (action == "-pci2eth") act=0;
62 else if (action == "-eth2disk") act=1;
63 else if (action == "-eth2visib") act=2;
64 else if (action == "-pci2disk") act=3;
65
66 string pardcfile=arg[2];
67#ifndef NOPCIECARD
68 string basedir="/Raid";
69#else
70 string basedir="./";
71#endif
72 vector<string> oargs;
73 if (narg>3) {
74 basedir=arg[2];
75 for(int jj=3; jj<narg; jj++) oargs.push_back(arg[jj]);
76 }
77 try {
78 // Creation/initialisation parametres Acq
79 BRAcqConfig acpar;
80 acpar.ReadParamFile(pardcfile);
81 acpar.GetConfig().SetBaseDirectory(basedir);
82 if ((act==0)&&(oargs.size()>0))
83 acpar.GetParams().SetEthTargets(oargs);
84 // Creation des repertoires
85 if (act > 0)
86 if (acpar.CreateOutputDirectories()!=0) return 9;
87 acpar.GetParams().fgdoVisiC=false;
88 if (act == 2) acpar.GetParams().fgdoVisiC=true;
89
90 acpar.Print(cout);
91 struct sigaction siact;
92 if (!acpar.GetParams().fg_hard_ctrlc) {
93 siact.sa_handler=Stop;
94 sigaction(SIGINT,&siact,NULL);
95 }
96 switch (act) {
97 case 0:
98 rc = PCIEToEthernetTransfer();
99 break;
100 case 1: // ethernet --> disk
101 rc = EthernetToMemoryAcq(false);
102 break;
103 case 2: // ethernet --> visibility calculation
104 rc = EthernetToMemoryAcq(true);
105 break;
106 case 3:
107 rc = MultiFibreAcq();
108 break;
109 }
110 }
111 catch (MiniFITSException& exc) {
112 cerr << " mfacq.cc catched MiniFITSException " << exc.Msg() << endl;
113 rc = 77;
114 }
115 catch (PCIEWException& exc) {
116 cerr << " mfacq.cc catched PCIEWException " << exc.Msg() << endl;
117 rc = 75;
118 }
119 catch (PThrowable& exc) {
120 cerr << " mfacq.cc catched Exception " << exc.Msg() << endl;
121 rc = 76;
122 }
123 catch (std::exception& sex) {
124 cerr << "\n mfacq.cc std::exception :"
125 << (string)typeid(sex).name() << "\n msg= "
126 << sex.what() << endl;
127 rc = 78;
128 }
129 catch (...) {
130 cerr << " mfacq.cc : Catched ... exception " << endl;
131 rc = 79;
132 }
133 cout << "mfacq[9] ----- END --- stopping acq program " << endl;
134 return rc;
135}
136
137
138/* --Nouvelle-Fonction-- */
139void Usage(bool fgshort)
140{
141 cout << " Usage: mfacq Action DataCardFile [BaseDirectory / TargetMachines]" << endl;
142 if (fgshort) return;
143 cout << " o Action = -pci2disk , -pci2eth , -eth2disk , -eth2visib \n "
144 << " DataCardFile : File name for parameters list (DataCards) : \n"
145 << " fibres outpathname skysource paqsize dmasizekb nbfiles \n"
146 << " nblocperfile acqmode memmgr monitor reducpaqsz ... "<< endl;
147 cout << " o BaseDirectory (default= /Raid ) for -pci2disk , -eth2disk" << endl;
148 cout << " o TargetMachines : List of target machines for PCIe2Ethernet (-pci2eth) \n " << endl;
149 return;
150}
151
152
153/* --Nouvelle-Fonction-- */
154int MultiFibreAcq()
155{
156 Timer tm("mfacq/MultiFibre");
157 cout << " ---------- mfacq/ MultiFibreAcq() ------------- " << endl;
158 PCIEWrapperInterface* pciwp[16]={NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,
159 NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL};
160
161 BRAcqConfig bpar;
162 BRParList& acpar=bpar.GetParams();
163
164 cout << " mfacq[1]/Info: Creating RAcqMemZoneMgr for" << acpar.NbFibers << " fibers , nZones="
165 << acpar.nZones << " NbPaquet/Zone=" << acpar.nPaqZone
166 << " MmgrPaqSize=" << acpar.MMgrPaquetSize() << endl;
167 RAcqMemZoneMgr mmgr(acpar.nZones, bpar.NFibers(), acpar.nPaqZone, acpar.MMgrPaquetSize());
168 pMmgr=&mmgr;
169 if (acpar.fgdoProc && (acpar.stepProc>0)) {
170 // Dans ce cas, toutes les zones doivent passer ds le thread de monitoring
171 // pour etre au moins marque comme traite - seul 1/acpar.stepProc sont effectivement traite
172 mmgr.SetFinalizedMask(((uint_4)MemZS_Saved) | ((uint_4)MemZS_Proc));
173 cout << " mfacq[1.b]/Info: Mmgr.SetFinalizedMask( MemZS_Saved | MemZS_Proc )" << endl;
174 //mmgr.SetFinalizedMask( (uint_4)MemZS_Saved );
175 //cout << " mfacq[1.b]/Info: Mmgr.SetFinalizedMask( MemZS_Saved )" << endl;
176 }
177
178#ifndef NOPCIECARD
179 for (int i=0 ;i <acpar.NFibers() ;i++) {
180 UINT32 card=(acpar.FiberNum[i]-1)/2+1;
181 UINT32 cardfiber=(acpar.FiberNum[i]-1)%2;
182 cout <<"mfacq[2] CreatePCIEWrapperV6- indice " << i << "INIT card " << card
183 << " fibre " << cardfiber << endl;
184 pciwp[i] = CreatePCIEWrapperV6(card,acpar.PatternSize(),acpar.DMASizeBytes(),acpar.activate_pattern,cardfiber);
185 }
186#else
187 TestPCIWrapperNODMA pciw1(bpar.RecvPaquetSize(), acpar.nopciLossRate);
188 TestPCIWrapperNODMA pciw2(bpar.RecvPaquetSize(), acpar.nopciLossRate);
189 TestPCIWrapperNODMA pciw3(bpar.RecvPaquetSize(), acpar.nopciLossRate);
190 TestPCIWrapperNODMA pciw4(bpar.RecvPaquetSize(), acpar.nopciLossRate);
191 TestPCIWrapperNODMA pciw5(bpar.RecvPaquetSize(), acpar.nopciLossRate);
192 TestPCIWrapperNODMA pciw6(bpar.RecvPaquetSize(), acpar.nopciLossRate);
193 TestPCIWrapperNODMA pciw7(bpar.RecvPaquetSize(), acpar.nopciLossRate);
194 TestPCIWrapperNODMA pciw8(bpar.RecvPaquetSize(), acpar.nopciLossRate);
195 pciwp[0] = &pciw1;
196 pciwp[1] = &pciw2;
197 pciwp[2] = &pciw3;
198 pciwp[3] = &pciw4;
199 pciwp[4] = &pciw5;
200 pciwp[5] = &pciw6;
201 pciwp[6] = &pciw7;
202 pciwp[7] = &pciw8;
203#endif
204
205 cout <<"mfacq[2] Creating MultiDataSaver and MonitorProc thread objects ... " << endl;
206 MultiDataSaver DsThr(mmgr); // Utilise les parametres globaux BRAcqConfig
207 string ppath=bpar.OutputDirectory();
208 MonitorProc PrThr(mmgr);
209 cout << "mfacq[3] Creating PCIEMultiReader thread object " << endl;
210 vector<PCIEWrapperInterface*> vec_pciw;
211 for (int i=0 ;i<bpar.NFibers();i++) {
212 vec_pciw.push_back( pciwp[i]);
213 // cout << " mfacq[3.b]/Debug - pciwp[" << i << "] " << hex << pciwp[i] << dec << endl;
214 }
215 PCIEMultiReader PcierThr(vec_pciw, mmgr, bpar.GetParams());
216 // usleep(200); attente au cas ou ...
217 pPcierThr=&PcierThr;
218 tm.Split("Threads created");
219 if (acpar.fgdoProc>0)
220 cout << "mfacq[4] - starting three threads: PCIEMultiReader, MultiDataSaver, MonitorProc ... " << endl;
221 else
222 cout << "mfacq[4] - starting two threads: PCIEMultiReader, MultiDataSaver ... " << endl;
223
224 PcierThr.start();
225 DsThr.start();
226 if (acpar.fgdoProc>0) { // On ne demarre que si au moins NMaxProc>0
227 PrThr.start();
228 }
229
230 // On attend avant de declencher la terminaison des threads
231 usleep(200000);
232
233 cout << "mfacq[5] - Waiting for threads to finish ... " << endl;
234 PcierThr.join();
235 DsThr.join();
236 mmgr.Stop();
237 if (acpar.fgdoProc) { // On n'attend la fin que si le thread a ete demarre (NMaxProc>0)
238 PrThr.join();
239 }
240 pMmgr=NULL;
241 cout << "mfacq[6] ---------- threads finished ---------------- " << endl;
242 tm.Split("Threads Finished");
243
244 mmgr.Print(cout);
245#ifndef NOPCIECARD
246 for (int i=0 ;i <acpar.NbFibers ;i++) {
247 UINT32 card=(acpar.FiberNum[i]-1)/2+1;
248 UINT32 cardfiber=(acpar.FiberNum[i]-1)%2;
249 DeletePCIEWrapperV6(card,cardfiber);
250 }
251#endif
252return 0;
253
254}
255
256/* --Nouvelle-Fonction-- */
257int PCIEToEthernetTransfer()
258{
259 Timer tm("mfacq/PCIEToEthernetTransfer");
260 cout << " ---------- mfacq/ PCIEToEthernetTransfer() ------------- " << endl;
261 PCIEWrapperInterface* pciwp[16]={NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,
262 NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL};
263
264 pMmgr=NULL;
265 BRAcqConfig bpar;
266 BRParList& acpar=bpar.GetParams();
267
268 cout << " mfacq[1]/Info: PCIEToEthernet for" << acpar.NbFibers << " fibers , nZones="
269 << acpar.nZones << " NbPaquet/Zone=" << acpar.nPaqZone
270 << " MmgrPaqSize=" << acpar.MMgrPaquetSize() << endl;
271#ifndef NOPCIECARD
272 for (int i=0 ;i <acpar.NFibers() ;i++) {
273 UINT32 card=(acpar.FiberNum[i]-1)/2+1;
274 UINT32 cardfiber=(acpar.FiberNum[i]-1)%2;
275 cout <<"mfacq[2] CreatePCIEWrapperV6- indice " << i << "INIT card " << card
276 << " fibre " << cardfiber << endl;
277 pciwp[i] = CreatePCIEWrapperV6(card,acpar.PatternSize(),acpar.DMASizeBytes(),acpar.activate_pattern,cardfiber);
278 }
279#else
280 TestPCIWrapperNODMA pciw1(bpar.RecvPaquetSize(), acpar.nopciLossRate);
281 TestPCIWrapperNODMA pciw2(bpar.RecvPaquetSize(), acpar.nopciLossRate);
282 TestPCIWrapperNODMA pciw3(bpar.RecvPaquetSize(), acpar.nopciLossRate);
283 TestPCIWrapperNODMA pciw4(bpar.RecvPaquetSize(), acpar.nopciLossRate);
284 TestPCIWrapperNODMA pciw5(bpar.RecvPaquetSize(), acpar.nopciLossRate);
285 TestPCIWrapperNODMA pciw6(bpar.RecvPaquetSize(), acpar.nopciLossRate);
286 TestPCIWrapperNODMA pciw7(bpar.RecvPaquetSize(), acpar.nopciLossRate);
287 TestPCIWrapperNODMA pciw8(bpar.RecvPaquetSize(), acpar.nopciLossRate);
288 pciwp[0] = &pciw1;
289 pciwp[1] = &pciw2;
290 pciwp[2] = &pciw3;
291 pciwp[3] = &pciw4;
292 pciwp[4] = &pciw5;
293 pciwp[5] = &pciw6;
294 pciwp[6] = &pciw7;
295 pciwp[7] = &pciw8;
296#endif
297
298 vector<PCIEWrapperInterface*> vec_pciw;
299 for (int i=0 ;i<bpar.NFibers();i++) vec_pciw.push_back( pciwp[i]);
300 cout <<"mfacq[2] Creating PCIEToEthernet thread object " << endl;
301 PCIEToEthernet pci2eth(vec_pciw, bpar.GetParams().GetEthTargets(), bpar.GetParams(), bpar.GetParams().tcpportid);
302 pci2eth.SetPrintLevel(acpar.prtlevel_, acpar.prtmodulo_);
303 // usleep(200); attente au cas ou ...
304 pPcie2Eth=&pci2eth;
305 tm.Split("Threads created");
306 cout << "mfacq[3] - starting one threads: PCIEToEthernet... " << endl;
307 pci2eth.start();
308
309 // On attend avant de declencher la terminaison des threads
310 usleep(200000);
311
312 cout << "mfacq[5] - Waiting for threads to finish ... " << endl;
313 pci2eth.join();
314 cout << "mfacq[6] ---------- threads finished ---------------- " << endl;
315 tm.Split("Threads Finished");
316
317#ifndef NOPCIECARD
318 for (int i=0 ;i <acpar.NbFibers ;i++) {
319 UINT32 card=(acpar.FiberNum[i]-1)/2+1;
320 UINT32 cardfiber=(acpar.FiberNum[i]-1)%2;
321 DeletePCIEWrapperV6(card,cardfiber);
322 }
323#endif
324return 0;
325}
326
327
328/* Fonction-Utilitaire : Voir en fin de fichier */
329MemZaction ConvertMemZ_Status2Action( MemZStatus st );
330
331/* --Nouvelle-Fonction-- */
332int EthernetToMemoryAcq(bool fgviscal)
333{
334 Timer tm("mfacq/EthernetToMemoryAcq");
335 cout << " ---------- mfacq/ EthernetToMemoryAcq() ------------- " << endl;
336
337 BRAcqConfig bpar;
338 BRParList& acpar=bpar.GetParams();
339
340 if (fgviscal) cout << "mfacq[0]/EthernetToMemoryAcq() : On the fly Visibility Calculation " << endl;
341 else cout << "mfacq[0]/EthernetToMemoryAcq() : data dumped to disk " << endl;
342 cout << " mfacq[1]/Info: Creating RAcqMemZoneMgr for" << acpar.NbEthLinks() << " Eth-links , nZones="
343 << acpar.nZones << " NbPaquet/Zone=" << acpar.nPaqZone
344 << " MmgrPaqSize=" << acpar.MMgrPaquetSize() << endl;
345 RAcqMemZoneMgr mmgr(acpar.nZones, acpar.NbEthLinks(), acpar.nPaqZone, acpar.MMgrPaquetSize());
346 pMmgr=&mmgr;
347
348
349 string strfmask;
350 uint_4 fmask=0;
351 MemZStatus mskmon=MemZS_Proc;
352 if (fgviscal) { // gestion de finalized_mask lors du calcul de visibilites
353 if (acpar.nbcalgrpVisiC<1) acpar.nbcalgrpVisiC=1;
354 if (acpar.nbcalgrpVisiC>4) acpar.nbcalgrpVisiC=4;
355 cout << " mfacq[1.b]/Info: NbCalGrpVisiC = " << acpar.nbcalgrpVisiC << " (1<=NbCalGrpVisiC<=4)" << endl;
356 fmask=(uint_4)MemZS_ProcA; strfmask="MemZS_ProcA"; mskmon=MemZS_ProcB;
357 if (acpar.nbcalgrpVisiC>1)
358 { fmask |= (uint_4)MemZS_ProcB; strfmask+=" | MemZS_ProcB"; mskmon=MemZS_ProcC; }
359 if (acpar.nbcalgrpVisiC>2)
360 { fmask |= (uint_4)MemZS_ProcC; strfmask+=" | MemZS_ProcC"; mskmon=MemZS_ProcD; }
361 if (acpar.nbcalgrpVisiC>3)
362 { fmask |= (uint_4)MemZS_ProcD; strfmask+=" | MemZS_ProcD"; mskmon=MemZS_ProcE; }
363 }
364 else {
365 fmask=(uint_4)MemZS_Saved; strfmask="MemZS_Saved"; mskmon=MemZS_Proc;
366 }
367 if (acpar.fgdoProc)
368 { fmask |= (uint_4)mskmon; strfmask+=" | MemZS_Proc[Monotoring]"; }
369 // Dans ce cas, toutes les zones doivent passer ds le thread de monitoring
370 // pour etre au moins marque comme traite - seul 1/acpar.stepProc sont effectivement traite
371
372 mmgr.SetFinalizedMask(fmask);
373 cout << " mfacq[1.b]/Info: Mmgr.SetFinalizedMask( " << strfmask << " )" << endl;
374
375 if (fgviscal)
376 cout <<"mfacq[2] Creating BRVisCalcGroup, NbVisibCalculator=" << acpar.nbcalgrpVisiC
377 << " and MonitorProc " << endl;
378 else
379 cout <<"mfacq[2] Creating MultiDataSaver and MonitorProc thread objects ... " << endl;
380 MultiDataSaver DsThr(mmgr); // Utilise les parametres globaux BRAcqConfig
381 string ppath=bpar.OutputDirectory();
382 MonitorProc PrThr(mmgr);
383 PrThr.SetMemZAction( ConvertMemZ_Status2Action( mskmon ) );
384 BRVisCalcGroup VCGThr(acpar.nbcalgrpVisiC, mmgr, bpar.OutputDirectory(), acpar.nmeanVisiC, acpar.nthrVisiC);
385 VCGThr.SelectFreqBinning(acpar.freqminVisiC, acpar.freqmaxVisiC, acpar.nbinfreqVisiC);
386
387
388 cout << "mfacq[3] Creating EthernetReader thread object " << endl;
389 EthernetReader ethrdr(mmgr, bpar.GetParams(), bpar.GetParams().tcpportid);
390 ethrdr.SetReadMode(acpar.ethr_forcesamefc_, acpar.ethr_sfc_maxdpc_,acpar.ethr_sfc_maxresync_);
391 ethrdr.SetPrintLevel(acpar.prtlevel_, acpar.prtmodulo_);
392
393 // usleep(200); attente au cas ou ...
394 pEthRdr=&ethrdr;
395 tm.Split("Threads created");
396 cout << "mfacq[4] - starting EthernetReader thread object ..." << endl;
397 ethrdr.start();
398 if (fgviscal) {
399 cout << "mfacq[4.b] - starting Visibility calculator threads " << endl;
400 VCGThr.start();
401 }
402 else {
403 cout << "mfacq[4.b] - starting MultiDataSaver " << endl;
404 DsThr.start();
405 }
406 if (acpar.fgdoProc) { // demarrage (optionnel) du thread de monitoring
407 cout << " mfacq[4.c] - starting MonitorProc thread object " << endl;
408 PrThr.start();
409 }
410
411 // On attend avant de declencher la terminaison des threads
412 usleep(200000);
413
414 cout << "mfacq[5] - Waiting for threads to finish ... " << endl;
415 ethrdr.join();
416 if (fgviscal) VCGThr.join();
417 else DsThr.join();
418 mmgr.Stop();
419 if (acpar.fgdoProc) { // On n'attend la fin que si le thread a ete demarre (NMaxProc>0)
420 PrThr.join();
421 }
422 pMmgr=NULL;
423 cout << "mfacq[6] ---------- threads finished ---------------- " << endl;
424 tm.Split("Threads Finished");
425
426 mmgr.Print(cout);
427
428return 0;
429
430}
431
432
433/* --Nouvelle-Fonction-Utilitaire */
434MemZaction ConvertMemZ_Status2Action( MemZStatus st )
435{
436 MemZaction ra=MemZA_None;
437 switch (st) {
438 case MemZS_Filled:
439 ra=MemZA_Fill;
440 break;
441 case MemZS_Saved:
442 ra=MemZA_Save;
443 break;
444 case MemZS_Proc:
445 ra=MemZA_Proc;
446 break;
447 case MemZS_ProcA:
448 ra=MemZA_ProcA;
449 break;
450 case MemZS_ProcB:
451 ra=MemZA_ProcB;
452 break;
453 case MemZS_ProcC:
454 ra=MemZA_ProcC;
455 break;
456 case MemZS_ProcD:
457 ra=MemZA_ProcD;
458 break;
459 case MemZS_ProcE:
460 ra=MemZA_ProcE;
461 break;
462 case MemZS_ProcF:
463 ra=MemZA_ProcF;
464 break;
465 default:
466 ra=MemZA_None;
467 break;
468 }
469 return ra;
470}
Note: See TracBrowser for help on using the repository browser.