Changeset 508 for tbroadcast/HEAD/python
- Timestamp:
- May 28, 2009, 9:44:59 AM (15 years ago)
- Location:
- tbroadcast/HEAD/python
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
tbroadcast/HEAD/python/tbroadcast.py
r478 r508 18 18 from threading import BoundedSemaphore 19 19 20 from threadpool import WorkRequest21 from threadpool import ThreadPool22 from threadpool import NoResultsPending23 from threadpool import NoWorkersAvailable24 from threadpool import makeRequests25 from executer import exeCommand 26 from executer import getstatusoutput20 from threadpool import WorkRequest 21 from threadpool import ThreadPool 22 from threadpool import NoResultsPending 23 from threadpool import NoWorkersAvailable 24 from threadpool import makeRequests 25 26 from subprocess import Popen 27 27 28 28 class Scheduler: … … 72 72 def get_current_project(self): 73 73 cmd = 'cmt show projects | grep current' 74 status, output = getstatusoutput (cmd) 75 # status, output = commands.getstatusoutput (cmd) 74 status, output = commands.getstatusoutput (cmd) 76 75 if status != 0: 77 76 print output … … 100 99 cmd = 'cmt -private show cycles' 101 100 cycle_found = False 102 status, output = getstatusoutput (cmd) 103 # status, output = commands.getstatusoutput (cmd) 101 status, output = commands.getstatusoutput (cmd) 104 102 if status != 0: 105 103 print output … … 252 250 else: 253 251 cmd = 'cat ' + file 254 status, output = getstatusoutput (cmd) 255 # status, output = commands.getstatusoutput (cmd) 252 status, output = commands.getstatusoutput (cmd) 256 253 if status != 0: 257 254 print output … … 312 309 313 310 def get_current_package(self): 314 cmd = 'cmt show macro package' 315 status, output = getstatusoutput (cmd) 316 # status, output = commands.getstatusoutput (cmd) 311 cmd = 'cmt show macro package' 312 status, output = commands.getstatusoutput (cmd) 317 313 if status != 0: 318 314 print output … … 331 327 #return os.getcwd () 332 328 cmd = 'cmt -use='+name+' run pwd' 333 status, output = getstatusoutput (cmd) 334 # status, output = commands.getstatusoutput (cmd) 329 status, output = commands.getstatusoutput (cmd) 335 330 if status != 0: 336 331 print output … … 392 387 # we create requests 393 388 arg = {'cmd': cmd , 'package':name} 394 req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback , exc_callback=self.handle_exception)389 req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback) 395 390 # then we put the work request in the queue... 396 391 self.set_work_unit_status (name, 'queued') … … 441 436 print header 442 437 project_path = self.current_project['path']+'/'+self.current_project['version']+'/' 443 log_name =string.replace(path, project_path, '')444 log_name = string.replace(log_name, '/cmt', '')445 log_name = string.replace(log_name, '/', '_')446 log_name = log_name+'.loglog'447 arg['log'] = log_name438 log_name = string.replace(path, project_path, '') 439 log_name = string.replace(log_name, '/cmt', '') 440 log_name = string.replace(log_name, '/', '_') 441 log_name = log_name+'.loglog' 442 arg['log'] = log_name 448 443 cmd = "cd "+ path +";"+ arg['cmd'] 449 444 #status, output= commands.getstatusoutput(cmd) 450 445 # init output file 446 447 self.packages[arg['package']] ['startTime'] = time.time () 448 451 449 if self.output is not None: 452 f = open (self.output+'/'+ log_name, 'w+') 453 f.write (header) 454 f.close() 455 if self.error is not None: 456 f = open (self.error+'/error'+log_name, 'w+') 457 f.close() 458 self.packages[arg['package']] ['startTime'] = time.time () 459 status, output, error, pythonError = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600) 450 f1 = open (self.output+'/'+ log_name, 'w+') 451 f1.write (header) 452 f1.close() 453 f1 = open (self.output+'/'+ log_name, 'a') 454 if self.error is not None: 455 f2 = open (self.error+'/error'+log_name, 'w+') 456 Popen(cmd, shell=True, stdout=f1, stderr=f2).communicate() 457 f2.close() 458 else: 459 Popen(cmd, shell=True, stdout=f1, stderr=f1).communicate() 460 f1.close() 461 else: 462 Popen(cmd, shell=True).communicate() 463 460 464 if not self.keep_going and status > 0: 461 465 sys.exit(status) … … 473 477 #if status != 0: 474 478 # raise RuntimeError(output) 475 return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']} 476 477 def redirectOutput(self, index, buffer, arg): 478 """Filter function to redirect the std output and error of the job 479 executable for real-time debugging 480 """ 481 if self.output is not None: 482 if index==0: 483 f = open (self.output+'/'+arg['log'], 'a') 484 f.write (buffer+'\n') 485 f.close() 486 elif index==1: 487 if self.error is not None: 488 f = open (self.error+'/error'+arg['log'], 'a') 489 else: 490 f = open (self.output+'/'+arg['log'], 'a') 491 f.write (buffer+'\n') 492 f.close() 493 if not self.silent: 494 print buffer 479 return {'cmd': arg['cmd'], 'package':arg['package']} 480 495 481 496 482 # this will be called when an exception occurs within a thread -
tbroadcast/HEAD/python/threadpool.py
r236 r508 1 """Easy to use object-oriented thread pool framework. 2 3 A thread pool is an object that maintains a pool of worker threads to perform 1 """ 2 3 @author: Christopher Arndt 4 @version: 1.1 5 6 @see: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/435883 7 8 Easy to use object-oriented thread pool framework. 9 10 A thread pool is a class that maintains a pool of worker threads to perform 4 11 time consuming operations in parallel. It assigns jobs to the threads 5 12 by putting them in a work request queue, where they are picked up by the … … 7 14 background and puts the results in a another queue. 8 15 9 The thread pool objectcan then collect the results from all threads from16 The thread pool class can then collect the results from all threads from 10 17 this queue as soon as they become available or after all threads have 11 18 finished their work. It's also possible, to define callbacks to handle … … 16 23 "Threaded Program Architecture". I wrapped the main program logic in the 17 24 ThreadPool class, added the WorkRequest class and the callback system and 18 tweaked the code here and there. Kudos also to Florent Aide for the exception 19 handling mechanism. 25 tweaked the code here and there. 20 26 21 27 Basic usage: 22 28 23 >>> pool = TreadPool(poolsize)29 >>> main = ThreadPool(poolsize) 24 30 >>> requests = makeRequests(some_callable, list_of_args, callback) 25 >>> [ pool.putRequest(req) for req in requests]26 >>> pool.wait()31 >>> [main.putRequests(req) for req in requests] 32 >>> main.wait() 27 33 28 34 See the end of the module code for a brief, annotated usage example. 29 30 Website : http://chrisarndt.de/en/software/python/threadpool/31 35 """ 32 36 33 __all__ = [ 34 'makeRequests', 35 'NoResultsPending', 36 'NoWorkersAvailable', 37 'ThreadPool', 38 'WorkRequest', 39 'WorkerThread' 40 ] 41 42 __author__ = "Christopher Arndt" 43 __version__ = "1.2.3" 44 __revision__ = "$Revision: 1.5 $" 45 __date__ = "$Date: 2006/06/23 12:32:25 $" 46 __license__ = 'Python license' 47 48 # standard library modules 49 import sys 50 import threading 51 import Queue 52 53 # exceptions 37 __all__ = ['makeRequests', 'NoResultsPending', 'NoWorkersAvailable', 38 'ThreadPool', 'WorkRequest', 'WorkerThread'] 39 40 __author__ = 'Christopher Arndt' 41 __version__ = '1.1' 42 __date__ = '2005-07-19' 43 44 import threading, Queue 45 54 46 class NoResultsPending(Exception): 55 47 """All work requests have been processed.""" 56 48 pass 57 58 49 class NoWorkersAvailable(Exception): 59 50 """No worker threads available to process remaining requests.""" 60 51 pass 61 52 62 # classes63 53 class WorkerThread(threading.Thread): 64 54 """Background thread connected to the requests/results queues. … … 68 58 """ 69 59 70 def __init__ (self, requestsQueue, resultsQueue, **kwds):71 """Set up thread in da emonic mode and start it immediatedly.60 def __init__ (self, requestsQueue, resultsQueue, **kwds): 61 """Set up thread in damonic mode and start it immediatedly. 72 62 73 63 requestsQueue and resultQueue are instances of Queue.Queue passed 74 64 by the ThreadPool class when it creates a new worker thread. 75 65 """ 76 77 66 threading.Thread.__init__(self, **kwds) 78 67 self.setDaemon(1) … … 83 72 84 73 def run(self): 85 """Repeatedly process the job queue until told to exit.""" 74 """Repeatedly process the job queue until told to exit. 75 """ 86 76 87 77 while not self._dismissed.isSet(): … … 89 79 request = self.workRequestQueue.get() 90 80 if self._dismissed.isSet(): 91 # if told to exit,return the work request we just picked up81 # return the work request we just picked up 92 82 self.workRequestQueue.put(request) 93 83 break # and exit 94 try: 95 self.resultQueue.put( 96 (request, request.callable(*request.args, **request.kwds)) 97 ) 98 except: 99 request.exception = True 100 self.resultQueue.put((request, sys.exc_info())) 84 # XXX catch exceptions here and stick them to request object 85 self.resultQueue.put( 86 (request, request.callable(*request.args, **request.kwds)) 87 ) 101 88 102 89 def dismiss(self): … … 111 98 112 99 See the module function makeRequests() for the common case 113 where you want to build several WorkRequests for the same callable 114 but with different arguments for each call. 115 """ 116 117 def __init__(self, callable, args=None, kwds=None, requestID=None, 118 callback=None, exc_callback=None): 119 """Create a work request for a callable and attach callbacks. 120 121 A work request consists of the a callable to be executed by a 100 where you want to build several work requests for the same callable 101 but different arguments for each call. 102 """ 103 104 def __init__ (self, callable, args=None, kwds=None, requestID=None, 105 callback=None): 106 """A work request consists of the a callable to be executed by a 122 107 worker thread, a list of positional arguments, a dictionary 123 108 of keyword arguments. … … 125 110 A callback function can be specified, that is called when the results 126 111 of the request are picked up from the result queue. It must accept 127 two arguments, the request object and the results of the callable, 128 in that order. If you want to pass additional information to the 129 callback, just stick it on the request object. 130 131 You can also give a callback for when an exception occurs. It should 132 also accept two arguments, the work request and a tuple with the 133 exception details as returned by sys.exc_info(). 134 135 requestID, if given, must be hashable since it is used by the 136 ThreadPool object to store the results of that work request in a 137 dictionary. It defaults to the return value of id(self). 138 """ 139 112 two arguments, the request object and it's results in that order. 113 If you want to pass additional information to the callback, just stick 114 it on the request object. 115 116 requestID, if given, must be hashable as it is used by the ThreadPool 117 class to store the results of that work request in a dictionary. 118 It defaults to the return value of id(self). 119 """ 140 120 if requestID is None: 141 121 self.requestID = id(self) 142 122 else: 143 try:144 hash(requestID)145 except TypeError:146 raise TypeError("requestID must be hashable.")147 123 self.requestID = requestID 148 self.exception = False149 124 self.callback = callback 150 self.exc_callback = exc_callback151 125 self.callable = callable 152 126 self.args = args or [] … … 160 134 """ 161 135 162 def __init__ (self, num_workers, q_size=0):136 def __init__ (self, num_workers, q_size=0): 163 137 """Set up the thread pool and start num_workers worker threads. 164 138 165 139 num_workers is the number of worker threads to start initialy. 166 If q_size > 0 the size of the work request queue is limited and167 th e thread pool blocks when the queue is full and it tries to put168 more work requests in it (see putRequest method).140 If q_size > 0 the size of the work request is limited and the 141 thread pool blocks when queue is full and it tries to put more 142 work requests in it. 169 143 """ 170 144 … … 183 157 184 158 def dismissWorkers(self, num_workers): 185 """Tell num_workers worker threads to quit after their current task. 186 """ 159 """Tell num_workers worker threads to to quit when they're done.""" 187 160 188 161 for i in range(min(num_workers, len(self.workers))): … … 190 163 worker.dismiss() 191 164 192 def putRequest(self, request, block=True, timeout=0): 193 """Put work request into work queue and save its id for later.""" 194 195 assert isinstance(request, WorkRequest) 196 self.requestsQueue.put(item=request, block=block)#,timeout=timeout) 165 def putRequest(self, request): 166 """Put work request into work queue and save for later.""" 167 self.requestsQueue.put(request) 197 168 self.workRequests[request.requestID] = request 198 169 199 170 def poll(self, block=False): 200 171 """Process any new results in the queue.""" 201 202 while True: 203 # still results pending? 204 if not self.workRequests: 205 raise NoResultsPending 206 # are there still workers to process remaining requests? 207 elif block and not self.workers: 208 raise NoWorkersAvailable 172 while 1: 209 173 try: 174 # still results pending? 175 if not self.workRequests: 176 raise NoResultsPending 177 # are there still workers to process remaining requests? 178 elif block and not self.workers: 179 raise NoWorkersAvailable 210 180 # get back next results 211 181 request, result = self.resultsQueue.get(block=block) 212 # has an exception occured? 213 if request.exception and request.exc_callback: 214 request.exc_callback(request, result) 215 # hand results to callback, if any 216 if request.callback and not \ 217 (request.exception and request.exc_callback): 182 # and hand them to the callback, if any 183 if request.callback: 218 184 request.callback(request, result) 219 185 del self.workRequests[request.requestID] … … 223 189 def wait(self): 224 190 """Wait for results, blocking until all have arrived.""" 191 225 192 while 1: 226 193 try: 227 194 self.poll(True) 228 195 except NoResultsPending: 229 break 230 # helper functions 231 def makeRequests(callable, args_list, callback=None, exc_callback=None): 232 """Create several work requests for same callable with different arguments. 233 234 Convenience function for creating several work requests for the same 235 callable where each invocation of the callable receives different values 236 for its arguments. 196 break 197 198 def makeRequests(callable, args_list, callback=None): 199 """Convenience function for building several work requests for the same 200 callable with different arguments for each call. 237 201 238 202 args_list contains the parameters for each invocation of callable. 239 Each item in 'args _list' should be either a 2-item tuple of the list of203 Each item in 'argslist' should be either a 2-item tuple of the list of 240 204 positional arguments and a dictionary of keyword arguments or a single, 241 205 non-tuple argument. 242 206 243 See docstring for WorkRequest for info on callback and exc_callback.207 callback is called when the results arrive in the result queue. 244 208 """ 245 209 246 210 requests = [] 211 247 212 for item in args_list: 248 if isinstance(item, tuple): 213 214 if item is None: 215 """no arguments""" 249 216 requests.append( 250 WorkRequest(callable, item[0], item[1], callback=callback, 251 exc_callback=exc_callback) 252 ) 217 WorkRequest(callable, None, None, callback=callback)) 218 219 elif item == isinstance(item, tuple): 220 """arguments and keywords""" 221 requests.append( 222 WorkRequest(callable, item[0], item[1], callback=callback)) 253 223 else: 224 """only keywords""" 254 225 requests.append( 255 WorkRequest(callable, [item], None, callback=callback, 256 exc_callback=exc_callback) 257 ) 226 WorkRequest(callable, [item], None, callback=callback)) 258 227 return requests 259 228 260 229 261 #------------------------------ EoF ------------------------------# 230 if __name__ == '__main__': 231 import random 232 import time 233 234 # the work the threads will have to do (rather trivial in our example) 235 def do_something(data): 236 time.sleep(random.randint(1,5)) 237 return round(random.random() * data, 5) 238 239 # this will be called each time a result is available 240 def print_result(request, result): 241 print "Result: %s from request #%s" % (result, request.requestID) 242 243 # assemble the arguments for each job to a list... 244 data = [random.randint(1,10) for i in range(20)] 245 # ... and build a WorkRequest object for each item in data 246 requests = makeRequests(do_something, data, print_result) 247 248 # we create a pool of 10 worker threads 249 main = ThreadPool(3) 250 251 # then we put the work requests in the queue... 252 for req in requests: 253 main.putRequest(req) 254 print "Work request #%s added." % req.requestID 255 # or shorter: 256 # [main.putRequest(req) for req in requests] 257 258 # ...and wait for the results to arrive in the result queue 259 # wait() will return when results for all work requests have arrived 260 # main.wait() 261 262 # alternatively poll for results while doing something else: 263 i = 0 264 while 1: 265 try: 266 main.poll() 267 print "Main thread working..." 268 time.sleep(0.5) 269 if i == 10: 270 print "Adding 3 more worker threads..." 271 main.createWorkers(3) 272 i += 1 273 except (KeyboardInterrupt, NoResultsPending): 274 break
Note: See TracChangeset
for help on using the changeset viewer.