Changeset 517 for tbroadcast
- Timestamp:
- Sep 7, 2009, 2:43:28 PM (15 years ago)
- Location:
- tbroadcast/HEAD
- Files:
-
- 2 deleted
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
tbroadcast/HEAD/Changelog
r508 r517 1 2009-08-11 Igor Kachaev <Igor.Katchaev@cern.ch> 2 * python/threadpool.py update to version 1.2.5 (2008-11-19) 3 * python/tbroadcast.py implement package sort by use count; 4 protect common structures with semaphore 5 * python/__init__.py remove from the package 6 * scripts/tbroadcast better help; option '-sort' to try sorting 7 8 2009-06-25 Igor Kachaev <Igor.Katchaev@cern.ch> 9 10 * scripts/tbroadcast Check correctness of options supplied; 11 do '-help' if empty command line 12 * python/tbroadcast.py add wait() after communicate() to prevent loop at the 13 end of job; disable -no_keep_going internally as it blocks; 14 use local copy of getstatusoutput() - else blocks at nightly 15 * cmt/requirements remove python dir from the path 16 1 17 2009-05-28 Vincent Garonne <vincent.garonne@cern.ch> 2 18 3 19 * scripts/tbroadcast Added the python version checking (>= 2.5) 4 20 * python/threadpool.py Upgraded version to 1.1 5 * python/tbroadcast.py: Replaced the execute method by communicate from the Popen py25 module to avoid deadlocks 6 due to any of the other OS pipe buffers filling up and blocking the child process 21 * python/tbroadcast.py: Replaced the execute method by communicate from the Popen py25 22 module to avoid deadlocks due to any of the other OS pipe buffers 23 filling up and blocking the child process 7 24 * python/executer.py Removed from the package 8 25 9 2007-03-22 Vincent Garonne <garonne@lal.in2p3.fr> 126 2007-03-22 Vincent Garonne <garonne@lal.in2p3.fr> 10 27 11 28 * scripts/tbroadcast, python/tbroadcast.py: Added the -no_keep_going option -
tbroadcast/HEAD/cmt/requirements
r243 r517 3 3 author Vincent Garonne <garonne@lal.in2p3.fr> 4 4 5 path_append PATH "${tbroadcast_root}/python"5 # path_append PATH "${tbroadcast_root}/python" 6 6 7 7 path_append PATH "${tbroadcast_root}/scripts" -
tbroadcast/HEAD/python/tbroadcast.py
r508 r517 2 2 # -- Author: V.Garonne 3 3 # -- Mail: garonne@lal.in2p3.fr 4 # -- Date: 08/25/2006 4 # -- Date: 08/25/2006 5 5 # -- Name: tbroadcast 6 6 # -- Description: main class 7 7 #----------------------------------# 8 9 # 21-Jul-2009 compile most used packages first; protect critical sections 8 10 9 11 import os … … 11 13 import time 12 14 import string 13 import random14 15 import os.path 15 import commands16 # import commands 16 17 import traceback 17 18 import exceptions … … 20 21 from threadpool import WorkRequest 21 22 from threadpool import ThreadPool 22 from threadpool import NoResultsPending23 from threadpool import NoWorkersAvailable24 from threadpool import makeRequests25 23 26 24 from subprocess import Popen … … 28 26 class Scheduler: 29 27 30 def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False, keep_going=True): 31 self.pool = ThreadPool(num_workers=num_workers) 32 self.current_package = self.get_current_package() 28 def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, sort=False, 29 output=None, error=None, silent = False, perf=False, keep_going=True): 30 self.pool = ThreadPool(num_workers=num_workers, poll_timeout=3) 31 self.num_workers = num_workers 33 32 self.current_project = {'name': None, 'path': None, 'version': None} 34 33 self.packages = {} … … 36 35 self.semaphore = BoundedSemaphore(1) 37 36 self.local = local 37 self.sort = sort 38 38 self.ignore_cycles = ignore_cycles 39 39 self.output = output … … 42 42 self.perf = perf 43 43 self.keep_going = keep_going 44 if self.sort: 45 print "Compile packages sorted according to use count" 44 46 if self.perf is not False: 45 47 f = open (self.perf, 'w+') … … 47 49 if output is not None: 48 50 if not os.path.exists (output): 49 print "path",output," no exits"51 print "path",output,"does not exists" 50 52 sys.exit(-1) 51 53 if not os.path.isdir(output): 52 print "path",output,"no a valid 
directory" 53 sys.exit(-1) 54 54 print "path",output,"is not a valid directory" 55 sys.exit(-1) 56 57 # init cmt stuff 55 58 self.get_current_project() 59 self.current_package = self.get_current_package() 56 60 self.instanciate_packages (file) 57 61 if self.local: self.get_local_graph() 58 62 self.check_cycles() 59 60 # status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'") 61 # lignes = string.split(output, '\n') 62 # i = 1 63 # for package in lignes: 64 # if package!='' and package[0] != '#': 65 # print i , package 66 # i =i +1 67 # if not self.packages.has_key(package): 68 # print package 69 # print len(self.packages) 70 # sys.exit(-1) 63 self.get_use_count() 71 64 72 65 def get_current_project(self): 73 66 cmd = 'cmt show projects | grep current' 74 status, output = commands.getstatusoutput (cmd) 67 status, output = getstatusoutput (cmd) 68 #status, output = commands.getstatusoutput (cmd) 75 69 if status != 0: 76 70 print output 77 sys.exit(-1) 71 sys.exit(-1) 78 72 lines = string.split(output, '\n') 79 for line in lines: 80 if line!='' and line [0] != '#': 73 for line in lines: 74 if line!='' and line [0] != '#': 81 75 item = string.split (line, ' ') 82 76 self.current_project ['name'] = item[0] … … 87 81 self.current_project ['path'] = os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )]) 88 82 return 89 #print self.current_project 90 83 91 84 def get_counter(self): 92 self.semaphore.acquire () 85 self.semaphore.acquire () 93 86 self.counter = self.counter + 1 94 value = self.counter 87 value = self.counter 95 88 self.semaphore.release() 96 89 return value 97 90 98 91 def check_cycles (self): 99 92 cmd = 'cmt -private show cycles' 100 93 cycle_found = False 101 status, output = commands.getstatusoutput (cmd) 94 status, output = getstatusoutput (cmd) 95 #status, output = commands.getstatusoutput (cmd) 102 96 if status != 0: 103 97 print output 104 sys.exit(-1) 98 sys.exit(-1) 105 99 lines = 
string.split(output, '\n') 106 cycles = list 107 for line in lines: 108 if line!='' and line [0] != '#': 109 cycles.append (string.split(line)) 110 cercles =list() 100 cycles = list() 101 for line in lines: 102 if line!='' and line [0] != '#': 103 cycles.append (string.split(line)) 104 cercles =list() 111 105 for cycle in cycles: 112 106 cycleInProject = True 113 for package in cycle: 107 for package in cycle: 114 108 if not self.packages.has_key(package): 115 cycleInProject = False 116 if cycleInProject: 109 cycleInProject = False 110 if cycleInProject: 117 111 cercles.append(cycle) 118 112 if len(cercles): … … 120 114 print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:" 121 115 for cycle in cercles: 122 loop = "" 116 loop = "" 123 117 for package in cycle: 124 118 loop = loop + package + ' -> ' 125 119 print loop + '...' 126 sys.exit(-1) 120 sys.exit(-1) 127 121 else: 128 122 print "# Warning: There are cycles and you have selected the automatic suppress cycles mode" 129 123 for cycle in cercles: 130 loop = "" 124 loop = "" 131 125 for package in cycle: 132 loop = loop + package + ' -> ' 126 loop = loop + package + ' -> ' 133 127 if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']: 134 128 print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0] … … 151 145 lines = list() 152 146 for ligne in lignes: 153 if ligne[:4] == "use ": 147 if ligne[:4] == "use ": 154 148 lines.append(ligne) 155 149 return lines … … 169 163 if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'): 170 164 full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt' 171 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'): 165 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'): 172 166 full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt' 173 167 else: 174 168 print '# error path not found for', name 175 
sys.exit(-1) 169 sys.exit(-1) 176 170 elif len(result)==5: 177 name, version, offset, path, importation = string.split (line[4:len(line)], " ") 171 name, version, offset, path, importation = string.split (line[4:len(line)], " ") 178 172 if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'): 179 173 full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt' 180 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'): 174 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'): 181 175 full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt' 182 176 else: 183 177 print '# error path not found for', name 184 sys.exit(-1) 178 sys.exit(-1) 185 179 elif len(result)==3: 186 180 name, version, path = string.split (line[4:len(line)], " ") 187 181 if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'): 188 182 full_path = path[1:-1] + '/' +name + '/' + version + '/cmt' 189 elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'): 183 elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'): 190 184 full_path = path[1:-1] + '/' +name + + '/cmt' 191 185 else: … … 195 189 print "error:",line 196 190 print str(result) 197 sys.exit(-1) 191 sys.exit(-1) 198 192 self.packages[result[0]]['path'] = os.path.normpath(full_path) 199 193 commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']]) 200 if os.path.normpath(commonprefix) == self.current_project ['path']: 194 if os.path.normpath(commonprefix) == self.current_project ['path']: 201 195 #print result[0], ' belong to project', self.current_project ['name'] 202 196 self.packages[result[0]]['current_project'] = True 203 197 204 def get_uses(self, content): 198 def get_uses(self, content): 205 199 # initiates variables 206 200 lignes = self.format_uses(content) 207 201 if not len(lignes): return 208 self.packages [self.current_package] = {'version': '*', ' client': list(),209 'uses': list(), 'status': 'waiting', 
202 self.packages [self.current_package] = {'version': '*', 'use_count': 0, 203 'uses': list(), 'status': 'waiting', 210 204 'current_project': True, 'path': os.getcwd()} 211 205 previous_client = self.current_package 212 206 previous_level = 0 213 207 level_stack = [{'name':previous_client,'level':previous_level},] 214 ligne = lignes.pop() 215 while len(lignes)!=0: 208 ligne = lignes.pop() 209 while len(lignes)!=0: 216 210 current_level = string.find(ligne, 'use') 217 while current_level > previous_level: 211 while current_level > previous_level: 218 212 name = string.split (ligne)[2] 219 version = string.split (ligne)[3] 213 version = string.split (ligne)[3] 220 214 if not self.packages.has_key (name): 221 self.packages [name] = {'version': version, 'use s': list(),222 ' client': list(), 'status': 'waiting',215 self.packages [name] = {'version': version, 'use_count': 0, 216 'uses': list(), 'status': 'waiting', 223 217 'current_project': False, 'path': None} 224 218 if name not in self.packages[previous_client]['uses']:# and name != previous_client: 225 self.packages[previous_client]['uses'].append (name) 219 self.packages[previous_client]['uses'].append (name) 226 220 level_stack.append({'name':previous_client,'level':previous_level}) 227 previous_client = name 228 previous_level = current_level 221 previous_client = name 222 previous_level = current_level 229 223 if len(lignes): 230 224 ligne = lignes.pop() 231 225 #print ligne 232 226 current_level = string.find(ligne, 'use') 233 234 #self.packages [previous_client]['status'] ='queued' 227 235 228 # restore the level 236 229 if len(lignes): 237 if len(level_stack): 238 item = level_stack.pop() 230 if len(level_stack): 231 item = level_stack.pop() 239 232 while item['level'] >= current_level and len(level_stack): 240 233 item = level_stack.pop() … … 248 241 if file is None: 249 242 cmd = 'cmt show uses' 250 else: 251 cmd = 'cat ' + file 252 status, output = commands.getstatusoutput (cmd) 243 else: 244 cmd = 'cat 
' + file 245 status, output = getstatusoutput (cmd) 246 #status, output = commands.getstatusoutput (cmd) 253 247 if status != 0: 254 248 print output 255 249 sys.exit(-1) 256 self.get_uses(output) 250 self.get_uses(output) 257 251 self.get_paths(output) 258 #self.check_execution (package=self.current_package)259 252 #self.simulate_execution() 253 254 def get_use_count(self): 255 for key in self.packages: 256 count = 0 257 for parent in self.packages: 258 if key in self.packages[parent]['uses']: count += 1 259 self.packages[key]['use_count'] = count 260 # print "Package",key,"use_count",count 260 261 261 262 def get_local_graph(self): … … 266 267 if key in self.packages[selected]['uses']: 267 268 self.packages[selected]['uses'].remove(key) 268 To_remove.append (key) 269 To_remove.append (key) 269 270 for item in To_remove: 270 271 del self.packages[item] 271 272 272 273 def simulate_execution(self): 273 ok = True 274 indice = 1 275 while ok: 276 runnable = list() 277 for key in self.packages: 278 if self.packages[key]['status']!='done': 279 if len(self.packages[key]['uses']) == 0: 280 runnable.append(key) 281 if len(runnable): 282 print '\n#--------------------------------------------------------------' 283 print "# Execute parallel actions within packages " + str(runnable) 284 for selected in runnable: 285 print '#--------------------------------------------------------------' 286 print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path'] 287 print '#--------------------------------------------------------------' 288 self.packages[selected]['status']='done' 289 indice = indice + 1 290 for key in self.packages: 291 if selected in self.packages[key]['uses']: 292 self.packages[key]['uses'].remove(selected) 293 #print 'remove', selected, 'from',key 294 if len(runnable)==0: 295 ok = False 296 297 def check_execution(self, package, path=list(), cycles=list()): 298 #print package,'-->',self.packages[package]['uses'] 299 
#print path 300 if package in path: 301 if path[path.index(package):] not in cycles: 302 print 'Cycles:',path[path.index(package):], package 303 cycles = cycles + path[path.index(package):] 304 sys.exit(-1) 305 path.append(package) 306 for item in self.packages[package]['uses']: 307 self.check_execution(package=item, path=path, cycles=cycles) 308 path.pop() 309 310 def get_current_package(self): 274 while True: 275 ndone = self.simulate_requests() 276 if ndone == 0: break 277 278 def simulate_requests(self): 279 runnable = self.get_next_work_units() 280 if len(runnable): 281 print '\n#--------------------------------------------------------------' 282 print "# Execute parallel actions within packages - total", len(runnable) 283 print '#--------------------------------------------------------------' 284 for selected in runnable: 285 use_count = self.packages[selected]['use_count'] 286 path = self.packages[selected]['path'] 287 print '#--------------------------------------------------------------' 288 print '# (%d/%d %d) Now trying [] in %s' % (self.get_counter(), len(self.packages), use_count, path) 289 print '#--------------------------------------------------------------' 290 self.suppress_work_unit(selected) 291 return len(runnable) 292 293 def get_current_package(self): 311 294 cmd = 'cmt show macro package' 312 status, output = commands.getstatusoutput (cmd) 295 status, output = getstatusoutput (cmd) 296 #status, output = commands.getstatusoutput (cmd) 313 297 if status != 0: 314 298 print output 315 sys.exit(-1) 299 sys.exit(-1) 316 300 lines = string.split(output, '\n') 317 301 for line in lines: … … 321 305 return line [start+1:start+end+1] 322 306 323 def get_work_area_path (self, name):324 return self.packages [name]['path']325 326 def get_package_path (self, name):327 #return os.getcwd ()328 cmd = 'cmt -use='+name+' run pwd'329 status, output = commands.getstatusoutput (cmd)330 if status != 0:331 print output332 sys.exit(-1)333 lines = 
string.split(output, '\n')334 for line in lines:335 if line [0] != '#' and line[:5] != "#CMT>":336 print line337 return line338 339 307 def print_dependencies(self): 340 print '# ------------------------ '341 print '# package --> dependencies '342 print '# ------------------------ '308 print '# -------------------------------------------' 309 print '# package --> dependencies, status, use_count' 310 print '# -------------------------------------------' 343 311 for key in self.packages.keys(): 344 print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status'] 312 print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status'],',', self.packages[key] ['use_count'] 345 313 346 314 def print_status(self, status): 347 print '# ------------------------' 348 print '# package --> dependencies' 349 print '# ------------------------' 315 print '# ------------------------' 316 print '# package --> dependencies' 317 print '# ------------------------' 350 318 i = 1 351 319 for key in self.packages.keys(): 352 320 if self.packages[key] ['status'] == status: 353 print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status'] 321 print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status'] 354 322 i = i + 1 355 323 356 324 def is_work_unit_waiting (self, name): 357 325 return self.packages[name] ['status'] == 'waiting' … … 360 328 self.packages[name] ['status'] = status 361 329 362 def get_dependencies (self, name):363 return self.packages[name] ['uses']364 365 330 def get_next_work_units (self): 366 result = list () 367 for key in self.packages.keys(): 368 if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) : 369 result.append(key) 331 # by default returned list is in arbitrary order - may be this is better 332 # if self.sort is set returned list is sorted - most used packages first 333 runnable = list() 334 for key in self.packages: 335 if len(self.packages[key]['uses']) == 0 
and self.packages[key]['status'] == 'waiting': 336 use_count = self.packages[key]['use_count'] 337 runnable.append((use_count,key)) 338 if self.sort: 339 runnable.sort() 340 runnable.reverse() 341 result = [ pair[1] for pair in runnable ] 370 342 return result 371 343 372 def is_work_units (self):373 result = list ()374 for key in self.packages.keys():375 if self.is_work_unit_waiting(key) :376 return True377 return False378 379 344 def suppress_work_unit (self, name): 380 #print '# remove', name, 'from schedule' 345 #print '# remove', name, 'from schedule' 346 self.semaphore.acquire() 347 self.packages[name]['status']='done' 381 348 for key in self.packages.keys(): 382 349 if name in self.packages[key]['uses']: 383 350 self.packages[key]['uses'].remove(name) 351 self.semaphore.release() 384 352 385 353 def add_work_unit (self, name, cmd): 386 354 if self.is_work_unit_waiting (name): 387 355 # we create requests 388 arg = {'cmd': cmd , 'package':name} 389 req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback) 356 arg = {'cmd': cmd , 'package': name} 357 req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback) 358 # req = WorkRequest(self.do_execute, [arg] , None, 359 # callback=self.result_callback, exc_callback=self.handle_exception) 390 360 # then we put the work request in the queue... 391 361 self.set_work_unit_status (name, 'queued') 392 362 self.pool.putRequest(req) 393 # print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))363 # print "# Work request #%s added on %s." 
% (req.requestID, str(arg['package'])) 394 364 395 365 def execute (self, command): 396 366 #self.print_dependencies () 367 self.semaphore.acquire() 397 368 packages = self.get_next_work_units() 398 if len(packages) !=0: 399 print '\n#--------------------------------------------------------------' 400 print '# Execute parallel actions within packages', packages 369 if len(packages): 370 print '\n#--------------------------------------------------------------' 371 print '# Execute parallel actions within packages (total',len(packages),')',packages 372 print '\n#--------------------------------------------------------------' 401 373 for package in packages: 402 374 self.add_work_unit (package, command) 375 sys.stdout.flush() 376 self.semaphore.release() 403 377 404 378 def execute_all(self,command): … … 406 380 self.execute (command) 407 381 self.wait() 382 if self.counter != len(self.packages): 383 print 'tbroadcast: warning: compiled',self.counter,'out of',len(self.packages),'packages' 384 self.pool.dismissWorkers(self.num_workers, do_join=True) 408 385 #self.print_dependencies () 409 #self.print_status (status='waiting') 410 #while self.is_work_units(): 411 #self.wait() 412 386 #self.print_status (status='waiting') 387 413 388 def wait (self): 414 self.pool.wait() 389 self.pool.wait() 415 390 416 391 # this will be called each time a result is available … … 418 393 #print "**Result: %s from request #%s" % (str(result), request.requestID) 419 394 #print "# Result: %s from request #%s" % (result['package'], request.requestID) 420 #if result['package'] == 'CodeCheck': 421 # sys.exit(-1) 422 self.execute (result['cmd']) 423 424 # the work the threads will have to do 395 self.execute (result['cmd']) 396 397 # the work the threads will have to do 425 398 def do_execute(self, arg): 426 path = self.get_work_area_path (arg['package']) 399 package = arg['package'] 400 path = self.packages[package]['path'] 427 401 if path == None or not os.path.exists(path): 428 raise 
RuntimeError('Path to package '+ arg['package'] +' not found') 429 self.set_work_unit_status (arg['package'], 'running') 430 #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'" 431 #os.chdir(path) 432 #arg['cmd'] = "cd "+ path +";"+ arg['cmd'] 433 header = '#--------------------------------------------------------------\n' 434 header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n' 402 raise RuntimeError('Path to package '+ package +' not found') 403 self.set_work_unit_status(package, 'running') 404 header = '#--------------------------------------------------------------\n' 405 header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+arg['cmd']+'] in '+path+'\n' 435 406 header = header + '#--------------------------------------------------------------\n' 436 407 print header 408 sys.stdout.flush() 437 409 project_path = self.current_project['path']+'/'+self.current_project['version']+'/' 438 410 log_name = string.replace(path, project_path, '') … … 440 412 log_name = string.replace(log_name, '/', '_') 441 413 log_name = log_name+'.loglog' 442 arg['log'] = log_name443 cmd = "cd "+ path +";"+ arg['cmd'] 444 # status, output= commands.getstatusoutput(cmd)414 # arg['log'] = log_name 415 cmd = "cd "+ path +";"+ arg['cmd'] 416 # status, output = commands.getstatusoutput(cmd) 445 417 # init output file 446 418 447 self.packages[ arg['package']] ['startTime'] = time.time ()419 self.packages[package] ['startTime'] = time.time() 448 420 449 421 if self.output is not None: 450 f1 = open (self.output+'/'+ log_name, 'w+') 422 f1 = open (self.output+'/'+ log_name, 'w+') 451 423 f1.write (header) 424 f1.flush() 425 if self.error is not None: 426 f2 = open (self.error+'/error'+log_name, 'w+') 427 fp = Popen(cmd, shell=True, stdout=f1, stderr=f2) 428 fp.communicate() 429 status = fp.wait() 430 f2.close() 431 else: 432 fp = Popen(cmd, shell=True, stdout=f1, 
stderr=f1) 433 fp.communicate() 434 status = fp.wait() 452 435 f1.close() 453 f1 = open (self.output+'/'+ log_name, 'a')454 if self.error is not None:455 f2 = open (self.error+'/error'+log_name, 'w+')456 Popen(cmd, shell=True, stdout=f1, stderr=f2).communicate()457 f2.close()458 else:459 Popen(cmd, shell=True, stdout=f1, stderr=f1).communicate()460 f1.close()461 436 else: 462 Popen(cmd, shell=True).communicate() 463 437 fp = Popen(cmd, shell=True) 438 fp.communicate() 439 status = fp.wait() 440 sys.stdout.flush() 441 sys.stderr.flush() 442 443 # Error is not handled - exit() is forbidden here 464 444 if not self.keep_going and status > 0: 465 sys.exit(status) 466 467 self.packages[arg['package']] ['endTime'] = time.time () 445 print 'Error',status,'for package',package 446 # sys.exit(status) 447 448 self.packages[package] ['endTime'] = time.time() 468 449 if self.perf: 469 self.semaphore.acquire ()450 self.semaphore.acquire() 470 451 f = open (self.perf, 'a') 471 f.write ( arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n')452 f.write (package+" "+str(self.packages[package]['startTime'])+" "+str(self.packages[package]['endTime'] )+'\n') 472 453 f.close() 473 454 self.semaphore.release() 474 self.suppress_work_unit (arg['package']) 475 self.set_work_unit_status (arg['package'], 'done') 476 # status, output= commands.getstatusoutput(cmd) 477 #if status != 0: 478 # raise RuntimeError(output) 455 self.suppress_work_unit(package) 479 456 return {'cmd': arg['cmd'], 'package':arg['package']} 480 457 481 458 482 459 # this will be called when an exception occurs within a thread 483 460 def handle_exception(self, request, exc_info): 484 485 print '#--------------------------------------------------------------' 461 #traceback.print_stack() 462 print '#--------------------------------------------------------------' 486 463 #print "# Exception occured in request #%s: %s" %(request.requestID, 
exc_info[1]) 487 464 if exc_info[0]== exceptions.SystemExit: 488 print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1]) 489 print '#--------------------------------------------------------------' 465 print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1]) 466 print '#--------------------------------------------------------------' 490 467 sys.exit(exc_info[1]) 491 print "# Exception occured: %s" %(exc_info[1]) 468 print "# Exception occured: %s" %(exc_info[1]) 492 469 print exc_info 493 print '#--------------------------------------------------------------' 470 print '#--------------------------------------------------------------' 494 471 #sys.exit(-1) 495 472 496 473 497 474 def generate_make (self, file, command): 498 475 makefile = open (file, 'w+') … … 502 479 self.recursive_make (self.current_package, command, makefile, len(self.packages)) 503 480 makefile.close () 504 481 505 482 def recursive_make (self, package, command, makefile, indice,actions=list()): 506 483 lines = self.generate_action_make (package, command, indice) 507 484 makefile.write (lines) 508 #print lines 485 #print lines 509 486 for pkg in self.packages[package] ['uses']: 510 487 if pkg not in actions: … … 512 489 indice = indice - 1 513 490 self.counter = self.counter - 1 514 self.recursive_make(pkg, command,makefile, indice, actions) 515 491 self.recursive_make(pkg, command,makefile, indice, actions) 492 516 493 def generate_action_make (self, package, command, indice): 517 lines = package + ' :: ' 494 lines = package + ' :: ' 518 495 # add dependencies 519 496 for pkg in self.packages[package] ['uses']: 520 lines = lines + ' ' + pkg 497 lines = lines + ' ' + pkg 521 498 522 499 # add the action itself 523 newcommand = string.replace (command, '<package>', package) 500 newcommand = string.replace (command, '<package>', package) 524 501 if command =='': 525 502 newcommand='$(MAKE)' … … 529 506 lines = lines + '\t@echo 
"#--------------------------------------------------------------"\n' 530 507 lines = lines + 'ifdef LOCATION\n' 531 lines = lines + '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n' 508 lines = lines + '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n' 532 509 lines = lines + '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n' 533 510 lines = lines + '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n' 534 lines = lines + '\t+@cd ' + self.packages[package]['path'] 511 lines = lines + '\t+@cd ' + self.packages[package]['path'] 535 512 lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n' 536 513 lines = lines + 'else\n' 537 lines = lines + '\t+@cd ' + self.packages[package]['path'] 514 lines = lines + '\t+@cd ' + self.packages[package]['path'] 538 515 lines = lines + ' && ' + newcommand + '\n' 539 lines = lines + 'endif\n\n' 516 lines = lines + 'endif\n\n' 540 517 return lines 541 518 519 520 # own copy of getstatusoutput 521 def getstatusoutput(cmd): 522 """Return (status, stdout) of executing cmd in a shell. 523 524 A trailing line separator is removed from the output string. 525 The exit status of the command is encoded in the format specified for wait(), 526 when the exit status is zero (termination without errors), 0 is returned. 527 """ 528 import os 529 p = os.popen(cmd, 'r') 530 out = p.read() 531 sts = p.close() 532 if sts is None: sts = 0 533 if out.endswith(os.linesep): 534 out = out[:out.rindex(os.linesep)] 535 return sts, out 536 542 537 #--------- EoF --------# -
tbroadcast/HEAD/python/threadpool.py
r508 r517 1 """ 2 3 @author: Christopher Arndt 4 @version: 1.1 5 6 @see: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/435883 7 8 Easy to use object-oriented thread pool framework. 9 10 A thread pool is a class that maintains a pool of worker threads to perform 1 # -*- coding: UTF-8 -*- 2 """Easy to use object-oriented thread pool framework. 3 4 A thread pool is an object that maintains a pool of worker threads to perform 11 5 time consuming operations in parallel. It assigns jobs to the threads 12 6 by putting them in a work request queue, where they are picked up by the 13 7 next available thread. This then performs the requested operation in the 14 background and puts the results in a another queue.15 16 The thread pool classcan then collect the results from all threads from8 background and puts the results in another queue. 9 10 The thread pool object can then collect the results from all threads from 17 11 this queue as soon as they become available or after all threads have 18 12 finished their work. It's also possible, to define callbacks to handle 19 13 each result as it comes in. 20 14 21 The basic concept and some code was taken from the book "Python in a Nutshell "22 by Alex Martelli, copyright 2003, ISBN 0-596-00188-6, from section 14.5 23 "Threaded Program Architecture". I wrapped the main program logic in the15 The basic concept and some code was taken from the book "Python in a Nutshell, 16 2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section 17 14.5 "Threaded Program Architecture". I wrapped the main program logic in the 24 18 ThreadPool class, added the WorkRequest class and the callback system and 25 tweaked the code here and there. 26 27 Basic usage: 28 29 >>> main = ThreadPool(poolsize) 30 >>> requests = makeRequests(some_callable, list_of_args, callback) 31 >>> [main.putRequests(req) for req in requests] 32 >>> main.wait() 19 tweaked the code here and there. 
Kudos also to Florent Aide for the exception 20 handling mechanism. 21 22 Basic usage:: 23 24 >>> pool = ThreadPool(poolsize) 25 >>> requests = makeRequests(some_callable, list_of_args, callback) 26 >>> [pool.putRequest(req) for req in requests] 27 >>> pool.wait() 33 28 34 29 See the end of the module code for a brief, annotated usage example. 30 31 Website : http://chrisarndt.de/projects/threadpool/ 32 35 33 """ 36 37 __all__ = ['makeRequests', 'NoResultsPending', 'NoWorkersAvailable', 38 'ThreadPool', 'WorkRequest', 'WorkerThread'] 39 40 __author__ = 'Christopher Arndt' 41 __version__ = '1.1' 42 __date__ = '2005-07-19' 43 44 import threading, Queue 45 34 __docformat__ = "restructuredtext en" 35 36 __all__ = [ 37 'makeRequests', 38 'NoResultsPending', 39 'NoWorkersAvailable', 40 'ThreadPool', 41 'WorkRequest', 42 'WorkerThread' 43 ] 44 45 __author__ = "Christopher Arndt" 46 __version__ = "1.2.5" 47 __revision__ = "$Revision: 354 $" 48 __date__ = "$Date: 2008-11-19 18:34:46 +0100 (Wed, 19 Nov 2008) $" 49 __license__ = 'MIT license' 50 51 52 # standard library modules 53 import sys 54 import threading 55 import Queue 56 import traceback 57 58 59 # exceptions 46 60 class NoResultsPending(Exception): 47 61 """All work requests have been processed.""" 48 62 pass 63 49 64 class NoWorkersAvailable(Exception): 50 65 """No worker threads available to process remaining requests.""" 51 66 pass 52 67 68 69 # internal module helper functions 70 def _handle_thread_exception(request, exc_info): 71 """Default exception handler callback function. 72 73 This just prints the exception info via ``traceback.print_exception``. 74 75 """ 76 traceback.print_exception(*exc_info) 77 78 79 # utility functions 80 def makeRequests(callable_, args_list, callback=None, 81 exc_callback=_handle_thread_exception): 82 """Create several work requests for same callable with different arguments. 
83 84 Convenience function for creating several work requests for the same 85 callable where each invocation of the callable receives different values 86 for its arguments. 87 88 ``args_list`` contains the parameters for each invocation of callable. 89 Each item in ``args_list`` should be either a 2-item tuple of the list of 90 positional arguments and a dictionary of keyword arguments or a single, 91 non-tuple argument. 92 93 See docstring for ``WorkRequest`` for info on ``callback`` and 94 ``exc_callback``. 95 96 """ 97 requests = [] 98 for item in args_list: 99 if isinstance(item, tuple): 100 requests.append( 101 WorkRequest(callable_, item[0], item[1], callback=callback, 102 exc_callback=exc_callback) 103 ) 104 else: 105 requests.append( 106 WorkRequest(callable_, [item], None, callback=callback, 107 exc_callback=exc_callback) 108 ) 109 return requests 110 111 112 # classes 53 113 class WorkerThread(threading.Thread): 54 114 """Background thread connected to the requests/results queues. … … 56 116 A worker thread sits in the background and picks up work requests from 57 117 one queue and puts the results in another until it is dismissed. 118 58 119 """ 59 120 60 def __init__ (self, requestsQueue, resultsQueue, **kwds): 61 """Set up thread in damonic mode and start it immediatedly. 62 63 requestsQueue and resultQueue are instances of Queue.Queue passed 64 by the ThreadPool class when it creates a new worker thread. 121 def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds): 122 """Set up thread in daemonic mode and start it immediatedly. 123 124 ``requests_queue`` and ``results_queue`` are instances of 125 ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new 126 worker thread. 
127 65 128 """ 66 129 threading.Thread.__init__(self, **kwds) 67 130 self.setDaemon(1) 68 self.workRequestQueue = requestsQueue 69 self.resultQueue = resultsQueue 131 self._requests_queue = requests_queue 132 self._results_queue = results_queue 133 self._poll_timeout = poll_timeout 70 134 self._dismissed = threading.Event() 71 135 self.start() 72 136 73 137 def run(self): 74 """Repeatedly process the job queue until told to exit. 75 """ 76 77 while not self._dismissed.isSet(): 78 # thread blocks here, if queue empty 79 request = self.workRequestQueue.get() 138 """Repeatedly process the job queue until told to exit.""" 139 while True: 80 140 if self._dismissed.isSet(): 81 # return the work request we just picked up 82 self.workRequestQueue.put(request) 83 break # and exit 84 # XXX catch exceptions here and stick them to request object 85 self.resultQueue.put( 86 (request, request.callable(*request.args, **request.kwds)) 87 ) 141 # we are dismissed, break out of loop 142 break 143 # get next work request. If we don't get a new request from the 144 # queue after self._poll_timout seconds, we jump to the start of 145 # the while loop again, to give the thread a chance to exit. 146 try: 147 request = self._requests_queue.get(True, self._poll_timeout) 148 except Queue.Empty: 149 continue 150 else: 151 if self._dismissed.isSet(): 152 # we are dismissed, put back request in queue and exit loop 153 self._requests_queue.put(request) 154 break 155 try: 156 result = request.callable(*request.args, **request.kwds) 157 self._results_queue.put((request, result)) 158 except: 159 request.exception = True 160 self._results_queue.put((request, sys.exc_info())) 88 161 89 162 def dismiss(self): 90 """Sets a flag to tell the thread to exit when done with current job. 91 """ 92 163 """Sets a flag to tell the thread to exit when done with current job.""" 93 164 self._dismissed.set() 94 165 … … 97 168 """A request to execute a callable for putting in the request queue later. 
98 169 99 See the module function makeRequests() for the common case 100 where you want to build several work requests for the same callable 101 but different arguments for each call. 170 See the module function ``makeRequests`` for the common case 171 where you want to build several ``WorkRequest`` objects for the same 172 callable but with different arguments for each call. 173 102 174 """ 103 175 104 def __init__ (self, callable, args=None, kwds=None, requestID=None, 105 callback=None): 106 """A work request consists of the a callable to be executed by a 176 def __init__(self, callable_, args=None, kwds=None, requestID=None, 177 callback=None, exc_callback=_handle_thread_exception): 178 """Create a work request for a callable and attach callbacks. 179 180 A work request consists of the a callable to be executed by a 107 181 worker thread, a list of positional arguments, a dictionary 108 182 of keyword arguments. 109 183 110 A callback function can be specified, that is called when the results 111 of the request are picked up from the result queue. It must accept 112 two arguments, the request object and it's results in that order. 113 If you want to pass additional information to the callback, just stick 114 it on the request object. 115 116 requestID, if given, must be hashable as it is used by the ThreadPool 117 class to store the results of that work request in a dictionary. 118 It defaults to the return value of id(self). 184 A ``callback`` function can be specified, that is called when the 185 results of the request are picked up from the result queue. It must 186 accept two anonymous arguments, the ``WorkRequest`` object and the 187 results of the callable, in that order. If you want to pass additional 188 information to the callback, just stick it on the request object. 189 190 You can also give custom callback for when an exception occurs with 191 the ``exc_callback`` keyword parameter. 
It should also accept two 192 anonymous arguments, the ``WorkRequest`` and a tuple with the exception 193 details as returned by ``sys.exc_info()``. The default implementation 194 of this callback just prints the exception info via 195 ``traceback.print_exception``. If you want no exception handler 196 callback, just pass in ``None``. 197 198 ``requestID``, if given, must be hashable since it is used by 199 ``ThreadPool`` object to store the results of that work request in a 200 dictionary. It defaults to the return value of ``id(self)``. 201 119 202 """ 120 203 if requestID is None: 121 204 self.requestID = id(self) 122 205 else: 123 self.requestID = requestID 206 try: 207 self.requestID = hash(requestID) 208 except TypeError: 209 raise TypeError("requestID must be hashable.") 210 self.exception = False 124 211 self.callback = callback 125 self.callable = callable 212 self.exc_callback = exc_callback 213 self.callable = callable_ 126 214 self.args = args or [] 127 215 self.kwds = kwds or {} 128 216 217 def __str__(self): 218 return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \ 219 (self.requestID, self.args, self.kwds, self.exception) 129 220 130 221 class ThreadPool: 131 222 """A thread pool, distributing work requests and collecting results. 132 223 133 See the module doctring for more information. 224 See the module docstring for more information. 225 134 226 """ 135 227 136 def __init__ (self, num_workers, q_size=0):228 def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): 137 229 """Set up the thread pool and start num_workers worker threads. 138 230 139 num_workers is the number of worker threads to start initialy. 140 If q_size > 0 the size of the work request is limited and the 141 thread pool blocks when queue is full and it tries to put more 142 work requests in it. 231 ``num_workers`` is the number of worker threads to start initially. 
232 233 If ``q_size > 0`` the size of the work *request queue* is limited and 234 the thread pool blocks when the queue is full and it tries to put 235 more work requests in it (see ``putRequest`` method), unless you also 236 use a positive ``timeout`` value for ``putRequest``. 237 238 If ``resq_size > 0`` the size of the *results queue* is limited and the 239 worker threads will block when the queue is full and they try to put 240 new results in it. 241 242 .. warning: 243 If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is 244 the possibilty of a deadlock, when the results queue is not pulled 245 regularly and too many jobs are put in the work requests queue. 246 To prevent this, always set ``timeout > 0`` when calling 247 ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. 248 143 249 """ 144 145 self.requestsQueue = Queue.Queue(q_size) 146 self.resultsQueue = Queue.Queue() 250 self._requests_queue = Queue.Queue(q_size) 251 self._results_queue = Queue.Queue(resq_size) 147 252 self.workers = [] 253 self.dismissedWorkers = [] 148 254 self.workRequests = {} 149 self.createWorkers(num_workers) 150 151 def createWorkers(self, num_workers): 152 """Add num_workers worker threads to the pool.""" 153 255 self.createWorkers(num_workers, poll_timeout) 256 257 def createWorkers(self, num_workers, poll_timeout=5): 258 """Add num_workers worker threads to the pool. 259 260 ``poll_timout`` sets the interval in seconds (int or float) for how 261 ofte threads should check whether they are dismissed, while waiting for 262 requests. 263 264 """ 154 265 for i in range(num_workers): 155 self.workers.append(WorkerThread(self. 
requestsQueue,156 self.resultsQueue))157 158 def dismissWorkers(self, num_workers ):159 """Tell num_workers worker threads to to quit when they're done."""160 266 self.workers.append(WorkerThread(self._requests_queue, 267 self._results_queue, poll_timeout=poll_timeout)) 268 269 def dismissWorkers(self, num_workers, do_join=False): 270 """Tell num_workers worker threads to quit after their current task.""" 271 dismiss_list = [] 161 272 for i in range(min(num_workers, len(self.workers))): 162 273 worker = self.workers.pop() 163 274 worker.dismiss() 164 165 def putRequest(self, request): 166 """Put work request into work queue and save for later.""" 167 self.requestsQueue.put(request) 275 dismiss_list.append(worker) 276 277 if do_join: 278 for worker in dismiss_list: 279 worker.join() 280 else: 281 self.dismissedWorkers.extend(dismiss_list) 282 283 def joinAllDismissedWorkers(self): 284 """Perform Thread.join() on all worker threads that have been dismissed. 285 """ 286 for worker in self.dismissedWorkers: 287 worker.join() 288 self.dismissedWorkers = [] 289 290 def putRequest(self, request, block=True, timeout=0): 291 """Put work request into work queue and save its id for later.""" 292 assert isinstance(request, WorkRequest) 293 # don't reuse old work requests 294 assert not getattr(request, 'exception', None) 295 self._requests_queue.put(request, block, timeout) 168 296 self.workRequests[request.requestID] = request 169 297 170 298 def poll(self, block=False): 171 299 """Process any new results in the queue.""" 172 while 1: 300 while True: 301 # still results pending? 302 if not self.workRequests: 303 raise NoResultsPending 304 # are there still workers to process remaining requests? 
305 elif block and not self.workers: 306 raise NoWorkersAvailable 173 307 try: 174 # still results pending?175 if not self.workRequests:176 raise NoResultsPending177 # are there still workers to process remaining requests?178 elif block and not self.workers:179 raise NoWorkersAvailable180 308 # get back next results 181 request, result = self.resultsQueue.get(block=block) 182 # and hand them to the callback, if any 183 if request.callback: 309 request, result = self._results_queue.get(block=block) 310 # has an exception occured? 311 if request.exception and request.exc_callback: 312 request.exc_callback(request, result) 313 # hand results to callback, if any 314 if request.callback and not \ 315 (request.exception and request.exc_callback): 184 316 request.callback(request, result) 185 317 del self.workRequests[request.requestID] … … 189 321 def wait(self): 190 322 """Wait for results, blocking until all have arrived.""" 191 192 323 while 1: 193 324 try: … … 196 327 break 197 328 198 def makeRequests(callable, args_list, callback=None): 199 """Convenience function for building several work requests for the same 200 callable with different arguments for each call. 201 202 args_list contains the parameters for each invocation of callable. 203 Each item in 'argslist' should be either a 2-item tuple of the list of 204 positional arguments and a dictionary of keyword arguments or a single, 205 non-tuple argument. 206 207 callback is called when the results arrive in the result queue. 
208 """ 209 210 requests = [] 211 212 for item in args_list: 213 214 if item is None: 215 """no arguments""" 216 requests.append( 217 WorkRequest(callable, None, None, callback=callback)) 218 219 elif item == isinstance(item, tuple): 220 """arguments and keywords""" 221 requests.append( 222 WorkRequest(callable, item[0], item[1], callback=callback)) 223 else: 224 """only keywords""" 225 requests.append( 226 WorkRequest(callable, [item], None, callback=callback)) 227 return requests 228 329 330 ################ 331 # USAGE EXAMPLE 332 ################ 229 333 230 334 if __name__ == '__main__': … … 235 339 def do_something(data): 236 340 time.sleep(random.randint(1,5)) 237 return round(random.random() * data, 5) 341 result = round(random.random() * data, 5) 342 # just to show off, we throw an exception once in a while 343 if result > 5: 344 raise RuntimeError("Something extraordinary happened!") 345 return result 238 346 239 347 # this will be called each time a result is available 240 348 def print_result(request, result): 241 print "Result: %s from request #%s" % (result, request.requestID) 349 print "**** Result from request #%s: %r" % (request.requestID, result) 350 351 # this will be called when an exception occurs within a thread 352 # this example exception handler does little more than the default handler 353 def handle_exception(request, exc_info): 354 if not isinstance(exc_info, tuple): 355 # Something is seriously wrong... 356 print request 357 print exc_info 358 raise SystemExit 359 print "**** Exception occured in request #%s: %s" % \ 360 (request.requestID, exc_info) 242 361 243 362 # assemble the arguments for each job to a list... 244 363 data = [random.randint(1,10) for i in range(20)] 245 364 # ... 
and build a WorkRequest object for each item in data 246 requests = makeRequests(do_something, data, print_result) 247 248 # we create a pool of 10 worker threads 365 requests = makeRequests(do_something, data, print_result, handle_exception) 366 # to use the default exception handler, uncomment next line and comment out 367 # the preceding one. 368 #requests = makeRequests(do_something, data, print_result) 369 370 # or the other form of args_lists accepted by makeRequests: ((,), {}) 371 data = [((random.randint(1,10),), {}) for i in range(20)] 372 requests.extend( 373 makeRequests(do_something, data, print_result, handle_exception) 374 #makeRequests(do_something, data, print_result) 375 # to use the default exception handler, uncomment next line and comment 376 # out the preceding one. 377 ) 378 379 # we create a pool of 3 worker threads 380 print "Creating thread pool with 3 worker threads." 249 381 main = ThreadPool(3) 250 382 … … 257 389 258 390 # ...and wait for the results to arrive in the result queue 259 # wait() will return when results for all work requests have arrived 391 # by using ThreadPool.wait(). This would block until results for 392 # all work requests have arrived: 260 393 # main.wait() 261 394 262 # alternativelypoll for results while doing something else:395 # instead we can poll for results while doing something else: 263 396 i = 0 264 while 1:397 while True: 265 398 try: 399 time.sleep(0.5) 266 400 main.poll() 267 print "Main thread working..." 268 time.sleep(0.5)401 print "Main thread working...", 402 print "(active worker threads: %i)" % (threading.activeCount()-1, ) 269 403 if i == 10: 270 print " Adding 3 more worker threads..."404 print "**** Adding 3 more worker threads..." 271 405 main.createWorkers(3) 406 if i == 20: 407 print "**** Dismissing 2 worker threads..." 408 main.dismissWorkers(2) 272 409 i += 1 273 except (KeyboardInterrupt, NoResultsPending): 410 except KeyboardInterrupt: 411 print "**** Interrupted!" 
274 412 break 413 except NoResultsPending: 414 print "**** No pending results." 415 break 416 if main.dismissedWorkers: 417 print "Joining all dismissed worker threads..." 418 main.joinAllDismissedWorkers() -
tbroadcast/HEAD/scripts/tbroadcast
r508 r517 3 3 # -- Author: V. Garonne 4 4 # -- Mail: garonne@lal.in2p3.fr 5 # -- Date: 08/25/2006 5 # -- Date: 08/25/2006 6 6 # -- Name: tbroadcast 7 7 # -- Description: main program 8 8 #----------------------------------# 9 9 10 import os11 10 import sys 12 import string11 import time 13 12 14 13 def usage(): 15 print 'Usage : > tbroadcast [global options] [<command>]' 16 print '# command :' 17 print '# <command>: command to execute' 18 print '# global options :' 19 print '# -f=<file> : Input file' 20 print '# -help : Print help' 21 print '# -local : Reach packages only within the current project' 22 print '# -global : Reach packages in all CMTPATH/CMTPROJECTPATH items' 23 print '# -ignore_cycles : Suppress automatically the cycles' 24 print '# -make=<file> : Generate a recursive Make, [see: http://www.tip.net.au/~millerp/rmch/recu-make-cons-harm.html]' 25 print '# -nb=<num_worker> : Change the total number of threads[default is 20]' 26 print '# -no_keep_going : Exit after the first exit code > 1 found and return it in the shell' 27 print '# -output=<location> : Output directory to store output files with the form <package>_output.log' 28 print '# -error=<location> : Output directory to store error output with the form <package>_error.log' 29 print '# -perf=<file> : Store for each package the time for executing the command in the <file> file' 30 print '# -print : Print dependencies for each package' 31 print '# -version : version of tbroadcast' 32 print '# -silent : Disable print' 33 print '# -test : Simulate execution' 14 print """ 15 Usage: tbroadcast [global options] [<command>] 16 # 17 # <command> is executed in <package>/cmt 18 # 19 # global options : 20 # -help : Print help 21 # -local : Reach packages only within the current project 22 # : if not specified reach packages in all CMTPATH/CMTPROJECTPATH items 23 # -ignore[_cycles] : Suppress automatically the cycles 24 # -sort : Compile packages in order of use count, most significant first 25 # 
-nb=<num_worker> : Change the total number of threads[default is 20] 26 # -output=<location> : Output directory to store output files with the form <package>_output.log 27 # -error=<location> : Output directory to store error output with the form <package>_error.log 28 # -perf=<file> : Store for each package the time for executing the command in the <file> file 29 # -make=<file> : Generate a recursive Make, [see: http://www.tip.net.au/~millerp/rmch/recu-make-cons-harm.html] 30 # -print : Print dependencies for each package and exit 31 # -version : Print version of tbroadcast and exit 32 # -test : Simulate execution and exit 33 # - : is accepted and does nothing 34 # 35 # Example: 36 # tbroadcast -local -ignore -nb=4 'make -j6' 37 """ 38 39 # Unused options 40 # -f=<file> : Input file (option for debug only) 41 # -no_keep_going : Exit after the first exit code > 1 found and return it in the shell 42 # -silent : Disable print 34 43 35 44 if __name__ == '__main__': … … 38 47 cur_version = sys.version_info 39 48 40 if not (cur_version[0] > req_version[0] or (cur_version[0] == req_version[0] and cur_version[1] >= req_version[1])): 41 raise "must use python 2.5 or greater" 49 if not (cur_version[0] > req_version[0] or (cur_version[0] == req_version[0] and cur_version[1] >= req_version[1])): 50 print "tbroadcast: must use python 2.5 or greater" 51 sys.exit(-1) 42 52 43 53 from tbroadcast import Scheduler … … 46 56 num_worker = 20 47 57 command = '' 48 version = 'v2.0. 
4'58 version = 'v2.0.7' 49 59 test = False 50 check = False51 60 print_graph = False 52 61 local = False 53 62 ignore_cycles = False 54 63 silent = False 55 perf = False 64 perf = False 65 sort = False 56 66 output = None 57 67 error = None … … 60 70 makefile = 'Makefile' 61 71 keep_going = True 62 72 63 73 if len(sys.argv) == 1: 64 test = True 65 else: 66 for arg in sys.argv[1:len(sys.argv)]: 74 usage() 75 sys.exit(-1) 76 else: 77 for arg in sys.argv[1:len(sys.argv)]: 78 # print "Argument is",arg 67 79 if arg[0]=='-': 68 option = string.split(arg,'=')[0]80 option = arg.split('=')[0] 69 81 if option == '-version': 70 82 print version 71 83 sys.exit(-1) 72 if option == '-nb': 73 num_worker = int (string.split(arg,'=')[1]) 74 if option == '-f': 75 file = string.split(arg,'=')[1] 76 if option == '-perf': 77 perf = string.split(arg,'=')[1] 78 if option == '-output': 79 output = string.split(arg,'=')[1] 80 if option == '-error': 81 error = string.split(arg,'=')[1] 82 if option == '-local': 83 local= True 84 if option == '-ignore_cycles': 84 elif option == '-nb': 85 num_worker = int (arg.split('=')[1]) 86 elif option == '-f': 87 file = arg.split('=')[1] 88 elif option == '-perf': 89 perf = arg.split('=')[1] 90 elif option == '-output': 91 output = arg.split('=')[1] 92 elif option == '-error': 93 error = arg.split('=')[1] 94 elif option == '-local': 95 local = True 96 elif option == '-sort': 97 sort = True 98 elif option[:7] == '-ignore': 85 99 ignore_cycles = True 86 if option == '-silent':87 silent = True 88 if option == '-no_keep_going':100 elif option == '-silent': 101 silent = True 102 elif option == '-no_keep_going': 89 103 keep_going = False 90 91 if option == '-help': 92 usage() 93 sys.exit(-1) 94 95 if option == '-test': 96 test = True 97 elif option == '-check': 98 check = True 104 elif option == '-help': 105 usage() 106 sys.exit(-1) 107 elif option == '-test': 108 test = True 99 109 elif option == '-print': 100 print_graph = True101 elif option == '-make': 110 
print_graph = True 111 elif option == '-make': 102 112 make = True 103 makefile = string.split(arg,'=')[1] 113 makefile = arg.split('=')[1] 114 elif option == '-': 115 pass 116 else: 117 print 'tbroadcast: bad option "%s", use -help for help' % option 118 sys.exit(-1) 104 119 else: 105 120 command = arg 106 121 107 master = Scheduler (num_workers=num_worker, file=file, ignore_cycles=ignore_cycles, 108 local=local, output=output, error=error, silent=silent, perf=perf, 109 keep_going=keep_going) 122 # print "End of arguments. Command to execute", command 123 124 if not (command or test or print_graph): 125 print 'tbroadcast: no command specified' 126 sys.exit(-1) 127 128 master = Scheduler (num_workers=num_worker, file=file, ignore_cycles=ignore_cycles, 129 local=local, output=output, error=error, silent=silent, perf=perf, 130 keep_going=keep_going, sort=sort) 110 131 if test: 111 132 master.simulate_execution() 112 elif check:113 master.check_execution (package=master.get_current_package())114 133 elif print_graph: 115 master.print_dependencies 134 master.print_dependencies() 116 135 elif make: 117 master.generate_make (makefile, command) 136 master.generate_make (makefile, command) 118 137 else: 138 print 'tbroadcast: start of job at', time.strftime('%d-%b-%Y %T') 119 139 master.execute_all (command) 120 #sys.exit(-1); 140 print 'tbroadcast: end of job at', time.strftime('%d-%b-%Y %T') 141 #sys.exit(-1) 121 142 #--------- EoF --------#
Note: See TracChangeset for help on using the changeset viewer.