#----------------------------------#
# -- Author: V.Garonne
# -- Date: 08/25/2006
# -- Name: tbroadcast
# -- Description: main class
#----------------------------------#

import os
import sys
import time
import string
import random

try:
    import commands
except ImportError:
    # The ``commands`` module only exists on Python 2; ``subprocess`` offers
    # a ``getstatusoutput`` with the same call shape on Python 3.
    import subprocess as commands

from threadpool import WorkRequest
from threadpool import ThreadPool
from threadpool import NoResultsPending
from threadpool import NoWorkersAvailable
from threadpool import makeRequests
from executer import exeCommand


class Scheduler:
    """Run one command per package, in dependency order, on a thread pool.

    ``self.packages`` maps a package name to a dict with the keys
    ``'version'``, ``'dependencies'`` (list of package names still to be
    processed first), ``'status'`` (``'waiting'``, ``'queued'``,
    ``'running'`` or ``'done'``) and ``'path'`` (directory the command is
    run from, or ``None`` when no matching ``use`` line was seen).

    All print calls pass a single pre-formatted string so that the output is
    identical whether ``print`` is a statement (Python 2) or a function.
    """

    def __init__(self, num_workers=20):
        """Create the pool, refuse cyclic dependencies, then build the graph.

        num_workers -- number of worker threads handed to ``ThreadPool``.
        """
        self.pool = ThreadPool(num_workers=num_workers)
        self.packages = {}
        self.check_cycles()
        self.instanciate_packages()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _run_cmt(self, cmd):
        """Run *cmd* and return its output as a list of lines.

        On a non-zero status the output is printed and the process exits
        with -1.
        """
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        return output.split('\n')

    def _is_use_comment(self, line):
        """Tell whether *line* is a '#' line that describes a used package.

        Lines starting with "#CMT>" and the two fixed lines '# Selection :'
        and '#' are excluded.  Empty lines are rejected as well (indexing
        them used to raise IndexError).
        """
        return (line.startswith('#')
                and line[:5] != "#CMT>"
                and line not in ['# Selection :', '#'])

    def _collect_dependencies(self, name, lines):
        """Fill ``self.packages[name]['dependencies']`` from *lines*.

        After a '#' line naming *name*, every following '#' line whose
        'use' keyword sits at a strictly greater column is recorded as a
        dependency; the first line at the same or a smaller column ends the
        run.  All occurrences of *name* in *lines* are processed.
        """
        dependencies = self.packages[name]['dependencies']
        found = False
        level = -1
        for ligne in lines:
            if not self._is_use_comment(ligne):
                continue
            used = ligne.split()[2]
            depth = ligne.find('use')
            if found:
                if level < depth:
                    if used not in dependencies:
                        dependencies.append(used)
                else:
                    found = False
            if name == used:
                level = depth
                found = True

    def _candidate_cmt_dirs(self, fields):
        """Return the 'cmt' directories to probe for a parsed ``use`` line.

        *fields* is the text after "use " split on single spaces:
          3 fields    -> name, version, path
          4 fields    -> name, version, offset, path
          5 fields    -> name, version, offset, path, <extra field, ignored>
        The first and last characters of the path field are dropped
        (presumably surrounding parentheses -- TODO confirm against the
        actual ``cmt show uses`` output).

        Returns a list [versioned dir, unversioned dir], or None when the
        number of fields is not one of the above.
        """
        count = len(fields)
        if count == 3:
            name, version, location = fields
            base = location[1:-1]
        elif count == 4 or count == 5:
            name, version, offset, location = fields[:4]
            base = location[1:-1] + '/' + offset
        else:
            return None
        # The unversioned fallback for the 3-field form used to be built
        # with "name + + '/cmt'", which raised TypeError when reached.
        return [base + '/' + name + '/' + version + '/cmt',
                base + '/' + name + '/cmt']

    def _record_package_path(self, line):
        """Store the existing 'cmt' directory for the package on a use line.

        Lines about packages that are not in ``self.packages`` are ignored.
        Exits with -1 when the line has an unexpected number of fields or
        when none of the candidate directories exists.
        """
        fields = line[4:].split(' ')
        name = fields[0]
        if name not in self.packages:
            return
        candidates = self._candidate_cmt_dirs(fields)
        if candidates is None:
            print("error: %s" % line)
            print(str(fields))
            sys.exit(-1)
        full_path = None
        for candidate in candidates:
            if os.path.exists(candidate):
                full_path = candidate
                break
        if full_path is None:
            print('# error path not found for %s' % name)
            sys.exit(-1)
        self.packages[name]['path'] = full_path

    # ------------------------------------------------------------------
    # graph construction
    # ------------------------------------------------------------------
    def check_cycles(self):
        """Exit with -1 if 'cmt show cycles' prints any non-comment line.

        Every non-empty line not starting with '#' is echoed after a single
        error banner.  Blank lines are ignored.
        """
        cycle_found = False
        for line in self._run_cmt('cmt show cycles'):
            if not line or line[0] == '#':
                continue
            if not cycle_found:
                cycle_found = True
                print("# Error: cycles found, not possible to execute broadcast with threads. See the followings packages:")
            print(line)
        if cycle_found:
            sys.exit(-1)

    def instanciate_packages(self):
        """Populate ``self.packages`` from the output of 'cmt show uses'.

        The current package is registered with version '*' and the current
        working directory as its path.  Each '#' use line adds its package
        to the current package's dependencies and, the first time the
        package is seen, registers it and collects its own dependencies.
        Each line starting with "use " supplies the path of a registered
        package.
        """
        # We create the schedule of the work units
        print('# First, we initialize the DAG by parsing cmt show uses (takes a certain time... huk should be improved asap) ')
        lines = self._run_cmt('cmt show uses')
        current_package = self.get_current_package()
        self.packages[current_package] = {'version': '*',
                                          'dependencies': list(),
                                          'status': 'waiting',
                                          'path': os.getcwd()}
        for line in lines:
            if self._is_use_comment(line):
                tokens = line.split()
                name = tokens[2]
                version = tokens[3]
                top_level = self.packages[current_package]['dependencies']
                if name not in top_level:
                    top_level.append(name)
                if name not in self.packages:
                    self.packages[name] = {'version': version,
                                           'dependencies': list(),
                                           'status': 'waiting',
                                           'path': None}
                    # Rescans every line for each new package (quadratic);
                    # kept as is to preserve the resulting graph exactly.
                    self._collect_dependencies(name, lines)
            if line[:4] == "use ":
                self._record_package_path(line)
        print('# really takes a certain time ... ')

    def get_current_package(self):
        """Return the text between the first two single quotes of the first
        non-comment line printed by 'cmt show macro package'.

        Returns None when there is no such line.
        """
        for line in self._run_cmt('cmt show macro package'):
            if not line or line[0] == '#':
                continue
            start = line.find("'")
            end = line[start + 1:].find("'")
            return line[start + 1:start + end + 1]
        return None

    def get_work_area_path(self, name):
        """Return the path recorded for package *name* (may be None)."""
        return self.packages[name]['path']

    def get_package_path(self, name):
        """Run 'pwd' through 'cmt -use=<name> run' and return its first
        non-empty line that does not start with '#'.

        The line is also printed.  Returns None when no such line exists.
        """
        #return os.getcwd ()
        for line in self._run_cmt('cmt -use=' + name + ' run pwd'):
            if line and line[0] != '#':
                print(line)
                return line
        return None

    # ------------------------------------------------------------------
    # schedule bookkeeping
    # ------------------------------------------------------------------
    def print_dependencies(self):
        """Print each package with its remaining dependencies and status."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        for key in self.packages:
            entry = self.packages[key]
            print('%s --> %s , %s' % (key, entry['dependencies'], entry['status']))

    def is_work_unit_waiting(self, name):
        """Tell whether package *name* still has the 'waiting' status."""
        return self.packages[name]['status'] == 'waiting'

    def set_work_unit_status(self, name, status):
        """Set the status of package *name* to *status*."""
        self.packages[name]['status'] = status

    def get_dependencies(self, name):
        """Return the (live) list of remaining dependencies of *name*."""
        return self.packages[name]['dependencies']

    def get_next_work_units(self):
        """Return the waiting packages that have no remaining dependency."""
        return [key for key in self.packages
                if len(self.get_dependencies(key)) == 0
                and self.is_work_unit_waiting(key)]

    def is_work_units(self):
        """Tell whether at least one package is still 'waiting'."""
        for key in self.packages:
            if self.is_work_unit_waiting(key):
                return True
        return False

    def suppress_work_unit(self, name):
        """Remove *name* from the dependency list of every package."""
        print('# remove %s from schedule' % name)
        for key in self.packages:
            if name in self.packages[key]['dependencies']:
                self.packages[key]['dependencies'].remove(name)

    # ------------------------------------------------------------------
    # execution
    # ------------------------------------------------------------------
    def add_work_unit(self, name, cmd):
        """Queue *cmd* for package *name* if the package is still waiting.

        The package is marked 'queued' before the request is handed to the
        pool, so it cannot be queued twice.
        """
        if self.is_work_unit_waiting(name):
            # we create requests
            arg = {'cmd': cmd, 'package': name}
            req = WorkRequest(self.do_execute, [arg], None,
                              callback=self.result_callback,
                              exc_callback=self.handle_exception)
            # then we put the work request in the queue...
            self.set_work_unit_status(name, 'queued')
            self.pool.putRequest(req)
            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))

    def execute(self, command):
        """Queue *command* for every package that is currently runnable."""
        #self.print_dependencies ()
        packages = self.get_next_work_units()
        if len(packages) != 0:
            print('\n# Execute parallel actions within  %s' % (packages,))
            for package in packages:
                self.add_work_unit(package, command)

    def execute_all(self, command):
        """Run *command* over the whole graph and wait for completion.

        NOTE(review): if a job raises, its dependents keep the 'waiting'
        status and this loop never terminates -- confirm whether failures
        should release or abort the dependents.
        """
        self.execute(command)
        self.wait()
        while self.is_work_units():
            self.wait()

    def wait(self):
        """Block on ``self.pool.wait()``."""
        self.pool.wait()

    # this will be called each time a result is available
    def result_callback(self, request, result):
        """Report a finished request and queue the packages it unblocked."""
        #print "**Result: %s from request #%s" % (str(result), request.requestID)
        print("# Result: %s from request #%s" % (result['package'], request.requestID))
        self.execute(result['cmd'])

    # the work the threads will have to do
    def do_execute(self, arg):
        """Run ``arg['cmd']`` from the directory of ``arg['package']``.

        arg -- dict with the keys 'cmd' and 'package'.

        Returns a dict with the keys 'output', 'cmd' and 'package'.  The
        status values returned by ``exeCommand`` are not inspected: the
        package is marked 'done' and removed from the other packages'
        dependencies regardless.
        """
        path = self.get_work_area_path(arg['package'])
        print('#--------------------------------------------------------------')
        print('# Now trying [' + arg['cmd'] + '] in ' + path)
        print('#--------------------------------------------------------------')
        self.set_work_unit_status(arg['package'], 'running')
        # NOTE(review): os.chdir changes the working directory of the whole
        # process, which all worker threads share -- concurrent jobs can
        # move each other's directory between this call and exeCommand.
        os.chdir(path)
        # The raw command is executed; a "cmt -use=<package> run '<cmd>'"
        # string used to be built here but was overwritten before use.
        cmd = arg['cmd']
        status, output, error, pythonError = exeCommand(cmd, iTimeout = 10)
        self.suppress_work_unit(arg['package'])
        self.set_work_unit_status(arg['package'], 'done')
        # status, output= commands.getstatusoutput(cmd)
        #print output
        #if status != 0:
        #   raise RuntimeError(output)
        return {'output': output, 'cmd': arg['cmd'], 'package': arg['package']}

    # this will be called when an exception occurs within a thread
    def handle_exception(self, request, exc_info):
        """Print the exception raised by a work request."""
        print("# Exception occured in request #%s: %s" %
              (request.requestID, exc_info[1]))

#--------- EoF --------#