source: tbroadcast/HEAD/python/tbroadcast.py @ 478

Last change on this file since 478 was 478, checked in by rybkin, 16 years ago

See C.L. 2

  • Property svn:executable set to *
File size: 25.5 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17import exceptions
18from threading import BoundedSemaphore
19
20from threadpool import WorkRequest
21from threadpool import ThreadPool
22from threadpool import NoResultsPending
23from threadpool import NoWorkersAvailable
24from threadpool import  makeRequests
25from executer   import  exeCommand
26from executer   import  getstatusoutput
27
28class Scheduler:
29
30    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False, keep_going=True):
31        self.pool            = ThreadPool(num_workers=num_workers)
32        self.current_package = self.get_current_package()
33        self.current_project = {'name': None, 'path': None, 'version': None}
34        self.packages        = {}
35        self.counter         = 0
36        self.semaphore       = BoundedSemaphore(1)
37        self.local           = local
38        self.ignore_cycles   = ignore_cycles
39        self.output          = output
40        self.error           = error
41        self.silent          = silent
42        self.perf            = perf
43        self.keep_going      = keep_going
44        if self.perf is not False:
45            f = open (self.perf, 'w+')
46            f.close()
47        if output is not None:
48            if not os.path.exists (output):
49                print "path",output,"no exits"       
50                sys.exit(-1)
51            if not os.path.isdir(output):
52                print "path",output,"no a valid directory"       
53                sys.exit(-1)           
54               
55        self.get_current_project()
56        self.instanciate_packages (file)
57        if self.local: self.get_local_graph()
58        self.check_cycles()
59       
60#        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
61#        lignes = string.split(output, '\n')
62#        i = 1
63#        for package in lignes:
64#            if package!='' and package[0] != '#':                           
65#                print i , package
66#                i =i +1
67#                if not self.packages.has_key(package):
68#                                    print package       
69#        print len(self.packages)
70#        sys.exit(-1)
71
72    def get_current_project(self):
73        cmd = 'cmt show projects | grep current'
74        status, output = getstatusoutput (cmd)
75#        status, output = commands.getstatusoutput (cmd)
76        if status != 0:
77            print output
78            sys.exit(-1)   
79        lines = string.split(output, '\n')
80        for line in lines:           
81            if line!='' and line [0] != '#':                           
82                item  = string.split (line, ' ')
83                self.current_project ['name']    = item[0]
84                self.current_project ['version'] = item[1]
85                self.current_project ['path']    = item[3][:-1]
86                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
87                if  self.current_project ['version'] == version:
88                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
89                return
90                #print self.current_project     
91       
92    def get_counter(self):
93        self.semaphore.acquire ()       
94        self.counter = self.counter + 1
95        value = self.counter
96        self.semaphore.release()
97        return value
98       
99    def check_cycles (self):
100        cmd = 'cmt -private show cycles'
101        cycle_found = False
102        status, output = getstatusoutput (cmd)
103#        status, output = commands.getstatusoutput (cmd)
104        if status != 0:
105            print output
106            sys.exit(-1)   
107        lines = string.split(output, '\n')
108        cycles = list ()
109        for line in lines:           
110            if line!='' and line [0] != '#':                   
111               cycles.append (string.split(line)) 
112        cercles =list()       
113        for cycle in cycles:
114            cycleInProject = True
115            for package in cycle:           
116                if not self.packages.has_key(package):
117                    cycleInProject = False       
118            if cycleInProject: 
119              cercles.append(cycle)
120        if len(cercles):
121            if not self.ignore_cycles:
122                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
123                for cycle in cercles:
124                    loop = ""                   
125                    for package in cycle:
126                        loop = loop + package + ' -> '
127                    print loop + '...'
128                sys.exit(-1)       
129            else:
130                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
131                for cycle in cercles:
132                    loop = ""                   
133                    for package in cycle:
134                        loop = loop + package + ' -> '                     
135                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
136                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
137                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
138#                sys.exit(-1)
139
140    def format_uses (self, content):
141        # format variables
142        lignes  = string.split(content, '\n')
143        lines   = list()
144        for ligne in lignes:
145           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
146               lines.append(ligne)
147        lines.reverse()
148        return lines
149
150    def format_paths (self, content):
151        # format variables
152        lignes  = string.split(content, '\n')
153        lines   = list()
154        for ligne in lignes:
155            if ligne[:4] == "use ":             
156               lines.append(ligne)
157        return lines
158
159    def get_paths (self, content):
160        lines = self.format_paths(content)
161        for line in lines:
162                result  = string.split (line[4:len(line)], ' ')
163                if  self.packages.has_key(result[0]):
164                    if len(result)==4:
165                        name, version, offset, path = string.split (line[4:len(line)], " ")
166                        #print name, version, offset, path
167                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
168                        if path == '(no_auto_imports)':
169                            path   = offset
170                            offset = ''
171                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
172                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
173                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
174                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
175                        else:
176                            print '# error path not found for', name
177                            sys.exit(-1)   
178                    elif len(result)==5:
179                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
180                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
181                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
182                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
183                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
184                        else:
185                            print '# error path not found for', name
186                            sys.exit(-1)                                                                                                   
187                    elif len(result)==3:
188                        name, version, path = string.split (line[4:len(line)], " ")
189                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
190                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
191                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
192                            full_path = path[1:-1] + '/' +name + + '/cmt'
193                        else:
194                            print '# error path not found for', name
195                            sys.exit(-1)
196                    else:
197                        print "error:",line
198                        print str(result)
199                        sys.exit(-1) 
200                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
201                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
202                    if os.path.normpath(commonprefix) == self.current_project ['path']:                   
203                        #print result[0], ' belong to project', self.current_project ['name']
204                        self.packages[result[0]]['current_project'] = True
205
206    def get_uses(self, content):       
207        # initiates variables
208        lignes = self.format_uses(content)
209        if not len(lignes): return
210        self.packages [self.current_package] = {'version': '*', 'client': list(),
211                                                'uses': list(), 'status': 'waiting', 
212                                                'current_project': True, 'path': os.getcwd()}
213        previous_client = self.current_package
214        previous_level  = 0
215        level_stack    = [{'name':previous_client,'level':previous_level},]
216        ligne = lignes.pop()       
217        while len(lignes)!=0:   
218            current_level = string.find(ligne, 'use')
219            while current_level > previous_level:               
220                name    = string.split (ligne)[2]
221                version = string.split (ligne)[3]                             
222                if not self.packages.has_key (name):
223                  self.packages [name] = {'version': version, 'uses': list(), 
224                                          'client': list(), 'status': 'waiting', 
225                                          'current_project': False, 'path': None}
226                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
227                   self.packages[previous_client]['uses'].append (name)               
228                level_stack.append({'name':previous_client,'level':previous_level})
229                previous_client = name 
230                previous_level = current_level                 
231                if len(lignes):
232                    ligne = lignes.pop()
233                    #print ligne
234                    current_level = string.find(ligne, 'use')
235                                           
236            #self.packages [previous_client]['status'] ='queued'
237            # restore the level
238            if len(lignes):
239                if len(level_stack):                       
240                    item = level_stack.pop()               
241                    while item['level'] >= current_level and len(level_stack):
242                             item = level_stack.pop()
243                    previous_client = item['name']
244                    previous_level  = item['level']
245            #print previous_client, '-->',string.split (ligne)[2]
246
247    def instanciate_packages(self, file=None):
248        # We create the schedule of the work units
249        print '# First, we initialize the DAG by parsing "cmt show uses"'
250        if file is None:
251            cmd  = 'cmt show uses'
252        else:   
253            cmd = 'cat ' + file       
254        status, output = getstatusoutput (cmd)
255#        status, output = commands.getstatusoutput (cmd)
256        if status != 0:
257            print output
258            sys.exit(-1)
259        self.get_uses(output)   
260        self.get_paths(output)
261        #self.check_execution (package=self.current_package)
262        #self.simulate_execution()
263
264    def get_local_graph(self):
265        To_remove = list()
266        for key in self.packages:
267            if self.packages[key]['current_project']== False:
268                for selected in self.packages:
269                    if key in self.packages[selected]['uses']:
270                       self.packages[selected]['uses'].remove(key)
271                To_remove.append (key) 
272        for item in To_remove:
273             del self.packages[item]
274
275    def simulate_execution(self):
276       ok = True
277       indice = 1                     
278       while ok:
279           runnable  = list()
280           for key in self.packages:
281               if  self.packages[key]['status']!='done':
282                   if len(self.packages[key]['uses']) == 0:
283                       runnable.append(key)                                             
284           if len(runnable):
285               print '\n#--------------------------------------------------------------'
286               print "# Execute parallel actions within packages " + str(runnable) 
287           for selected in runnable:       
288               print '#--------------------------------------------------------------'
289               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
290               print '#--------------------------------------------------------------'
291               self.packages[selected]['status']='done'       
292               indice = indice + 1
293               for key in self.packages:
294                   if selected in self.packages[key]['uses']:
295                       self.packages[key]['uses'].remove(selected)                               
296                       #print 'remove', selected, 'from',key
297           if len(runnable)==0:
298                           ok = False       
299               
300    def check_execution(self, package, path=list(), cycles=list()):
301        #print package,'-->',self.packages[package]['uses']
302        #print path
303        if package in path:
304            if path[path.index(package):] not in cycles:
305                print 'Cycles:',path[path.index(package):], package
306                cycles = cycles + path[path.index(package):]
307                sys.exit(-1)
308        path.append(package)
309        for item in self.packages[package]['uses']:
310              self.check_execution(package=item, path=path, cycles=cycles)
311              path.pop()       
312
313    def get_current_package(self):   
314        cmd = 'cmt show macro package'
315        status, output = getstatusoutput (cmd)
316#        status, output = commands.getstatusoutput (cmd)
317        if status != 0:
318            print output
319            sys.exit(-1)   
320        lines = string.split(output, '\n')
321        for line in lines:
322            if line [0] != '#':
323                start = string.find(line,"'")
324                end   = string.find(line[start+1:len(line)],"'")
325                return line [start+1:start+end+1]
326
327    def get_work_area_path (self, name):       
328        return self.packages [name]['path']
329       
330    def get_package_path (self, name):   
331        #return os.getcwd ()
332        cmd = 'cmt -use='+name+' run pwd'
333        status, output = getstatusoutput (cmd)
334#        status, output = commands.getstatusoutput (cmd)
335        if status != 0:
336            print output
337            sys.exit(-1)   
338        lines = string.split(output, '\n')
339        for line in lines:
340            if line [0] != '#' and line[:5] != "#CMT>":
341                print line
342                return line
343 
344    def print_dependencies(self):
345        print '# ------------------------' 
346        print '# package --> dependencies' 
347        print '# ------------------------' 
348        for key in self.packages.keys():
349            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
350
351    def print_status(self, status):
352        print '# ------------------------' 
353        print '# package --> dependencies' 
354        print '# ------------------------' 
355        i = 1
356        for key in self.packages.keys():
357            if self.packages[key] ['status'] == status:
358                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
359                i = i + 1
360           
361    def is_work_unit_waiting (self, name):
362        return self.packages[name] ['status'] == 'waiting'
363
364    def set_work_unit_status (self, name, status):
365        self.packages[name] ['status'] = status
366
367    def get_dependencies (self, name):
368        return self.packages[name] ['uses']
369   
370    def get_next_work_units (self):
371        result = list ()
372        for key in self.packages.keys():
373            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
374                result.append(key)
375        return result
376
377    def is_work_units (self):
378        result = list ()
379        for key in self.packages.keys():
380            if self.is_work_unit_waiting(key) :
381                return True
382        return False       
383
384    def suppress_work_unit (self, name):
385        #print '# remove', name, 'from schedule'
386        for key in self.packages.keys():
387            if name in self.packages[key]['uses']:
388                self.packages[key]['uses'].remove(name)
389
390    def add_work_unit (self, name, cmd):
391        if self.is_work_unit_waiting (name):
392            # we create requests
393            arg = {'cmd': cmd , 'package':name}
394            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
395            # then we put the work request in the queue...
396            self.set_work_unit_status (name, 'queued')
397            self.pool.putRequest(req)
398            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
399
400    def execute (self, command):
401        #self.print_dependencies ()
402        packages = self.get_next_work_units()
403        if len(packages) !=0:
404            print '\n#--------------------------------------------------------------'   
405            print '# Execute parallel actions within packages', packages                     
406            for package in packages:
407                self.add_work_unit (package, command)
408
409    def execute_all(self,command):
410        #self.print_dependencies ()
411        self.execute (command)
412        self.wait()
413        #self.print_dependencies ()
414        #self.print_status (status='waiting')       
415        #while self.is_work_units():
416        #self.wait()           
417       
418    def wait (self):
419       self.pool.wait()   
420
421    # this will be called each time a result is available
422    def result_callback(self, request, result):
423      #print "**Result: %s from request #%s" % (str(result), request.requestID)
424      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
425      #if result['package'] == 'CodeCheck':
426      #    sys.exit(-1)
427      self.execute (result['cmd'])   
428
429    # the work the threads will have to do
430    def do_execute(self, arg):
431      path = self.get_work_area_path (arg['package'])
432      if path == None or not os.path.exists(path):
433          raise RuntimeError('Path to package '+ arg['package'] +' not found')
434      self.set_work_unit_status (arg['package'], 'running')     
435      #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"     
436      #os.chdir(path)
437      #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
438      header = '#--------------------------------------------------------------\n'
439      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
440      header = header + '#--------------------------------------------------------------\n'
441      print header
442      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
443      log_name   =  string.replace(path, project_path, '')
444      log_name   = string.replace(log_name, '/cmt', '')
445      log_name   = string.replace(log_name, '/', '_')
446      log_name   = log_name+'.loglog'
447      arg['log'] = log_name
448      cmd = "cd "+ path +";"+ arg['cmd'] 
449      #status, output= commands.getstatusoutput(cmd)
450      # init output file
451      if self.output is not None:
452           f = open (self.output+'/'+ log_name, 'w+')
453           f.write (header)
454           f.close()     
455           if self.error is not None:
456               f = open (self.error+'/error'+log_name, 'w+')
457               f.close()
458      self.packages[arg['package']] ['startTime'] = time.time ()                           
459      status, output, error, pythonError  = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600)
460      if not self.keep_going and status > 0:
461        sys.exit(status)   
462                     
463      self.packages[arg['package']] ['endTime'] = time.time ()
464      if self.perf:
465          self.semaphore.acquire ()       
466          f = open (self.perf, 'a')
467          f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n') 
468          f.close()
469          self.semaphore.release()
470      self.suppress_work_unit (arg['package'])
471      self.set_work_unit_status (arg['package'], 'done')
472      # status, output= commands.getstatusoutput(cmd)
473      #if status != 0:
474      #   raise RuntimeError(output)
475      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
476
477    def redirectOutput(self, index, buffer, arg):
478        """Filter function to redirect the std output and error of the job
479           executable for real-time debugging
480        """
481        if self.output is not None:
482           if index==0:   
483               f = open (self.output+'/'+arg['log'], 'a')
484               f.write (buffer+'\n')
485               f.close()
486           elif index==1: 
487               if self.error is not None:
488                   f = open (self.error+'/error'+arg['log'], 'a')
489               else:
490                   f = open (self.output+'/'+arg['log'], 'a')                   
491               f.write (buffer+'\n')                   
492               f.close()                               
493        if not self.silent: 
494            print buffer
495             
496    # this will be called when an exception occurs within a thread
497    def handle_exception(self, request, exc_info):
498        #traceback.print_stack()
499      print '#--------------------------------------------------------------'       
500      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
501      if exc_info[0]== exceptions.SystemExit:
502        print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1]) 
503        print '#--------------------------------------------------------------'   
504        sys.exit(exc_info[1])
505      print "# Exception occured: %s" %(exc_info[1])     
506      print exc_info
507      print '#--------------------------------------------------------------'   
508      #sys.exit(-1)
509
510   
511    def generate_make (self, file, command):
512        makefile = open (file, 'w+')
513        makefile.write ('MAKE=make\n')
514        #MFLAGS= -j10
515        self.counter = len(self.packages)
516        self.recursive_make (self.current_package, command, makefile, len(self.packages))
517        makefile.close ()
518       
519    def recursive_make (self, package, command, makefile, indice,actions=list()):
520        lines = self.generate_action_make (package, command, indice)
521        makefile.write (lines)
522        #print lines               
523        for pkg in self.packages[package] ['uses']:
524            if pkg not in actions:
525                actions.append(pkg)
526                indice = indice - 1
527                self.counter = self.counter - 1
528                self.recursive_make(pkg, command,makefile, indice, actions)       
529       
530    def generate_action_make (self, package, command, indice):
531        lines = package + ' :: '       
532        # add dependencies
533        for pkg in self.packages[package] ['uses']:
534            lines = lines + ' ' + pkg           
535
536        # add the action itself
537        newcommand = string.replace (command, '<package>', package)       
538        if command =='':
539            newcommand='$(MAKE)'
540        lines = lines + '\n'
541        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
542        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
543        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
544        lines = lines +  'ifdef LOCATION\n'
545        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'       
546        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
547        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
548        lines = lines + '\t+@cd ' + self.packages[package]['path']           
549        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
550        lines = lines + 'else\n'
551        lines = lines + '\t+@cd ' + self.packages[package]['path']           
552        lines = lines + ' && ' + newcommand + '\n'
553        lines = lines + 'endif\n\n'               
554        return lines
555       
556#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.