// This may look like C code, but it is really -*- C++ -*- // ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL // Eric Aubourg // Christophe Magneville // Reza Ansari // $Id: toisegment.h,v 1.18 2002-03-23 23:05:22 aubourg Exp $ #ifndef TOISEGMENT_H #define TOISEGMENT_H #include "config.h" #include #include #include "toi.h" // ------------ TOISegmented ------------------------------ // Classe de TOI pour echantillonage regulier, avec buffer // segmente pour optimiser le multithread et limiter les // verrous. // Il faut que les fournisseurs fassent arriver les donnees // par samplenum croissant et continu. // -------------------------------------------------------- class TOISegmented : public TOIRegular { public: TOISegmented(int bufsz=1024, int maxseg=20); TOISegmented(string nm, int bufsz=1024, int maxseg=20); TOISegmented(char* cnm, int bufsz=1024, int maxseg=20); ~TOISegmented(); virtual double getData(int i); virtual void getData(int i, double& data, uint_8& flag); virtual void getData(int i, int n, double* data, uint_8* flg=0); virtual void putData(int i, double value, uint_8 flag=0); virtual void putData(int i, int n, double const* val, uint_8 const* flg=0); virtual void wontNeedBefore(int i); virtual void putDone(); virtual void addConsumer(TOIProcessor*); // Methodes ignorees car on reimplemente les methodes de base virtual DataStatus isDataAvail(int iStart, int iEnd); virtual DataStatus isDataAvail(int i); virtual DataStatus isDataAvailNL(int i); virtual DataStatus isDataAvailNL(int iStart, int iEnd); // abstract virtual void waitForData(int iStart, int iEnd); virtual void waitForData(int i); virtual void waitForAnyData(); virtual int nextDataAvail(int iAfter); // abstract virtual bool hasSomeData(); // abstract virtual void doGetData(int i, double& data, uint_8& flag); // abs virtual void doPutData(int i, double value, uint_8 flag=0); // abs protected: class BufferSegment; class BufferView; class MasterView; MasterView* master; class BufferSegment { public: BufferSegment(int sz); ~BufferSegment(); static const int NEW = 0; static const int WRITE = 1; // single-thread write access static const int COMMITTED = 2; // multiple read without lock are ok int getStatus() {return status;} void incRefCount(); void decRefCount(); int getRefCount(); void putData(int sn, double data, uint_8 flag); void putData(int sn, int n, double const* data, uint_8 const* flag); inline double getData(int sn); inline uint_8 getFlag(int sn); void getData(int sn, int n, double* data, uint_8* flag); bool isPastEnd(int sn) { return sn >= sn0+bufferSize; } private: void checkCommitted() { if (status != COMMITTED) { cerr << "TOISegment: Read on not committed buffer segment" << endl; throw(ForbiddenError("TOISegment: Read on not committed buffer segment")); } } void checkInRange(int sn) { if (sn < sn0 || sn >= sn0+bufferSize) { cerr << "TOISegment: out of range access in buffer segment " << sn << " not in " << sn0 << " - " << sn0+bufferSize << endl; throw(RangeCheckError("TOISegment: out of range access in buffer segment")); } } int status; // NEW, WRITE, COMMITTED int bufferSize; int sn0; // Samplenum du premier echantillon int refcount; // Nombre de vues qui utilisent pthread_mutex_t refcount_mutex; // Pour modification refcount double* data; uint_8* flags; friend class BufferView; friend class MasterView; }; // Master view, gere le lock, et l'ecriture class MasterView { public: MasterView(int bufsz=256, int maxseg=20, string nm=""); ~MasterView(); void putData(int sn, double data, uint_8 flag); double getData(int sn); uint_8 getFlag(int sn); void getData(int i, int n, double* data, uint_8* flg); void putData(int i, int n, double const* val, uint_8 const* flg); BufferView* getView(); // thread-specific void putDone(); protected: friend class BufferView; friend class TOISegmented; string name; void signalWaitingViews(); // views are waiting on read void signalWrite(); // we are waiting on write void nextSegment(); void waitForCleaning(); BufferView* createView(); void updateView(BufferView*); // called on reader thread of the view BufferSegment* currentSegment; int maxSegments; int segmentSize; int sn0; // First sn in first segment vector segments; // Committed segments int nConsumers; pthread_mutex_t views_mutex; // lock for master buffer list access pthread_cond_t write_wait_condv; // waiting for cleaning (on writer thread) pthread_key_t buffer_key; // thread-specific buffer view static void BufferDestroy(void *); pthread_mutex_t read_wait_mutex; pthread_cond_t read_wait_condv; bool waitingOnWrite; // wait on writer thread int waitingViews; set allViews; void checkDeadLock(); }; // per-thread read-only view of a buffer set class BufferView { public: BufferView(MasterView*); ~BufferView(); double getData(int sn); uint_8 getFlag(int sn); void getData(int i, int n, double* data, uint_8* flg); void wontNeedBefore(int sn); protected: void wait(int sn); // Passe en attente d'un nouveau segment -- lecture void sync(); // recupere les nouveaux segments, resync avec master void ensure(int sn); bool waiting; int waitingFor; friend class MasterView; MasterView* master; vector segments; // Committed int sn0; int segmentSize; int firstNeeded; }; }; /***********************************/ /* Inline methods -- BufferSegment */ /***********************************/ double TOISegmented::BufferSegment::getData(int sn) { checkCommitted(); checkInRange(sn); return data[sn-sn0]; } uint_8 TOISegmented::BufferSegment::getFlag(int sn) { checkCommitted(); checkInRange(sn); return flags[sn-sn0]; } #endif