source: tbroadcast/v2/python/threadpool.py@ 327

Last change on this file since 327 was 236, checked in by garonne, 19 years ago

MàJ

File size: 9.2 KB
Line 
1"""Easy to use object-oriented thread pool framework.
2
3A thread pool is an object that maintains a pool of worker threads to perform
4time consuming operations in parallel. It assigns jobs to the threads
5by putting them in a work request queue, where they are picked up by the
6next available thread. This then performs the requested operation in the
7background and puts the results in another queue.
8
9The thread pool object can then collect the results from all threads from
10this queue as soon as they become available or after all threads have
11finished their work. It's also possible to define callbacks to handle
12each result as it comes in.
13
14The basic concept and some code was taken from the book "Python in a Nutshell"
15by Alex Martelli, copyright 2003, ISBN 0-596-00188-6, from section 14.5
16"Threaded Program Architecture". I wrapped the main program logic in the
17ThreadPool class, added the WorkRequest class and the callback system and
18tweaked the code here and there. Kudos also to Florent Aide for the exception
19handling mechanism.
20
21Basic usage:
22
23>>> pool = ThreadPool(poolsize)
24>>> requests = makeRequests(some_callable, list_of_args, callback)
25>>> [pool.putRequest(req) for req in requests]
26>>> pool.wait()
27
28See the end of the module code for a brief, annotated usage example.
29
30Website : http://chrisarndt.de/en/software/python/threadpool/
31"""
32
# Names exported by a star-import of this module.
__all__ = [
    'makeRequests',
    'NoResultsPending',
    'NoWorkersAvailable',
    'ThreadPool',
    'WorkRequest',
    'WorkerThread'
]

# Module metadata: author, release version, revision/date stamps and license.
__author__ = "Christopher Arndt"
__version__ = "1.2.3"
__revision__ = "$Revision: 1.5 $"
__date__ = "$Date: 2006/06/23 12:32:25 $"
__license__ = 'Python license'
47
48# standard library modules
49import sys
50import threading
51import Queue
52
53# exceptions
class NoResultsPending(Exception):
    """Raised when there is no outstanding work request left to collect."""
57
class NoWorkersAvailable(Exception):
    """Raised when requests remain but no worker thread is left to run them."""
61
62# classes
class WorkerThread(threading.Thread):
    """Daemon thread that serves work requests taken from a queue.

    The thread repeatedly takes a request from the request queue, invokes
    the request's callable and puts a ``(request, outcome)`` tuple on the
    result queue.  It stops looping once dismiss() has been called.
    """

    def __init__(self, requestsQueue, resultsQueue, **kwds):
        """Connect the thread to both queues and start it immediately.

        requestsQueue -- queue the thread takes work requests from.
        resultsQueue -- queue the thread puts (request, outcome) tuples on.
        kwds -- passed through unchanged to threading.Thread.__init__().
        """
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(1)
        self._dismissed = threading.Event()
        self.workRequestQueue = requestsQueue
        self.resultQueue = resultsQueue
        # The thread starts itself; callers never call start().
        self.start()

    def run(self):
        """Process requests one at a time until the dismissal flag is set."""
        while True:
            if self._dismissed.isSet():
                return
            # get() has no timeout, so an idle thread blocks here until a
            # request arrives.
            request = self.workRequestQueue.get()
            if self._dismissed.isSet():
                # Dismissed while waiting: put the request back on the
                # queue unprocessed and stop.
                self.workRequestQueue.put(request)
                return
            try:
                outcome = request.callable(*request.args, **request.kwds)
                self.resultQueue.put((request, outcome))
            except:
                # Anything raised is reported through the result queue:
                # the request is flagged and sys.exc_info() takes the
                # place of the return value.
                request.exception = True
                self.resultQueue.put((request, sys.exc_info()))

    def dismiss(self):
        """Set the dismissal flag.

        run() checks the flag before and after taking a request, so a
        request that is already executing is finished first.
        """
        self._dismissed.set()
107
108
class WorkRequest:
    """One deferred call: a callable plus its arguments and callbacks.

    Instances are plain records.  They store what to call and how, the
    callbacks to run on the outcome, and an ``exception`` flag that starts
    out False.
    """

    def __init__(self, callable, args=None, kwds=None, requestID=None,
                 callback=None, exc_callback=None):
        """Store the callable, its arguments and the optional callbacks.

        callable -- the object to be called.
        args -- positional arguments; a false value is stored as ``[]``.
        kwds -- keyword arguments; a false value is stored as ``{}``.
        requestID -- identifier of this request; must be hashable.
            Defaults to ``id(self)``.
        callback -- stored as ``self.callback``; per the original
            documentation it is called with (request, result).
        exc_callback -- stored as ``self.exc_callback``; per the original
            documentation it is called with (request, exc_info tuple).

        Raises TypeError if requestID is given but is not hashable.
        """
        if requestID is not None:
            # Reject unhashable IDs up front rather than when the ID is
            # first used as a dictionary key.
            try:
                hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        else:
            requestID = id(self)

        if not args:
            args = []
        if not kwds:
            kwds = {}

        self.requestID = requestID
        self.callable = callable
        self.args = args
        self.kwds = kwds
        self.callback = callback
        self.exc_callback = exc_callback
        self.exception = False
154
155
class ThreadPool:
    """A thread pool, distributing work requests and collecting results.

    The pool owns two queues: ``requestsQueue`` (work handed to the
    workers) and ``resultsQueue`` (outcomes coming back).  Requests that
    have been submitted but not yet collected are tracked by ID in the
    ``workRequests`` dictionary.
    """

    def __init__(self, num_workers, q_size=0):
        """Set up the thread pool and start num_workers worker threads.

        num_workers -- number of worker threads to start initially.
        q_size -- passed to Queue.Queue() as the maximum size of the
            request queue.  If q_size > 0 the queue is bounded and
            putRequest() blocks (or fails) while it is full.
        """
        self.requestsQueue = Queue.Queue(q_size)
        self.resultsQueue = Queue.Queue()
        self.workers = []
        self.workRequests = {}
        self.createWorkers(num_workers)

    def createWorkers(self, num_workers):
        """Add num_workers worker threads to the pool."""
        for i in range(num_workers):
            self.workers.append(WorkerThread(self.requestsQueue,
                                             self.resultsQueue))

    def dismissWorkers(self, num_workers):
        """Dismiss up to num_workers worker threads.

        The workers are removed from ``self.workers`` (most recently added
        first) and their dismiss() method is called.  Asking for more
        workers than the pool holds dismisses all of them.
        """
        for i in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()

    def putRequest(self, request, block=True, timeout=0):
        """Put a work request into the request queue and remember its ID.

        request -- a WorkRequest instance.
        block -- if true, wait for a free slot when the request queue is
            bounded and full; if false, fail immediately with Queue.Full.
        timeout -- maximum number of seconds to wait when block is true.
            0 (the default) or None means wait indefinitely.

        Raises Queue.Full if no slot became available; in that case the
        request is not registered as pending.
        """
        assert isinstance(request, WorkRequest)
        # 'timeout or None': Queue.put() treats a timeout of 0 as "do not
        # wait at all", which would turn the default call into a
        # non-blocking one.  Mapping 0 to None keeps the default blocking.
        self.requestsQueue.put(item=request, block=block,
                               timeout=timeout or None)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the results queue.

        For each result, the request's exc_callback is called if the
        request is flagged as failed and has one; otherwise its callback
        (if any) is called.  Both receive (request, result).

        block -- if false, return as soon as the results queue is empty.
            If true, wait for each next result.

        Raises NoResultsPending when no submitted request is outstanding,
        and NoWorkersAvailable when block is true, requests are
        outstanding and the pool has no workers left.  An exception
        raised by a callback propagates to the caller.
        """
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:
                raise NoWorkersAvailable
            # Only the queue access is guarded, so a Queue.Empty raised
            # inside a callback is not mistaken for "no more results".
            try:
                request, result = self.resultsQueue.get(block=block)
            except Queue.Empty:
                break
            try:
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                elif request.callback:
                    request.callback(request, result)
            finally:
                # The result has been consumed from the queue, so the
                # request is no longer pending even if its callback
                # raised.  Leaving it registered would make a later
                # wait() block forever.
                self.workRequests.pop(request.requestID, None)

    def wait(self):
        """Wait for results, blocking until all have arrived.

        NoWorkersAvailable raised by poll() is not caught here and
        propagates to the caller.
        """
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
230# helper functions
def makeRequests(callable, args_list, callback=None, exc_callback=None):
    """Build one WorkRequest per entry of args_list for the same callable.

    callable -- the object every request will call.
    args_list -- iterable describing the arguments of each invocation.
        A tuple entry is read as ``(positional_args, keyword_args)``: its
        first item is passed as the request's args and its second as its
        kwds.  Any other entry is treated as a single positional argument.
    callback, exc_callback -- attached unchanged to every request.

    Returns the list of WorkRequest objects in the order of args_list.
    """

    def build(entry):
        # Split one entry into positional and keyword arguments.
        if isinstance(entry, tuple):
            positional, keywords = entry[0], entry[1]
        else:
            positional, keywords = [entry], None
        return WorkRequest(callable, positional, keywords,
                           callback=callback, exc_callback=exc_callback)

    return [build(entry) for entry in args_list]
259
260
261 #------------------------------ EoF ------------------------------#
Note: See TracBrowser for help on using the repository browser.