source: tbroadcast/HEAD/python/threadpool.py @ 517

Last change on this file since 517 was 517, checked in by rybkin, 15 years ago

Version v2.0.6_rc4 from Igor Kachaev

File size: 15.3 KB
Line 
# -*- coding: UTF-8 -*-
"""Easy to use object-oriented thread pool framework.

A thread pool is an object that maintains a pool of worker threads to perform
time consuming operations in parallel. It assigns jobs to the threads
by putting them in a work request queue, where they are picked up by the
next available thread. This then performs the requested operation in the
background and puts the results in another queue.

The thread pool object can then collect the results from all threads from
this queue as soon as they become available or after all threads have
finished their work. It's also possible to define callbacks to handle
each result as it comes in.

The basic concept and some code were taken from the book "Python in a Nutshell,
2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
14.5 "Threaded Program Architecture". I wrapped the main program logic in the
ThreadPool class, added the WorkRequest class and the callback system and
tweaked the code here and there. Kudos also to Florent Aide for the exception
handling mechanism.

Basic usage::

    >>> pool = ThreadPool(poolsize)
    >>> requests = makeRequests(some_callable, list_of_args, callback)
    >>> [pool.putRequest(req) for req in requests]
    >>> pool.wait()

See the end of the module code for a brief, annotated usage example.

Website : http://chrisarndt.de/projects/threadpool/

"""
# Markup format used by the docstrings in this module.
__docformat__ = "restructuredtext en"

# Public names exported by a star-import of this module.
__all__ = [
    'makeRequests',
    'NoResultsPending',
    'NoWorkersAvailable',
    'ThreadPool',
    'WorkRequest',
    'WorkerThread',
]

# Module metadata.
__author__ = "Christopher Arndt"
__version__ = "1.2.5"
__revision__ = "$Revision: 354 $"
__date__ = "$Date: 2008-11-19 18:34:46 +0100 (Wed, 19 Nov 2008) $"
__license__ = 'MIT license'
50
51
52# standard library modules
import sys
import threading
import traceback

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module to ``queue``
57
58
# exceptions
class NoResultsPending(Exception):
    """All work requests have been processed.

    Raised by ``ThreadPool.poll`` when no submitted request is still
    waiting for its result to be collected.

    """
63
class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests.

    Raised by ``ThreadPool.poll`` when it is asked to block for results
    while the pool's list of active workers is empty.

    """
67
68
# internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.

    ``request`` is the work request whose callable failed; it is accepted
    only to satisfy the two-argument callback signature and is not used.

    ``exc_info`` is the ``(type, value, traceback)`` tuple as returned by
    ``sys.exc_info()``; it is unpacked into ``traceback.print_exception``.

    """
    traceback.print_exception(*exc_info)
77
78
# utility functions
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.

    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.

    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument.

    ``callback`` and ``exc_callback`` are passed unchanged to every
    ``WorkRequest`` that is created; see the docstring of ``WorkRequest``
    for their meaning.

    Returns a list with one ``WorkRequest`` per item of ``args_list``, in
    the same order.

    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            # item is a (positional_args, keyword_args) pair
            args, kwds = item[0], item[1]
        else:
            # item is one bare argument: wrap it as the only positional arg
            args, kwds = [item], None
        requests.append(
            WorkRequest(callable_, args, kwds, callback=callback,
                exc_callback=exc_callback)
        )
    return requests
110
111
# classes
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
        worker thread.

        ``poll_timeout`` is the number of seconds the thread waits for a new
        request before it re-checks whether it has been dismissed.

        Any further keyword arguments are handed to
        ``threading.Thread.__init__``.

        """
        threading.Thread.__init__(self, **kwds)
        # ``daemon`` attribute replaces the deprecated ``setDaemon()`` call
        self.daemon = True
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while not self._dismissed.is_set():
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            if self._dismissed.is_set():
                # we were dismissed while waiting: put the request back in
                # the queue for another worker and exit the loop
                self._requests_queue.put(request)
                break
            try:
                result = request.callable(*request.args, **request.kwds)
            except:  # noqa: E722
                # Deliberately catch everything: whatever the callable raises
                # is reported through the results queue instead of silently
                # ending this thread.
                request.exception = True
                self._results_queue.put((request, sys.exc_info()))
            else:
                self._results_queue.put((request, result))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()
165
166
class WorkRequest(object):
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of a callable to be executed by a
        worker thread, a list of positional arguments, a dictionary
        of keyword arguments.

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.

        You can also give custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.

        ``requestID``, if given, must be hashable since it is used by
        ``ThreadPool`` object to store the results of that work request in a
        dictionary. It defaults to the return value of ``id(self)``.

        Raises ``TypeError`` if ``requestID`` is given but not hashable.

        """
        if requestID is None:
            self.requestID = id(self)
        else:
            # NOTE: the *hash* of the given ID is stored, not the ID itself,
            # so two different IDs with equal hashes are indistinguishable.
            try:
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        # flag that marks a request whose callable raised an exception
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        # fall back to fresh empty containers when none (or empty) were given
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        """Return a short description with ID, arguments and exception flag."""
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
            (self.requestID, self.args, self.kwds, self.exception)
220
class ThreadPool(object):
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        ``poll_timeout`` is passed on to ``createWorkers`` for the initial
        worker threads.

        .. warning::
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        # worker threads that are currently part of the pool
        self.workers = []
        # dismissed worker threads that have not been joined yet
        self.dismissedWorkers = []
        # requests whose results were not collected yet, keyed by requestID
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting
        for requests.

        """
        for _ in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task.

        At most ``len(self.workers)`` threads are dismissed. If ``do_join``
        is true, this call joins each dismissed thread before returning;
        otherwise the threads are remembered in ``dismissedWorkers`` so they
        can be joined later with ``joinAllDismissedWorkers``.

        """
        dismiss_list = []
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later.

        ``block`` and ``timeout`` are passed to ``Queue.put``. With the
        defaults the call blocks until there is room in a size-limited
        request queue. With a positive ``timeout`` it raises ``Queue.Full``
        if no slot became free within that many seconds; with ``block``
        false it raises ``Queue.Full`` immediately when the queue is full.

        """
        # The default timeout is None rather than 0: ``Queue.put`` treats a
        # zero timeout as "give up immediately", which contradicted the
        # blocking behaviour described in the ``__init__`` docstring.
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the queue.

        Each available result is handed to the request's ``exc_callback``
        (if the request failed and has one) or else to its ``callback`` (if
        it has one), and the request is then removed from ``workRequests``.

        If ``block`` is false, return as soon as the results queue is empty.
        If ``block`` is true, wait for each next result instead.

        Raises ``NoResultsPending`` when no request is waiting for its
        result, and ``NoWorkersAvailable`` when ``block`` is true but the
        pool has no workers left. Exceptions raised by a callback propagate
        to the caller.

        """
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:
                raise NoWorkersAvailable
            # Only the queue access is guarded, so a Queue.Empty raised
            # inside a callback is not mistaken for "no more results".
            try:
                request, result = self._results_queue.get(block=block)
            except Queue.Empty:
                break
            try:
                if request.exception and request.exc_callback:
                    # an exception occurred and a handler is registered
                    request.exc_callback(request, result)
                elif request.callback:
                    # hand results to callback, if any
                    request.callback(request, result)
            finally:
                # The result has left the queue for good, so the request
                # must stop counting as pending even if a callback raised;
                # otherwise a later wait() would block forever.
                del self.workRequests[request.requestID]

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
328
329
################
# USAGE EXAMPLE
################
333
if __name__ == '__main__':
    # Demo: queue 40 jobs on a small pool and poll for their results while
    # the main thread keeps running. Every print() call gets exactly one
    # argument so the script behaves the same on Python 2 and Python 3.
    import random
    import time

    # the work the threads will have to do (rather trivial in our example)
    def do_something(data):
        time.sleep(random.randint(1,5))
        result = round(random.random() * data, 5)
        # just to show off, we throw an exception once in a while
        if result > 5:
            raise RuntimeError("Something extraordinary happened!")
        return result

    # this will be called each time a result is available
    def print_result(request, result):
        print("**** Result from request #%s: %r" % (request.requestID, result))

    # this will be called when an exception occurs within a thread
    # this example exception handler does little more than the default handler
    def handle_exception(request, exc_info):
        if not isinstance(exc_info, tuple):
            # Something is seriously wrong...
            print(request)
            print(exc_info)
            raise SystemExit
        print("**** Exception occured in request #%s: %s" % \
          (request.requestID, exc_info))

    # assemble the arguments for each job to a list...
    data = [random.randint(1,10) for i in range(20)]
    # ... and build a WorkRequest object for each item in data
    requests = makeRequests(do_something, data, print_result, handle_exception)
    # to use the default exception handler, uncomment next line and comment out
    # the preceding one.
    #requests = makeRequests(do_something, data, print_result)

    # or the other form of args_lists accepted by makeRequests: ((,), {})
    data = [((random.randint(1,10),), {}) for i in range(20)]
    requests.extend(
        makeRequests(do_something, data, print_result, handle_exception)
        # to use the default exception handler, uncomment next line and comment
        # out the preceding one.
        #makeRequests(do_something, data, print_result)
    )

    # we create a pool of 3 worker threads
    print("Creating thread pool with 3 worker threads.")
    main = ThreadPool(3)

    # then we put the work requests in the queue...
    for req in requests:
        main.putRequest(req)
        print("Work request #%s added." % req.requestID)
    # or shorter:
    # [main.putRequest(req) for req in requests]

    # ...and wait for the results to arrive in the result queue
    # by using ThreadPool.wait(). This would block until results for
    # all work requests have arrived:
    # main.wait()

    # instead we can poll for results while doing something else:
    i = 0
    while True:
        try:
            time.sleep(0.5)
            main.poll()
            # one line of output, as the two chained print statements gave
            print("Main thread working... (active worker threads: %i)"
                  % (threading.active_count() - 1,))
            if i == 10:
                print("**** Adding 3 more worker threads...")
                main.createWorkers(3)
            if i == 20:
                print("**** Dismissing 2 worker threads...")
                main.dismissWorkers(2)
            i += 1
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except NoResultsPending:
            print("**** No pending results.")
            break
    if main.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main.joinAllDismissedWorkers()
Note: See TracBrowser for help on using the repository browser.