source: tbroadcast/HEAD/python/threadpool.py @ 508

Last change on this file since 508 was 508, checked in by garonne, 15 years ago

Commit changes

File size: 9.4 KB
Line 
1"""
2
3@author: Christopher Arndt
4@version: 1.1
5
6@see: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/435883
7
8Easy to use object-oriented thread pool framework.
9
10A thread pool is a class that maintains a pool of worker threads to perform
11time consuming operations in parallel. It assigns jobs to the threads
12by putting them in a work request queue, where they are picked up by the
13next available thread. This then performs the requested operation in the
background and puts the results in another queue.
15
16The thread pool class can then collect the results from all threads from
17this queue as soon as they become available or after all threads have
finished their work. It is also possible to define callbacks to handle
19each result as it comes in.
20
21The basic concept and some code was taken from the book "Python in a Nutshell"
22by Alex Martelli, copyright 2003, ISBN 0-596-00188-6, from section 14.5
23"Threaded Program Architecture". I wrapped the main program logic in the
24ThreadPool class, added the WorkRequest class and the callback system and
25tweaked the code here and there.
26
27Basic usage:
28
29>>> main = ThreadPool(poolsize)
30>>> requests = makeRequests(some_callable, list_of_args, callback)
>>> [main.putRequest(req) for req in requests]
32>>> main.wait()
33
34See the end of the module code for a brief, annotated usage example.
35"""
36
# Public API exported by "from threadpool import *".
__all__ = [
    'makeRequests',
    'NoResultsPending',
    'NoWorkersAvailable',
    'ThreadPool',
    'WorkRequest',
    'WorkerThread',
]

# Module metadata.
__author__ = 'Christopher Arndt'
__version__ = '1.1'
__date__ = '2005-07-19'
43
import sys
import threading
import Queue
45
class NoResultsPending(Exception):
    """Signals that no work requests are left whose results are pending."""
class NoWorkersAvailable(Exception):
    """Signals that requests remain but the pool has no worker threads."""
52
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.
    """

    def __init__(self, requestsQueue, resultsQueue, **kwds):
        """Set up the thread in daemonic mode and start it immediately.

        requestsQueue and resultsQueue are instances of Queue.Queue passed
        by the ThreadPool class when it creates a new worker thread.
        Any extra keyword arguments are passed on to threading.Thread.
        """
        threading.Thread.__init__(self, **kwds)
        # daemon threads do not keep the interpreter alive at exit
        self.setDaemon(1)
        self.workRequestQueue = requestsQueue
        self.resultQueue = resultsQueue
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit.

        For every work request taken from the requests queue a
        (request, result) tuple is put on the results queue and
        request.exception is set to False.

        If the request's callable raises an exception, the worker keeps
        running: it sets request.exception to True and puts
        (request, sys.exc_info()) on the results queue instead, so every
        request handed out still produces exactly one entry in the results
        queue.
        """
        while not self._dismissed.isSet():
            # thread blocks here, if queue empty
            request = self.workRequestQueue.get()
            if self._dismissed.isSet():
                # dismissed while waiting: return the work request we just
                # picked up to the shared queue and exit
                self.workRequestQueue.put(request)
                break
            try:
                result = request.callable(*request.args, **request.kwds)
            except Exception:
                # Letting the exception escape would end this thread without
                # delivering a result; ThreadPool.poll(True)/wait() would then
                # block forever waiting for it.
                request.exception = True
                self.resultQueue.put((request, sys.exc_info()))
            else:
                request.exception = False
                self.resultQueue.put((request, result))

    def dismiss(self):
        """Set a flag telling the thread to exit when done with its current job.

        A thread blocked on an empty requests queue only notices the flag
        after it picks up the next request, which it puts back unprocessed.
        """
        self._dismissed.set()
94
95
class WorkRequest:
    """A single job: a callable plus the arguments to call it with.

    Instances are usually built with the module function makeRequests(),
    which creates one request per argument set for the same callable, and
    are then handed to ThreadPool.putRequest(). A worker thread later calls
    the callable with the stored positional and keyword arguments.
    """

    def __init__(self, callable, args=None, kwds=None, requestID=None,
                 callback=None):
        """Describe one call of callable for a worker thread to perform.

        callable  -- the function (or other callable) to run.
        args      -- sequence of positional arguments; any false value
                     (None, an empty sequence) is replaced by [].
        kwds      -- mapping of keyword arguments; any false value is
                     replaced by {}.
        requestID -- key under which ThreadPool stores this request in a
                     dictionary, so it must be hashable; when omitted (None)
                     it defaults to id(self).
        callback  -- optional function, called as callback(request, result)
                     when the result is picked up from the result queue.
                     Extra data for the callback can simply be attached as
                     attributes of the request object.
        """
        self.callable = callable
        self.callback = callback

        if not args:
            args = []
        self.args = args

        if not kwds:
            kwds = {}
        self.kwds = kwds

        if requestID is not None:
            self.requestID = requestID
        else:
            self.requestID = id(self)
128
129
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.
    """

    def __init__(self, num_workers, q_size=0):
        """Set up the thread pool and start num_workers worker threads.

        num_workers is the number of worker threads to start initially.
        If q_size > 0 the size of the work request queue is limited and the
        thread pool blocks when the queue is full and it tries to put more
        work requests in it. With the default of 0 the queue is unbounded.
        """
        self.requestsQueue = Queue.Queue(q_size)
        self.resultsQueue = Queue.Queue()
        self.workers = []
        # requestID -> WorkRequest for every request whose results have not
        # yet been collected by poll()
        self.workRequests = {}
        self.createWorkers(num_workers)

    def createWorkers(self, num_workers):
        """Add num_workers worker threads to the pool."""
        for _ in range(num_workers):
            self.workers.append(WorkerThread(self.requestsQueue,
                                             self.resultsQueue))

    def dismissWorkers(self, num_workers):
        """Tell num_workers worker threads to quit when they're done.

        At most as many workers as the pool currently holds are dismissed;
        they are removed from the pool immediately.
        """
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()

    def putRequest(self, request):
        """Put work request into work queue and save for later."""
        self.requestsQueue.put(request)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the queue.

        Each result taken from the results queue is removed from the pending
        work requests and then handed to the request's callback, if any.

        If block is false, return as soon as the results queue is empty.
        If block is true, keep waiting for results.

        Raises NoResultsPending when no work requests are pending, and
        NoWorkersAvailable when blocking while requests are pending but the
        pool has no worker threads. Exceptions raised by a callback
        propagate to the caller.
        """
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            if block and not self.workers:
                raise NoWorkersAvailable
            # get back next results; only the queue access may end the loop
            # quietly, so an exception from a callback is never swallowed
            try:
                request, result = self.resultsQueue.get(block=block)
            except Queue.Empty:
                break
            # Forget the request *before* running the callback: if the
            # callback raises, the request must not stay "pending", or a
            # later wait() would block forever on a result already consumed.
            del self.workRequests[request.requestID]
            # and hand them to the callback, if any
            if request.callback:
                request.callback(request, result)

    def wait(self):
        """Wait for results, blocking until all have arrived.

        Returns once no work requests are pending. NoWorkersAvailable and
        exceptions raised by callbacks propagate from poll().
        """
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
197
def makeRequests(callable, args_list, callback=None):
    """Convenience function for building several work requests for the same
    callable with different arguments for each call.

    args_list contains the parameters for each invocation of callable.
    Each item in args_list is one of:

    - None: the callable is invoked without arguments;
    - a 2-item tuple (args, kwds) of a list of positional arguments and a
      dictionary of keyword arguments (either may be None);
    - any other, non-tuple value, which is passed as the single positional
      argument.

    callback is attached to every request and is called when the results
    arrive in the result queue.

    Returns a list with one WorkRequest per item in args_list.
    """
    requests = []
    for item in args_list:
        if item is None:
            # no arguments at all
            args, kwds = None, None
        elif isinstance(item, tuple):
            # (positional arguments, keyword arguments) pair
            args, kwds = item[0], item[1]
        else:
            # a single positional argument
            args, kwds = [item], None
        requests.append(WorkRequest(callable, args, kwds, callback=callback))
    return requests
228
229
if __name__ == '__main__':
    import random
    import time

    # the work the threads will have to do (rather trivial in our example)
    def do_something(data):
        time.sleep(random.randint(1, 5))
        return round(random.random() * data, 5)

    # this will be called each time a result is available
    def print_result(request, result):
        print("Result: %s from request #%s" % (result, request.requestID))

    # assemble the arguments for each job to a list...
    data = [random.randint(1, 10) for _ in range(20)]
    # ... and build a WorkRequest object for each item in data
    requests = makeRequests(do_something, data, print_result)

    # we create a pool of 3 worker threads
    main = ThreadPool(3)

    # then we put the work requests in the queue...
    for req in requests:
        main.putRequest(req)
        print("Work request #%s added." % req.requestID)

    # ...and wait for the results to arrive in the result queue
    # wait() will return when results for all work requests have arrived
    # main.wait()

    # alternatively poll for results while doing something else:
    i = 0
    while True:
        try:
            main.poll()
            print("Main thread working...")
            time.sleep(0.5)
            if i == 10:
                print("Adding 3 more worker threads...")
                main.createWorkers(3)
            i += 1
        except (KeyboardInterrupt, NoResultsPending):
            break
Note: See TracBrowser for help on using the repository browser.