source: tbroadcast/v2/python/tbroadcast.py @ 240

Last change on this file since 240 was 240, checked in by garonne, 18 years ago

improve tbroadcast script

  • Property svn:executable set to *
File size: 11.2 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Date: 08/25/2006
4# -- Name: tbroadcast
5# -- Description: mainc class
6#----------------------------------#
7
8import os
9import sys
10import time
11import string
12import random
13import commands
14import traceback
15
16from threadpool import WorkRequest
17from threadpool import ThreadPool
18from threadpool import NoResultsPending
19from threadpool import NoWorkersAvailable
20from threadpool import  makeRequests
21from executer   import  exeCommand
22
23class Scheduler:
24
25    def __init__(self, num_workers=20):
26        self.pool     = ThreadPool(num_workers=num_workers)
27        self.packages = {} 
28        self.check_cycles()
29        self.instanciate_packages ()     
30
31
32    def check_cycles (self):
33        cmd = 'cmt show cycles'
34        cycle_found = False
35        status, output = commands.getstatusoutput (cmd)
36        if status != 0:
37            print output
38            sys.exit(-1)   
39        lines = string.split(output, '\n')
40        for line in lines:           
41            if line!='' and line [0] != '#':           
42                if not cycle_found:
43                    cycle_found = True
44                    print "# Error: cycles found, not possible to execute broadcast with threads. See the followings packages:"
45                print line
46        if cycle_found:
47            sys.exit(-1)
48   
49    def instanciate_packages(self):
50        # We create the schedule of the work units
51        print '# First, we initialize the DAG by parsing cmt show uses'
52        cmd = 'cmt show uses'
53        status, output = commands.getstatusoutput (cmd)
54        if status != 0:
55            print output
56            sys.exit(-1)   
57        lines = string.split(output, '\n')
58        current_package = self.get_current_package()
59        self.packages [current_package] = {'version': '*', 'dependencies': list(), 'status': 'waiting', 'path':os.getcwd()}
60        indice  = 1
61        for line in lines:
62            if line [0] == '#' and line[:5] != "#CMT>" and line not in ['# Selection :','#']:
63                name    = string.split (line)[2]
64                version = string.split (line)[3]
65                if name not in self.packages[current_package]['dependencies']:
66                   self.packages[current_package]['dependencies'].append (name)               
67                   #print '\n#', indice,':: add package', name
68                   indice = indice + 1
69                if not self.packages.has_key (name):
70                   self.packages [name] = {'version': version, 'dependencies': list(), 'status': 'waiting', 'path': None}               
71                   found = False
72                   for ligne in lines:
73                    if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:               
74                       if found:
75                           if level < string.find(ligne, 'use'):
76                               if string.split (ligne)[2] not in self.packages[name]['dependencies']:
77                                   self.packages[name]['dependencies'].append (string.split (ligne)[2])
78                                   #print "# add dependency", string.split (ligne)[2], ' to package ',name
79                           else:
80                                found = False                                                                                                   
81                       if name == string.split (ligne)[2]:
82                           level = string.find(ligne, 'use')
83                           found = True   
84            if line[:4] == "use ":             
85                result  = string.split (line[4:len(line)], ' ')
86                if  self.packages.has_key(result[0]):
87                    if len(result)==4:
88                        name, version, offset, path = string.split (line[4:len(line)], " ")
89                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
90                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
91                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
92                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
93                        else:
94                            print '# error path not found for', name
95                            sys.exit(-1)   
96                    elif len(result)==5:
97                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
98                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
99                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
100                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
101                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
102                        else:
103                            print '# error path not found for', name
104                            sys.exit(-1)                                                                                                   
105                    elif len(result)==3:
106                        name, version, path = string.split (line[4:len(line)], " ")
107                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
108                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
109                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
110                            full_path = path[1:-1] + '/' +name + + '/cmt'
111                        else:
112                            print '# error path not found for', name
113                            sys.exit(-1)
114                    else:
115                        print "error:",line
116                        print str(result)
117                        sys.exit(-1) 
118                    self.packages[result[0]]['path'] = full_path
119        print '# Sometimes takes a certain time (should be improved asap)'
120
121    def get_current_package(self):   
122        cmd = 'cmt show macro package'
123        status, output = commands.getstatusoutput (cmd)
124        if status != 0:
125            print output
126            sys.exit(-1)   
127        lines = string.split(output, '\n')
128        for line in lines:
129            if line [0] != '#':
130                start = string.find(line,"'")
131                end   = string.find(line[start+1:len(line)],"'")
132                return line [start+1:start+end+1]
133
134    def get_work_area_path (self, name):       
135        return self.packages [name]['path']
136       
137    def get_package_path (self, name):   
138        #return os.getcwd ()
139        cmd = 'cmt -use='+name+' run pwd'
140        status, output = commands.getstatusoutput (cmd)
141        if status != 0:
142            print output
143            sys.exit(-1)   
144        lines = string.split(output, '\n')
145        for line in lines:
146            if line [0] != '#' and line[:5] != "#CMT>":
147                print line
148                return line
149 
150    def print_dependencies(self):
151        print '# ------------------------' 
152        print '# package --> dependencies' 
153        print '# ------------------------' 
154        for key in self.packages.keys():
155            print key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                       
156
157    def is_work_unit_waiting (self, name):
158        return self.packages[name] ['status'] == 'waiting'
159
160    def set_work_unit_status (self, name, status):
161        self.packages[name] ['status'] = status
162
163    def get_dependencies (self, name):
164        return self.packages[name] ['dependencies']
165   
166    def get_next_work_units (self):
167        result = list ()
168        for key in self.packages.keys():
169            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
170                result.append(key)
171        return result
172
173    def is_work_units (self):
174        result = list ()
175        for key in self.packages.keys():
176            if self.is_work_unit_waiting(key) :
177                return True
178        return False       
179
180    def suppress_work_unit (self, name):
181        #print '# remove', name, 'from schedule'
182        for key in self.packages.keys():
183            if name in self.packages[key]['dependencies']:
184                self.packages[key]['dependencies'].remove(name)
185               
186    def add_work_unit (self, name, cmd):
187        if self.is_work_unit_waiting (name):
188            # we create requests
189            arg = {'cmd': cmd , 'package':name}
190            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
191            # then we put the work request in the queue...
192            self.set_work_unit_status (name, 'queued')
193            self.pool.putRequest(req)
194            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
195
196    def execute (self, command):
197        #self.print_dependencies ()
198        packages = self.get_next_work_units()
199        if len(packages) !=0:
200            print '\n#--------------------------------------------------------------'   
201            print '# Execute parallel actions within packages', packages                     
202            for package in packages:
203                self.add_work_unit (package, command)
204
205    def execute_all(self,command):
206        self.execute (command)
207        self.wait()
208        ##while self.is_work_units():
209       
210    def wait (self):
211       self.pool.wait()   
212
213    # this will be called each time a result is available
214    def result_callback(self, request, result):
215      #print "**Result: %s from request #%s" % (str(result), request.requestID)
216      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
217      self.execute (result['cmd'])
218
219    # the work the threads will have to do
220    def do_execute(self, arg):
221      path = self.get_work_area_path (arg['package'])
222      if path == None:
223          raise RuntimeError('Path to package '+ arg['package'] +' not found')
224      self.set_work_unit_status (arg['package'], 'running')     
225      cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
226      os.chdir(path)       
227      print '#--------------------------------------------------------------'
228      print '# Now trying ['+ arg['cmd']+'] in ' + path
229      print '#--------------------------------------------------------------'
230      cmd = arg['cmd']
231      status, output, error, pythonError  = exeCommand(cmd,  iTimeout = 10)     
232      self.suppress_work_unit (arg['package'])
233      self.set_work_unit_status (arg['package'], 'done')
234      # status, output= commands.getstatusoutput(cmd)
235      #if status != 0:
236      #   raise RuntimeError(output)
237      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
238             
239    # this will be called when an exception occurs within a thread
240    def handle_exception(self, request, exc_info):
241        #traceback.print_stack()
242      print '#--------------------------------------------------------------'       
243      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
244      print "# Exception occured: %s" %(exc_info[1])
245      print '#--------------------------------------------------------------'   
246      sys.exit(-1)
247#--------- EoF --------#   
Note: See TracBrowser for help on using the repository browser.