source: Sophya/trunk/ArchTOIPipe/Kernel/toiseqbuff.cc@ 3302

Last change on this file since 3302 was 2454, checked in by aubourg, 22 years ago

pb longs sur magique

File size: 10.8 KB
RevLine 
[1738]1// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL
2// Eric Aubourg
3// Christophe Magneville
4// Reza Ansari
[2454]5// $Id: toiseqbuff.cc,v 1.16 2003-11-14 12:34:55 aubourg Exp $
[1738]6
[1437]7#include "toiprocessor.h"
8#include "toiseqbuff.h"
9#include <pthread.h>
10
11#ifdef WITH_SOPHYA
12#include "pexceptions.h"
13#else
14#include "apexceptions.h"
15#endif
16
17
18TOISeqBuffered::TOISeqBuffered(int wsz) {
[1484]19 data = NULL;
20 flags = NULL;
[1437]21 AllocBuffer(wsz);
[1484]22 setName("toiseqbuff");
[1992]23 syncOldWay = false;
[1437]24}
25
26TOISeqBuffered::TOISeqBuffered(string nm, int wsz) {
[1484]27 data = NULL;
28 flags = NULL;
[1437]29 AllocBuffer(wsz);
30 setName(nm);
[1992]31 syncOldWay = false;
[1437]32}
33
34TOISeqBuffered::~TOISeqBuffered() {
35 delete[] data;
36 delete[] flags;
37}
38
39void TOISeqBuffered::AllocBuffer(int wsz)
40{
41 if (wsz < 128) wsz = 128;
42 wsize = wsz;
43 buffsize = 2*wsz;
[1484]44 if (data) delete[] data;
45 if (flags) delete[] flags;
[1437]46 data = new double[buffsize];
[1532]47 flags = new uint_8[buffsize];
[1437]48 for(int k=0; k<buffsize; k++) {
49 data[k] = defaultValue;
50 flags[k] = 0;
51 }
52 next_in = next_out = -1;
53 first_in = first_out = -1;
54 started = false;
55 dbglev = 0;
56}
57
[1762]58void TOISeqBuffered::PrintStatus(::ostream & os) const
[1437]59{
[1484]60 os << "---TOISeqBuffered::PrintStatus() - Name=" << getName()
61 << "\n WindowSize= " << wsize << " BufferSize= " << buffsize << endl;
[1437]62 os << "Index: FirstIn= " << getFirstIn() << " LastIn= " << getLastIn()
63 << " Total= " << getLastIn()-getFirstIn()+1 << endl;
64 os << "Index: FirstOut= " << getFirstOut() << " LastOut= " << getLastOut()
65 << " Total= " << getLastOut()-getFirstOut()+1 << endl;
66 os << " WaitStatus: Put/" ;
67 if (isPutWaiting()) os << "Waiting " ;
68 else os << "Running ";
69 os << " PutCountWait= " << getCountWaitPut() << endl;
70 os << " WaitStatus: Get/" ;
71 if (isGetWaiting()) os << "Waiting " ;
72 else os << "Running ";
73 os << " GetCountWait= " << getCountWaitGet() << endl;
74}
75
[2454]76TOI::DataStatus TOISeqBuffered::isDataAvailNL(long iStart, long iEnd) {
[1437]77 if (iEnd < iStart)
78 throw RangeCheckError("TOISeqBuffered::isDataAvailNL : iEnd<iStart !");
79 if (!started) return DATA_NOT_YET;
80 if (iEnd >= next_in) return DATA_NOT_YET;
[1442]81 if (isDataDeleted(iStart)) return DATA_DELETED;
[1437]82 return DATA_OK;
83}
84
[2454]85TOI::DataStatus TOISeqBuffered::isDataAvailNL(long i) {
[1437]86 return TOI::isDataAvailNL(i);
87}
88
[2454]89void TOISeqBuffered::wontNeedBefore(long i) {
[1484]90 // $CHECK$ Reza 30/4/2001 - Je ne sais pas a quoi ca sert !
91 // next_out = i; $CHECK$ Reza 30/4/2001
[1437]92}
93
94
95#ifndef NO_SOPHYA
[1464]96/* ---- l'interface va etre modifiee, NE PAS UTILISER
[2454]97Array TOISeqBuffered::doGetData(long iStart, long iEnd) {
[1437]98 // if (iEnd < iStart)
99 // throw RangeCheckError("TOI::getData : iEnd<iStart !");
100 // if (iStart <= out_last)
101 if (!started) waitGet();
102 if (!isDataAvailNL(iStart, iEnd))
103 throw RangeCheckError("TOISeqBuffered::getData(iS,iE) : data not available");
104 cleanWaitGet();
105 Vector dat(iEnd - iStart + 1);
[2454]106 for (long i=0; i<iEnd-iStart+1; i++) {
[1437]107 dat(i) = dataRef(i+iStart);
108 }
109 if (first_out < 0) first_out = iStart;
110 if ((iEnd+1) > next_out) next_out = iEnd+1;
111 if (isPutWaiting() && (next_in-next_out < wsize/2 )) signalPut();
112 return dat;
113}
[1464]114 l'interface va etre modifiee, NE PAS UTILISER ---- */
[1437]115#endif
116
[2454]117double TOISeqBuffered::getData(long i)
[1985]118{
119 double val;
120 uint_8 flg;
121 getData(i, val, flg);
122 return(val);
123}
124
125
[2454]126void TOISeqBuffered::getData(long i, double & val, uint_8 & flg) {
[1993]127 lock();
[1437]128 if (!started) {
[1985]129 cout << " TOISeqBuffered::getData() - waitGet() Waiting for start ... " << endl;
[1437]130 waitGet();
131 }
132 cleanWaitGet();
[1442]133 if (isDataDeleted(i)) {
134 if (dbglev > 0)
[1985]135 cout << " TOISeqBuffered::getData() - DataDeleted() name=" << getName()
[1442]136 << " i=" << i << " next_in= " << next_in
137 << " next_out=" << next_out << endl;
[1993]138 unlock();
[1985]139 throw RangeCheckError("TOISeqBuffered::getData(i) : data deleted");
[1442]140 }
[1437]141 while (i >= next_in) {
142 if (i>next_out) next_out = i;
143 if (dbglev > 0)
[1985]144 cout << " TOISeqBuffered::getData() - waitGet() name=" << getName()
[1437]145 << " i=" << i << " next_in= " << next_in
146 << " next_out=" << next_out << endl;
147 waitGet();
148 if (dbglev > 0)
149 cout << " ... Out of waitGet() i=" << i
150 << " next_in= " << next_in << " next_out=" << next_out << endl;
[2024]151 cleanWaitGet();
[1437]152 }
[1462]153 val = dataRef(i);
154 flg = flagRef(i);
[1437]155 if (first_out < 0) first_out = i;
156 if ((i+1) > next_out) next_out = i+1;
157 if (isPutWaiting() && (next_in-next_out < wsize/2 )) {
158 if (dbglev > 0)
[1985]159 cout << " TOISeqBuffered::getData() - signalPut() name=" << getName()
[1437]160 << " i=" << i << " next_in= " << next_in
161 << " next_out=" << next_out << endl;
[1993]162 signalPut();
[1437]163 }
[2024]164 if (fgsigput) broadcast();
[1993]165 unlock();
[1462]166 return;
[1437]167}
168
169
[2454]170void TOISeqBuffered::getData(long i, int n, double* data, uint_8* flg)
[1985]171{
[1993]172 lock();
[1985]173 if (!started) {
174 cout << " TOISeqBuffered::getData(i,n ...) - waitGet() Waiting for start ... " << endl;
175 waitGet();
176 }
177 cleanWaitGet();
178 if (isDataDeleted(i)) {
179 if (dbglev > 0)
180 cout << " TOISeqBuffered::getData(i,n ...) - DataDeleted() name=" << getName()
181 << " i=" << i << " next_in= " << next_in
182 << " next_out=" << next_out << endl;
[1993]183 unlock();
[1985]184 throw RangeCheckError("TOISeqBuffered::getData(i) : data deleted");
185 }
[2454]186 for(long j=i; j<i+n; j++) {
[1985]187 while (j >= next_in) {
188 if (j>next_out) next_out = j;
189 if (dbglev > 0)
190 cout << " TOISeqBuffered::getData(i,n ...) - waitGet() name=" << getName()
191 << " j=" << j << " next_in= " << next_in
192 << " next_out=" << next_out << endl;
193 waitGet();
194 if (dbglev > 0)
195 cout << " ... Out of waitGet() j=" << j
196 << " next_in= " << next_in << " next_out=" << next_out << endl;
[2024]197 cleanWaitGet();
[1985]198 }
199 data[j-i] = dataRef(j);
200 if (flg) flg[j-i] = flagRef(j);
201 if (first_out < 0) first_out = j;
202 if ((j+1) > next_out) next_out = j+1;
203 if (isPutWaiting() && (next_in-next_out < wsize/2 )) {
204 if (dbglev > 0)
205 cout << " TOISeqBuffered::getData(i,n ...) - signalPut() name=" << getName()
206 << " i=" << i << " next_in= " << next_in
207 << " next_out=" << next_out << endl;
[2024]208 // signalPut();
209 broadcast();
[1985]210 }
211 }
[1993]212 unlock();
[2024]213 // if (fgsigput) signal();
[1993]214 return;
[1985]215}
216
217
[1437]218#ifndef NO_SOPHYA
[1464]219/* ---- l'interface va etre modifiee, NE PAS UTILISER
[2454]220TArray<int_4> TOISeqBuffered::doGetFlag(long iStart, long iEnd) {
[1437]221 if (!started) waitGet();
222 cleanWaitGet();
223 if (!isDataAvailNL(iStart, iEnd))
224 throw RangeCheckError("TOISeqBuffered::getFlag(iS,iE) : data not available");
225 TVector<int_4> dat(iEnd - iStart + 1);
[2454]226 for (long i=0; i<iEnd-iStart+1; i++) {
[1437]227 dat[i] = flagRef(i+iStart);
228 }
229 return dat;
230}
[1464]231 l'interface va etre modifiee, NE PAS UTILISER ---- */
[1437]232#endif
233
[1462]234/*RZCMV
[2454]235int_4 TOISeqBuffered::doGetFlag(long i) {
[1437]236 if (!started) waitGet();
237 cleanWaitGet();
238 if (isDataDeleted(i))
239 throw RangeCheckError("TOISeqBuffered::doGetFlag(i) : data deleted");
240 while (i >= next_in) waitGet();
241 int_4 dat = flagRef(i);
242 return dat;
243}
[1462]244*/
[1437]245
246
[2454]247void TOISeqBuffered::putData(long i, double value, uint_8 flag) {
[1993]248 lock();
[1437]249 if (!started) {
250 first_in = next_in = i;
251 next_out = next_in;
252 started = true;
253 }
254 else {
255 if (i != next_in) {
[1442]256 if (dbglev > 0)
[1985]257 cout << " TOISeqBuffered::putData() - i!=next_in() name=" << getName()
[1442]258 << " i=" << i << " next_in= " << next_in
259 << " next_out=" << next_out << endl;
[1985]260 string msg = "TOISeqBuffered::putData() : i!=next_in TOIname=" + getName();
[1993]261 unlock();
[1437]262 throw RangeCheckError(msg);
263 }
264 if (next_in-next_out >= wsize) {
265 if (dbglev > 0)
[1985]266 cout << " TOISeqBuffered::putData() - waitPut() " << getName()
[1437]267 << " i=" << i
268 << " next_in= " << next_in << " next_out=" << next_out << endl;
269 waitPut();
270 if (dbglev > 0)
271 cout << " ... Out of waitPut() i=" << i
272 << " next_in= " << next_in << " next_out=" << next_out << endl;
273 }
274 cleanWaitPut();
275 }
276 dataRef(i) = value;
277 flagRef(i) = flag;
278 next_in = i+1;
279 if (isGetWaiting() && (next_in-next_out > wsize/8)) {
280 if (dbglev > 0)
[1985]281 cout << " TOISeqBuffered::putData() - signalGet() name=" << getName()
[1437]282 << " i=" << i << " next_in= " << next_in
283 << " next_out=" << next_out << endl;
[2024]284 broadcast();
285 // signalGet();
[1437]286 }
[2024]287 // if (fgsigget) broadcast();
[1993]288 unlock();
289 return;
[1437]290}
291
[2454]292void TOISeqBuffered::putData(long i, int n, double const* val, uint_8 const* flg)
[1993]293{
294 lock();
[1985]295 if (!started) {
296 first_in = next_in = i;
297 next_out = next_in;
298 started = true;
299 }
300 else {
301 if (i != next_in) {
302 if (dbglev > 0)
303 cout << " TOISeqBuffered::putData(i,n ...) - i!=next_in() name=" << getName()
304 << " i=" << i << " next_in= " << next_in
305 << " next_out=" << next_out << endl;
306 string msg = "TOISeqBuffered::putData() : i!=next_in TOIname=" + getName();
[1993]307 unlock();
[1985]308 throw RangeCheckError(msg);
309 }
310 }
[2454]311 for(long j=i; j<i+n; j++) {
[1985]312 if (next_in-next_out >= wsize) {
313 if (dbglev > 0)
314 cout << " TOISeqBuffered::putData(i,n ...) - waitPut() " << getName()
315 << " j=" << j
316 << " next_in= " << next_in << " next_out=" << next_out << endl;
[2024]317 broadcast();
[1985]318 waitPut();
319 if (dbglev > 0)
320 cout << " ... Out of waitPut() j=" << j
321 << " next_in= " << next_in << " next_out=" << next_out << endl;
[2024]322 cleanWaitPut();
[1985]323 }
324 dataRef(j) = val[j-i];
325 if (flg) flagRef(j) = flg[j-i];
326 else flagRef(j) = 0;
327 next_in = j+1;
[2024]328 if (isGetWaiting() && (next_in-next_out > wsize/8)) {
329 if (dbglev > 0)
330 cout << " TOISeqBuffered::putData(i,n ...) - signalGet() name=" << getName()
331 << " i=" << i << " next_in= " << next_in
332 << " next_out=" << next_out << endl;
333 // signalGet();
334 broadcast();
335 }
[1985]336 }
[2024]337 /*
[1985]338 if (isGetWaiting() && (next_in-next_out > wsize/8)) {
339 if (dbglev > 0)
340 cout << " TOISeqBuffered::putData(i,n ...) - signalGet() name=" << getName()
341 << " i=" << i << " next_in= " << next_in
342 << " next_out=" << next_out << endl;
[2024]343 //signalGet();
344 broadcast();
[1985]345 }
[2024]346 */
347 // if (fgsigget) broadcast();
[1993]348 unlock();
349 return;
[1985]350}
351
[1437]352bool TOISeqBuffered::hasSomeData() {
353 lock();
354 bool x = started;
355 unlock();
356 return x;
357}
358
[2454]359long TOISeqBuffered::nextDataAvail(long iAfter) {
[1437]360 lock();
361 if (iAfter >= next_in ) {unlock(); return -1;}
362 if (iAfter < (next_in-buffsize)) {unlock(); return (next_in-buffsize);}
363 unlock();
364 return iAfter+1;
365}
366
[2454]367void TOISeqBuffered::doGetData(long i, double & val, uint_8 & flg)
[1985]368{
369 cerr << " TOISeqBuffered::doGetData() not implemented !"
370 << " \n A quoi ca set ??? Reza - Mai 2002 " << endl;
371 throw NotAvailableOperation("TOISeqBuffered::doGetData() not implemented !");
372}
[1437]373
[2454]374void TOISeqBuffered::doPutData(long i, double value, uint_8 flag)
[1985]375{
376 cerr << " TOISeqBuffered::doGetData() not implemented !"
377 << " \n A quoi ca set ??? Reza - Mai 2002 " << endl;
378 throw NotAvailableOperation("TOISeqBuffered::doGetData() not implemented !");
379}
Note: See TracBrowser for help on using the repository browser.