source: Sophya/trunk/ArchTOIPipe/Kernel/toiseqbuff.cc@ 3720

Last change on this file since 3720 was 2454, checked in by aubourg, 22 years ago

pb longs sur magique

File size: 10.8 KB
Line 
1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
5// $Id: toiseqbuff.cc,v 1.16 2003-11-14 12:34:55 aubourg Exp $
6
7#include "toiprocessor.h"
8#include "toiseqbuff.h"
9#include <pthread.h>
10
11#ifdef WITH_SOPHYA
12#include "pexceptions.h"
13#else
14#include "apexceptions.h"
15#endif
16
17
18TOISeqBuffered::TOISeqBuffered(int wsz) {
19 data = NULL;
20 flags = NULL;
21 AllocBuffer(wsz);
22 setName("toiseqbuff");
23 syncOldWay = false;
24}
25
26TOISeqBuffered::TOISeqBuffered(string nm, int wsz) {
27 data = NULL;
28 flags = NULL;
29 AllocBuffer(wsz);
30 setName(nm);
31 syncOldWay = false;
32}
33
34TOISeqBuffered::~TOISeqBuffered() {
35 delete[] data;
36 delete[] flags;
37}
38
39void TOISeqBuffered::AllocBuffer(int wsz)
40{
41 if (wsz < 128) wsz = 128;
42 wsize = wsz;
43 buffsize = 2*wsz;
44 if (data) delete[] data;
45 if (flags) delete[] flags;
46 data = new double[buffsize];
47 flags = new uint_8[buffsize];
48 for(int k=0; k<buffsize; k++) {
49 data[k] = defaultValue;
50 flags[k] = 0;
51 }
52 next_in = next_out = -1;
53 first_in = first_out = -1;
54 started = false;
55 dbglev = 0;
56}
57
58void TOISeqBuffered::PrintStatus(::ostream & os) const
59{
60 os << "---TOISeqBuffered::PrintStatus() - Name=" << getName()
61 << "\n WindowSize= " << wsize << " BufferSize= " << buffsize << endl;
62 os << "Index: FirstIn= " << getFirstIn() << " LastIn= " << getLastIn()
63 << " Total= " << getLastIn()-getFirstIn()+1 << endl;
64 os << "Index: FirstOut= " << getFirstOut() << " LastOut= " << getLastOut()
65 << " Total= " << getLastOut()-getFirstOut()+1 << endl;
66 os << " WaitStatus: Put/" ;
67 if (isPutWaiting()) os << "Waiting " ;
68 else os << "Running ";
69 os << " PutCountWait= " << getCountWaitPut() << endl;
70 os << " WaitStatus: Get/" ;
71 if (isGetWaiting()) os << "Waiting " ;
72 else os << "Running ";
73 os << " GetCountWait= " << getCountWaitGet() << endl;
74}
75
76TOI::DataStatus TOISeqBuffered::isDataAvailNL(long iStart, long iEnd) {
77 if (iEnd < iStart)
78 throw RangeCheckError("TOISeqBuffered::isDataAvailNL : iEnd<iStart !");
79 if (!started) return DATA_NOT_YET;
80 if (iEnd >= next_in) return DATA_NOT_YET;
81 if (isDataDeleted(iStart)) return DATA_DELETED;
82 return DATA_OK;
83}
84
85TOI::DataStatus TOISeqBuffered::isDataAvailNL(long i) {
86 return TOI::isDataAvailNL(i);
87}
88
89void TOISeqBuffered::wontNeedBefore(long i) {
90 // $CHECK$ Reza 30/4/2001 - Je ne sais pas a quoi ca sert !
91 // next_out = i; $CHECK$ Reza 30/4/2001
92}
93
94
95#ifndef NO_SOPHYA
96/* ---- l'interface va etre modifiee, NE PAS UTILISER
97Array TOISeqBuffered::doGetData(long iStart, long iEnd) {
98 // if (iEnd < iStart)
99 // throw RangeCheckError("TOI::getData : iEnd<iStart !");
100 // if (iStart <= out_last)
101 if (!started) waitGet();
102 if (!isDataAvailNL(iStart, iEnd))
103 throw RangeCheckError("TOISeqBuffered::getData(iS,iE) : data not available");
104 cleanWaitGet();
105 Vector dat(iEnd - iStart + 1);
106 for (long i=0; i<iEnd-iStart+1; i++) {
107 dat(i) = dataRef(i+iStart);
108 }
109 if (first_out < 0) first_out = iStart;
110 if ((iEnd+1) > next_out) next_out = iEnd+1;
111 if (isPutWaiting() && (next_in-next_out < wsize/2 )) signalPut();
112 return dat;
113}
114 l'interface va etre modifiee, NE PAS UTILISER ---- */
115#endif
116
117double TOISeqBuffered::getData(long i)
118{
119 double val;
120 uint_8 flg;
121 getData(i, val, flg);
122 return(val);
123}
124
125
126void TOISeqBuffered::getData(long i, double & val, uint_8 & flg) {
127 lock();
128 if (!started) {
129 cout << " TOISeqBuffered::getData() - waitGet() Waiting for start ... " << endl;
130 waitGet();
131 }
132 cleanWaitGet();
133 if (isDataDeleted(i)) {
134 if (dbglev > 0)
135 cout << " TOISeqBuffered::getData() - DataDeleted() name=" << getName()
136 << " i=" << i << " next_in= " << next_in
137 << " next_out=" << next_out << endl;
138 unlock();
139 throw RangeCheckError("TOISeqBuffered::getData(i) : data deleted");
140 }
141 while (i >= next_in) {
142 if (i>next_out) next_out = i;
143 if (dbglev > 0)
144 cout << " TOISeqBuffered::getData() - waitGet() name=" << getName()
145 << " i=" << i << " next_in= " << next_in
146 << " next_out=" << next_out << endl;
147 waitGet();
148 if (dbglev > 0)
149 cout << " ... Out of waitGet() i=" << i
150 << " next_in= " << next_in << " next_out=" << next_out << endl;
151 cleanWaitGet();
152 }
153 val = dataRef(i);
154 flg = flagRef(i);
155 if (first_out < 0) first_out = i;
156 if ((i+1) > next_out) next_out = i+1;
157 if (isPutWaiting() && (next_in-next_out < wsize/2 )) {
158 if (dbglev > 0)
159 cout << " TOISeqBuffered::getData() - signalPut() name=" << getName()
160 << " i=" << i << " next_in= " << next_in
161 << " next_out=" << next_out << endl;
162 signalPut();
163 }
164 if (fgsigput) broadcast();
165 unlock();
166 return;
167}
168
169
170void TOISeqBuffered::getData(long i, int n, double* data, uint_8* flg)
171{
172 lock();
173 if (!started) {
174 cout << " TOISeqBuffered::getData(i,n ...) - waitGet() Waiting for start ... " << endl;
175 waitGet();
176 }
177 cleanWaitGet();
178 if (isDataDeleted(i)) {
179 if (dbglev > 0)
180 cout << " TOISeqBuffered::getData(i,n ...) - DataDeleted() name=" << getName()
181 << " i=" << i << " next_in= " << next_in
182 << " next_out=" << next_out << endl;
183 unlock();
184 throw RangeCheckError("TOISeqBuffered::getData(i) : data deleted");
185 }
186 for(long j=i; j<i+n; j++) {
187 while (j >= next_in) {
188 if (j>next_out) next_out = j;
189 if (dbglev > 0)
190 cout << " TOISeqBuffered::getData(i,n ...) - waitGet() name=" << getName()
191 << " j=" << j << " next_in= " << next_in
192 << " next_out=" << next_out << endl;
193 waitGet();
194 if (dbglev > 0)
195 cout << " ... Out of waitGet() j=" << j
196 << " next_in= " << next_in << " next_out=" << next_out << endl;
197 cleanWaitGet();
198 }
199 data[j-i] = dataRef(j);
200 if (flg) flg[j-i] = flagRef(j);
201 if (first_out < 0) first_out = j;
202 if ((j+1) > next_out) next_out = j+1;
203 if (isPutWaiting() && (next_in-next_out < wsize/2 )) {
204 if (dbglev > 0)
205 cout << " TOISeqBuffered::getData(i,n ...) - signalPut() name=" << getName()
206 << " i=" << i << " next_in= " << next_in
207 << " next_out=" << next_out << endl;
208 // signalPut();
209 broadcast();
210 }
211 }
212 unlock();
213 // if (fgsigput) signal();
214 return;
215}
216
217
218#ifndef NO_SOPHYA
219/* ---- l'interface va etre modifiee, NE PAS UTILISER
220TArray<int_4> TOISeqBuffered::doGetFlag(long iStart, long iEnd) {
221 if (!started) waitGet();
222 cleanWaitGet();
223 if (!isDataAvailNL(iStart, iEnd))
224 throw RangeCheckError("TOISeqBuffered::getFlag(iS,iE) : data not available");
225 TVector<int_4> dat(iEnd - iStart + 1);
226 for (long i=0; i<iEnd-iStart+1; i++) {
227 dat[i] = flagRef(i+iStart);
228 }
229 return dat;
230}
231 l'interface va etre modifiee, NE PAS UTILISER ---- */
232#endif
233
234/*RZCMV
235int_4 TOISeqBuffered::doGetFlag(long i) {
236 if (!started) waitGet();
237 cleanWaitGet();
238 if (isDataDeleted(i))
239 throw RangeCheckError("TOISeqBuffered::doGetFlag(i) : data deleted");
240 while (i >= next_in) waitGet();
241 int_4 dat = flagRef(i);
242 return dat;
243}
244*/
245
246
247void TOISeqBuffered::putData(long i, double value, uint_8 flag) {
248 lock();
249 if (!started) {
250 first_in = next_in = i;
251 next_out = next_in;
252 started = true;
253 }
254 else {
255 if (i != next_in) {
256 if (dbglev > 0)
257 cout << " TOISeqBuffered::putData() - i!=next_in() name=" << getName()
258 << " i=" << i << " next_in= " << next_in
259 << " next_out=" << next_out << endl;
260 string msg = "TOISeqBuffered::putData() : i!=next_in TOIname=" + getName();
261 unlock();
262 throw RangeCheckError(msg);
263 }
264 if (next_in-next_out >= wsize) {
265 if (dbglev > 0)
266 cout << " TOISeqBuffered::putData() - waitPut() " << getName()
267 << " i=" << i
268 << " next_in= " << next_in << " next_out=" << next_out << endl;
269 waitPut();
270 if (dbglev > 0)
271 cout << " ... Out of waitPut() i=" << i
272 << " next_in= " << next_in << " next_out=" << next_out << endl;
273 }
274 cleanWaitPut();
275 }
276 dataRef(i) = value;
277 flagRef(i) = flag;
278 next_in = i+1;
279 if (isGetWaiting() && (next_in-next_out > wsize/8)) {
280 if (dbglev > 0)
281 cout << " TOISeqBuffered::putData() - signalGet() name=" << getName()
282 << " i=" << i << " next_in= " << next_in
283 << " next_out=" << next_out << endl;
284 broadcast();
285 // signalGet();
286 }
287 // if (fgsigget) broadcast();
288 unlock();
289 return;
290}
291
292void TOISeqBuffered::putData(long i, int n, double const* val, uint_8 const* flg)
293{
294 lock();
295 if (!started) {
296 first_in = next_in = i;
297 next_out = next_in;
298 started = true;
299 }
300 else {
301 if (i != next_in) {
302 if (dbglev > 0)
303 cout << " TOISeqBuffered::putData(i,n ...) - i!=next_in() name=" << getName()
304 << " i=" << i << " next_in= " << next_in
305 << " next_out=" << next_out << endl;
306 string msg = "TOISeqBuffered::putData() : i!=next_in TOIname=" + getName();
307 unlock();
308 throw RangeCheckError(msg);
309 }
310 }
311 for(long j=i; j<i+n; j++) {
312 if (next_in-next_out >= wsize) {
313 if (dbglev > 0)
314 cout << " TOISeqBuffered::putData(i,n ...) - waitPut() " << getName()
315 << " j=" << j
316 << " next_in= " << next_in << " next_out=" << next_out << endl;
317 broadcast();
318 waitPut();
319 if (dbglev > 0)
320 cout << " ... Out of waitPut() j=" << j
321 << " next_in= " << next_in << " next_out=" << next_out << endl;
322 cleanWaitPut();
323 }
324 dataRef(j) = val[j-i];
325 if (flg) flagRef(j) = flg[j-i];
326 else flagRef(j) = 0;
327 next_in = j+1;
328 if (isGetWaiting() && (next_in-next_out > wsize/8)) {
329 if (dbglev > 0)
330 cout << " TOISeqBuffered::putData(i,n ...) - signalGet() name=" << getName()
331 << " i=" << i << " next_in= " << next_in
332 << " next_out=" << next_out << endl;
333 // signalGet();
334 broadcast();
335 }
336 }
337 /*
338 if (isGetWaiting() && (next_in-next_out > wsize/8)) {
339 if (dbglev > 0)
340 cout << " TOISeqBuffered::putData(i,n ...) - signalGet() name=" << getName()
341 << " i=" << i << " next_in= " << next_in
342 << " next_out=" << next_out << endl;
343 //signalGet();
344 broadcast();
345 }
346 */
347 // if (fgsigget) broadcast();
348 unlock();
349 return;
350}
351
352bool TOISeqBuffered::hasSomeData() {
353 lock();
354 bool x = started;
355 unlock();
356 return x;
357}
358
359long TOISeqBuffered::nextDataAvail(long iAfter) {
360 lock();
361 if (iAfter >= next_in ) {unlock(); return -1;}
362 if (iAfter < (next_in-buffsize)) {unlock(); return (next_in-buffsize);}
363 unlock();
364 return iAfter+1;
365}
366
367void TOISeqBuffered::doGetData(long i, double & val, uint_8 & flg)
368{
369 cerr << " TOISeqBuffered::doGetData() not implemented !"
370 << " \n A quoi ca set ??? Reza - Mai 2002 " << endl;
371 throw NotAvailableOperation("TOISeqBuffered::doGetData() not implemented !");
372}
373
374void TOISeqBuffered::doPutData(long i, double value, uint_8 flag)
375{
376 cerr << " TOISeqBuffered::doGetData() not implemented !"
377 << " \n A quoi ca set ??? Reza - Mai 2002 " << endl;
378 throw NotAvailableOperation("TOISeqBuffered::doGetData() not implemented !");
379}
Note: See TracBrowser for help on using the repository browser.