source: tbroadcast/HEAD/python/tbroadcast.py @ 517

Last change on this file since 517 was 517, checked in by rybkin, 15 years ago

Version v2.0.6_rc4 from Igor Kachaev

  • Property svn:executable set to *
File size: 23.4 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9# 21-Jul-2009 compile most used packages first; protect critical sections
10
11import os
12import sys
13import time
14import string
15import os.path
16# import commands
17import traceback
18import exceptions
19from threading import BoundedSemaphore
20
21from threadpool  import WorkRequest
22from threadpool  import ThreadPool
23
24from  subprocess import Popen
25
26class Scheduler:
27
28    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, sort=False,
29                 output=None, error=None, silent = False, perf=False, keep_going=True):
30        self.pool            = ThreadPool(num_workers=num_workers, poll_timeout=3)
31        self.num_workers     = num_workers
32        self.current_project = {'name': None, 'path': None, 'version': None}
33        self.packages        = {}
34        self.counter         = 0
35        self.semaphore       = BoundedSemaphore(1)
36        self.local           = local
37        self.sort            = sort
38        self.ignore_cycles   = ignore_cycles
39        self.output          = output
40        self.error           = error
41        self.silent          = silent
42        self.perf            = perf
43        self.keep_going      = keep_going
44        if self.sort:
45            print "Compile packages sorted according to use count"
46        if self.perf is not False:
47            f = open (self.perf, 'w+')
48            f.close()
49        if output is not None:
50            if not os.path.exists (output):
51                print "path",output,"does not exists"
52                sys.exit(-1)
53            if not os.path.isdir(output):
54                print "path",output,"is not a valid directory"
55                sys.exit(-1)
56
57        # init cmt stuff
58        self.get_current_project()
59        self.current_package = self.get_current_package()
60        self.instanciate_packages (file)
61        if self.local: self.get_local_graph()
62        self.check_cycles()
63        self.get_use_count()
64
65    def get_current_project(self):
66        cmd = 'cmt show projects | grep current'
67        status, output = getstatusoutput (cmd)
68        #status, output = commands.getstatusoutput (cmd)
69        if status != 0:
70            print output
71            sys.exit(-1)
72        lines = string.split(output, '\n')
73        for line in lines:
74            if line!='' and line [0] != '#':
75                item  = string.split (line, ' ')
76                self.current_project ['name']    = item[0]
77                self.current_project ['version'] = item[1]
78                self.current_project ['path']    = item[3][:-1]
79                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
80                if  self.current_project ['version'] == version:
81                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
82                return
83
84    def get_counter(self):
85        self.semaphore.acquire ()
86        self.counter = self.counter + 1
87        value = self.counter
88        self.semaphore.release()
89        return value
90
91    def check_cycles (self):
92        cmd = 'cmt -private show cycles'
93        cycle_found = False
94        status, output = getstatusoutput (cmd)
95        #status, output = commands.getstatusoutput (cmd)
96        if status != 0:
97            print output
98            sys.exit(-1)
99        lines = string.split(output, '\n')
100        cycles = list()
101        for line in lines:
102            if line!='' and line [0] != '#':
103               cycles.append (string.split(line))
104        cercles =list()
105        for cycle in cycles:
106            cycleInProject = True
107            for package in cycle:
108                if not self.packages.has_key(package):
109                    cycleInProject = False
110            if cycleInProject:
111              cercles.append(cycle)
112        if len(cercles):
113            if not self.ignore_cycles:
114                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
115                for cycle in cercles:
116                    loop = ""
117                    for package in cycle:
118                        loop = loop + package + ' -> '
119                    print loop + '...'
120                sys.exit(-1)
121            else:
122                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
123                for cycle in cercles:
124                    loop = ""
125                    for package in cycle:
126                        loop = loop + package + ' -> '
127                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
128                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
129                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
130#                sys.exit(-1)
131
132    def format_uses (self, content):
133        # format variables
134        lignes  = string.split(content, '\n')
135        lines   = list()
136        for ligne in lignes:
137           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
138               lines.append(ligne)
139        lines.reverse()
140        return lines
141
142    def format_paths (self, content):
143        # format variables
144        lignes  = string.split(content, '\n')
145        lines   = list()
146        for ligne in lignes:
147            if ligne[:4] == "use ":
148               lines.append(ligne)
149        return lines
150
151    def get_paths (self, content):
152        lines = self.format_paths(content)
153        for line in lines:
154                result  = string.split (line[4:len(line)], ' ')
155                if  self.packages.has_key(result[0]):
156                    if len(result)==4:
157                        name, version, offset, path = string.split (line[4:len(line)], " ")
158                        #print name, version, offset, path
159                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
160                        if path == '(no_auto_imports)':
161                            path   = offset
162                            offset = ''
163                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
164                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
165                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
166                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
167                        else:
168                            print '# error path not found for', name
169                            sys.exit(-1)
170                    elif len(result)==5:
171                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")
172                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
173                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
174                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
175                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
176                        else:
177                            print '# error path not found for', name
178                            sys.exit(-1)
179                    elif len(result)==3:
180                        name, version, path = string.split (line[4:len(line)], " ")
181                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
182                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
183                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):
184                            full_path = path[1:-1] + '/' +name + + '/cmt'
185                        else:
186                            print '# error path not found for', name
187                            sys.exit(-1)
188                    else:
189                        print "error:",line
190                        print str(result)
191                        sys.exit(-1)
192                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
193                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
194                    if os.path.normpath(commonprefix) == self.current_project ['path']:
195                        #print result[0], ' belong to project', self.current_project ['name']
196                        self.packages[result[0]]['current_project'] = True
197
198    def get_uses(self, content):
199        # initiates variables
200        lignes = self.format_uses(content)
201        if not len(lignes): return
202        self.packages [self.current_package] = {'version': '*', 'use_count': 0,
203                                                'uses': list(), 'status': 'waiting',
204                                                'current_project': True, 'path': os.getcwd()}
205        previous_client = self.current_package
206        previous_level  = 0
207        level_stack    = [{'name':previous_client,'level':previous_level},]
208        ligne = lignes.pop()
209        while len(lignes)!=0:
210            current_level = string.find(ligne, 'use')
211            while current_level > previous_level:
212                name    = string.split (ligne)[2]
213                version = string.split (ligne)[3]
214                if not self.packages.has_key (name):
215                  self.packages [name] = {'version': version, 'use_count': 0,
216                                          'uses': list(), 'status': 'waiting',
217                                          'current_project': False, 'path': None}
218                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
219                   self.packages[previous_client]['uses'].append (name)
220                level_stack.append({'name':previous_client,'level':previous_level})
221                previous_client = name
222                previous_level = current_level
223                if len(lignes):
224                    ligne = lignes.pop()
225                    #print ligne
226                    current_level = string.find(ligne, 'use')
227
228            # restore the level
229            if len(lignes):
230                if len(level_stack):
231                    item = level_stack.pop()
232                    while item['level'] >= current_level and len(level_stack):
233                             item = level_stack.pop()
234                    previous_client = item['name']
235                    previous_level  = item['level']
236            #print previous_client, '-->',string.split (ligne)[2]
237
238    def instanciate_packages(self, file=None):
239        # We create the schedule of the work units
240        print '# First, we initialize the DAG by parsing "cmt show uses"'
241        if file is None:
242            cmd  = 'cmt show uses'
243        else:
244            cmd = 'cat ' + file
245        status, output = getstatusoutput (cmd)
246        #status, output = commands.getstatusoutput (cmd)
247        if status != 0:
248            print output
249            sys.exit(-1)
250        self.get_uses(output)
251        self.get_paths(output)
252        #self.simulate_execution()
253
254    def get_use_count(self):
255        for key in self.packages:
256            count = 0
257            for parent in self.packages:
258                if key in self.packages[parent]['uses']: count += 1
259            self.packages[key]['use_count'] = count
260            # print "Package",key,"use_count",count
261
262    def get_local_graph(self):
263        To_remove = list()
264        for key in self.packages:
265            if self.packages[key]['current_project']== False:
266                for selected in self.packages:
267                    if key in self.packages[selected]['uses']:
268                       self.packages[selected]['uses'].remove(key)
269                To_remove.append (key)
270        for item in To_remove:
271            del self.packages[item]
272
273    def simulate_execution(self):
274        while True:
275            ndone = self.simulate_requests()
276            if ndone == 0: break
277
278    def simulate_requests(self):
279        runnable = self.get_next_work_units()
280        if len(runnable):
281            print '\n#--------------------------------------------------------------'
282            print "# Execute parallel actions within packages - total", len(runnable)
283            print '#--------------------------------------------------------------'
284        for selected in runnable:
285            use_count = self.packages[selected]['use_count']
286            path = self.packages[selected]['path']
287            print '#--------------------------------------------------------------'
288            print '# (%d/%d %d) Now trying [] in %s' % (self.get_counter(), len(self.packages), use_count, path)
289            print '#--------------------------------------------------------------'
290            self.suppress_work_unit(selected)
291        return len(runnable)
292
293    def get_current_package(self):
294        cmd            = 'cmt show macro package'
295        status, output = getstatusoutput (cmd)
296        #status, output = commands.getstatusoutput (cmd)
297        if status != 0:
298            print output
299            sys.exit(-1)
300        lines = string.split(output, '\n')
301        for line in lines:
302            if line [0] != '#':
303                start = string.find(line,"'")
304                end   = string.find(line[start+1:len(line)],"'")
305                return line [start+1:start+end+1]
306
307    def print_dependencies(self):
308        print '# -------------------------------------------'
309        print '# package --> dependencies, status, use_count'
310        print '# -------------------------------------------'
311        for key in self.packages.keys():
312            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status'],',', self.packages[key] ['use_count']
313
314    def print_status(self, status):
315        print '# ------------------------'
316        print '# package --> dependencies'
317        print '# ------------------------'
318        i = 1
319        for key in self.packages.keys():
320            if self.packages[key] ['status'] == status:
321                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']
322                i = i + 1
323
324    def is_work_unit_waiting (self, name):
325        return self.packages[name] ['status'] == 'waiting'
326
327    def set_work_unit_status (self, name, status):
328        self.packages[name] ['status'] = status
329
330    def get_next_work_units (self):
331        # by default returned list is in arbitrary order - may be this is better
332        # if self.sort is set returned list is sorted - most used packages first
333        runnable = list()
334        for key in self.packages:
335            if len(self.packages[key]['uses']) == 0 and self.packages[key]['status'] == 'waiting':
336                use_count = self.packages[key]['use_count']
337                runnable.append((use_count,key))
338        if self.sort:
339            runnable.sort()
340            runnable.reverse()
341        result = [ pair[1] for pair in runnable ]
342        return result
343
344    def suppress_work_unit (self, name):
345        #print '# remove', name, 'from schedule'
346        self.semaphore.acquire()
347        self.packages[name]['status']='done'
348        for key in self.packages.keys():
349            if name in self.packages[key]['uses']:
350                self.packages[key]['uses'].remove(name)
351        self.semaphore.release()
352
353    def add_work_unit (self, name, cmd):
354        if self.is_work_unit_waiting (name):
355            # we create requests
356            arg = {'cmd': cmd , 'package': name}
357            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback)
358#           req = WorkRequest(self.do_execute, [arg] , None,
359#                 callback=self.result_callback, exc_callback=self.handle_exception)
360            # then we put the work request in the queue...
361            self.set_work_unit_status (name, 'queued')
362            self.pool.putRequest(req)
363            # print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
364
365    def execute (self, command):
366        #self.print_dependencies ()
367        self.semaphore.acquire()
368        packages = self.get_next_work_units()
369        if len(packages):
370            print '\n#--------------------------------------------------------------'
371            print '# Execute parallel actions within packages (total',len(packages),')',packages
372            print '\n#--------------------------------------------------------------'
373            for package in packages:
374                self.add_work_unit (package, command)
375            sys.stdout.flush()
376        self.semaphore.release()
377
378    def execute_all(self,command):
379        #self.print_dependencies ()
380        self.execute (command)
381        self.wait()
382        if self.counter != len(self.packages):
383            print 'tbroadcast: warning: compiled',self.counter,'out of',len(self.packages),'packages'
384        self.pool.dismissWorkers(self.num_workers, do_join=True)
385        #self.print_dependencies ()
386        #self.print_status (status='waiting')
387
388    def wait (self):
389       self.pool.wait()
390
391    # this will be called each time a result is available
392    def result_callback(self, request, result):
393      #print "**Result: %s from request #%s" % (str(result), request.requestID)
394      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
395      self.execute (result['cmd'])
396
397    # the work the threads will have to do
398    def do_execute(self, arg):
399      package = arg['package']
400      path = self.packages[package]['path']
401      if path == None or not os.path.exists(path):
402          raise RuntimeError('Path to package '+ package +' not found')
403      self.set_work_unit_status(package, 'running')
404      header =          '#--------------------------------------------------------------\n'
405      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+arg['cmd']+'] in '+path+'\n'
406      header = header + '#--------------------------------------------------------------\n'
407      print header
408      sys.stdout.flush()
409      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
410      log_name     = string.replace(path, project_path, '')
411      log_name     = string.replace(log_name, '/cmt', '')
412      log_name     = string.replace(log_name, '/', '_')
413      log_name     = log_name+'.loglog'
414      # arg['log']   = log_name
415      cmd = "cd "+ path +";"+ arg['cmd']
416      # status, output = commands.getstatusoutput(cmd)
417      # init output file
418
419      self.packages[package] ['startTime'] = time.time()
420
421      if self.output is not None:
422           f1 = open (self.output+'/'+ log_name, 'w+')
423           f1.write (header)
424           f1.flush()
425           if self.error is not None:
426               f2 = open (self.error+'/error'+log_name, 'w+')
427               fp = Popen(cmd, shell=True, stdout=f1, stderr=f2)
428               fp.communicate()
429               status = fp.wait()
430               f2.close()
431           else:
432               fp = Popen(cmd, shell=True, stdout=f1, stderr=f1)
433               fp.communicate()
434               status = fp.wait()
435           f1.close()
436      else:
437           fp = Popen(cmd, shell=True)
438           fp.communicate()
439           status = fp.wait()
440      sys.stdout.flush()
441      sys.stderr.flush()
442
443      # Error is not handled - exit() is forbidden here
444      if not self.keep_going and status > 0:
445          print 'Error',status,'for package',package
446          # sys.exit(status)
447
448      self.packages[package] ['endTime'] = time.time()
449      if self.perf:
450          self.semaphore.acquire()
451          f = open (self.perf, 'a')
452          f.write (package+" "+str(self.packages[package]['startTime'])+" "+str(self.packages[package]['endTime'] )+'\n')
453          f.close()
454          self.semaphore.release()
455      self.suppress_work_unit(package)
456      return {'cmd': arg['cmd'], 'package':arg['package']}
457
458
459    # this will be called when an exception occurs within a thread
460    def handle_exception(self, request, exc_info):
461      #traceback.print_stack()
462      print '#--------------------------------------------------------------'
463      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
464      if exc_info[0]== exceptions.SystemExit:
465        print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1])
466        print '#--------------------------------------------------------------'
467        sys.exit(exc_info[1])
468      print "# Exception occured: %s" %(exc_info[1])
469      print exc_info
470      print '#--------------------------------------------------------------'
471      #sys.exit(-1)
472
473
474    def generate_make (self, file, command):
475        makefile = open (file, 'w+')
476        makefile.write ('MAKE=make\n')
477        #MFLAGS= -j10
478        self.counter = len(self.packages)
479        self.recursive_make (self.current_package, command, makefile, len(self.packages))
480        makefile.close ()
481
482    def recursive_make (self, package, command, makefile, indice,actions=list()):
483        lines = self.generate_action_make (package, command, indice)
484        makefile.write (lines)
485        #print lines
486        for pkg in self.packages[package] ['uses']:
487            if pkg not in actions:
488                actions.append(pkg)
489                indice = indice - 1
490                self.counter = self.counter - 1
491                self.recursive_make(pkg, command,makefile, indice, actions)
492
493    def generate_action_make (self, package, command, indice):
494        lines = package + ' :: '
495        # add dependencies
496        for pkg in self.packages[package] ['uses']:
497            lines = lines + ' ' + pkg
498
499        # add the action itself
500        newcommand = string.replace (command, '<package>', package)
501        if command =='':
502            newcommand='$(MAKE)'
503        lines = lines + '\n'
504        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
505        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
506        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
507        lines = lines +  'ifdef LOCATION\n'
508        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'
509        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
510        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
511        lines = lines + '\t+@cd ' + self.packages[package]['path']
512        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
513        lines = lines + 'else\n'
514        lines = lines + '\t+@cd ' + self.packages[package]['path']
515        lines = lines + ' && ' + newcommand + '\n'
516        lines = lines + 'endif\n\n'
517        return lines
518
519
520# own copy of getstatusoutput
521def getstatusoutput(cmd):
522    """Return (status, stdout) of executing cmd in a shell.
523
524    A trailing line separator is removed from the output string.
525    The exit status of the command is encoded in the format specified for wait(),
526    when the exit status is zero (termination without errors), 0 is returned.
527    """
528    import os
529    p = os.popen(cmd, 'r')
530    out = p.read()
531    sts = p.close()
532    if sts is None: sts = 0
533    if out.endswith(os.linesep):
534        out = out[:out.rindex(os.linesep)]
535    return sts, out
536
537#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.