source: Sophya/trunk/AddOn/TAcq/sockrawstream.cc@ 3731

Last change on this file since 3731 was 3639, checked in by cmv, 16 years ago

ajout des #include C pour compil sur dernieres versions gcc/g++, cmv 27/05/2009

File size: 6.4 KB
Line 
1#include <stdlib.h>
2#include <string.h>
3
4#include "sockrawstream.h"
5#include "sopnamsp.h"
6#include "pexceptions.h"
7
8
9/*!
10 \class SOPHYA::RawInOutSocketStream
11 \ingroup SysTools
12 This class implements the interface defined by RawInOutStream
13 over a socket (read and write operations).
14 It is mainly intended to be used by PPF In/Out streams.
15*/
16
17// Taille de base pour les messages echanges = Longueur header = long. Trailer
18// Longueur de trailer pour marquer la fin
19#define RIOS_BBSIZE 64
20
21/* --Methode-- */
22RawInOutSocketStream::RawInOutSocketStream(Socket &skt, size_t npaqbuff)
23 : RawInOutStream() , _skt(skt)
24{
25
26// ---- preparation de la chaine d'identification / taille du buffer
27 const char * hdrmarq = "SIPSS-RawInOutSocketStream-Header";
28// 0123456789012345678901234567890123456789
29 char msg[RIOS_BBSIZE];
30 for(size_t k=30; k<RIOS_BBSIZE; k++) msg[k]='\0';
31 strcpy(msg, hdrmarq);
32 if (npaqbuff < 1) npaqbuff = 1;
33 size_t sbsz = npaqbuff*RIOS_BBSIZE;
34 sprintf(msg+48,"%ld",(long)sbsz);
35// ----- Envoi du marqueur de debut et negociation de taille du buffer
36 Send(msg, (size_t)RIOS_BBSIZE);
37 Receive(msg, (size_t)RIOS_BBSIZE);
38 if (strcmp(msg, hdrmarq)!=0)
39 throw SocketException("RawInOutSocketStream::RawInOutSocketStream()- ExchangeMsg check failed");
40
41 size_t rbsz = atol(msg+48);
42 size_t bufsize = (sbsz >= rbsz) ? sbsz : rbsz;
43
44//DBG cout << "RIOSS::RawInOutSocketStream()*DBG* sbsz=" << sbsz << " rbsz=" << rbsz << " -->bufsize=" << bufsize << endl;
45
46 _rdbuff.buff = new char[bufsize];
47 _rdbuff.sz = bufsize;
48 _rdbuff.cpos = bufsize;
49
50 _wrbuff.buff = new char[bufsize];
51 _wrbuff.sz = bufsize;
52 _wrbuff.cpos = 0;
53
54}
55
56/* --Methode-- */
57RawInOutSocketStream::~RawInOutSocketStream()
58{
59// cout << " ---- DESTRUCTEUR ---- ~RawInOutSocketStream() " << endl;
60 //----- ecriture/envoi du buffer si pas vide
61 if (_wrbuff.cpos > 0) {
62 for (size_t k= _wrbuff.cpos; k<_wrbuff.sz; k++) _wrbuff.buff[k] = '\0';
63 _wrbuff.cpos = _wrbuff.sz;
64 SendBuffer();
65 }
66 // Envoi du marquer de fin (trailer)
67 const char * trlmarq = "TRAILER-SIPSS-RawInOutSocketStream-Trailer";
68// 0123456789012345678901234567890123456789
69 char msg[RIOS_BBSIZE];
70 for(size_t k=30; k<RIOS_BBSIZE; k++) msg[k]='\0';
71 strcpy(msg, trlmarq);
72 Send(msg, (size_t)RIOS_BBSIZE);
73
74 bool oktrailer=false;
75 size_t nrdt = 0;
76 size_t maxtry = 16*_rdbuff.sz / RIOS_BBSIZE;
77 while(!oktrailer && (nrdt<maxtry)) {
78 Receive(msg, (size_t)RIOS_BBSIZE);
79 nrdt++;
80 if (strcmp(msg, trlmarq)==0) oktrailer = true;
81 }
82//DBG cout << "RIOSS::~RawInOutSocketStream()*DBG* nrdt=" << nrdt << " Trailer:" << ((oktrailer)?" OK":" ERROR") << endl;
83
84 delete [] _rdbuff.buff;
85 delete [] _wrbuff.buff;
86 if (!oktrailer)
87 throw SocketException("RawInOutSocketStream::~RawInOutSocketStream(): EndOfStream marker not found");
88}
89
90/* --Methode-- */
91size_t RawInOutSocketStream::CopyToSendBuffer(const char* s, size_t n)
92{
93
94 size_t len = _wrbuff.sz-_wrbuff.cpos;
95 if (len > n) len = n;
96 if ( (_wrbuff.cpos == 0) && (len == _wrbuff.sz) ) {
97 //DBG cout << "RawInOutSocketStream::CopyToSend/DBG-1 - n="
98//DBG << n << " len=" << len << endl;
99 Send(s, len);
100 }
101 else {
102 //DBG cout << "RawInOutSocketStream::CopyToSend/DBG-2 - n="
103//DBG << n << " len=" << len << " cpos=" << _wrbuff.cpos << endl;
104 memcpy(_wrbuff.buff+_wrbuff.cpos, s, len);
105 //DBG cout << " memcpy OK - cpos= " << _wrbuff.cpos << endl;
106 _wrbuff.cpos += len;
107 if (_wrbuff.cpos == _wrbuff.sz) SendBuffer();
108 }
109 return len;
110}
111
112/* --Methode-- */
113size_t RawInOutSocketStream::CopyFromRecvBuffer(char* s, size_t n)
114{
115 if ( (_rdbuff.cpos == _rdbuff.sz) && ( n >= _rdbuff.sz)) {
116 size_t len = _rdbuff.sz;
117 //DBG cout << "RawInOutSocketStream::CopyFromRecv/DBG-1 - n=" << n
118//DBG << " len=" << len << endl;
119 Receive(s, len);
120 return len;
121 }
122 else {
123//DBG cout << "RawInOutSocketStream::CopyFromRecv/DBG-2 - n=" << n << endl;
124 if (_rdbuff.cpos == _rdbuff.sz) ReceiveBuffer();
125 size_t len = _rdbuff.sz-_rdbuff.cpos;
126 if (len > n) len = n;
127 memcpy(s, _rdbuff.buff+_rdbuff.cpos, len);
128 _rdbuff.cpos += len;
129 return len;
130 }
131}
132
133/* --Methode-- */
134void RawInOutSocketStream::SendBuffer()
135{
136//DBG cout << "RawInOutSocketStream::SendBuffer/DBG - cpos="
137 //DBG << _wrbuff.cpos << " sz=" << _wrbuff.sz << endl;
138 // if (_wrbuff.cpos != _wrbuff.sz) return;
139 Send(_wrbuff.buff, _wrbuff.sz);
140 _wrbuff.cpos = 0;
141}
142
143/* --Methode-- */
144void RawInOutSocketStream::ReceiveBuffer()
145{
146 // if (_rdbuff.cpos != _rdbuff.sz) return;
147 //DBG cout << "RawInOutSocketStream::ReceiveBuffer/DBG - cpos="
148 //DBG << _rdbuff.cpos << " sz=" << _rdbuff.sz << endl;
149 Receive(_rdbuff.buff, _rdbuff.sz);
150 _rdbuff.cpos = 0;
151
152}
153
154/* --Methode-- */
155size_t RawInOutSocketStream::Send(const char* s, size_t n)
156{
157 size_t nst = 0;
158 while (nst < n) {
159 size_t ns = _skt.Send(s+nst, n-nst);
160 if (ns < 1) break;
161 nst += ns;
162 }
163 if ( nst < n)
164 throw IOExc("RawInOutSocketStream::Send()/write() Error nwrite < n");
165 return nst;
166}
167
168/* --Methode-- */
169size_t RawInOutSocketStream::Receive(char* s, size_t n)
170{
171 size_t nst = 0;
172 int ntry = 0;
173 while (nst < n) {
174 size_t ns = _skt.Receive(s+nst, n-nst);
175 ntry++;
176 if (ns < 1) break;
177 nst += ns;
178 }
179 if ( nst < n) {
180 cout << " RawInOutSocketStream::Receive() / Pb ! ntry=" << ntry
181 << " nst=" << nst << " n=" << n << endl;
182 throw IOExc("RawInOutSocketStream::Receive/read() Error nread < n");
183 }
184 return nst;
185}
186
187int_8 RawInOutSocketStream::tellg()
188{
189 return _totnrd;
190}
191
192/* --Methode-- */
193RawInOutStream& RawInOutSocketStream::read(char* s, uint_8 n)
194{
195//DBG cout << "RawInOutSocketStream::read()/DBG - n=" << n << endl;
196 size_t nst = 0;
197 while (nst < n) {
198 size_t ns = CopyFromRecvBuffer(s+nst, n-nst);
199 if (ns < 1) break;
200 nst += ns;
201 }
202 if ( nst < n)
203 throw IOExc("RawInOutSocketStream::read() Error nread < n");
204 _totnrd += n;
205 return *this;
206}
207
208/* --Methode-- */
209int_8 RawInOutSocketStream::tellp()
210{
211 return _totnwr;
212}
213
214/* --Methode-- */
215RawInOutStream& RawInOutSocketStream::write(const char* s, uint_8 n)
216{
217//DBG cout << "RawInOutSocketStream::write()/DBG - n=" << n << endl;
218 size_t nst = 0;
219 while (nst < n) {
220 size_t ns = CopyToSendBuffer(s+nst, n-nst);
221 if (ns < 1) break;
222 nst += ns;
223 }
224 //DBG cout << "RawInOutSocketStream::write()/DBG ---> nst=" << nst << endl;
225
226 if ( nst < n)
227 throw IOExc("RawInOutSocketStream::write() Error nwrite < n");
228 _totnwr += n;
229 return *this;
230}
231
232
Note: See TracBrowser for help on using the repository browser.