Changeset 1689 in Sophya
- Timestamp:
- Oct 14, 2001, 1:56:14 AM (24 years ago)
- Location:
- trunk/ArchTOIPipe
- Files:
-
- 1 added
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/ArchTOIPipe/Kernel/Makefile.in
r1685 r1689 72 72 SRCFILES = toi.cc toimanager.cc toiprocessor.cc toiseqbuff.cc \ 73 73 fitstoirdr.cc fitstoiwtr.cc asciitoiwtr.cc \ 74 toiregwindow.cc 74 toiregwindow.cc toisegment.cc 75 75 76 76 FILES=$(patsubst %.c,%.o,$(SRCFILES:.cc=.o)) -
trunk/ArchTOIPipe/Kernel/toi.h
r1532 r1689 57 57 //RZCMV virtual void putDataError(int i, double value, 58 58 // double error, int_4 flag=0); 59 virtual void putDone() {} 59 60 60 61 virtual void wontNeedBefore(int i); -
trunk/ArchTOIPipe/Kernel/toiprocessor.cc
r1663 r1689 242 242 } 243 243 244 void TOIProcessor::warnPutDone() { 245 int n = outIx.size(); 246 for (int i=0; i<n; i++) { 247 TOI* toi = outTOIs[i]; 248 toi->putDone(); 249 } 250 } 251 244 252 void* TOIProcessor::ThreadStart(void* arg) { 245 253 TOIProcessor* p = (TOIProcessor*) arg; 246 254 // cout << p->name << " new thread running " << pthread_self() << endl; 247 255 p->run(); 256 p->warnPutDone(); 248 257 pthread_exit(NULL); 249 258 // cout << p->name << " thread done " << pthread_self() << endl; -
trunk/ArchTOIPipe/Kernel/toiprocessor.h
r1532 r1689 151 151 152 152 static void* ThreadStart(void *); 153 void warnPutDone(); 153 154 }; 154 155 -
trunk/ArchTOIPipe/Kernel/toisegment.cc
r1686 r1689 1 1 #include "toisegment.h" 2 3 /******************************/ 4 /******* TOISegmented *********/ 5 /******************************/ 6 7 TOISegmented::TOISegmented(int bufsz, int maxseg) { 8 master = new MasterView(bufsz, maxseg); 9 setName("TOISegmented"); 10 } 11 12 TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) { 13 master = new MasterView(bufsz, maxseg); 14 setName(nm); 15 } 16 17 TOISegmented::~TOISegmented() { 18 delete master; 19 } 20 21 22 double TOISegmented::getData(int i) { /* reader thread */ 23 return master->getData(i); 24 } 25 26 void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */ 27 data = master->getData(i); 28 flag = master->getFlag(i); 29 } 30 31 void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */ 32 master->putData(i, value, flag); 33 } 34 35 void TOISegmented::putDone() { 36 master->putDone(); 37 } 38 39 void TOISegmented::wontNeedBefore(int i) { /* reader thread */ 40 master->getView()->wontNeedBefore(i); 41 } 42 43 TOI::DataStatus TOISegmented::isDataAvail(int i, int j) { 44 // return master->getView()->isDataAvail(i, j); 45 cout << "TOISegmented::isDataAvail unimplemented" << endl; 46 throw PError("TOISegmented::isDataAvail unimplemented"); 47 } 48 49 TOI::DataStatus TOISegmented::isDataAvail(int i) { 50 return isDataAvail(i,i); 51 } 52 53 TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) { 54 return isDataAvail(i,j); 55 } 56 57 void TOISegmented::waitForData(int iStart, int iEnd) { 58 // get will wait... 59 } 60 61 void TOISegmented::waitForData(int i) { 62 // get will wait... 63 } 64 65 void TOISegmented::waitForAnyData() { 66 cout << "TOISegmented::waitForAnyData unimplemented" << endl; 67 throw PError("TOISegmented::waitForAnyData unimplemented"); 68 } 69 70 int TOISegmented::nextDataAvail(int iAfter) { 71 cout << "TOISegmented::nextDataAvail" << endl; 72 return iAfter+1; 73 } 74 75 bool TOISegmented::hasSomeData() { 76 cout << "TOISegmented::hasSomeData" << endl; 77 return true; 78 } 79 80 void TOISegmented::doPutData(int i, double value, uint_8 flag=0) { 81 cout << "TOISegmented::doPutData unimplemented" << endl; 82 throw PError("TOISegmented::doPutData unimplemented"); 83 } 84 85 void TOISegmented::doGetData(int i, double& value, uint_8& flag) { 86 cout << "TOISegmented::doGetData unimplemented" << endl; 87 throw PError("TOISegmented::doGetData unimplemented"); 88 } 89 2 90 3 91 /*******************************/ … … 27 115 } 28 116 29 void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) { 117 void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) { 118 /* writer thread*/ 30 119 if (status == NEW) { 31 120 status = WRITE; … … 70 159 sn0 = -1; 71 160 segmentSize = m->segmentSize; 161 firstNeeded = -1; 72 162 pthread_mutex_init(&mutex, NULL); 73 163 pthread_cond_init(&condv, NULL); … … 124 214 master->removeFromWaitList(this); 125 215 pthread_mutex_unlock(&mutex); 216 } 217 218 void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */ 219 if (sn > firstNeeded) { 220 firstNeeded = sn; 221 } 126 222 } 127 223 … … 142 238 pthread_key_create(&buffer_key, BufferDestroy); 143 239 144 wait Status = NO_WAIT;240 waitingOnWrite = false; 145 241 } 146 242 … … 157 253 158 254 void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { 255 if (sn0<0) sn0=sn; 159 256 // can fit in current segment ? 160 257 if (!(currentSegment != NULL && 161 258 sn >= currentSegment->sn0 && 162 259 sn < currentSegment->sn0 + currentSegment->bufferSize)) { 260 cout << "MasterView::putData, need extend for " << sn << endl; 163 261 nextSegment(); 164 262 } … … 177 275 // A view needs to wait for new data. 178 276 179 // There is a possible deadlock if no view can free old segments180 // and we are waiting for write.181 182 // we need to record "wont need before" for each view, and183 // signal deadlock if any view that needs first segment data is sleeping184 // while we are asleep185 277 186 278 pthread_mutex_lock(&views_mutex); … … 199 291 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key); 200 292 if (bv == NULL) { 293 cout << "creating new view" << endl; 201 294 bv = createView(); 202 295 pthread_setspecific(buffer_key, bv); … … 204 297 return bv; 205 298 } 299 300 void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ 301 pthread_mutex_lock(&views_mutex); 302 set<BufferView*> copyset = waitingBuffers; 303 pthread_mutex_unlock(&views_mutex); 304 305 for (set<BufferView*>::iterator i=copyset.begin(); 306 i != copyset.end(); i++) { 307 (*i)->signal(); 308 } 309 } 310 311 void TOISegmented::MasterView::signal() { /* reader thread */ 312 pthread_mutex_lock(&write_mutex); 313 pthread_cond_signal(&condv); // only one thread can be sleeping 314 pthread_mutex_unlock(&write_mutex); 315 } 316 317 void TOISegmented::MasterView::putDone() { 318 nextSegment(); // cree un segment inutile, a nettoyer 319 } 320 321 void TOISegmented::MasterView::nextSegment() { /* writer thread */ 322 // The current segment, if any, is now committed. A new 323 // blank buffer is allocated, if any. 324 325 cout << "MasterView::nextSegment " 326 << segments.size()+1 << "/" << maxSegments << endl; 327 328 if (currentSegment != NULL) { 329 currentSegment->status = BufferSegment::COMMITTED; 330 pthread_mutex_lock(&views_mutex); 331 segments.push_back(currentSegment); 332 pthread_mutex_unlock(&views_mutex); 333 } 334 335 currentSegment = NULL; 336 while (segments.size() >= maxSegments) { 337 waitForCleaning(); 338 } 339 340 currentSegment = new BufferSegment(segmentSize); 341 signalWaitingViews(); // they can ask to be updated !! 342 } 343 344 void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ 345 cout << "MasterView : write wait for clean for " << sn0 << endl; 346 pthread_mutex_lock(&write_mutex); 347 waitingOnWrite = true; 348 checkDeadLock(); 349 pthread_cond_wait(&condv, &write_mutex); 350 pthread_mutex_unlock(&write_mutex); 351 cout << "MasterView : wait done" << endl; 352 } 353 354 TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */ 355 BufferView* bv = new BufferView(this); 356 updateView(bv); 357 return bv; 358 } 359 360 void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */ 361 pthread_mutex_lock(&views_mutex); 362 363 int oldBegin = bv->sn0; 364 int oldEnd = bv->sn0 + bv->segmentSize * bv->segments.size(); 365 366 for (vector<BufferSegment*>::iterator i = bv->segments.begin(); 367 i != bv->segments.end(); i++) { 368 (*i)->decRefCount(); 369 } 370 371 bv->segments.clear(); 372 373 // TODO : utiliser firstNeeded de toute les vues pour faire le menage chez 374 // nous. 375 376 377 for (vector<BufferSegment*>::iterator i = segments.begin(); 378 i != segments.end(); i++) { 379 if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) { 380 (*i)->incRefCount(); 381 bv->segments.push_back(*i); 382 } 383 } 384 385 bv->sn0 = -1; 386 int newEnd = -1; 387 if (segments.size() > 0) { 388 bv->sn0 = bv->segments[0]->sn0; 389 newEnd = bv->sn0 + bv->segmentSize * bv->segments.size(); 390 } 391 392 if (bv->sn0 > oldBegin && waitingOnWrite) { 393 signal(); 394 } 395 396 cout << "sync " << sn0 << " - " << newEnd << endl; 397 398 pthread_mutex_unlock(&views_mutex); 399 400 if (newEnd > oldEnd) { 401 signalWaitingViews(); 402 } 403 } 404 405 void TOISegmented::MasterView::checkDeadLock() { 406 // There is a possible deadlock if no view can free old segments 407 // and we are waiting for write. 408 409 // we need to record "wont need before" for each view, and 410 // signal deadlock if any view that needs first segment data is sleeping 411 // while we are asleep 412 413 if (!waitingOnWrite) return; // no problem, there is an active writer 414 415 // Is any sleeping view needing our first segment ? 416 417 pthread_mutex_lock(&views_mutex); 418 for (set<BufferView*>::iterator i=waitingBuffers.begin(); 419 i != waitingBuffers.end(); i++) { 420 if ((*i)->firstNeeded < sn0+segmentSize) { 421 cout << "**** DEADLOCK detected ****" << endl; 422 cout << "We are waiting on write (buffer is full)"<< endl; 423 cout << "but a waiting reader still needs our first segment" << endl; 424 cout << "restart with bigger buffers" << endl; 425 abort(); 426 } 427 } 428 pthread_mutex_unlock(&views_mutex); 429 } 430 431 432 433 void TOISegmented::MasterView::BufferDestroy(void* p) { 434 BufferView* bv = (BufferView*) p; 435 delete bv; 436 } -
trunk/ArchTOIPipe/Kernel/toisegment.h
r1686 r1689 18 18 class TOISegmented : public TOIRegular { 19 19 public: 20 TOISegmented( );21 TOISegmented(string nm );20 TOISegmented(int bufsz=256, int maxseg=20); 21 TOISegmented(string nm, int bufsz=256, int maxseg=20); 22 22 ~TOISegmented(); 23 24 virtual double getData(int i); 25 virtual void getData(int i, double& data, uint_8& flag); 26 virtual void putData(int i, double value, uint_8 flag=0); 27 virtual void wontNeedBefore(int i); 28 virtual void putDone(); 29 30 31 virtual DataStatus isDataAvail(int iStart, int iEnd); 32 virtual DataStatus isDataAvail(int i); 33 virtual DataStatus isDataAvailNL(int iStart, int iEnd); // abstract 34 virtual void waitForData(int iStart, int iEnd); 35 virtual void waitForData(int i); 36 virtual void waitForAnyData(); 37 virtual int nextDataAvail(int iAfter); // abstract 38 virtual bool hasSomeData(); // abstract 39 virtual void doGetData(int i, double& data, uint_8& flag); // abs 40 virtual void doPutData(int i, double value, uint_8 flag=0); // abs 23 41 24 42 … … 27 45 class BufferView; 28 46 class MasterView; 47 48 MasterView* master; 29 49 30 50 … … 85 105 double getData(int sn); 86 106 uint_8 getFlag(int sn); 107 BufferView* getView(); // thread-specific 108 void putDone(); 87 109 88 110 protected: … … 90 112 void removeFromWaitList(BufferView* bv); 91 113 92 BufferView* getView(); // thread-specific93 114 94 115 friend class BufferView; 95 void signalWaitingViews(); 116 void signalWaitingViews(); // views are waiting on read 117 void signal(); // we are waiting on write 96 118 void nextSegment(); 119 void waitForCleaning(); 97 120 BufferView* createView(); 98 121 void updateView(BufferView*); // called on reader thread of the view … … 107 130 pthread_mutex_t views_mutex; // lock for master buffer list access 108 131 pthread_mutex_t write_mutex; // for write waiting 109 pthread_cond_t condv; // waiting (read or write) (write only ?)132 pthread_cond_t condv; // waiting for cleaning (on writer thread) 110 133 pthread_key_t buffer_key; // thread-specific buffer view 111 134 static void BufferDestroy(void *); 112 135 113 static const int NO_WAIT = 0; 114 static const int WAIT_READ = 1; 115 static const int WAIT_WRITE= 2; 116 int waitStatus; 136 bool waitingOnWrite; // wait on writer thread 117 137 118 138 set<BufferView*> waitingBuffers; … … 130 150 double getData(int sn); 131 151 uint_8 getFlag(int sn); 152 153 void wontNeedBefore(int sn); 132 154 133 155 protected: … … 142 164 int sn0; 143 165 int segmentSize; 166 int firstNeeded; 144 167 pthread_mutex_t mutex; // lock pour attente de segments 145 168 pthread_cond_t condv; // attente de segments (en lecture)
Note:
See TracChangeset
for help on using the changeset viewer.