source: Sophya/trunk/AddOn/TAcq/racqueth.cc@ 3901

Last change on this file since 3901 was 3901, checked in by ansari, 15 years ago

Suite recherche de probleme de blocage avec transfert sur ethernet ds mfacq.cc (EthernetReader...), Reza 07/10/2010

File size: 30.0 KB
Line 
1//----------------------------------------------------------------
2// ---- classes de threads pour lecture (transfert DMA)
3// et ecriture disque pour acquisition BAORadio -----
4// LAL - R. Ansari - Juin/Juillet 2008
5//----------------------------------------------------------------
6
7#include "racqueth.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <signal.h>
13#include "pexceptions.h"
14#include "ctimer.h"
15#include "timestamp.h"
16
17#include "pciewrap.h"
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23////////////////////////////////////////////////////////////////////////////////////////////////////////
24//----------------------------------------------------------------------------------------------------------
25// Classe thread de lecture PCI-Express et recopie sur interface reseau (Ethernet)
26//----------------------------------------------------------------------------------------------------------
27
28// Si on veut avoir le protocole de controle (Hand-shake) entre l'emetteur et le recepteur,
29// decommenter la ligne suivante - Reza , 4 octobre 2010
30// #define BR_DO_HANDSHAKE 1
31
32/* --Methode-- */
33PCIEToEthernet::PCIEToEthernet(vector<PCIEWrapperInterface*> vec_pciw, vector<string>& destname, BRParList const& par, int portid)
34 : par_(par), vec_pciw_ (vec_pciw), destname_(destname), tcpportid_(portid)
35{
36 nmaxpaq_ = par_.MaxNbPaquets();
37 swapall_ = par_.GetDataConvFg(); // select data swap/format conversion for BRPaquet
38 stop_ = false;
39 packSize_ = par_.RecvPaquetSize();
40 packSizeInMgr_=par_.MMgrPaquetSize();
41 sizeFr_=par_.DMASizeBytes();
42 if (vec_pciw.size() > MAXNBFIB)
43 throw ParmError("PCIEToEthernet:ERROR/ vec_pciw.size() > MAXNBFIB ");
44 nbDma_= vec_pciw.size();
45
46 // true -> direct transfer of data to ethernet
47 fgdirectsend_ = (par_.pci2eth_fgdirect && (swapall_==BR_Copy) ) ? true : false;
48 char msg[BRTCPMSGLEN];
49 cout << "PCIEToEthernet/Info: Establishing TCP/IP connections ... " << endl;
50 for(size_t i=0; i<vec_pciw_.size(); i++) {
51 vector<ClientSocket> vskt;
52 vector<uint_8> verrcnt;
53 for(size_t j=0; j<destname_.size(); j++) {
54 ClientSocket sok(destname_[j], tcpportid_);
55 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
56 uint_4 dmasz = (fgdirectsend_) ? vec_pciw_[0]->TransferSize() : 0;
57 sprintf(msg,"BAORadio-PCIEToEthernet %d %d %d %d %d", par_.MMgrNbPaquet(), par_.MMgrPaquetSize(), dmasz, 0,i);
58 // 123456789012345678901234567890
59 sok.SendAll(msg,BRTCPMSGLEN);
60 sok.ReceiveAll(msg,BRTCPMSGLEN);
61 if (strncmp(msg,"BAORadio-EthernetReader-OK",26)!=0)
62 throw SocketException("PCIEToEthernet:ERROR/ Connection to EthernetReader not established ");
63 cout << " PCIEToEthernet: Ethernet connection established for DMA/fiber" << i << " with " << destname_[j] << endl;
64 vskt.push_back(sok);
65 verrcnt.push_back(0);
66 }
67 vvec_skt_.push_back(vskt);
68 vvec_errorcnt_.push_back(verrcnt);
69 vec_cntpaq_.push_back(0);
70 }
71 totrdsnd_ = 0;
72 SetPrintLevel(par.prtlevel_,par.prtmodulo_);
73}
74
75/* --Methode-- */
76PCIEToEthernet::~PCIEToEthernet()
77{
78 for(size_t i=0; i<vec_pciw_.size(); i++) {
79 vector<ClientSocket>& vskt = vvec_skt_[i];
80 vector<uint_8>& verrcnt = vvec_errorcnt_[i];
81 for(size_t j=0; j<destname_.size(); j++) {
82 cout << " ~PCIEToEthernet() closing socket, fiber/pcieNum=" << i << " ethernet_destNum=" << j
83 << " ErrorCount=" << verrcnt[j] << endl;
84 vskt[j].Close();
85 }
86 }
87}
88
89/* --Methode-- */
90void PCIEToEthernet::run()
91{
92
93 struct timeval tv1,tv2;
94 gettimeofday(&tv1, NULL);
95
96 cout << " PCIEToEthernet::run() - Starting , NMaxMemZones=" << par_.MaxNbBlocs()
97 << " MMgrNbPaquet=" << par_.MMgrNbPaquet() << " MMgr.PaqSiz=" << par_.MMgrPaquetSize()
98 << " DMA-Paqsize " << packSize_ << " " << BRPaquet::FmtConvToString(swapall_) << endl;
99 setRC(1);
100
101 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
102 //sigaction(SIGINT,&act,NULL);
103 uint_4 paqszmm = par_.MMgrPaquetSize();
104 uint_4 paqsz = packSize_;
105 uint_4 dmasz = vec_pciw_[0]->TransferSize();
106 //DEL vec_pciw_[0]->StartTransfers();
107
108 BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets
109 Byte* Datas[MAXNBFIB];
110 Byte* tampon[MAXNBFIB] ;
111 Byte* nexpaq[MAXNBFIB] ; // Pour recevevoir les paquets apres reduction de taille
112
113 Byte* predtampon=NULL; // tampon de recopie pour la reduction des tailles de paquets
114 Byte* nextpaq=NULL;
115 uint_4 off_acheval=0;
116
117 int nerrdma = 0;
118 int maxerrdma = 10;
119 bool fgarret = false;
120
121 // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA
122 for (int i=0;i< (int)nbDma_ ;i++) {
123 // cout << " DEBUG*AB paqsz=" << paqsz << " paqszmm=" << paqszmm << endl;
124 tampon[i]=nexpaq[i]=NULL;
125 tampon[i] = new Byte[paqsz];
126 nexpaq[i] = new Byte[paqszmm];
127 }
128
129 if (fgdirectsend_)
130 cout << " PCIEToEthernet::run() Direct transfer mode DMA to Ethernet ... " << endl;
131 bool fgredpaq=par_.fgreducpsize;
132 if (fgredpaq) {
133 cout << " PCIEToEthernet::run() - PaquetSizeReduction - RedSize=" << par_.redpqsize
134 << " Offset=" << par_.reducoffset << " " << ((par_.reducneedcopy)?"NeedCopy":"NOCopy")
135 << " " << BRPaquet::ReducActionToString(par_.pqreducmode) << endl;
136 predtampon = new Byte[paqsz];
137 }
138
139
140 uint_8 trnbytes[MAXNBFIB];
141 uint_4 npaqfait[MAXNBFIB];
142 for (int i=0;i< (int)nbDma_ ;i++) {
143 npaqfait[i]=0; trnbytes[i]=0;
144 }
145 // Attente du message GO des taches lecteurs
146 cout << " PCIEToEthernet::run() Waiting for GO message from EthernetReader's ..." << endl;
147 char msg[BRTCPMSGLEN];
148 for(size_t i=0; i<vec_pciw_.size(); i++) {
149 vector<ClientSocket>& vskt=vvec_skt_[i];
150 for(size_t j=0; j<vskt.size(); j++) {
151 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
152 vskt[j].ReceiveAll(msg,BRTCPMSGLEN);
153 if (strncmp(msg,"BAORadio-EthernetReader-Ready-GO",32)!=0) {
154 msg[BRTCPMSGLEN-1]='\0';
155 cout << "PCIEToEthernet:ERROR BAD Go for fiber=" << i << " dest=" << j << " msg: " << msg << endl;
156 throw SocketException("PCIEToEthernet:ERROR/ BAD Go message from EthernetReader ");
157 }
158 cout << " PCIEToEthernet::run() Received GO message for Fiber/DMA " << i << " from " << destname_[j] << endl;
159 }
160 }
161
162 cout << " PCIEToEthernet::run() Starting DMA to Ethernet transfers - NbFibers=NbDMA=" << vec_pciw_.size()
163 << " x NbEthDestinations=" << destname_.size() << endl;
164
165 // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
166 uint_4 npaqfaitg = 0;
167
168 while (npaqfaitg < nmaxpaq_) { // Boucle global G
169 if (fgarret) break;
170 if (stop_) break;
171
172 // Lancement des DMA
173 for (int dma=0; dma < (int)nbDma_ ;dma++) vec_pciw_[dma]->StartTransfers();
174
175 // On pointe vers le debut de la zone a remplir aver le prochain DMA
176 //-- Zone memoire locale Byte* nextdma = buff+i*paqsz;
177
178 bool fgbaddma=false;
179 // On boucle sur les nbDma_ en attente de leurs terminaison
180 for (int dma=0; dma <(int) nbDma_ ;dma++) {
181 Datas[dma]=vec_pciw_[dma]->GetData();
182 if (Datas[dma] == NULL) { // No data Read in DMA
183 nerrdma ++; fgbaddma=true;
184 cout << "PCIEToEthernetChecker/Erreur Waiting for datas ..." << endl;
185 vec_pciw_[dma]->PrintStatus(cout);
186 if (nerrdma>=maxerrdma) { fgarret = true; break; }
187 }
188 }
189 if (fgbaddma) continue;
190 if ((prtlev_>0)&&(npaqfaitg%prtmodulo_==0))
191 cout << " PCIEToEthernet::run()/Info NPaqFait= " << npaqfaitg << endl;
192 if (fgdirectsend_) { // Pas de copie / reduction de taille de paquet, on rebalance tel quel ...
193 for (int fib=0; fib<(int) nbDma_ ;fib++) {
194 //DBG cout << " DEBUG*B fib=" << fib << " Datas[fib]=" << hex << Datas[fib] << dec << endl;
195 SendToTargets(fib, Datas[fib], dmasz);
196 trnbytes[fib] += (uint_8)dmasz;
197 npaqfait[fib] += (trnbytes[fib]/(uint_8)paqszmm);
198 if (fib==nbDma_-1) npaqfaitg += (trnbytes[fib]/(uint_8)paqszmm);
199 trnbytes[fib] = trnbytes[fib]%(uint_8)paqszmm;
200 }
201 continue; // On bypasse le reste
202 }
203
204 // Si on arrive ici, c'est qu'il faut copier / reduire les paquets ....
205 uint_4 curoff=0;
206 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres
207 if (off_acheval > 0) { // IF Numero B
208 if ((paqsz-off_acheval)< dmasz) { // IF Numero A
209 for(uint_4 fib=0; fib<nbDma_; fib++)
210 memcpy((void *)((tampon[fib])+off_acheval), (void *)Datas[fib], paqsz-off_acheval);
211 curoff = paqsz-off_acheval; off_acheval = 0;
212 for(uint_4 fib=0; fib<nbDma_; fib++) {
213 // CHECK S'il faut faire une reduction de taille de paquet
214 if (fgredpaq) { // reduction taille de paquet
215 if (par_.reducneedcopy) {
216 BRPaquet paqc1(tampon[fib], predtampon, paqsz, swapall_);
217 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
218 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
219 }
220 else {
221 BRPaquet paqc1(tampon[fib], paqsz);
222 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
223 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
224 }
225 }
226 else {
227 BRPaquet paqc(tampon[fib], nexpaq[fib], paqsz, swapall_);
228 }
229 BRPaquet paq(nexpaq[fib], packSizeInMgr_);
230 SendToTargets(fib, nexpaq[fib], packSizeInMgr_);
231 npaqfait[fib]++;
232 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
233 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
234 }
235 }
236 else { // se rapporte au IF numero A
237 for(uint_4 fib=0; fib<nbDma_; fib++)
238 memcpy((void *)(tampon[fib]+off_acheval), (void *)Datas[fib], dmasz);
239 curoff =dmasz;
240 off_acheval = (dmasz+off_acheval);
241 }
242 } // Fin IF Numero B
243
244 //2- On traite les paquets complets qui se trouvent dans la zone du DMA
245 while ((curoff+paqsz)<=dmasz) { // while numero C
246 // if ((dma==nbDma_-1)&&(npaqfait >= nmax_* memgr.NbPaquets())) break;
247 // CHECK S'il faut faire une reduction de taille de paquet
248 for(uint_4 fib=0; fib<nbDma_; fib++) {
249 if (fgredpaq) { // reduction taille de paquet
250 if (par_.reducneedcopy) {
251 BRPaquet paqc1(Datas[fib]+curoff, predtampon, paqsz, swapall_);
252 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
253 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
254 }
255 else {
256 BRPaquet paqc1(Datas[fib]+curoff, paqsz);
257 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
258 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
259 }
260 }
261 else {
262 BRPaquet paqc(Datas[fib]+curoff, nexpaq[fib], paqsz, swapall_);
263 }
264 BRPaquet paq(nexpaq[fib], packSizeInMgr_);
265 SendToTargets(fib, nexpaq[fib], packSizeInMgr_);
266 npaqfait[fib]++;
267 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
268 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
269 }
270 curoff += paqsz; // On avance l'index dans le buffer du DMA
271 } // -- FIN traitement des paquets complets ds un DMA - FIN du while numero C
272 //3- On copie si besoin la fin du DMA dans la zone tampon
273 if (curoff < dmasz) { // IF numero D
274 off_acheval = dmasz-curoff;
275 for(uint_4 fib=0; fib<nbDma_; fib++)
276 memcpy(tampon[fib], (void*)(Datas[fib]+curoff), off_acheval);
277 // ne sert a rien curoff += off_acheval;
278 } // FIN du if numero D
279 } // FIN Boucle global G
280
281 setRC(0);
282 gettimeofday(&tv2, NULL);
283 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
284 if (tmelaps2<0.1) tmelaps2=0.1;
285 cout << " ---------- PCIEToEthernet::run()-End summary NPaqFait=" << npaqfaitg
286 << " TotSend (kb)=" << totrdsnd_/1024 << "------ " << endl;
287 for (int dma=0; dma < (int)nbDma_ ;dma++) {
288 cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] << " TotDMATransfer="
289 << vec_pciw_[dma]->TotTransferBytes()/1024
290 << " ElapsTime=" << tmelaps2 << " ms ->"
291 << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl;
292
293 vector<ClientSocket>& vskt = vvec_skt_[dma];
294 cout << " TotEthTransfer=";
295 for(size_t j=0; j<vskt.size(); j++)
296 cout << vskt[j].NBytesSent()/1024 << " , ";
297 cout << " kb" << endl;
298
299 if (!fgdirectsend_) pcheck[dma].Print(cout);
300 }
301 cout << " --------------------------------------------------------------------" << endl;
302
303 for (int i=0;i< (int)nbDma_ ;i++) {
304 delete[] tampon[i];
305 delete[] nexpaq[i];
306 }
307 if ((fgredpaq)&&predtampon) delete[] predtampon;
308
309 //DBG cout << " fin thread ========================" <<endl;
310 return;
311}
312
313
314/* --Methode-- */
315void PCIEToEthernet::Stop()
316{
317 // cout << " PCIEReaderChecker::stop() ........ STOP" <<endl;
318 stop_ = true;
319
320}
321
322static long cnt_prt=0;
323/* --Methode-- */
324size_t PCIEToEthernet::SendToTargets(int fib, Byte* data, size_t len)
325{
326 //DBG cout << " SendToTargets/DBG" << cnt_prt << " len=" << len << " data=" << hex << (unsigned long)data << dec << endl; cnt_prt++;
327 vector<ClientSocket>& vskt = vvec_skt_[fib];
328 vector<uint_8>& verrcnt = vvec_errorcnt_[fib];
329 char msg[BRTCPMSGLEN2];
330 size_t rc=0;
331 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
332 sprintf(msg,"PCIEToEthernet-SendCnt %d",vec_cntpaq_[fib]);
333 for(size_t j=0; j<vskt.size(); j++) {
334 vskt[j].SendAll(msg, BRTCPMSGLEN2);
335 }
336 }
337 // Envoi des donnees (paquets)
338 for(size_t j=0; j<vskt.size(); j++) {
339 rc += vskt[j].SendAll((const char *)data, len);
340 totrdsnd_ += len;
341 }
342 vec_cntpaq_[fib]++;
343#ifdef BR_DO_HANDSHAKE
344 // attente message de hand-shake
345 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
346 for(size_t j=0; j<vskt.size(); j++) {
347 //DBG cout << "SendToTargets/DEBUG-B attente -RecvCnt fib=" << fib << " npaq=" << vec_cntpaq_[fib] << endl;
348 vskt[j].ReceiveAll(msg, BRTCPMSGLEN2);
349 msg[BRTCPMSGLEN2-1]='\0';
350 if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0) {
351 cout << " PCIEToEthernet::SendToTargets/ BAD HandShake, Fiber=" << fib
352 << " NumSocket=" << j << " msg: " << msg << endl;
353 verrcnt[j]++;
354 }
355 }
356 }
357#endif
358 if (vec_cntpaq_[fib]==nmaxpaq_) { // fin de l'envoi pour cette fibre
359 sprintf(msg,"PCIEToEthernet-Send-END %d",vec_cntpaq_[fib]);
360 for(size_t j=0; j<vskt.size(); j++) {
361 vskt[j].SendAll(msg, BRTCPMSGLEN2);
362 }
363
364 }
365 return rc;
366}
367//------------------------------------------------------------------
368// Classe thread de lecture sur interface reseau (Ethernet)
369//------------------------------------------------------------------
370
371/* --Methode-- */
372EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid)
373 : memgr_(mem), par_(par), stop_(false), tcpportid_(portid)
374{
375 totnbytesrd_ = 0;
376 totnpaqrd_ = 0;
377 totsamefc_ = 0;
378 totnbresync_ = 0;
379
380 if (memgr_.NbFibres() > MAXANAFIB)
381 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB ");
382 if (par_.ethr_nlink != memgr_.NbFibres())
383 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink");
384 SetPrintLevel(par.prtlevel_,par.prtmodulo_);
385
386 SetReadMode(); // definitition du mode de lecture des liens
387 WaitENDMsg4Terminate();
388
389 packsize_=memgr_.PaqSize();
390 mid_=-2;
391 mmbuf_=NULL;
392 max_targ_npaq = memgr_.NbPaquets();
393 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
394
395 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
396 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
397 vpchk_.push_back(BRPaqChecker(true,0));
398 curfc_.push_back(0);
399 totnpqrd_.push_back(0);
400 totnpqok_.push_back(0);
401 }
402 srv_sokp_ = new ServerSocket(tcpportid_, par_.ethr_nlink);
403 char msg[BRTCPMSGLEN];
404 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
405 Socket sok=srv_sokp_->WaitClientConnection();
406 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
407 sok.ReceiveAll(msg,BRTCPMSGLEN);
408 if (strncmp(msg,"BAORadio-PCIEToEthernet",23)!=0)
409 throw SocketException("EthernetReader:ERROR/ Bad message from PCIEToEthernet client !");
410 int ia,ib,ic,id,ie;
411 sscanf(msg+25,"%d %d %d %d %d",&ia,&ib,&ic,&id,&ie);
412 // if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) {
413 if (((uint_4)ia!=memgr_.NbPaquets()) || ((uint_4)ib!=memgr_.PaqSize())) {
414 strcpy(msg,"BAORadio-EthernetReader-BAD MMgrNbPaquet/MMgrPaquetSize()");
415 sok.SendAll(msg,BRTCPMSGLEN);
416 throw SocketException("EthernetReader:ERROR/ Bad MMgrNbPaquet/MMgrPaquetSize() from PCIEToEthernet client !");
417 }
418 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
419 strcpy(msg,"BAORadio-EthernetReader-OK");
420 sok.SendAll(msg,BRTCPMSGLEN);
421 vsok_.push_back(sok);
422 cout << " EthernetReader/Info connection/link" << fib << " established." << endl;
423 vec_cntpaq_.push_back(0);
424 vec_fgsokend_.push_back(false);
425 }
426 gl_fgsokend = true;
427 stopreason_="??Unknown??";
428 dummybuff_ = new char[packsize_];
429}
430
431/* --Methode-- */
432EthernetReader::~EthernetReader()
433{
434 if (prtlev_>0) cout << " ~EthernetReader() , TotNPaqRd=" << totnpaqrd_ << " TotNbReSync=" << totnbresync_ << endl;
435 for(size_t j=0; j<vsok_.size(); j++) {
436 if (prtlev_>1)
437 cout << " ~EthernetReader() closing sok[" << j << "] CntPaq=" << vec_cntpaq_[j]
438 << " TotNPaqRd=" << totnpqrd_[j] << " TotNPaqOK=" << totnpqok_[j] << endl;
439 vsok_[j].Close();
440 }
441 srv_sokp_->Close();
442 delete srv_sokp_;
443 if (dummybuff_) delete[] dummybuff_;
444}
445
446/* --Methode-- */
447void EthernetReader::run()
448{
449 setRC(1);
450 try {
451 TimeStamp ts;
452 Timer tm("EthernetReader", false);
453 cout << " EthernetReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
454 << " PaqSize() = " << memgr_.PaqSize() << endl;
455 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets") << endl;
456
457 cout << " Sending GO message to all PCIEToEthernet clients ... " << endl;
458 char msg[BRTCPMSGLEN];
459 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
460 strcpy(msg,"BAORadio-EthernetReader-Ready-GO");
461 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
462 cout << " EthernetReader/Debug - Sending GO message to sender " << fib << endl;
463 vsok_[fib].SendAll(msg,BRTCPMSGLEN);
464 }
465 Byte* nextpaq=NULL;
466 bool fgok=true;
467 while (fgok&&(totnpaqrd_<par_.MaxNbPaquets())) {
468 if (stop_) {
469 if (prtlev_>0)
470 cout << ">S>S>S> EthernetReader::run() / Stopping, through Stop() method" << endl;
471 stopreason_="EthernetReader::run() / Stopping, through Stop() method";
472 break;
473 }
474 if ( MoveToNextTarget() ) {
475 cout << "EthernetReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
476 setRC(7); fgok=false; break;
477 }
478 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
479 nextpaq=GetPaquetTarget(fib);
480 if (nextpaq == NULL) { // Cela ne devrait pas arriver
481 cout << "EthernetReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
482 setRC(9); fgok=false; break;
483 }
484 vpaq_[fib].Set(nextpaq);
485 }
486 if (ReadNextAllFibers()) { fgok=false; break; }
487 totnpaqrd_++;
488 if ((prtlev_>0)&&(totnpaqrd_%prtmodulo_==0)) {
489 cout << "EthernetReader: NbPaq/Link=" << totnpaqrd_ << " NSameFC="
490 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
491 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
492 }
493 if (totnpaqrd_>=par_.MaxNbPaquets()) {
494 if (prtlev_>0)
495 cout << ">S>S>S> EthernetReader::run() / Stopping, totnpaqrd_>=par_.MaxNbPaquets()" << endl;
496 stopreason_="EthernetReader::run() / Stopping, totnpaqrd_>=par_.MaxNbPaquets()";
497 }
498 }
499
500 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
501 MZoneManage(true); // Nettoyage final
502
503 cout << " ---------------------------------------------------------------------------- " << endl;
504 cout << "--- EthernetReader::run() END, StopReason: " << stopreason_ << " ---" << endl;
505 ts.SetNow();
506 tm.SplitQ();
507 cout << " END reading TotNPaq/Link=" << totnpaqrd_ << " :" << ts << endl;
508 cout << "... NbPaq/Link=" << totnpaqrd_ << " NSameFC="
509 << totsamefc_ << " / NPaqLink[0]_Read=" << totnpqrd_[0]
510 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
511
512 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
513 int perc=0;
514 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
515 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
516 << " FracSameFC=" << perc << " %" << endl;
517 if (prtlev_ > 0) vpchk_[fib].Print(cout);
518 }
519 cout << " TotalEthernetRead= " << totnbytesrd_/(1024*1024) << " MBytes Ethernet-Read rate= "
520 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
521 cout << " EthernetReader::run()/Timing: \n";
522 tm.Print();
523 cout << " ---------------------------------------------------------------------------- " << endl;
524
525 usleep(50000); // Attente de traitement du dernier paquet
526 memgr_.Stop(); // Arret
527 if (waitendmsg_) {
528 CleanUpAllSockets(); // On lit tous les liens jusqu'a la reception du message END
529 cout << " EthernetReader::run(): done CleanUpAllSockets()" << endl;
530 }
531 } // Fin du bloc try
532 catch (std::exception& exc) {
533 cout << " EthernetReader::run()/catched execption msg= " << exc.what() << endl;
534 setRC(3);
535 return;
536 }
537 catch(...) {
538 cout << " EthernetReader::run()/catched unknown ... exception " << endl;
539 setRC(4);
540 return;
541 }
542 setRC(0);
543 return;
544}
545
546/* --Methode-- */
547bool EthernetReader::ReadNextAllFibers()
548{
549 if (sfc_maxresync_>0) return ReadNextAllFibersWithSync();
550
551 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
552 if (ReadNext(fib)) return true; // probleme
553 }
554 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
555 uint_8 cfc=curfc_[0];
556 bool fgsamefc=true;
557 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
558 if (curfc_[fib]!=cfc) fgsamefc=false;
559 }
560 if (fgsamefc) totsamefc_++;
561 return false; // c'est OK
562 }
563 // On va essayer de lire jusqu'a avoir same_frame_counter
564 uint_8 cfc=curfc_[0];
565 bool fgsamefc=true;
566 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
567 if (curfc_[fib]!=cfc) {
568 fgsamefc=false;
569 if (curfc_[fib] > cfc) cfc=curfc_[fib];
570 }
571 }
572 if (fgsamefc) {
573 totsamefc_++; return false; // c'est OK , same framecounter
574 }
575 else { // else !fgsame
576 bool encorerd=true;
577 uint_4 ntrd=0;
578 while(encorerd) {
579 encorerd=false;
580 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
581 if (curfc_[fib]>=cfc) continue;
582 if (ReadNext(fib)) return true; // probleme
583 encorerd=true;
584 }
585 ntrd++;
586 if (ntrd>sfc_maxdpc_) {
587 if (prtlev_>0)
588 cout << ">S>S>S> EthernetReader::ReadNextAllFibers(" << fib << ")/ Stopping, Nb_ReadNext="
589 << ntrd << " > sfc_maxdpc_" << endl;
590 stopreason_="ReadNextAllFibers() StopReason: failed to get SameFC Nb_ReadNext>sfc_maxdpc_";
591 return true;
592 }
593 }
594 } // fin de else !fgsame
595
596 return true; // probleme
597}
598
599/* --Methode-- */
600bool EthernetReader::ReadNext(int fib)
601{
602 bool fggood=false;
603 while(!fggood) {
604 ReceiveFromSocket(fib, (char *)vpaq_[fib].Begin(), packsize_);
605 if (vec_fgsokend_[fib]) { // fin de reception sur le lien
606 if (prtlev_>0)
607 cout << ">S>S>S> EthernetReader::ReadNext(" << fib << ")/ Stopping, reason EndReceive fgsokend_="
608 << vec_fgsokend_[fib] << endl;
609 stopreason_="ReadNext() StopReason: EndReceive signaled on link";
610 return true;
611 }
612 totnbytesrd_+=packsize_;
613 totnpqrd_[fib]++;
614 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
615 }
616 totnpqok_[fib]++;
617 return false;
618}
619
620/* --Methode-- */
621void EthernetReader::CleanUpAllSockets()
622{
623 bool fgallfinished=false;
624 while (!fgallfinished) {
625 fgallfinished=true;
626 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
627 if (vec_fgsokend_[fib]>0) continue;
628 ReceiveFromSocket(fib,dummybuff_,packsize_);
629 fgallfinished=false;
630 }
631 }
632 return;
633}
634
635/* --Methode-- */
636size_t EthernetReader::ReceiveFromSocket(int fib, char* data, size_t len)
637{
638 size_t rc =0;
639 if (vec_fgsokend_[fib]) { // la lecture est finie
640 memset(data, 0, len);
641 return 0;
642 }
643
644 if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
645 char msg[BRTCPMSGLEN2];
646 msg[0]='\0';
647 vsok_[fib].ReceiveAll(msg, BRTCPMSGLEN2);
648 // if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0)
649 // cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl;
650 bool fgbadhsm=true;
651 if (strncmp(msg,"PCIEToEthernet-Send",19)==0) {
652 if (strncmp(msg+19,"Cnt",3)==0) fgbadhsm=false;
653 else if (strncmp(msg+19,"-END",4)==0) {
654 fgbadhsm=false;
655 vec_fgsokend_[fib]=1; gl_fgsokend=true;
656 //DBG cout << "ReceiveFromSocket/DEBUG -END msg received " << endl;
657 if (prtlev_>2)
658 cout << " EthernetReader::ReceiveFromSocket/ Receive on LinkNo" << fib << " Ended OK" << endl;
659 }
660 }
661 if (fgbadhsm) {
662 cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake, LinkNo " << fib << " msg: " << msg << endl;
663 vec_fgsokend_[fib]=2;
664 }
665 }
666
667 if (vec_fgsokend_[fib]>0) { // la lecture est finie
668 memset(data, 0, len);
669 return 0;
670 }
671
672 rc += vsok_[fib].ReceiveAll(data, len);
673 vec_cntpaq_[fib]++;
674
675#ifdef BR_DO_HANDSHAKE
676 // Envoi de message de hand-shake
677 if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
678 char msg[BRTCPMSGLEN2];
679 sprintf(msg,"EthernetReader-RecvCnt %d",vec_cntpaq_[fib]);
680 vsok_[fib].SendAll(msg, BRTCPMSGLEN2);
681 //DBG cout << " ReceiveFromSocket/DEBUG-B msg=" << msg << endl;
682 }
683#endif
684 return rc;
685}
686
687/* --Methode-- */
688bool EthernetReader::ReadNextAllFibersWithSync()
689{
690 if (rdsamefc_&&(sfc_maxresync_>0)) {
691 if (SkipAndSyncLinks()>8) return true;
692 }
693 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
694 if (ReadNext(fib)) return true; // probleme
695 }
696 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
697 uint_8 cfc=curfc_[0];
698 bool fgsamefc=true;
699 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
700 if (curfc_[fib]!=cfc) fgsamefc=false;
701 }
702 if (fgsamefc) totsamefc_++;
703 return false; // c'est OK
704 }
705 // On va essayer de lire jusqu'a avoir same_frame_counter
706 bool echec=true;
707 int nbechec=0;
708 while (echec) {
709 if (nbechec>sfc_maxresync_) {
710 if (prtlev_>0)
711 cout << ">S>S>S> EthernetReader::ReadNextAllFibersWithSync()/ Stopping, reason NbEchec>(NbMax_Resync="
712 << sfc_maxresync_ << ")" << endl;
713 stopreason_="ReadNextAllFibersWithSync() , StopReason: NbEchec>NbMax_Resyn";
714 return true;
715 }
716 uint_8 cfc=curfc_[0];
717 bool fgsamefc=true;
718 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
719 if (curfc_[fib]!=cfc) {
720 fgsamefc=false;
721 if (curfc_[fib] > cfc) cfc=curfc_[fib];
722 }
723 }
724 if (fgsamefc) {
725 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
726 }
727 else { // else !fgsame
728 if (nbechec>0) {
729 int rcres=SkipAndSyncLinks();
730 if (rcres>8) return true;
731 else if (rcres==1) { // Il faut relire les fibres
732 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
733 if (ReadNext(fib)) return true; // probleme
734 }
735 fgsamefc=true; cfc=curfc_[0];
736 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
737 if (curfc_[fib]!=cfc) {
738 fgsamefc=false;
739 if (curfc_[fib] > cfc) cfc=curfc_[fib];
740 }
741 }
742 if (fgsamefc) {
743 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
744 }
745 } // fin de if (rcres==1)
746 }
747
748 if (prtlev_>5)
749 cout << " EthernetReader::ReadNextAllFibersWithSync()/DEBUG NbEchec=" << nbechec
750 << " cfc=" << cfc << endl;
751
752 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
753 for(uint_4 ntryrd=0; ntryrd<sfc_maxdpc_; ntryrd++) { // on tente un maximum de
754 if (curfc_[fib]>=cfc) break;
755 if (ReadNext(fib)) return true; // probleme
756 }
757 }
758
759 nbechec++;
760 } // fin de else !fgsame
761 } // fin de while(echec): lecture jusqu'a same_frame_counter
762
763 return true; // probleme
764}
765
766/* --Methode-- */
767int EthernetReader::SkipAndSyncLinks()
768{
769 uint_8 minnumpaq=vec_cntpaq_[0]; // minimum des nombres de paquets lus sur les differents liens
770 size_t minnp_fid=0; // numero de fibre correspondant au minimum de paquets lus
771 uint_8 maxnumpaq=vec_cntpaq_[0]; // maximum des nombres de paquets lus sur les differents liens
772 size_t maxnp_fid=0; // numero de fibre correspondant au maximum de paquets lus
773 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
774 if (vec_cntpaq_[fib]<=minnumpaq) {
775 minnumpaq=vec_cntpaq_[fib]; minnp_fid=fib;
776 }
777 if (vec_cntpaq_[fib]>=maxnumpaq) {
778 maxnumpaq=vec_cntpaq_[fib]; maxnp_fid=fib;
779 }
780 }
781 if (!rdsamefc_ && (minnumpaq!=maxnumpaq))
782 cout << " EthernetReader::SkipAndSyncLinks()/BUG or PROBLEM - ReadAllOK and min<>maxnumpaq="
783 << minnumpaq << "," << maxnumpaq << " !!!" << endl;
784 if ((maxnumpaq-minnumpaq)<sfc_maxdpc_) return 0;
785
786 if (prtlev_>4)
787 cout << " EthernetReader::SkipAndSyncLinks() min,maxnumpaq=" << minnumpaq << "," << maxnumpaq
788 << " min,maxnp_fid=" << minnp_fid << "," << maxnp_fid << " sfc_maxdpc_=" << sfc_maxdpc_ << endl;
789
790 for(size_t fib=0; fib<memgr_.NbFibres(); fib++) {
791 while(vec_cntpaq_[fib]<maxnumpaq) {
792 if (vec_fgsokend_[fib]>0) return 9;
793 ReceiveFromSocket(fib,dummybuff_,packsize_);
794 }
795 }
796 totnbresync_++;
797 if (prtlev_>3) {
798 cout << " EthernetReader::SkipAndSyncLinks() NbResync=" << totnbresync_ << " After Sync: vec_cntpaq_[fib]=";
799 for(size_t ii=0; ii<vec_cntpaq_.size(); ii++) cout << vec_cntpaq_[ii] << " , ";
800 cout << endl;
801 }
802
803 return 1;
804}
805
806/* --Methode-- */
807bool EthernetReader::MZoneManage(bool fgclean) // Retourne true si probleme
808{
809 /* Pour debug
810 cout << " EthernetReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
811 << " max_targ_npaq=" << max_targ_npaq << endl;
812 */
813 if (mid_ >= 0) {
814 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
815 else memgr_.FreeMemZone(mid_, MemZS_Filled);
816 }
817 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
818 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
819 if (fgclean) return false;
820 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
821 mmbuf_ = memgr_.GetMemZone(mid_);
822 if (mmbuf_==NULL) return true;
823 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
824 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
825 return false;
826}
Note: See TracBrowser for help on using the repository browser.