Ignore:
Timestamp:
May 28, 2009, 9:44:59 AM (15 years ago)
Author:
garonne
Message:

Commit changes

File:
1 edited

Legend:

Unmodified
Added
Removed
  • tbroadcast/HEAD/python/threadpool.py

    r236 r508  
    1 """Easy to use object-oriented thread pool framework.
    2 
    3 A thread pool is an object that maintains a pool of worker threads to perform
     1"""
     2
     3@author: Christopher Arndt
     4@version: 1.1
     5
     6@see: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/435883
     7
     8Easy to use object-oriented thread pool framework.
     9
     10A thread pool is a class that maintains a pool of worker threads to perform
    411time consuming operations in parallel. It assigns jobs to the threads
    512by putting them in a work request queue, where they are picked up by the
     
    714background and puts the results in a another queue.
    815
    9 The thread pool object can then collect the results from all threads from
     16The thread pool class can then collect the results from all threads from
    1017this queue as soon as they become available or after all threads have
    1118finished their work. It's also possible, to define callbacks to handle
     
    1623"Threaded Program Architecture". I wrapped the main program logic in the
    1724ThreadPool class, added the WorkRequest class and the callback system and
    18 tweaked the code here and there. Kudos also to Florent Aide for the exception
    19 handling mechanism.
     25tweaked the code here and there.
    2026
    2127Basic usage:
    2228
    23 >>> pool = TreadPool(poolsize)
     29>>> main = ThreadPool(poolsize)
    2430>>> requests = makeRequests(some_callable, list_of_args, callback)
    25 >>> [pool.putRequest(req) for req in requests]
    26 >>> pool.wait()
     31>>> [main.putRequests(req) for req in requests]
     32>>> main.wait()
    2733
    2834See the end of the module code for a brief, annotated usage example.
    29 
    30 Website : http://chrisarndt.de/en/software/python/threadpool/
    3135"""
    3236
    33 __all__ = [
    34   'makeRequests',
    35   'NoResultsPending',
    36   'NoWorkersAvailable',
    37   'ThreadPool',
    38   'WorkRequest',
    39   'WorkerThread'
    40 ]
    41 
    42 __author__ = "Christopher Arndt"
    43 __version__ = "1.2.3"
    44 __revision__ = "$Revision: 1.5 $"
    45 __date__ = "$Date: 2006/06/23 12:32:25 $"
    46 __license__ = 'Python license'
    47 
    48 # standard library modules
    49 import sys
    50 import threading
    51 import Queue
    52 
    53 # exceptions
     37__all__ = ['makeRequests', 'NoResultsPending', 'NoWorkersAvailable',
     38  'ThreadPool', 'WorkRequest', 'WorkerThread']
     39
     40__author__ = 'Christopher Arndt'
     41__version__ = '1.1'
     42__date__ = '2005-07-19'
     43
     44import threading, Queue
     45
    5446class NoResultsPending(Exception):
    5547    """All work requests have been processed."""
    5648    pass
    57 
    5849class NoWorkersAvailable(Exception):
    5950    """No worker threads available to process remaining requests."""
    6051    pass
    6152
    62 # classes
    6353class WorkerThread(threading.Thread):
    6454    """Background thread connected to the requests/results queues.
     
    6858    """
    6959
    70     def __init__(self, requestsQueue, resultsQueue, **kwds):
    71         """Set up thread in daemonic mode and start it immediatedly.
     60    def __init__ (self, requestsQueue, resultsQueue, **kwds):
     61        """Set up thread in damonic mode and start it immediatedly.
    7262
    7363        requestsQueue and resultQueue are instances of Queue.Queue passed
    7464        by the ThreadPool class when it creates a new worker thread.
    7565        """
    76 
    7766        threading.Thread.__init__(self, **kwds)
    7867        self.setDaemon(1)
     
    8372
    8473    def run(self):
    85         """Repeatedly process the job queue until told to exit."""
     74        """Repeatedly process the job queue until told to exit.
     75        """
    8676
    8777        while not self._dismissed.isSet():
     
    8979            request = self.workRequestQueue.get()
    9080            if self._dismissed.isSet():
    91                 # if told to exit, return the work request we just picked up
     81                # return the work request we just picked up
    9282                self.workRequestQueue.put(request)
    9383                break # and exit
    94             try:
    95                 self.resultQueue.put(
    96                     (request, request.callable(*request.args, **request.kwds))
    97                 )
    98             except:
    99                 request.exception = True
    100                 self.resultQueue.put((request, sys.exc_info()))
     84            # XXX catch exceptions here and stick them to request object
     85            self.resultQueue.put(
     86                (request, request.callable(*request.args, **request.kwds))
     87            )
    10188
    10289    def dismiss(self):
     
    11198
    11299    See the module function makeRequests() for the common case
    113     where you want to build several WorkRequests for the same callable
    114     but with different arguments for each call.
    115     """
    116 
    117     def __init__(self, callable, args=None, kwds=None, requestID=None,
    118       callback=None, exc_callback=None):
    119         """Create a work request for a callable and attach callbacks.
    120 
    121         A work request consists of the a callable to be executed by a
     100    where you want to build several work requests for the same callable
     101    but different arguments for each call.
     102    """
     103
     104    def __init__ (self, callable, args=None, kwds=None, requestID=None,
     105      callback=None):
     106        """A work request consists of the a callable to be executed by a
    122107        worker thread, a list of positional arguments, a dictionary
    123108        of keyword arguments.
     
    125110        A callback function can be specified, that is called when the results
    126111        of the request are picked up from the result queue. It must accept
    127         two arguments, the request object and the results of the callable,
    128         in that order. If you want to pass additional information to the
    129         callback, just stick it on the request object.
    130 
    131         You can also give a callback for when an exception occurs. It should
    132         also accept two arguments, the work request and a tuple with the
    133         exception details as returned by sys.exc_info().
    134 
    135         requestID, if given, must be hashable since it is used by the
    136         ThreadPool object to store the results of that work request in a
    137         dictionary. It defaults to the return value of id(self).
    138         """
    139 
     112        two arguments, the request object and it's results in that order.
     113        If you want to pass additional information to the callback, just stick
     114        it on the request object.
     115
     116        requestID, if given, must be hashable as it is used by the ThreadPool
     117        class to store the results of that work request in a dictionary.
     118        It defaults to the return value of id(self).
     119        """
    140120        if requestID is None:
    141121            self.requestID = id(self)
    142122        else:
    143             try:
    144                 hash(requestID)
    145             except TypeError:
    146                 raise TypeError("requestID must be hashable.")
    147123            self.requestID = requestID
    148         self.exception = False
    149124        self.callback = callback
    150         self.exc_callback = exc_callback
    151125        self.callable = callable
    152126        self.args = args or []
     
    160134    """
    161135
    162     def __init__(self, num_workers, q_size=0):
     136    def __init__ (self, num_workers, q_size=0):
    163137        """Set up the thread pool and start num_workers worker threads.
    164138
    165139        num_workers is the number of worker threads to start initialy.
    166         If q_size > 0 the size of the work request queue is limited and
    167         the thread pool blocks when the queue is full and it tries to put
    168         more work requests in it (see putRequest method).
     140        If q_size > 0 the size of the work request is limited and the
     141        thread pool blocks when queue is full and it tries to put more
     142        work requests in it.
    169143        """
    170144
     
    183157
    184158    def dismissWorkers(self, num_workers):
    185         """Tell num_workers worker threads to quit after their current task.
    186         """
     159        """Tell num_workers worker threads to to quit when they're done."""
    187160
    188161        for i in range(min(num_workers, len(self.workers))):
     
    190163            worker.dismiss()
    191164
    192     def putRequest(self, request, block=True, timeout=0):
    193         """Put work request into work queue and save its id for later."""
    194 
    195         assert isinstance(request, WorkRequest)
    196         self.requestsQueue.put(item=request, block=block)#,timeout=timeout)
     165    def putRequest(self, request):
     166        """Put work request into work queue and save for later."""
     167        self.requestsQueue.put(request)
    197168        self.workRequests[request.requestID] = request
    198169
    199170    def poll(self, block=False):
    200171        """Process any new results in the queue."""
    201 
    202         while True:
    203             # still results pending?
    204             if not self.workRequests:
    205                 raise NoResultsPending
    206             # are there still workers to process remaining requests?
    207             elif block and not self.workers:
    208                 raise NoWorkersAvailable
     172        while 1:
    209173            try:
     174                # still results pending?
     175                if not self.workRequests:
     176                    raise NoResultsPending
     177                # are there still workers to process remaining requests?
     178                elif block and not self.workers:
     179                    raise NoWorkersAvailable
    210180                # get back next results
    211181                request, result = self.resultsQueue.get(block=block)
    212                 # has an exception occured?
    213                 if request.exception and request.exc_callback:
    214                     request.exc_callback(request, result)
    215                 # hand results to callback, if any
    216                 if request.callback and not \
    217                   (request.exception and request.exc_callback):
     182                # and hand them to the callback, if any
     183                if request.callback:
    218184                    request.callback(request, result)
    219185                del self.workRequests[request.requestID]
     
    223189    def wait(self):
    224190        """Wait for results, blocking until all have arrived."""
     191
    225192        while 1:
    226193            try:
    227194                self.poll(True)
    228195            except NoResultsPending:
    229                 break               
    230 # helper functions
    231 def makeRequests(callable, args_list, callback=None, exc_callback=None):
    232     """Create several work requests for same callable with different arguments.
    233    
    234     Convenience function for creating several work requests for the same
    235     callable where each invocation of the callable receives different values
    236     for its arguments.
     196                break
     197
     198def makeRequests(callable, args_list, callback=None):
     199    """Convenience function for building several work requests for the same
     200    callable with different arguments for each call.
    237201
    238202    args_list contains the parameters for each invocation of callable.
    239     Each item in 'args_list' should be either a 2-item tuple of the list of
     203    Each item in 'argslist' should be either a 2-item tuple of the list of
    240204    positional arguments and a dictionary of keyword arguments or a single,
    241205    non-tuple argument.
    242206
    243     See docstring for WorkRequest for info on callback and exc_callback.
     207    callback is called when the results arrive in the result queue.
    244208    """
    245209
    246210    requests = []
     211   
    247212    for item in args_list:
    248         if isinstance(item, tuple):
     213       
     214        if item is None:
     215            """no arguments"""
    249216            requests.append(
    250               WorkRequest(callable, item[0], item[1], callback=callback,
    251                 exc_callback=exc_callback)
    252             )
     217              WorkRequest(callable, None, None, callback=callback))
     218           
     219        elif item == isinstance(item, tuple):
     220            """arguments and keywords"""
     221            requests.append(
     222              WorkRequest(callable, item[0], item[1], callback=callback))
    253223        else:
     224            """only keywords"""
    254225            requests.append(
    255               WorkRequest(callable, [item], None, callback=callback,
    256                 exc_callback=exc_callback)
    257             )
     226              WorkRequest(callable, [item], None, callback=callback))
    258227    return requests
    259228
    260229
    261  #------------------------------ EoF ------------------------------#
     230if __name__ == '__main__':
     231    import random
     232    import time
     233
     234    # the work the threads will have to do (rather trivial in our example)
     235    def do_something(data):
     236        time.sleep(random.randint(1,5))
     237        return round(random.random() * data, 5)
     238
     239    # this will be called each time a result is available
     240    def print_result(request, result):
     241        print "Result: %s from request #%s" % (result, request.requestID)
     242
     243    # assemble the arguments for each job to a list...
     244    data = [random.randint(1,10) for i in range(20)]
     245    # ... and build a WorkRequest object for each item in data
     246    requests = makeRequests(do_something, data, print_result)
     247
     248    # we create a pool of 10 worker threads
     249    main = ThreadPool(3)
     250
     251    # then we put the work requests in the queue...
     252    for req in requests:
     253        main.putRequest(req)
     254        print "Work request #%s added." % req.requestID
     255    # or shorter:
     256    # [main.putRequest(req) for req in requests]
     257
     258    # ...and wait for the results to arrive in the result queue
     259    # wait() will return when results for all work requests have arrived
     260    # main.wait()
     261
     262    # alternatively poll for results while doing something else:
     263    i = 0
     264    while 1:
     265        try:
     266            main.poll()
     267            print "Main thread working..."
     268            time.sleep(0.5)
     269            if i == 10:
     270                print "Adding 3 more worker threads..."
     271                main.createWorkers(3)
     272            i += 1
     273        except (KeyboardInterrupt, NoResultsPending):
     274            break
Note: See TracChangeset for help on using the changeset viewer.