source: tbroadcast/v2.0.6/python/threadpool.py@ 577

Last change on this file since 577 was 508, checked in by garonne, 16 years ago

Commit changes

File size: 9.4 KB
Line 
1"""
2
3@author: Christopher Arndt
4@version: 1.1
5
6@see: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/435883
7
8Easy to use object-oriented thread pool framework.
9
10A thread pool is a class that maintains a pool of worker threads to perform
11time consuming operations in parallel. It assigns jobs to the threads
12by putting them in a work request queue, where they are picked up by the
13next available thread. This then performs the requested operation in the
14background and puts the results in another queue.
15
16The thread pool class can then collect the results from all threads from
17this queue as soon as they become available or after all threads have
18finished their work. It is also possible to define callbacks to handle
19each result as it comes in.
20
21The basic concept and some code was taken from the book "Python in a Nutshell"
22by Alex Martelli, copyright 2003, ISBN 0-596-00188-6, from section 14.5
23"Threaded Program Architecture". I wrapped the main program logic in the
24ThreadPool class, added the WorkRequest class and the callback system and
25tweaked the code here and there.
26
27Basic usage:
28
29>>> main = ThreadPool(poolsize)
30>>> requests = makeRequests(some_callable, list_of_args, callback)
31>>> [main.putRequest(req) for req in requests]
32>>> main.wait()
33
34See the end of the module code for a brief, annotated usage example.
35"""
36
37__all__ = ['makeRequests', 'NoResultsPending', 'NoWorkersAvailable',
38 'ThreadPool', 'WorkRequest', 'WorkerThread']
39
40__author__ = 'Christopher Arndt'
41__version__ = '1.1'
42__date__ = '2005-07-19'
43
44import threading, Queue
45
class NoResultsPending(Exception):
    """Signals that no submitted work request is still awaiting a result.

    ThreadPool.poll() raises this once its table of outstanding requests
    is empty.
    """
class NoWorkersAvailable(Exception):
    """Signals that requests are pending but no worker thread is left.

    ThreadPool.poll() raises this in blocking mode when the pool's list of
    workers is empty while work requests are still outstanding.
    """
52
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background, picks up work requests from
    one queue and puts the results in another until it is dismissed.
    """

    def __init__(self, requestsQueue, resultsQueue, **kwds):
        """Set up the thread in daemonic mode and start it immediately.

        requestsQueue and resultsQueue are the queue objects (instances of
        Queue.Queue) passed in by the ThreadPool class when it creates a
        new worker thread.  Any extra keyword arguments are forwarded to
        threading.Thread.__init__().
        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(True)
        self.workRequestQueue = requestsQueue
        self.resultQueue = resultsQueue
        # Set by dismiss(); checked before and after every queue fetch.
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit.

        For every request taken from the request queue, a tuple
        (request, result) is put on the result queue, where result is the
        return value of request.callable(*request.args, **request.kwds)
        and request.exception is set to False.

        If the callable raises an exception, the worker does not die:
        request.exception is set to True and the result delivered is the
        sys.exc_info() triple.  Without this, a failing job would kill the
        thread and its result would never arrive, so anybody blocking
        until all results are in would wait forever.
        """
        # Function-scope import: the module only imports threading and Queue.
        import sys

        while not self._dismissed.isSet():
            # thread blocks here, if queue empty
            request = self.workRequestQueue.get()
            if self._dismissed.isSet():
                # Dismissed while waiting: hand the request we just picked
                # up back to the queue for another worker, then exit.
                self.workRequestQueue.put(request)
                break
            try:
                result = request.callable(*request.args, **request.kwds)
            except Exception:
                request.exception = True
                # Passed straight to put() so that no local variable keeps
                # the traceback (and with it this frame) alive.
                self.resultQueue.put((request, sys.exc_info()))
            else:
                request.exception = False
                self.resultQueue.put((request, result))

    def dismiss(self):
        """Set a flag telling the thread to exit when done with its current job.

        A thread that is blocked waiting on an empty request queue only
        notices the flag once it receives the next request, which it then
        puts back on the queue before exiting.
        """
        self._dismissed.set()
94
95
class WorkRequest:
    """A callable bundled with its arguments, ready to be queued as a job.

    For the common case of running the same callable with many different
    arguments, see the module function makeRequests().
    """

    def __init__(self, callable, args=None, kwds=None, requestID=None,
                 callback=None):
        """Store everything a worker thread needs to execute the request.

        callable is what the worker thread will call, args is the list of
        positional arguments and kwds the dictionary of keyword arguments
        to call it with.  A missing or empty args/kwds is replaced by a
        new empty list/dictionary.

        callback, if given, is invoked when the result of the request is
        picked up from the result queue.  It must accept two arguments:
        the request object and its result, in that order.  To hand extra
        information to the callback, attach it to the request object.

        requestID, if given, must be hashable because the ThreadPool class
        uses it as a dictionary key for outstanding requests.  When it is
        None, id(self) is used instead.
        """
        self.callable = callable
        self.callback = callback

        if not args:
            args = []
        if not kwds:
            kwds = {}
        self.args = args
        self.kwds = kwds

        if requestID is not None:
            self.requestID = requestID
        else:
            self.requestID = id(self)
128
129
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    Requests go into a shared request queue that all worker threads read
    from; results come back through a shared results queue and are
    dispatched to the request callbacks by poll() or wait().
    """

    def __init__(self, num_workers, q_size=0):
        """Create the queues and start num_workers worker threads.

        num_workers is the number of worker threads to start initially.
        q_size is passed to Queue.Queue as the maximum size of the request
        queue; if it is > 0, putting a request into a full queue blocks
        until a slot becomes free.
        """
        self.workers = []
        # Outstanding requests, keyed by their requestID.
        self.workRequests = {}
        self.requestsQueue = Queue.Queue(q_size)
        self.resultsQueue = Queue.Queue()
        self.createWorkers(num_workers)

    def createWorkers(self, num_workers):
        """Start num_workers additional worker threads and add them to the pool."""
        for _ in range(num_workers):
            new_worker = WorkerThread(self.requestsQueue, self.resultsQueue)
            self.workers.append(new_worker)

    def dismissWorkers(self, num_workers):
        """Ask up to num_workers worker threads to quit when they are done.

        The workers are removed from the end of the pool's worker list; if
        fewer than num_workers exist, all of them are dismissed.
        """
        how_many = min(num_workers, len(self.workers))
        for _ in range(how_many):
            self.workers.pop().dismiss()

    def putRequest(self, request):
        """Queue a work request and remember it until its result arrives."""
        self.requestsQueue.put(request)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the results queue.

        Each result is handed to its request's callback (if any) and the
        request is then dropped from the table of outstanding requests.

        With block false, returns as soon as the results queue is empty.
        With block true, waits for each next result.

        Raises NoResultsPending when no request is outstanding, and
        NoWorkersAvailable when block is true, requests are outstanding
        and the pool has no workers.
        """
        while True:
            try:
                if not self.workRequests:
                    # every submitted request has been answered
                    raise NoResultsPending
                if block and not self.workers:
                    # nobody left who could ever produce the missing results
                    raise NoWorkersAvailable
                request, result = self.resultsQueue.get(block=block)
                if request.callback:
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                # nothing more to collect right now
                return

    def wait(self):
        """Block until the results of all outstanding requests have arrived."""
        try:
            while True:
                self.poll(True)
        except NoResultsPending:
            pass
197
def makeRequests(callable, args_list, callback=None):
    """Build one WorkRequest per item of args_list, all for the same callable.

    args_list contains the parameters for each invocation of callable.
    Each item must be one of:

    - None: callable is invoked without any arguments;
    - a tuple whose first element is the list of positional arguments and
      whose second element is the dictionary of keyword arguments;
    - any other object: it is passed as the single positional argument.

    callback, if given, is attached to every request and is called when
    the result of that request is picked up from the result queue.

    Returns the list of WorkRequest objects, in the order of args_list.
    """
    requests = []
    for item in args_list:
        if item is None:
            # no arguments at all
            request = WorkRequest(callable, None, None, callback=callback)
        elif isinstance(item, tuple):
            # (positional arguments, keyword arguments).  This used to read
            # "item == isinstance(item, tuple)", which compared the item
            # with a boolean: tuples never got here, while 0 and False did
            # and then failed on item[0].
            request = WorkRequest(callable, item[0], item[1],
                                  callback=callback)
        else:
            # a single positional argument
            request = WorkRequest(callable, [item], None, callback=callback)
        requests.append(request)
    return requests
228
229
if __name__ == '__main__':
    # Brief, annotated usage example: run 20 trivial jobs on a small pool
    # and print each result as it comes in.
    import random
    import time

    def do_something(data):
        """The work each job performs: sleep 1-5 seconds, return a number."""
        delay = random.randint(1, 5)
        time.sleep(delay)
        factor = random.random()
        return round(factor * data, 5)

    def print_result(request, result):
        """Callback invoked for every result that becomes available."""
        print("Result: %s from request #%s" % (result, request.requestID))

    # One random argument per job, and one WorkRequest per argument.
    data = [random.randint(1, 10) for _ in range(20)]
    requests = makeRequests(do_something, data, print_result)

    # The pool starts out with 3 worker threads.
    pool = ThreadPool(3)

    # Hand the work requests to the pool...
    for req in requests:
        pool.putRequest(req)
        print("Work request #%s added." % req.requestID)
    # or shorter:
    # [pool.putRequest(req) for req in requests]

    # ...and collect the results.  Calling pool.wait() here instead would
    # simply block until the results of all work requests have arrived.
    # This example polls for results while "doing something else", and
    # grows the pool by 3 workers on the eleventh pass through the loop.
    passes = 0
    try:
        while True:
            pool.poll()
            print("Main thread working...")
            time.sleep(0.5)
            if passes == 10:
                print("Adding 3 more worker threads...")
                pool.createWorkers(3)
            passes += 1
    except (KeyboardInterrupt, NoResultsPending):
        # Ctrl-C, or every request has been answered: we are done.
        pass
Note: See TracBrowser for help on using the repository browser.