source: tbroadcast/v2/python/threadpool.py @ 236

Last change on this file since 236 was 236, checked in by garonne, 18 years ago

MàJ

File size: 9.2 KB
Line 
1"""Easy to use object-oriented thread pool framework.
2
3A thread pool is an object that maintains a pool of worker threads to perform
4time consuming operations in parallel. It assigns jobs to the threads
5by putting them in a work request queue, where they are picked up by the
6next available thread. This then performs the requested operation in the
7background and puts the results in a another queue.
8
9The thread pool object can then collect the results from all threads from
10this queue as soon as they become available or after all threads have
11finished their work. It's also possible, to define callbacks to handle
12each result as it comes in.
13
14The basic concept and some code was taken from the book "Python in a Nutshell"
15by Alex Martelli, copyright 2003, ISBN 0-596-00188-6, from section 14.5
16"Threaded Program Architecture". I wrapped the main program logic in the
17ThreadPool class, added the WorkRequest class and the callback system and
18tweaked the code here and there. Kudos also to Florent Aide for the exception
19handling mechanism.
20
21Basic usage:
22
23>>> pool = TreadPool(poolsize)
24>>> requests = makeRequests(some_callable, list_of_args, callback)
25>>> [pool.putRequest(req) for req in requests]
26>>> pool.wait()
27
28See the end of the module code for a brief, annotated usage example.
29
30Website : http://chrisarndt.de/en/software/python/threadpool/
31"""
32
33__all__ = [
34  'makeRequests',
35  'NoResultsPending',
36  'NoWorkersAvailable',
37  'ThreadPool',
38  'WorkRequest',
39  'WorkerThread'
40]
41
42__author__ = "Christopher Arndt"
43__version__ = "1.2.3"
44__revision__ = "$Revision: 1.5 $"
45__date__ = "$Date: 2006/06/23 12:32:25 $"
46__license__ = 'Python license'
47
48# standard library modules
49import sys
50import threading
51import Queue
52
53# exceptions
54class NoResultsPending(Exception):
55    """All work requests have been processed."""
56    pass
57
58class NoWorkersAvailable(Exception):
59    """No worker threads available to process remaining requests."""
60    pass
61
62# classes
63class WorkerThread(threading.Thread):
64    """Background thread connected to the requests/results queues.
65
66    A worker thread sits in the background and picks up work requests from
67    one queue and puts the results in another until it is dismissed.
68    """
69
70    def __init__(self, requestsQueue, resultsQueue, **kwds):
71        """Set up thread in daemonic mode and start it immediatedly.
72
73        requestsQueue and resultQueue are instances of Queue.Queue passed
74        by the ThreadPool class when it creates a new worker thread.
75        """
76
77        threading.Thread.__init__(self, **kwds)
78        self.setDaemon(1)
79        self.workRequestQueue = requestsQueue
80        self.resultQueue = resultsQueue
81        self._dismissed = threading.Event()
82        self.start()
83
84    def run(self):
85        """Repeatedly process the job queue until told to exit."""
86
87        while not self._dismissed.isSet():
88            # thread blocks here, if queue empty
89            request = self.workRequestQueue.get()
90            if self._dismissed.isSet():
91                # if told to exit, return the work request we just picked up
92                self.workRequestQueue.put(request)
93                break # and exit
94            try:
95                self.resultQueue.put(
96                    (request, request.callable(*request.args, **request.kwds))
97                )
98            except:
99                request.exception = True
100                self.resultQueue.put((request, sys.exc_info()))
101
102    def dismiss(self):
103        """Sets a flag to tell the thread to exit when done with current job.
104        """
105
106        self._dismissed.set()
107
108
109class WorkRequest:
110    """A request to execute a callable for putting in the request queue later.
111
112    See the module function makeRequests() for the common case
113    where you want to build several WorkRequests for the same callable
114    but with different arguments for each call.
115    """
116
117    def __init__(self, callable, args=None, kwds=None, requestID=None,
118      callback=None, exc_callback=None):
119        """Create a work request for a callable and attach callbacks.
120
121        A work request consists of the a callable to be executed by a
122        worker thread, a list of positional arguments, a dictionary
123        of keyword arguments.
124
125        A callback function can be specified, that is called when the results
126        of the request are picked up from the result queue. It must accept
127        two arguments, the request object and the results of the callable,
128        in that order. If you want to pass additional information to the
129        callback, just stick it on the request object.
130
131        You can also give a callback for when an exception occurs. It should
132        also accept two arguments, the work request and a tuple with the
133        exception details as returned by sys.exc_info().
134
135        requestID, if given, must be hashable since it is used by the
136        ThreadPool object to store the results of that work request in a
137        dictionary. It defaults to the return value of id(self).
138        """
139
140        if requestID is None:
141            self.requestID = id(self)
142        else:
143            try:
144                hash(requestID)
145            except TypeError:
146                raise TypeError("requestID must be hashable.")
147            self.requestID = requestID
148        self.exception = False
149        self.callback = callback
150        self.exc_callback = exc_callback
151        self.callable = callable
152        self.args = args or []
153        self.kwds = kwds or {}
154
155
156class ThreadPool:
157    """A thread pool, distributing work requests and collecting results.
158
159    See the module doctring for more information.
160    """
161
162    def __init__(self, num_workers, q_size=0):
163        """Set up the thread pool and start num_workers worker threads.
164
165        num_workers is the number of worker threads to start initialy.
166        If q_size > 0 the size of the work request queue is limited and
167        the thread pool blocks when the queue is full and it tries to put
168        more work requests in it (see putRequest method).
169        """
170
171        self.requestsQueue = Queue.Queue(q_size)
172        self.resultsQueue = Queue.Queue()
173        self.workers = []
174        self.workRequests = {}
175        self.createWorkers(num_workers)
176
177    def createWorkers(self, num_workers):
178        """Add num_workers worker threads to the pool."""
179
180        for i in range(num_workers):
181            self.workers.append(WorkerThread(self.requestsQueue,
182              self.resultsQueue))
183
184    def dismissWorkers(self, num_workers):
185        """Tell num_workers worker threads to quit after their current task.
186        """
187
188        for i in range(min(num_workers, len(self.workers))):
189            worker = self.workers.pop()
190            worker.dismiss()
191
192    def putRequest(self, request, block=True, timeout=0):
193        """Put work request into work queue and save its id for later."""
194
195        assert isinstance(request, WorkRequest)
196        self.requestsQueue.put(item=request, block=block)#,timeout=timeout)
197        self.workRequests[request.requestID] = request
198
199    def poll(self, block=False):
200        """Process any new results in the queue."""
201
202        while True:
203            # still results pending?
204            if not self.workRequests:
205                raise NoResultsPending
206            # are there still workers to process remaining requests?
207            elif block and not self.workers:
208                raise NoWorkersAvailable
209            try:
210                # get back next results
211                request, result = self.resultsQueue.get(block=block)
212                # has an exception occured?
213                if request.exception and request.exc_callback:
214                    request.exc_callback(request, result)
215                # hand results to callback, if any
216                if request.callback and not \
217                  (request.exception and request.exc_callback):
218                    request.callback(request, result)
219                del self.workRequests[request.requestID]
220            except Queue.Empty:
221                break
222
223    def wait(self):
224        """Wait for results, blocking until all have arrived."""
225        while 1:
226            try:
227                self.poll(True)
228            except NoResultsPending:
229                break               
230# helper functions
231def makeRequests(callable, args_list, callback=None, exc_callback=None):
232    """Create several work requests for same callable with different arguments.
233   
234    Convenience function for creating several work requests for the same
235    callable where each invocation of the callable receives different values
236    for its arguments.
237
238    args_list contains the parameters for each invocation of callable.
239    Each item in 'args_list' should be either a 2-item tuple of the list of
240    positional arguments and a dictionary of keyword arguments or a single,
241    non-tuple argument.
242
243    See docstring for WorkRequest for info on callback and exc_callback.
244    """
245
246    requests = []
247    for item in args_list:
248        if isinstance(item, tuple):
249            requests.append(
250              WorkRequest(callable, item[0], item[1], callback=callback,
251                exc_callback=exc_callback)
252            )
253        else:
254            requests.append(
255              WorkRequest(callable, [item], None, callback=callback,
256                exc_callback=exc_callback)
257            )
258    return requests
259
260
261 #------------------------------ EoF ------------------------------#
Note: See TracBrowser for help on using the repository browser.