source: Sophya/trunk/AddOn/TAcq/sockrawstream.cc@ 3593

Last change on this file since 3593 was 3544, checked in by ansari, 17 years ago

debug classes RawInOutSocketStream et ajout d'envoi/reception de messages de header/trailer - Reza 24/10/2008

File size: 6.4 KB
Line 
#include "sockrawstream.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>

#include "sopnamsp.h"
#include "pexceptions.h"
4
5/*!
6 \class SOPHYA::RawInOutSocketStream
7 \ingroup SysTools
8 This class implements the interface defined by RawInOutStream
9 over a socket (read and write operations).
10 It is mainly intended to be used by PPF In/Out streams.
11*/
12
// Size in bytes of the fixed-length handshake messages exchanged by the stream:
// the header sent by the constructor and the end-of-stream trailer sent by the
// destructor both occupy exactly RIOS_BBSIZE bytes. The constructor also sizes
// the I/O buffers as a multiple of this value.
#define RIOS_BBSIZE 64
16
17/* --Methode-- */
18RawInOutSocketStream::RawInOutSocketStream(Socket &skt, size_t npaqbuff)
19 : RawInOutStream() , _skt(skt)
20{
21
22// ---- preparation de la chaine d'identification / taille du buffer
23 const char * hdrmarq = "SIPSS-RawInOutSocketStream-Header";
24// 0123456789012345678901234567890123456789
25 char msg[RIOS_BBSIZE];
26 for(size_t k=30; k<RIOS_BBSIZE; k++) msg[k]='\0';
27 strcpy(msg, hdrmarq);
28 if (npaqbuff < 1) npaqbuff = 1;
29 size_t sbsz = npaqbuff*RIOS_BBSIZE;
30 sprintf(msg+48,"%ld",(long)sbsz);
31// ----- Envoi du marqueur de debut et negociation de taille du buffer
32 Send(msg, (size_t)RIOS_BBSIZE);
33 Receive(msg, (size_t)RIOS_BBSIZE);
34 if (strcmp(msg, hdrmarq)!=0)
35 throw SocketException("RawInOutSocketStream::RawInOutSocketStream()- ExchangeMsg check failed");
36
37 size_t rbsz = atol(msg+48);
38 size_t bufsize = (sbsz >= rbsz) ? sbsz : rbsz;
39
40//DBG cout << "RIOSS::RawInOutSocketStream()*DBG* sbsz=" << sbsz << " rbsz=" << rbsz << " -->bufsize=" << bufsize << endl;
41
42 _rdbuff.buff = new char[bufsize];
43 _rdbuff.sz = bufsize;
44 _rdbuff.cpos = bufsize;
45
46 _wrbuff.buff = new char[bufsize];
47 _wrbuff.sz = bufsize;
48 _wrbuff.cpos = 0;
49
50}
51
52/* --Methode-- */
53RawInOutSocketStream::~RawInOutSocketStream()
54{
55// cout << " ---- DESTRUCTEUR ---- ~RawInOutSocketStream() " << endl;
56 //----- ecriture/envoi du buffer si pas vide
57 if (_wrbuff.cpos > 0) {
58 for (size_t k= _wrbuff.cpos; k<_wrbuff.sz; k++) _wrbuff.buff[k] = '\0';
59 _wrbuff.cpos = _wrbuff.sz;
60 SendBuffer();
61 }
62 // Envoi du marquer de fin (trailer)
63 const char * trlmarq = "TRAILER-SIPSS-RawInOutSocketStream-Trailer";
64// 0123456789012345678901234567890123456789
65 char msg[RIOS_BBSIZE];
66 for(size_t k=30; k<RIOS_BBSIZE; k++) msg[k]='\0';
67 strcpy(msg, trlmarq);
68 Send(msg, (size_t)RIOS_BBSIZE);
69
70 bool oktrailer=false;
71 size_t nrdt = 0;
72 size_t maxtry = 16*_rdbuff.sz / RIOS_BBSIZE;
73 while(!oktrailer && (nrdt<maxtry)) {
74 Receive(msg, (size_t)RIOS_BBSIZE);
75 nrdt++;
76 if (strcmp(msg, trlmarq)==0) oktrailer = true;
77 }
78//DBG cout << "RIOSS::~RawInOutSocketStream()*DBG* nrdt=" << nrdt << " Trailer:" << ((oktrailer)?" OK":" ERROR") << endl;
79
80 delete [] _rdbuff.buff;
81 delete [] _wrbuff.buff;
82 if (!oktrailer)
83 throw SocketException("RawInOutSocketStream::~RawInOutSocketStream(): EndOfStream marker not found");
84}
85
86/* --Methode-- */
87size_t RawInOutSocketStream::CopyToSendBuffer(const char* s, size_t n)
88{
89
90 size_t len = _wrbuff.sz-_wrbuff.cpos;
91 if (len > n) len = n;
92 if ( (_wrbuff.cpos == 0) && (len == _wrbuff.sz) ) {
93 //DBG cout << "RawInOutSocketStream::CopyToSend/DBG-1 - n="
94//DBG << n << " len=" << len << endl;
95 Send(s, len);
96 }
97 else {
98 //DBG cout << "RawInOutSocketStream::CopyToSend/DBG-2 - n="
99//DBG << n << " len=" << len << " cpos=" << _wrbuff.cpos << endl;
100 memcpy(_wrbuff.buff+_wrbuff.cpos, s, len);
101 //DBG cout << " memcpy OK - cpos= " << _wrbuff.cpos << endl;
102 _wrbuff.cpos += len;
103 if (_wrbuff.cpos == _wrbuff.sz) SendBuffer();
104 }
105 return len;
106}
107
108/* --Methode-- */
109size_t RawInOutSocketStream::CopyFromRecvBuffer(char* s, size_t n)
110{
111 if ( (_rdbuff.cpos == _rdbuff.sz) && ( n >= _rdbuff.sz)) {
112 size_t len = _rdbuff.sz;
113 //DBG cout << "RawInOutSocketStream::CopyFromRecv/DBG-1 - n=" << n
114//DBG << " len=" << len << endl;
115 Receive(s, len);
116 return len;
117 }
118 else {
119//DBG cout << "RawInOutSocketStream::CopyFromRecv/DBG-2 - n=" << n << endl;
120 if (_rdbuff.cpos == _rdbuff.sz) ReceiveBuffer();
121 size_t len = _rdbuff.sz-_rdbuff.cpos;
122 if (len > n) len = n;
123 memcpy(s, _rdbuff.buff+_rdbuff.cpos, len);
124 _rdbuff.cpos += len;
125 return len;
126 }
127}
128
129/* --Methode-- */
130void RawInOutSocketStream::SendBuffer()
131{
132//DBG cout << "RawInOutSocketStream::SendBuffer/DBG - cpos="
133 //DBG << _wrbuff.cpos << " sz=" << _wrbuff.sz << endl;
134 // if (_wrbuff.cpos != _wrbuff.sz) return;
135 Send(_wrbuff.buff, _wrbuff.sz);
136 _wrbuff.cpos = 0;
137}
138
139/* --Methode-- */
140void RawInOutSocketStream::ReceiveBuffer()
141{
142 // if (_rdbuff.cpos != _rdbuff.sz) return;
143 //DBG cout << "RawInOutSocketStream::ReceiveBuffer/DBG - cpos="
144 //DBG << _rdbuff.cpos << " sz=" << _rdbuff.sz << endl;
145 Receive(_rdbuff.buff, _rdbuff.sz);
146 _rdbuff.cpos = 0;
147
148}
149
150/* --Methode-- */
151size_t RawInOutSocketStream::Send(const char* s, size_t n)
152{
153 size_t nst = 0;
154 while (nst < n) {
155 size_t ns = _skt.Send(s+nst, n-nst);
156 if (ns < 1) break;
157 nst += ns;
158 }
159 if ( nst < n)
160 throw IOExc("RawInOutSocketStream::Send()/write() Error nwrite < n");
161 return nst;
162}
163
164/* --Methode-- */
165size_t RawInOutSocketStream::Receive(char* s, size_t n)
166{
167 size_t nst = 0;
168 int ntry = 0;
169 while (nst < n) {
170 size_t ns = _skt.Receive(s+nst, n-nst);
171 ntry++;
172 if (ns < 1) break;
173 nst += ns;
174 }
175 if ( nst < n) {
176 cout << " RawInOutSocketStream::Receive() / Pb ! ntry=" << ntry
177 << " nst=" << nst << " n=" << n << endl;
178 throw IOExc("RawInOutSocketStream::Receive/read() Error nread < n");
179 }
180 return nst;
181}
182
183int_8 RawInOutSocketStream::tellg()
184{
185 return _totnrd;
186}
187
188/* --Methode-- */
189RawInOutStream& RawInOutSocketStream::read(char* s, uint_8 n)
190{
191//DBG cout << "RawInOutSocketStream::read()/DBG - n=" << n << endl;
192 size_t nst = 0;
193 while (nst < n) {
194 size_t ns = CopyFromRecvBuffer(s+nst, n-nst);
195 if (ns < 1) break;
196 nst += ns;
197 }
198 if ( nst < n)
199 throw IOExc("RawInOutSocketStream::read() Error nread < n");
200 _totnrd += n;
201 return *this;
202}
203
204/* --Methode-- */
205int_8 RawInOutSocketStream::tellp()
206{
207 return _totnwr;
208}
209
210/* --Methode-- */
211RawInOutStream& RawInOutSocketStream::write(const char* s, uint_8 n)
212{
213//DBG cout << "RawInOutSocketStream::write()/DBG - n=" << n << endl;
214 size_t nst = 0;
215 while (nst < n) {
216 size_t ns = CopyToSendBuffer(s+nst, n-nst);
217 if (ns < 1) break;
218 nst += ns;
219 }
220 //DBG cout << "RawInOutSocketStream::write()/DBG ---> nst=" << nst << endl;
221
222 if ( nst < n)
223 throw IOExc("RawInOutSocketStream::write() Error nwrite < n");
224 _totnwr += n;
225 return *this;
226}
227
228
Note: See TracBrowser for help on using the repository browser.