| 1 | // ArchTOIPipe           (C)     CEA/DAPNIA/SPP IN2P3/LAL
 | 
|---|
| 2 | //                               Eric Aubourg
 | 
|---|
| 3 | //                               Christophe Magneville
 | 
|---|
| 4 | //                               Reza Ansari
 | 
|---|
| 5 | // $Id: toisegment.cc,v 1.28 2002-11-28 15:49:21 aubourg Exp $
 | 
|---|
| 6 | 
 | 
|---|
| 7 | #include "toisegment.h"
 | 
|---|
| 8 | 
 | 
|---|
| 9 | #include <iostream.h>
 | 
|---|
| 10 | 
 | 
|---|
| 11 | #ifndef MAXINT
 | 
|---|
| 12 | #define MAXINT 2147483647
 | 
|---|
| 13 | #endif
 | 
|---|
| 14 | 
 | 
|---|
| 15 | static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
 | 
|---|
| 16 | static void cout_lock() {pthread_mutex_lock(&cout_mutex);}
 | 
|---|
| 17 | static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);}
 | 
|---|
| 18 | #define LOG(_xxx_)
 | 
|---|
| 19 | /*
 | 
|---|
| 20 | #define LOG(_xxx_) \
 | 
|---|
| 21 | cout_lock(); \
 | 
|---|
| 22 | _xxx_; \
 | 
|---|
| 23 | cout_unlock();
 | 
|---|
| 24 | */
 | 
|---|
| 25 | 
 | 
|---|
| 26 | /******************************/
 | 
|---|
| 27 | /******* TOISegmented *********/
 | 
|---|
| 28 | /******************************/
 | 
|---|
| 29 | 
 | 
|---|
| 30 | TOISegmented::TOISegmented(int bufsz, int maxseg) {
 | 
|---|
| 31 |   master = new MasterView(bufsz, maxseg, "");
 | 
|---|
| 32 |   setName("TOISegmented");
 | 
|---|
| 33 |   syncOldWay = false;
 | 
|---|
| 34 | }
 | 
|---|
| 35 | 
 | 
|---|
| 36 | TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
 | 
|---|
| 37 |   master = new MasterView(bufsz, maxseg, nm);
 | 
|---|
| 38 |   setName(nm);
 | 
|---|
| 39 |   syncOldWay = false;
 | 
|---|
| 40 | }
 | 
|---|
| 41 | 
 | 
|---|
| 42 | TOISegmented::TOISegmented(char* cnm, int bufsz, int maxseg) {
 | 
|---|
| 43 |   string nm = cnm;
 | 
|---|
| 44 |   master = new MasterView(bufsz, maxseg, nm);
 | 
|---|
| 45 |   setName(nm);
 | 
|---|
| 46 |   syncOldWay = false;
 | 
|---|
| 47 | }
 | 
|---|
| 48 | 
 | 
|---|
| 49 | TOISegmented::~TOISegmented() {
 | 
|---|
| 50 |   delete master;
 | 
|---|
| 51 | }
 | 
|---|
| 52 | 
 | 
|---|
| 53 | 
 | 
|---|
| 54 | void TOISegmented::addConsumer(TOIProcessor* p) {
 | 
|---|
| 55 |   TOI::addConsumer(p);
 | 
|---|
| 56 |   master->nConsumers = consumers.size();
 | 
|---|
| 57 | }
 | 
|---|
| 58 | 
 | 
|---|
| 59 | 
 | 
|---|
| 60 | double TOISegmented::getData(int i) { /* reader thread */
 | 
|---|
| 61 |   return master->getData(i);
 | 
|---|
| 62 | }
 | 
|---|
| 63 | 
 | 
|---|
| 64 | void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
 | 
|---|
| 65 |   data = master->getData(i);
 | 
|---|
| 66 |   flag = master->getFlag(i);
 | 
|---|
| 67 | }
 | 
|---|
| 68 | 
 | 
|---|
| 69 | void TOISegmented::getData(int i, int n, double* data, uint_8* flg) { /* reader thread */
 | 
|---|
| 70 |   master->getData(i, n, data, flg);
 | 
|---|
| 71 | }
 | 
|---|
| 72 | 
 | 
|---|
| 73 | void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
 | 
|---|
| 74 |   master->putData(i, value, flag);
 | 
|---|
| 75 | }
 | 
|---|
| 76 | 
 | 
|---|
| 77 | void TOISegmented::putData(int i, int n, double const* val, uint_8 const* flg) { /* writer thread */
 | 
|---|
| 78 |   master->putData(i, n, val, flg);
 | 
|---|
| 79 | }
 | 
|---|
| 80 | 
 | 
|---|
| 81 | void TOISegmented::putDone() {
 | 
|---|
| 82 |   master->putDone();
 | 
|---|
| 83 | }
 | 
|---|
| 84 | 
 | 
|---|
| 85 | void TOISegmented::wontNeedBefore(int i) {  /* reader thread */
 | 
|---|
| 86 |   master->getView()->wontNeedBefore(i);
 | 
|---|
| 87 | }
 | 
|---|
| 88 | 
 | 
|---|
| 89 | TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
 | 
|---|
| 90 |   // return master->getView()->isDataAvail(i, j);
 | 
|---|
| 91 |   cout << "TOISegmented::isDataAvail unimplemented" << endl;
 | 
|---|
| 92 |   throw PError("TOISegmented::isDataAvail unimplemented");
 | 
|---|
| 93 | }
 | 
|---|
| 94 | 
 | 
|---|
| 95 | TOI::DataStatus TOISegmented::isDataAvail(int i) {
 | 
|---|
| 96 |   return isDataAvail(i,i);
 | 
|---|
| 97 | }
 | 
|---|
| 98 | 
 | 
|---|
| 99 | TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
 | 
|---|
| 100 |   return isDataAvail(i,j);
 | 
|---|
| 101 | }
 | 
|---|
| 102 | 
 | 
|---|
| 103 | TOI::DataStatus TOISegmented::isDataAvailNL(int i) {
 | 
|---|
| 104 |   return isDataAvail(i);
 | 
|---|
| 105 | }
 | 
|---|
| 106 | 
 | 
|---|
| 107 | void TOISegmented::waitForData(int iStart, int iEnd) {
 | 
|---|
| 108 |   // get will wait...
 | 
|---|
| 109 | }
 | 
|---|
| 110 | 
 | 
|---|
| 111 | void TOISegmented::waitForData(int i) {
 | 
|---|
| 112 |   // get will wait...
 | 
|---|
| 113 | }
 | 
|---|
| 114 | 
 | 
|---|
| 115 | void TOISegmented::waitForAnyData() {
 | 
|---|
| 116 |   cout << "TOISegmented::waitForAnyData unimplemented" << endl;
 | 
|---|
| 117 |   throw PError("TOISegmented::waitForAnyData unimplemented");
 | 
|---|
| 118 | }
 | 
|---|
| 119 | 
 | 
|---|
| 120 | int TOISegmented::nextDataAvail(int iAfter) {
 | 
|---|
| 121 |   cout << "TOISegmented::nextDataAvail" << endl;
 | 
|---|
| 122 |   return iAfter+1;
 | 
|---|
| 123 | }
 | 
|---|
| 124 | 
 | 
|---|
| 125 | bool TOISegmented::hasSomeData() {
 | 
|---|
| 126 |   cout << "TOISegmented::hasSomeData" << endl;
 | 
|---|
| 127 |   return true;
 | 
|---|
| 128 | }
 | 
|---|
| 129 | 
 | 
|---|
| 130 | void TOISegmented::doPutData(int i, double value, uint_8 flag) {
 | 
|---|
| 131 |   cout << "TOISegmented::doPutData unimplemented" << endl;
 | 
|---|
| 132 |   throw PError("TOISegmented::doPutData unimplemented");
 | 
|---|
| 133 | }
 | 
|---|
| 134 | 
 | 
|---|
| 135 | void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
 | 
|---|
| 136 |   cout << "TOISegmented::doGetData unimplemented" << endl;
 | 
|---|
| 137 |   throw PError("TOISegmented::doGetData unimplemented");
 | 
|---|
| 138 | }
 | 
|---|
| 139 | 
 | 
|---|
| 140 | 
 | 
|---|
| 141 | /*******************************/
 | 
|---|
| 142 | /******* BufferSegment *********/
 | 
|---|
| 143 | /*******************************/
 | 
|---|
| 144 | 
 | 
|---|
| 145 | TOISegmented::BufferSegment::BufferSegment(int sz) {
 | 
|---|
| 146 |   status     = NEW;
 | 
|---|
| 147 |   bufferSize = sz;
 | 
|---|
| 148 |   sn0        = -1;
 | 
|---|
| 149 | 
 | 
|---|
| 150 |   refcount   = 0;
 | 
|---|
| 151 | 
 | 
|---|
| 152 |   data       = new double[sz];
 | 
|---|
| 153 |   flags     = new uint_8[sz];
 | 
|---|
| 154 | 
 | 
|---|
| 155 |   pthread_mutex_init(&refcount_mutex, NULL);
 | 
|---|
| 156 | }
 | 
|---|
| 157 | 
 | 
|---|
| 158 | TOISegmented::BufferSegment::~BufferSegment() {
 | 
|---|
| 159 |   if (refcount > 0) {
 | 
|---|
| 160 |     cerr << "TOISegment : delete Buffer with refcount>0" << endl;
 | 
|---|
| 161 |     throw(ForbiddenError("TOISegment : delete Buffer with refcount>0"));
 | 
|---|
| 162 |   }
 | 
|---|
| 163 |   LOG(cout << "Destroying buffersegment sn0 "<< sn0 << endl);
 | 
|---|
| 164 |   delete[] data;
 | 
|---|
| 165 |   delete[] flags;
 | 
|---|
| 166 |   pthread_mutex_destroy(&refcount_mutex);
 | 
|---|
| 167 | }
 | 
|---|
| 168 | 
 | 
|---|
| 169 | void TOISegmented::BufferSegment::getData(int sn, int n, double* d, uint_8* f) {
 | 
|---|
| 170 |   checkCommitted();
 | 
|---|
| 171 |   checkInRange(sn);
 | 
|---|
| 172 |   checkInRange(sn+n-1);
 | 
|---|
| 173 |   memcpy(d, data+(sn-sn0), n*sizeof(double));
 | 
|---|
| 174 |   if (f != NULL) {
 | 
|---|
| 175 |     memcpy(f, flags+(sn-sn0), n*sizeof(uint_8));
 | 
|---|
| 176 |   }
 | 
|---|
| 177 | }
 | 
|---|
| 178 | 
 | 
|---|
| 179 | void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) { 
 | 
|---|
| 180 |   /* writer thread*/
 | 
|---|
| 181 |   if (status == NEW) {
 | 
|---|
| 182 |     status = WRITE;
 | 
|---|
| 183 |     sn0 = sn;
 | 
|---|
| 184 |   }
 | 
|---|
| 185 |   if (status == COMMITTED) {
 | 
|---|
| 186 |     cerr << "TOISegment : putData in committed buffer" << endl;
 | 
|---|
| 187 |     throw(ForbiddenError("TOISegment : putData in committed buffer"));
 | 
|---|
| 188 |   }
 | 
|---|
| 189 |   checkInRange(sn);
 | 
|---|
| 190 |   data[sn-sn0] = d;
 | 
|---|
| 191 |   flags[sn-sn0] = f;
 | 
|---|
| 192 | }
 | 
|---|
| 193 | 
 | 
|---|
| 194 | void TOISegmented::BufferSegment::putData(int sn, int n, double const* d, uint_8 const* f) {
 | 
|---|
| 195 |   if (status == NEW) {
 | 
|---|
| 196 |     status = WRITE;
 | 
|---|
| 197 |     sn0 = sn;
 | 
|---|
| 198 |   }
 | 
|---|
| 199 |   if (status == COMMITTED) {
 | 
|---|
| 200 |     cerr << "TOISegment : putData in committed buffer" << endl;
 | 
|---|
| 201 |     throw(ForbiddenError("TOISegment : putData in committed buffer"));
 | 
|---|
| 202 |   }
 | 
|---|
| 203 |   if (n==0) return;
 | 
|---|
| 204 |   checkInRange(sn);
 | 
|---|
| 205 |   checkInRange(sn+n-1);
 | 
|---|
| 206 |   memcpy(data+(sn-sn0), d, n*sizeof(double));
 | 
|---|
| 207 |   if (f != NULL) {
 | 
|---|
| 208 |     memcpy(flags+(sn-sn0), f, n*sizeof(uint_8));
 | 
|---|
| 209 |   } else {
 | 
|---|
| 210 |     memset(flags+(sn-sn0), 0, n*sizeof(uint_8));
 | 
|---|
| 211 |   }
 | 
|---|
| 212 | }
 | 
|---|
| 213 | 
 | 
|---|
| 214 | void TOISegmented::BufferSegment::incRefCount() {
 | 
|---|
| 215 |   pthread_mutex_lock(&refcount_mutex);
 | 
|---|
| 216 |   refcount++;
 | 
|---|
| 217 |   pthread_mutex_unlock(&refcount_mutex);
 | 
|---|
| 218 | }
 | 
|---|
| 219 | 
 | 
|---|
| 220 | void TOISegmented::BufferSegment::decRefCount() {
 | 
|---|
| 221 |   pthread_mutex_lock(&refcount_mutex);
 | 
|---|
| 222 |   int nrc = --refcount;
 | 
|---|
| 223 |   pthread_mutex_unlock(&refcount_mutex);
 | 
|---|
| 224 |   if (nrc<0) {
 | 
|---|
| 225 |     cerr << "TOISegment : buffer refcount < 0" << endl;
 | 
|---|
| 226 |     throw(ForbiddenError("TOISegment : buffer refcount < 0"));
 | 
|---|
| 227 |   }
 | 
|---|
| 228 | }
 | 
|---|
| 229 | 
 | 
|---|
| 230 | int TOISegmented::BufferSegment::getRefCount() {
 | 
|---|
| 231 |   pthread_mutex_lock(&refcount_mutex);
 | 
|---|
| 232 |   int rc = refcount;
 | 
|---|
| 233 |   pthread_mutex_unlock(&refcount_mutex);
 | 
|---|
| 234 |   return rc;
 | 
|---|
| 235 | }
 | 
|---|
| 236 | 
 | 
|---|
| 237 | 
 | 
|---|
| 238 | /*******************************/
 | 
|---|
| 239 | /********** BufferView *********/
 | 
|---|
| 240 | /*******************************/
 | 
|---|
| 241 | 
 | 
|---|
| 242 | TOISegmented::BufferView::BufferView(MasterView* m) {
 | 
|---|
| 243 |   master = m;
 | 
|---|
| 244 |   sn0 = -1;
 | 
|---|
| 245 |   segmentSize = m->segmentSize;
 | 
|---|
| 246 |   firstNeeded = -1;
 | 
|---|
| 247 |   waiting = false;
 | 
|---|
| 248 |   waitingFor = -1;
 | 
|---|
| 249 | }
 | 
|---|
| 250 | 
 | 
|---|
| 251 | TOISegmented::BufferView::~BufferView() {
 | 
|---|
| 252 | }
 | 
|---|
| 253 | 
 | 
|---|
| 254 | double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/
 | 
|---|
| 255 |   ensure(sn);
 | 
|---|
| 256 |   int seg = (sn-sn0)/segmentSize;
 | 
|---|
| 257 |   return segments[seg]->getData(sn);
 | 
|---|
| 258 | }
 | 
|---|
| 259 | 
 | 
|---|
| 260 | uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */
 | 
|---|
| 261 |   ensure(sn);
 | 
|---|
| 262 |   int seg = (sn-sn0)/segmentSize;
 | 
|---|
| 263 |   return segments[seg]->getFlag(sn);
 | 
|---|
| 264 | }
 | 
|---|
| 265 | 
 | 
|---|
| 266 | void TOISegmented::BufferView::getData(int sn, int n, double* dat, uint_8* flg) { /* Single-thread, reader thread */
 | 
|---|
| 267 |   ensure(sn);
 | 
|---|
| 268 |   ensure(sn+n-1);
 | 
|---|
| 269 |   
 | 
|---|
| 270 |   int  sn1  = sn;
 | 
|---|
| 271 |   int  nsam = n;
 | 
|---|
| 272 |   double* pdat = dat;
 | 
|---|
| 273 |   uint_8* pflg = flg;
 | 
|---|
| 274 | 
 | 
|---|
| 275 |   while (true) {
 | 
|---|
| 276 |     int seg = (sn1-sn0)/segmentSize;
 | 
|---|
| 277 |     BufferSegment* s = segments[seg];
 | 
|---|
| 278 |     int snmax = s->sn0 + s->bufferSize - 1;
 | 
|---|
| 279 |     int sn2 = snmax > (sn1+nsam-1) ? (sn1+nsam-1) : snmax;
 | 
|---|
| 280 |     int nget = sn2-sn1+1;
 | 
|---|
| 281 |     s->getData(sn1, nget, pdat, pflg);
 | 
|---|
| 282 |     pdat += nget;
 | 
|---|
| 283 |     if (pflg != NULL) pflg += nget;
 | 
|---|
| 284 |     nsam -= nget;
 | 
|---|
| 285 |     sn1  += nget;
 | 
|---|
| 286 |     if (nsam <= 0) break;
 | 
|---|
| 287 |   }
 | 
|---|
| 288 | }
 | 
|---|
| 289 | 
 | 
|---|
| 290 | void  TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */
 | 
|---|
| 291 |   if (sn < sn0) {
 | 
|---|
| 292 |     cout << "TOISegmented::BufferView::ensure : requested sample before first" << endl;
 | 
|---|
| 293 |     cout << "sn " << sn << " sn0 " << sn0 << endl;
 | 
|---|
| 294 |     cout << "name : " << master->name << endl;
 | 
|---|
| 295 |     abort();
 | 
|---|
| 296 |   }
 | 
|---|
| 297 | 
 | 
|---|
| 298 |   if (sn0 < 0 ||
 | 
|---|
| 299 |       sn >= sn0 + segmentSize*segments.size()) {
 | 
|---|
| 300 |     LOG(cout << master->name << " BufferView " 
 | 
|---|
| 301 |         << hex << this << dec << ": read fault for " << sn << endl)
 | 
|---|
| 302 |     sync();
 | 
|---|
| 303 |     pthread_mutex_lock(&(master->read_wait_mutex));
 | 
|---|
| 304 |     while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) {
 | 
|---|
| 305 |       wait(sn); // must be atomic with loop test // $CHECK$ est-ce vrai ?
 | 
|---|
| 306 |       pthread_mutex_unlock(&(master->read_wait_mutex));
 | 
|---|
| 307 |       LOG(cout << master->name << " BufferView " << hex << this << dec << ": waiting for " << sn << endl)
 | 
|---|
| 308 |       sync();
 | 
|---|
| 309 |       pthread_mutex_lock(&(master->read_wait_mutex));
 | 
|---|
| 310 |     }
 | 
|---|
| 311 |     pthread_mutex_unlock(&(master->read_wait_mutex));
 | 
|---|
| 312 | 
 | 
|---|
| 313 |     LOG(cout << master->name << " BufferView " << hex << this << dec << ": resuming for " << sn 
 | 
|---|
| 314 |         << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size() 
 | 
|---|
| 315 |         << " in " << segments.size() << " segments " << endl)
 | 
|---|
| 316 |   }
 | 
|---|
| 317 | }
 | 
|---|
| 318 | 
 | 
|---|
| 319 | void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */
 | 
|---|
| 320 |   master->updateView(this); // update me !
 | 
|---|
| 321 | }
 | 
|---|
| 322 | 
 | 
|---|
| 323 | void TOISegmented::BufferView::wait(int sn) { /* reader thread, master read wait lock taken */
 | 
|---|
| 324 |   //pthread_mutex_lock(&(master->read_wait_mutex));
 | 
|---|
| 325 |   waiting = true;
 | 
|---|
| 326 |   waitingFor = sn;
 | 
|---|
| 327 |   master->waitingViews++;
 | 
|---|
| 328 |   pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex)); 
 | 
|---|
| 329 |   waiting = false;
 | 
|---|
| 330 |   waitingFor = -1;
 | 
|---|
| 331 |   master->waitingViews--;
 | 
|---|
| 332 |   //pthread_mutex_unlock(&(master->read_wait_mutex));
 | 
|---|
| 333 | }
 | 
|---|
| 334 | 
 | 
|---|
| 335 | 
 | 
|---|
| 336 | void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
 | 
|---|
| 337 |   if (sn > firstNeeded) {
 | 
|---|
| 338 |     firstNeeded = sn;
 | 
|---|
| 339 |     // C'est peut-etre le moment de faire unl sync, si on est coince par ailleurs...
 | 
|---|
| 340 |     //pthread_mutex_lock(&(master->read_wait_mutex));
 | 
|---|
| 341 |     if (sn >= sn0 + segmentSize){  //  && master->waitingViews>0) {
 | 
|---|
| 342 | //       LOG(cout<<master->name<< " sync on wontneed, waitingViews=" << master->waitingViews << endl);
 | 
|---|
| 343 |        LOG(cout<<master->name<< " sync on wontneed, sn = " << sn << " sn0 = " << sn0 << endl);
 | 
|---|
| 344 |        sync();
 | 
|---|
| 345 |     }
 | 
|---|
| 346 |     //pthread_mutex_unlock(&(master->read_wait_mutex));
 | 
|---|
| 347 |   }
 | 
|---|
| 348 | }
 | 
|---|
| 349 | 
 | 
|---|
| 350 | 
 | 
|---|
| 351 | /*******************************/
 | 
|---|
| 352 | /********** MasterView *********/
 | 
|---|
| 353 | /*******************************/
 | 
|---|
| 354 | 
 | 
|---|
| 355 | TOISegmented::MasterView::MasterView(int bufsz, int maxseg, string nm) {
 | 
|---|
| 356 |   currentSegment = NULL;
 | 
|---|
| 357 |   maxSegments    = maxseg;
 | 
|---|
| 358 |   segmentSize    = bufsz;
 | 
|---|
| 359 |   sn0            = -1;
 | 
|---|
| 360 |   nConsumers = 0;
 | 
|---|
| 361 |   name = nm;
 | 
|---|
| 362 |   
 | 
|---|
| 363 |   pthread_mutex_init(&views_mutex, NULL);
 | 
|---|
| 364 |   pthread_mutex_init(&read_wait_mutex, NULL);
 | 
|---|
| 365 |   pthread_cond_init(&write_wait_condv, NULL);
 | 
|---|
| 366 |   pthread_cond_init(&read_wait_condv, NULL);
 | 
|---|
| 367 |   pthread_key_create(&buffer_key, BufferDestroy);
 | 
|---|
| 368 | 
 | 
|---|
| 369 |   waitingOnWrite = false;
 | 
|---|
| 370 |   waitingViews = 0;
 | 
|---|
| 371 | }
 | 
|---|
| 372 | 
 | 
|---|
| 373 | TOISegmented::MasterView::~MasterView() {
 | 
|---|
| 374 |   pthread_mutex_destroy(&views_mutex);
 | 
|---|
| 375 |   pthread_mutex_destroy(&read_wait_mutex);
 | 
|---|
| 376 |   pthread_cond_destroy(&write_wait_condv);
 | 
|---|
| 377 |   pthread_cond_destroy(&read_wait_condv);
 | 
|---|
| 378 |   pthread_key_delete(buffer_key);
 | 
|---|
| 379 | 
 | 
|---|
| 380 |   // There should not be any BufferView left... Check ?
 | 
|---|
| 381 |   
 | 
|---|
| 382 |   // decrement count for segments ?
 | 
|---|
| 383 | }
 | 
|---|
| 384 | 
 | 
|---|
| 385 | void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */
 | 
|---|
| 386 |   if (sn0<0) {
 | 
|---|
| 387 |     LOG(cout << "***MasterView::putData sn0<0 -> " << sn << endl);
 | 
|---|
| 388 |     sn0=sn;
 | 
|---|
| 389 |   }
 | 
|---|
| 390 |   // can fit in current segment ?
 | 
|---|
| 391 |   if (!(currentSegment != NULL &&
 | 
|---|
| 392 |         sn >= currentSegment->sn0 &&
 | 
|---|
| 393 |         sn < currentSegment->sn0 + currentSegment->bufferSize)) {
 | 
|---|
| 394 |     LOG(cout << name << " MasterView::putData, need extend for " << sn << endl)
 | 
|---|
| 395 |     nextSegment();
 | 
|---|
| 396 |   }
 | 
|---|
| 397 |   currentSegment->putData(sn, data, flags);
 | 
|---|
| 398 | }
 | 
|---|
| 399 | 
 | 
|---|
| 400 | void TOISegmented::MasterView::putData(int sn, int n, double const* data, uint_8 const* flags) { /* writer thread */
 | 
|---|
| 401 |   if (sn0<0) {
 | 
|---|
| 402 |     LOG(cout << "***MasterView::putData sn0<0 -> " << sn << endl);
 | 
|---|
| 403 |     sn0=sn;
 | 
|---|
| 404 |   }
 | 
|---|
| 405 |   double const*  pdat = data;
 | 
|---|
| 406 |   uint_8 const*  pflg = flags;
 | 
|---|
| 407 |   int     nsam  = n;
 | 
|---|
| 408 |   int     sn1   = sn;
 | 
|---|
| 409 |   while (true) {
 | 
|---|
| 410 |     // maximum that current segment can take
 | 
|---|
| 411 |     int snmax = -1;
 | 
|---|
| 412 |     if (currentSegment != NULL) {
 | 
|---|
| 413 |       snmax = currentSegment->sn0 + currentSegment->bufferSize-1;
 | 
|---|
| 414 |     }
 | 
|---|
| 415 |     int sn2 = snmax > (sn1+nsam-1) ? (sn1+nsam-1) : snmax;
 | 
|---|
| 416 |     int nput = sn2-sn1+1;
 | 
|---|
| 417 |     if (snmax>0 && nput>0) {
 | 
|---|
| 418 |       currentSegment->putData(sn1, nput, pdat, pflg);
 | 
|---|
| 419 |       pdat += nput;
 | 
|---|
| 420 |       if (pflg != NULL) pflg += nput;
 | 
|---|
| 421 |       nsam -= nput;
 | 
|---|
| 422 |       sn1  += nput;
 | 
|---|
| 423 |     }
 | 
|---|
| 424 |     if (nsam <= 0) break;
 | 
|---|
| 425 |     nextSegment();
 | 
|---|
| 426 |     currentSegment->putData(sn1, 0, 0); // dummy, to initialize sn0 in segment : add method ?
 | 
|---|
| 427 |   }
 | 
|---|
| 428 | }
 | 
|---|
| 429 | 
 | 
|---|
| 430 | double TOISegmented::MasterView::getData(int sn) { /* reader thread */
 | 
|---|
| 431 |   return getView()->getData(sn); /* thread-specific */
 | 
|---|
| 432 | }
 | 
|---|
| 433 | 
 | 
|---|
| 434 | uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */
 | 
|---|
| 435 |   return getView()->getFlag(sn);
 | 
|---|
| 436 | }
 | 
|---|
| 437 | 
 | 
|---|
| 438 | void TOISegmented::MasterView::getData(int sn, int n, double* dat, uint_8* flg) { /* reader thread */
 | 
|---|
| 439 |   getView()->getData(sn, n, dat, flg);
 | 
|---|
| 440 | }
 | 
|---|
| 441 | 
 | 
|---|
| 442 | TOISegmented::BufferView* TOISegmented::MasterView::getView() { /* reader thread */
 | 
|---|
| 443 |   BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
 | 
|---|
| 444 |   if (bv == NULL) {
 | 
|---|
| 445 |     bv = createView();
 | 
|---|
| 446 |     LOG(cout << "creating new view " << hex << bv << dec << endl)
 | 
|---|
| 447 |     pthread_setspecific(buffer_key, bv);
 | 
|---|
| 448 |   }
 | 
|---|
| 449 |   return bv;
 | 
|---|
| 450 | }
 | 
|---|
| 451 | 
 | 
|---|
| 452 | void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */
 | 
|---|
| 453 |   pthread_mutex_lock(&read_wait_mutex);
 | 
|---|
| 454 |   pthread_cond_broadcast(&read_wait_condv);
 | 
|---|
| 455 |   pthread_mutex_unlock(&read_wait_mutex);
 | 
|---|
| 456 | }
 | 
|---|
| 457 | 
 | 
|---|
| 458 | void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */
 | 
|---|
| 459 |   if (waitingOnWrite) {
 | 
|---|
| 460 |     LOG(cout << name << " MasterView : signal for wait on write" << endl)
 | 
|---|
| 461 |     pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping
 | 
|---|
| 462 |   }
 | 
|---|
| 463 | }
 | 
|---|
| 464 | 
 | 
|---|
| 465 | void TOISegmented::MasterView::putDone() {
 | 
|---|
| 466 |   nextSegment(); // cree un segment inutile, a nettoyer
 | 
|---|
| 467 | }
 | 
|---|
| 468 | 
 | 
|---|
| 469 | void TOISegmented::MasterView::nextSegment() { /* writer thread */
 | 
|---|
| 470 |   // The current segment, if any, is now committed. A new
 | 
|---|
| 471 |   // blank buffer is allocated, if any.
 | 
|---|
| 472 |   pthread_mutex_lock(&views_mutex);
 | 
|---|
| 473 | 
 | 
|---|
| 474 |   LOG(cout << "MasterView::nextSegment " 
 | 
|---|
| 475 |       << segments.size()+1 << "/" << maxSegments << endl)
 | 
|---|
| 476 | 
 | 
|---|
| 477 |   if (currentSegment != NULL) {
 | 
|---|
| 478 |     currentSegment->status = BufferSegment::COMMITTED;
 | 
|---|
| 479 |     segments.push_back(currentSegment);
 | 
|---|
| 480 |   }
 | 
|---|
| 481 | 
 | 
|---|
| 482 |   currentSegment = NULL;
 | 
|---|
| 483 |   while (segments.size() >= maxSegments) {
 | 
|---|
| 484 |     waitForCleaning();
 | 
|---|
| 485 |   }
 | 
|---|
| 486 | 
 | 
|---|
| 487 |   currentSegment = new BufferSegment(segmentSize);
 | 
|---|
| 488 |   currentSegment->incRefCount();
 | 
|---|
| 489 |   signalWaitingViews(); // they can ask to be updated !!
 | 
|---|
| 490 |   pthread_mutex_unlock(&views_mutex);
 | 
|---|
| 491 | }
 | 
|---|
| 492 | 
 | 
|---|
| 493 | void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */
 | 
|---|
| 494 |   LOG(cout << name << " MasterView : write wait for clean for " << sn0 << endl)
 | 
|---|
| 495 |   waitingOnWrite = true;
 | 
|---|
| 496 |   checkDeadLock();
 | 
|---|
| 497 |   pthread_cond_wait(&write_wait_condv, &views_mutex);
 | 
|---|
| 498 |   LOG(cout << name << " MasterView : wait done" << endl)
 | 
|---|
| 499 | }
 | 
|---|
| 500 | 
 | 
|---|
| 501 | TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
 | 
|---|
| 502 |   BufferView* bv =  new BufferView(this);
 | 
|---|
| 503 |   pthread_mutex_lock(&views_mutex);
 | 
|---|
| 504 |   allViews.insert(bv);
 | 
|---|
| 505 |   pthread_mutex_unlock(&views_mutex);
 | 
|---|
| 506 |   updateView(bv);
 | 
|---|
| 507 |   return bv;
 | 
|---|
| 508 | }
 | 
|---|
| 509 | 
 | 
|---|
| 510 | void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
 | 
|---|
| 511 |   pthread_mutex_lock(&views_mutex);
 | 
|---|
| 512 | 
 | 
|---|
| 513 | //  int oldBegin = bv->sn0;
 | 
|---|
| 514 | //  int oldEnd   = bv->sn0 + bv->segmentSize * bv->segments.size();
 | 
|---|
| 515 |   int oldBegin = sn0;
 | 
|---|
| 516 |   int oldEnd   = sn0 + bv->segmentSize * segments.size();
 | 
|---|
| 517 | 
 | 
|---|
| 518 |   for (vector<BufferSegment*>::iterator i = bv->segments.begin();
 | 
|---|
| 519 |        i != bv->segments.end(); i++) {
 | 
|---|
| 520 |     (*i)->decRefCount();
 | 
|---|
| 521 |   }
 | 
|---|
| 522 |   
 | 
|---|
| 523 |   bv->segments.clear();
 | 
|---|
| 524 | 
 | 
|---|
| 525 |   // utiliser firstNeeded de toutes les vues pour faire le menage chez
 | 
|---|
| 526 |   // nous.
 | 
|---|
| 527 |   
 | 
|---|
| 528 |   // A condition que tous les consumers se soient fait connaitre...
 | 
|---|
| 529 | 
 | 
|---|
| 530 |   if (nConsumers == allViews.size()) {
 | 
|---|
| 531 |     int firstNeeded = MAXINT;
 | 
|---|
| 532 |     for (set<BufferView*>::iterator i = allViews.begin();
 | 
|---|
| 533 |          i != allViews.end(); i++) {
 | 
|---|
| 534 |       LOG(cout << name << " View firstneeded " << (*i)->firstNeeded << endl);
 | 
|---|
| 535 |       if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded;
 | 
|---|
| 536 |     }
 | 
|---|
| 537 |     
 | 
|---|
| 538 |     LOG(cout << name << " MasterView : firstNeeded = " << firstNeeded << endl);
 | 
|---|
| 539 |       
 | 
|---|
| 540 |     vector<BufferSegment*>::iterator j = segments.begin();
 | 
|---|
| 541 |     bool clean = false;
 | 
|---|
| 542 |     {for (vector<BufferSegment*>::iterator i = segments.begin();
 | 
|---|
| 543 |          i != segments.end(); i++) { 
 | 
|---|
| 544 |       //LOG(cout << "Updating : rc = " << (*i)->getRefCount() << " sn0 = " << (*i)->sn0 << endl;);
 | 
|---|
| 545 |       if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 1)) {
 | 
|---|
| 546 |         clean = true;
 | 
|---|
| 547 |         (*i)->decRefCount();
 | 
|---|
| 548 |         delete (*i);
 | 
|---|
| 549 |         j = i;
 | 
|---|
| 550 |       } 
 | 
|---|
| 551 |     }}
 | 
|---|
| 552 |     j++;
 | 
|---|
| 553 |     if (clean) {
 | 
|---|
| 554 |       segments.erase(segments.begin(),j);
 | 
|---|
| 555 |       sn0 = (*segments.begin())->sn0;
 | 
|---|
| 556 |       LOG(cout << "MasterView : purged until " << sn0 << endl);
 | 
|---|
| 557 |     }
 | 
|---|
| 558 |   } else {
 | 
|---|
| 559 |     LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size()
 | 
|---|
| 560 |         << "/" << nConsumers  << endl);
 | 
|---|
| 561 |   }
 | 
|---|
| 562 | 
 | 
|---|
| 563 |   {for (vector<BufferSegment*>::iterator i = segments.begin();
 | 
|---|
| 564 |        i != segments.end(); i++) {
 | 
|---|
| 565 |     if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
 | 
|---|
| 566 |       (*i)->incRefCount();
 | 
|---|
| 567 |       bv->segments.push_back(*i);
 | 
|---|
| 568 |     }
 | 
|---|
| 569 |   }}
 | 
|---|
| 570 | 
 | 
|---|
| 571 |   bv->sn0 = -1;
 | 
|---|
| 572 |   int newEnd = -1;
 | 
|---|
| 573 |   if (segments.size() > 0) {
 | 
|---|
| 574 |     bv->sn0 = bv->segments[0]->sn0;
 | 
|---|
| 575 |     newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
 | 
|---|
| 576 |   }
 | 
|---|
| 577 | 
 | 
|---|
| 578 |   if (sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin
 | 
|---|
| 579 |     signalWrite();
 | 
|---|
| 580 |   }
 | 
|---|
| 581 | 
 | 
|---|
| 582 |   LOG(cout << name << " sync for " << hex << bv << dec << " : "
 | 
|---|
| 583 |       << oldBegin << " - " << oldEnd << "  -->  "
 | 
|---|
| 584 |       << sn0 << " - " << newEnd << endl);
 | 
|---|
| 585 |   
 | 
|---|
| 586 |   if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin
 | 
|---|
| 587 |     signalWaitingViews();
 | 
|---|
| 588 |   }
 | 
|---|
| 589 |   pthread_mutex_unlock(&views_mutex);
 | 
|---|
| 590 | }
 | 
|---|
| 591 | 
 | 
|---|
| 592 | void TOISegmented::MasterView::checkDeadLock() { /* views locked */
 | 
|---|
| 593 |    // There is a possible deadlock if no view can free old segments
 | 
|---|
| 594 |   // and we are waiting for write.
 | 
|---|
| 595 | 
 | 
|---|
| 596 |   // we need to record "wont need before" for each view, and 
 | 
|---|
| 597 |   // signal deadlock if any view that needs first segment data is sleeping
 | 
|---|
| 598 |   // while we are asleep
 | 
|---|
| 599 | 
 | 
|---|
| 600 |   pthread_mutex_lock(&read_wait_mutex);
 | 
|---|
| 601 |   if (!waitingOnWrite) {
 | 
|---|
| 602 |     pthread_mutex_unlock(&read_wait_mutex);
 | 
|---|
| 603 |     return; // no problem, there is an active writer
 | 
|---|
| 604 |   }
 | 
|---|
| 605 | 
 | 
|---|
| 606 |   // Is any sleeping view needing our first segment ?
 | 
|---|
| 607 | 
 | 
|---|
| 608 |   for (set<BufferView*>::iterator i=allViews.begin();
 | 
|---|
| 609 |        i != allViews.end(); i++) {
 | 
|---|
| 610 |     if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) {
 | 
|---|
| 611 |       cout << "**** DEADLOCK detected ****" << endl;
 | 
|---|
| 612 |       cout << "We are waiting on write (buffer is full)"<< endl;
 | 
|---|
| 613 |       cout << "but a waiting reader still needs our first segment" << endl;
 | 
|---|
| 614 |       cout << "restart with bigger buffers" << endl;
 | 
|---|
| 615 | 
 | 
|---|
| 616 |       cout << "master has range " << sn0 << " - " 
 | 
|---|
| 617 |            << sn0+segments.size()*segmentSize-1 <<endl;
 | 
|---|
| 618 |       cout << "in " << segments.size() << " segments" << endl;
 | 
|---|
| 619 |       cout << "waiting bufferview is waiting for " << (*i)->waitingFor 
 | 
|---|
| 620 |            << " and still needs " << (*i)->firstNeeded << endl;
 | 
|---|
| 621 |       cout << "it has sn0= " << (*i)->sn0 << " nseg = " << (*i)->segments.size() 
 | 
|---|
| 622 |            << " snlast = " << (*i)->sn0+(*i)->segments.size()*(*i)->segmentSize -1 
 | 
|---|
| 623 |            << endl;
 | 
|---|
| 624 |         
 | 
|---|
| 625 |       cout << "lets try something before failing, but the program may block" << endl;
 | 
|---|
| 626 |       pthread_cond_broadcast(&read_wait_condv);
 | 
|---|
| 627 |       //usleep(1000);
 | 
|---|
| 628 |       //      abort();
 | 
|---|
| 629 |     }
 | 
|---|
| 630 |   }
 | 
|---|
| 631 |   pthread_mutex_unlock(&read_wait_mutex);
 | 
|---|
| 632 | }
 | 
|---|
| 633 | 
 | 
|---|
| 634 | 
 | 
|---|
| 635 | 
 | 
|---|
| 636 | void TOISegmented::MasterView::BufferDestroy(void* p) {
 | 
|---|
| 637 |   BufferView* bv = (BufferView*) p;
 | 
|---|
| 638 |   delete bv;
 | 
|---|
| 639 | }
 | 
|---|