source: tbroadcast/v2.0.3/python/tbroadcast.py @ 315

Last change on this file since 315 was 315, checked in by garonne, 18 years ago

format output

  • Property svn:executable set to *
File size: 24.7 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17from threading import BoundedSemaphore
18
19from threadpool import WorkRequest
20from threadpool import ThreadPool
21from threadpool import NoResultsPending
22from threadpool import NoWorkersAvailable
23from threadpool import  makeRequests
24from executer   import  exeCommand
25
26class Scheduler:
27
28    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False):
29        self.pool            = ThreadPool(num_workers=num_workers)
30        self.current_package = self.get_current_package()
31        self.current_project = {'name': None, 'path': None, 'version': None}
32        self.packages        = {}
33        self.counter         = 0
34        self.semaphore       = BoundedSemaphore(1)
35        self.local           = local
36        self.ignore_cycles   = ignore_cycles
37        self.output          = output
38        self.error           = error
39        self.silent          = silent
40        self.perf            = perf
41        if self.perf is not False:
42            f = open (self.perf, 'w+')
43            f.close()
44        if output is not None:
45            if not os.path.exists (output):
46                print "path",output,"no exits"       
47                sys.exit(-1)
48            if not os.path.isdir(output):
49                print "path",output,"no a valid directory"       
50                sys.exit(-1)           
51               
52        self.get_current_project()
53        self.instanciate_packages (file)
54        if self.local: self.get_local_graph()
55        self.check_cycles()
56       
57#        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
58#        lignes = string.split(output, '\n')
59#        i = 1
60#        for package in lignes:
61#            if package!='' and package[0] != '#':                           
62#                print i , package
63#                i =i +1
64#                if not self.packages.has_key(package):
65#                                    print package       
66#        print len(self.packages)
67#        sys.exit(-1)
68
69    def get_current_project(self):
70        cmd = 'cmt show projects | grep current'
71        status, output = commands.getstatusoutput (cmd)
72        if status != 0:
73            print output
74            sys.exit(-1)   
75        lines = string.split(output, '\n')
76        for line in lines:           
77            if line!='' and line [0] != '#':                           
78                item  = string.split (line, ' ')
79                self.current_project ['name']    = item[0]
80                self.current_project ['version'] = item[1]
81                self.current_project ['path']    = item[3][:-1]
82                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
83                if  self.current_project ['version'] == version:
84                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
85                #print self.current_project     
86       
87    def get_counter(self):
88        self.semaphore.acquire ()       
89        self.counter = self.counter + 1
90        value = self.counter
91        self.semaphore.release()
92        return value
93       
94    def check_cycles (self):
95        cmd = 'cmt -private show cycles'
96        cycle_found = False
97        status, output = commands.getstatusoutput (cmd)
98        if status != 0:
99            print output
100            sys.exit(-1)   
101        lines = string.split(output, '\n')
102        cycles = list ()
103        for line in lines:           
104            if line!='' and line [0] != '#':                   
105               cycles.append (string.split(line)) 
106        cercles =list()       
107        for cycle in cycles:
108            cycleInProject = True
109            for package in cycle:           
110                if not self.packages.has_key(package):
111                    cycleInProject = False       
112            if cycleInProject: 
113              cercles.append(cycle)
114        if len(cercles):
115            if not self.ignore_cycles:
116                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
117                for cycle in cercles:
118                    loop = ""                   
119                    for package in cycle:
120                        loop = loop + package + ' -> '
121                    print loop + '...'
122                sys.exit(-1)       
123            else:
124                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
125                for cycle in cercles:
126                    loop = ""                   
127                    for package in cycle:
128                        loop = loop + package + ' -> '                     
129                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
130                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
131                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
132#                sys.exit(-1)
133
134    def format_uses (self, content):
135        # format variables
136        lignes  = string.split(content, '\n')
137        lines   = list()
138        for ligne in lignes:
139           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
140               lines.append(ligne)
141        lines.reverse()
142        return lines
143
144    def format_paths (self, content):
145        # format variables
146        lignes  = string.split(content, '\n')
147        lines   = list()
148        for ligne in lignes:
149            if ligne[:4] == "use ":             
150               lines.append(ligne)
151        return lines
152
153    def get_paths (self, content):
154        lines = self.format_paths(content)
155        for line in lines:
156                result  = string.split (line[4:len(line)], ' ')
157                if  self.packages.has_key(result[0]):
158                    if len(result)==4:
159                        name, version, offset, path = string.split (line[4:len(line)], " ")
160                        #print name, version, offset, path
161                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
162                        if path == '(no_auto_imports)':
163                            path   = offset
164                            offset = ''
165                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
166                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
167                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
168                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
169                        else:
170                            print '# error path not found for', name
171                            sys.exit(-1)   
172                    elif len(result)==5:
173                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
174                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
175                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
176                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
177                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
178                        else:
179                            print '# error path not found for', name
180                            sys.exit(-1)                                                                                                   
181                    elif len(result)==3:
182                        name, version, path = string.split (line[4:len(line)], " ")
183                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
184                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
185                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
186                            full_path = path[1:-1] + '/' +name + + '/cmt'
187                        else:
188                            print '# error path not found for', name
189                            sys.exit(-1)
190                    else:
191                        print "error:",line
192                        print str(result)
193                        sys.exit(-1) 
194                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
195                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
196                    if os.path.normpath(commonprefix) == self.current_project ['path']:                   
197                        #print result[0], ' belong to project', self.current_project ['name']
198                        self.packages[result[0]]['current_project'] = True
199
200    def get_uses(self, content):       
201        # initiates variables
202        lignes = self.format_uses(content)
203        if not len(lignes): return
204        self.packages [self.current_package] = {'version': '*', 'client': list(),
205                                                'uses': list(), 'status': 'waiting', 
206                                                'current_project': True, 'path': os.getcwd()}
207        previous_client = self.current_package
208        previous_level  = 0
209        level_stack    = [{'name':previous_client,'level':previous_level},]
210        ligne = lignes.pop()       
211        while len(lignes)!=0:   
212            current_level = string.find(ligne, 'use')
213            while current_level > previous_level:               
214                name    = string.split (ligne)[2]
215                version = string.split (ligne)[3]                             
216                if not self.packages.has_key (name):
217                  self.packages [name] = {'version': version, 'uses': list(), 
218                                          'client': list(), 'status': 'waiting', 
219                                          'current_project': False, 'path': None}
220                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
221                   self.packages[previous_client]['uses'].append (name)               
222                level_stack.append({'name':previous_client,'level':previous_level})
223                previous_client = name 
224                previous_level = current_level                 
225                if len(lignes):
226                    ligne = lignes.pop()
227                    #print ligne
228                    current_level = string.find(ligne, 'use')
229                                           
230            #self.packages [previous_client]['status'] ='queued'
231            # restore the level
232            if len(lignes):
233                if len(level_stack):                       
234                    item = level_stack.pop()               
235                    while item['level'] >= current_level and len(level_stack):
236                             item = level_stack.pop()
237                    previous_client = item['name']
238                    previous_level  = item['level']
239            #print previous_client, '-->',string.split (ligne)[2]
240
241    def instanciate_packages(self, file=None):
242        # We create the schedule of the work units
243        print '# First, we initialize the DAG by parsing "cmt show uses"'
244        if file is None:
245            cmd  = 'cmt show uses'
246        else:   
247            cmd = 'cat ' + file       
248        status, output = commands.getstatusoutput (cmd)
249        if status != 0:
250            print output
251            sys.exit(-1)
252        self.get_uses(output)   
253        self.get_paths(output)
254        #self.check_execution (package=self.current_package)
255        #self.simulate_execution()
256
257    def get_local_graph(self):
258        To_remove = list()
259        for key in self.packages:
260            if self.packages[key]['current_project']== False:
261                for selected in self.packages:
262                    if key in self.packages[selected]['uses']:
263                       self.packages[selected]['uses'].remove(key)
264                To_remove.append (key) 
265        for item in To_remove:
266             del self.packages[item]
267
268    def simulate_execution(self):
269       ok = True
270       indice = 1                     
271       while ok:
272           runnable  = list()
273           for key in self.packages:
274               if  self.packages[key]['status']!='done':
275                   if len(self.packages[key]['uses']) == 0:
276                       runnable.append(key)                                             
277           if len(runnable):
278               print '\n#--------------------------------------------------------------'
279               print "# Execute parallel actions within packages " + str(runnable) 
280           for selected in runnable:       
281               print '#--------------------------------------------------------------'
282               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
283               print '#--------------------------------------------------------------'
284               self.packages[selected]['status']='done'       
285               indice = indice + 1
286               for key in self.packages:
287                   if selected in self.packages[key]['uses']:
288                       self.packages[key]['uses'].remove(selected)                               
289                       #print 'remove', selected, 'from',key
290           if len(runnable)==0:
291                           ok = False       
292               
293    def check_execution(self, package, path=list(), cycles=list()):
294        #print package,'-->',self.packages[package]['uses']
295        #print path
296        if package in path:
297            if path[path.index(package):] not in cycles:
298                print 'Cycles:',path[path.index(package):], package
299                cycles = cycles + path[path.index(package):]
300                sys.exit(-1)
301        path.append(package)
302        for item in self.packages[package]['uses']:
303              self.check_execution(package=item, path=path, cycles=cycles)
304              path.pop()       
305
306    def get_current_package(self):   
307        cmd = 'cmt show macro package'
308        status, output = commands.getstatusoutput (cmd)
309        if status != 0:
310            print output
311            sys.exit(-1)   
312        lines = string.split(output, '\n')
313        for line in lines:
314            if line [0] != '#':
315                start = string.find(line,"'")
316                end   = string.find(line[start+1:len(line)],"'")
317                return line [start+1:start+end+1]
318
319    def get_work_area_path (self, name):       
320        return self.packages [name]['path']
321       
322    def get_package_path (self, name):   
323        #return os.getcwd ()
324        cmd = 'cmt -use='+name+' run pwd'
325        status, output = commands.getstatusoutput (cmd)
326        if status != 0:
327            print output
328            sys.exit(-1)   
329        lines = string.split(output, '\n')
330        for line in lines:
331            if line [0] != '#' and line[:5] != "#CMT>":
332                print line
333                return line
334 
335    def print_dependencies(self):
336        print '# ------------------------' 
337        print '# package --> dependencies' 
338        print '# ------------------------' 
339        for key in self.packages.keys():
340            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
341
342    def print_status(self, status):
343        print '# ------------------------' 
344        print '# package --> dependencies' 
345        print '# ------------------------' 
346        i = 1
347        for key in self.packages.keys():
348            if self.packages[key] ['status'] == status:
349                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
350                i = i + 1
351           
352    def is_work_unit_waiting (self, name):
353        return self.packages[name] ['status'] == 'waiting'
354
355    def set_work_unit_status (self, name, status):
356        self.packages[name] ['status'] = status
357
358    def get_dependencies (self, name):
359        return self.packages[name] ['uses']
360   
361    def get_next_work_units (self):
362        result = list ()
363        for key in self.packages.keys():
364            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
365                result.append(key)
366        return result
367
368    def is_work_units (self):
369        result = list ()
370        for key in self.packages.keys():
371            if self.is_work_unit_waiting(key) :
372                return True
373        return False       
374
375    def suppress_work_unit (self, name):
376        #print '# remove', name, 'from schedule'
377        for key in self.packages.keys():
378            if name in self.packages[key]['uses']:
379                self.packages[key]['uses'].remove(name)
380
381    def add_work_unit (self, name, cmd):
382        if self.is_work_unit_waiting (name):
383            # we create requests
384            arg = {'cmd': cmd , 'package':name}
385            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
386            # then we put the work request in the queue...
387            self.set_work_unit_status (name, 'queued')
388            self.pool.putRequest(req)
389            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
390
391    def execute (self, command):
392        #self.print_dependencies ()
393        packages = self.get_next_work_units()
394        if len(packages) !=0:
395            print '\n#--------------------------------------------------------------'   
396            print '# Execute parallel actions within packages', packages                     
397            for package in packages:
398                self.add_work_unit (package, command)
399
400    def execute_all(self,command):
401        #self.print_dependencies ()
402        self.execute (command)
403        self.wait()
404        #self.print_dependencies ()
405        #self.print_status (status='waiting')       
406        #while self.is_work_units():
407        #self.wait()           
408       
409    def wait (self):
410       self.pool.wait()   
411
412    # this will be called each time a result is available
413    def result_callback(self, request, result):
414      #print "**Result: %s from request #%s" % (str(result), request.requestID)
415      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
416      #if result['package'] == 'CodeCheck':
417      #    sys.exit(-1)
418      self.execute (result['cmd'])   
419
420    # the work the threads will have to do
421    def do_execute(self, arg):
422      path = self.get_work_area_path (arg['package'])
423      if path == None or not os.path.exists(path):
424          raise RuntimeError('Path to package '+ arg['package'] +' not found')
425      self.set_work_unit_status (arg['package'], 'running')     
426      #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"     
427      #os.chdir(path)
428      #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
429      header = '#--------------------------------------------------------------\n'
430      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
431      header = header + '#--------------------------------------------------------------\n'
432      print header
433      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
434      log_name   =  string.replace(path, project_path, '')
435      log_name   = string.replace(log_name, '/cmt', '')
436      log_name   = string.replace(log_name, '/', '_')
437      log_name   = log_name+'.loglog'
438      arg['log'] = log_name
439      cmd = "cd "+ path +";"+ arg['cmd'] 
440      #status, output= commands.getstatusoutput(cmd)
441      # init output file
442      if self.output is not None:
443           f = open (self.output+'/'+ log_name, 'w+')
444           f.write (header)
445           f.close()     
446           if self.error is not None:
447               f = open (self.error+'/error'+log_name, 'w+')
448               f.close()
449      self.packages[arg['package']] ['startTime'] = time.time ()                           
450      status, output, error, pythonError  = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600)
451      self.packages[arg['package']] ['endTime'] = time.time ()
452      if self.perf:
453          self.semaphore.acquire ()       
454          f = open (self.perf, 'a')
455          f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n') 
456          f.close()
457          self.semaphore.release()
458      self.suppress_work_unit (arg['package'])
459      self.set_work_unit_status (arg['package'], 'done')
460      # status, output= commands.getstatusoutput(cmd)
461      #if status != 0:
462      #   raise RuntimeError(output)
463      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
464
465    def redirectOutput(self, index, buffer, arg):
466        """Filter function to redirect the std output and error of the job
467           executable for real-time debugging
468        """
469        if self.output is not None:
470           if index==0:   
471               f = open (self.output+'/'+arg['log'], 'a')
472               f.write (buffer+'\n')
473               f.close()
474           elif index==1: 
475               if self.error is not None:
476                   f = open (self.error+'/error'+arg['log'], 'a')
477               else:
478                   f = open (self.output+'/'+arg['log'], 'a')                   
479               f.write (buffer+'\n')                   
480               f.close()                               
481        if not self.silent: 
482            print buffer
483             
484    # this will be called when an exception occurs within a thread
485    def handle_exception(self, request, exc_info):
486        #traceback.print_stack()
487      print '#--------------------------------------------------------------'       
488      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
489      print "# Exception occured: %s" %(exc_info[1])
490      print exc_info
491      print '#--------------------------------------------------------------'   
492      #sys.exit(-1)
493
494   
495    def generate_make (self, file, command):
496        makefile = open (file, 'w+')
497        makefile.write ('MAKE=make\n')
498        #MFLAGS= -j10
499        self.counter = len(self.packages)
500        self.recursive_make (self.current_package, command, makefile, len(self.packages))
501        makefile.close ()
502       
503    def recursive_make (self, package, command, makefile, indice,actions=list()):
504        lines = self.generate_action_make (package, command, indice)
505        makefile.write (lines)
506        #print lines               
507        for pkg in self.packages[package] ['uses']:
508            if pkg not in actions:
509                actions.append(pkg)
510                indice = indice - 1
511                self.counter = self.counter - 1
512                self.recursive_make(pkg, command,makefile, indice, actions)       
513       
514    def generate_action_make (self, package, command, indice):
515        lines = package + ' :: '       
516        # add dependencies
517        for pkg in self.packages[package] ['uses']:
518            lines = lines + ' ' + pkg           
519
520        # add the action itself
521        newcommand = string.replace (command, '<package>', package)       
522        if command =='':
523            newcommand='$(MAKE)'
524        lines = lines + '\n'
525        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
526        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
527        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
528        lines = lines +  'ifdef LOCATION\n'
529        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'       
530        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
531        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
532        lines = lines + '\t+@cd ' + self.packages[package]['path']           
533        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
534        lines = lines + 'else\n'
535        lines = lines + '\t+@cd ' + self.packages[package]['path']           
536        lines = lines + ' && ' + newcommand + '\n'
537        lines = lines + 'endif\n\n'               
538        return lines
539       
540#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.