source: tbroadcast/v2.0.6/python/tbroadcast.py @ 517

Last change on this file since 517 was 508, checked in by garonne, 15 years ago

Commit changes

  • Property svn:executable set to *
File size: 24.6 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17import exceptions
18from threading import BoundedSemaphore
19
20from threadpool  import WorkRequest
21from threadpool  import ThreadPool
22from threadpool  import NoResultsPending
23from threadpool  import NoWorkersAvailable
24from threadpool  import  makeRequests
25
26from  subprocess import Popen
27
28class Scheduler:
29
30    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False, keep_going=True):
31        self.pool            = ThreadPool(num_workers=num_workers)
32        self.current_package = self.get_current_package()
33        self.current_project = {'name': None, 'path': None, 'version': None}
34        self.packages        = {}
35        self.counter         = 0
36        self.semaphore       = BoundedSemaphore(1)
37        self.local           = local
38        self.ignore_cycles   = ignore_cycles
39        self.output          = output
40        self.error           = error
41        self.silent          = silent
42        self.perf            = perf
43        self.keep_going      = keep_going
44        if self.perf is not False:
45            f = open (self.perf, 'w+')
46            f.close()
47        if output is not None:
48            if not os.path.exists (output):
49                print "path",output,"no exits"       
50                sys.exit(-1)
51            if not os.path.isdir(output):
52                print "path",output,"no a valid directory"       
53                sys.exit(-1)           
54               
55        self.get_current_project()
56        self.instanciate_packages (file)
57        if self.local: self.get_local_graph()
58        self.check_cycles()
59       
60#        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
61#        lignes = string.split(output, '\n')
62#        i = 1
63#        for package in lignes:
64#            if package!='' and package[0] != '#':                           
65#                print i , package
66#                i =i +1
67#                if not self.packages.has_key(package):
68#                                    print package       
69#        print len(self.packages)
70#        sys.exit(-1)
71
72    def get_current_project(self):
73        cmd = 'cmt show projects | grep current'
74        status, output = commands.getstatusoutput (cmd)
75        if status != 0:
76            print output
77            sys.exit(-1)   
78        lines = string.split(output, '\n')
79        for line in lines:           
80            if line!='' and line [0] != '#':                           
81                item  = string.split (line, ' ')
82                self.current_project ['name']    = item[0]
83                self.current_project ['version'] = item[1]
84                self.current_project ['path']    = item[3][:-1]
85                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
86                if  self.current_project ['version'] == version:
87                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
88                return
89                #print self.current_project     
90       
91    def get_counter(self):
92        self.semaphore.acquire ()       
93        self.counter = self.counter + 1
94        value = self.counter
95        self.semaphore.release()
96        return value
97       
98    def check_cycles (self):
99        cmd = 'cmt -private show cycles'
100        cycle_found = False
101        status, output = commands.getstatusoutput (cmd)
102        if status != 0:
103            print output
104            sys.exit(-1)   
105        lines = string.split(output, '\n')
106        cycles = list ()
107        for line in lines:           
108            if line!='' and line [0] != '#':                   
109               cycles.append (string.split(line)) 
110        cercles =list()       
111        for cycle in cycles:
112            cycleInProject = True
113            for package in cycle:           
114                if not self.packages.has_key(package):
115                    cycleInProject = False       
116            if cycleInProject: 
117              cercles.append(cycle)
118        if len(cercles):
119            if not self.ignore_cycles:
120                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
121                for cycle in cercles:
122                    loop = ""                   
123                    for package in cycle:
124                        loop = loop + package + ' -> '
125                    print loop + '...'
126                sys.exit(-1)       
127            else:
128                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
129                for cycle in cercles:
130                    loop = ""                   
131                    for package in cycle:
132                        loop = loop + package + ' -> '                     
133                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
134                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
135                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
136#                sys.exit(-1)
137
138    def format_uses (self, content):
139        # format variables
140        lignes  = string.split(content, '\n')
141        lines   = list()
142        for ligne in lignes:
143           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
144               lines.append(ligne)
145        lines.reverse()
146        return lines
147
148    def format_paths (self, content):
149        # format variables
150        lignes  = string.split(content, '\n')
151        lines   = list()
152        for ligne in lignes:
153            if ligne[:4] == "use ":             
154               lines.append(ligne)
155        return lines
156
157    def get_paths (self, content):
158        lines = self.format_paths(content)
159        for line in lines:
160                result  = string.split (line[4:len(line)], ' ')
161                if  self.packages.has_key(result[0]):
162                    if len(result)==4:
163                        name, version, offset, path = string.split (line[4:len(line)], " ")
164                        #print name, version, offset, path
165                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
166                        if path == '(no_auto_imports)':
167                            path   = offset
168                            offset = ''
169                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
170                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
171                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
172                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
173                        else:
174                            print '# error path not found for', name
175                            sys.exit(-1)   
176                    elif len(result)==5:
177                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
178                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
179                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
180                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
181                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
182                        else:
183                            print '# error path not found for', name
184                            sys.exit(-1)                                                                                                   
185                    elif len(result)==3:
186                        name, version, path = string.split (line[4:len(line)], " ")
187                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
188                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
189                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
190                            full_path = path[1:-1] + '/' +name + + '/cmt'
191                        else:
192                            print '# error path not found for', name
193                            sys.exit(-1)
194                    else:
195                        print "error:",line
196                        print str(result)
197                        sys.exit(-1) 
198                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
199                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
200                    if os.path.normpath(commonprefix) == self.current_project ['path']:                   
201                        #print result[0], ' belong to project', self.current_project ['name']
202                        self.packages[result[0]]['current_project'] = True
203
204    def get_uses(self, content):       
205        # initiates variables
206        lignes = self.format_uses(content)
207        if not len(lignes): return
208        self.packages [self.current_package] = {'version': '*', 'client': list(),
209                                                'uses': list(), 'status': 'waiting', 
210                                                'current_project': True, 'path': os.getcwd()}
211        previous_client = self.current_package
212        previous_level  = 0
213        level_stack    = [{'name':previous_client,'level':previous_level},]
214        ligne = lignes.pop()       
215        while len(lignes)!=0:   
216            current_level = string.find(ligne, 'use')
217            while current_level > previous_level:               
218                name    = string.split (ligne)[2]
219                version = string.split (ligne)[3]                             
220                if not self.packages.has_key (name):
221                  self.packages [name] = {'version': version, 'uses': list(), 
222                                          'client': list(), 'status': 'waiting', 
223                                          'current_project': False, 'path': None}
224                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
225                   self.packages[previous_client]['uses'].append (name)               
226                level_stack.append({'name':previous_client,'level':previous_level})
227                previous_client = name 
228                previous_level = current_level                 
229                if len(lignes):
230                    ligne = lignes.pop()
231                    #print ligne
232                    current_level = string.find(ligne, 'use')
233                                           
234            #self.packages [previous_client]['status'] ='queued'
235            # restore the level
236            if len(lignes):
237                if len(level_stack):                       
238                    item = level_stack.pop()               
239                    while item['level'] >= current_level and len(level_stack):
240                             item = level_stack.pop()
241                    previous_client = item['name']
242                    previous_level  = item['level']
243            #print previous_client, '-->',string.split (ligne)[2]
244
245    def instanciate_packages(self, file=None):
246        # We create the schedule of the work units
247        print '# First, we initialize the DAG by parsing "cmt show uses"'
248        if file is None:
249            cmd  = 'cmt show uses'
250        else:   
251            cmd = 'cat ' + file       
252        status, output = commands.getstatusoutput (cmd)
253        if status != 0:
254            print output
255            sys.exit(-1)
256        self.get_uses(output)   
257        self.get_paths(output)
258        #self.check_execution (package=self.current_package)
259        #self.simulate_execution()
260
261    def get_local_graph(self):
262        To_remove = list()
263        for key in self.packages:
264            if self.packages[key]['current_project']== False:
265                for selected in self.packages:
266                    if key in self.packages[selected]['uses']:
267                       self.packages[selected]['uses'].remove(key)
268                To_remove.append (key) 
269        for item in To_remove:
270             del self.packages[item]
271
272    def simulate_execution(self):
273       ok = True
274       indice = 1                     
275       while ok:
276           runnable  = list()
277           for key in self.packages:
278               if  self.packages[key]['status']!='done':
279                   if len(self.packages[key]['uses']) == 0:
280                       runnable.append(key)                                             
281           if len(runnable):
282               print '\n#--------------------------------------------------------------'
283               print "# Execute parallel actions within packages " + str(runnable) 
284           for selected in runnable:       
285               print '#--------------------------------------------------------------'
286               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
287               print '#--------------------------------------------------------------'
288               self.packages[selected]['status']='done'       
289               indice = indice + 1
290               for key in self.packages:
291                   if selected in self.packages[key]['uses']:
292                       self.packages[key]['uses'].remove(selected)                               
293                       #print 'remove', selected, 'from',key
294           if len(runnable)==0:
295                           ok = False       
296               
297    def check_execution(self, package, path=list(), cycles=list()):
298        #print package,'-->',self.packages[package]['uses']
299        #print path
300        if package in path:
301            if path[path.index(package):] not in cycles:
302                print 'Cycles:',path[path.index(package):], package
303                cycles = cycles + path[path.index(package):]
304                sys.exit(-1)
305        path.append(package)
306        for item in self.packages[package]['uses']:
307              self.check_execution(package=item, path=path, cycles=cycles)
308              path.pop()       
309
310    def get_current_package(self):   
311        cmd            = 'cmt show macro package'
312        status, output = commands.getstatusoutput (cmd)
313        if status != 0:
314            print output
315            sys.exit(-1)   
316        lines = string.split(output, '\n')
317        for line in lines:
318            if line [0] != '#':
319                start = string.find(line,"'")
320                end   = string.find(line[start+1:len(line)],"'")
321                return line [start+1:start+end+1]
322
323    def get_work_area_path (self, name):       
324        return self.packages [name]['path']
325       
326    def get_package_path (self, name):   
327        #return os.getcwd ()
328        cmd = 'cmt -use='+name+' run pwd'
329        status, output = commands.getstatusoutput (cmd)
330        if status != 0:
331            print output
332            sys.exit(-1)   
333        lines = string.split(output, '\n')
334        for line in lines:
335            if line [0] != '#' and line[:5] != "#CMT>":
336                print line
337                return line
338 
339    def print_dependencies(self):
340        print '# ------------------------' 
341        print '# package --> dependencies' 
342        print '# ------------------------' 
343        for key in self.packages.keys():
344            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
345
346    def print_status(self, status):
347        print '# ------------------------' 
348        print '# package --> dependencies' 
349        print '# ------------------------' 
350        i = 1
351        for key in self.packages.keys():
352            if self.packages[key] ['status'] == status:
353                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
354                i = i + 1
355           
356    def is_work_unit_waiting (self, name):
357        return self.packages[name] ['status'] == 'waiting'
358
359    def set_work_unit_status (self, name, status):
360        self.packages[name] ['status'] = status
361
362    def get_dependencies (self, name):
363        return self.packages[name] ['uses']
364   
365    def get_next_work_units (self):
366        result = list ()
367        for key in self.packages.keys():
368            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
369                result.append(key)
370        return result
371
372    def is_work_units (self):
373        result = list ()
374        for key in self.packages.keys():
375            if self.is_work_unit_waiting(key) :
376                return True
377        return False       
378
379    def suppress_work_unit (self, name):
380        #print '# remove', name, 'from schedule'
381        for key in self.packages.keys():
382            if name in self.packages[key]['uses']:
383                self.packages[key]['uses'].remove(name)
384
385    def add_work_unit (self, name, cmd):
386        if self.is_work_unit_waiting (name):
387            # we create requests
388            arg = {'cmd': cmd , 'package':name}
389            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback) 
390            # then we put the work request in the queue...
391            self.set_work_unit_status (name, 'queued')
392            self.pool.putRequest(req)
393            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
394
395    def execute (self, command):
396        #self.print_dependencies ()
397        packages = self.get_next_work_units()
398        if len(packages) !=0:
399            print '\n#--------------------------------------------------------------'   
400            print '# Execute parallel actions within packages', packages                     
401            for package in packages:
402                self.add_work_unit (package, command)
403
404    def execute_all(self,command):
405        #self.print_dependencies ()
406        self.execute (command)
407        self.wait()
408        #self.print_dependencies ()
409        #self.print_status (status='waiting')       
410        #while self.is_work_units():
411        #self.wait()           
412       
413    def wait (self):
414       self.pool.wait()   
415
416    # this will be called each time a result is available
417    def result_callback(self, request, result):
418      #print "**Result: %s from request #%s" % (str(result), request.requestID)
419      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
420      #if result['package'] == 'CodeCheck':
421      #    sys.exit(-1)
422      self.execute (result['cmd'])   
423
424    # the work the threads will have to do
425    def do_execute(self, arg):
426      path = self.get_work_area_path (arg['package'])
427      if path == None or not os.path.exists(path):
428          raise RuntimeError('Path to package '+ arg['package'] +' not found')
429      self.set_work_unit_status (arg['package'], 'running')     
430      #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"     
431      #os.chdir(path)
432      #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
433      header = '#--------------------------------------------------------------\n'
434      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
435      header = header + '#--------------------------------------------------------------\n'
436      print header
437      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
438      log_name     = string.replace(path, project_path, '')
439      log_name     = string.replace(log_name, '/cmt', '')
440      log_name     = string.replace(log_name, '/', '_')
441      log_name     = log_name+'.loglog'
442      arg['log']   = log_name
443      cmd = "cd "+ path +";"+ arg['cmd'] 
444      #status, output= commands.getstatusoutput(cmd)
445      # init output file
446
447      self.packages[arg['package']] ['startTime'] = time.time ()                           
448
449      if self.output is not None:
450           f1 = open (self.output+'/'+ log_name, 'w+')           
451           f1.write (header)
452           f1.close()
453           f1 = open (self.output+'/'+ log_name, 'a')
454           if self.error is not None:       
455               f2 = open (self.error+'/error'+log_name, 'w+')               
456               Popen(cmd, shell=True, stdout=f1, stderr=f2).communicate()
457               f2.close() 
458           else:
459               Popen(cmd, shell=True, stdout=f1, stderr=f1).communicate()
460           f1.close()               
461      else:
462          Popen(cmd, shell=True).communicate()
463     
464      if not self.keep_going and status > 0:
465        sys.exit(status)   
466                     
467      self.packages[arg['package']] ['endTime'] = time.time ()
468      if self.perf:
469          self.semaphore.acquire ()       
470          f = open (self.perf, 'a')
471          f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n') 
472          f.close()
473          self.semaphore.release()
474      self.suppress_work_unit (arg['package'])
475      self.set_work_unit_status (arg['package'], 'done')
476      # status, output= commands.getstatusoutput(cmd)
477      #if status != 0:
478      #   raise RuntimeError(output)
479      return {'cmd': arg['cmd'], 'package':arg['package']}
480
481             
482    # this will be called when an exception occurs within a thread
483    def handle_exception(self, request, exc_info):
484        #traceback.print_stack()
485      print '#--------------------------------------------------------------'       
486      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
487      if exc_info[0]== exceptions.SystemExit:
488        print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1]) 
489        print '#--------------------------------------------------------------'   
490        sys.exit(exc_info[1])
491      print "# Exception occured: %s" %(exc_info[1])     
492      print exc_info
493      print '#--------------------------------------------------------------'   
494      #sys.exit(-1)
495
496   
497    def generate_make (self, file, command):
498        makefile = open (file, 'w+')
499        makefile.write ('MAKE=make\n')
500        #MFLAGS= -j10
501        self.counter = len(self.packages)
502        self.recursive_make (self.current_package, command, makefile, len(self.packages))
503        makefile.close ()
504       
505    def recursive_make (self, package, command, makefile, indice,actions=list()):
506        lines = self.generate_action_make (package, command, indice)
507        makefile.write (lines)
508        #print lines               
509        for pkg in self.packages[package] ['uses']:
510            if pkg not in actions:
511                actions.append(pkg)
512                indice = indice - 1
513                self.counter = self.counter - 1
514                self.recursive_make(pkg, command,makefile, indice, actions)       
515       
516    def generate_action_make (self, package, command, indice):
517        lines = package + ' :: '       
518        # add dependencies
519        for pkg in self.packages[package] ['uses']:
520            lines = lines + ' ' + pkg           
521
522        # add the action itself
523        newcommand = string.replace (command, '<package>', package)       
524        if command =='':
525            newcommand='$(MAKE)'
526        lines = lines + '\n'
527        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
528        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
529        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
530        lines = lines +  'ifdef LOCATION\n'
531        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'       
532        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
533        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
534        lines = lines + '\t+@cd ' + self.packages[package]['path']           
535        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
536        lines = lines + 'else\n'
537        lines = lines + '\t+@cd ' + self.packages[package]['path']           
538        lines = lines + ' && ' + newcommand + '\n'
539        lines = lines + 'endif\n\n'               
540        return lines
541       
542#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.