source: tbroadcast/HEAD/python/tbroadcast.py @ 573

Last change on this file since 573 was 573, checked in by rybkin, 13 years ago

See C.L. 6

  • Property svn:executable set to *
File size: 23.7 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9# 21-Jul-2009 compile most used packages first; protect critical sections
10
11import os
12import sys
13import time
14import string
15import os.path
16# import commands
17import traceback
18import exceptions
19from threading import BoundedSemaphore
20
21from threadpool  import WorkRequest
22from threadpool  import ThreadPool
23
24from  subprocess import Popen
25
26class Scheduler:
27
28    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, sort=False,
29                 output=None, error=None, silent = False, perf=False, keep_going=True):
30        self.pool            = ThreadPool(num_workers=num_workers, poll_timeout=3)
31        self.num_workers     = num_workers
32        self.current_project = {'name': None, 'path': None, 'version': None}
33        self.packages        = {}
34        self.counter         = 0
35        self.semaphore       = BoundedSemaphore(1)
36        self.local           = local
37        self.sort            = sort
38        self.ignore_cycles   = ignore_cycles
39        self.output          = output
40        self.error           = error
41        self.silent          = silent
42        self.perf            = perf
43        self.keep_going      = keep_going
44        if self.sort:
45            print "Compile packages sorted according to use count"
46        if self.perf is not False:
47            f = open (self.perf, 'w+')
48            f.close()
49        if output is not None:
50            if not os.path.exists (output):
51                print "path",output,"does not exists"
52                sys.exit(-1)
53            if not os.path.isdir(output):
54                print "path",output,"is not a valid directory"
55                sys.exit(-1)
56
57        # init cmt stuff
58        self.get_current_project()
59        self.current_package = self.get_current_package()
60        self.instanciate_packages (file)
61        if self.local: self.get_local_graph()
62        self.check_cycles()
63        self.get_use_count()
64
65    def get_current_project(self):
66        cmd = 'cmt show projects | grep current'
67        status, output = getstatusoutput (cmd)
68        #status, output = commands.getstatusoutput (cmd)
69        if status != 0:
70            print "WARNING: CMT exited with non-zero status!"
71            print output
72            sys.exit(-1)
73        lines = string.split(output, '\n')
74        for line in lines:
75            if line!='' and line [0] != '#':
76                item  = string.split (line, ' ')
77                self.current_project ['name']    = item[0]
78                self.current_project ['version'] = item[1]
79                self.current_project ['path']    = item[3][:-1]
80                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
81                if  self.current_project ['version'] == version:
82                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
83                return
84
85    def get_counter(self):
86        self.semaphore.acquire ()
87        self.counter = self.counter + 1
88        value = self.counter
89        self.semaphore.release()
90        return value
91
92    def check_cycles (self):
93        cmd = 'cmt -private show cycles'
94        cycle_found = False
95        status, output = getstatusoutput (cmd)
96        #status, output = commands.getstatusoutput (cmd)
97        if status != 0:
98            print "WARNING: CMT exited with non-zero status!"
99            print output
100            sys.exit(-1)
101        lines = string.split(output, '\n')
102        cycles = list()
103        for line in lines:
104            if line!='' and line [0] != '#':
105               cycles.append (string.split(line))
106        cercles =list()
107        for cycle in cycles:
108            cycleInProject = True
109            for package in cycle:
110                if not self.packages.has_key(package):
111                    cycleInProject = False
112            if cycleInProject:
113              cercles.append(cycle)
114        if len(cercles):
115            if not self.ignore_cycles:
116                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
117                for cycle in cercles:
118                    loop = ""
119                    for package in cycle:
120                        loop = loop + package + ' -> '
121                    print loop + '...'
122                sys.exit(-1)
123            else:
124                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
125                for cycle in cercles:
126                    loop = ""
127                    for package in cycle:
128                        loop = loop + package + ' -> '
129                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
130                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
131                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
132#                sys.exit(-1)
133
134    def format_uses (self, content):
135        # format variables
136        lignes  = string.split(content, '\n')
137        lines   = list()
138        for ligne in lignes:
139           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
140               lines.append(ligne)
141        lines.reverse()
142        return lines
143
144    def format_paths (self, content):
145        # format variables
146        lignes  = string.split(content, '\n')
147        lines   = list()
148        for ligne in lignes:
149            if ligne[:4] == "use ":
150               lines.append(ligne)
151        return lines
152
153    def get_paths (self, content):
154        lines = self.format_paths(content)
155        for line in lines:
156                result  = string.split (line[4:len(line)], ' ')
157                if  self.packages.has_key(result[0]):
158                    if len(result)==4:
159                        name, version, offset, path = string.split (line[4:len(line)], " ")
160                        #print name, version, offset, path
161                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
162                        if path == '(no_auto_imports)':
163                            path   = offset
164                            offset = ''
165                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
166                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
167                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
168                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
169                        else:
170                            print '# error path not found for', name
171                            sys.exit(-1)
172                    elif len(result)==5:
173                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")
174                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
175                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
176                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
177                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
178                        else:
179                            print '# error path not found for', name
180                            sys.exit(-1)
181                    elif len(result)==3:
182                        name, version, path = string.split (line[4:len(line)], " ")
183                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
184                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
185                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):
186                            full_path = path[1:-1] + '/' +name + + '/cmt'
187                        else:
188                            print '# error path not found for', name
189                            sys.exit(-1)
190                    else:
191                        print "error:",line
192                        print str(result)
193                        sys.exit(-1)
194                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
195                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
196                    if os.path.normpath(commonprefix) == self.current_project ['path']:
197                        #print result[0], ' belong to project', self.current_project ['name']
198                        self.packages[result[0]]['current_project'] = True
199
200    def get_uses(self, content):
201        # initiates variables
202        lignes = self.format_uses(content)
203        if not len(lignes): return
204        self.packages [self.current_package] = {'version': '*', 'use_count': 0,
205                                                'uses': list(), 'status': 'waiting',
206                                                'current_project': True, 'path': os.getcwd()}
207        previous_client = self.current_package
208        previous_level  = 0
209        level_stack    = [{'name':previous_client,'level':previous_level},]
210        ligne = lignes.pop()
211        current_level = string.find(ligne, 'use')
212        while True:
213        #while len(lignes)!=0:
214            #current_level = string.find(ligne, 'use')
215            while current_level > previous_level:
216                name    = string.split (ligne)[2]
217                version = string.split (ligne)[3]
218                if not self.packages.has_key (name):
219                  self.packages [name] = {'version': version, 'use_count': 0,
220                                          'uses': list(), 'status': 'waiting',
221                                          'current_project': False, 'path': None}
222                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
223                   self.packages[previous_client]['uses'].append (name)
224                level_stack.append({'name':previous_client,'level':previous_level})
225                previous_client = name
226                previous_level = current_level
227                if len(lignes):
228                    ligne = lignes.pop()
229                    #print ligne
230                    current_level = string.find(ligne, 'use')
231                else: return
232
233            # restore the level
234            #if len(lignes):
235            if len(level_stack):
236                item = level_stack.pop()
237                while item['level'] >= current_level and len(level_stack):
238                    item = level_stack.pop()
239                previous_client = item['name']
240                previous_level  = item['level']
241
242            if current_level <= previous_level: return
243            #print previous_client, '-->',string.split (ligne)[2]
244
245    def instanciate_packages(self, file=None):
246        # We create the schedule of the work units
247        print '# First, we initialize the DAG by parsing "cmt show uses"'
248        if file is None:
249            cmd  = 'cmt show uses'
250        else:
251            cmd = 'cat ' + file
252        status, output = getstatusoutput (cmd)
253        #status, output = commands.getstatusoutput (cmd)
254        if status != 0:
255            print "WARNING: CMT exited with non-zero status!"
256            #sys.exit(-1)
257        self.get_uses(output)
258        self.get_paths(output)
259        #self.simulate_execution()
260
261    def get_use_count(self):
262        for key in self.packages:
263            count = 0
264            for parent in self.packages:
265                if key in self.packages[parent]['uses']: count += 1
266            self.packages[key]['use_count'] = count
267            # print "Package",key,"use_count",count
268
269    def get_local_graph(self):
270        To_remove = list()
271        for key in self.packages:
272            if self.packages[key]['current_project']== False:
273                for selected in self.packages:
274                    if key in self.packages[selected]['uses']:
275                       self.packages[selected]['uses'].remove(key)
276                To_remove.append (key)
277        for item in To_remove:
278            del self.packages[item]
279
280    def simulate_execution(self):
281        while True:
282            ndone = self.simulate_requests()
283            if ndone == 0: break
284
285    def simulate_requests(self):
286        runnable = self.get_next_work_units()
287        if len(runnable):
288            print '\n#--------------------------------------------------------------'
289            print "# Execute parallel actions within packages - total", len(runnable)
290            print '#--------------------------------------------------------------'
291        for selected in runnable:
292            use_count = self.packages[selected]['use_count']
293            path = self.packages[selected]['path']
294            print '#--------------------------------------------------------------'
295            print '# (%d/%d %d) Now trying [] in %s' % (self.get_counter(), len(self.packages), use_count, path)
296            print '#--------------------------------------------------------------'
297            self.suppress_work_unit(selected)
298        return len(runnable)
299
300    def get_current_package(self):
301        cmd            = 'cmt show macro package'
302        status, output = getstatusoutput (cmd)
303        #status, output = commands.getstatusoutput (cmd)
304        if status != 0:
305            print "WARNING: CMT exited with non-zero status!"
306            print output
307            sys.exit(-1)
308        lines = string.split(output, '\n')
309        for line in lines:
310            if line [0] != '#':
311                start = string.find(line,"'")
312                end   = string.find(line[start+1:len(line)],"'")
313                return line [start+1:start+end+1]
314
315    def print_dependencies(self):
316        print '# -------------------------------------------'
317        print '# package --> dependencies, status, use_count'
318        print '# -------------------------------------------'
319        for key in self.packages.keys():
320            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status'],',', self.packages[key] ['use_count']
321
322    def print_status(self, status):
323        print '# ------------------------'
324        print '# package --> dependencies'
325        print '# ------------------------'
326        i = 1
327        for key in self.packages.keys():
328            if self.packages[key] ['status'] == status:
329                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']
330                i = i + 1
331
332    def is_work_unit_waiting (self, name):
333        return self.packages[name] ['status'] == 'waiting'
334
335    def set_work_unit_status (self, name, status):
336        self.packages[name] ['status'] = status
337
338    def get_next_work_units (self):
339        # by default returned list is in arbitrary order - may be this is better
340        # if self.sort is set returned list is sorted - most used packages first
341        runnable = list()
342        for key in self.packages:
343            if len(self.packages[key]['uses']) == 0 and self.packages[key]['status'] == 'waiting':
344                use_count = self.packages[key]['use_count']
345                runnable.append((use_count,key))
346        if self.sort:
347            runnable.sort()
348            runnable.reverse()
349        result = [ pair[1] for pair in runnable ]
350        return result
351
352    def suppress_work_unit (self, name):
353        #print '# remove', name, 'from schedule'
354        self.semaphore.acquire()
355        self.packages[name]['status']='done'
356        for key in self.packages.keys():
357            if name in self.packages[key]['uses']:
358                self.packages[key]['uses'].remove(name)
359        self.semaphore.release()
360
361    def add_work_unit (self, name, cmd):
362        if self.is_work_unit_waiting (name):
363            # we create requests
364            arg = {'cmd': cmd , 'package': name}
365            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback)
366#           req = WorkRequest(self.do_execute, [arg] , None,
367#                 callback=self.result_callback, exc_callback=self.handle_exception)
368            # then we put the work request in the queue...
369            self.set_work_unit_status (name, 'queued')
370            self.pool.putRequest(req)
371            # print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
372
373    def execute (self, command):
374        #self.print_dependencies ()
375        self.semaphore.acquire()
376        packages = self.get_next_work_units()
377        if len(packages):
378            print '\n#--------------------------------------------------------------'
379            print '# Execute parallel actions within packages (total',len(packages),')',packages
380            print '\n#--------------------------------------------------------------'
381            for package in packages:
382                self.add_work_unit (package, command)
383            sys.stdout.flush()
384        self.semaphore.release()
385
386    def execute_all(self,command):
387        #self.print_dependencies ()
388        self.execute (command)
389        self.wait()
390        if self.counter != len(self.packages):
391            print 'tbroadcast: warning: compiled',self.counter,'out of',len(self.packages),'packages'
392        self.pool.dismissWorkers(self.num_workers, do_join=True)
393        #self.print_dependencies ()
394        #self.print_status (status='waiting')
395
396    def wait (self):
397       self.pool.wait()
398
399    # this will be called each time a result is available
400    def result_callback(self, request, result):
401      #print "**Result: %s from request #%s" % (str(result), request.requestID)
402      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
403      self.execute (result['cmd'])
404
405    # the work the threads will have to do
406    def do_execute(self, arg):
407      package = arg['package']
408      path = self.packages[package]['path']
409      if path == None or not os.path.exists(path):
410          raise RuntimeError('Path to package '+ package +' not found')
411      self.set_work_unit_status(package, 'running')
412      header =          '#--------------------------------------------------------------\n'
413      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+arg['cmd']+'] in '+path+'\n'
414      header = header + '#--------------------------------------------------------------\n'
415      print header
416      sys.stdout.flush()
417      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
418      log_name     = string.replace(path, project_path, '')
419      log_name     = string.replace(log_name, '/cmt', '')
420      log_name     = string.replace(log_name, '/', '_')
421      log_name     = log_name+'.loglog'
422      # arg['log']   = log_name
423      cmd = "cd "+ path +";"+ arg['cmd']
424      # status, output = commands.getstatusoutput(cmd)
425      # init output file
426
427      self.packages[package] ['startTime'] = time.time()
428
429      if self.output is not None:
430           f1 = open (self.output+'/'+ log_name, 'w+')
431           f1.write (header)
432           f1.flush()
433           if self.error is not None:
434               f2 = open (self.error+'/error'+log_name, 'w+')
435               fp = Popen(cmd, shell=True, stdout=f1, stderr=f2)
436               fp.communicate()
437               status = fp.wait()
438               f2.close()
439           else:
440               fp = Popen(cmd, shell=True, stdout=f1, stderr=f1)
441               fp.communicate()
442               status = fp.wait()
443           f1.close()
444      else:
445           fp = Popen(cmd, shell=True)
446           fp.communicate()
447           status = fp.wait()
448      sys.stdout.flush()
449      sys.stderr.flush()
450
451      # Error is not handled - exit() is forbidden here
452      if not self.keep_going and status > 0:
453          print 'Error',status,'for package',package
454          # sys.exit(status)
455
456      self.packages[package] ['endTime'] = time.time()
457      if self.perf:
458          self.semaphore.acquire()
459          f = open (self.perf, 'a')
460          f.write (package+" "+str(self.packages[package]['startTime'])+" "+str(self.packages[package]['endTime'] )+'\n')
461          f.close()
462          self.semaphore.release()
463      self.suppress_work_unit(package)
464      return {'cmd': arg['cmd'], 'package':arg['package']}
465
466
467    # this will be called when an exception occurs within a thread
468    def handle_exception(self, request, exc_info):
469      #traceback.print_stack()
470      print '#--------------------------------------------------------------'
471      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
472      if exc_info[0]== exceptions.SystemExit:
473        print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1])
474        print '#--------------------------------------------------------------'
475        sys.exit(exc_info[1])
476      print "# Exception occured: %s" %(exc_info[1])
477      print exc_info
478      print '#--------------------------------------------------------------'
479      #sys.exit(-1)
480
481
482    def generate_make (self, file, command):
483        makefile = open (file, 'w+')
484        makefile.write ('MAKE=make\n')
485        #MFLAGS= -j10
486        self.counter = len(self.packages)
487        self.recursive_make (self.current_package, command, makefile, len(self.packages))
488        makefile.close ()
489
490    def recursive_make (self, package, command, makefile, indice,actions=list()):
491        lines = self.generate_action_make (package, command, indice)
492        makefile.write (lines)
493        #print lines
494        for pkg in self.packages[package] ['uses']:
495            if pkg not in actions:
496                actions.append(pkg)
497                indice = indice - 1
498                self.counter = self.counter - 1
499                self.recursive_make(pkg, command,makefile, indice, actions)
500
501    def generate_action_make (self, package, command, indice):
502        lines = package + ' :: '
503        # add dependencies
504        for pkg in self.packages[package] ['uses']:
505            lines = lines + ' ' + pkg
506
507        # add the action itself
508        newcommand = string.replace (command, '<package>', package)
509        if command =='':
510            newcommand='$(MAKE)'
511        lines = lines + '\n'
512        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
513        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
514        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
515        lines = lines +  'ifdef LOCATION\n'
516        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'
517        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
518        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
519        lines = lines + '\t+@cd ' + self.packages[package]['path']
520        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
521        lines = lines + 'else\n'
522        lines = lines + '\t+@cd ' + self.packages[package]['path']
523        lines = lines + ' && ' + newcommand + '\n'
524        lines = lines + 'endif\n\n'
525        return lines
526
527
528# own copy of getstatusoutput
529def getstatusoutput(cmd):
530    """Return (status, stdout) of executing cmd in a shell.
531
532    A trailing line separator is removed from the output string.
533    The exit status of the command is encoded in the format specified for wait(),
534    when the exit status is zero (termination without errors), 0 is returned.
535    """
536    import os
537    p = os.popen(cmd, 'r')
538    out = p.read()
539    sts = p.close()
540    if sts is None: sts = 0
541    if out.endswith(os.linesep):
542        out = out[:out.rindex(os.linesep)]
543    return sts, out
544
545#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.