// ArchTOIPipe (C) CEA/DAPNIA/SPP IN2P3/LAL // Eric Aubourg // Christophe Magneville // Reza Ansari // $Id: toimanager.cc,v 1.20 2003-02-24 14:14:51 cecile Exp $ #include "toimanager.h" #include #include #include #include #include #ifndef MAXINT #define MAXINT 2147483647 #endif TOIManager::TOIManager() { reqBegin = 0; reqEnd = MAXINT; // -----------ajout cgt vf 19/08/2002 // par defaut TOISegmented selectTOISegmented(1024, 20); // ----------- fin ajout cgt } TOIManager* TOIManager::instance = NULL; TOIManager* TOIManager::getManager() { if (instance == NULL) instance = new TOIManager(); return instance; } // ajout vf 26/07/2002 // enregistrement d'un processeur dans la liste des processeurs pour une execution en groupe void TOIManager::registerProcessor(TOIProcessor* proc) { cout << "Adding processor to TOIManager for group execution" << endl; processors.push_back(proc); } // demarrage de tous les processeurs et verification auto des samplenum pour chaque processeur parametre void TOIManager::startAll() { // verification des samplenum bool samples_ok=checkSamplesLimits(1); if (samples_ok) { cout << "All limits ok" << endl << "Starting processors" << endl; } else { cout << "One or more limits ajusted for execution" << endl << "Starting processors" << endl; } // mise a jour des limites apres verification checkSamplesLimits(2); checkSamplesLimits(3); cout<<"Fin checks"<::iterator i = processors.begin(); i != processors.end(); i++) { TOIProcessor* proc = *i; proc->printLimits(); } } // demarrage for (vector::iterator i = processors.begin(); i != processors.end(); i++) { TOIProcessor* proc = *i; cout << "**********************" << endl; cout << "starting processor " << endl; proc->start(); cout << "processor started " << endl; } cout << "**********************" << endl; } bool TOIManager::checkSamplesLimits(int pass) { bool processor_ok=true; bool samples_ok=true; for (vector::iterator i = processors.begin(); i != processors.end(); i++) { TOIProcessor* proc = *i; cout << "testing processor limits " << endl; // test du processeur // test seulement pour les processor cle //if (proc->getRequested()) { processor_ok = proc->checkSampleLimits(pass); //} if (processor_ok) { cout << "processor limits ok " << endl; } else { cout << "processor limits ajusted" << endl; samples_ok = false; } } return samples_ok; } // fin ajout vf void TOIManager::setRequestedSample(int begin, int end) { reqBegin = begin; reqEnd = end; } int TOIManager::getRequestedBegin() { return reqBegin; } int TOIManager::getRequestedEnd() { return reqEnd; } void TOIManager::addThread(pthread_t* t) { // cout << "adding thread " << t << endl; threads.push_back(t); } void TOIManager::joinAll() { waitForAll(); } void TOIManager::waitForAll() { for (vector::iterator i = threads.begin(); i != threads.end(); i++) { pthread_t* pth = *i; cout << "joining thread " << pth << endl; pthread_join(*pth, NULL); cout << "thread joined " << pth << endl; } } // -----------ajout cgt vf 19/08/2002 void TOIManager::selectTOISegmented(int bufsz, int maxseg) { fgSegmented = true; segBuffsz = bufsz; segMaxseg = maxseg; } void TOIManager::selectTOISeqBuffered(int wsz) { fgSegmented = false; segBuffsz = wsz; } // methode connect de cgt simplifiee et corrigee TOI& TOIManager::connect(TOIProcessor& prout, string out, TOIProcessor& prin, string in, string nom, int wbsz, bool withFlag) { TOI* toi; if (nom.length() < 1) { char buff[128]; sprintf(buff, "TOI%s_[%s-%s]", nom, in, out); nom = buff; } if (wbsz < 16) wbsz = segBuffsz; // ajout test pour eviter de creer 2 tois en sortie if ((toi=prout.getOutToi(out)) == NULL) { //cout << "toi cree" << endl; if (fgSegmented) toi = new TOISegmented(nom, wbsz, segMaxseg); else toi = new TOISeqBuffered(nom, wbsz); // on ajoute le toi de sortie prout.addOutput(out, toi); } else { //cout << "toi deja cree stop" << endl; } if (withFlag) { // Si c'est un FITSTOIWriter FITSTOIWriter* ftw = dynamic_cast< FITSTOIWriter* >(&prin); if (ftw) ftw->addInput(in, toi, withFlag); else prin.addInput(in, toi); } else prin.addInput(in, toi); return(*toi); } TOI& TOIManager::connect(TOIProcessor& prout, const char* out, TOIProcessor& prin, const char* in, string nom, int wbsz, bool withFlag) { string outs = out; string ins = in; return connect(prout, outs, prin, ins, nom, wbsz, withFlag); } // ----------- fin ajout cgt // ----------------------------------------------------------------- // Classe pour affichage de l'avancement des TOIProcessors // Reza 08/2001 // ----------------------------------------------------------------- RzProcSampleCounter::RzProcSampleCounter() { _msg = "SampleCounter/Info"; _rate = 50; } RzProcSampleCounter::~RzProcSampleCounter() { } long RzProcSampleCounter::PrintStats() { int istart = 0; int iend = 0; long dns_print = 1000; int dns_print_fac = _rate; int nbmax_dns_print = 2; TOIManager* mgr = TOIManager::getManager(); // istart = mgr->getRequestedBegin(); // iend = mgr->getRequestedEnd(); istart = SampleBegin(); iend = SampleEnd(); dns_print = (iend-istart)/dns_print_fac; if (dns_print < 1000) dns_print = ((iend-istart) < 1000) ? (iend-istart) : 1000; if (dns_print < 1) dns_print = 1; nbmax_dns_print = (iend-istart)/dns_print; cout << "RzProcSampleCounter::PrintStats() InfoMessage=" << _msg << "\n ... " << _msg << " istart=" << istart << " iend= " << iend << " dns_print= " << dns_print << " nbmax_dns_print= " << nbmax_dns_print << endl; // ------------------- Impression continu de stat ------------------------ long nb_dns_print = 0; int nb_sleep = 0; long last_sample_count = 0; long processed_samples = 0; long total_sample_count = dns_print*nbmax_dns_print; bool alldone = false; double fracperc = 0.; int fperc = 0; while (!alldone) { processed_samples = ProcessedSampleCount(); if ( (processed_samples-last_sample_count > dns_print) || (processed_samples > total_sample_count-10) ) { last_sample_count = processed_samples; if (nb_dns_print == 0) cout << "\n"; nb_dns_print++; fracperc = (double)processed_samples*100./(double)total_sample_count; fperc = fracperc*100; cout << ">>> " << _msg << ": ProcessedSampleCount()= " << last_sample_count << " Frac done = " << (double)fperc/100. << " %" << endl; if (last_sample_count > total_sample_count-10) alldone = true; nb_sleep = 0; } else if ((nb_sleep+1)%5 == 0) { fracperc = (double)processed_samples*100./(double)total_sample_count; fperc = fracperc*100; cout << "> " << _msg << ": ProcSamples()= " << processed_samples << " Done = " << " %" << (double)fperc/100. << " NbSleep(1) = " << nb_sleep << endl; } sleep(1); nb_sleep++; } // ----------------------------------------------------------------------- return last_sample_count; }