Changeset 243 for tbroadcast
- Timestamp:
- Sep 1, 2006, 3:33:44 PM (18 years ago)
- Location:
- tbroadcast/v2
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
tbroadcast/v2/cmt/requirements
r232 r243 8 8 9 9 path_append PYTHONPATH "${tbroadcast_root}/python" 10 11 action tbroadcast "tbroadcast $(cmt_args)" -
tbroadcast/v2/python/tbroadcast.py
r242 r243 12 12 import string 13 13 import random 14 import os.path 14 15 import commands 15 16 import traceback … … 25 26 class Scheduler: 26 27 27 def __init__(self, num_workers=20 ):28 def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None): 28 29 self.pool = ThreadPool(num_workers=num_workers) 29 30 self.current_package = self.get_current_package() 31 self.current_project = {'name': None, 'path': None, 'version': None} 30 32 self.packages = {} 31 33 self.counter = 0 32 34 self.semaphore = BoundedSemaphore(1) 35 self.local = local 36 self.ignore_cycles = ignore_cycles 37 self.output = output 38 if output is not None: 39 if not os.path.exists (output): 40 print "path",output,"no exits" 41 sys.exit(-1) 42 if not os.path.isdir(output): 43 print "path",output,"no a valid directory" 44 sys.exit(-1) 45 46 self.get_current_project() 47 self.instanciate_packages (file) 48 if self.local: self.get_local_graph() 33 49 self.check_cycles() 34 self.instanciate_packages () 50 51 def get_current_project(self): 52 cmd = 'cmt show projects | grep current' 53 status, output = commands.getstatusoutput (cmd) 54 if status != 0: 55 print output 56 sys.exit(-1) 57 lines = string.split(output, '\n') 58 for line in lines: 59 if line!='' and line [0] != '#': 60 item = string.split (line, ' ') 61 self.current_project ['name'] = item[0] 62 self.current_project ['version'] = item[1] 63 self.current_project ['path'] = item[3][:-1] 64 version = self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):] 65 if self.current_project ['version'] == version: 66 self.current_project ['path'] = os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )]) 67 #print self.current_project 35 68 36 69 def get_counter(self): … … 42 75 43 76 def check_cycles (self): 44 cmd = 'cmt show cycles'77 cmd = 'cmt -private show cycles' 45 78 cycle_found = False 46 79 status, output = commands.getstatusoutput (cmd) … … 49 82 sys.exit(-1) 50 83 lines = string.split(output, '\n') 84 cycles = list () 51 85 for line in lines: 52 if line!='' and line [0] != '#': 53 if not cycle_found: 54 cycle_found = True 55 print "# Error: cycles found, not possible to execute broadcast with threads. See the followings packages:" 56 print line 57 if cycle_found: 58 sys.exit(-1) 86 if line!='' and line [0] != '#': 87 cycles.append (string.split(line)) 88 cercles =list() 89 for cycle in cycles: 90 cycleInProject = True 91 for package in cycle: 92 if not self.packages.has_key(package): 93 cycleInProject = False 94 if cycleInProject: 95 cercles.append(cycle) 96 if len(cercles): 97 if not self.ignore_cycles: 98 print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:" 99 for cycle in cercles: 100 loop = "" 101 for package in cycle: 102 loop = loop + package + ' -> ' 103 print loop + '...' 104 sys.exit(-1) 105 else: 106 print "# Warning: There are cycles and you have selected the automatic suppress cycles mode" 107 for cycle in cercles: 108 loop = "" 109 for package in cycle: 110 loop = loop + package + ' -> ' 111 if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']: 112 print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0] 113 self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0]) 114 # sys.exit(-1) 59 115 60 116 def format_uses (self, content): … … 63 119 lines = list() 64 120 for ligne in lignes: 65 if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:121 if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']: 66 122 lines.append(ligne) 67 123 lines.reverse() … … 118 174 print str(result) 119 175 sys.exit(-1) 120 self.packages[result[0]]['path'] = full_path 121 176 self.packages[result[0]]['path'] = os.path.normpath(full_path) 177 commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']]) 178 if os.path.normpath(commonprefix) == self.current_project ['path']: 179 #print result[0], ' belong to project', self.current_project ['name'] 180 self.packages[result[0]]['current_project'] = True 181 122 182 def get_uses(self, content): 123 183 # initiates variables 124 184 lignes = self.format_uses(content) 125 self.packages [self.current_package] = {'version': '*', 'client': list(),'uses': list(), 'status': 'waiting', 'path': os.getcwd()} 185 if not len(lignes): return 186 self.packages [self.current_package] = {'version': '*', 'client': list(), 187 'uses': list(), 'status': 'waiting', 188 'current_project': True, 'path': os.getcwd()} 126 189 previous_client = self.current_package 127 190 previous_level = 0 … … 134 197 version = string.split (ligne)[3] 135 198 if not self.packages.has_key (name): 136 self.packages [name] = {'version': version, 'uses': list(), 'client': list(), 'status': 'waiting', 'path': None} 137 if name not in self.packages[previous_client]['uses']: 199 self.packages [name] = {'version': version, 'uses': list(), 200 'client': list(), 'status': 'waiting', 201 'current_project': False, 'path': None} 202 if name not in self.packages[previous_client]['uses']:# and name != previous_client: 138 203 self.packages[previous_client]['uses'].append (name) 139 204 level_stack.append({'name':previous_client,'level':previous_level}) … … 144 209 #print ligne 145 210 current_level = string.find(ligne, 'use') 146 211 147 212 #self.packages [previous_client]['status'] ='queued' 148 213 # restore the level 149 item = level_stack.pop() 150 while item['level'] >= current_level: 151 item = level_stack.pop() 152 previous_client = item['name'] 153 previous_level = item['level'] 214 if len(lignes): 215 if len(level_stack): 216 item = level_stack.pop() 217 while item['level'] >= current_level and len(level_stack): 218 item = level_stack.pop() 219 previous_client = item['name'] 220 previous_level = item['level'] 154 221 #print previous_client, '-->',string.split (ligne)[2] 155 222 156 223 def instanciate_packages(self, file=None): 157 224 # We create the schedule of the work units 158 print '# First, we initialize the DAG by parsing cmt show uses'225 print '# First, we initialize the DAG by parsing "cmt show uses"' 159 226 if file is None: 160 227 cmd = 'cmt show uses' … … 170 237 #self.simulate_execution() 171 238 239 def get_local_graph(self): 240 To_remove = list() 241 for key in self.packages: 242 if self.packages[key]['current_project']== False: 243 for selected in self.packages: 244 if key in self.packages[selected]['uses']: 245 self.packages[selected]['uses'].remove(key) 246 To_remove.append (key) 247 for item in To_remove: 248 self.packages.pop(item) 249 172 250 def simulate_execution(self): 173 251 ok = True 174 indice = 1 252 indice = 1 175 253 while ok: 176 254 runnable = list() 177 checkable = list()178 255 for key in self.packages: 179 256 if self.packages[key]['status']!='done': 180 257 if len(self.packages[key]['uses']) == 0: 181 runnable.append(key) 258 runnable.append(key) 182 259 if len(runnable): 183 260 print '\n#--------------------------------------------------------------' 184 261 print "# Execute parallel actions within packages " + str(runnable) 185 #print 'checkable:',checkable186 262 for selected in runnable: 187 263 print '#--------------------------------------------------------------' … … 334 410 print '#--------------------------------------------------------------' 335 411 cmd = arg['cmd'] 336 status, output, error, pythonError = exeCommand(cmd, iTimeout = 10) 412 status, output, error, pythonError = exeCommand(cmd)#,iTimeout = 3600) 413 if self.output is not None: 414 f = open (self.output+'/'+arg['package']+'_output.log', 'w+') 415 f.write (output) 416 f.close() 417 f = open (self.output+'/'+arg['package']+'_error.log', 'w+') 418 f.write (str(error)) 419 f.close() 337 420 self.suppress_work_unit (arg['package']) 338 421 self.set_work_unit_status (arg['package'], 'done') -
tbroadcast/v2/scripts/tbroadcast
r242 r243 19 19 print '# <command>: command to execute' 20 20 print '# global options :' 21 print '# -nb=<num_worker>] : Total number of threads' 21 print '# -check : Check execution (deadlocks)' 22 print '# -f=<file> : Input file' 23 print '# -help : Print help' 24 print '# -local : ' 25 print '# -ignore_cycles : ' 26 print '# -nb=<num_worker> : Total number of threads' 27 print '# -output=<location> : Output directory to store output files with the form <package>.log' 28 print '# -print : Print dependency graph' 22 29 print '# -test : Simulate execution' 23 print '# -check : Check execution (deadlocks)'24 print '# -print : Print dependency graph'25 print '# -help] : Print help'26 30 27 31 if __name__ == '__main__': 28 32 # Default options 29 num_worker = 20 30 command = '' 31 test = False 32 check = False 33 print_graph = False 33 num_worker = 20 34 command = '' 35 test = False 36 check = False 37 print_graph = False 38 local = False 39 ignore_cycles = False 40 output = None 41 file = None 34 42 for arg in sys.argv[1:len(sys.argv)]: 35 43 if arg[0]=='-': 36 44 option = string.split(arg,'=')[0] 37 45 if option == '-nb': 38 num_worker = int (string.split(arg,'=')[1]) 39 elif option == '-test': 40 test = True 46 num_worker = int (string.split(arg,'=')[1]) 47 if option == '-f': 48 file = string.split(arg,'=')[1] 49 if option == '-output': 50 output = string.split(arg,'=')[1] 51 if option == '-local': 52 local= True 53 if option == '-ignore_cycles': 54 ignore_cycles = True 55 if option == '-help': 56 usage() 57 sys.exit(-1) 58 59 if option == '-test': 60 test = True 41 61 elif option == '-check': 42 check = True62 check = True 43 63 elif option == '-print': 44 print_graph = True 45 else: 46 usage() 47 sys.exit(-1) 64 print_graph = True 48 65 else: 49 66 command = arg 50 67 51 master = Scheduler (num_workers=num_worker )68 master = Scheduler (num_workers=num_worker, file=file, ignore_cycles=ignore_cycles, local=local, output=output) 52 69 if test: 53 70 master.simulate_execution()
Note: See TracChangeset
for help on using the changeset viewer.