source: tbroadcast/v1/src/tbroadcast.cxx @ 231

Last change on this file since 231 was 231, checked in by garonne, 18 years ago

import tbroadcast

File size: 29.1 KB
Line 
1
2#include <TCmThread.hxx>
3
4#include <iostream>
5#include <fstream>
6#include <strstream>
7#include <vector>
8#include <map>
9#include <cstdlib>
10#include <cstdio>
11#include <sys/time.h>
12
13
14/**
15 *
16 * Transactions
17 *
18 */
19
20class Transaction
21{
22public:
23  Transaction ();
24  Transaction (const std::string& info);
25  ~Transaction ();
26
27  void lock ();
28  void unlock ();
29  const std::string& get_info () const;
30  void do_signal ();
31  bool do_wait_signal (int milliseconds);
32  void do_wait ();
33  bool do_wait (int milliseconds);
34
35  std::string _info;
36  pthread_cond_t _var;
37  pthread_mutex_t _mutex;
38  int _count;
39};
40
41#include <time.h>
42#include <unistd.h>
43
44/**
45 * Local cleanup function
46 */
47static void UnlockMutex (void* arg)
48{
49  int status;
50 
51  status = pthread_mutex_unlock ((pthread_mutex_t*) arg);
52}
53
54Transaction::Transaction ()
55{
56  pthread_mutex_init (&_mutex, 0);
57  pthread_cond_init (&_var, 0);
58  _count = 0;
59}
60
61Transaction::Transaction (const std::string& info)
62{
63  _info   = info;
64  pthread_mutex_init (&_mutex, 0);
65  pthread_cond_init (&_var, 0);
66  _count = 0;
67}
68
69Transaction::~Transaction ()
70{
71  pthread_mutex_destroy (&_mutex);
72  pthread_cond_destroy (&_var);
73}
74
75void Transaction::lock ()
76{
77  pthread_mutex_lock (&_mutex);
78}
79
80void Transaction::unlock ()
81{
82  pthread_cond_broadcast (&_var);
83  pthread_mutex_unlock (&_mutex);
84}
85
86void Transaction::do_signal ()
87{
88  pthread_mutex_lock (&_mutex);
89  if (TCmThread::debug_level () > 0)
90    {
91      std::cout << "Transaction::do_signal>" << _info << " " << _count << std::endl;
92    }
93  _count++;
94  pthread_cond_broadcast (&_var);
95  pthread_mutex_unlock (&_mutex);
96}
97
98const std::string& Transaction::get_info () const
99{
100  return (_info);
101}
102
103bool Transaction::do_wait_signal (int milliseconds)
104{
105  int status = 0;
106
107  pthread_cleanup_push (UnlockMutex, &_mutex);
108  pthread_mutex_lock (&_mutex);
109
110  struct timeval now;
111
112  TCmMutex::iolock ();
113  gettimeofday (&now, 0);
114  TCmMutex::iounlock ();
115
116  struct timespec delta;
117  struct timespec* timeout;
118
119  delta.tv_sec = now.tv_sec + milliseconds / 1000;
120  delta.tv_nsec = now.tv_usec + milliseconds * 1000;
121
122  timeout = &delta;
123
124  if (TCmThread::debug_level () > 0)
125    {
126      std::cout << "Transaction::do_wait_signal(t)> " << _info << " " << _count << std::endl;
127    }
128 
129  status = pthread_cond_timedwait (&_var, &_mutex, timeout);
130 
131  //std::cout << "Transaction::do_wait(t)> " << _info << " status=" << status << std::endl;
132 
133  pthread_cleanup_pop (1);
134
135  return (status == 0);
136}
137
138void Transaction::do_wait ()
139{
140  pthread_cleanup_push (UnlockMutex, &_mutex);
141  pthread_mutex_lock (&_mutex);
142
143  if (TCmThread::debug_level () > 0)
144    {
145      std::cout << "Transaction::do_wait> " << _info << std::endl;
146    }
147
148  pthread_cond_wait (&_var, &_mutex);
149  pthread_cleanup_pop (1);
150}
151
152bool Transaction::do_wait (int milliseconds)
153{
154  int status = 0;
155
156  pthread_cleanup_push (UnlockMutex, &_mutex);
157  pthread_mutex_lock (&_mutex);
158
159  struct timeval now;
160
161  TCmMutex::iolock ();
162  gettimeofday (&now, 0);
163  TCmMutex::iounlock ();
164
165  struct timespec delta;
166  struct timespec* timeout;
167
168  delta.tv_sec = now.tv_sec + milliseconds / 1000;
169  delta.tv_nsec = now.tv_usec + milliseconds * 1000;
170
171  timeout = &delta;
172
173  if (TCmThread::debug_level () > 0)
174    {
175      std::cout << "Transaction::do_wait(t)> " << _info << std::endl;
176    }
177 
178  status = pthread_cond_timedwait (&_var, &_mutex, timeout);
179 
180  //std::cout << "Transaction::do_wait(t)> " << _info << " status=" << status << std::endl;
181 
182  pthread_cleanup_pop (1);
183 
184  return (status == 0);
185}
186
187/**
188 * The configuration parameters for the program
189 */
190int MaxThreads = 20;
191
192/**
193 * Global variables
194 */
195int LastId = 0;
196
197/// A transaction to wake up the scheduler
198Transaction QueueTr ("QueueTr");
199
200/// Count the number of objects still pending
201int ObjectCount = 0;
202
203/// Accumulating the run time
204double TotalTime = 0;
205
206/// A mutex for ObjectCount and TotalTime
207TCmMutex CountLock ("CountLock");
208
209class A;
210
211/// All pending objects (and associated mutex)
212std::vector <A*> ThreadQueue;
213TCmMutex ThreadLock ("ThreadLock");
214
215/// Counting the running threads (and associated mutex)
216int ThreadCount = 0;
217TCmMutex ThreadCountLock ("ThreadCountLock");
218
219class Package;
220
221/**
222 * Dictionaries for packages and for runner objects
223 */
224std::map <std::string, Package*> Packages;
225std::map <std::string, Project*> Projects;
226std::map <std::string, A*> As;
227int PackageCount = 0;
228
229/**
230 *  The shell command to run in every runner
231 */
232std::string Command = "";
233
234/**
235 *  The expanded CMTPATH
236 */
237std::vector<std::string> Cmtpath_items;
238
239/**
240 *  Global parameters for selection or exclusion
241 */
242std::vector <std::string> Selections;
243std::vector <std::string> Exclusions;
244std::string Begin;
245TCmMutex BeginLock ("BeginLock");
246std::vector <std::string> Cmtpath_selections;
247
248/**
249 * Helper to get a flat random integer value
250 */
251unsigned int get_random (unsigned int range)
252{
253  double x = (((double) range) * (double) rand ()) / ((double) RAND_MAX);
254
255  return ((unsigned int) x);
256}
257
258std::string p_pwd ()
259{
260  char buffer[256] = "";
261  char* ptr = 0;
262  ptr = getcwd (buffer, sizeof (buffer));
263
264  const char* t = &buffer[0];
265  return ((std::string) t);
266}
267
268void p_basename (const std::string& file_name, std::string& result)
269{
270  unsigned int pos = file_name.find_last_of ('/');
271  if (pos == std::string::npos)
272    {
273      pos = file_name.find_last_of ('\\');
274    }
275
276  if (pos == std::string::npos)
277    {
278      result = file_name;
279    }
280  else
281    {
282      result = file_name.substr (pos + 1);
283    }
284}
285
286//--------------------------------------------------
287void p_dirname (const std::string& file_name, std::string& result)
288{
289  unsigned int pos = file_name.find_last_of ('/');
290  if (pos == std::string::npos)
291    {
292      pos = file_name.find_last_of ('\\');
293    }
294
295  if (pos == std::string::npos)
296    {
297      result = "";
298    }
299  else
300    {
301      result = file_name;
302      result.erase (pos);
303    }
304}
305
306
307/**
308 * Execute a shell command and accumulate its output into a string
309 */
310static int execute (const std::string& command, std::string& output)
311{
312  output = "";
313
314  FILE* f = popen (command.c_str (), "r"); 
315 
316  if (f != 0) 
317    { 
318      char line[256]; 
319      char* ptr;
320
321      while ((ptr = fgets (line, sizeof (line), f)) != NULL) 
322        {
323          output += ptr;
324        } 
325      pclose (f);
326
327      return (1);
328    }
329
330  return (0);
331}
332
333/**
334 * The Package class stores information about a package
335 */
336class Package
337{
338public:
339  Package (const std::string& name,
340           const std::string& version,
341           const std::string& offset,
342           const std::string& cmtpath) :
343    m_name (name),
344    m_version (version),
345    m_offset (offset),
346    m_cmtpath (cmtpath)
347  {
348  }
349
350  void set (const std::string& name,
351            const std::string& version,
352            const std::string& offset,
353            const std::string& cmtpath)
354  {
355    m_name = name;
356    m_version = version;
357    m_offset = offset;
358    m_cmtpath = cmtpath;
359
360    m_location = m_cmtpath;
361
362    if (m_cmtpath != "")
363      {
364        if (m_offset != "") 
365          {
366            m_location += m_offset;
367          }
368        m_location += "/";
369      }
370
371    m_location += m_name;
372    m_location += "/";
373    m_location += m_version;
374    m_location += "/cmt";
375  }
376
377  const std::string& name () const
378  {
379    return (m_name);
380  }
381
382  const std::string& version () const
383  {
384    return (m_version);
385  }
386
387  const std::string& offset () const
388  {
389    return (m_offset);
390  }
391
392  const std::string& cmtpath () const
393  {
394    return (m_cmtpath);
395  }
396
397  const std::string& location () const
398  {
399    return (m_location);
400  }
401
402private:
403  std::string m_name;
404  std::string m_version;
405  std::string m_offset;
406  std::string m_cmtpath;
407  std::string m_location;
408};
409
410/**
411 * A runner object. It is associated with a package.
412 */
413class A : public TCmThread
414{
415public:
416  A () : m_package (0),
417         m_started (false), 
418         m_finished (false), 
419         m_ready (false),
420         m_queued (false),
421         m_launched (false),
422         m_run_status (0)
423  {
424    ObjectCount++;
425    LastId++;
426    m_id = LastId;
427  }
428
429  A (Package* package) : m_package (package),
430                         m_started (false), 
431                         m_finished (false), 
432                         m_ready (false),
433                         m_queued (false),
434                         m_launched (false),
435                         m_run_status (0)
436  {
437    ObjectCount++;
438    LastId++;
439    m_id = LastId;
440    m_mutex.set_info (name ());
441  }
442
443  ~A ()
444  {
445  }
446
447  /**
448   * Called when a runner terminates.
449   *
450   *   1) Push back the parent to the activation queue when it's ready to start
451   *   2) wake up the scheduler
452   */
453  void terminate ()
454  {
455    if (m_parents.size () > 0)
456      {
457        std::vector <A*>::iterator it;
458
459        for (it = m_parents.begin (); it != m_parents.end (); ++it)
460          {
461            A* a = *it;
462
463            a->terminate_child (this);
464
465            if (a->should_start ())
466              {
467                a->queue ();
468              }
469          }
470      }
471
472    QueueTr.do_signal ();
473  }
474
475  /**
476   * Callback called after a thread terminates.
477   *
478   *   1) count down the number of active threads
479   *   2) Push back the parent to the activation queue when it's ready to start
480   *   3) wake up the scheduler
481   */
482  void cleanup ()
483  {
484    /*
485    TCmMutex::iolock ();
486    std::cout << "cleanup " << name () << std::endl;
487    TCmMutex::iounlock ();
488    */
489
490    ThreadCountLock.lock ("cleanup");
491    ThreadCount--;
492    ThreadCountLock.unlock ();
493
494    terminate ();
495  }
496
497  /**
498   * Construct the name of this runner from the package name
499   */
500  std::string name () const
501  {
502    if (m_package != 0)
503      {
504        return (m_package->name ());
505      }
506    else
507      {
508        char buff [80];
509        sprintf (buff, "A%d", m_id);
510
511        return (buff);
512      }
513  }
514
515  const std::string& version () const
516  {
517    if (m_package != 0)
518      {
519        return (m_package->version ());
520      }
521    else
522      {
523        static std::string empty;
524        return (empty);
525      }
526  }
527
528  const std::string& offset () const
529  {
530    if (m_package != 0)
531      {
532        return (m_package->offset ());
533      }
534    else
535      {
536        static std::string empty;
537        return (empty);
538      }
539  }
540
541  const std::string& cmtpath () const
542  {
543    if (m_package != 0)
544      {
545        return (m_package->cmtpath ());
546      }
547    else
548      {
549        static std::string empty;
550        return (empty);
551      }
552  }
553
554  const std::string& location () const
555  {
556    if (m_package != 0)
557      {
558        return (m_package->location ());
559      }
560    else
561      {
562        static std::string empty;
563        return (empty);
564      }
565  }
566
567  int id () const
568  {
569    return (m_id);
570  }
571
572  /**
573   * Recursively find if an object is already in the parent list
574   * (check for cycles)
575   */
576  bool has_parent (A* a)
577  {
578    if (a == this) return (true);
579
580    std::vector <A*>::iterator it;
581
582    for (it = m_parents.begin (); it != m_parents.end (); ++it)
583      {
584        A* p = *it;
585
586        if (p == a) return (true);
587
588        if (p->has_parent (a)) return (true);
589      }
590
591    return (false);
592  }
593
594  /**
595   *  Adds a unique child (should first check for cycles)
596   *  Avoid duplications.
597   */
598  void add_child (A* a)
599  {
600    if (a != 0)
601      {
602        std::vector <A*>::iterator it;
603
604        for (it = m_children.begin (); it != m_children.end (); ++it)
605          {
606            A* c = *it;
607            if (c == a) return;
608          }
609
610        //std::cout << name () << " add child " << a->name () << std::endl;
611        m_children.push_back (a);
612        a->add_parent (this);
613      }
614  }
615
616  /**
617   * Adds a parent without duplication
618   */
619  void add_parent (A* a)
620  {
621    std::vector <A*>::iterator it;
622
623    for (it = m_parents.begin (); it != m_parents.end (); ++it)
624      {
625        A* p = *it;
626        if (p == a) return;
627      }
628
629    m_parents.push_back (a);
630  }
631
632  void dequeue ()
633  {
634    m_mutex.lock ("dequeue");
635    m_queued = false;
636    m_mutex.unlock ();
637  }
638
639  bool is_finished ()
640  {
641    bool result;
642
643    m_mutex.lock ("is_finished");
644    result = m_finished;
645    m_mutex.unlock ();
646
647    return (result);
648  }
649
650  /**
651   *  A runner can be queued if it was never started yet and
652   *  it has no more children
653   */
654  bool can_be_queued ()
655  {
656    bool result;
657
658    m_mutex.lock ("can_be_queued");
659    result = !m_finished && !m_started && (m_children.size () == 0);
660    m_mutex.unlock ();
661
662    return (result);
663  }
664
665  /**
666   *  A runner should start when all children are finished
667   */
668  bool should_start ()
669  {
670    bool result = (check_state () == ready_to_start);
671
672    return (result);
673  }
674
675  int get_run_status ()
676  {
677    int result;
678
679    m_mutex.lock ("get_run_status");
680    result = m_run_status;
681    m_mutex.unlock ();
682
683    return (result);
684  }
685
686  typedef enum
687  {
688    already_started,
689    already_finished,
690    not_yet_completed,
691    ready_to_start,
692    really_started,
693    queued
694  } lock_status;
695
696  /**
697   *  Analyze the state of a runner
698   */
699  lock_status check_state ()
700  {
701    bool done = false;
702
703    m_mutex.lock ("check_state-1");
704    done = m_finished;
705    m_mutex.unlock ();
706
707    if (done)
708      {
709        return (already_finished);
710      }
711
712    m_mutex.lock ("check_state-2");
713    done = m_started;
714    m_mutex.unlock ();
715
716    if (done)
717      {
718        return (already_started);
719      }
720
721    // not yet started
722
723    /*
724    TCmMutex::iolock ();
725    std::cout << "check_state-1> " << name () << std::endl;
726    TCmMutex::iounlock ();
727    */
728
729    bool should_check_children = false;
730
731    m_mutex.lock ("check_state-3");
732    should_check_children = !m_ready;
733    m_mutex.unlock ();
734
735    if (should_check_children)
736      {
737        // check for unfinished children
738
739        std::vector <A*>::iterator it;
740
741        bool is_ready = true;
742
743        m_mutex.lock ("check_state-4");
744        for (it = m_children.begin (); it != m_children.end (); ++it)
745          {
746            A* c = *it;
747
748            if (!c->is_finished ()) 
749              {
750                is_ready = false;
751                break;
752              }
753          }
754        m_mutex.unlock ();
755
756        if (!is_ready)
757          {
758            return (not_yet_completed);
759          }
760
761        // all children are finished
762       
763        m_mutex.lock ("check_state-4");
764        m_ready = true;
765        m_mutex.unlock ();
766      }
767
768    return (ready_to_start);
769  }
770
771  /**
772   *  Actually ask the runner to run its thread
773   */
774  bool trigger ()
775  {
776    /*
777    TCmMutex::iolock ();
778    std::cout << "trigger> " << name ();
779    std::cout << std::endl;
780    TCmMutex::iounlock ();
781    */
782
783    ThreadCountLock.lock ("trigger");
784    ThreadCount++;
785    ThreadCountLock.unlock ();
786   
787    m_mutex.lock ("trigger");
788    m_started = true;
789    m_mutex.unlock ();
790
791    bool result = start ();
792
793    return (result);
794  }
795
796  /**
797   *  Abort the planned activity. Must emulate all
798   *  mechanisms of trigger-start-cleanup-wakeup
799   */
800  void abort ()
801  {
802    CountLock.lock ("abort");
803    ObjectCount--;
804    int oc = ObjectCount;
805    CountLock.unlock ();
806
807    TCmMutex::iolock ();
808
809    std::cout << "# Aborting " << name ()
810              << " (" << PackageCount - oc << "/" << PackageCount << ")" 
811              << std::endl;
812
813    TCmMutex::iounlock ();
814
815    m_mutex.lock ("abort");
816    m_finished = true;
817    m_run_status = 1;
818    m_mutex.unlock ();
819
820    terminate ();
821  }
822
823  /**
824   *  Ignore the planned activity. Must emulate all
825   *  mechanisms of trigger-start-cleanup-wakeup
826   */
827  void ignore ()
828  {
829    CountLock.lock ("ignore");
830    ObjectCount--;
831    //int oc = ObjectCount;
832    CountLock.unlock ();
833
834    /*
835    TCmMutex::iolock ();
836
837    std::cout << "# Ignoring " << name ()
838              << " (" << PackageCount - oc << "/" << PackageCount << ")"
839              << std::endl;
840
841    TCmMutex::iounlock ();
842    */
843
844    m_mutex.lock ("ignore");
845    m_finished = true;
846    m_run_status = 0;
847    m_mutex.unlock ();
848
849    terminate ();
850  }
851
852  /**
853   *  Construct a text representation of the runner state
854   */
855  std::string get_state ()
856  {
857    std::string s = "(";
858
859    m_mutex.lock ("get_state");
860    if (m_started) s += "s";
861    if (m_finished) s += "f";
862    if (m_ready) s += "r";
863    if (m_queued) s += "q";
864    if (m_launched) s += "l";
865    m_mutex.unlock ();
866
867    s += ")";
868
869    return (s);
870  }
871
872  /**
873   *  Debugging function
874   */
875  void special_show ()
876  {
877    std::string s = "";
878
879    s = get_state ();
880
881    s += " p[";
882    std::vector<A*>::iterator it;
883
884    for (it = m_parents.begin (); it != m_parents.end (); ++it)
885      {
886        A* a = *it;
887        s += a->name ();
888        s += a->get_state ();
889        s += " ";
890      }
891   
892    s += "]";
893    s += " c[";
894   
895    for (it = m_children.begin (); it != m_children.end (); ++it)
896      {
897        A* a = *it;
898        s += a->name ();
899        s += a->get_state ();
900        s += " ";
901      }
902   
903    s += "]";
904
905    TCmMutex::iolock ();
906    std::cout << "### " << name () << " " << s << std::endl;
907    TCmMutex::iounlock ();
908  }
909
910  /**
911   *   A child has just terminated (possibly with error)
912   *   The parent removes it from its children and inherit error.
913   */
914  void terminate_child (A* child)
915  {
916    if (child == 0) return;
917
918    std::vector <A*>::iterator cit;
919
920    m_mutex.lock ("terminate_child");
921
922    for (cit = m_children.begin (); cit != m_children.end (); ++cit)
923      {
924        A* c = *cit;
925        if (c == child)
926          {
927            int status = c->get_run_status ();
928            if (status != 0)
929              {
930                m_run_status = status;
931              }
932            m_children.erase (cit);
933            break;
934          }
935      }
936
937    m_mutex.unlock ();
938
939  }
940
941  bool selected ()
942  {
943    if ((Begin == "") && 
944        (Cmtpath_selections.size () == 0) && 
945        (Selections.size () == 0) && 
946        (Exclusions.size () == 0)) return (true);
947
948    const std::string& loc = location ();
949
950    bool result = true;
951
952    BeginLock.lock ("selected");
953    if (Begin != "")
954      {
955        if (loc.find (Begin) == std::string::npos)
956          {
957            result = false;
958          }
959        else
960          {
961            Begin = "";
962          }
963      }
964    BeginLock.unlock ();
965
966    if (!result)
967      {
968        return (result);
969      }
970
971    if (Cmtpath_selections.size () != 0)
972      {
973        result = false;
974
975        std::vector<std::string>::iterator sit;
976
977        for (sit = Cmtpath_selections.begin (); sit != Cmtpath_selections.end (); ++sit)
978          {
979            const std::string& s = *sit;
980           
981            if (loc.find (s) == 0)
982              {
983                result = true;
984                break;
985              }
986          }
987      }
988
989    if (Selections.size () != 0)
990      {
991        result = false;
992
993        std::vector<std::string>::iterator sit;
994
995        for (sit = Selections.begin (); sit != Selections.end (); ++sit)
996          {
997            const std::string& s = *sit;
998           
999            if (loc.find (s) != std::string::npos)
1000              {
1001                result = true;
1002                break;
1003              }
1004          }
1005      }
1006
1007    if (result)
1008      {
1009        std::vector<std::string>::iterator eit;
1010
1011        for (eit = Exclusions.begin (); eit != Exclusions.end (); ++eit)
1012          {
1013            const std::string& e = *eit;
1014           
1015            if (loc.find (e) != std::string::npos)
1016              {
1017                result = false;
1018                break;
1019              }
1020          }
1021      }
1022
1023    return (result);
1024  }
1025
1026  /**
1027   *  Send a runner to the activation queue.
1028   *  to optimize the next checks
1029   */
1030  void queue ()
1031  {
1032    /*
1033    TCmMutex::iolock ();
1034    std::cout << "Queue-begin> " << name ();
1035    std::cout << std::endl;
1036    TCmMutex::iounlock ();
1037    */
1038
1039    /*
1040    ThreadLock.lock ("queue-1");
1041    std::vector <A*>::iterator it;
1042    TCmMutex::iolock ();
1043    std::cout << "Q[";
1044    for (it = ThreadQueue.begin (); it != ThreadQueue.end (); ++it)
1045      {
1046        A* a = *it;
1047        std::cout << a->name () << " ";
1048      }
1049    std::cout << "]" << std::endl;
1050    TCmMutex::iounlock ();
1051    ThreadLock.unlock ();
1052    */
1053
1054    if (!selected ())
1055      {
1056        ignore ();
1057        return;
1058      }
1059
1060    m_mutex.lock ("queue-1");
1061
1062    if (!m_queued)
1063      {
1064        /*
1065          std::string s = get_state ();
1066
1067          TCmMutex::iolock ();
1068          std::cout << "Sending " << name () << " to queue";
1069          std::cout << " " << s << std::endl;
1070          TCmMutex::iounlock ();
1071        */
1072
1073        ThreadLock.lock ("queue-2");
1074        m_queued = true;
1075        ThreadQueue.push_back (this);
1076        ThreadLock.unlock ();
1077      }
1078    else
1079      {
1080        /*
1081          std::string s = get_state ();
1082
1083          TCmMutex::iolock ();
1084          std::cout << "Keeping " << name () << " in queue";
1085          std::cout << " " << s << std::endl;
1086          TCmMutex::iounlock ();
1087        */
1088      }
1089
1090    m_mutex.unlock ();
1091  }
1092
1093  /**
1094   * Initialize the activation queue by selectively pushing leaves
1095   * of the graph
1096   */
1097  void launch ()
1098  {
1099    std::map<std::string, A*>::iterator ait;
1100    for (ait = As.begin (); ait != As.end (); ++ait)
1101      {
1102        A* a = (*ait).second;
1103        if (a->can_be_queued ())
1104          {
1105            a->queue ();
1106          }
1107      }
1108
1109    /*
1110    if (m_launched) return;
1111
1112    m_launched = true;
1113
1114    if (m_children.size () > 0)
1115      {
1116        for (unsigned int i = 0; i < m_children.size (); i++)
1117          {
1118            A* child = m_children[i];
1119           
1120            child->launch ();
1121          }
1122      }
1123    else
1124      {
1125        queue ();
1126      }
1127    */
1128  }
1129
1130  /**
1131   *  The runner threaded activity.
1132   *
1133   *   1) execute the command (and accumulates its output in a text)
1134   *   2) display the output and statistics
1135   *   3) set the finished state
1136   *
1137   *   (the cleanup callback will manage the post-run activities)
1138   */
1139  void run ()
1140  {
1141    int status = 0;
1142    std::string output;
1143
1144    std::string cp = cmtpath ();
1145
1146    std::string where = location ();
1147
1148    if (cp == "")
1149      {
1150        output = "# Package ";
1151        output += name ();
1152        output += " not found";
1153      }
1154    else
1155      {
1156       
1157        std::string cmd = "(cd ";
1158        cmd += where;
1159        if (Command != "")
1160          {
1161            cmd += "; ";
1162            cmd += Command;
1163          }
1164        cmd += "; echo cmtbroadcaststatus=$?) 2>&1";
1165
1166        execute (cmd, output);
1167
1168        unsigned int status_pos = output.find ("cmtbroadcaststatus=");
1169        if (status_pos != std::string::npos)
1170          {
1171            std::string s = output.substr (status_pos);
1172            output.erase (status_pos);
1173            sscanf (s.c_str (), "cmtbroadcaststatus=%d", &status);
1174          }
1175      }
1176
1177    CountLock.lock ("run");
1178    ObjectCount--;
1179    int oc = ObjectCount;
1180    CountLock.unlock ();
1181
1182    TCmMutex::iolock ();
1183
1184    std::cout << "#--------------------------------------------------------------" << std::endl;
1185    std::cout << "# Now trying [" << Command << "] in " << where
1186              << " (" << PackageCount - oc << "/" << PackageCount << ")" 
1187              << std::endl;
1188    std::cout << "#--------------------------------------------------------------" << std::endl;
1189
1190    std::cout << output << std::endl;
1191
1192    TCmMutex::iounlock ();
1193
1194    m_mutex.lock ("run");
1195    m_finished = true;
1196    m_run_status = status;
1197    m_mutex.unlock ();
1198  }
1199
1200private:
1201  int m_id;
1202  std::vector <A*> m_children;
1203  std::vector <A*> m_parents;
1204  Package* m_package;
1205  bool m_started;
1206  bool m_finished;
1207  bool m_ready;
1208  bool m_queued;
1209  bool m_launched;
1210  TCmMutex m_mutex;
1211  int m_run_status;
1212};
1213
1214/**
1215 *  Analyze one line of the "cmt show uses" result
1216 */
1217void parse_line (const std::string& line, A& top)
1218{
1219  static std::vector <A*> Hierarchy;
1220
1221  std::istrstream s (line.c_str ());
1222
1223  if (line[0] == '#')
1224    {
1225      /**
1226       * First section : the hierarchy
1227       *  o create all Packages and Runners
1228       *  o construct the graph (avoiding cycles)
1229       */
1230
1231      std::string s1;
1232      std::string s2;
1233      s >> s1 >> s2;
1234
1235      if (s2 != "use") return;
1236
1237      unsigned int pos = 2;
1238
1239      pos = line.find ("use") - 2;
1240     
1241      unsigned int level = (pos / 2);
1242
1243      std::string n;
1244      std::string v;
1245      std::string o;
1246
1247      s >> n >> v >> o;
1248
1249      if (o[0] == '(')
1250        {
1251          o = "";
1252        }
1253
1254      std::map <std::string, Package*>::iterator p_it;
1255
1256      p_it = Packages.find (n);
1257
1258      Package* package;
1259
1260      if (p_it == Packages.end ())
1261        {
1262          package = new Package (n, v, o, "");
1263          Packages[n] = package;
1264        }
1265      else
1266        {
1267          package = (*p_it).second;
1268        }
1269
1270      //std::cout << "Level=" << level << " n=" << n << std::endl;
1271
1272      A* parent = 0;
1273
1274      if (level > 0)
1275        {
1276          parent = Hierarchy[level - 1];
1277        }
1278      else
1279        {
1280          parent = &top;
1281        }
1282
1283      std::map <std::string, A*>::iterator a_it;
1284
1285      a_it = As.find (n);
1286
1287      A* a;
1288
1289      if (a_it == As.end ())
1290        {
1291          a = new A (package);
1292          As[n] = a;
1293        }
1294      else
1295        {
1296          a = (*a_it).second;
1297        }
1298
1299      if (Hierarchy.size () <= level)
1300        {
1301          Hierarchy.push_back (a);
1302        }
1303      else
1304        {
1305          Hierarchy[level] = a;
1306        }
1307
1308      if (parent != 0)
1309        {
1310          if (parent->has_parent (a)) 
1311            {
1312              std::cout << "Cycle for package " << n << std::endl;
1313            }
1314          else
1315            {
1316              parent->add_child (a);
1317            }
1318        }
1319    }
1320  else
1321    {
1322      /**
1323       *  Second section : get physical package information
1324       */
1325
1326      std::string s1;
1327      std::string n;
1328      std::string v;
1329      std::string o;
1330      std::string p;
1331      s >> s1;
1332      if (s1 != "use") return;
1333      s >> n >> v >> o;
1334      if (o[0] == '(')
1335        {
1336          p = o.substr (1, o.length() - 2);
1337          o = "";
1338        }
1339      else
1340        {
1341          s >> p;
1342          p = p.substr (1, p.length() - 2);
1343        }
1344     
1345      //std::cout << "n=" << n << " v=" << v << " o=" << o << " p=" << p << std::endl;
1346
1347      std::map <std::string, Package*>::iterator it;
1348
1349      it = Packages.find (n);
1350
1351      if (it != Packages.end ())
1352        {
1353          Package* pa = (*it).second;
1354          pa->set (n, v, o, p);
1355        }
1356      else
1357        {
1358          Packages[n] = new Package (n, v, o, p);
1359        }
1360    }
1361}
1362
1363class Scheduler
1364{
1365public:
1366  Scheduler ()
1367  {
1368  }
1369
1370  /**
1371   *  Obtain all packages from a cmt show uses command
1372   */
1373  void create_packages_from_command (const std::string& command)
1374  {
1375    std::string pwd = p_pwd ();
1376
1377    std::string v = pwd;
1378    p_dirname (pwd, v);
1379    std::string p = v;
1380    p_basename (v, v);
1381    p_dirname (p, p);
1382    std::string c = p;
1383    p_basename (p, p);
1384
1385    p_dirname (c, c);
1386
1387    static Package current (p, v, "", c);
1388
1389    current.set (p, v, "", c);
1390
1391    static A top (&current);
1392
1393    FILE* f = popen (command.c_str (), "r"); 
1394 
1395    if (f != 0) 
1396      { 
1397        char line[256]; 
1398        char* ptr;
1399
1400        std::string s;
1401
1402        while ((ptr = fgets (line, sizeof (line), f)) != NULL) 
1403          {
1404            s = ptr;
1405            parse_line (s, top);
1406          } 
1407        pclose (f);
1408      }
1409
1410    PackageCount = Packages.size ();
1411    top.launch ();
1412  }
1413 
1414  A* get_next_pending_object ()
1415  {
1416    A* to_start = 0;
1417
1418    ThreadLock.lock ("get_next_pending_object");
1419    if (ThreadCount < MaxThreads)
1420      {
1421        if (ThreadQueue.size () > 0)
1422          {
1423            std::vector <A*>::iterator it;
1424            it = ThreadQueue.begin ();
1425            if (it != ThreadQueue.end ())
1426              {
1427                to_start = *it;
1428                ThreadQueue.erase (it);
1429                to_start->dequeue ();
1430              }
1431          }
1432      }
1433    ThreadLock.unlock ();
1434
1435    return (to_start);
1436  }
1437
1438  bool should_wait ()
1439  {
1440    bool result = true;
1441
1442    ThreadCountLock.lock ("main-1");
1443    int tc = ThreadCount;
1444    ThreadCountLock.unlock ();
1445
1446    ThreadLock.lock ("should_wait");
1447    int ts = ThreadQueue.size ();
1448    ThreadLock.unlock ();
1449
1450    if ((tc < MaxThreads) && (ts > 0)) 
1451      {
1452        result = false;
1453      }
1454
1455    return (result);
1456  }
1457
1458  void run ()
1459  {
1460    QueueTr.do_signal ();
1461
1462    for (;;)
1463      {
1464        int oc = 0;
1465       
1466        ThreadLock.lock ("run");
1467        oc = ObjectCount;
1468        ThreadLock.unlock ();
1469
1470        /// the scheduler stops when all objects have been activated
1471        if (oc == 0) break;
1472
1473        A* to_start = get_next_pending_object ();
1474
1475        if (to_start != 0)
1476          {
1477            if (to_start->should_start ())
1478              {
1479                int status = to_start->get_run_status ();
1480
1481                if (status == 0)
1482                  {
1483                    // start it now
1484
1485                    if (!to_start->trigger ())
1486                      {
1487                        to_start->queue ();
1488                      }
1489                  }
1490                else
1491                  {
1492                    to_start->abort ();
1493                  }
1494              }
1495            else
1496              {
1497                // Now that the queue only contains ready_to_start runners, this
1498                // case should never happen !!
1499                // however we re-queue it
1500               
1501                to_start->queue ();
1502              }
1503          }
1504
1505        if (should_wait ())
1506          {
1507            QueueTr.do_wait_signal (10000);
1508          }
1509      }
1510  }
1511};
1512
1513void expand_cmtpath ()
1514{
1515  const char* cmtpath_str = ::getenv ("CMTPATH");
1516
1517  if (cmtpath_str != 0)
1518    {
1519      static const char path_separator = ':';
1520
1521      std::string cmtpath_list (cmtpath_str);
1522      std::string::size_type pos = 0;
1523
1524      for (;;)
1525        {
1526          bool ending = false;
1527
1528          std::string::size_type next = cmtpath_list.find (path_separator, pos);
1529
1530          std::string path;
1531
1532          if (next == std::string::npos)
1533            {
1534              path = cmtpath_list.substr (pos);
1535              ending = true;
1536            }
1537          else
1538            {
1539              path = cmtpath_list.substr (pos, next - pos);
1540              pos = next + 1;
1541            }
1542
1543          Cmtpath_items.push_back (path);
1544          if (ending) break;
1545        }
1546    }
1547}
1548
1549/**
1550 *  Entry point for the application
1551 */
1552int main (int argc, char* argv[])
1553{
1554  argc--;
1555  argv++;
1556
1557  bool in_options = true;
1558  bool is_global = false;
1559
1560  expand_cmtpath ();
1561
1562  while (argc > 0)
1563    {
1564      if (in_options)
1565        {
1566          std::string opt = argv[0];
1567
1568          if (opt[0] == '-')
1569            {
1570              if (opt.substr (0, 9) == "-exclude=")
1571                {
1572                  std::string exclusion;
1573
1574                  exclusion = opt.substr (9);
1575
1576                  unsigned int size = exclusion.size ();
1577             
1578                  if (size >= 2)
1579                    {
1580                      if (((exclusion[0] == '"') && (exclusion[size - 1] == '"')) ||
1581                          ((exclusion[0] == '\'') && (exclusion[size - 1] == '\'')))
1582                        {
1583                          exclusion.erase (size - 1);
1584                          exclusion.erase (0, 1);
1585                        }
1586                    }
1587
1588                  std::istrstream buff (exclusion.c_str ());
1589                  while (!buff.eof ())
1590                    {
1591                      std::string s;
1592                      buff >> s;
1593                      Exclusions.push_back (s);
1594                    }
1595                }
1596              else if (opt.substr (0, 7) == "-global")
1597                {
1598                  is_global = true;
1599                }
1600              else if (opt.substr (0, 8) == "-select=")
1601                {
1602                  std::string selection;
1603
1604                  selection = opt.substr (8);
1605
1606                  unsigned int size = selection.size ();
1607             
1608                  if (size >= 2)
1609                    {
1610                      if (((selection[0] == '"') && (selection[size - 1] == '"')) ||
1611                          ((selection[0] == '\'') && (selection[size - 1] == '\'')))
1612                        {
1613                          selection.erase (size - 1);
1614                          selection.erase (0, 1);
1615                        }
1616                    }
1617
1618                  std::istrstream buff (selection.c_str ());
1619                  while (!buff.eof ())
1620                    {
1621                      std::string s;
1622                      buff >> s;
1623                      Selections.push_back (s);
1624                    }
1625                }
1626              else if (opt.substr (0, 7) == "-begin=")
1627                {
1628                  Begin = opt.substr (7);
1629                }
1630              else if (opt.substr (0, 9) == "-threads=")
1631                {
1632                  std::string o = opt.substr (7);
1633                  int value = -1;
1634                  sscanf (o.c_str (), "%d", &value);
1635
1636                  if (value > 0)
1637                    {
1638                      MaxThreads = value;
1639                    }
1640                }
1641            }
1642          else
1643            {
1644              in_options = false;
1645            }
1646        }
1647
1648      if (!in_options)
1649        {
1650          Command += argv[0];
1651          Command += " ";
1652        }
1653
1654      argc--;
1655      argv++;
1656    }
1657
1658  if (is_global)
1659    {
1660      Cmtpath_selections = Cmtpath_items;
1661    }
1662  else
1663    {
1664      if (Cmtpath_items.size () != 0)
1665        {
1666          std::string s = Cmtpath_items[0];
1667          Cmtpath_selections.push_back (s);
1668        }
1669    }
1670
1671  //TCmThread::set_debug_level (1);
1672
1673  Scheduler s;
1674
1675  s.create_packages_from_command ("cmt show uses 2>&1 ");
1676
1677  s.run ();
1678
1679  return (0);
1680}
Note: See TracBrowser for help on using the repository browser.