source: Sophya/trunk/AddOn/TAcq/racqueth.cc@ 3897

Last change on this file since 3897 was 3897, checked in by ansari, 15 years ago

Amelioration des classes PCIEToEthernet et EthernetReader pour eviter des situations de blocage lorsque lecture avec ForceSameFrameCounter, Reza 04/10/2010

File size: 27.5 KB
Line 
1//----------------------------------------------------------------
2// ---- classes de threads pour lecture (transfert DMA)
3// et ecriture disque pour acquisition BAORadio -----
4// LAL - R. Ansari - Juin/Juillet 2008
5//----------------------------------------------------------------
6
7#include "racqueth.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <signal.h>
13#include "pexceptions.h"
14#include "ctimer.h"
15#include "timestamp.h"
16
17#include "pciewrap.h"
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23////////////////////////////////////////////////////////////////////////////////////////////////////////
24//----------------------------------------------------------------------------------------------------------
25// Classe thread de lecture PCI-Express et recopie sur interface reseau (Ethernet)
26//----------------------------------------------------------------------------------------------------------
27
28// Si on veut avoir le protocole de controle (Hand-shake) entre l'emetteur et le recepteur,
29// decommenter la ligne suivante - Reza , 4 octobre 2010
30// #define BR_DO_HANDSHAKE 1
31
32/* --Methode-- */
33PCIEToEthernet::PCIEToEthernet(vector<PCIEWrapperInterface*> vec_pciw, vector<string>& destname, BRParList const& par, int portid)
34 : par_(par), vec_pciw_ (vec_pciw), destname_(destname), tcpportid_(portid)
35{
36 nmaxpaq_ = par_.MaxNbPaquets();
37 swapall_ = par_.GetDataConvFg(); // select data swap/format conversion for BRPaquet
38 stop_ = false;
39 packSize_ = par_.RecvPaquetSize();
40 packSizeInMgr_=par_.MMgrPaquetSize();
41 sizeFr_=par_.DMASizeBytes();
42 if (vec_pciw.size() > MAXNBFIB)
43 throw ParmError("PCIEToEthernet:ERROR/ vec_pciw.size() > MAXNBFIB ");
44 nbDma_= vec_pciw.size();
45
46 // true -> direct transfer of data to ethernet
47 fgdirectsend_ = (par_.pci2eth_fgdirect && (swapall_==BR_Copy) ) ? true : false;
48 char msg[BRTCPMSGLEN];
49 cout << "PCIEToEthernet/Info: Establishing TCP/IP connections ... " << endl;
50 for(size_t i=0; i<vec_pciw_.size(); i++) {
51 vector<ClientSocket> vskt;
52 vector<uint_8> verrcnt;
53 for(size_t j=0; j<destname_.size(); j++) {
54 ClientSocket sok(destname_[j], tcpportid_);
55 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
56 uint_4 dmasz = (fgdirectsend_) ? vec_pciw_[0]->TransferSize() : 0;
57 sprintf(msg,"BAORadio-PCIEToEthernet %d %d %d %d %d", par_.MMgrNbPaquet(), par_.MMgrPaquetSize(), dmasz, 0,i);
58 // 123456789012345678901234567890
59 sok.SendAll(msg,BRTCPMSGLEN);
60 sok.ReceiveAll(msg,BRTCPMSGLEN);
61 if (strncmp(msg,"BAORadio-EthernetReader-OK",26)!=0)
62 throw SocketException("PCIEToEthernet:ERROR/ Connection to EthernetReader not established ");
63 cout << " PCIEToEthernet: Ethernet connection established for DMA/fiber" << i << " with " << destname_[j] << endl;
64 vskt.push_back(sok);
65 verrcnt.push_back(0);
66 }
67 vvec_skt_.push_back(vskt);
68 vvec_errorcnt_.push_back(verrcnt);
69 vec_cntpaq_.push_back(0);
70 }
71 totrdsnd_ = 0;
72 SetPrintLevel(par.prtlevel_,par.prtmodulo_);
73}
74
75/* --Methode-- */
76PCIEToEthernet::~PCIEToEthernet()
77{
78 for(size_t i=0; i<vec_pciw_.size(); i++) {
79 vector<ClientSocket>& vskt = vvec_skt_[i];
80 vector<uint_8>& verrcnt = vvec_errorcnt_[i];
81 for(size_t j=0; j<destname_.size(); j++) {
82 cout << " ~PCIEToEthernet() closing socket, fiber/pcieNum=" << i << " ethernet_destNum=" << j
83 << " ErrorCount=" << verrcnt[j] << endl;
84 vskt[j].Close();
85 }
86 }
87}
88
89/* --Methode-- */
90void PCIEToEthernet::run()
91{
92
93 struct timeval tv1,tv2;
94 gettimeofday(&tv1, NULL);
95
96 cout << " PCIEToEthernet::run() - Starting , NMaxMemZones=" << par_.MaxNbBlocs()
97 << " MMgrNbPaquet=" << par_.MMgrNbPaquet() << " MMgr.PaqSiz=" << par_.MMgrPaquetSize()
98 << " DMA-Paqsize " << packSize_ << " " << BRPaquet::FmtConvToString(swapall_) << endl;
99 setRC(1);
100
101 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
102 //sigaction(SIGINT,&act,NULL);
103 uint_4 paqszmm = par_.MMgrPaquetSize();
104 uint_4 paqsz = packSize_;
105 uint_4 dmasz = vec_pciw_[0]->TransferSize();
106 //DEL vec_pciw_[0]->StartTransfers();
107
108 BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets
109 Byte* Datas[MAXNBFIB];
110 Byte* tampon[MAXNBFIB] ;
111 Byte* nexpaq[MAXNBFIB] ; // Pour recevevoir les paquets apres reduction de taille
112
113 Byte* predtampon=NULL; // tampon de recopie pour la reduction des tailles de paquets
114 Byte* nextpaq=NULL;
115 uint_4 off_acheval=0;
116
117 int nerrdma = 0;
118 int maxerrdma = 10;
119 bool fgarret = false;
120
121 // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA
122 for (int i=0;i< (int)nbDma_ ;i++) {
123 // cout << " DEBUG*AB paqsz=" << paqsz << " paqszmm=" << paqszmm << endl;
124 tampon[i]=nexpaq[i]=NULL;
125 tampon[i] = new Byte[paqsz];
126 nexpaq[i] = new Byte[paqszmm];
127 }
128
129 if (fgdirectsend_)
130 cout << " PCIEToEthernet::run() Direct transfer mode DMA to Ethernet ... " << endl;
131 bool fgredpaq=par_.fgreducpsize;
132 if (fgredpaq) {
133 cout << " PCIEToEthernet::run() - PaquetSizeReduction - RedSize=" << par_.redpqsize
134 << " Offset=" << par_.reducoffset << " " << ((par_.reducneedcopy)?"NeedCopy":"NOCopy")
135 << " " << BRPaquet::ReducActionToString(par_.pqreducmode) << endl;
136 predtampon = new Byte[paqsz];
137 }
138
139
140 uint_8 trnbytes[MAXNBFIB];
141 uint_4 npaqfait[MAXNBFIB];
142 for (int i=0;i< (int)nbDma_ ;i++) {
143 npaqfait[i]=0; trnbytes[i]=0;
144 }
145 // Attente du message GO des taches lecteurs
146 cout << " PCIEToEthernet::run() Waiting for GO message from EthernetReader's ..." << endl;
147 char msg[BRTCPMSGLEN];
148 for(size_t i=0; i<vec_pciw_.size(); i++) {
149 vector<ClientSocket>& vskt=vvec_skt_[i];
150 for(size_t j=0; j<vskt.size(); j++) {
151 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
152 vskt[j].ReceiveAll(msg,BRTCPMSGLEN);
153 if (strncmp(msg,"BAORadio-EthernetReader-Ready-GO",32)!=0) {
154 msg[BRTCPMSGLEN-1]='\0';
155 cout << "PCIEToEthernet:ERROR BAD Go for fiber=" << i << " dest=" << j << " msg: " << msg << endl;
156 throw SocketException("PCIEToEthernet:ERROR/ BAD Go message from EthernetReader ");
157 }
158 cout << " PCIEToEthernet::run() Received GO message for Fiber/DMA " << i << " from " << destname_[j] << endl;
159 }
160 }
161
162 cout << " PCIEToEthernet::run() Starting DMA to Ethernet transfers - NbFibers=NbDMA=" << vec_pciw_.size()
163 << " x NbEthDestinations=" << destname_.size() << endl;
164
165 // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
166 uint_4 npaqfaitg = 0;
167
168 while (npaqfaitg < nmaxpaq_) { // Boucle global G
169 if (fgarret) break;
170 if (stop_) break;
171
172 // Lancement des DMA
173 for (int dma=0; dma < (int)nbDma_ ;dma++) vec_pciw_[dma]->StartTransfers();
174
175 // On pointe vers le debut de la zone a remplir aver le prochain DMA
176 //-- Zone memoire locale Byte* nextdma = buff+i*paqsz;
177
178 bool fgbaddma=false;
179 // On boucle sur les nbDma_ en attente de leurs terminaison
180 for (int dma=0; dma <(int) nbDma_ ;dma++) {
181 Datas[dma]=vec_pciw_[dma]->GetData();
182 if (Datas[dma] == NULL) { // No data Read in DMA
183 nerrdma ++; fgbaddma=true;
184 cout << "PCIEToEthernetChecker/Erreur Waiting for datas ..." << endl;
185 vec_pciw_[dma]->PrintStatus(cout);
186 if (nerrdma>=maxerrdma) { fgarret = true; break; }
187 }
188 }
189 if (fgbaddma) continue;
190 if ((prtlev_>0)&&(npaqfaitg%prtmodulo_==0))
191 cout << " PCIEToEthernet::run()/Info NPaqFait= " << npaqfaitg << endl;
192 if (fgdirectsend_) { // Pas de copie / reduction de taille de paquet, on rebalance tel quel ...
193 for (int fib=0; fib<(int) nbDma_ ;fib++) {
194 //DBG cout << " DEBUG*B fib=" << fib << " Datas[fib]=" << hex << Datas[fib] << dec << endl;
195 SendToTargets(fib, Datas[fib], dmasz);
196 trnbytes[fib] += (uint_8)dmasz;
197 npaqfait[fib] += (trnbytes[fib]/(uint_8)paqszmm);
198 if (fib==nbDma_-1) npaqfaitg += (trnbytes[fib]/(uint_8)paqszmm);
199 trnbytes[fib] = trnbytes[fib]%(uint_8)paqszmm;
200 }
201 continue; // On bypasse le reste
202 }
203
204 // Si on arrive ici, c'est qu'il faut copier / reduire les paquets ....
205 uint_4 curoff=0;
206 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres
207 if (off_acheval > 0) { // IF Numero B
208 if ((paqsz-off_acheval)< dmasz) { // IF Numero A
209 for(uint_4 fib=0; fib<nbDma_; fib++)
210 memcpy((void *)((tampon[fib])+off_acheval), (void *)Datas[fib], paqsz-off_acheval);
211 curoff = paqsz-off_acheval; off_acheval = 0;
212 for(uint_4 fib=0; fib<nbDma_; fib++) {
213 // CHECK S'il faut faire une reduction de taille de paquet
214 if (fgredpaq) { // reduction taille de paquet
215 if (par_.reducneedcopy) {
216 BRPaquet paqc1(tampon[fib], predtampon, paqsz, swapall_);
217 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
218 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
219 }
220 else {
221 BRPaquet paqc1(tampon[fib], paqsz);
222 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
223 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
224 }
225 }
226 else {
227 BRPaquet paqc(tampon[fib], nexpaq[fib], paqsz, swapall_);
228 }
229 BRPaquet paq(nexpaq[fib], packSizeInMgr_);
230 SendToTargets(fib, nexpaq[fib], packSizeInMgr_);
231 npaqfait[fib]++;
232 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
233 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
234 }
235 }
236 else { // se rapporte au IF numero A
237 for(uint_4 fib=0; fib<nbDma_; fib++)
238 memcpy((void *)(tampon[fib]+off_acheval), (void *)Datas[fib], dmasz);
239 curoff =dmasz;
240 off_acheval = (dmasz+off_acheval);
241 }
242 } // Fin IF Numero B
243
244 //2- On traite les paquets complets qui se trouvent dans la zone du DMA
245 while ((curoff+paqsz)<=dmasz) { // while numero C
246 // if ((dma==nbDma_-1)&&(npaqfait >= nmax_* memgr.NbPaquets())) break;
247 // CHECK S'il faut faire une reduction de taille de paquet
248 for(uint_4 fib=0; fib<nbDma_; fib++) {
249 if (fgredpaq) { // reduction taille de paquet
250 if (par_.reducneedcopy) {
251 BRPaquet paqc1(Datas[fib]+curoff, predtampon, paqsz, swapall_);
252 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
253 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
254 }
255 else {
256 BRPaquet paqc1(Datas[fib]+curoff, paqsz);
257 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
258 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
259 }
260 }
261 else {
262 BRPaquet paqc(Datas[fib]+curoff, nexpaq[fib], paqsz, swapall_);
263 }
264 BRPaquet paq(nexpaq[fib], packSizeInMgr_);
265 SendToTargets(fib, nexpaq[fib], packSizeInMgr_);
266 npaqfait[fib]++;
267 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
268 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
269 }
270 curoff += paqsz; // On avance l'index dans le buffer du DMA
271 } // -- FIN traitement des paquets complets ds un DMA - FIN du while numero C
272 //3- On copie si besoin la fin du DMA dans la zone tampon
273 if (curoff < dmasz) { // IF numero D
274 off_acheval = dmasz-curoff;
275 for(uint_4 fib=0; fib<nbDma_; fib++)
276 memcpy(tampon[fib], (void*)(Datas[fib]+curoff), off_acheval);
277 // ne sert a rien curoff += off_acheval;
278 } // FIN du if numero D
279 } // FIN Boucle global G
280
281 setRC(0);
282 gettimeofday(&tv2, NULL);
283 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
284 if (tmelaps2<0.1) tmelaps2=0.1;
285 cout << " ---------- PCIEToEthernet::run()-End summary NPaqFait=" << npaqfaitg
286 << " TotSend (kb)=" << totrdsnd_/1024 << "------ " << endl;
287 for (int dma=0; dma < (int)nbDma_ ;dma++) {
288 cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] << " TotDMATransfer="
289 << vec_pciw_[dma]->TotTransferBytes()/1024
290 << " ElapsTime=" << tmelaps2 << " ms ->"
291 << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl;
292
293 vector<ClientSocket>& vskt = vvec_skt_[dma];
294 cout << " TotEthTransfer=";
295 for(size_t j=0; j<vskt.size(); j++)
296 cout << vskt[j].NBytesSent()/1024 << " , ";
297 cout << " kb" << endl;
298
299 if (!fgdirectsend_) pcheck[dma].Print(cout);
300 }
301 cout << " --------------------------------------------------------------------" << endl;
302
303 for (int i=0;i< (int)nbDma_ ;i++) {
304 delete[] tampon[i];
305 delete[] nexpaq[i];
306 }
307 if ((fgredpaq)&&predtampon) delete[] predtampon;
308
309 //DBG cout << " fin thread ========================" <<endl;
310 return;
311}
312
313
314/* --Methode-- */
315void PCIEToEthernet::Stop()
316{
317 // cout << " PCIEReaderChecker::stop() ........ STOP" <<endl;
318 stop_ = true;
319
320}
321
322static long cnt_prt=0;
323/* --Methode-- */
324size_t PCIEToEthernet::SendToTargets(int fib, Byte* data, size_t len)
325{
326 //DBG cout << " SendToTargets/DBG" << cnt_prt << " len=" << len << " data=" << hex << (unsigned long)data << dec << endl; cnt_prt++;
327 vector<ClientSocket>& vskt = vvec_skt_[fib];
328 vector<uint_8>& verrcnt = vvec_errorcnt_[fib];
329 char msg[BRTCPMSGLEN2];
330 size_t rc=0;
331 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
332 sprintf(msg,"PCIEToEthernet-SendCnt %d",vec_cntpaq_[fib]);
333 for(size_t j=0; j<vskt.size(); j++) {
334 vskt[j].SendAll(msg, BRTCPMSGLEN2);
335 }
336 }
337 // Envoi des donnees (paquets)
338 for(size_t j=0; j<vskt.size(); j++) {
339 rc += vskt[j].SendAll((const char *)data, len);
340 totrdsnd_ += len;
341 }
342 vec_cntpaq_[fib]++;
343#ifdef BR_DO_HANDSHAKE
344 // attente message de hand-shake
345 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
346 for(size_t j=0; j<vskt.size(); j++) {
347 //DBG cout << "SendToTargets/DEBUG-B attente -RecvCnt fib=" << fib << " npaq=" << vec_cntpaq_[fib] << endl;
348 vskt[j].ReceiveAll(msg, BRTCPMSGLEN2);
349 msg[BRTCPMSGLEN2-1]='\0';
350 if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0) {
351 cout << " PCIEToEthernet::SendToTargets/ BAD HandShake, Fiber=" << fib
352 << " NumSocket=" << j << " msg: " << msg << endl;
353 verrcnt[j]++;
354 }
355 }
356 }
357#endif
358 if (vec_cntpaq_[fib]==nmaxpaq_) { // fin de l'envoi pour cette fibre
359 sprintf(msg,"PCIEToEthernet-Send-END %d",vec_cntpaq_[fib]);
360 for(size_t j=0; j<vskt.size(); j++) {
361 vskt[j].SendAll(msg, BRTCPMSGLEN2);
362 }
363
364 }
365 return rc;
366}
367//------------------------------------------------------------------
368// Classe thread de lecture sur interface reseau (Ethernet)
369//------------------------------------------------------------------
370
371/* --Methode-- */
372EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid)
373 : memgr_(mem), par_(par), stop_(false), tcpportid_(portid)
374{
375 totnbytesrd_ = 0;
376 totnpaqrd_ = 0;
377 totsamefc_ = 0;
378 totnbresync_ = 0;
379
380 if (memgr_.NbFibres() > MAXANAFIB)
381 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB ");
382 if (par_.ethr_nlink != memgr_.NbFibres())
383 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink");
384 SetPrintLevel(par.prtlevel_,par.prtmodulo_);
385
386 SetReadMode(); // definitition du mode de lecture des liens
387
388 packsize_=memgr_.PaqSize();
389 mid_=-2;
390 mmbuf_=NULL;
391 max_targ_npaq = memgr_.NbPaquets();
392 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
393
394 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
395 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
396 vpchk_.push_back(BRPaqChecker(true,0));
397 curfc_.push_back(0);
398 totnpqrd_.push_back(0);
399 totnpqok_.push_back(0);
400 }
401 srv_sokp_ = new ServerSocket(tcpportid_, par_.ethr_nlink);
402 char msg[BRTCPMSGLEN];
403 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
404 Socket sok=srv_sokp_->WaitClientConnection();
405 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
406 sok.ReceiveAll(msg,BRTCPMSGLEN);
407 if (strncmp(msg,"BAORadio-PCIEToEthernet",23)!=0)
408 throw SocketException("EthernetReader:ERROR/ Bad message from PCIEToEthernet client !");
409 int ia,ib,ic,id,ie;
410 sscanf(msg+25,"%d %d %d %d %d",&ia,&ib,&ic,&id,&ie);
411 // if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) {
412 if (((uint_4)ia!=memgr_.NbPaquets()) || ((uint_4)ib!=memgr_.PaqSize())) {
413 strcpy(msg,"BAORadio-EthernetReader-BAD MMgrNbPaquet/MMgrPaquetSize()");
414 sok.SendAll(msg,BRTCPMSGLEN);
415 throw SocketException("EthernetReader:ERROR/ Bad MMgrNbPaquet/MMgrPaquetSize() from PCIEToEthernet client !");
416 }
417 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
418 strcpy(msg,"BAORadio-EthernetReader-OK");
419 sok.SendAll(msg,BRTCPMSGLEN);
420 vsok_.push_back(sok);
421 cout << " EthernetReader/Info connection/link" << fib << " established." << endl;
422 vec_cntpaq_.push_back(0);
423 vec_fgsokend_.push_back(false);
424 }
425 gl_fgsokend = true;
426 dummybuff_ = new char[packsize_];
427}
428
429/* --Methode-- */
430EthernetReader::~EthernetReader()
431{
432 if (prtlev_>0) cout << " ~EthernetReader() , TotNPaqRd=" << totnpaqrd_ << " TotNbReSync=" << totnbresync_ << endl;
433 for(size_t j=0; j<vsok_.size(); j++) {
434 if (prtlev_>1)
435 cout << " ~EthernetReader() closing sok[" << j << "] CntPaq=" << vec_cntpaq_[j]
436 << " TotNPaqRd=" << totnpqrd_[j] << " TotNPaqOK=" << totnpqok_[j] << endl;
437 vsok_[j].Close();
438 }
439 srv_sokp_->Close();
440 delete srv_sokp_;
441 if (dummybuff_) delete[] dummybuff_;
442}
443
444/* --Methode-- */
445void EthernetReader::run()
446{
447 setRC(1);
448 try {
449 TimeStamp ts;
450 Timer tm("EthernetReader", false);
451 cout << " EthernetReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
452 << " PaqSize() = " << memgr_.PaqSize() << endl;
453 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets") << endl;
454
455 cout << " Sending GO message to all PCIEToEthernet clients ... " << endl;
456 char msg[BRTCPMSGLEN];
457 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
458 strcpy(msg,"BAORadio-EthernetReader-Ready-GO");
459 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
460 cout << " EthernetReader/Debug - Sending GO message to sender " << fib << endl;
461 vsok_[fib].SendAll(msg,BRTCPMSGLEN);
462 }
463 Byte* nextpaq=NULL;
464 bool fgok=true;
465 while (fgok&&(totnpaqrd_<par_.MaxNbPaquets())) {
466 if (stop_) break;
467 if ( MoveToNextTarget() ) {
468 cout << "EthernetReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
469 setRC(7); fgok=false; break;
470 }
471 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
472 nextpaq=GetPaquetTarget(fib);
473 if (nextpaq == NULL) { // Cela ne devrait pas arriver
474 cout << "EthernetReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
475 setRC(9); fgok=false; break;
476 }
477 vpaq_[fib].Set(nextpaq);
478 }
479 if (ReadNextAllFibers()) { fgok=false; break; }
480 totnpaqrd_++;
481 if ((prtlev_>0)&&(totnpaqrd_%prtmodulo_==0)) {
482 cout << "EthernetReader: NbPaq/Link=" << totnpaqrd_ << " NSameFC="
483 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
484 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
485 }
486 }
487
488 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
489 MZoneManage(true); // Nettoyage final
490
491 cout << " ------------------ EthernetReader::run() END ----------------- " << endl;
492 ts.SetNow();
493 tm.SplitQ();
494 cout << " END reading TotNPaq/Link=" << totnpaqrd_ << " :" << ts << endl;
495 cout << "... NbPaq/Link=" << totnpaqrd_ << " NSameFC="
496 << totsamefc_ << " / NPaqLink[0]_Read=" << totnpqrd_[0]
497 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
498
499 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
500 int perc=0;
501 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
502 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
503 << " FracSameFC=" << perc << " %" << endl;
504 if (prtlev_ > 0) vpchk_[fib].Print(cout);
505 }
506 cout << " TotalEthernetRead= " << totnbytesrd_/(1024*1024) << " MBytes Ethernet-Read rate= "
507 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
508 cout << " EthernetReader::run()/Timing: \n";
509 tm.Print();
510 cout << " ---------------------------------------------------------- " << endl;
511
512 usleep(50000); // Attente de traitement du dernier paquet
513 memgr_.Stop(); // Arret
514 CleanUpAllSockets(); // On lit tous les liens jusqu'a la reception du message END
515 cout << " EthernetReader::run(): done CleanUpAllSockets()" << endl;
516 } // Fin du bloc try
517 catch (std::exception& exc) {
518 cout << " EthernetReader::run()/catched execption msg= " << exc.what() << endl;
519 setRC(3);
520 return;
521 }
522 catch(...) {
523 cout << " EthernetReader::run()/catched unknown ... exception " << endl;
524 setRC(4);
525 return;
526 }
527 setRC(0);
528 return;
529}
530
531/* --Methode-- */
532bool EthernetReader::ReadNextAllFibers()
533{
534 if (SkipAndSyncLinks()>8) return true;
535
536 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
537 if (ReadNext(fib)) return true; // probleme
538 }
539 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
540 uint_8 cfc=curfc_[0];
541 bool fgsamefc=true;
542 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
543 if (curfc_[fib]!=cfc) fgsamefc=false;
544 }
545 if (fgsamefc) totsamefc_++;
546 return false; // c'est OK
547 }
548 // On va essayer de lire jusqu'a avoir same_frame_counter
549 bool echec=true;
550 int nbechec=0;
551 while (echec) {
552 if (nbechec>sfc_maxresync_) {
553 if (prtlev_>0)
554 cout << " EthernetReader::ReadNextAllFibers()/ Stopping, reason NbEchec>(NbMax_Resync=" << sfc_maxresync_ << ")" << endl;
555 return true;
556 }
557 uint_8 cfc=curfc_[0];
558 bool fgsamefc=true;
559 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
560 if (curfc_[fib]!=cfc) {
561 fgsamefc=false;
562 if (curfc_[fib] > cfc) cfc=curfc_[fib];
563 }
564 }
565 if (fgsamefc) {
566 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
567 }
568 else { // else !fgsame
569 if (nbechec>sfc_maxresync_) return true;
570 if (nbechec>0) {
571 int rcres=SkipAndSyncLinks();
572 if (rcres>8) return true;
573 else if (rcres==1) { // Il faut relire les fibres
574 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
575 if (ReadNext(fib)) return true; // probleme
576 }
577 fgsamefc=true; cfc=curfc_[0];
578 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
579 if (curfc_[fib]!=cfc) {
580 fgsamefc=false;
581 if (curfc_[fib] > cfc) cfc=curfc_[fib];
582 }
583 }
584 if (fgsamefc) {
585 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
586 }
587 } // fin de if (rcres==1)
588 }
589
590 if (prtlev_>5)
591 cout << " EthernetReader::ReadNextAllFibers()/DEBUG NbEchec=" << nbechec
592 << " cfc=" << cfc << endl;
593
594 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
595 for(uint_4 ntryrd=0; ntryrd<sfc_maxdpc_; ntryrd++) { // on tente un maximum de
596 if (curfc_[fib]>=cfc) break;
597 if (ReadNext(fib)) return true; // probleme
598 }
599 }
600 nbechec++;
601 } // fin de else !fgsame
602 } // fin de while(echec): lecture jusqu'a same_frame_counter
603
604 return true; // probleme
605}
606
607/* --Methode-- */
608bool EthernetReader::ReadNext(int fib)
609{
610 bool fggood=false;
611 while(!fggood) {
612 ReceiveFromSocket(fib, (char *)vpaq_[fib].Begin(), packsize_);
613 if (vec_fgsokend_[fib]) { // fin de reception sur le lien
614 if (prtlev_>0)
615 cout << " EthernetReader::ReadNext(" << fib << ")/ Stopping, reason EndReceive fgsokend_=" << vec_fgsokend_[fib] << endl;
616 return true;
617 }
618 totnbytesrd_+=packsize_;
619 totnpqrd_[fib]++;
620 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
621 }
622 totnpqok_[fib]++;
623 return false;
624}
625
626/* --Methode-- */
627void EthernetReader::CleanUpAllSockets()
628{
629 bool fgallfinished=false;
630 while (!fgallfinished) {
631 fgallfinished=true;
632 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
633 if (vec_fgsokend_[fib]>0) continue;
634 ReceiveFromSocket(fib,dummybuff_,packsize_);
635 fgallfinished=false;
636 }
637 }
638 return;
639}
640
641/* --Methode-- */
642size_t EthernetReader::ReceiveFromSocket(int fib, char* data, size_t len)
643{
644 size_t rc =0;
645 if (vec_fgsokend_[fib]) { // la lecture est finie
646 memset(data, 0, len);
647 return 0;
648 }
649
650 if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
651 char msg[BRTCPMSGLEN2];
652 msg[0]='\0';
653 vsok_[fib].ReceiveAll(msg, BRTCPMSGLEN2);
654 // if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0)
655 // cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl;
656 bool fgbadhsm=true;
657 if (strncmp(msg,"PCIEToEthernet-Send",19)==0) {
658 if (strncmp(msg+19,"Cnt",3)==0) fgbadhsm=false;
659 else if (strncmp(msg+19,"-END",4)==0) {
660 fgbadhsm=false;
661 vec_fgsokend_[fib]=1; gl_fgsokend=true;
662 //DBG cout << "ReceiveFromSocket/DEBUG -END msg received " << endl;
663 if (prtlev_>2)
664 cout << " EthernetReader::ReceiveFromSocket/ Receive on LinkNo" << fib << " Ended OK" << endl;
665 }
666 }
667 if (fgbadhsm) {
668 cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake, LinkNo " << fib << " msg: " << msg << endl;
669 vec_fgsokend_[fib]=2;
670 }
671 }
672
673 if (vec_fgsokend_[fib]>0) { // la lecture est finie
674 memset(data, 0, len);
675 return 0;
676 }
677
678 rc += vsok_[fib].ReceiveAll(data, len);
679 vec_cntpaq_[fib]++;
680
681#ifdef BR_DO_HANDSHAKE
682 // Envoi de message de hand-shake
683 if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
684 char msg[BRTCPMSGLEN2];
685 sprintf(msg,"EthernetReader-RecvCnt %d",vec_cntpaq_[fib]);
686 vsok_[fib].SendAll(msg, BRTCPMSGLEN2);
687 //DBG cout << " ReceiveFromSocket/DEBUG-B msg=" << msg << endl;
688 }
689#endif
690 return rc;
691}
692
693/* --Methode-- */
694int EthernetReader::SkipAndSyncLinks()
695{
696 uint_8 minnumpaq=vec_cntpaq_[0]; // minimum des nombres de paquets lus sur les differents liens
697 size_t minnp_fid=0; // numero de fibre correspondant au minimum de paquets lus
698 uint_8 maxnumpaq=vec_cntpaq_[0]; // maximum des nombres de paquets lus sur les differents liens
699 size_t maxnp_fid=0; // numero de fibre correspondant au maximum de paquets lus
700 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
701 if (vec_cntpaq_[fib]<=minnumpaq) {
702 minnumpaq=vec_cntpaq_[fib]; minnp_fid=fib;
703 }
704 if (vec_cntpaq_[fib]>=maxnumpaq) {
705 maxnumpaq=vec_cntpaq_[fib]; maxnp_fid=fib;
706 }
707 }
708 if (!rdsamefc_ && (minnumpaq!=maxnumpaq))
709 cout << " EthernetReader::SkipAndSyncLinks()/BUG or PROBLEM - ReadAllOK and min<>maxnumpaq="
710 << minnumpaq << "," << maxnumpaq << " !!!" << endl;
711 if ((maxnumpaq-minnumpaq)<sfc_maxdpc_) return 0;
712
713 if (prtlev_>4)
714 cout << " EthernetReader::SkipAndSyncLinks() min,maxnumpaq=" << minnumpaq << "," << maxnumpaq
715 << " min,maxnp_fid=" << minnp_fid << "," << maxnp_fid << " sfc_maxdpc_=" << sfc_maxdpc_ << endl;
716
717 for(size_t fib=0; fib<memgr_.NbFibres(); fib++) {
718 while(vec_cntpaq_[fib]<maxnumpaq) {
719 if (vec_fgsokend_[fib]>0) return 9;
720 ReceiveFromSocket(fib,dummybuff_,packsize_);
721 }
722 }
723 totnbresync_++;
724 if (prtlev_>3) {
725 cout << " EthernetReader::SkipAndSyncLinks() NbResync=" << totnbresync_ << " After Sync: vec_cntpaq_[fib]=";
726 for(size_t ii=0; ii<vec_cntpaq_.size(); ii++) cout << vec_cntpaq_[ii] << " , ";
727 cout << endl;
728 }
729
730 return 1;
731}
732
733/* --Methode-- */
734bool EthernetReader::MZoneManage(bool fgclean) // Retourne true si probleme
735{
736 /* Pour debug
737 cout << " EthernetReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
738 << " max_targ_npaq=" << max_targ_npaq << endl;
739 */
740 if (mid_ >= 0) {
741 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
742 else memgr_.FreeMemZone(mid_, MemZS_Filled);
743 }
744 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
745 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
746 if (fgclean) return false;
747 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
748 mmbuf_ = memgr_.GetMemZone(mid_);
749 if (mmbuf_==NULL) return true;
750 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
751 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
752 return false;
753}
Note: See TracBrowser for help on using the repository browser.