source: Sophya/trunk/AddOn/TAcq/racqueth.cc@ 3899

Last change on this file since 3899 was 3899, checked in by ansari, 15 years ago

Ajout flag waitendmsg (Wait for END message) pour le controle de la fin d'execution de EthernetReader , Reza 04/10/2010

File size: 27.9 KB
Line 
1//----------------------------------------------------------------
2// ---- classes de threads pour lecture (transfert DMA)
3// et ecriture disque pour acquisition BAORadio -----
4// LAL - R. Ansari - Juin/Juillet 2008
5//----------------------------------------------------------------
6
7#include "racqueth.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <signal.h>
13#include "pexceptions.h"
14#include "ctimer.h"
15#include "timestamp.h"
16
17#include "pciewrap.h"
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23////////////////////////////////////////////////////////////////////////////////////////////////////////
24//----------------------------------------------------------------------------------------------------------
25// Classe thread de lecture PCI-Express et recopie sur interface reseau (Ethernet)
26//----------------------------------------------------------------------------------------------------------
27
28// Si on veut avoir le protocole de controle (Hand-shake) entre l'emetteur et le recepteur,
29// decommenter la ligne suivante - Reza , 4 octobre 2010
30// #define BR_DO_HANDSHAKE 1
31
32/* --Methode-- */
33PCIEToEthernet::PCIEToEthernet(vector<PCIEWrapperInterface*> vec_pciw, vector<string>& destname, BRParList const& par, int portid)
34 : par_(par), vec_pciw_ (vec_pciw), destname_(destname), tcpportid_(portid)
35{
36 nmaxpaq_ = par_.MaxNbPaquets();
37 swapall_ = par_.GetDataConvFg(); // select data swap/format conversion for BRPaquet
38 stop_ = false;
39 packSize_ = par_.RecvPaquetSize();
40 packSizeInMgr_=par_.MMgrPaquetSize();
41 sizeFr_=par_.DMASizeBytes();
42 if (vec_pciw.size() > MAXNBFIB)
43 throw ParmError("PCIEToEthernet:ERROR/ vec_pciw.size() > MAXNBFIB ");
44 nbDma_= vec_pciw.size();
45
46 // true -> direct transfer of data to ethernet
47 fgdirectsend_ = (par_.pci2eth_fgdirect && (swapall_==BR_Copy) ) ? true : false;
48 char msg[BRTCPMSGLEN];
49 cout << "PCIEToEthernet/Info: Establishing TCP/IP connections ... " << endl;
50 for(size_t i=0; i<vec_pciw_.size(); i++) {
51 vector<ClientSocket> vskt;
52 vector<uint_8> verrcnt;
53 for(size_t j=0; j<destname_.size(); j++) {
54 ClientSocket sok(destname_[j], tcpportid_);
55 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
56 uint_4 dmasz = (fgdirectsend_) ? vec_pciw_[0]->TransferSize() : 0;
57 sprintf(msg,"BAORadio-PCIEToEthernet %d %d %d %d %d", par_.MMgrNbPaquet(), par_.MMgrPaquetSize(), dmasz, 0,i);
58 // 123456789012345678901234567890
59 sok.SendAll(msg,BRTCPMSGLEN);
60 sok.ReceiveAll(msg,BRTCPMSGLEN);
61 if (strncmp(msg,"BAORadio-EthernetReader-OK",26)!=0)
62 throw SocketException("PCIEToEthernet:ERROR/ Connection to EthernetReader not established ");
63 cout << " PCIEToEthernet: Ethernet connection established for DMA/fiber" << i << " with " << destname_[j] << endl;
64 vskt.push_back(sok);
65 verrcnt.push_back(0);
66 }
67 vvec_skt_.push_back(vskt);
68 vvec_errorcnt_.push_back(verrcnt);
69 vec_cntpaq_.push_back(0);
70 }
71 totrdsnd_ = 0;
72 SetPrintLevel(par.prtlevel_,par.prtmodulo_);
73}
74
75/* --Methode-- */
76PCIEToEthernet::~PCIEToEthernet()
77{
78 for(size_t i=0; i<vec_pciw_.size(); i++) {
79 vector<ClientSocket>& vskt = vvec_skt_[i];
80 vector<uint_8>& verrcnt = vvec_errorcnt_[i];
81 for(size_t j=0; j<destname_.size(); j++) {
82 cout << " ~PCIEToEthernet() closing socket, fiber/pcieNum=" << i << " ethernet_destNum=" << j
83 << " ErrorCount=" << verrcnt[j] << endl;
84 vskt[j].Close();
85 }
86 }
87}
88
89/* --Methode-- */
90void PCIEToEthernet::run()
91{
92
93 struct timeval tv1,tv2;
94 gettimeofday(&tv1, NULL);
95
96 cout << " PCIEToEthernet::run() - Starting , NMaxMemZones=" << par_.MaxNbBlocs()
97 << " MMgrNbPaquet=" << par_.MMgrNbPaquet() << " MMgr.PaqSiz=" << par_.MMgrPaquetSize()
98 << " DMA-Paqsize " << packSize_ << " " << BRPaquet::FmtConvToString(swapall_) << endl;
99 setRC(1);
100
101 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
102 //sigaction(SIGINT,&act,NULL);
103 uint_4 paqszmm = par_.MMgrPaquetSize();
104 uint_4 paqsz = packSize_;
105 uint_4 dmasz = vec_pciw_[0]->TransferSize();
106 //DEL vec_pciw_[0]->StartTransfers();
107
108 BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets
109 Byte* Datas[MAXNBFIB];
110 Byte* tampon[MAXNBFIB] ;
111 Byte* nexpaq[MAXNBFIB] ; // Pour recevevoir les paquets apres reduction de taille
112
113 Byte* predtampon=NULL; // tampon de recopie pour la reduction des tailles de paquets
114 Byte* nextpaq=NULL;
115 uint_4 off_acheval=0;
116
117 int nerrdma = 0;
118 int maxerrdma = 10;
119 bool fgarret = false;
120
121 // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA
122 for (int i=0;i< (int)nbDma_ ;i++) {
123 // cout << " DEBUG*AB paqsz=" << paqsz << " paqszmm=" << paqszmm << endl;
124 tampon[i]=nexpaq[i]=NULL;
125 tampon[i] = new Byte[paqsz];
126 nexpaq[i] = new Byte[paqszmm];
127 }
128
129 if (fgdirectsend_)
130 cout << " PCIEToEthernet::run() Direct transfer mode DMA to Ethernet ... " << endl;
131 bool fgredpaq=par_.fgreducpsize;
132 if (fgredpaq) {
133 cout << " PCIEToEthernet::run() - PaquetSizeReduction - RedSize=" << par_.redpqsize
134 << " Offset=" << par_.reducoffset << " " << ((par_.reducneedcopy)?"NeedCopy":"NOCopy")
135 << " " << BRPaquet::ReducActionToString(par_.pqreducmode) << endl;
136 predtampon = new Byte[paqsz];
137 }
138
139
140 uint_8 trnbytes[MAXNBFIB];
141 uint_4 npaqfait[MAXNBFIB];
142 for (int i=0;i< (int)nbDma_ ;i++) {
143 npaqfait[i]=0; trnbytes[i]=0;
144 }
145 // Attente du message GO des taches lecteurs
146 cout << " PCIEToEthernet::run() Waiting for GO message from EthernetReader's ..." << endl;
147 char msg[BRTCPMSGLEN];
148 for(size_t i=0; i<vec_pciw_.size(); i++) {
149 vector<ClientSocket>& vskt=vvec_skt_[i];
150 for(size_t j=0; j<vskt.size(); j++) {
151 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
152 vskt[j].ReceiveAll(msg,BRTCPMSGLEN);
153 if (strncmp(msg,"BAORadio-EthernetReader-Ready-GO",32)!=0) {
154 msg[BRTCPMSGLEN-1]='\0';
155 cout << "PCIEToEthernet:ERROR BAD Go for fiber=" << i << " dest=" << j << " msg: " << msg << endl;
156 throw SocketException("PCIEToEthernet:ERROR/ BAD Go message from EthernetReader ");
157 }
158 cout << " PCIEToEthernet::run() Received GO message for Fiber/DMA " << i << " from " << destname_[j] << endl;
159 }
160 }
161
162 cout << " PCIEToEthernet::run() Starting DMA to Ethernet transfers - NbFibers=NbDMA=" << vec_pciw_.size()
163 << " x NbEthDestinations=" << destname_.size() << endl;
164
165 // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
166 uint_4 npaqfaitg = 0;
167
168 while (npaqfaitg < nmaxpaq_) { // Boucle global G
169 if (fgarret) break;
170 if (stop_) break;
171
172 // Lancement des DMA
173 for (int dma=0; dma < (int)nbDma_ ;dma++) vec_pciw_[dma]->StartTransfers();
174
175 // On pointe vers le debut de la zone a remplir aver le prochain DMA
176 //-- Zone memoire locale Byte* nextdma = buff+i*paqsz;
177
178 bool fgbaddma=false;
179 // On boucle sur les nbDma_ en attente de leurs terminaison
180 for (int dma=0; dma <(int) nbDma_ ;dma++) {
181 Datas[dma]=vec_pciw_[dma]->GetData();
182 if (Datas[dma] == NULL) { // No data Read in DMA
183 nerrdma ++; fgbaddma=true;
184 cout << "PCIEToEthernetChecker/Erreur Waiting for datas ..." << endl;
185 vec_pciw_[dma]->PrintStatus(cout);
186 if (nerrdma>=maxerrdma) { fgarret = true; break; }
187 }
188 }
189 if (fgbaddma) continue;
190 if ((prtlev_>0)&&(npaqfaitg%prtmodulo_==0))
191 cout << " PCIEToEthernet::run()/Info NPaqFait= " << npaqfaitg << endl;
192 if (fgdirectsend_) { // Pas de copie / reduction de taille de paquet, on rebalance tel quel ...
193 for (int fib=0; fib<(int) nbDma_ ;fib++) {
194 //DBG cout << " DEBUG*B fib=" << fib << " Datas[fib]=" << hex << Datas[fib] << dec << endl;
195 SendToTargets(fib, Datas[fib], dmasz);
196 trnbytes[fib] += (uint_8)dmasz;
197 npaqfait[fib] += (trnbytes[fib]/(uint_8)paqszmm);
198 if (fib==nbDma_-1) npaqfaitg += (trnbytes[fib]/(uint_8)paqszmm);
199 trnbytes[fib] = trnbytes[fib]%(uint_8)paqszmm;
200 }
201 continue; // On bypasse le reste
202 }
203
204 // Si on arrive ici, c'est qu'il faut copier / reduire les paquets ....
205 uint_4 curoff=0;
206 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres
207 if (off_acheval > 0) { // IF Numero B
208 if ((paqsz-off_acheval)< dmasz) { // IF Numero A
209 for(uint_4 fib=0; fib<nbDma_; fib++)
210 memcpy((void *)((tampon[fib])+off_acheval), (void *)Datas[fib], paqsz-off_acheval);
211 curoff = paqsz-off_acheval; off_acheval = 0;
212 for(uint_4 fib=0; fib<nbDma_; fib++) {
213 // CHECK S'il faut faire une reduction de taille de paquet
214 if (fgredpaq) { // reduction taille de paquet
215 if (par_.reducneedcopy) {
216 BRPaquet paqc1(tampon[fib], predtampon, paqsz, swapall_);
217 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
218 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
219 }
220 else {
221 BRPaquet paqc1(tampon[fib], paqsz);
222 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
223 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
224 }
225 }
226 else {
227 BRPaquet paqc(tampon[fib], nexpaq[fib], paqsz, swapall_);
228 }
229 BRPaquet paq(nexpaq[fib], packSizeInMgr_);
230 SendToTargets(fib, nexpaq[fib], packSizeInMgr_);
231 npaqfait[fib]++;
232 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
233 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
234 }
235 }
236 else { // se rapporte au IF numero A
237 for(uint_4 fib=0; fib<nbDma_; fib++)
238 memcpy((void *)(tampon[fib]+off_acheval), (void *)Datas[fib], dmasz);
239 curoff =dmasz;
240 off_acheval = (dmasz+off_acheval);
241 }
242 } // Fin IF Numero B
243
244 //2- On traite les paquets complets qui se trouvent dans la zone du DMA
245 while ((curoff+paqsz)<=dmasz) { // while numero C
246 // if ((dma==nbDma_-1)&&(npaqfait >= nmax_* memgr.NbPaquets())) break;
247 // CHECK S'il faut faire une reduction de taille de paquet
248 for(uint_4 fib=0; fib<nbDma_; fib++) {
249 if (fgredpaq) { // reduction taille de paquet
250 if (par_.reducneedcopy) {
251 BRPaquet paqc1(Datas[fib]+curoff, predtampon, paqsz, swapall_);
252 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
253 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
254 }
255 else {
256 BRPaquet paqc1(Datas[fib]+curoff, paqsz);
257 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
258 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
259 }
260 }
261 else {
262 BRPaquet paqc(Datas[fib]+curoff, nexpaq[fib], paqsz, swapall_);
263 }
264 BRPaquet paq(nexpaq[fib], packSizeInMgr_);
265 SendToTargets(fib, nexpaq[fib], packSizeInMgr_);
266 npaqfait[fib]++;
267 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
268 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
269 }
270 curoff += paqsz; // On avance l'index dans le buffer du DMA
271 } // -- FIN traitement des paquets complets ds un DMA - FIN du while numero C
272 //3- On copie si besoin la fin du DMA dans la zone tampon
273 if (curoff < dmasz) { // IF numero D
274 off_acheval = dmasz-curoff;
275 for(uint_4 fib=0; fib<nbDma_; fib++)
276 memcpy(tampon[fib], (void*)(Datas[fib]+curoff), off_acheval);
277 // ne sert a rien curoff += off_acheval;
278 } // FIN du if numero D
279 } // FIN Boucle global G
280
281 setRC(0);
282 gettimeofday(&tv2, NULL);
283 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
284 if (tmelaps2<0.1) tmelaps2=0.1;
285 cout << " ---------- PCIEToEthernet::run()-End summary NPaqFait=" << npaqfaitg
286 << " TotSend (kb)=" << totrdsnd_/1024 << "------ " << endl;
287 for (int dma=0; dma < (int)nbDma_ ;dma++) {
288 cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] << " TotDMATransfer="
289 << vec_pciw_[dma]->TotTransferBytes()/1024
290 << " ElapsTime=" << tmelaps2 << " ms ->"
291 << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl;
292
293 vector<ClientSocket>& vskt = vvec_skt_[dma];
294 cout << " TotEthTransfer=";
295 for(size_t j=0; j<vskt.size(); j++)
296 cout << vskt[j].NBytesSent()/1024 << " , ";
297 cout << " kb" << endl;
298
299 if (!fgdirectsend_) pcheck[dma].Print(cout);
300 }
301 cout << " --------------------------------------------------------------------" << endl;
302
303 for (int i=0;i< (int)nbDma_ ;i++) {
304 delete[] tampon[i];
305 delete[] nexpaq[i];
306 }
307 if ((fgredpaq)&&predtampon) delete[] predtampon;
308
309 //DBG cout << " fin thread ========================" <<endl;
310 return;
311}
312
313
314/* --Methode-- */
315void PCIEToEthernet::Stop()
316{
317 // cout << " PCIEReaderChecker::stop() ........ STOP" <<endl;
318 stop_ = true;
319
320}
321
322static long cnt_prt=0;
323/* --Methode-- */
324size_t PCIEToEthernet::SendToTargets(int fib, Byte* data, size_t len)
325{
326 //DBG cout << " SendToTargets/DBG" << cnt_prt << " len=" << len << " data=" << hex << (unsigned long)data << dec << endl; cnt_prt++;
327 vector<ClientSocket>& vskt = vvec_skt_[fib];
328 vector<uint_8>& verrcnt = vvec_errorcnt_[fib];
329 char msg[BRTCPMSGLEN2];
330 size_t rc=0;
331 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
332 sprintf(msg,"PCIEToEthernet-SendCnt %d",vec_cntpaq_[fib]);
333 for(size_t j=0; j<vskt.size(); j++) {
334 vskt[j].SendAll(msg, BRTCPMSGLEN2);
335 }
336 }
337 // Envoi des donnees (paquets)
338 for(size_t j=0; j<vskt.size(); j++) {
339 rc += vskt[j].SendAll((const char *)data, len);
340 totrdsnd_ += len;
341 }
342 vec_cntpaq_[fib]++;
343#ifdef BR_DO_HANDSHAKE
344 // attente message de hand-shake
345 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
346 for(size_t j=0; j<vskt.size(); j++) {
347 //DBG cout << "SendToTargets/DEBUG-B attente -RecvCnt fib=" << fib << " npaq=" << vec_cntpaq_[fib] << endl;
348 vskt[j].ReceiveAll(msg, BRTCPMSGLEN2);
349 msg[BRTCPMSGLEN2-1]='\0';
350 if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0) {
351 cout << " PCIEToEthernet::SendToTargets/ BAD HandShake, Fiber=" << fib
352 << " NumSocket=" << j << " msg: " << msg << endl;
353 verrcnt[j]++;
354 }
355 }
356 }
357#endif
358 if (vec_cntpaq_[fib]==nmaxpaq_) { // fin de l'envoi pour cette fibre
359 sprintf(msg,"PCIEToEthernet-Send-END %d",vec_cntpaq_[fib]);
360 for(size_t j=0; j<vskt.size(); j++) {
361 vskt[j].SendAll(msg, BRTCPMSGLEN2);
362 }
363
364 }
365 return rc;
366}
367//------------------------------------------------------------------
368// Classe thread de lecture sur interface reseau (Ethernet)
369//------------------------------------------------------------------
370
371/* --Methode-- */
372EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid)
373 : memgr_(mem), par_(par), stop_(false), tcpportid_(portid)
374{
375 totnbytesrd_ = 0;
376 totnpaqrd_ = 0;
377 totsamefc_ = 0;
378 totnbresync_ = 0;
379
380 if (memgr_.NbFibres() > MAXANAFIB)
381 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB ");
382 if (par_.ethr_nlink != memgr_.NbFibres())
383 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink");
384 SetPrintLevel(par.prtlevel_,par.prtmodulo_);
385
386 SetReadMode(); // definitition du mode de lecture des liens
387
388 packsize_=memgr_.PaqSize();
389 mid_=-2;
390 mmbuf_=NULL;
391 max_targ_npaq = memgr_.NbPaquets();
392 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
393
394 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
395 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
396 vpchk_.push_back(BRPaqChecker(true,0));
397 curfc_.push_back(0);
398 totnpqrd_.push_back(0);
399 totnpqok_.push_back(0);
400 }
401 srv_sokp_ = new ServerSocket(tcpportid_, par_.ethr_nlink);
402 char msg[BRTCPMSGLEN];
403 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
404 Socket sok=srv_sokp_->WaitClientConnection();
405 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
406 sok.ReceiveAll(msg,BRTCPMSGLEN);
407 if (strncmp(msg,"BAORadio-PCIEToEthernet",23)!=0)
408 throw SocketException("EthernetReader:ERROR/ Bad message from PCIEToEthernet client !");
409 int ia,ib,ic,id,ie;
410 sscanf(msg+25,"%d %d %d %d %d",&ia,&ib,&ic,&id,&ie);
411 // if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) {
412 if (((uint_4)ia!=memgr_.NbPaquets()) || ((uint_4)ib!=memgr_.PaqSize())) {
413 strcpy(msg,"BAORadio-EthernetReader-BAD MMgrNbPaquet/MMgrPaquetSize()");
414 sok.SendAll(msg,BRTCPMSGLEN);
415 throw SocketException("EthernetReader:ERROR/ Bad MMgrNbPaquet/MMgrPaquetSize() from PCIEToEthernet client !");
416 }
417 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
418 strcpy(msg,"BAORadio-EthernetReader-OK");
419 sok.SendAll(msg,BRTCPMSGLEN);
420 vsok_.push_back(sok);
421 cout << " EthernetReader/Info connection/link" << fib << " established." << endl;
422 vec_cntpaq_.push_back(0);
423 vec_fgsokend_.push_back(false);
424 }
425 gl_fgsokend = true;
426 stopreason_="??Unknown??";
427 dummybuff_ = new char[packsize_];
428}
429
430/* --Methode-- */
431EthernetReader::~EthernetReader()
432{
433 if (prtlev_>0) cout << " ~EthernetReader() , TotNPaqRd=" << totnpaqrd_ << " TotNbReSync=" << totnbresync_ << endl;
434 for(size_t j=0; j<vsok_.size(); j++) {
435 if (prtlev_>1)
436 cout << " ~EthernetReader() closing sok[" << j << "] CntPaq=" << vec_cntpaq_[j]
437 << " TotNPaqRd=" << totnpqrd_[j] << " TotNPaqOK=" << totnpqok_[j] << endl;
438 vsok_[j].Close();
439 }
440 srv_sokp_->Close();
441 delete srv_sokp_;
442 if (dummybuff_) delete[] dummybuff_;
443}
444
445/* --Methode-- */
446void EthernetReader::run()
447{
448 setRC(1);
449 try {
450 TimeStamp ts;
451 Timer tm("EthernetReader", false);
452 cout << " EthernetReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
453 << " PaqSize() = " << memgr_.PaqSize() << endl;
454 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets") << endl;
455
456 cout << " Sending GO message to all PCIEToEthernet clients ... " << endl;
457 char msg[BRTCPMSGLEN];
458 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
459 strcpy(msg,"BAORadio-EthernetReader-Ready-GO");
460 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
461 cout << " EthernetReader/Debug - Sending GO message to sender " << fib << endl;
462 vsok_[fib].SendAll(msg,BRTCPMSGLEN);
463 }
464 Byte* nextpaq=NULL;
465 bool fgok=true;
466 while (fgok&&(totnpaqrd_<par_.MaxNbPaquets())) {
467 if (stop_) break;
468 if ( MoveToNextTarget() ) {
469 cout << "EthernetReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
470 setRC(7); fgok=false; break;
471 }
472 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
473 nextpaq=GetPaquetTarget(fib);
474 if (nextpaq == NULL) { // Cela ne devrait pas arriver
475 cout << "EthernetReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
476 setRC(9); fgok=false; break;
477 }
478 vpaq_[fib].Set(nextpaq);
479 }
480 if (ReadNextAllFibers()) { fgok=false; break; }
481 totnpaqrd_++;
482 if ((prtlev_>0)&&(totnpaqrd_%prtmodulo_==0)) {
483 cout << "EthernetReader: NbPaq/Link=" << totnpaqrd_ << " NSameFC="
484 << totsamefc_ << " / NPaqFib0Read=" << totnpqrd_[0]
485 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
486 }
487 }
488
489 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
490 MZoneManage(true); // Nettoyage final
491
492 cout << " ---------------------------------------------------------------------------- " << endl;
493 cout << "--- EthernetReader::run() END, StopReason: " << stopreason_ << " ---" << endl;
494 ts.SetNow();
495 tm.SplitQ();
496 cout << " END reading TotNPaq/Link=" << totnpaqrd_ << " :" << ts << endl;
497 cout << "... NbPaq/Link=" << totnpaqrd_ << " NSameFC="
498 << totsamefc_ << " / NPaqLink[0]_Read=" << totnpqrd_[0]
499 << " FracSameFC=" << 100*totsamefc_/totnpqrd_[0] << " %" << endl;
500
501 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
502 int perc=0;
503 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
504 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
505 << " FracSameFC=" << perc << " %" << endl;
506 if (prtlev_ > 0) vpchk_[fib].Print(cout);
507 }
508 cout << " TotalEthernetRead= " << totnbytesrd_/(1024*1024) << " MBytes Ethernet-Read rate= "
509 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
510 cout << " EthernetReader::run()/Timing: \n";
511 tm.Print();
512 cout << " ---------------------------------------------------------------------------- " << endl;
513
514 usleep(50000); // Attente de traitement du dernier paquet
515 memgr_.Stop(); // Arret
516 if (waitendmsg_) {
517 CleanUpAllSockets(); // On lit tous les liens jusqu'a la reception du message END
518 cout << " EthernetReader::run(): done CleanUpAllSockets()" << endl;
519 }
520 } // Fin du bloc try
521 catch (std::exception& exc) {
522 cout << " EthernetReader::run()/catched execption msg= " << exc.what() << endl;
523 setRC(3);
524 return;
525 }
526 catch(...) {
527 cout << " EthernetReader::run()/catched unknown ... exception " << endl;
528 setRC(4);
529 return;
530 }
531 setRC(0);
532 return;
533}
534
535/* --Methode-- */
536bool EthernetReader::ReadNextAllFibers()
537{
538 if (SkipAndSyncLinks()>8) return true;
539
540 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
541 if (ReadNext(fib)) return true; // probleme
542 }
543 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
544 uint_8 cfc=curfc_[0];
545 bool fgsamefc=true;
546 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
547 if (curfc_[fib]!=cfc) fgsamefc=false;
548 }
549 if (fgsamefc) totsamefc_++;
550 return false; // c'est OK
551 }
552 // On va essayer de lire jusqu'a avoir same_frame_counter
553 bool echec=true;
554 int nbechec=0;
555 while (echec) {
556 if (nbechec>sfc_maxresync_) {
557 if (prtlev_>0)
558 cout << ">S>S>S> EthernetReader::ReadNextAllFibers()/ Stopping, reason NbEchec>(NbMax_Resync="
559 << sfc_maxresync_ << ")" << endl;
560 stopreason_="ReadNextAllFibers() , StopReason: NbEchec>NbMax_Resyn";
561 return true;
562 }
563 uint_8 cfc=curfc_[0];
564 bool fgsamefc=true;
565 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
566 if (curfc_[fib]!=cfc) {
567 fgsamefc=false;
568 if (curfc_[fib] > cfc) cfc=curfc_[fib];
569 }
570 }
571 if (fgsamefc) {
572 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
573 }
574 else { // else !fgsame
575 if (nbechec>sfc_maxresync_) return true;
576 if (nbechec>0) {
577 int rcres=SkipAndSyncLinks();
578 if (rcres>8) return true;
579 else if (rcres==1) { // Il faut relire les fibres
580 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
581 if (ReadNext(fib)) return true; // probleme
582 }
583 fgsamefc=true; cfc=curfc_[0];
584 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
585 if (curfc_[fib]!=cfc) {
586 fgsamefc=false;
587 if (curfc_[fib] > cfc) cfc=curfc_[fib];
588 }
589 }
590 if (fgsamefc) {
591 totsamefc_++; echec=false; return false; // c'est OK , same framecounter
592 }
593 } // fin de if (rcres==1)
594 }
595
596 if (prtlev_>5)
597 cout << " EthernetReader::ReadNextAllFibers()/DEBUG NbEchec=" << nbechec
598 << " cfc=" << cfc << endl;
599
600 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
601 for(uint_4 ntryrd=0; ntryrd<sfc_maxdpc_; ntryrd++) { // on tente un maximum de
602 if (curfc_[fib]>=cfc) break;
603 if (ReadNext(fib)) return true; // probleme
604 }
605 }
606 nbechec++;
607 } // fin de else !fgsame
608 } // fin de while(echec): lecture jusqu'a same_frame_counter
609
610 return true; // probleme
611}
612
613/* --Methode-- */
614bool EthernetReader::ReadNext(int fib)
615{
616 bool fggood=false;
617 while(!fggood) {
618 ReceiveFromSocket(fib, (char *)vpaq_[fib].Begin(), packsize_);
619 if (vec_fgsokend_[fib]) { // fin de reception sur le lien
620 if (prtlev_>0)
621 cout << ">S>S>S> EthernetReader::ReadNext(" << fib << ")/ Stopping, reason EndReceive fgsokend_="
622 << vec_fgsokend_[fib] << endl;
623 stopreason_="ReadNext() StopReason: EndReceive signaled on link";
624 return true;
625 }
626 totnbytesrd_+=packsize_;
627 totnpqrd_[fib]++;
628 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
629 }
630 totnpqok_[fib]++;
631 return false;
632}
633
634/* --Methode-- */
635void EthernetReader::CleanUpAllSockets()
636{
637 bool fgallfinished=false;
638 while (!fgallfinished) {
639 fgallfinished=true;
640 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
641 if (vec_fgsokend_[fib]>0) continue;
642 ReceiveFromSocket(fib,dummybuff_,packsize_);
643 fgallfinished=false;
644 }
645 }
646 return;
647}
648
649/* --Methode-- */
650size_t EthernetReader::ReceiveFromSocket(int fib, char* data, size_t len)
651{
652 size_t rc =0;
653 if (vec_fgsokend_[fib]) { // la lecture est finie
654 memset(data, 0, len);
655 return 0;
656 }
657
658 if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
659 char msg[BRTCPMSGLEN2];
660 msg[0]='\0';
661 vsok_[fib].ReceiveAll(msg, BRTCPMSGLEN2);
662 // if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0)
663 // cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl;
664 bool fgbadhsm=true;
665 if (strncmp(msg,"PCIEToEthernet-Send",19)==0) {
666 if (strncmp(msg+19,"Cnt",3)==0) fgbadhsm=false;
667 else if (strncmp(msg+19,"-END",4)==0) {
668 fgbadhsm=false;
669 vec_fgsokend_[fib]=1; gl_fgsokend=true;
670 //DBG cout << "ReceiveFromSocket/DEBUG -END msg received " << endl;
671 if (prtlev_>2)
672 cout << " EthernetReader::ReceiveFromSocket/ Receive on LinkNo" << fib << " Ended OK" << endl;
673 }
674 }
675 if (fgbadhsm) {
676 cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake, LinkNo " << fib << " msg: " << msg << endl;
677 vec_fgsokend_[fib]=2;
678 }
679 }
680
681 if (vec_fgsokend_[fib]>0) { // la lecture est finie
682 memset(data, 0, len);
683 return 0;
684 }
685
686 rc += vsok_[fib].ReceiveAll(data, len);
687 vec_cntpaq_[fib]++;
688
689#ifdef BR_DO_HANDSHAKE
690 // Envoi de message de hand-shake
691 if (vec_cntpaq_[fib]%memgr_.NbPaquets() == 0) {
692 char msg[BRTCPMSGLEN2];
693 sprintf(msg,"EthernetReader-RecvCnt %d",vec_cntpaq_[fib]);
694 vsok_[fib].SendAll(msg, BRTCPMSGLEN2);
695 //DBG cout << " ReceiveFromSocket/DEBUG-B msg=" << msg << endl;
696 }
697#endif
698 return rc;
699}
700
701/* --Methode-- */
702int EthernetReader::SkipAndSyncLinks()
703{
704 uint_8 minnumpaq=vec_cntpaq_[0]; // minimum des nombres de paquets lus sur les differents liens
705 size_t minnp_fid=0; // numero de fibre correspondant au minimum de paquets lus
706 uint_8 maxnumpaq=vec_cntpaq_[0]; // maximum des nombres de paquets lus sur les differents liens
707 size_t maxnp_fid=0; // numero de fibre correspondant au maximum de paquets lus
708 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
709 if (vec_cntpaq_[fib]<=minnumpaq) {
710 minnumpaq=vec_cntpaq_[fib]; minnp_fid=fib;
711 }
712 if (vec_cntpaq_[fib]>=maxnumpaq) {
713 maxnumpaq=vec_cntpaq_[fib]; maxnp_fid=fib;
714 }
715 }
716 if (!rdsamefc_ && (minnumpaq!=maxnumpaq))
717 cout << " EthernetReader::SkipAndSyncLinks()/BUG or PROBLEM - ReadAllOK and min<>maxnumpaq="
718 << minnumpaq << "," << maxnumpaq << " !!!" << endl;
719 if ((maxnumpaq-minnumpaq)<sfc_maxdpc_) return 0;
720
721 if (prtlev_>4)
722 cout << " EthernetReader::SkipAndSyncLinks() min,maxnumpaq=" << minnumpaq << "," << maxnumpaq
723 << " min,maxnp_fid=" << minnp_fid << "," << maxnp_fid << " sfc_maxdpc_=" << sfc_maxdpc_ << endl;
724
725 for(size_t fib=0; fib<memgr_.NbFibres(); fib++) {
726 while(vec_cntpaq_[fib]<maxnumpaq) {
727 if (vec_fgsokend_[fib]>0) return 9;
728 ReceiveFromSocket(fib,dummybuff_,packsize_);
729 }
730 }
731 totnbresync_++;
732 if (prtlev_>3) {
733 cout << " EthernetReader::SkipAndSyncLinks() NbResync=" << totnbresync_ << " After Sync: vec_cntpaq_[fib]=";
734 for(size_t ii=0; ii<vec_cntpaq_.size(); ii++) cout << vec_cntpaq_[ii] << " , ";
735 cout << endl;
736 }
737
738 return 1;
739}
740
741/* --Methode-- */
742bool EthernetReader::MZoneManage(bool fgclean) // Retourne true si probleme
743{
744 /* Pour debug
745 cout << " EthernetReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
746 << " max_targ_npaq=" << max_targ_npaq << endl;
747 */
748 if (mid_ >= 0) {
749 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
750 else memgr_.FreeMemZone(mid_, MemZS_Filled);
751 }
752 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
753 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
754 if (fgclean) return false;
755 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
756 mmbuf_ = memgr_.GetMemZone(mid_);
757 if (mmbuf_==NULL) return true;
758 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
759 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
760 return false;
761}
Note: See TracBrowser for help on using the repository browser.