source: tbroadcast/v1/src/tbroadcast.cxx@ 232

Last change on this file since 232 was 231, checked in by garonne, 19 years ago

import tbroadcast

File size: 29.1 KB
Line 
1
2#include <TCmThread.hxx>
3
4#include <iostream>
5#include <fstream>
6#include <strstream>
7#include <vector>
8#include <map>
9#include <cstdlib>
10#include <cstdio>
11#include <sys/time.h>
12
13
/**
 *
 * Transactions
 *
 * A Transaction bundles a pthread condition variable with its mutex,
 * a free-text label and a counter of the signals sent so far.
 *
 * The object owns its pthread primitives (they are initialised in the
 * constructors and destroyed in the destructor), so it must never be
 * copied: the copy operations are declared private and left unimplemented.
 *
 */

class Transaction
{
public:
  /// Build an unnamed transaction
  Transaction ();

  /// Build a transaction labelled with @p info (label used in debug traces)
  Transaction (const std::string& info);

  ~Transaction ();

  /// Acquire the internal mutex
  void lock ();

  /// Wake up all waiters and release the internal mutex
  void unlock ();

  /// The label given at construction time
  const std::string& get_info () const;

  /// Increment the signal counter and wake up all waiters
  void do_signal ();

  /// Wait at most @p milliseconds for a wake-up; true if woken before the timeout
  bool do_wait_signal (int milliseconds);

  /// Wait (without timeout) for a wake-up
  void do_wait ();

  /// Wait at most @p milliseconds for a wake-up; true if woken before the timeout
  bool do_wait (int milliseconds);

  std::string _info;        ///< label of the transaction
  pthread_cond_t _var;      ///< condition variable the waiters block on
  pthread_mutex_t _mutex;   ///< mutex protecting _var and _count
  int _count;               ///< number of do_signal () calls so far

private:
  // Copying would duplicate the raw pthread objects: forbidden.
  Transaction (const Transaction&);
  Transaction& operator = (const Transaction&);
};
40
41#include <time.h>
42#include <unistd.h>
43
/**
 * Local cleanup function
 *
 * Cleanup handler registered with pthread_cleanup_push by the Transaction
 * wait functions: it releases the mutex whose address is received.
 *
 * @param arg  address of the pthread_mutex_t to unlock
 */
static void UnlockMutex (void* arg)
{
  pthread_mutex_unlock (static_cast <pthread_mutex_t*> (arg));
}
53
54Transaction::Transaction ()
55{
56 pthread_mutex_init (&_mutex, 0);
57 pthread_cond_init (&_var, 0);
58 _count = 0;
59}
60
61Transaction::Transaction (const std::string& info)
62{
63 _info = info;
64 pthread_mutex_init (&_mutex, 0);
65 pthread_cond_init (&_var, 0);
66 _count = 0;
67}
68
69Transaction::~Transaction ()
70{
71 pthread_mutex_destroy (&_mutex);
72 pthread_cond_destroy (&_var);
73}
74
75void Transaction::lock ()
76{
77 pthread_mutex_lock (&_mutex);
78}
79
80void Transaction::unlock ()
81{
82 pthread_cond_broadcast (&_var);
83 pthread_mutex_unlock (&_mutex);
84}
85
86void Transaction::do_signal ()
87{
88 pthread_mutex_lock (&_mutex);
89 if (TCmThread::debug_level () > 0)
90 {
91 std::cout << "Transaction::do_signal>" << _info << " " << _count << std::endl;
92 }
93 _count++;
94 pthread_cond_broadcast (&_var);
95 pthread_mutex_unlock (&_mutex);
96}
97
98const std::string& Transaction::get_info () const
99{
100 return (_info);
101}
102
103bool Transaction::do_wait_signal (int milliseconds)
104{
105 int status = 0;
106
107 pthread_cleanup_push (UnlockMutex, &_mutex);
108 pthread_mutex_lock (&_mutex);
109
110 struct timeval now;
111
112 TCmMutex::iolock ();
113 gettimeofday (&now, 0);
114 TCmMutex::iounlock ();
115
116 struct timespec delta;
117 struct timespec* timeout;
118
119 delta.tv_sec = now.tv_sec + milliseconds / 1000;
120 delta.tv_nsec = now.tv_usec + milliseconds * 1000;
121
122 timeout = &delta;
123
124 if (TCmThread::debug_level () > 0)
125 {
126 std::cout << "Transaction::do_wait_signal(t)> " << _info << " " << _count << std::endl;
127 }
128
129 status = pthread_cond_timedwait (&_var, &_mutex, timeout);
130
131 //std::cout << "Transaction::do_wait(t)> " << _info << " status=" << status << std::endl;
132
133 pthread_cleanup_pop (1);
134
135 return (status == 0);
136}
137
138void Transaction::do_wait ()
139{
140 pthread_cleanup_push (UnlockMutex, &_mutex);
141 pthread_mutex_lock (&_mutex);
142
143 if (TCmThread::debug_level () > 0)
144 {
145 std::cout << "Transaction::do_wait> " << _info << std::endl;
146 }
147
148 pthread_cond_wait (&_var, &_mutex);
149 pthread_cleanup_pop (1);
150}
151
152bool Transaction::do_wait (int milliseconds)
153{
154 int status = 0;
155
156 pthread_cleanup_push (UnlockMutex, &_mutex);
157 pthread_mutex_lock (&_mutex);
158
159 struct timeval now;
160
161 TCmMutex::iolock ();
162 gettimeofday (&now, 0);
163 TCmMutex::iounlock ();
164
165 struct timespec delta;
166 struct timespec* timeout;
167
168 delta.tv_sec = now.tv_sec + milliseconds / 1000;
169 delta.tv_nsec = now.tv_usec + milliseconds * 1000;
170
171 timeout = &delta;
172
173 if (TCmThread::debug_level () > 0)
174 {
175 std::cout << "Transaction::do_wait(t)> " << _info << std::endl;
176 }
177
178 status = pthread_cond_timedwait (&_var, &_mutex, timeout);
179
180 //std::cout << "Transaction::do_wait(t)> " << _info << " status=" << status << std::endl;
181
182 pthread_cleanup_pop (1);
183
184 return (status == 0);
185}
186
187/**
188 * The configuration parameters for the program
189 */
190int MaxThreads = 20;
191
192/**
193 * Global variables
194 */
195int LastId = 0;
196
197/// A transaction to wake up the scheduler
198Transaction QueueTr ("QueueTr");
199
200/// Count the number of objects still pending
201int ObjectCount = 0;
202
203/// Accumulating the run time
204double TotalTime = 0;
205
206/// A mutex for ObjectCount and TotalTime
207TCmMutex CountLock ("CountLock");
208
209class A;
210
211/// All pending objects (and associated mutex)
212std::vector <A*> ThreadQueue;
213TCmMutex ThreadLock ("ThreadLock");
214
215/// Counting the running threads (and associated mutex)
216int ThreadCount = 0;
217TCmMutex ThreadCountLock ("ThreadCountLock");
218
219class Package;
220
221/**
222 * Dictionaries for packages and for runner objects
223 */
224std::map <std::string, Package*> Packages;
225std::map <std::string, Project*> Projects;
226std::map <std::string, A*> As;
227int PackageCount = 0;
228
229/**
230 * The shell command to run in every runner
231 */
232std::string Command = "";
233
234/**
235 * The expanded CMTPATH
236 */
237std::vector<std::string> Cmtpath_items;
238
239/**
240 * Global parameters for selection or exclusion
241 */
242std::vector <std::string> Selections;
243std::vector <std::string> Exclusions;
244std::string Begin;
245TCmMutex BeginLock ("BeginLock");
246std::vector <std::string> Cmtpath_selections;
247
/**
 * Helper to get a flat random integer value
 *
 * Scales rand () into the half-open interval [0, range): dividing by
 * RAND_MAX + 1 (instead of RAND_MAX) guarantees that the upper bound
 * itself is never returned.
 *
 * @param range  exclusive upper bound of the result
 * @return a value in [0, range), or 0 when range is 0
 */
unsigned int get_random (unsigned int range)
{
  const double fraction =
    static_cast <double> (rand ()) / (static_cast <double> (RAND_MAX) + 1.0);

  return (static_cast <unsigned int> (static_cast <double> (range) * fraction));
}
257
/**
 * Get the current working directory.
 *
 * getcwd fails when the buffer is too small for the path, so the call is
 * retried with growing buffers (256 bytes up to 64 KiB) instead of
 * relying on a single fixed-size one.
 *
 * @return the absolute path of the current directory, or an empty string
 *         when it cannot be obtained
 */
std::string p_pwd ()
{
  const std::vector <char>::size_type largest = 65536;

  for (std::vector <char>::size_type size = 256; size <= largest; size *= 4)
    {
      std::vector <char> buffer (size, '\0');

      if (getcwd (&buffer[0], buffer.size ()) != 0)
        {
          return (std::string (&buffer[0]));
        }
    }

  return (std::string ());
}
267
/**
 * Extract the last component of a path.
 *
 * The separator is the last '/' of the name or, when the name holds no
 * '/', its last '\\'. A name without separator is returned unchanged.
 *
 * The position is kept in std::string::size_type: stored in an
 * unsigned int it can never compare equal to std::string::npos on
 * platforms where size_type is wider, which disabled the '\\' fallback.
 *
 * @param file_name  the path to analyse
 * @param result     receives what follows the separator (may be the same
 *                   object as file_name)
 */
void p_basename (const std::string& file_name, std::string& result)
{
  std::string::size_type pos = file_name.find_last_of ('/');

  if (pos == std::string::npos)
    {
      pos = file_name.find_last_of ('\\');
    }

  if (pos == std::string::npos)
    {
      result = file_name;
      return;
    }

  // substr builds a temporary first: safe even when result aliases file_name
  result = file_name.substr (pos + 1);
}
285
286//--------------------------------------------------
/**
 * Extract the directory part of a path.
 *
 * The separator is the last '/' of the name or, when the name holds no
 * '/', its last '\\'. A name without separator has an empty directory.
 *
 * The position is kept in std::string::size_type: stored in an
 * unsigned int it can never compare equal to std::string::npos on
 * platforms where size_type is wider, and the subsequent erase () then
 * throws std::out_of_range for names without separator.
 *
 * @param file_name  the path to analyse
 * @param result     receives what precedes the separator (may be the same
 *                   object as file_name)
 */
void p_dirname (const std::string& file_name, std::string& result)
{
  std::string::size_type pos = file_name.find_last_of ('/');

  if (pos == std::string::npos)
    {
      pos = file_name.find_last_of ('\\');
    }

  if (pos == std::string::npos)
    {
      result = "";
      return;
    }

  // substr builds a temporary first: safe even when result aliases file_name
  result = file_name.substr (0, pos);
}
305
306
/**
 * Run a shell command through popen and collect everything it writes
 * on its standard output.
 *
 * @param command  the shell command line
 * @param output   cleared first, then filled with the command output
 * @return 1 when the command could be launched, 0 when popen failed
 */
static int execute (const std::string& command, std::string& output)
{
  output.erase ();

  FILE* pipe = popen (command.c_str (), "r");

  if (pipe == 0)
    {
      return (0);
    }

  char chunk[256];

  // fgets returns at most one line (or one buffer-full) per call
  while (fgets (chunk, sizeof (chunk), pipe) != NULL)
    {
      output.append (chunk);
    }

  pclose (pipe);

  return (1);
}
332
/**
 * The Package class stores information about a package
 *
 * Besides the four descriptive fields it maintains the location of the
 * package's cmt directory, computed as
 *
 *   <cmtpath>/[<offset>/]<name>/<version>/cmt
 *
 * (the leading part is omitted when cmtpath is empty).
 */
class Package
{
public:
  /**
   * Build a fully initialised package. The location is computed here as
   * well, so that location () is valid without a later call to set ().
   */
  Package (const std::string& name,
           const std::string& version,
           const std::string& offset,
           const std::string& cmtpath)
  {
    set (name, version, offset, cmtpath);
  }

  /**
   * (Re)define all the fields and recompute the location.
   *
   * @param name     package name
   * @param version  package version
   * @param offset   optional sub-directory between cmtpath and the package
   * @param cmtpath  root directory holding the package (may be empty)
   */
  void set (const std::string& name,
            const std::string& version,
            const std::string& offset,
            const std::string& cmtpath)
  {
    m_name = name;
    m_version = version;
    m_offset = offset;
    m_cmtpath = cmtpath;

    m_location = m_cmtpath;

    if (!m_cmtpath.empty ())
      {
        // exactly one separator after the cmtpath ...
        if (m_cmtpath[m_cmtpath.size () - 1] != '/')
          {
            m_location += "/";
          }

        // ... and one after the offset, which would otherwise be glued
        // to the last component of the cmtpath
        if (!m_offset.empty ())
          {
            m_location += m_offset;
            m_location += "/";
          }
      }

    m_location += m_name;
    m_location += "/";
    m_location += m_version;
    m_location += "/cmt";
  }

  /// Package name
  const std::string& name () const
  {
    return (m_name);
  }

  /// Package version
  const std::string& version () const
  {
    return (m_version);
  }

  /// Offset (sub-directory) of the package inside its cmtpath
  const std::string& offset () const
  {
    return (m_offset);
  }

  /// Root directory holding the package, empty when unknown
  const std::string& cmtpath () const
  {
    return (m_cmtpath);
  }

  /// Path of the cmt directory of the package
  const std::string& location () const
  {
    return (m_location);
  }

private:
  std::string m_name;
  std::string m_version;
  std::string m_offset;
  std::string m_cmtpath;
  std::string m_location;
};
409
410/**
411 * A runner object. It is associated with a package.
412 */
413class A : public TCmThread
414{
415public:
416 A () : m_package (0),
417 m_started (false),
418 m_finished (false),
419 m_ready (false),
420 m_queued (false),
421 m_launched (false),
422 m_run_status (0)
423 {
424 ObjectCount++;
425 LastId++;
426 m_id = LastId;
427 }
428
429 A (Package* package) : m_package (package),
430 m_started (false),
431 m_finished (false),
432 m_ready (false),
433 m_queued (false),
434 m_launched (false),
435 m_run_status (0)
436 {
437 ObjectCount++;
438 LastId++;
439 m_id = LastId;
440 m_mutex.set_info (name ());
441 }
442
443 ~A ()
444 {
445 }
446
447 /**
448 * Called when a runner terminates.
449 *
450 * 1) Push back the parent to the activation queue when it's ready to start
451 * 2) wake up the scheduler
452 */
453 void terminate ()
454 {
455 if (m_parents.size () > 0)
456 {
457 std::vector <A*>::iterator it;
458
459 for (it = m_parents.begin (); it != m_parents.end (); ++it)
460 {
461 A* a = *it;
462
463 a->terminate_child (this);
464
465 if (a->should_start ())
466 {
467 a->queue ();
468 }
469 }
470 }
471
472 QueueTr.do_signal ();
473 }
474
475 /**
476 * Callback called after a thread terminates.
477 *
478 * 1) count down the number of active threads
479 * 2) Push back the parent to the activation queue when it's ready to start
480 * 3) wake up the scheduler
481 */
482 void cleanup ()
483 {
484 /*
485 TCmMutex::iolock ();
486 std::cout << "cleanup " << name () << std::endl;
487 TCmMutex::iounlock ();
488 */
489
490 ThreadCountLock.lock ("cleanup");
491 ThreadCount--;
492 ThreadCountLock.unlock ();
493
494 terminate ();
495 }
496
497 /**
498 * Construct the name of this runner from the package name
499 */
500 std::string name () const
501 {
502 if (m_package != 0)
503 {
504 return (m_package->name ());
505 }
506 else
507 {
508 char buff [80];
509 sprintf (buff, "A%d", m_id);
510
511 return (buff);
512 }
513 }
514
515 const std::string& version () const
516 {
517 if (m_package != 0)
518 {
519 return (m_package->version ());
520 }
521 else
522 {
523 static std::string empty;
524 return (empty);
525 }
526 }
527
528 const std::string& offset () const
529 {
530 if (m_package != 0)
531 {
532 return (m_package->offset ());
533 }
534 else
535 {
536 static std::string empty;
537 return (empty);
538 }
539 }
540
541 const std::string& cmtpath () const
542 {
543 if (m_package != 0)
544 {
545 return (m_package->cmtpath ());
546 }
547 else
548 {
549 static std::string empty;
550 return (empty);
551 }
552 }
553
554 const std::string& location () const
555 {
556 if (m_package != 0)
557 {
558 return (m_package->location ());
559 }
560 else
561 {
562 static std::string empty;
563 return (empty);
564 }
565 }
566
567 int id () const
568 {
569 return (m_id);
570 }
571
572 /**
573 * Recursively find if an object is already in the parent list
574 * (check for cycles)
575 */
576 bool has_parent (A* a)
577 {
578 if (a == this) return (true);
579
580 std::vector <A*>::iterator it;
581
582 for (it = m_parents.begin (); it != m_parents.end (); ++it)
583 {
584 A* p = *it;
585
586 if (p == a) return (true);
587
588 if (p->has_parent (a)) return (true);
589 }
590
591 return (false);
592 }
593
594 /**
595 * Adds a unique child (should first check for cycles)
596 * Avoid duplications.
597 */
598 void add_child (A* a)
599 {
600 if (a != 0)
601 {
602 std::vector <A*>::iterator it;
603
604 for (it = m_children.begin (); it != m_children.end (); ++it)
605 {
606 A* c = *it;
607 if (c == a) return;
608 }
609
610 //std::cout << name () << " add child " << a->name () << std::endl;
611 m_children.push_back (a);
612 a->add_parent (this);
613 }
614 }
615
616 /**
617 * Adds a parent without duplication
618 */
619 void add_parent (A* a)
620 {
621 std::vector <A*>::iterator it;
622
623 for (it = m_parents.begin (); it != m_parents.end (); ++it)
624 {
625 A* p = *it;
626 if (p == a) return;
627 }
628
629 m_parents.push_back (a);
630 }
631
632 void dequeue ()
633 {
634 m_mutex.lock ("dequeue");
635 m_queued = false;
636 m_mutex.unlock ();
637 }
638
639 bool is_finished ()
640 {
641 bool result;
642
643 m_mutex.lock ("is_finished");
644 result = m_finished;
645 m_mutex.unlock ();
646
647 return (result);
648 }
649
650 /**
651 * A runner can be queued if it was never started yet and
652 * it has no more children
653 */
654 bool can_be_queued ()
655 {
656 bool result;
657
658 m_mutex.lock ("can_be_queued");
659 result = !m_finished && !m_started && (m_children.size () == 0);
660 m_mutex.unlock ();
661
662 return (result);
663 }
664
665 /**
666 * A runner should start when all children are finished
667 */
668 bool should_start ()
669 {
670 bool result = (check_state () == ready_to_start);
671
672 return (result);
673 }
674
675 int get_run_status ()
676 {
677 int result;
678
679 m_mutex.lock ("get_run_status");
680 result = m_run_status;
681 m_mutex.unlock ();
682
683 return (result);
684 }
685
686 typedef enum
687 {
688 already_started,
689 already_finished,
690 not_yet_completed,
691 ready_to_start,
692 really_started,
693 queued
694 } lock_status;
695
696 /**
697 * Analyze the state of a runner
698 */
699 lock_status check_state ()
700 {
701 bool done = false;
702
703 m_mutex.lock ("check_state-1");
704 done = m_finished;
705 m_mutex.unlock ();
706
707 if (done)
708 {
709 return (already_finished);
710 }
711
712 m_mutex.lock ("check_state-2");
713 done = m_started;
714 m_mutex.unlock ();
715
716 if (done)
717 {
718 return (already_started);
719 }
720
721 // not yet started
722
723 /*
724 TCmMutex::iolock ();
725 std::cout << "check_state-1> " << name () << std::endl;
726 TCmMutex::iounlock ();
727 */
728
729 bool should_check_children = false;
730
731 m_mutex.lock ("check_state-3");
732 should_check_children = !m_ready;
733 m_mutex.unlock ();
734
735 if (should_check_children)
736 {
737 // check for unfinished children
738
739 std::vector <A*>::iterator it;
740
741 bool is_ready = true;
742
743 m_mutex.lock ("check_state-4");
744 for (it = m_children.begin (); it != m_children.end (); ++it)
745 {
746 A* c = *it;
747
748 if (!c->is_finished ())
749 {
750 is_ready = false;
751 break;
752 }
753 }
754 m_mutex.unlock ();
755
756 if (!is_ready)
757 {
758 return (not_yet_completed);
759 }
760
761 // all children are finished
762
763 m_mutex.lock ("check_state-4");
764 m_ready = true;
765 m_mutex.unlock ();
766 }
767
768 return (ready_to_start);
769 }
770
771 /**
772 * Actually ask the runner to run its thread
773 */
774 bool trigger ()
775 {
776 /*
777 TCmMutex::iolock ();
778 std::cout << "trigger> " << name ();
779 std::cout << std::endl;
780 TCmMutex::iounlock ();
781 */
782
783 ThreadCountLock.lock ("trigger");
784 ThreadCount++;
785 ThreadCountLock.unlock ();
786
787 m_mutex.lock ("trigger");
788 m_started = true;
789 m_mutex.unlock ();
790
791 bool result = start ();
792
793 return (result);
794 }
795
796 /**
797 * Abort the planned activity. Must emulate all
798 * mechanisms of trigger-start-cleanup-wakeup
799 */
800 void abort ()
801 {
802 CountLock.lock ("abort");
803 ObjectCount--;
804 int oc = ObjectCount;
805 CountLock.unlock ();
806
807 TCmMutex::iolock ();
808
809 std::cout << "# Aborting " << name ()
810 << " (" << PackageCount - oc << "/" << PackageCount << ")"
811 << std::endl;
812
813 TCmMutex::iounlock ();
814
815 m_mutex.lock ("abort");
816 m_finished = true;
817 m_run_status = 1;
818 m_mutex.unlock ();
819
820 terminate ();
821 }
822
823 /**
824 * Ignore the planned activity. Must emulate all
825 * mechanisms of trigger-start-cleanup-wakeup
826 */
827 void ignore ()
828 {
829 CountLock.lock ("ignore");
830 ObjectCount--;
831 //int oc = ObjectCount;
832 CountLock.unlock ();
833
834 /*
835 TCmMutex::iolock ();
836
837 std::cout << "# Ignoring " << name ()
838 << " (" << PackageCount - oc << "/" << PackageCount << ")"
839 << std::endl;
840
841 TCmMutex::iounlock ();
842 */
843
844 m_mutex.lock ("ignore");
845 m_finished = true;
846 m_run_status = 0;
847 m_mutex.unlock ();
848
849 terminate ();
850 }
851
852 /**
853 * Construct a text representation of the runner state
854 */
855 std::string get_state ()
856 {
857 std::string s = "(";
858
859 m_mutex.lock ("get_state");
860 if (m_started) s += "s";
861 if (m_finished) s += "f";
862 if (m_ready) s += "r";
863 if (m_queued) s += "q";
864 if (m_launched) s += "l";
865 m_mutex.unlock ();
866
867 s += ")";
868
869 return (s);
870 }
871
872 /**
873 * Debugging function
874 */
875 void special_show ()
876 {
877 std::string s = "";
878
879 s = get_state ();
880
881 s += " p[";
882 std::vector<A*>::iterator it;
883
884 for (it = m_parents.begin (); it != m_parents.end (); ++it)
885 {
886 A* a = *it;
887 s += a->name ();
888 s += a->get_state ();
889 s += " ";
890 }
891
892 s += "]";
893 s += " c[";
894
895 for (it = m_children.begin (); it != m_children.end (); ++it)
896 {
897 A* a = *it;
898 s += a->name ();
899 s += a->get_state ();
900 s += " ";
901 }
902
903 s += "]";
904
905 TCmMutex::iolock ();
906 std::cout << "### " << name () << " " << s << std::endl;
907 TCmMutex::iounlock ();
908 }
909
910 /**
911 * A child has just terminated (possibly with error)
912 * The parent removes it from its children and inherit error.
913 */
914 void terminate_child (A* child)
915 {
916 if (child == 0) return;
917
918 std::vector <A*>::iterator cit;
919
920 m_mutex.lock ("terminate_child");
921
922 for (cit = m_children.begin (); cit != m_children.end (); ++cit)
923 {
924 A* c = *cit;
925 if (c == child)
926 {
927 int status = c->get_run_status ();
928 if (status != 0)
929 {
930 m_run_status = status;
931 }
932 m_children.erase (cit);
933 break;
934 }
935 }
936
937 m_mutex.unlock ();
938
939 }
940
941 bool selected ()
942 {
943 if ((Begin == "") &&
944 (Cmtpath_selections.size () == 0) &&
945 (Selections.size () == 0) &&
946 (Exclusions.size () == 0)) return (true);
947
948 const std::string& loc = location ();
949
950 bool result = true;
951
952 BeginLock.lock ("selected");
953 if (Begin != "")
954 {
955 if (loc.find (Begin) == std::string::npos)
956 {
957 result = false;
958 }
959 else
960 {
961 Begin = "";
962 }
963 }
964 BeginLock.unlock ();
965
966 if (!result)
967 {
968 return (result);
969 }
970
971 if (Cmtpath_selections.size () != 0)
972 {
973 result = false;
974
975 std::vector<std::string>::iterator sit;
976
977 for (sit = Cmtpath_selections.begin (); sit != Cmtpath_selections.end (); ++sit)
978 {
979 const std::string& s = *sit;
980
981 if (loc.find (s) == 0)
982 {
983 result = true;
984 break;
985 }
986 }
987 }
988
989 if (Selections.size () != 0)
990 {
991 result = false;
992
993 std::vector<std::string>::iterator sit;
994
995 for (sit = Selections.begin (); sit != Selections.end (); ++sit)
996 {
997 const std::string& s = *sit;
998
999 if (loc.find (s) != std::string::npos)
1000 {
1001 result = true;
1002 break;
1003 }
1004 }
1005 }
1006
1007 if (result)
1008 {
1009 std::vector<std::string>::iterator eit;
1010
1011 for (eit = Exclusions.begin (); eit != Exclusions.end (); ++eit)
1012 {
1013 const std::string& e = *eit;
1014
1015 if (loc.find (e) != std::string::npos)
1016 {
1017 result = false;
1018 break;
1019 }
1020 }
1021 }
1022
1023 return (result);
1024 }
1025
1026 /**
1027 * Send a runner to the activation queue.
1028 * to optimize the next checks
1029 */
1030 void queue ()
1031 {
1032 /*
1033 TCmMutex::iolock ();
1034 std::cout << "Queue-begin> " << name ();
1035 std::cout << std::endl;
1036 TCmMutex::iounlock ();
1037 */
1038
1039 /*
1040 ThreadLock.lock ("queue-1");
1041 std::vector <A*>::iterator it;
1042 TCmMutex::iolock ();
1043 std::cout << "Q[";
1044 for (it = ThreadQueue.begin (); it != ThreadQueue.end (); ++it)
1045 {
1046 A* a = *it;
1047 std::cout << a->name () << " ";
1048 }
1049 std::cout << "]" << std::endl;
1050 TCmMutex::iounlock ();
1051 ThreadLock.unlock ();
1052 */
1053
1054 if (!selected ())
1055 {
1056 ignore ();
1057 return;
1058 }
1059
1060 m_mutex.lock ("queue-1");
1061
1062 if (!m_queued)
1063 {
1064 /*
1065 std::string s = get_state ();
1066
1067 TCmMutex::iolock ();
1068 std::cout << "Sending " << name () << " to queue";
1069 std::cout << " " << s << std::endl;
1070 TCmMutex::iounlock ();
1071 */
1072
1073 ThreadLock.lock ("queue-2");
1074 m_queued = true;
1075 ThreadQueue.push_back (this);
1076 ThreadLock.unlock ();
1077 }
1078 else
1079 {
1080 /*
1081 std::string s = get_state ();
1082
1083 TCmMutex::iolock ();
1084 std::cout << "Keeping " << name () << " in queue";
1085 std::cout << " " << s << std::endl;
1086 TCmMutex::iounlock ();
1087 */
1088 }
1089
1090 m_mutex.unlock ();
1091 }
1092
1093 /**
1094 * Initialize the activation queue by selectively pushing leaves
1095 * of the graph
1096 */
1097 void launch ()
1098 {
1099 std::map<std::string, A*>::iterator ait;
1100 for (ait = As.begin (); ait != As.end (); ++ait)
1101 {
1102 A* a = (*ait).second;
1103 if (a->can_be_queued ())
1104 {
1105 a->queue ();
1106 }
1107 }
1108
1109 /*
1110 if (m_launched) return;
1111
1112 m_launched = true;
1113
1114 if (m_children.size () > 0)
1115 {
1116 for (unsigned int i = 0; i < m_children.size (); i++)
1117 {
1118 A* child = m_children[i];
1119
1120 child->launch ();
1121 }
1122 }
1123 else
1124 {
1125 queue ();
1126 }
1127 */
1128 }
1129
1130 /**
1131 * The runner threaded activity.
1132 *
1133 * 1) execute the command (and accumulates its output in a text)
1134 * 2) display the output and statistics
1135 * 3) set the finished state
1136 *
1137 * (the cleanup callback will manage the post-run activities)
1138 */
1139 void run ()
1140 {
1141 int status = 0;
1142 std::string output;
1143
1144 std::string cp = cmtpath ();
1145
1146 std::string where = location ();
1147
1148 if (cp == "")
1149 {
1150 output = "# Package ";
1151 output += name ();
1152 output += " not found";
1153 }
1154 else
1155 {
1156
1157 std::string cmd = "(cd ";
1158 cmd += where;
1159 if (Command != "")
1160 {
1161 cmd += "; ";
1162 cmd += Command;
1163 }
1164 cmd += "; echo cmtbroadcaststatus=$?) 2>&1";
1165
1166 execute (cmd, output);
1167
1168 unsigned int status_pos = output.find ("cmtbroadcaststatus=");
1169 if (status_pos != std::string::npos)
1170 {
1171 std::string s = output.substr (status_pos);
1172 output.erase (status_pos);
1173 sscanf (s.c_str (), "cmtbroadcaststatus=%d", &status);
1174 }
1175 }
1176
1177 CountLock.lock ("run");
1178 ObjectCount--;
1179 int oc = ObjectCount;
1180 CountLock.unlock ();
1181
1182 TCmMutex::iolock ();
1183
1184 std::cout << "#--------------------------------------------------------------" << std::endl;
1185 std::cout << "# Now trying [" << Command << "] in " << where
1186 << " (" << PackageCount - oc << "/" << PackageCount << ")"
1187 << std::endl;
1188 std::cout << "#--------------------------------------------------------------" << std::endl;
1189
1190 std::cout << output << std::endl;
1191
1192 TCmMutex::iounlock ();
1193
1194 m_mutex.lock ("run");
1195 m_finished = true;
1196 m_run_status = status;
1197 m_mutex.unlock ();
1198 }
1199
1200private:
1201 int m_id;
1202 std::vector <A*> m_children;
1203 std::vector <A*> m_parents;
1204 Package* m_package;
1205 bool m_started;
1206 bool m_finished;
1207 bool m_ready;
1208 bool m_queued;
1209 bool m_launched;
1210 TCmMutex m_mutex;
1211 int m_run_status;
1212};
1213
1214/**
1215 * Analyze one line of the "cmt show uses" result
1216 */
1217void parse_line (const std::string& line, A& top)
1218{
1219 static std::vector <A*> Hierarchy;
1220
1221 std::istrstream s (line.c_str ());
1222
1223 if (line[0] == '#')
1224 {
1225 /**
1226 * First section : the hierarchy
1227 * o create all Packages and Runners
1228 * o construct the graph (avoiding cycles)
1229 */
1230
1231 std::string s1;
1232 std::string s2;
1233 s >> s1 >> s2;
1234
1235 if (s2 != "use") return;
1236
1237 unsigned int pos = 2;
1238
1239 pos = line.find ("use") - 2;
1240
1241 unsigned int level = (pos / 2);
1242
1243 std::string n;
1244 std::string v;
1245 std::string o;
1246
1247 s >> n >> v >> o;
1248
1249 if (o[0] == '(')
1250 {
1251 o = "";
1252 }
1253
1254 std::map <std::string, Package*>::iterator p_it;
1255
1256 p_it = Packages.find (n);
1257
1258 Package* package;
1259
1260 if (p_it == Packages.end ())
1261 {
1262 package = new Package (n, v, o, "");
1263 Packages[n] = package;
1264 }
1265 else
1266 {
1267 package = (*p_it).second;
1268 }
1269
1270 //std::cout << "Level=" << level << " n=" << n << std::endl;
1271
1272 A* parent = 0;
1273
1274 if (level > 0)
1275 {
1276 parent = Hierarchy[level - 1];
1277 }
1278 else
1279 {
1280 parent = &top;
1281 }
1282
1283 std::map <std::string, A*>::iterator a_it;
1284
1285 a_it = As.find (n);
1286
1287 A* a;
1288
1289 if (a_it == As.end ())
1290 {
1291 a = new A (package);
1292 As[n] = a;
1293 }
1294 else
1295 {
1296 a = (*a_it).second;
1297 }
1298
1299 if (Hierarchy.size () <= level)
1300 {
1301 Hierarchy.push_back (a);
1302 }
1303 else
1304 {
1305 Hierarchy[level] = a;
1306 }
1307
1308 if (parent != 0)
1309 {
1310 if (parent->has_parent (a))
1311 {
1312 std::cout << "Cycle for package " << n << std::endl;
1313 }
1314 else
1315 {
1316 parent->add_child (a);
1317 }
1318 }
1319 }
1320 else
1321 {
1322 /**
1323 * Second section : get physical package information
1324 */
1325
1326 std::string s1;
1327 std::string n;
1328 std::string v;
1329 std::string o;
1330 std::string p;
1331 s >> s1;
1332 if (s1 != "use") return;
1333 s >> n >> v >> o;
1334 if (o[0] == '(')
1335 {
1336 p = o.substr (1, o.length() - 2);
1337 o = "";
1338 }
1339 else
1340 {
1341 s >> p;
1342 p = p.substr (1, p.length() - 2);
1343 }
1344
1345 //std::cout << "n=" << n << " v=" << v << " o=" << o << " p=" << p << std::endl;
1346
1347 std::map <std::string, Package*>::iterator it;
1348
1349 it = Packages.find (n);
1350
1351 if (it != Packages.end ())
1352 {
1353 Package* pa = (*it).second;
1354 pa->set (n, v, o, p);
1355 }
1356 else
1357 {
1358 Packages[n] = new Package (n, v, o, p);
1359 }
1360 }
1361}
1362
1363class Scheduler
1364{
1365public:
1366 Scheduler ()
1367 {
1368 }
1369
1370 /**
1371 * Obtain all packages from a cmt show uses command
1372 */
1373 void create_packages_from_command (const std::string& command)
1374 {
1375 std::string pwd = p_pwd ();
1376
1377 std::string v = pwd;
1378 p_dirname (pwd, v);
1379 std::string p = v;
1380 p_basename (v, v);
1381 p_dirname (p, p);
1382 std::string c = p;
1383 p_basename (p, p);
1384
1385 p_dirname (c, c);
1386
1387 static Package current (p, v, "", c);
1388
1389 current.set (p, v, "", c);
1390
1391 static A top (&current);
1392
1393 FILE* f = popen (command.c_str (), "r");
1394
1395 if (f != 0)
1396 {
1397 char line[256];
1398 char* ptr;
1399
1400 std::string s;
1401
1402 while ((ptr = fgets (line, sizeof (line), f)) != NULL)
1403 {
1404 s = ptr;
1405 parse_line (s, top);
1406 }
1407 pclose (f);
1408 }
1409
1410 PackageCount = Packages.size ();
1411 top.launch ();
1412 }
1413
1414 A* get_next_pending_object ()
1415 {
1416 A* to_start = 0;
1417
1418 ThreadLock.lock ("get_next_pending_object");
1419 if (ThreadCount < MaxThreads)
1420 {
1421 if (ThreadQueue.size () > 0)
1422 {
1423 std::vector <A*>::iterator it;
1424 it = ThreadQueue.begin ();
1425 if (it != ThreadQueue.end ())
1426 {
1427 to_start = *it;
1428 ThreadQueue.erase (it);
1429 to_start->dequeue ();
1430 }
1431 }
1432 }
1433 ThreadLock.unlock ();
1434
1435 return (to_start);
1436 }
1437
1438 bool should_wait ()
1439 {
1440 bool result = true;
1441
1442 ThreadCountLock.lock ("main-1");
1443 int tc = ThreadCount;
1444 ThreadCountLock.unlock ();
1445
1446 ThreadLock.lock ("should_wait");
1447 int ts = ThreadQueue.size ();
1448 ThreadLock.unlock ();
1449
1450 if ((tc < MaxThreads) && (ts > 0))
1451 {
1452 result = false;
1453 }
1454
1455 return (result);
1456 }
1457
1458 void run ()
1459 {
1460 QueueTr.do_signal ();
1461
1462 for (;;)
1463 {
1464 int oc = 0;
1465
1466 ThreadLock.lock ("run");
1467 oc = ObjectCount;
1468 ThreadLock.unlock ();
1469
1470 /// the scheduler stops when all objects have been activated
1471 if (oc == 0) break;
1472
1473 A* to_start = get_next_pending_object ();
1474
1475 if (to_start != 0)
1476 {
1477 if (to_start->should_start ())
1478 {
1479 int status = to_start->get_run_status ();
1480
1481 if (status == 0)
1482 {
1483 // start it now
1484
1485 if (!to_start->trigger ())
1486 {
1487 to_start->queue ();
1488 }
1489 }
1490 else
1491 {
1492 to_start->abort ();
1493 }
1494 }
1495 else
1496 {
1497 // Now that the queue only contains ready_to_start runners, this
1498 // case should never happen !!
1499 // however we re-queue it
1500
1501 to_start->queue ();
1502 }
1503 }
1504
1505 if (should_wait ())
1506 {
1507 QueueTr.do_wait_signal (10000);
1508 }
1509 }
1510 }
1511};
1512
1513void expand_cmtpath ()
1514{
1515 const char* cmtpath_str = ::getenv ("CMTPATH");
1516
1517 if (cmtpath_str != 0)
1518 {
1519 static const char path_separator = ':';
1520
1521 std::string cmtpath_list (cmtpath_str);
1522 std::string::size_type pos = 0;
1523
1524 for (;;)
1525 {
1526 bool ending = false;
1527
1528 std::string::size_type next = cmtpath_list.find (path_separator, pos);
1529
1530 std::string path;
1531
1532 if (next == std::string::npos)
1533 {
1534 path = cmtpath_list.substr (pos);
1535 ending = true;
1536 }
1537 else
1538 {
1539 path = cmtpath_list.substr (pos, next - pos);
1540 pos = next + 1;
1541 }
1542
1543 Cmtpath_items.push_back (path);
1544 if (ending) break;
1545 }
1546 }
1547}
1548
1549/**
1550 * Entry point for the application
1551 */
1552int main (int argc, char* argv[])
1553{
1554 argc--;
1555 argv++;
1556
1557 bool in_options = true;
1558 bool is_global = false;
1559
1560 expand_cmtpath ();
1561
1562 while (argc > 0)
1563 {
1564 if (in_options)
1565 {
1566 std::string opt = argv[0];
1567
1568 if (opt[0] == '-')
1569 {
1570 if (opt.substr (0, 9) == "-exclude=")
1571 {
1572 std::string exclusion;
1573
1574 exclusion = opt.substr (9);
1575
1576 unsigned int size = exclusion.size ();
1577
1578 if (size >= 2)
1579 {
1580 if (((exclusion[0] == '"') && (exclusion[size - 1] == '"')) ||
1581 ((exclusion[0] == '\'') && (exclusion[size - 1] == '\'')))
1582 {
1583 exclusion.erase (size - 1);
1584 exclusion.erase (0, 1);
1585 }
1586 }
1587
1588 std::istrstream buff (exclusion.c_str ());
1589 while (!buff.eof ())
1590 {
1591 std::string s;
1592 buff >> s;
1593 Exclusions.push_back (s);
1594 }
1595 }
1596 else if (opt.substr (0, 7) == "-global")
1597 {
1598 is_global = true;
1599 }
1600 else if (opt.substr (0, 8) == "-select=")
1601 {
1602 std::string selection;
1603
1604 selection = opt.substr (8);
1605
1606 unsigned int size = selection.size ();
1607
1608 if (size >= 2)
1609 {
1610 if (((selection[0] == '"') && (selection[size - 1] == '"')) ||
1611 ((selection[0] == '\'') && (selection[size - 1] == '\'')))
1612 {
1613 selection.erase (size - 1);
1614 selection.erase (0, 1);
1615 }
1616 }
1617
1618 std::istrstream buff (selection.c_str ());
1619 while (!buff.eof ())
1620 {
1621 std::string s;
1622 buff >> s;
1623 Selections.push_back (s);
1624 }
1625 }
1626 else if (opt.substr (0, 7) == "-begin=")
1627 {
1628 Begin = opt.substr (7);
1629 }
1630 else if (opt.substr (0, 9) == "-threads=")
1631 {
1632 std::string o = opt.substr (7);
1633 int value = -1;
1634 sscanf (o.c_str (), "%d", &value);
1635
1636 if (value > 0)
1637 {
1638 MaxThreads = value;
1639 }
1640 }
1641 }
1642 else
1643 {
1644 in_options = false;
1645 }
1646 }
1647
1648 if (!in_options)
1649 {
1650 Command += argv[0];
1651 Command += " ";
1652 }
1653
1654 argc--;
1655 argv++;
1656 }
1657
1658 if (is_global)
1659 {
1660 Cmtpath_selections = Cmtpath_items;
1661 }
1662 else
1663 {
1664 if (Cmtpath_items.size () != 0)
1665 {
1666 std::string s = Cmtpath_items[0];
1667 Cmtpath_selections.push_back (s);
1668 }
1669 }
1670
1671 //TCmThread::set_debug_level (1);
1672
1673 Scheduler s;
1674
1675 s.create_packages_from_command ("cmt show uses 2>&1 ");
1676
1677 s.run ();
1678
1679 return (0);
1680}
Note: See TracBrowser for help on using the repository browser.