source: tbroadcast/v2.0.4/python/tbroadcast.py @ 510

Last change on this file since 510 was 393, checked in by garonne, 17 years ago

See C.L 1

  • Property svn:executable set to *
File size: 25.2 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17import exceptions
18from threading import BoundedSemaphore
19
20from threadpool import WorkRequest
21from threadpool import ThreadPool
22from threadpool import NoResultsPending
23from threadpool import NoWorkersAvailable
24from threadpool import  makeRequests
25from executer   import  exeCommand
26
27class Scheduler:
28
29    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False, keep_going=True):
30        self.pool            = ThreadPool(num_workers=num_workers)
31        self.current_package = self.get_current_package()
32        self.current_project = {'name': None, 'path': None, 'version': None}
33        self.packages        = {}
34        self.counter         = 0
35        self.semaphore       = BoundedSemaphore(1)
36        self.local           = local
37        self.ignore_cycles   = ignore_cycles
38        self.output          = output
39        self.error           = error
40        self.silent          = silent
41        self.perf            = perf
42        self.keep_going      = keep_going
43        if self.perf is not False:
44            f = open (self.perf, 'w+')
45            f.close()
46        if output is not None:
47            if not os.path.exists (output):
48                print "path",output,"no exits"       
49                sys.exit(-1)
50            if not os.path.isdir(output):
51                print "path",output,"no a valid directory"       
52                sys.exit(-1)           
53               
54        self.get_current_project()
55        self.instanciate_packages (file)
56        if self.local: self.get_local_graph()
57        self.check_cycles()
58       
59#        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
60#        lignes = string.split(output, '\n')
61#        i = 1
62#        for package in lignes:
63#            if package!='' and package[0] != '#':                           
64#                print i , package
65#                i =i +1
66#                if not self.packages.has_key(package):
67#                                    print package       
68#        print len(self.packages)
69#        sys.exit(-1)
70
71    def get_current_project(self):
72        cmd = 'cmt show projects | grep current'
73        status, output = commands.getstatusoutput (cmd)
74        if status != 0:
75            print output
76            sys.exit(-1)   
77        lines = string.split(output, '\n')
78        for line in lines:           
79            if line!='' and line [0] != '#':                           
80                item  = string.split (line, ' ')
81                self.current_project ['name']    = item[0]
82                self.current_project ['version'] = item[1]
83                self.current_project ['path']    = item[3][:-1]
84                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
85                if  self.current_project ['version'] == version:
86                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
87                return
88                #print self.current_project     
89       
90    def get_counter(self):
91        self.semaphore.acquire ()       
92        self.counter = self.counter + 1
93        value = self.counter
94        self.semaphore.release()
95        return value
96       
97    def check_cycles (self):
98        cmd = 'cmt -private show cycles'
99        cycle_found = False
100        status, output = commands.getstatusoutput (cmd)
101        if status != 0:
102            print output
103            sys.exit(-1)   
104        lines = string.split(output, '\n')
105        cycles = list ()
106        for line in lines:           
107            if line!='' and line [0] != '#':                   
108               cycles.append (string.split(line)) 
109        cercles =list()       
110        for cycle in cycles:
111            cycleInProject = True
112            for package in cycle:           
113                if not self.packages.has_key(package):
114                    cycleInProject = False       
115            if cycleInProject: 
116              cercles.append(cycle)
117        if len(cercles):
118            if not self.ignore_cycles:
119                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
120                for cycle in cercles:
121                    loop = ""                   
122                    for package in cycle:
123                        loop = loop + package + ' -> '
124                    print loop + '...'
125                sys.exit(-1)       
126            else:
127                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
128                for cycle in cercles:
129                    loop = ""                   
130                    for package in cycle:
131                        loop = loop + package + ' -> '                     
132                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
133                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
134                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
135#                sys.exit(-1)
136
137    def format_uses (self, content):
138        # format variables
139        lignes  = string.split(content, '\n')
140        lines   = list()
141        for ligne in lignes:
142           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
143               lines.append(ligne)
144        lines.reverse()
145        return lines
146
147    def format_paths (self, content):
148        # format variables
149        lignes  = string.split(content, '\n')
150        lines   = list()
151        for ligne in lignes:
152            if ligne[:4] == "use ":             
153               lines.append(ligne)
154        return lines
155
156    def get_paths (self, content):
157        lines = self.format_paths(content)
158        for line in lines:
159                result  = string.split (line[4:len(line)], ' ')
160                if  self.packages.has_key(result[0]):
161                    if len(result)==4:
162                        name, version, offset, path = string.split (line[4:len(line)], " ")
163                        #print name, version, offset, path
164                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
165                        if path == '(no_auto_imports)':
166                            path   = offset
167                            offset = ''
168                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
169                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
170                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
171                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
172                        else:
173                            print '# error path not found for', name
174                            sys.exit(-1)   
175                    elif len(result)==5:
176                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
177                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
178                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
179                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
180                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
181                        else:
182                            print '# error path not found for', name
183                            sys.exit(-1)                                                                                                   
184                    elif len(result)==3:
185                        name, version, path = string.split (line[4:len(line)], " ")
186                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
187                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
188                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
189                            full_path = path[1:-1] + '/' +name + + '/cmt'
190                        else:
191                            print '# error path not found for', name
192                            sys.exit(-1)
193                    else:
194                        print "error:",line
195                        print str(result)
196                        sys.exit(-1) 
197                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
198                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
199                    if os.path.normpath(commonprefix) == self.current_project ['path']:                   
200                        #print result[0], ' belong to project', self.current_project ['name']
201                        self.packages[result[0]]['current_project'] = True
202
203    def get_uses(self, content):       
204        # initiates variables
205        lignes = self.format_uses(content)
206        if not len(lignes): return
207        self.packages [self.current_package] = {'version': '*', 'client': list(),
208                                                'uses': list(), 'status': 'waiting', 
209                                                'current_project': True, 'path': os.getcwd()}
210        previous_client = self.current_package
211        previous_level  = 0
212        level_stack    = [{'name':previous_client,'level':previous_level},]
213        ligne = lignes.pop()       
214        while len(lignes)!=0:   
215            current_level = string.find(ligne, 'use')
216            while current_level > previous_level:               
217                name    = string.split (ligne)[2]
218                version = string.split (ligne)[3]                             
219                if not self.packages.has_key (name):
220                  self.packages [name] = {'version': version, 'uses': list(), 
221                                          'client': list(), 'status': 'waiting', 
222                                          'current_project': False, 'path': None}
223                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
224                   self.packages[previous_client]['uses'].append (name)               
225                level_stack.append({'name':previous_client,'level':previous_level})
226                previous_client = name 
227                previous_level = current_level                 
228                if len(lignes):
229                    ligne = lignes.pop()
230                    #print ligne
231                    current_level = string.find(ligne, 'use')
232                                           
233            #self.packages [previous_client]['status'] ='queued'
234            # restore the level
235            if len(lignes):
236                if len(level_stack):                       
237                    item = level_stack.pop()               
238                    while item['level'] >= current_level and len(level_stack):
239                             item = level_stack.pop()
240                    previous_client = item['name']
241                    previous_level  = item['level']
242            #print previous_client, '-->',string.split (ligne)[2]
243
244    def instanciate_packages(self, file=None):
245        # We create the schedule of the work units
246        print '# First, we initialize the DAG by parsing "cmt show uses"'
247        if file is None:
248            cmd  = 'cmt show uses'
249        else:   
250            cmd = 'cat ' + file       
251        status, output = commands.getstatusoutput (cmd)
252        if status != 0:
253            print output
254            sys.exit(-1)
255        self.get_uses(output)   
256        self.get_paths(output)
257        #self.check_execution (package=self.current_package)
258        #self.simulate_execution()
259
260    def get_local_graph(self):
261        To_remove = list()
262        for key in self.packages:
263            if self.packages[key]['current_project']== False:
264                for selected in self.packages:
265                    if key in self.packages[selected]['uses']:
266                       self.packages[selected]['uses'].remove(key)
267                To_remove.append (key) 
268        for item in To_remove:
269             del self.packages[item]
270
271    def simulate_execution(self):
272       ok = True
273       indice = 1                     
274       while ok:
275           runnable  = list()
276           for key in self.packages:
277               if  self.packages[key]['status']!='done':
278                   if len(self.packages[key]['uses']) == 0:
279                       runnable.append(key)                                             
280           if len(runnable):
281               print '\n#--------------------------------------------------------------'
282               print "# Execute parallel actions within packages " + str(runnable) 
283           for selected in runnable:       
284               print '#--------------------------------------------------------------'
285               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
286               print '#--------------------------------------------------------------'
287               self.packages[selected]['status']='done'       
288               indice = indice + 1
289               for key in self.packages:
290                   if selected in self.packages[key]['uses']:
291                       self.packages[key]['uses'].remove(selected)                               
292                       #print 'remove', selected, 'from',key
293           if len(runnable)==0:
294                           ok = False       
295               
296    def check_execution(self, package, path=list(), cycles=list()):
297        #print package,'-->',self.packages[package]['uses']
298        #print path
299        if package in path:
300            if path[path.index(package):] not in cycles:
301                print 'Cycles:',path[path.index(package):], package
302                cycles = cycles + path[path.index(package):]
303                sys.exit(-1)
304        path.append(package)
305        for item in self.packages[package]['uses']:
306              self.check_execution(package=item, path=path, cycles=cycles)
307              path.pop()       
308
309    def get_current_package(self):   
310        cmd = 'cmt show macro package'
311        status, output = commands.getstatusoutput (cmd)
312        if status != 0:
313            print output
314            sys.exit(-1)   
315        lines = string.split(output, '\n')
316        for line in lines:
317            if line [0] != '#':
318                start = string.find(line,"'")
319                end   = string.find(line[start+1:len(line)],"'")
320                return line [start+1:start+end+1]
321
322    def get_work_area_path (self, name):       
323        return self.packages [name]['path']
324       
325    def get_package_path (self, name):   
326        #return os.getcwd ()
327        cmd = 'cmt -use='+name+' run pwd'
328        status, output = commands.getstatusoutput (cmd)
329        if status != 0:
330            print output
331            sys.exit(-1)   
332        lines = string.split(output, '\n')
333        for line in lines:
334            if line [0] != '#' and line[:5] != "#CMT>":
335                print line
336                return line
337 
338    def print_dependencies(self):
339        print '# ------------------------' 
340        print '# package --> dependencies' 
341        print '# ------------------------' 
342        for key in self.packages.keys():
343            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
344
345    def print_status(self, status):
346        print '# ------------------------' 
347        print '# package --> dependencies' 
348        print '# ------------------------' 
349        i = 1
350        for key in self.packages.keys():
351            if self.packages[key] ['status'] == status:
352                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
353                i = i + 1
354           
355    def is_work_unit_waiting (self, name):
356        return self.packages[name] ['status'] == 'waiting'
357
358    def set_work_unit_status (self, name, status):
359        self.packages[name] ['status'] = status
360
361    def get_dependencies (self, name):
362        return self.packages[name] ['uses']
363   
364    def get_next_work_units (self):
365        result = list ()
366        for key in self.packages.keys():
367            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
368                result.append(key)
369        return result
370
371    def is_work_units (self):
372        result = list ()
373        for key in self.packages.keys():
374            if self.is_work_unit_waiting(key) :
375                return True
376        return False       
377
378    def suppress_work_unit (self, name):
379        #print '# remove', name, 'from schedule'
380        for key in self.packages.keys():
381            if name in self.packages[key]['uses']:
382                self.packages[key]['uses'].remove(name)
383
384    def add_work_unit (self, name, cmd):
385        if self.is_work_unit_waiting (name):
386            # we create requests
387            arg = {'cmd': cmd , 'package':name}
388            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
389            # then we put the work request in the queue...
390            self.set_work_unit_status (name, 'queued')
391            self.pool.putRequest(req)
392            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
393
394    def execute (self, command):
395        #self.print_dependencies ()
396        packages = self.get_next_work_units()
397        if len(packages) !=0:
398            print '\n#--------------------------------------------------------------'   
399            print '# Execute parallel actions within packages', packages                     
400            for package in packages:
401                self.add_work_unit (package, command)
402
403    def execute_all(self,command):
404        #self.print_dependencies ()
405        self.execute (command)
406        self.wait()
407        #self.print_dependencies ()
408        #self.print_status (status='waiting')       
409        #while self.is_work_units():
410        #self.wait()           
411       
412    def wait (self):
413       self.pool.wait()   
414
415    # this will be called each time a result is available
416    def result_callback(self, request, result):
417      #print "**Result: %s from request #%s" % (str(result), request.requestID)
418      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
419      #if result['package'] == 'CodeCheck':
420      #    sys.exit(-1)
421      self.execute (result['cmd'])   
422
423    # the work the threads will have to do
424    def do_execute(self, arg):
425      path = self.get_work_area_path (arg['package'])
426      if path == None or not os.path.exists(path):
427          raise RuntimeError('Path to package '+ arg['package'] +' not found')
428      self.set_work_unit_status (arg['package'], 'running')     
429      #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"     
430      #os.chdir(path)
431      #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
432      header = '#--------------------------------------------------------------\n'
433      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
434      header = header + '#--------------------------------------------------------------\n'
435      print header
436      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
437      log_name   =  string.replace(path, project_path, '')
438      log_name   = string.replace(log_name, '/cmt', '')
439      log_name   = string.replace(log_name, '/', '_')
440      log_name   = log_name+'.loglog'
441      arg['log'] = log_name
442      cmd = "cd "+ path +";"+ arg['cmd'] 
443      #status, output= commands.getstatusoutput(cmd)
444      # init output file
445      if self.output is not None:
446           f = open (self.output+'/'+ log_name, 'w+')
447           f.write (header)
448           f.close()     
449           if self.error is not None:
450               f = open (self.error+'/error'+log_name, 'w+')
451               f.close()
452      self.packages[arg['package']] ['startTime'] = time.time ()                           
453      status, output, error, pythonError  = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600)
454      if not self.keep_going and status > 0:
455        sys.exit(status)   
456                     
457      self.packages[arg['package']] ['endTime'] = time.time ()
458      if self.perf:
459          self.semaphore.acquire ()       
460          f = open (self.perf, 'a')
461          f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n') 
462          f.close()
463          self.semaphore.release()
464      self.suppress_work_unit (arg['package'])
465      self.set_work_unit_status (arg['package'], 'done')
466      # status, output= commands.getstatusoutput(cmd)
467      #if status != 0:
468      #   raise RuntimeError(output)
469      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
470
471    def redirectOutput(self, index, buffer, arg):
472        """Filter function to redirect the std output and error of the job
473           executable for real-time debugging
474        """
475        if self.output is not None:
476           if index==0:   
477               f = open (self.output+'/'+arg['log'], 'a')
478               f.write (buffer+'\n')
479               f.close()
480           elif index==1: 
481               if self.error is not None:
482                   f = open (self.error+'/error'+arg['log'], 'a')
483               else:
484                   f = open (self.output+'/'+arg['log'], 'a')                   
485               f.write (buffer+'\n')                   
486               f.close()                               
487        if not self.silent: 
488            print buffer
489             
490    # this will be called when an exception occurs within a thread
491    def handle_exception(self, request, exc_info):
492        #traceback.print_stack()
493      print '#--------------------------------------------------------------'       
494      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
495      if exc_info[0]== exceptions.SystemExit:
496        print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1]) 
497        print '#--------------------------------------------------------------'   
498        sys.exit(exc_info[1])
499      print "# Exception occured: %s" %(exc_info[1])     
500      print exc_info
501      print '#--------------------------------------------------------------'   
502      #sys.exit(-1)
503
504   
505    def generate_make (self, file, command):
506        makefile = open (file, 'w+')
507        makefile.write ('MAKE=make\n')
508        #MFLAGS= -j10
509        self.counter = len(self.packages)
510        self.recursive_make (self.current_package, command, makefile, len(self.packages))
511        makefile.close ()
512       
513    def recursive_make (self, package, command, makefile, indice,actions=list()):
514        lines = self.generate_action_make (package, command, indice)
515        makefile.write (lines)
516        #print lines               
517        for pkg in self.packages[package] ['uses']:
518            if pkg not in actions:
519                actions.append(pkg)
520                indice = indice - 1
521                self.counter = self.counter - 1
522                self.recursive_make(pkg, command,makefile, indice, actions)       
523       
524    def generate_action_make (self, package, command, indice):
525        lines = package + ' :: '       
526        # add dependencies
527        for pkg in self.packages[package] ['uses']:
528            lines = lines + ' ' + pkg           
529
530        # add the action itself
531        newcommand = string.replace (command, '<package>', package)       
532        if command =='':
533            newcommand='$(MAKE)'
534        lines = lines + '\n'
535        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
536        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
537        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
538        lines = lines +  'ifdef LOCATION\n'
539        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'       
540        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
541        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
542        lines = lines + '\t+@cd ' + self.packages[package]['path']           
543        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
544        lines = lines + 'else\n'
545        lines = lines + '\t+@cd ' + self.packages[package]['path']           
546        lines = lines + ' && ' + newcommand + '\n'
547        lines = lines + 'endif\n\n'               
548        return lines
549       
550#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.