source: Sophya/trunk/AddOn/TAcq/racqueth.cc@ 3780

Last change on this file since 3780 was 3764, checked in by ansari, 15 years ago

Suppression du parametre taille des blocs pour transfert ethernet ds le code pour les classes PCIEToEthernet et EthernetReader, Reza 03/05/2010

File size: 21.0 KB
Line 
1//----------------------------------------------------------------
2// ---- classes de threads pour lecture (transfert DMA)
3// et ecriture disque pour acquisition BAORadio -----
4// LAL - R. Ansari - Juin/Juillet 2008
5//----------------------------------------------------------------
6
7#include "racqueth.h"
8
9#include <stdlib.h>
10#include <unistd.h>
11#include <fstream>
12#include <signal.h>
13#include "pexceptions.h"
14#include "ctimer.h"
15#include "timestamp.h"
16
17#include "pciewrap.h"
18#include "brpaqu.h"
19
20#include "resusage.h" // Pour mesure temps elapsed/CPU ...
21#include <sys/time.h> // pour gettimeofday
22
23////////////////////////////////////////////////////////////////////////////////////////////////////////
24//----------------------------------------------------------------------------------------------------------
25// Classe thread de lecture PCI-Express et recopie sur interface reseau (Ethernet)
26//----------------------------------------------------------------------------------------------------------
27
28
29/* --Methode-- */
30PCIEToEthernet::PCIEToEthernet(vector<PCIEWrapperInterface*> vec_pciw, vector<string>& destname, BRParList const& par, int portid)
31 : par_(par), vec_pciw_ (vec_pciw), destname_(destname), tcpportid_(portid)
32{
33 nmaxpaq_ = par_.MaxNbPaquets();
34 swapall_ = par_.GetDataConvFg(); // select data swap/format conversion for BRPaquet
35 stop_ = false;
36 packSize_ = par_.RecvPaquetSize();
37 packSizeInMgr_=par_.MMgrPaquetSize();
38 sizeFr_=par_.DMASizeBytes();
39 if (vec_pciw.size() > MAXNBFIB)
40 throw ParmError("PCIEToEthernet:ERROR/ vec_pciw.size() > MAXNBFIB ");
41 nbDma_= vec_pciw.size();
42
43 // true -> direct transfer of data to ethernet
44 fgdirectsend_ = (par_.pci2eth_fgdirect && (swapall_==BR_Copy) ) ? true : false;
45 char msg[BRTCPMSGLEN];
46 cout << "PCIEToEthernet/Info: Establishing TCP/IP connections ... " << endl;
47 for(size_t i=0; i<vec_pciw_.size(); i++) {
48 vector<ClientSocket> vskt;
49 for(size_t j=0; j<destname_.size(); j++) {
50 ClientSocket sok(destname_[j], tcpportid_);
51 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
52 uint_4 dmasz = (fgdirectsend_) ? vec_pciw_[0]->TransferSize() : 0;
53 sprintf(msg,"BAORadio-PCIEToEthernet %d %d %d %d %d", par_.MMgrNbPaquet(), par_.MMgrPaquetSize(), dmasz, 0,i);
54 // 123456789012345678901234567890
55 sok.SendAll(msg,BRTCPMSGLEN);
56 sok.ReceiveAll(msg,BRTCPMSGLEN);
57 if (strncmp(msg,"BAORadio-EthernetReader-OK",26)!=0)
58 throw SocketException("PCIEToEthernet:ERROR/ Connection to EthernetReader not established ");
59 cout << " PCIEToEthernet: Ethernet connection established for DMA/fiber" << i << " with " << destname_[j] << endl;
60 vskt.push_back(sok);
61 }
62 vvec_skt_.push_back(vskt);
63 vec_cntpaq_.push_back(0);
64 }
65 totrdsnd_ = 0;
66 SetPrintLevel();
67}
68
69/* --Methode-- */
70PCIEToEthernet::~PCIEToEthernet()
71{
72 for(size_t i=0; i<vec_pciw_.size(); i++) {
73 vector<ClientSocket>& vskt = vvec_skt_[i];
74 for(size_t j=0; j<destname_.size(); j++) vskt[j].Close();
75 }
76}
77
78/* --Methode-- */
79void PCIEToEthernet::run()
80{
81
82 struct timeval tv1,tv2;
83 gettimeofday(&tv1, NULL);
84
85 cout << " PCIEToEthernet::run() - Starting , NMaxMemZones=" << par_.MaxNbBlocs()
86 << " MMgrNbPaquet=" << par_.MMgrNbPaquet() << " MMgr.PaqSiz=" << par_.MMgrPaquetSize()
87 << " DMA-Paqsize " << packSize_ << " " << BRPaquet::FmtConvToString(swapall_) << endl;
88 setRC(1);
89
90 // sigaddset(&act.sa_mask,SIGINT); // pour proteger le transfert DMA
91 //sigaction(SIGINT,&act,NULL);
92 uint_4 paqszmm = par_.MMgrPaquetSize();
93 uint_4 paqsz = packSize_;
94 uint_4 dmasz = vec_pciw_[0]->TransferSize();
95 //DEL vec_pciw_[0]->StartTransfers();
96
97 BRPaqChecker pcheck[MAXNBFIB]; // Verification/comptage des paquets
98 Byte* Datas[MAXNBFIB];
99 Byte* tampon[MAXNBFIB] ;
100 Byte* nexpaq[MAXNBFIB] ; // Pour recevevoir les paquets apres reduction de taille
101
102 Byte* predtampon=NULL; // tampon de recopie pour la reduction des tailles de paquets
103 Byte* nextpaq=NULL;
104 uint_4 off_acheval=0;
105
106 int nerrdma = 0;
107 int maxerrdma = 10;
108 bool fgarret = false;
109
110 // Initialisation des tampons pour recopie des paquets a cheval pour chaque DMA
111 for (int i=0;i< (int)nbDma_ ;i++) {
112 // cout << " DEBUG*AB paqsz=" << paqsz << " paqszmm=" << paqszmm << endl;
113 tampon[i]=nexpaq[i]=NULL;
114 tampon[i] = new Byte[paqsz];
115 nexpaq[i] = new Byte[paqszmm];
116 }
117
118 if (fgdirectsend_)
119 cout << " PCIEToEthernet::run() Direct transfer mode DMA to Ethernet ... " << endl;
120 bool fgredpaq=par_.fgreducpsize;
121 if (fgredpaq) {
122 cout << " PCIEToEthernet::run() - PaquetSizeReduction - RedSize=" << par_.redpqsize
123 << " Offset=" << par_.reducoffset << " " << ((par_.reducneedcopy)?"NeedCopy":"NOCopy")
124 << " " << BRPaquet::ReducActionToString(par_.pqreducmode) << endl;
125 predtampon = new Byte[paqsz];
126 }
127
128
129 uint_8 trnbytes[MAXNBFIB];
130 uint_4 npaqfait[MAXNBFIB];
131 for (int i=0;i< (int)nbDma_ ;i++) {
132 npaqfait[i]=0; trnbytes[i]=0;
133 }
134 // Attente du message GO des taches lecteurs
135 cout << " PCIEToEthernet::run() Waiting for GO message from EthernetReader's ..." << endl;
136 char msg[BRTCPMSGLEN];
137 for(size_t i=0; i<vec_pciw_.size(); i++) {
138 vector<ClientSocket>& vskt=vvec_skt_[i];
139 for(size_t j=0; j<vskt.size(); j++) {
140 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
141 vskt[j].ReceiveAll(msg,BRTCPMSGLEN);
142 if (strncmp(msg,"BAORadio-EthernetReader-Ready-GO",32)!=0) {
143 msg[BRTCPMSGLEN-1]='\0';
144 cout << "PCIEToEthernet:ERROR BAD Go for fiber=" << i << " dest=" << j << " msg: " << msg << endl;
145 throw SocketException("PCIEToEthernet:ERROR/ BAD Go message from EthernetReader ");
146 }
147 cout << " PCIEToEthernet::run() Received GO message for Fiber/DMA " << i << " from " << destname_[j] << endl;
148 }
149 }
150
151 cout << " PCIEToEthernet::run() Starting DMA to Ethernet transfers - NbFibers=NbDMA=" << vec_pciw_.size()
152 << " x NbEthDestinations=" << destname_.size() << endl;
153
154 // Byte* nextdma = locdata+((kmz%memgr.NbZones())*(paqsz*memgr.NbPaquets()));
155 uint_4 npaqfaitg = 0;
156 uint_4 prtmod = par_.BlocPerFile()*par_.MMgrNbPaquet();
157 if (prtmod < 500) prtmod=500;
158
159 while (npaqfaitg < nmaxpaq_) { // Boucle global G
160 if (fgarret) break;
161 if (stop_) break;
162
163 // Lancement des DMA
164 for (int dma=0; dma < (int)nbDma_ ;dma++) vec_pciw_[dma]->StartTransfers();
165
166 // On pointe vers le debut de la zone a remplir aver le prochain DMA
167 //-- Zone memoire locale Byte* nextdma = buff+i*paqsz;
168
169 bool fgbaddma=false;
170 // On boucle sur les nbDma_ en attente de leurs terminaison
171 for (int dma=0; dma <(int) nbDma_ ;dma++) {
172 Datas[dma]=vec_pciw_[dma]->GetData();
173 if (Datas[dma] == NULL) { // No data Read in DMA
174 nerrdma ++; fgbaddma=true;
175 cout << "PCIEToEthernetChecker/Erreur Waiting for datas ..." << endl;
176 vec_pciw_[dma]->PrintStatus(cout);
177 if (nerrdma>=maxerrdma) { fgarret = true; break; }
178 }
179 }
180 if (fgbaddma) continue;
181 if ((prtlev_>0)&&(npaqfaitg%prtmod==0))
182 cout << " PCIEToEthernet::run()/Info NPaqFait= " << npaqfaitg << endl;
183 if (fgdirectsend_) { // Pas de copie / reduction de taille de paquet, on rebalance tel quel ...
184 for (int fib=0; fib<(int) nbDma_ ;fib++) {
185 //DBG cout << " DEBUG*B fib=" << fib << " Datas[fib]=" << hex << Datas[fib] << dec << endl;
186 SendToTargets(fib, Datas[fib], dmasz);
187 trnbytes[fib] += (uint_8)dmasz;
188 npaqfait[fib] += (trnbytes[fib]/(uint_8)paqszmm);
189 if (fib==nbDma_-1) npaqfaitg += (trnbytes[fib]/(uint_8)paqszmm);
190 trnbytes[fib] = trnbytes[fib]%(uint_8)paqszmm;
191 }
192 continue; // On bypasse le reste
193 }
194
195 // Si on arrive ici, c'est qu'il faut copier / reduire les paquets ....
196 uint_4 curoff=0;
197 //1- On traite le paquet a cheval, rempli partiellement avec le DMA d'avant si necessaire pour les n fibres
198 if (off_acheval > 0) { // IF Numero B
199 if ((paqsz-off_acheval)< dmasz) { // IF Numero A
200 for(uint_4 fib=0; fib<nbDma_; fib++)
201 memcpy((void *)((tampon[fib])+off_acheval), (void *)Datas[fib], paqsz-off_acheval);
202 curoff = paqsz-off_acheval; off_acheval = 0;
203 for(uint_4 fib=0; fib<nbDma_; fib++) {
204 // CHECK S'il faut faire une reduction de taille de paquet
205 if (fgredpaq) { // reduction taille de paquet
206 if (par_.reducneedcopy) {
207 BRPaquet paqc1(tampon[fib], predtampon, paqsz, swapall_);
208 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
209 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
210 }
211 else {
212 BRPaquet paqc1(tampon[fib], paqsz);
213 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
214 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
215 }
216 }
217 else {
218 BRPaquet paqc(tampon[fib], nexpaq[fib], paqsz, swapall_);
219 }
220 BRPaquet paq(nexpaq[fib], packSizeInMgr_);
221 SendToTargets(fib, nexpaq[fib], packSizeInMgr_);
222 npaqfait[fib]++;
223 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
224 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
225 }
226 }
227 else { // se rapporte au IF numero A
228 for(uint_4 fib=0; fib<nbDma_; fib++)
229 memcpy((void *)(tampon[fib]+off_acheval), (void *)Datas[fib], dmasz);
230 curoff =dmasz;
231 off_acheval = (dmasz+off_acheval);
232 }
233 } // Fin IF Numero B
234
235 //2- On traite les paquets complets qui se trouvent dans la zone du DMA
236 while ((curoff+paqsz)<=dmasz) { // while numero C
237 // if ((dma==nbDma_-1)&&(npaqfait >= nmax_* memgr.NbPaquets())) break;
238 // CHECK S'il faut faire une reduction de taille de paquet
239 for(uint_4 fib=0; fib<nbDma_; fib++) {
240 if (fgredpaq) { // reduction taille de paquet
241 if (par_.reducneedcopy) {
242 BRPaquet paqc1(Datas[fib]+curoff, predtampon, paqsz, swapall_);
243 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
244 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
245 }
246 else {
247 BRPaquet paqc1(Datas[fib]+curoff, paqsz);
248 BRPaquet paqc2(nexpaq[fib], par_.redpqsize);
249 paqc2.CopyFrom(paqc1, par_.pqreducmode, par_.reducoffset);
250 }
251 }
252 else {
253 BRPaquet paqc(Datas[fib]+curoff, nexpaq[fib], paqsz, swapall_);
254 }
255 BRPaquet paq(nexpaq[fib], packSizeInMgr_);
256 SendToTargets(fib, nexpaq[fib], packSizeInMgr_);
257 npaqfait[fib]++;
258 if (fib==nbDma_-1) npaqfaitg++; // Ne pas oublier le compteur de paquets faits
259 pcheck[fib].Check(paq); // Verification du paquet / FrameCounter
260 }
261 curoff += paqsz; // On avance l'index dans le buffer du DMA
262 } // -- FIN traitement des paquets complets ds un DMA - FIN du while numero C
263 //3- On copie si besoin la fin du DMA dans la zone tampon
264 if (curoff < dmasz) { // IF numero D
265 off_acheval = dmasz-curoff;
266 for(uint_4 fib=0; fib<nbDma_; fib++)
267 memcpy(tampon[fib], (void*)(Datas[fib]+curoff), off_acheval);
268 // ne sert a rien curoff += off_acheval;
269 } // FIN du if numero D
270 } // FIN Boucle global G
271
272 setRC(0);
273 gettimeofday(&tv2, NULL);
274 double tmelaps2 = (tv2.tv_sec-tv1.tv_sec)*1000.+(tv2.tv_usec-tv1.tv_usec)/1000.;
275 if (tmelaps2<0.1) tmelaps2=0.1;
276 cout << " ---------- PCIEToEthernet::run()-End summary NPaqFait=" << npaqfaitg
277 << " TotSend (kb)=" << totrdsnd_/1024 << "------ " << endl;
278 for (int dma=0; dma < (int)nbDma_ ;dma++) {
279 cout << " --Fib=" << dma << " NPaqFait=" << npaqfait[dma] << " TotDMATransfer="
280 << vec_pciw_[dma]->TotTransferBytes()/1024
281 << " ElapsTime=" << tmelaps2 << " ms ->"
282 << (double)vec_pciw_[dma]->TotTransferBytes()/tmelaps2 << " kb/s" << endl;
283
284 vector<ClientSocket>& vskt = vvec_skt_[dma];
285 cout << " TotEthTransfer=";
286 for(size_t j=0; j<vskt.size(); j++)
287 cout << vskt[j].NBytesSent()/1024 << " , ";
288 cout << " kb" << endl;
289
290 if (!fgdirectsend_) pcheck[dma].Print(cout);
291 }
292 cout << " --------------------------------------------------------------------" << endl;
293
294 for (int i=0;i< (int)nbDma_ ;i++) {
295 delete[] tampon[i];
296 delete[] nexpaq[i];
297 }
298 if ((fgredpaq)&&predtampon) delete[] predtampon;
299
300 //DBG cout << " fin thread ========================" <<endl;
301 return;
302}
303
304
305/* --Methode-- */
306void PCIEToEthernet::Stop()
307{
308 // cout << " PCIEReaderChecker::stop() ........ STOP" <<endl;
309 stop_ = true;
310
311}
312
313static long cnt_prt=0;
314/* --Methode-- */
315size_t PCIEToEthernet::SendToTargets(int fib, Byte* data, size_t len)
316{
317 //DBG cout << " SendToTargets/DBG" << cnt_prt << " len=" << len << " data=" << hex << (unsigned long)data << dec << endl; cnt_prt++;
318 vector<ClientSocket>& vskt = vvec_skt_[fib];
319 size_t rc=0;
320 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
321 char msg[BRTCPMSGLEN2];
322 sprintf(msg,"PCIEToEthernet-SendCnt %d",vec_cntpaq_[fib]);
323 for(size_t j=0; j<vskt.size(); j++) {
324 vskt[j].SendAll(msg, BRTCPMSGLEN2);
325 }
326 if (vec_cntpaq_[fib]>0) {
327 for(size_t j=0; j<vskt.size(); j++) {
328 vskt[j].ReceiveAll(msg, BRTCPMSGLEN2);
329 msg[BRTCPMSGLEN2-1]='\0';
330 if (strncmp(msg,"EthernetReader-RecvCnt",22) != 0)
331 cout << " PCIEToEthernet::SendToTargets/ BAD HandShake message : " << msg << endl;
332 }
333 }
334 }
335 for(size_t j=0; j<vskt.size(); j++) {
336 rc += vskt[j].SendAll((const char *)data, len);
337 totrdsnd_ += len;
338 }
339 vec_cntpaq_[fib]++;
340 return rc;
341}
342//------------------------------------------------------------------
343// Classe thread de lecture sur interface reseau (Ethernet)
344//------------------------------------------------------------------
345
346/* --Methode-- */
347EthernetReader::EthernetReader(RAcqMemZoneMgr& mem, BRParList const& par, int portid, bool rdsamefc)
348 : memgr_(mem), par_(par), stop_(false), rdsamefc_(rdsamefc), tcpportid_(portid)
349{
350 totnbytesrd_ = 0;
351 totnpaqrd_ = 0;
352 totsamefc_ = 0;
353 if (memgr_.NbFibres() > MAXANAFIB)
354 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres>MAXANAFIB ");
355 if (par_.ethr_nlink != memgr_.NbFibres())
356 throw BAORadioException("EthernetReader::EthernetReader/ NbFibres != ethr_nlink");
357
358 packsize_=memgr_.PaqSize();
359 mid_=-2;
360 mmbuf_=NULL;
361 max_targ_npaq = memgr_.NbPaquets();
362 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) mmbufib_[fib]=NULL;
363
364 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
365 vpaq_.push_back(BRPaquet(NULL,memgr_.PaqSize()));
366 vpchk_.push_back(BRPaqChecker(true,0));
367 curfc_.push_back(0);
368 totnpqrd_.push_back(0);
369 totnpqok_.push_back(0);
370 }
371 ServerSocket srv(tcpportid_, par_.ethr_nlink);
372 char msg[BRTCPMSGLEN];
373 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
374 Socket sok=srv.WaitClientConnection();
375 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
376 sok.ReceiveAll(msg,BRTCPMSGLEN);
377 if (strncmp(msg,"BAORadio-PCIEToEthernet",23)!=0)
378 throw SocketException("EthernetReader:ERROR/ Bad message from PCIEToEthernet client !");
379 int ia,ib,ic,id,ie;
380 sscanf(msg+25,"%d %d %d %d %d",&ia,&ib,&ic,&id,&ie);
381 if (((uint_4)ia!=par_.MMgrNbPaquet()) || ((uint_4)ib!=par_.MMgrPaquetSize())) {
382 strcpy(msg,"BAORadio-EthernetReader-BAD MMgrNbPaquet/MMgrPaquetSize()");
383 sok.SendAll(msg,BRTCPMSGLEN);
384 throw SocketException("EthernetReader:ERROR/ Bad MMgrNbPaquet/MMgrPaquetSize() from PCIEToEthernet client !");
385 }
386 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
387 strcpy(msg,"BAORadio-EthernetReader-OK");
388 sok.SendAll(msg,BRTCPMSGLEN);
389 vsok_.push_back(sok);
390 cout << " EthernetReader/Info connection/link" << fib << " established." << endl;
391 vec_cntpaq_.push_back(0);
392 }
393
394 SetPrintLevel();
395}
396
397
398/* --Methode-- */
399void EthernetReader::run()
400{
401 setRC(1);
402 try {
403 TimeStamp ts;
404 Timer tm("EthernetReader", false);
405 cout << " EthernetReader::run() - Starting " << ts << " NbFibres()=" << memgr_.NbFibres()
406 << " PaqSize() = " << memgr_.PaqSize() << endl;
407 cout << " ...ReadMode: " << ((rdsamefc_)?"Paquets With SameFrameCounter":"All OK paquets") << endl;
408
409 cout << " Sending GO message to all PCIEToEthernet clients ... " << endl;
410 char msg[BRTCPMSGLEN];
411 for(int ii=0; ii<BRTCPMSGLEN; ii++) msg[ii]='\0';
412 strcpy(msg,"BAORadio-EthernetReader-Ready-GO");
413 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
414 cout << " EthernetReader/Debug - Sending GO message to sender " << fib << endl;
415 vsok_[fib].SendAll(msg,BRTCPMSGLEN);
416 }
417 Byte* nextpaq=NULL;
418 bool fgok=true;
419 while (fgok&&(totnpaqrd_<par_.MaxNbPaquets())) {
420 if (stop_) break;
421 if ( MoveToNextTarget() ) {
422 cout << "EthernetReader::run()/Error-A- MoveToNextTarget() returned true ->STOP 9" << endl;
423 setRC(7); fgok=false; break;
424 }
425 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
426 nextpaq=GetPaquetTarget(fib);
427 if (nextpaq == NULL) { // Cela ne devrait pas arriver
428 cout << "EthernetReader::run()/Error-A2- GetPaquetTarget(fib) returned NULL ->STOP 9" << endl;
429 setRC(9); fgok=false; break;
430 }
431 vpaq_[fib].Set(nextpaq);
432 }
433 if (ReadNextAllFibers()) { fgok=false; break; }
434 totnpaqrd_++;
435 }
436
437 MoveToNextTarget(); // Pour faire traiter le dernier paquet si plein
438 MZoneManage(true); // Nettoyage final
439 usleep(50000); // Attente de traitement du dernier paquet
440 memgr_.Stop(); // Arret
441
442 cout << " ------------------ EthernetReader::run() END ----------------- " << endl;
443 ts.SetNow();
444 tm.SplitQ();
445 cout << " END reading TotNPaq/Link=" << totnpaqrd_ << " :" << ts ;
446 if (rdsamefc_) cout << " NSameFC=" << totsamefc_ << endl;
447 else cout << endl;
448 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
449 int perc=0;
450 if (totnpqrd_[fib]>0) perc=100*totsamefc_/totnpqrd_[fib];
451 cout << " Fiber" << fib << " TotNPaqRd=" << totnpqrd_[fib] << " TotNPaqOK=" << totnpqok_[fib]
452 << " FracSameFC=" << perc << " %" << endl;
453 if (prtlev_ > 0) vpchk_[fib].Print(cout);
454 }
455 cout << " TotalEthernetRead= " << totnbytesrd_/(1024*1024) << " MBytes Ethernet-Read rate= "
456 << (double)(totnbytesrd_)/1024./tm.PartialElapsedTimems() << " MB/s" << endl;
457 cout << " EthernetReader::run()/Timing: \n";
458 tm.Print();
459 cout << " ---------------------------------------------------------- " << endl;
460
461 } // Fin du bloc try
462 catch (std::exception& exc) {
463 cout << " EthernetReader::run()/catched execption msg= " << exc.what() << endl;
464 setRC(3);
465 return;
466 }
467 catch(...) {
468 cout << " EthernetReader::run()/catched unknown ... exception " << endl;
469 setRC(4);
470 return;
471 }
472 setRC(0);
473 return;
474}
475
476/* --Methode-- */
477bool EthernetReader::ReadNextAllFibers()
478{
479 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++) {
480 if (ReadNext(fib)) return true; // probleme
481 }
482 if (!rdsamefc_ || (memgr_.NbFibres()<2)) {
483 totsamefc_++; return false; // c'est OK
484 }
485 uint_8 cfc=curfc_[0];
486 bool fgsamefc=true;
487 for(size_t fib=1; fib<memgr_.NbFibres(); fib++) {
488 if (curfc_[fib]!=cfc) {
489 fgsamefc=false;
490 if (curfc_[fib] > cfc) cfc=curfc_[fib];
491 }
492 }
493 if (fgsamefc) {
494 totsamefc_++; return false; // c'est OK , same framecounter
495 }
496 else { // else !fgsame
497 for(uint_4 fib=0; fib<memgr_.NbFibres(); fib++) {
498 while (curfc_[fib]<cfc) {
499 if (ReadNext(fib)) return true; // probleme
500 }
501 }
502 } // fin de else !fgsame
503 totsamefc_++;
504 return false; // c'est OK
505}
506
507/* --Methode-- */
508bool EthernetReader::ReadNext(int fib)
509{
510 bool fggood=false;
511 while(!fggood) {
512 ReceiveFromSocket(fib, (char *)vpaq_[fib].Begin(), packsize_);
513 totnbytesrd_+=packsize_;
514 totnpqrd_[fib]++;
515 fggood = vpchk_[fib].Check(vpaq_[fib],curfc_[fib]);
516 }
517 totnpqok_[fib]++;
518 return false;
519}
520
521/* --Methode-- */
522size_t EthernetReader::ReceiveFromSocket(int fib, char* data, size_t len)
523{
524 size_t rc =0;
525 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
526 char msg[BRTCPMSGLEN2];
527 msg[0]='\0';
528 vsok_[fib].ReceiveAll(msg, BRTCPMSGLEN2);
529 if (strncmp(msg,"PCIEToEthernet-SendCnt",22) != 0)
530 cout << " EthernetReader::ReceiveFromSocket/ BAD HandShake message : " << msg << endl;
531 }
532 rc += vsok_[fib].ReceiveAll(data, len);
533 vec_cntpaq_[fib]++;
534
535 /*
536 size_t nblk = len/ethr_bsz_;
537 size_t fblk = len%ethr_bsz_;
538 size_t off = 0;
539 if (nblk>0) {
540 for(size_t i=0; i<nblk; i++) {
541 rc += vsok_[fib].ReceiveAll(data+off, ethr_bsz_);
542 off += ethr_bsz_;
543 }
544 }
545 if (fblk>0) {
546 rc += vsok_[fib].ReceiveAll(data+off, fblk);
547 }
548 vec_cntpaq_[fib]++;
549 */
550
551 if (vec_cntpaq_[fib]%par_.MMgrNbPaquet() == 0) {
552 char msg[BRTCPMSGLEN2];
553 sprintf(msg,"EthernetReader-RecvCnt %d",vec_cntpaq_[fib]);
554 vsok_[fib].SendAll(msg, BRTCPMSGLEN2);
555 }
556 return rc;
557}
558
559/* --Methode-- */
560bool EthernetReader::MZoneManage(bool fgclean) // Retourne true si probleme
561{
562 /* Pour debug
563 cout << " EthernetReader::MZoneManage() mid_=" << mid_ << " arg_npaq_= " << targ_npaq_
564 << " max_targ_npaq=" << max_targ_npaq << endl;
565 */
566 if (mid_ >= 0) {
567 if (fgclean) memgr_.FreeMemZone(mid_, MemZS_Free);
568 else memgr_.FreeMemZone(mid_, MemZS_Filled);
569 }
570 mmbuf_ = NULL; targ_npaq_ = 0; mid_ = -2;
571 for (int fib=0;fib<(int)memgr_.NbFibres() ;fib++) mmbufib_[fib]=NULL;
572 if (fgclean) return false;
573 mid_ = memgr_.FindMemZoneId(MemZA_Fill);
574 mmbuf_ = memgr_.GetMemZone(mid_);
575 if (mmbuf_==NULL) return true;
576 for(size_t fib=0; fib<(size_t)memgr_.NbFibres(); fib++)
577 mmbufib_[fib]=memgr_.GetMemZone(mid_,fib);
578 return false;
579}
Note: See TracBrowser for help on using the repository browser.