source: tbroadcast/v2.0.3/python/tbroadcast.py @ 316

Last change on this file since 316 was 316, checked in by garonne, 18 years ago

fixed a bug

  • Property svn:executable set to *
File size: 24.8 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17from threading import BoundedSemaphore
18
19from threadpool import WorkRequest
20from threadpool import ThreadPool
21from threadpool import NoResultsPending
22from threadpool import NoWorkersAvailable
23from threadpool import  makeRequests
24from executer   import  exeCommand
25
26class Scheduler:
27
28    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False):
29        self.pool            = ThreadPool(num_workers=num_workers)
30        self.current_package = self.get_current_package()
31        self.current_project = {'name': None, 'path': None, 'version': None}
32        self.packages        = {}
33        self.counter         = 0
34        self.semaphore       = BoundedSemaphore(1)
35        self.local           = local
36        self.ignore_cycles   = ignore_cycles
37        self.output          = output
38        self.error           = error
39        self.silent          = silent
40        self.perf            = perf
41        if self.perf is not False:
42            f = open (self.perf, 'w+')
43            f.close()
44        if output is not None:
45            if not os.path.exists (output):
46                print "path",output,"no exits"       
47                sys.exit(-1)
48            if not os.path.isdir(output):
49                print "path",output,"no a valid directory"       
50                sys.exit(-1)           
51               
52        self.get_current_project()
53        self.instanciate_packages (file)
54        if self.local: self.get_local_graph()
55        self.check_cycles()
56       
57#        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
58#        lignes = string.split(output, '\n')
59#        i = 1
60#        for package in lignes:
61#            if package!='' and package[0] != '#':                           
62#                print i , package
63#                i =i +1
64#                if not self.packages.has_key(package):
65#                                    print package       
66#        print len(self.packages)
67#        sys.exit(-1)
68
69    def get_current_project(self):
70        cmd = 'cmt show projects | grep current'
71        status, output = commands.getstatusoutput (cmd)
72        if status != 0:
73            print output
74            sys.exit(-1)   
75        lines = string.split(output, '\n')
76        for line in lines:           
77            if line!='' and line [0] != '#':                           
78                item  = string.split (line, ' ')
79                self.current_project ['name']    = item[0]
80                self.current_project ['version'] = item[1]
81                self.current_project ['path']    = item[3][:-1]
82                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
83                if  self.current_project ['version'] == version:
84                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
85                return
86                #print self.current_project     
87       
88    def get_counter(self):
89        self.semaphore.acquire ()       
90        self.counter = self.counter + 1
91        value = self.counter
92        self.semaphore.release()
93        return value
94       
95    def check_cycles (self):
96        cmd = 'cmt -private show cycles'
97        cycle_found = False
98        status, output = commands.getstatusoutput (cmd)
99        if status != 0:
100            print output
101            sys.exit(-1)   
102        lines = string.split(output, '\n')
103        cycles = list ()
104        for line in lines:           
105            if line!='' and line [0] != '#':                   
106               cycles.append (string.split(line)) 
107        cercles =list()       
108        for cycle in cycles:
109            cycleInProject = True
110            for package in cycle:           
111                if not self.packages.has_key(package):
112                    cycleInProject = False       
113            if cycleInProject: 
114              cercles.append(cycle)
115        if len(cercles):
116            if not self.ignore_cycles:
117                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
118                for cycle in cercles:
119                    loop = ""                   
120                    for package in cycle:
121                        loop = loop + package + ' -> '
122                    print loop + '...'
123                sys.exit(-1)       
124            else:
125                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
126                for cycle in cercles:
127                    loop = ""                   
128                    for package in cycle:
129                        loop = loop + package + ' -> '                     
130                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
131                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
132                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
133#                sys.exit(-1)
134
135    def format_uses (self, content):
136        # format variables
137        lignes  = string.split(content, '\n')
138        lines   = list()
139        for ligne in lignes:
140           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
141               lines.append(ligne)
142        lines.reverse()
143        return lines
144
145    def format_paths (self, content):
146        # format variables
147        lignes  = string.split(content, '\n')
148        lines   = list()
149        for ligne in lignes:
150            if ligne[:4] == "use ":             
151               lines.append(ligne)
152        return lines
153
154    def get_paths (self, content):
155        lines = self.format_paths(content)
156        for line in lines:
157                result  = string.split (line[4:len(line)], ' ')
158                if  self.packages.has_key(result[0]):
159                    if len(result)==4:
160                        name, version, offset, path = string.split (line[4:len(line)], " ")
161                        #print name, version, offset, path
162                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
163                        if path == '(no_auto_imports)':
164                            path   = offset
165                            offset = ''
166                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
167                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
168                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
169                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
170                        else:
171                            print '# error path not found for', name
172                            sys.exit(-1)   
173                    elif len(result)==5:
174                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
175                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
176                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
177                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
178                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
179                        else:
180                            print '# error path not found for', name
181                            sys.exit(-1)                                                                                                   
182                    elif len(result)==3:
183                        name, version, path = string.split (line[4:len(line)], " ")
184                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
185                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
186                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
187                            full_path = path[1:-1] + '/' +name + + '/cmt'
188                        else:
189                            print '# error path not found for', name
190                            sys.exit(-1)
191                    else:
192                        print "error:",line
193                        print str(result)
194                        sys.exit(-1) 
195                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
196                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
197                    if os.path.normpath(commonprefix) == self.current_project ['path']:                   
198                        #print result[0], ' belong to project', self.current_project ['name']
199                        self.packages[result[0]]['current_project'] = True
200
201    def get_uses(self, content):       
202        # initiates variables
203        lignes = self.format_uses(content)
204        if not len(lignes): return
205        self.packages [self.current_package] = {'version': '*', 'client': list(),
206                                                'uses': list(), 'status': 'waiting', 
207                                                'current_project': True, 'path': os.getcwd()}
208        previous_client = self.current_package
209        previous_level  = 0
210        level_stack    = [{'name':previous_client,'level':previous_level},]
211        ligne = lignes.pop()       
212        while len(lignes)!=0:   
213            current_level = string.find(ligne, 'use')
214            while current_level > previous_level:               
215                name    = string.split (ligne)[2]
216                version = string.split (ligne)[3]                             
217                if not self.packages.has_key (name):
218                  self.packages [name] = {'version': version, 'uses': list(), 
219                                          'client': list(), 'status': 'waiting', 
220                                          'current_project': False, 'path': None}
221                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
222                   self.packages[previous_client]['uses'].append (name)               
223                level_stack.append({'name':previous_client,'level':previous_level})
224                previous_client = name 
225                previous_level = current_level                 
226                if len(lignes):
227                    ligne = lignes.pop()
228                    #print ligne
229                    current_level = string.find(ligne, 'use')
230                                           
231            #self.packages [previous_client]['status'] ='queued'
232            # restore the level
233            if len(lignes):
234                if len(level_stack):                       
235                    item = level_stack.pop()               
236                    while item['level'] >= current_level and len(level_stack):
237                             item = level_stack.pop()
238                    previous_client = item['name']
239                    previous_level  = item['level']
240            #print previous_client, '-->',string.split (ligne)[2]
241
242    def instanciate_packages(self, file=None):
243        # We create the schedule of the work units
244        print '# First, we initialize the DAG by parsing "cmt show uses"'
245        if file is None:
246            cmd  = 'cmt show uses'
247        else:   
248            cmd = 'cat ' + file       
249        status, output = commands.getstatusoutput (cmd)
250        if status != 0:
251            print output
252            sys.exit(-1)
253        self.get_uses(output)   
254        self.get_paths(output)
255        #self.check_execution (package=self.current_package)
256        #self.simulate_execution()
257
258    def get_local_graph(self):
259        To_remove = list()
260        for key in self.packages:
261            if self.packages[key]['current_project']== False:
262                for selected in self.packages:
263                    if key in self.packages[selected]['uses']:
264                       self.packages[selected]['uses'].remove(key)
265                To_remove.append (key) 
266        for item in To_remove:
267             del self.packages[item]
268
269    def simulate_execution(self):
270       ok = True
271       indice = 1                     
272       while ok:
273           runnable  = list()
274           for key in self.packages:
275               if  self.packages[key]['status']!='done':
276                   if len(self.packages[key]['uses']) == 0:
277                       runnable.append(key)                                             
278           if len(runnable):
279               print '\n#--------------------------------------------------------------'
280               print "# Execute parallel actions within packages " + str(runnable) 
281           for selected in runnable:       
282               print '#--------------------------------------------------------------'
283               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
284               print '#--------------------------------------------------------------'
285               self.packages[selected]['status']='done'       
286               indice = indice + 1
287               for key in self.packages:
288                   if selected in self.packages[key]['uses']:
289                       self.packages[key]['uses'].remove(selected)                               
290                       #print 'remove', selected, 'from',key
291           if len(runnable)==0:
292                           ok = False       
293               
294    def check_execution(self, package, path=list(), cycles=list()):
295        #print package,'-->',self.packages[package]['uses']
296        #print path
297        if package in path:
298            if path[path.index(package):] not in cycles:
299                print 'Cycles:',path[path.index(package):], package
300                cycles = cycles + path[path.index(package):]
301                sys.exit(-1)
302        path.append(package)
303        for item in self.packages[package]['uses']:
304              self.check_execution(package=item, path=path, cycles=cycles)
305              path.pop()       
306
307    def get_current_package(self):   
308        cmd = 'cmt show macro package'
309        status, output = commands.getstatusoutput (cmd)
310        if status != 0:
311            print output
312            sys.exit(-1)   
313        lines = string.split(output, '\n')
314        for line in lines:
315            if line [0] != '#':
316                start = string.find(line,"'")
317                end   = string.find(line[start+1:len(line)],"'")
318                return line [start+1:start+end+1]
319
320    def get_work_area_path (self, name):       
321        return self.packages [name]['path']
322       
323    def get_package_path (self, name):   
324        #return os.getcwd ()
325        cmd = 'cmt -use='+name+' run pwd'
326        status, output = commands.getstatusoutput (cmd)
327        if status != 0:
328            print output
329            sys.exit(-1)   
330        lines = string.split(output, '\n')
331        for line in lines:
332            if line [0] != '#' and line[:5] != "#CMT>":
333                print line
334                return line
335 
336    def print_dependencies(self):
337        print '# ------------------------' 
338        print '# package --> dependencies' 
339        print '# ------------------------' 
340        for key in self.packages.keys():
341            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
342
343    def print_status(self, status):
344        print '# ------------------------' 
345        print '# package --> dependencies' 
346        print '# ------------------------' 
347        i = 1
348        for key in self.packages.keys():
349            if self.packages[key] ['status'] == status:
350                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
351                i = i + 1
352           
353    def is_work_unit_waiting (self, name):
354        return self.packages[name] ['status'] == 'waiting'
355
356    def set_work_unit_status (self, name, status):
357        self.packages[name] ['status'] = status
358
359    def get_dependencies (self, name):
360        return self.packages[name] ['uses']
361   
362    def get_next_work_units (self):
363        result = list ()
364        for key in self.packages.keys():
365            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
366                result.append(key)
367        return result
368
369    def is_work_units (self):
370        result = list ()
371        for key in self.packages.keys():
372            if self.is_work_unit_waiting(key) :
373                return True
374        return False       
375
376    def suppress_work_unit (self, name):
377        #print '# remove', name, 'from schedule'
378        for key in self.packages.keys():
379            if name in self.packages[key]['uses']:
380                self.packages[key]['uses'].remove(name)
381
382    def add_work_unit (self, name, cmd):
383        if self.is_work_unit_waiting (name):
384            # we create requests
385            arg = {'cmd': cmd , 'package':name}
386            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
387            # then we put the work request in the queue...
388            self.set_work_unit_status (name, 'queued')
389            self.pool.putRequest(req)
390            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
391
392    def execute (self, command):
393        #self.print_dependencies ()
394        packages = self.get_next_work_units()
395        if len(packages) !=0:
396            print '\n#--------------------------------------------------------------'   
397            print '# Execute parallel actions within packages', packages                     
398            for package in packages:
399                self.add_work_unit (package, command)
400
401    def execute_all(self,command):
402        #self.print_dependencies ()
403        self.execute (command)
404        self.wait()
405        #self.print_dependencies ()
406        #self.print_status (status='waiting')       
407        #while self.is_work_units():
408        #self.wait()           
409       
410    def wait (self):
411       self.pool.wait()   
412
413    # this will be called each time a result is available
414    def result_callback(self, request, result):
415      #print "**Result: %s from request #%s" % (str(result), request.requestID)
416      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
417      #if result['package'] == 'CodeCheck':
418      #    sys.exit(-1)
419      self.execute (result['cmd'])   
420
421    # the work the threads will have to do
422    def do_execute(self, arg):
423      path = self.get_work_area_path (arg['package'])
424      if path == None or not os.path.exists(path):
425          raise RuntimeError('Path to package '+ arg['package'] +' not found')
426      self.set_work_unit_status (arg['package'], 'running')     
427      #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"     
428      #os.chdir(path)
429      #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
430      header = '#--------------------------------------------------------------\n'
431      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
432      header = header + '#--------------------------------------------------------------\n'
433      print header
434      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
435      log_name   =  string.replace(path, project_path, '')
436      log_name   = string.replace(log_name, '/cmt', '')
437      log_name   = string.replace(log_name, '/', '_')
438      log_name   = log_name+'.loglog'
439      arg['log'] = log_name
440      cmd = "cd "+ path +";"+ arg['cmd'] 
441      #status, output= commands.getstatusoutput(cmd)
442      # init output file
443      if self.output is not None:
444           f = open (self.output+'/'+ log_name, 'w+')
445           f.write (header)
446           f.close()     
447           if self.error is not None:
448               f = open (self.error+'/error'+log_name, 'w+')
449               f.close()
450      self.packages[arg['package']] ['startTime'] = time.time ()                           
451      status, output, error, pythonError  = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600)
452      self.packages[arg['package']] ['endTime'] = time.time ()
453      if self.perf:
454          self.semaphore.acquire ()       
455          f = open (self.perf, 'a')
456          f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n') 
457          f.close()
458          self.semaphore.release()
459      self.suppress_work_unit (arg['package'])
460      self.set_work_unit_status (arg['package'], 'done')
461      # status, output= commands.getstatusoutput(cmd)
462      #if status != 0:
463      #   raise RuntimeError(output)
464      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
465
466    def redirectOutput(self, index, buffer, arg):
467        """Filter function to redirect the std output and error of the job
468           executable for real-time debugging
469        """
470        if self.output is not None:
471           if index==0:   
472               f = open (self.output+'/'+arg['log'], 'a')
473               f.write (buffer+'\n')
474               f.close()
475           elif index==1: 
476               if self.error is not None:
477                   f = open (self.error+'/error'+arg['log'], 'a')
478               else:
479                   f = open (self.output+'/'+arg['log'], 'a')                   
480               f.write (buffer+'\n')                   
481               f.close()                               
482        if not self.silent: 
483            print buffer
484             
485    # this will be called when an exception occurs within a thread
486    def handle_exception(self, request, exc_info):
487        #traceback.print_stack()
488      print '#--------------------------------------------------------------'       
489      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
490      print "# Exception occured: %s" %(exc_info[1])
491      print exc_info
492      print '#--------------------------------------------------------------'   
493      #sys.exit(-1)
494
495   
496    def generate_make (self, file, command):
497        makefile = open (file, 'w+')
498        makefile.write ('MAKE=make\n')
499        #MFLAGS= -j10
500        self.counter = len(self.packages)
501        self.recursive_make (self.current_package, command, makefile, len(self.packages))
502        makefile.close ()
503       
504    def recursive_make (self, package, command, makefile, indice,actions=list()):
505        lines = self.generate_action_make (package, command, indice)
506        makefile.write (lines)
507        #print lines               
508        for pkg in self.packages[package] ['uses']:
509            if pkg not in actions:
510                actions.append(pkg)
511                indice = indice - 1
512                self.counter = self.counter - 1
513                self.recursive_make(pkg, command,makefile, indice, actions)       
514       
515    def generate_action_make (self, package, command, indice):
516        lines = package + ' :: '       
517        # add dependencies
518        for pkg in self.packages[package] ['uses']:
519            lines = lines + ' ' + pkg           
520
521        # add the action itself
522        newcommand = string.replace (command, '<package>', package)       
523        if command =='':
524            newcommand='$(MAKE)'
525        lines = lines + '\n'
526        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
527        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
528        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
529        lines = lines +  'ifdef LOCATION\n'
530        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'       
531        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
532        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
533        lines = lines + '\t+@cd ' + self.packages[package]['path']           
534        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
535        lines = lines + 'else\n'
536        lines = lines + '\t+@cd ' + self.packages[package]['path']           
537        lines = lines + ' && ' + newcommand + '\n'
538        lines = lines + 'endif\n\n'               
539        return lines
540       
541#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.