source: tbroadcast/v2/python/tbroadcast.py @ 242

Last change on this file since 242 was 242, checked in by garonne, 18 years ago

enhance severals things

  • Property svn:executable set to *
File size: 15.2 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import commands
15import traceback
16from threading import BoundedSemaphore
17
18from threadpool import WorkRequest
19from threadpool import ThreadPool
20from threadpool import NoResultsPending
21from threadpool import NoWorkersAvailable
22from threadpool import  makeRequests
23from executer   import  exeCommand
24
25class Scheduler:
26
27    def __init__(self, num_workers=20):
28        self.pool            = ThreadPool(num_workers=num_workers)
29        self.current_package = self.get_current_package()
30        self.packages        = {} 
31        self.counter         = 0
32        self.semaphore       = BoundedSemaphore(1)
33        self.check_cycles()
34        self.instanciate_packages ()
35       
36    def get_counter(self):
37        self.semaphore.acquire ()       
38        self.counter = self.counter + 1
39        value = self.counter
40        self.semaphore.release()
41        return value
42       
43    def check_cycles (self):
44        cmd = 'cmt show cycles'
45        cycle_found = False
46        status, output = commands.getstatusoutput (cmd)
47        if status != 0:
48            print output
49            sys.exit(-1)   
50        lines = string.split(output, '\n')
51        for line in lines:           
52            if line!='' and line [0] != '#':           
53                if not cycle_found:
54                    cycle_found = True
55                    print "# Error: cycles found, not possible to execute broadcast with threads. See the followings packages:"
56                print line
57        if cycle_found:
58            sys.exit(-1)
59
60    def format_uses (self, content):
61        # format variables
62        lignes  = string.split(content, '\n')
63        lines   = list()
64        for ligne in lignes:
65           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:
66               lines.append(ligne)
67        lines.reverse()
68        return lines
69
70    def format_paths (self, content):
71        # format variables
72        lignes  = string.split(content, '\n')
73        lines   = list()
74        for ligne in lignes:
75            if ligne[:4] == "use ":             
76               lines.append(ligne)
77        return lines
78
79    def get_paths (self, content):
80        lines = self.format_paths(content)
81        for line in lines:
82                result  = string.split (line[4:len(line)], ' ')
83                if  self.packages.has_key(result[0]):
84                    if len(result)==4:
85                        name, version, offset, path = string.split (line[4:len(line)], " ")
86                        #print name, version, offset, path
87                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
88                        if path == '(no_auto_imports)':
89                            path   = offset
90                            offset = ''
91                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
92                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
93                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
94                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
95                        else:
96                            print '# error path not found for', name
97                            sys.exit(-1)   
98                    elif len(result)==5:
99                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
100                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
101                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
102                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
103                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
104                        else:
105                            print '# error path not found for', name
106                            sys.exit(-1)                                                                                                   
107                    elif len(result)==3:
108                        name, version, path = string.split (line[4:len(line)], " ")
109                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
110                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
111                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
112                            full_path = path[1:-1] + '/' +name + + '/cmt'
113                        else:
114                            print '# error path not found for', name
115                            sys.exit(-1)
116                    else:
117                        print "error:",line
118                        print str(result)
119                        sys.exit(-1) 
120                    self.packages[result[0]]['path'] = full_path
121   
122    def get_uses(self, content):       
123        # initiates variables
124        lignes = self.format_uses(content)
125        self.packages [self.current_package] = {'version': '*', 'client': list(),'uses': list(), 'status': 'waiting', 'path': os.getcwd()}
126        previous_client = self.current_package
127        previous_level  = 0
128        level_stack    = [{'name':previous_client,'level':previous_level},]
129        ligne = lignes.pop()       
130        while len(lignes)!=0:   
131            current_level = string.find(ligne, 'use')
132            while current_level > previous_level:               
133                name    = string.split (ligne)[2]
134                version = string.split (ligne)[3]                             
135                if not self.packages.has_key (name):
136                  self.packages [name] = {'version': version, 'uses': list(), 'client': list(), 'status': 'waiting', 'path': None}
137                if name not in self.packages[previous_client]['uses']:
138                   self.packages[previous_client]['uses'].append (name)               
139                level_stack.append({'name':previous_client,'level':previous_level})
140                previous_client = name 
141                previous_level = current_level                 
142                if len(lignes):
143                    ligne = lignes.pop()
144                    #print ligne
145                    current_level = string.find(ligne, 'use')
146                       
147            #self.packages [previous_client]['status'] ='queued'
148            # restore the level
149            item = level_stack.pop()               
150            while item['level'] >= current_level:
151                 item = level_stack.pop()
152            previous_client = item['name']
153            previous_level  = item['level']
154            #print previous_client, '-->',string.split (ligne)[2]
155
156    def instanciate_packages(self, file=None):
157        # We create the schedule of the work units
158        print '# First, we initialize the DAG by parsing cmt show uses'
159        if file is None:
160            cmd  = 'cmt show uses'
161        else:   
162            cmd = 'cat ' + file       
163        status, output = commands.getstatusoutput (cmd)
164        if status != 0:
165            print output
166            sys.exit(-1)
167        self.get_uses(output)   
168        self.get_paths(output)
169        #self.check_execution (package=self.current_package)
170        #self.simulate_execution()
171
172    def simulate_execution(self):
173       ok = True
174       indice = 1
175       while ok:
176           runnable  = list()
177           checkable = list()
178           for key in self.packages:
179               if  self.packages[key]['status']!='done':
180                   if len(self.packages[key]['uses']) == 0:
181                       runnable.append(key)
182           if len(runnable):
183               print '\n#--------------------------------------------------------------'
184               print "# Execute parallel actions within packages " + str(runnable) 
185           #print 'checkable:',checkable
186           for selected in runnable:       
187               print '#--------------------------------------------------------------'
188               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
189               print '#--------------------------------------------------------------'
190               self.packages[selected]['status']='done'       
191               indice = indice + 1
192               for key in self.packages:
193                   if selected in self.packages[key]['uses']:
194                       self.packages[key]['uses'].remove(selected)                               
195                       #print 'remove', selected, 'from',key
196           if len(runnable)==0:
197                           ok = False       
198               
199    def check_execution(self, package, path=list(), cycles=list()):
200        #print package,'-->',self.packages[package]['uses']
201        #print path
202        if package in path:
203            if path[path.index(package):] not in cycles:
204                print 'Cycles:',path[path.index(package):], package
205                cycles = cycles + path[path.index(package):]
206                sys.exit(-1)
207        path.append(package)
208        for item in self.packages[package]['uses']:
209              self.check_execution(package=item, path=path, cycles=cycles)
210              path.pop()       
211
212    def get_current_package(self):   
213        cmd = 'cmt show macro package'
214        status, output = commands.getstatusoutput (cmd)
215        if status != 0:
216            print output
217            sys.exit(-1)   
218        lines = string.split(output, '\n')
219        for line in lines:
220            if line [0] != '#':
221                start = string.find(line,"'")
222                end   = string.find(line[start+1:len(line)],"'")
223                return line [start+1:start+end+1]
224
225    def get_work_area_path (self, name):       
226        return self.packages [name]['path']
227       
228    def get_package_path (self, name):   
229        #return os.getcwd ()
230        cmd = 'cmt -use='+name+' run pwd'
231        status, output = commands.getstatusoutput (cmd)
232        if status != 0:
233            print output
234            sys.exit(-1)   
235        lines = string.split(output, '\n')
236        for line in lines:
237            if line [0] != '#' and line[:5] != "#CMT>":
238                print line
239                return line
240 
241    def print_dependencies(self):
242        print '# ------------------------' 
243        print '# package --> dependencies' 
244        print '# ------------------------' 
245        for key in self.packages.keys():
246            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
247
248    def print_status(self, status):
249        print '# ------------------------' 
250        print '# package --> dependencies' 
251        print '# ------------------------' 
252        i = 1
253        for key in self.packages.keys():
254            if self.packages[key] ['status'] == status:
255                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
256                i = i + 1
257           
258    def is_work_unit_waiting (self, name):
259        return self.packages[name] ['status'] == 'waiting'
260
261    def set_work_unit_status (self, name, status):
262        self.packages[name] ['status'] = status
263
264    def get_dependencies (self, name):
265        return self.packages[name] ['uses']
266   
267    def get_next_work_units (self):
268        result = list ()
269        for key in self.packages.keys():
270            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
271                result.append(key)
272        return result
273
274    def is_work_units (self):
275        result = list ()
276        for key in self.packages.keys():
277            if self.is_work_unit_waiting(key) :
278                return True
279        return False       
280
281    def suppress_work_unit (self, name):
282        #print '# remove', name, 'from schedule'
283        for key in self.packages.keys():
284            if name in self.packages[key]['uses']:
285                self.packages[key]['uses'].remove(name)
286
287    def add_work_unit (self, name, cmd):
288        if self.is_work_unit_waiting (name):
289            # we create requests
290            arg = {'cmd': cmd , 'package':name}
291            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
292            # then we put the work request in the queue...
293            self.set_work_unit_status (name, 'queued')
294            self.pool.putRequest(req)
295            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
296
297    def execute (self, command):
298        #self.print_dependencies ()
299        packages = self.get_next_work_units()
300        if len(packages) !=0:
301            print '\n#--------------------------------------------------------------'   
302            print '# Execute parallel actions within packages', packages                     
303            for package in packages:
304                self.add_work_unit (package, command)
305
306    def execute_all(self,command):
307        #self.print_dependencies ()
308        self.execute (command)
309        self.wait()
310        #self.print_dependencies ()
311        #self.print_status (status='waiting')       
312        #while self.is_work_units():
313        #self.wait()           
314       
315    def wait (self):
316       self.pool.wait()   
317
318    # this will be called each time a result is available
319    def result_callback(self, request, result):
320      #print "**Result: %s from request #%s" % (str(result), request.requestID)
321      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
322      self.execute (result['cmd'])
323
324    # the work the threads will have to do
325    def do_execute(self, arg):
326      path = self.get_work_area_path (arg['package'])
327      if path == None:
328          raise RuntimeError('Path to package '+ arg['package'] +' not found')
329      self.set_work_unit_status (arg['package'], 'running')     
330      cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
331      os.chdir(path)       
332      print '#--------------------------------------------------------------'
333      print '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path
334      print '#--------------------------------------------------------------'
335      cmd = arg['cmd']
336      status, output, error, pythonError  = exeCommand(cmd,  iTimeout = 10)     
337      self.suppress_work_unit (arg['package'])
338      self.set_work_unit_status (arg['package'], 'done')
339      # status, output= commands.getstatusoutput(cmd)
340      #if status != 0:
341      #   raise RuntimeError(output)
342      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
343             
344    # this will be called when an exception occurs within a thread
345    def handle_exception(self, request, exc_info):
346        #traceback.print_stack()
347      print '#--------------------------------------------------------------'       
348      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
349      print "# Exception occured: %s" %(exc_info[1])
350      print '#--------------------------------------------------------------'   
351      sys.exit(-1)
352#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.