Changeset 1692 in Sophya for trunk/ArchTOIPipe/Kernel/toisegment.cc
- Timestamp:
- Oct 14, 2001, 11:15:01 PM (24 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/ArchTOIPipe/Kernel/toisegment.cc
r1690 r1692 5 5 #endif 6 6 7 static pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER; 8 static void cout_lock() {pthread_mutex_lock(&cout_mutex);} 9 static void cout_unlock() {pthread_mutex_unlock(&cout_mutex);} 10 #define LOG(_xxx_) \ 11 cout_lock(); \ 12 _xxx_; \ 13 cout_unlock(); 14 15 7 16 /******************************/ 8 17 /******* TOISegmented *********/ … … 21 30 TOISegmented::~TOISegmented() { 22 31 delete master; 32 } 33 34 35 void TOISegmented::addConsumer(TOIProcessor* p) { 36 TOI::addConsumer(p); 37 master->nConsumers = consumers.size(); 23 38 } 24 39 … … 82 97 } 83 98 84 void TOISegmented::doPutData(int i, double value, uint_8 flag =0) {99 void TOISegmented::doPutData(int i, double value, uint_8 flag) { 85 100 cout << "TOISegmented::doPutData unimplemented" << endl; 86 101 throw PError("TOISegmented::doPutData unimplemented"); … … 164 179 segmentSize = m->segmentSize; 165 180 firstNeeded = -1; 166 pthread_mutex_init(&mutex, NULL); 167 pthread_cond_init(&condv, NULL); 181 waiting = false; 168 182 } 169 183 170 184 TOISegmented::BufferView::~BufferView() { 171 pthread_mutex_destroy(&mutex); 172 pthread_cond_destroy(&condv); 173 } 174 175 double TOISegmented::BufferView::getData(int sn) { /* Single-thread */ 185 } 186 187 double TOISegmented::BufferView::getData(int sn) { /* Single-thread, reader thread*/ 176 188 ensure(sn); 177 189 int seg = (sn-sn0)/segmentSize; … … 179 191 } 180 192 181 uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread */193 uint_8 TOISegmented::BufferView::getFlag(int sn) { /* Single-thread, reader thread */ 182 194 ensure(sn); 183 195 int seg = (sn-sn0)/segmentSize; … … 185 197 } 186 198 187 void TOISegmented::BufferView::ensure(int sn) { /* Single-thread */199 void TOISegmented::BufferView::ensure(int sn) { /* Single-thread, reader thread */ 188 200 if (sn < sn0) { 189 throw RangeCheckError("requested sample before first"); 190 } 191 192 if (sn >= sn0 + segmentSize*segments.size()) { 193 cout << "BufferView : read fault for " << sn << endl; 201 LOG(cout << "TOISegmented::BufferView::ensure requested sample before first" << endl); 202 LOG(cout << "sn " << sn << " sn0 " << sn0 << endl); 203 abort(); 204 } 205 206 if (sn0 < 0 || 207 sn >= sn0 + segmentSize*segments.size()) { 208 LOG(cout << "BufferView " << hex << this << dec << ": read fault for " << sn << endl) 194 209 sync(); 195 while (sn >= sn0 + segmentSize*segments.size()) {210 while (sn0<0 || sn >= sn0 + segmentSize*segments.size()) { 196 211 wait(); 197 cout << "BufferView : waiting for " << sn << endl;212 LOG(cout << "BufferView " << hex << this << dec << ": waiting for " << sn << endl) 198 213 sync(); 199 214 } 200 cout << "BufferView : resuming for " << sn << endl; 201 } 202 } 203 204 void TOISegmented::BufferView::sync() { /* Single-thread */ 215 LOG(cout << "BufferView " << hex << this << dec << ": resuming for " << sn 216 << " now data for " << sn0 << " - " << sn0 + segmentSize*segments.size() 217 << " in " << segments.size() << " segments " << endl) 218 } 219 } 220 221 void TOISegmented::BufferView::sync() { /* Single-thread, reader thread */ 205 222 master->updateView(this); // update me ! 206 223 } 207 224 208 void TOISegmented::BufferView::wait() { /* From reader thread */ 209 pthread_mutex_lock(&mutex); 210 master->addToWaitList(this); // needing wake-up call 211 pthread_cond_wait(&condv, &mutex); 212 pthread_mutex_unlock(&mutex); 213 } 214 215 void TOISegmented::BufferView::signal() { /* From masterview, writer thread */ 216 pthread_mutex_lock(&mutex); 217 pthread_cond_signal(&condv); // only one thread can be sleeping 218 master->removeFromWaitList(this); 219 pthread_mutex_unlock(&mutex); 220 } 225 void TOISegmented::BufferView::wait() { /* reader thread */ 226 pthread_mutex_lock(&(master->read_wait_mutex)); 227 waiting = true; 228 pthread_cond_wait(&(master->read_wait_condv), &(master->read_wait_mutex)); 229 waiting = false; 230 pthread_mutex_unlock(&(master->read_wait_mutex)); 231 } 232 221 233 222 234 void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */ … … 236 248 segmentSize = bufsz; 237 249 sn0 = -1; 250 nConsumers = 0; 238 251 239 252 pthread_mutex_init(&views_mutex, NULL); 240 pthread_mutex_init(&write_mutex, NULL); 241 pthread_cond_init(&condv, NULL); 253 pthread_mutex_init(&read_wait_mutex, NULL); 254 pthread_cond_init(&write_wait_condv, NULL); 255 pthread_cond_init(&read_wait_condv, NULL); 242 256 pthread_key_create(&buffer_key, BufferDestroy); 243 257 … … 247 261 TOISegmented::MasterView::~MasterView() { 248 262 pthread_mutex_destroy(&views_mutex); 249 pthread_mutex_destroy(&write_mutex); 250 pthread_cond_destroy(&condv); 263 pthread_mutex_destroy(&read_wait_mutex); 264 pthread_cond_destroy(&write_wait_condv); 265 pthread_cond_destroy(&read_wait_condv); 251 266 pthread_key_delete(buffer_key); 252 267 … … 256 271 } 257 272 258 void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { 259 if (sn0<0) sn0=sn; 273 void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) { /* writer thread */ 274 if (sn0<0) { 275 LOG(cout << "***MasterView::putData sn0<0" << endl) 276 sn0=sn; 277 } 260 278 // can fit in current segment ? 261 279 if (!(currentSegment != NULL && 262 280 sn >= currentSegment->sn0 && 263 281 sn < currentSegment->sn0 + currentSegment->bufferSize)) { 264 cout << "MasterView::putData, need extend for " << sn << endl;282 LOG(cout << "MasterView::putData, need extend for " << sn << endl) 265 283 nextSegment(); 266 284 } … … 268 286 } 269 287 270 double TOISegmented::MasterView::getData(int sn) { 271 return getView()->getData(sn); 272 } 273 274 uint_8 TOISegmented::MasterView::getFlag(int sn) { 288 double TOISegmented::MasterView::getData(int sn) { /* reader thread */ 289 return getView()->getData(sn); /* thread-specific */ 290 } 291 292 uint_8 TOISegmented::MasterView::getFlag(int sn) { /* reader thread */ 275 293 return getView()->getFlag(sn); 276 }277 278 void TOISegmented::MasterView::addToWaitList(BufferView* bv) { /* reader thread */279 // A view needs to wait for new data.280 281 282 pthread_mutex_lock(&views_mutex);283 waitingBuffers.insert(bv);284 pthread_mutex_unlock(&views_mutex);285 checkDeadLock();286 }287 288 void TOISegmented::MasterView::removeFromWaitList(BufferView* bv) { /* reader thread */289 pthread_mutex_lock(&views_mutex);290 waitingBuffers.erase(bv);291 pthread_mutex_unlock(&views_mutex);292 294 } 293 295 … … 295 297 BufferView* bv = (BufferView*) pthread_getspecific(buffer_key); 296 298 if (bv == NULL) { 297 cout << "creating new view" << endl;298 299 bv = createView(); 300 LOG(cout << "creating new view " << hex << bv << dec << endl) 299 301 pthread_setspecific(buffer_key, bv); 300 302 } … … 302 304 } 303 305 304 void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ 305 pthread_mutex_lock(&views_mutex); 306 set<BufferView*> copyset = waitingBuffers; 307 pthread_mutex_unlock(&views_mutex); 308 309 for (set<BufferView*>::iterator i=copyset.begin(); 310 i != copyset.end(); i++) { 311 (*i)->signal(); 312 } 313 } 314 315 void TOISegmented::MasterView::signal() { /* reader thread */ 316 pthread_mutex_lock(&write_mutex); 317 pthread_cond_signal(&condv); // only one thread can be sleeping 318 pthread_mutex_unlock(&write_mutex); 306 void TOISegmented::MasterView::signalWaitingViews() { /* any thread */ /* views locked */ 307 pthread_mutex_lock(&read_wait_mutex); 308 pthread_cond_broadcast(&read_wait_condv); 309 pthread_mutex_unlock(&read_wait_mutex); 310 } 311 312 void TOISegmented::MasterView::signalWrite() { /* reader thread */ /* views locked */ 313 if (waitingOnWrite) { 314 LOG(cout << "MasterView : signal for wait on write" << endl) 315 pthread_cond_signal(&write_wait_condv); // only one thread can be sleeping 316 } 319 317 } 320 318 … … 326 324 // The current segment, if any, is now committed. A new 327 325 // blank buffer is allocated, if any. 328 329 cout << "MasterView::nextSegment " 330 << segments.size()+1 << "/" << maxSegments << endl; 326 pthread_mutex_lock(&views_mutex); 327 328 LOG(cout << "MasterView::nextSegment " 329 << segments.size()+1 << "/" << maxSegments << endl) 331 330 332 331 if (currentSegment != NULL) { 333 332 currentSegment->status = BufferSegment::COMMITTED; 334 pthread_mutex_lock(&views_mutex);335 333 segments.push_back(currentSegment); 336 pthread_mutex_unlock(&views_mutex);337 334 } 338 335 … … 344 341 currentSegment = new BufferSegment(segmentSize); 345 342 signalWaitingViews(); // they can ask to be updated !! 346 } 347 348 void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ 349 cout << "MasterView : write wait for clean for " << sn0 << endl; 350 pthread_mutex_lock(&write_mutex);343 pthread_mutex_unlock(&views_mutex); 344 } 345 346 void TOISegmented::MasterView::waitForCleaning() { /* writer thread */ /* views locked */ 347 LOG(cout << "MasterView : write wait for clean for " << sn0 << endl) 351 348 waitingOnWrite = true; 352 349 checkDeadLock(); 353 pthread_cond_wait(&condv, &write_mutex); 354 pthread_mutex_unlock(&write_mutex); 355 cout << "MasterView : wait done" << endl; 350 pthread_cond_wait(&write_wait_condv, &views_mutex); 351 LOG(cout << "MasterView : wait done" << endl) 356 352 } 357 353 358 354 TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */ 359 355 BufferView* bv = new BufferView(this); 356 pthread_mutex_lock(&views_mutex); 360 357 allViews.insert(bv); 358 pthread_mutex_unlock(&views_mutex); 361 359 updateView(bv); 362 360 return bv; … … 379 377 // nous. 380 378 381 int firstNeeded = MAXINT; 382 for (set<BufferView*>::iterator i = allViews.begin(); 383 i != allViews.end(); i++) { 384 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded; 385 } 386 387 cout << "firstNeeded = " << firstNeeded << endl; 388 389 vector<BufferSegment*>::iterator j = segments.begin(); 390 bool clean = false; 391 for (vector<BufferSegment*>::iterator i = segments.begin(); 392 i != segments.end(); i++) { 393 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) { 394 clean = true; 395 j = i; 396 } else {} 379 // A condition que tous les consumers se soient fait connaitre... 380 381 if (nConsumers == allViews.size()) { 382 int firstNeeded = MAXINT; 383 for (set<BufferView*>::iterator i = allViews.begin(); 384 i != allViews.end(); i++) { 385 if ((*i)->firstNeeded < firstNeeded) firstNeeded = (*i)->firstNeeded; 386 } 387 388 LOG(cout << "MasterView : firstNeeded = " << firstNeeded << endl); 389 390 vector<BufferSegment*>::iterator j = segments.begin(); 391 bool clean = false; 392 for (vector<BufferSegment*>::iterator i = segments.begin(); 393 i != segments.end(); i++) { 394 if (((*i)->sn0+(*i)->bufferSize <= firstNeeded) && ((*i)->getRefCount() == 0)) { 395 clean = true; 396 j = i; 397 } 398 } 397 399 if (clean) { 398 400 segments.erase(segments.begin(),j); 399 401 sn0 = (*segments.begin())->sn0; 400 } 401 } 402 LOG(cout << "MasterView : purged until " << sn0 << endl); 403 } 404 } else { 405 LOG(cout << "MasterView : not yet all consumer thread known "<< allViews.size() 406 << "/" << nConsumers << endl); 407 } 402 408 403 409 for (vector<BufferSegment*>::iterator i = segments.begin(); … … 416 422 } 417 423 418 if (bv->sn0 > oldBegin && waitingOnWrite) { 419 signal(); 420 } 421 422 cout << "sync " << sn0 << " - " << newEnd << endl; 423 424 if (bv->sn0 > oldBegin) { // nettoyage de fait, reveiller le writer thread si besoin 425 signalWrite(); 426 } 427 428 LOG(cout << "sync for " << hex << bv << dec << " : " 429 << oldBegin << " - " << oldEnd << " --> " 430 << bv->sn0 << " - " << newEnd << endl); 431 432 if (newEnd > oldEnd) { // Nouveautes, reveiller les reader threads si besoin 433 signalWaitingViews(); 434 } 424 435 pthread_mutex_unlock(&views_mutex); 425 426 if (newEnd > oldEnd) { 427 signalWaitingViews(); 428 } 429 } 430 431 void TOISegmented::MasterView::checkDeadLock() { 436 } 437 438 void TOISegmented::MasterView::checkDeadLock() { /* views locked */ 432 439 // There is a possible deadlock if no view can free old segments 433 440 // and we are waiting for write. … … 437 444 // while we are asleep 438 445 439 if (!waitingOnWrite) return; // no problem, there is an active writer 446 pthread_mutex_lock(&read_wait_mutex); 447 if (!waitingOnWrite) { 448 pthread_mutex_unlock(&read_wait_mutex); 449 return; // no problem, there is an active writer 450 } 440 451 441 452 // Is any sleeping view needing our first segment ? 442 453 443 pthread_mutex_lock(&views_mutex); 444 for (set<BufferView*>::iterator i=waitingBuffers.begin(); 445 i != waitingBuffers.end(); i++) { 446 if ((*i)->firstNeeded < sn0+segmentSize) { 454 for (set<BufferView*>::iterator i=allViews.begin(); 455 i != allViews.end(); i++) { 456 if ((*i)->waiting && (*i)->firstNeeded < sn0+segmentSize) { 447 457 cout << "**** DEADLOCK detected ****" << endl; 448 458 cout << "We are waiting on write (buffer is full)"<< endl; … … 452 462 } 453 463 } 454 pthread_mutex_unlock(& views_mutex);464 pthread_mutex_unlock(&read_wait_mutex); 455 465 } 456 466
Note:
See TracChangeset
for help on using the changeset viewer.