Changeset 1689 in Sophya


Ignore:
Timestamp:
Oct 14, 2001, 1:56:14 AM (24 years ago)
Author:
aubourg
Message:

segmented buffers...

Location:
trunk/ArchTOIPipe
Files:
1 added
6 edited

Legend:

Unmodified
Added
Removed
  • trunk/ArchTOIPipe/Kernel/Makefile.in

    r1685 r1689  
    7272SRCFILES = toi.cc toimanager.cc toiprocessor.cc toiseqbuff.cc \
    7373           fitstoirdr.cc fitstoiwtr.cc asciitoiwtr.cc \
    74            toiregwindow.cc
     74           toiregwindow.cc toisegment.cc
    7575
    7676FILES=$(patsubst %.c,%.o,$(SRCFILES:.cc=.o))
  • trunk/ArchTOIPipe/Kernel/toi.h

    r1532 r1689  
    5757  //RZCMV  virtual void          putDataError(int i, double value,
    5858  //                                 double error, int_4 flag=0);
     59  virtual void          putDone() {}
    5960
    6061  virtual void          wontNeedBefore(int i);
  • trunk/ArchTOIPipe/Kernel/toiprocessor.cc

    r1663 r1689  
    242242}
    243243
     244void TOIProcessor::warnPutDone() {
     245  int n = outIx.size();
     246  for (int i=0; i<n; i++) {
     247    TOI* toi = outTOIs[i];
     248    toi->putDone();
     249  }
     250}
     251
    244252void* TOIProcessor::ThreadStart(void* arg) {
    245253  TOIProcessor* p = (TOIProcessor*) arg;
    246254  //  cout << p->name << " new thread running " << pthread_self() << endl;
    247255  p->run();
     256  p->warnPutDone();
    248257  pthread_exit(NULL);
    249258  //  cout << p->name << " thread done " << pthread_self() << endl;
  • trunk/ArchTOIPipe/Kernel/toiprocessor.h

    r1532 r1689  
    151151 
    152152  static void* ThreadStart(void *);
     153  void warnPutDone();
    153154};
    154155
  • trunk/ArchTOIPipe/Kernel/toisegment.cc

    r1686 r1689  
    11#include "toisegment.h"
     2
     3/******************************/
     4/******* TOISegmented *********/
     5/******************************/
     6
     7TOISegmented::TOISegmented(int bufsz, int maxseg) {
     8  master = new MasterView(bufsz, maxseg);
     9  setName("TOISegmented");
     10}
     11
     12TOISegmented::TOISegmented(string nm, int bufsz, int maxseg) {
     13  master = new MasterView(bufsz, maxseg);
     14  setName(nm);
     15}
     16
     17TOISegmented::~TOISegmented() {
     18  delete master;
     19}
     20
     21
     22double TOISegmented::getData(int i) { /* reader thread */
     23  return master->getData(i);
     24}
     25
     26void TOISegmented::getData(int i, double& data, uint_8& flag) { /* reader thread */
     27  data = master->getData(i);
     28  flag = master->getFlag(i);
     29}
     30
     31void TOISegmented::putData(int i, double value, uint_8 flag) { /* writer thread */
     32  master->putData(i, value, flag);
     33}
     34
     35void TOISegmented::putDone() {
     36  master->putDone();
     37}
     38
     39void TOISegmented::wontNeedBefore(int i) {  /* reader thread */
     40  master->getView()->wontNeedBefore(i);
     41}
     42
     43TOI::DataStatus TOISegmented::isDataAvail(int i, int j) {
     44  // return master->getView()->isDataAvail(i, j);
     45  cout << "TOISegmented::isDataAvail unimplemented" << endl;
     46  throw PError("TOISegmented::isDataAvail unimplemented");
     47}
     48
     49TOI::DataStatus TOISegmented::isDataAvail(int i) {
     50  return isDataAvail(i,i);
     51}
     52
     53TOI::DataStatus TOISegmented::isDataAvailNL(int i, int j) {
     54  return isDataAvail(i,j);
     55}
     56
     57void TOISegmented::waitForData(int iStart, int iEnd) {
     58  // get will wait...
     59}
     60
     61void TOISegmented::waitForData(int i) {
     62  // get will wait...
     63}
     64
     65void TOISegmented::waitForAnyData() {
     66  cout << "TOISegmented::waitForAnyData unimplemented" << endl;
     67  throw PError("TOISegmented::waitForAnyData unimplemented");
     68}
     69
     70int TOISegmented::nextDataAvail(int iAfter) {
     71  cout << "TOISegmented::nextDataAvail" << endl;
     72  return iAfter+1;
     73}
     74
     75bool TOISegmented::hasSomeData() {
     76  cout << "TOISegmented::hasSomeData" << endl;
     77  return true;
     78}
     79
     80void TOISegmented::doPutData(int i, double value, uint_8 flag=0) {
     81  cout << "TOISegmented::doPutData unimplemented" << endl;
     82  throw PError("TOISegmented::doPutData unimplemented");
     83}
     84
     85void TOISegmented::doGetData(int i, double& value, uint_8& flag) {
     86  cout << "TOISegmented::doGetData unimplemented" << endl;
     87  throw PError("TOISegmented::doGetData unimplemented");
     88}
     89
    290
    391/*******************************/
     
    27115}
    28116
    29 void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
     117void TOISegmented::BufferSegment::putData(int sn, double d, uint_8 f) {
     118  /* writer thread*/
    30119  if (status == NEW) {
    31120    status = WRITE;
     
    70159  sn0 = -1;
    71160  segmentSize = m->segmentSize;
     161  firstNeeded = -1;
    72162  pthread_mutex_init(&mutex, NULL);
    73163  pthread_cond_init(&condv, NULL);
     
    124214  master->removeFromWaitList(this);
    125215  pthread_mutex_unlock(&mutex); 
     216}
     217
     218void TOISegmented::BufferView::wontNeedBefore(int sn) { /* reader thread */
     219  if (sn > firstNeeded) {
     220    firstNeeded = sn;
     221  }
    126222}
    127223
     
    142238  pthread_key_create(&buffer_key, BufferDestroy);
    143239
    144   waitStatus = NO_WAIT;
     240  waitingOnWrite = false;
    145241}
    146242
     
    157253
    158254void TOISegmented::MasterView::putData(int sn, double data, uint_8 flags) {
     255  if (sn0<0) sn0=sn;
    159256  // can fit in current segment ?
    160257  if (!(currentSegment != NULL &&
    161258        sn >= currentSegment->sn0 &&
    162259        sn < currentSegment->sn0 + currentSegment->bufferSize)) {
     260    cout << "MasterView::putData, need extend for " << sn << endl;
    163261    nextSegment();
    164262  }
     
    177275  // A view needs to wait for new data.
    178276
    179   // There is a possible deadlock if no view can free old segments
    180   // and we are waiting for write.
    181 
    182   // we need to record "wont need before" for each view, and
    183   // signal deadlock if any view that needs first segment data is sleeping
    184   // while we are asleep
    185277
    186278  pthread_mutex_lock(&views_mutex);
     
    199291  BufferView* bv = (BufferView*) pthread_getspecific(buffer_key);
    200292  if (bv == NULL) {
     293    cout << "creating new view" << endl;
    201294    bv = createView();
    202295    pthread_setspecific(buffer_key, bv);
     
    204297  return bv;
    205298}
     299
     300void TOISegmented::MasterView::signalWaitingViews() { /* any thread */
     301  pthread_mutex_lock(&views_mutex);
     302  set<BufferView*> copyset = waitingBuffers;
     303  pthread_mutex_unlock(&views_mutex);
     304
     305  for (set<BufferView*>::iterator i=copyset.begin();
     306       i != copyset.end(); i++) {
     307    (*i)->signal();
     308  }
     309}
     310
     311void TOISegmented::MasterView::signal() { /* reader thread */
     312  pthread_mutex_lock(&write_mutex);
     313  pthread_cond_signal(&condv); // only one thread can be sleeping
     314  pthread_mutex_unlock(&write_mutex); 
     315}
     316
     317void TOISegmented::MasterView::putDone() {
     318  nextSegment(); // cree un segment inutile, a nettoyer
     319}
     320
     321void TOISegmented::MasterView::nextSegment() { /* writer thread */
     322  // The current segment, if any, is now committed. A new
     323  // blank buffer is allocated, if any.
     324
     325  cout << "MasterView::nextSegment "
     326       << segments.size()+1 << "/" << maxSegments << endl;
     327
     328  if (currentSegment != NULL) {
     329    currentSegment->status = BufferSegment::COMMITTED;
     330    pthread_mutex_lock(&views_mutex);
     331    segments.push_back(currentSegment);
     332    pthread_mutex_unlock(&views_mutex);
     333  }
     334
     335  currentSegment = NULL;
     336  while (segments.size() >= maxSegments) {
     337    waitForCleaning();
     338  }
     339
     340  currentSegment = new BufferSegment(segmentSize);
     341  signalWaitingViews(); // they can ask to be updated !!
     342}
     343
     344void TOISegmented::MasterView::waitForCleaning() { /* writer thread */
     345  cout << "MasterView : write wait for clean for " << sn0 << endl;
     346  pthread_mutex_lock(&write_mutex);
     347  waitingOnWrite = true;
     348  checkDeadLock();
     349  pthread_cond_wait(&condv, &write_mutex);
     350  pthread_mutex_unlock(&write_mutex);
     351  cout << "MasterView : wait done" << endl;
     352}
     353
     354TOISegmented::BufferView* TOISegmented::MasterView::createView() { /* reader thread */
     355  BufferView* bv =  new BufferView(this);
     356  updateView(bv);
     357  return bv;
     358}
     359
     360void TOISegmented::MasterView::updateView(BufferView* bv) { /* reader thread */
     361  pthread_mutex_lock(&views_mutex);
     362
     363  int oldBegin = bv->sn0;
     364  int oldEnd   = bv->sn0 + bv->segmentSize * bv->segments.size();
     365
     366  for (vector<BufferSegment*>::iterator i = bv->segments.begin();
     367       i != bv->segments.end(); i++) {
     368    (*i)->decRefCount();
     369  }
     370 
     371  bv->segments.clear();
     372
     373  // TODO : utiliser firstNeeded de toute les vues pour faire le menage chez
     374  // nous.
     375
     376
     377  for (vector<BufferSegment*>::iterator i = segments.begin();
     378       i != segments.end(); i++) {
     379    if ( (*i)->sn0+(*i)->bufferSize > bv->firstNeeded ) {
     380      (*i)->incRefCount();
     381      bv->segments.push_back(*i);
     382    }
     383  }
     384
     385  bv->sn0 = -1;
     386  int newEnd = -1;
     387  if (segments.size() > 0) {
     388    bv->sn0 = bv->segments[0]->sn0;
     389    newEnd = bv->sn0 + bv->segmentSize * bv->segments.size();
     390  }
     391
     392  if (bv->sn0 > oldBegin && waitingOnWrite) {
     393    signal();
     394  }
     395
     396  cout << "sync " << sn0 << " - " << newEnd << endl;
     397
     398  pthread_mutex_unlock(&views_mutex);
     399
     400  if (newEnd > oldEnd) {
     401    signalWaitingViews();
     402  }
     403}
     404
     405void TOISegmented::MasterView::checkDeadLock() {
     406   // There is a possible deadlock if no view can free old segments
     407  // and we are waiting for write.
     408
     409  // we need to record "wont need before" for each view, and
     410  // signal deadlock if any view that needs first segment data is sleeping
     411  // while we are asleep
     412
     413  if (!waitingOnWrite) return; // no problem, there is an active writer
     414
     415  // Is any sleeping view needing our first segment ?
     416
     417  pthread_mutex_lock(&views_mutex);
     418  for (set<BufferView*>::iterator i=waitingBuffers.begin();
     419       i != waitingBuffers.end(); i++) {
     420    if ((*i)->firstNeeded < sn0+segmentSize) {
     421      cout << "**** DEADLOCK detected ****" << endl;
     422      cout << "We are waiting on write (buffer is full)"<< endl;
     423      cout << "but a waiting reader still needs our first segment" << endl;
     424      cout << "restart with bigger buffers" << endl;
     425      abort();
     426    }
     427  }
     428  pthread_mutex_unlock(&views_mutex);
     429}
     430
     431
     432
     433void TOISegmented::MasterView::BufferDestroy(void* p) {
     434  BufferView* bv = (BufferView*) p;
     435  delete bv;
     436}
  • trunk/ArchTOIPipe/Kernel/toisegment.h

    r1686 r1689  
    1818class TOISegmented : public TOIRegular {
    1919 public:
    20   TOISegmented();
    21   TOISegmented(string nm);
     20  TOISegmented(int bufsz=256, int maxseg=20);
     21  TOISegmented(string nm, int bufsz=256, int maxseg=20);
    2222  ~TOISegmented();
     23
     24  virtual double        getData(int i);
     25  virtual void          getData(int i, double& data,  uint_8& flag);
     26  virtual void          putData(int i, double  value, uint_8  flag=0);
     27  virtual void          wontNeedBefore(int i);
     28  virtual void          putDone();
     29
     30
     31  virtual DataStatus    isDataAvail(int iStart, int iEnd);
     32  virtual DataStatus    isDataAvail(int i);
     33  virtual DataStatus    isDataAvailNL(int iStart, int iEnd); // abstract
     34  virtual void          waitForData(int iStart, int iEnd);
     35  virtual void          waitForData(int i);
     36  virtual void          waitForAnyData();
     37  virtual int           nextDataAvail(int iAfter); // abstract
     38  virtual bool          hasSomeData(); // abstract
     39  virtual void          doGetData(int i, double& data, uint_8& flag); // abs
     40  virtual void          doPutData(int i, double value, uint_8 flag=0); // abs
    2341
    2442 
     
    2745  class BufferView;
    2846  class MasterView;
     47
     48  MasterView* master;
    2949
    3050
     
    85105    double getData(int sn);
    86106    uint_8 getFlag(int sn);
     107    BufferView* getView(); // thread-specific
     108    void putDone();
    87109
    88110  protected:
     
    90112    void removeFromWaitList(BufferView* bv);
    91113
    92     BufferView* getView(); // thread-specific
    93114
    94115    friend class BufferView;
    95     void signalWaitingViews();
     116    void signalWaitingViews(); // views are waiting on read
     117    void signal(); // we are waiting on write
    96118    void nextSegment();
     119    void waitForCleaning();
    97120    BufferView* createView();
    98121    void updateView(BufferView*); // called on reader thread of the view
     
    107130    pthread_mutex_t  views_mutex; // lock for master buffer list access
    108131    pthread_mutex_t  write_mutex; // for write waiting
    109     pthread_cond_t   condv; // waiting (read or write) (write only ?)
     132    pthread_cond_t   condv; // waiting for cleaning (on writer thread)
    110133    pthread_key_t    buffer_key; // thread-specific buffer view
    111134    static void BufferDestroy(void *);
    112135
    113     static const int NO_WAIT   = 0;
    114     static const int WAIT_READ = 1;
    115     static const int WAIT_WRITE= 2;
    116     int  waitStatus;
     136    bool   waitingOnWrite; // wait on writer thread
    117137
    118138    set<BufferView*>  waitingBuffers;
     
    130150    double getData(int sn);
    131151    uint_8 getFlag(int sn);   
     152
     153    void wontNeedBefore(int sn);
    132154   
    133155  protected:
     
    142164    int sn0;
    143165    int segmentSize;
     166    int firstNeeded;
    144167    pthread_mutex_t  mutex; // lock pour attente de segments
    145168    pthread_cond_t   condv; // attente de segments (en lecture)
Note: See TracChangeset for help on using the changeset viewer.