source: tbroadcast/HEAD/python/tbroadcast.py @ 509

Last change on this file since 509 was 508, checked in by garonne, 15 years ago

Commit changes

  • Property svn:executable set to *
File size: 24.6 KB
RevLine 
[232]1#----------------------------------#
2# -- Author: V.Garonne
[242]3# -- Mail: garonne@lal.in2p3.fr
[232]4# -- Date: 08/25/2006
5# -- Name: tbroadcast
[242]6# -- Description: main class
[232]7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
[243]14import os.path
[232]15import commands
[239]16import traceback
[393]17import exceptions
[241]18from threading import BoundedSemaphore
[232]19
[508]20from threadpool  import WorkRequest
21from threadpool  import ThreadPool
22from threadpool  import NoResultsPending
23from threadpool  import NoWorkersAvailable
24from threadpool  import  makeRequests
[232]25
[508]26from  subprocess import Popen
27
[232]28class Scheduler:
29
[393]30    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False, keep_going=True):
[241]31        self.pool            = ThreadPool(num_workers=num_workers)
32        self.current_package = self.get_current_package()
[243]33        self.current_project = {'name': None, 'path': None, 'version': None}
[244]34        self.packages        = {}
[241]35        self.counter         = 0
36        self.semaphore       = BoundedSemaphore(1)
[243]37        self.local           = local
38        self.ignore_cycles   = ignore_cycles
39        self.output          = output
[244]40        self.error           = error
41        self.silent          = silent
[245]42        self.perf            = perf
[393]43        self.keep_going      = keep_going
[248]44        if self.perf is not False:
45            f = open (self.perf, 'w+')
[245]46            f.close()
[243]47        if output is not None:
48            if not os.path.exists (output):
49                print "path",output,"no exits"       
50                sys.exit(-1)
51            if not os.path.isdir(output):
52                print "path",output,"no a valid directory"       
53                sys.exit(-1)           
54               
55        self.get_current_project()
56        self.instanciate_packages (file)
57        if self.local: self.get_local_graph()
[238]58        self.check_cycles()
[244]59       
60#        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
61#        lignes = string.split(output, '\n')
62#        i = 1
63#        for package in lignes:
64#            if package!='' and package[0] != '#':                           
65#                print i , package
66#                i =i +1
67#                if not self.packages.has_key(package):
68#                                    print package       
69#        print len(self.packages)
70#        sys.exit(-1)
[243]71
72    def get_current_project(self):
73        cmd = 'cmt show projects | grep current'
[508]74        status, output = commands.getstatusoutput (cmd)
[243]75        if status != 0:
76            print output
77            sys.exit(-1)   
78        lines = string.split(output, '\n')
79        for line in lines:           
80            if line!='' and line [0] != '#':                           
81                item  = string.split (line, ' ')
82                self.current_project ['name']    = item[0]
83                self.current_project ['version'] = item[1]
84                self.current_project ['path']    = item[3][:-1]
85                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
86                if  self.current_project ['version'] == version:
87                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
[316]88                return
[243]89                #print self.current_project     
[241]90       
91    def get_counter(self):
92        self.semaphore.acquire ()       
93        self.counter = self.counter + 1
94        value = self.counter
95        self.semaphore.release()
96        return value
97       
[238]98    def check_cycles (self):
[243]99        cmd = 'cmt -private show cycles'
[238]100        cycle_found = False
[508]101        status, output = commands.getstatusoutput (cmd)
[238]102        if status != 0:
103            print output
104            sys.exit(-1)   
105        lines = string.split(output, '\n')
[243]106        cycles = list ()
[238]107        for line in lines:           
[243]108            if line!='' and line [0] != '#':                   
109               cycles.append (string.split(line)) 
110        cercles =list()       
111        for cycle in cycles:
112            cycleInProject = True
113            for package in cycle:           
114                if not self.packages.has_key(package):
115                    cycleInProject = False       
116            if cycleInProject: 
117              cercles.append(cycle)
118        if len(cercles):
119            if not self.ignore_cycles:
120                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
121                for cycle in cercles:
122                    loop = ""                   
123                    for package in cycle:
124                        loop = loop + package + ' -> '
125                    print loop + '...'
126                sys.exit(-1)       
127            else:
128                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
129                for cycle in cercles:
130                    loop = ""                   
131                    for package in cycle:
132                        loop = loop + package + ' -> '                     
133                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
134                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
135                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
136#                sys.exit(-1)
[242]137
138    def format_uses (self, content):
139        # format variables
140        lignes  = string.split(content, '\n')
141        lines   = list()
142        for ligne in lignes:
[243]143           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
[242]144               lines.append(ligne)
145        lines.reverse()
146        return lines
147
148    def format_paths (self, content):
149        # format variables
150        lignes  = string.split(content, '\n')
151        lines   = list()
152        for ligne in lignes:
153            if ligne[:4] == "use ":             
154               lines.append(ligne)
155        return lines
156
157    def get_paths (self, content):
158        lines = self.format_paths(content)
[232]159        for line in lines:
[237]160                result  = string.split (line[4:len(line)], ' ')
161                if  self.packages.has_key(result[0]):
162                    if len(result)==4:
163                        name, version, offset, path = string.split (line[4:len(line)], " ")
[241]164                        #print name, version, offset, path
165                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
166                        if path == '(no_auto_imports)':
167                            path   = offset
168                            offset = ''
[237]169                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
170                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
171                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
172                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
173                        else:
174                            print '# error path not found for', name
175                            sys.exit(-1)   
176                    elif len(result)==5:
177                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
178                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
179                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
180                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
181                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
182                        else:
183                            print '# error path not found for', name
184                            sys.exit(-1)                                                                                                   
185                    elif len(result)==3:
186                        name, version, path = string.split (line[4:len(line)], " ")
187                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
188                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
189                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
190                            full_path = path[1:-1] + '/' +name + + '/cmt'
191                        else:
192                            print '# error path not found for', name
193                            sys.exit(-1)
194                    else:
195                        print "error:",line
196                        print str(result)
197                        sys.exit(-1) 
[243]198                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
199                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
200                    if os.path.normpath(commonprefix) == self.current_project ['path']:                   
201                        #print result[0], ' belong to project', self.current_project ['name']
202                        self.packages[result[0]]['current_project'] = True
203
[242]204    def get_uses(self, content):       
205        # initiates variables
206        lignes = self.format_uses(content)
[243]207        if not len(lignes): return
208        self.packages [self.current_package] = {'version': '*', 'client': list(),
209                                                'uses': list(), 'status': 'waiting', 
210                                                'current_project': True, 'path': os.getcwd()}
[242]211        previous_client = self.current_package
212        previous_level  = 0
213        level_stack    = [{'name':previous_client,'level':previous_level},]
214        ligne = lignes.pop()       
215        while len(lignes)!=0:   
216            current_level = string.find(ligne, 'use')
217            while current_level > previous_level:               
218                name    = string.split (ligne)[2]
219                version = string.split (ligne)[3]                             
220                if not self.packages.has_key (name):
[243]221                  self.packages [name] = {'version': version, 'uses': list(), 
222                                          'client': list(), 'status': 'waiting', 
223                                          'current_project': False, 'path': None}
224                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
[242]225                   self.packages[previous_client]['uses'].append (name)               
226                level_stack.append({'name':previous_client,'level':previous_level})
227                previous_client = name 
228                previous_level = current_level                 
229                if len(lignes):
230                    ligne = lignes.pop()
231                    #print ligne
232                    current_level = string.find(ligne, 'use')
[243]233                                           
[242]234            #self.packages [previous_client]['status'] ='queued'
235            # restore the level
[243]236            if len(lignes):
237                if len(level_stack):                       
238                    item = level_stack.pop()               
239                    while item['level'] >= current_level and len(level_stack):
240                             item = level_stack.pop()
241                    previous_client = item['name']
242                    previous_level  = item['level']
[242]243            #print previous_client, '-->',string.split (ligne)[2]
[232]244
[242]245    def instanciate_packages(self, file=None):
246        # We create the schedule of the work units
[243]247        print '# First, we initialize the DAG by parsing "cmt show uses"'
[242]248        if file is None:
249            cmd  = 'cmt show uses'
250        else:   
251            cmd = 'cat ' + file       
[508]252        status, output = commands.getstatusoutput (cmd)
[242]253        if status != 0:
254            print output
255            sys.exit(-1)
256        self.get_uses(output)   
257        self.get_paths(output)
258        #self.check_execution (package=self.current_package)
259        #self.simulate_execution()
260
[243]261    def get_local_graph(self):
262        To_remove = list()
263        for key in self.packages:
264            if self.packages[key]['current_project']== False:
265                for selected in self.packages:
266                    if key in self.packages[selected]['uses']:
267                       self.packages[selected]['uses'].remove(key)
268                To_remove.append (key) 
269        for item in To_remove:
[244]270             del self.packages[item]
[243]271
[242]272    def simulate_execution(self):
273       ok = True
[243]274       indice = 1                     
[242]275       while ok:
276           runnable  = list()
277           for key in self.packages:
278               if  self.packages[key]['status']!='done':
279                   if len(self.packages[key]['uses']) == 0:
[243]280                       runnable.append(key)                                             
[242]281           if len(runnable):
282               print '\n#--------------------------------------------------------------'
283               print "# Execute parallel actions within packages " + str(runnable) 
284           for selected in runnable:       
285               print '#--------------------------------------------------------------'
286               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
287               print '#--------------------------------------------------------------'
288               self.packages[selected]['status']='done'       
289               indice = indice + 1
290               for key in self.packages:
291                   if selected in self.packages[key]['uses']:
292                       self.packages[key]['uses'].remove(selected)                               
293                       #print 'remove', selected, 'from',key
294           if len(runnable)==0:
295                           ok = False       
296               
297    def check_execution(self, package, path=list(), cycles=list()):
298        #print package,'-->',self.packages[package]['uses']
299        #print path
300        if package in path:
301            if path[path.index(package):] not in cycles:
302                print 'Cycles:',path[path.index(package):], package
303                cycles = cycles + path[path.index(package):]
304                sys.exit(-1)
305        path.append(package)
306        for item in self.packages[package]['uses']:
307              self.check_execution(package=item, path=path, cycles=cycles)
308              path.pop()       
309
[232]310    def get_current_package(self):   
[508]311        cmd            = 'cmt show macro package'
312        status, output = commands.getstatusoutput (cmd)
[232]313        if status != 0:
314            print output
315            sys.exit(-1)   
316        lines = string.split(output, '\n')
317        for line in lines:
318            if line [0] != '#':
319                start = string.find(line,"'")
320                end   = string.find(line[start+1:len(line)],"'")
321                return line [start+1:start+end+1]
322
323    def get_work_area_path (self, name):       
324        return self.packages [name]['path']
325       
326    def get_package_path (self, name):   
[237]327        #return os.getcwd ()
[232]328        cmd = 'cmt -use='+name+' run pwd'
[508]329        status, output = commands.getstatusoutput (cmd)
[232]330        if status != 0:
331            print output
332            sys.exit(-1)   
333        lines = string.split(output, '\n')
334        for line in lines:
[237]335            if line [0] != '#' and line[:5] != "#CMT>":
336                print line
[232]337                return line
338 
339    def print_dependencies(self):
340        print '# ------------------------' 
341        print '# package --> dependencies' 
342        print '# ------------------------' 
343        for key in self.packages.keys():
[242]344            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
[232]345
[241]346    def print_status(self, status):
347        print '# ------------------------' 
348        print '# package --> dependencies' 
349        print '# ------------------------' 
350        i = 1
351        for key in self.packages.keys():
352            if self.packages[key] ['status'] == status:
[242]353                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
[241]354                i = i + 1
355           
[232]356    def is_work_unit_waiting (self, name):
357        return self.packages[name] ['status'] == 'waiting'
358
359    def set_work_unit_status (self, name, status):
360        self.packages[name] ['status'] = status
361
362    def get_dependencies (self, name):
[242]363        return self.packages[name] ['uses']
[232]364   
365    def get_next_work_units (self):
366        result = list ()
367        for key in self.packages.keys():
368            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
369                result.append(key)
370        return result
371
[238]372    def is_work_units (self):
373        result = list ()
374        for key in self.packages.keys():
375            if self.is_work_unit_waiting(key) :
376                return True
377        return False       
378
[232]379    def suppress_work_unit (self, name):
[239]380        #print '# remove', name, 'from schedule'
[232]381        for key in self.packages.keys():
[242]382            if name in self.packages[key]['uses']:
383                self.packages[key]['uses'].remove(name)
[241]384
[232]385    def add_work_unit (self, name, cmd):
386        if self.is_work_unit_waiting (name):
387            # we create requests
388            arg = {'cmd': cmd , 'package':name}
[508]389            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback) 
[232]390            # then we put the work request in the queue...
391            self.set_work_unit_status (name, 'queued')
392            self.pool.putRequest(req)
393            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
394
395    def execute (self, command):
[238]396        #self.print_dependencies ()
[232]397        packages = self.get_next_work_units()
398        if len(packages) !=0:
[240]399            print '\n#--------------------------------------------------------------'   
400            print '# Execute parallel actions within packages', packages                     
[238]401            for package in packages:
402                self.add_work_unit (package, command)
[232]403
404    def execute_all(self,command):
[241]405        #self.print_dependencies ()
[232]406        self.execute (command)
407        self.wait()
[241]408        #self.print_dependencies ()
409        #self.print_status (status='waiting')       
410        #while self.is_work_units():
[242]411        #self.wait()           
[232]412       
413    def wait (self):
414       self.pool.wait()   
415
416    # this will be called each time a result is available
417    def result_callback(self, request, result):
418      #print "**Result: %s from request #%s" % (str(result), request.requestID)
[239]419      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
[245]420      #if result['package'] == 'CodeCheck':
421      #    sys.exit(-1)
422      self.execute (result['cmd'])   
[232]423
424    # the work the threads will have to do
425    def do_execute(self, arg):
[240]426      path = self.get_work_area_path (arg['package'])
[245]427      if path == None or not os.path.exists(path):
[240]428          raise RuntimeError('Path to package '+ arg['package'] +' not found')
429      self.set_work_unit_status (arg['package'], 'running')     
[245]430      #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"     
431      #os.chdir(path)
432      #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
[260]433      header = '#--------------------------------------------------------------\n'
434      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
435      header = header + '#--------------------------------------------------------------\n'
436      print header
[261]437      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
[508]438      log_name     = string.replace(path, project_path, '')
439      log_name     = string.replace(log_name, '/cmt', '')
440      log_name     = string.replace(log_name, '/', '_')
441      log_name     = log_name+'.loglog'
442      arg['log']   = log_name
[245]443      cmd = "cd "+ path +";"+ arg['cmd'] 
[244]444      #status, output= commands.getstatusoutput(cmd)
445      # init output file
[508]446
447      self.packages[arg['package']] ['startTime'] = time.time ()                           
448
[243]449      if self.output is not None:
[508]450           f1 = open (self.output+'/'+ log_name, 'w+')           
451           f1.write (header)
452           f1.close()
453           f1 = open (self.output+'/'+ log_name, 'a')
454           if self.error is not None:       
455               f2 = open (self.error+'/error'+log_name, 'w+')               
456               Popen(cmd, shell=True, stdout=f1, stderr=f2).communicate()
457               f2.close() 
458           else:
459               Popen(cmd, shell=True, stdout=f1, stderr=f1).communicate()
460           f1.close()               
461      else:
462          Popen(cmd, shell=True).communicate()
463     
[393]464      if not self.keep_going and status > 0:
465        sys.exit(status)   
466                     
[245]467      self.packages[arg['package']] ['endTime'] = time.time ()
468      if self.perf:
469          self.semaphore.acquire ()       
[248]470          f = open (self.perf, 'a')
[245]471          f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n') 
472          f.close()
473          self.semaphore.release()
[240]474      self.suppress_work_unit (arg['package'])
475      self.set_work_unit_status (arg['package'], 'done')
476      # status, output= commands.getstatusoutput(cmd)
477      #if status != 0:
478      #   raise RuntimeError(output)
[508]479      return {'cmd': arg['cmd'], 'package':arg['package']}
[244]480
[239]481             
[232]482    # this will be called when an exception occurs within a thread
483    def handle_exception(self, request, exc_info):
[239]484        #traceback.print_stack()
[240]485      print '#--------------------------------------------------------------'       
486      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
[393]487      if exc_info[0]== exceptions.SystemExit:
488        print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1]) 
489        print '#--------------------------------------------------------------'   
490        sys.exit(exc_info[1])
491      print "# Exception occured: %s" %(exc_info[1])     
[245]492      print exc_info
[240]493      print '#--------------------------------------------------------------'   
[245]494      #sys.exit(-1)
[244]495
[306]496   
497    def generate_make (self, file, command):
498        makefile = open (file, 'w+')
[315]499        makefile.write ('MAKE=make\n')
[312]500        #MFLAGS= -j10
[310]501        self.counter = len(self.packages)
[306]502        self.recursive_make (self.current_package, command, makefile, len(self.packages))
503        makefile.close ()
504       
505    def recursive_make (self, package, command, makefile, indice,actions=list()):
506        lines = self.generate_action_make (package, command, indice)
507        makefile.write (lines)
[307]508        #print lines               
[306]509        for pkg in self.packages[package] ['uses']:
510            if pkg not in actions:
511                actions.append(pkg)
512                indice = indice - 1
[310]513                self.counter = self.counter - 1
[306]514                self.recursive_make(pkg, command,makefile, indice, actions)       
515       
516    def generate_action_make (self, package, command, indice):
517        lines = package + ' :: '       
518        # add dependencies
519        for pkg in self.packages[package] ['uses']:
520            lines = lines + ' ' + pkg           
521
522        # add the action itself
[310]523        newcommand = string.replace (command, '<package>', package)       
524        if command =='':
[312]525            newcommand='$(MAKE)'
[306]526        lines = lines + '\n'
527        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
[310]528        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
[306]529        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
[314]530        lines = lines +  'ifdef LOCATION\n'
[315]531        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'       
532        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
533        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
[311]534        lines = lines + '\t+@cd ' + self.packages[package]['path']           
[315]535        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
[314]536        lines = lines + 'else\n'
537        lines = lines + '\t+@cd ' + self.packages[package]['path']           
538        lines = lines + ' && ' + newcommand + '\n'
539        lines = lines + 'endif\n\n'               
[306]540        return lines
541       
[478]542#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.