Changeset 508 for tbroadcast


Ignore:
Timestamp:
May 28, 2009, 9:44:59 AM (15 years ago)
Author:
garonne
Message:

Commit changes

Location:
tbroadcast/HEAD
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • tbroadcast/HEAD/Changelog

    r478 r508  
    1 2008-11-28    <rybkin@lal.in2p3.fr> 2
    2        
    3         * python/executer.py: Introduce function getstatusoutput returning
    4         (status, stdout) of executing cmd
    5         * python/tbroadcast.py: Use the introduced getstatusoutput instead of
    6         the function of the same name from the standard module commands so as
    7         not to mix stdout and stderr when parsing commands output
     12009-05-28 Vincent Garonne <vincent.garonne@cern.ch>
     2
     3        * scripts/tbroadcast    Added the python version checking (>= 2.5)
     4        * python/threadpool.py  Upgraded version to 1.1
     5        * python/tbroadcast.py: Replaced the execute method by communicate from the Popen py25 module to avoid deadlocks
     6                                due to any of the other OS pipe buffers filling up and blocking the child process
     7        * python/executer.py    Removed from the package
    88
    992007-03-22 Vincent Garonne <garonne@lal.in2p3.fr> 1
    1010
    11         * scripts/tbroadcast, python/tbroadcast.py: Add the -no_keep_going option
    12 
    13 2007-01-22 Vincent Garonne <garonne@lal.in2p3.fr>
    14         * First official release
    15 
     11        * scripts/tbroadcast, python/tbroadcast.py: Added the -no_keep_going option
  • tbroadcast/HEAD/python/tbroadcast.py

    r478 r508  
    1818from threading import BoundedSemaphore
    1919
    20 from threadpool import WorkRequest
    21 from threadpool import ThreadPool
    22 from threadpool import NoResultsPending
    23 from threadpool import NoWorkersAvailable
    24 from threadpool import  makeRequests
    25 from executer   import  exeCommand
    26 from executer   import  getstatusoutput
     20from threadpool  import WorkRequest
     21from threadpool  import ThreadPool
     22from threadpool  import NoResultsPending
     23from threadpool  import NoWorkersAvailable
     24from threadpool  import  makeRequests
     25
     26from  subprocess import Popen
    2727
    2828class Scheduler:
     
    7272    def get_current_project(self):
    7373        cmd = 'cmt show projects | grep current'
    74         status, output = getstatusoutput (cmd)
    75 #        status, output = commands.getstatusoutput (cmd)
     74        status, output = commands.getstatusoutput (cmd)
    7675        if status != 0:
    7776            print output
     
    10099        cmd = 'cmt -private show cycles'
    101100        cycle_found = False
    102         status, output = getstatusoutput (cmd)
    103 #        status, output = commands.getstatusoutput (cmd)
     101        status, output = commands.getstatusoutput (cmd)
    104102        if status != 0:
    105103            print output
     
    252250        else:   
    253251            cmd = 'cat ' + file       
    254         status, output = getstatusoutput (cmd)
    255 #        status, output = commands.getstatusoutput (cmd)
     252        status, output = commands.getstatusoutput (cmd)
    256253        if status != 0:
    257254            print output
     
    312309
    313310    def get_current_package(self):   
    314         cmd = 'cmt show macro package'
    315         status, output = getstatusoutput (cmd)
    316 #        status, output = commands.getstatusoutput (cmd)
     311        cmd            = 'cmt show macro package'
     312        status, output = commands.getstatusoutput (cmd)
    317313        if status != 0:
    318314            print output
     
    331327        #return os.getcwd ()
    332328        cmd = 'cmt -use='+name+' run pwd'
    333         status, output = getstatusoutput (cmd)
    334 #        status, output = commands.getstatusoutput (cmd)
     329        status, output = commands.getstatusoutput (cmd)
    335330        if status != 0:
    336331            print output
     
    392387            # we create requests
    393388            arg = {'cmd': cmd , 'package':name}
    394             req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception)
     389            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback)
    395390            # then we put the work request in the queue...
    396391            self.set_work_unit_status (name, 'queued')
     
    441436      print header
    442437      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
    443       log_name   = string.replace(path, project_path, '')
    444       log_name   = string.replace(log_name, '/cmt', '')
    445       log_name   = string.replace(log_name, '/', '_')
    446       log_name   = log_name+'.loglog'
    447       arg['log'] = log_name
     438      log_name     = string.replace(path, project_path, '')
     439      log_name     = string.replace(log_name, '/cmt', '')
     440      log_name     = string.replace(log_name, '/', '_')
     441      log_name     = log_name+'.loglog'
     442      arg['log']   = log_name
    448443      cmd = "cd "+ path +";"+ arg['cmd']
    449444      #status, output= commands.getstatusoutput(cmd)
    450445      # init output file
     446
     447      self.packages[arg['package']] ['startTime'] = time.time ()                           
     448
    451449      if self.output is not None:
    452            f = open (self.output+'/'+ log_name, 'w+')
    453            f.write (header)
    454            f.close()     
    455            if self.error is not None:
    456                f = open (self.error+'/error'+log_name, 'w+')
    457                f.close()
    458       self.packages[arg['package']] ['startTime'] = time.time ()                           
    459       status, output, error, pythonError  = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600)
     450           f1 = open (self.output+'/'+ log_name, 'w+')           
     451           f1.write (header)
     452           f1.close()
     453           f1 = open (self.output+'/'+ log_name, 'a')
     454           if self.error is not None:       
     455               f2 = open (self.error+'/error'+log_name, 'w+')               
     456               Popen(cmd, shell=True, stdout=f1, stderr=f2).communicate()
     457               f2.close() 
     458           else:
     459               Popen(cmd, shell=True, stdout=f1, stderr=f1).communicate()
     460           f1.close()               
     461      else:
     462          Popen(cmd, shell=True).communicate()
     463     
    460464      if not self.keep_going and status > 0:
    461465        sys.exit(status)   
     
    473477      #if status != 0:
    474478      #   raise RuntimeError(output)
    475       return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
    476 
    477     def redirectOutput(self, index, buffer, arg):
    478         """Filter function to redirect the std output and error of the job
    479            executable for real-time debugging
    480         """
    481         if self.output is not None:
    482            if index==0:   
    483                f = open (self.output+'/'+arg['log'], 'a')
    484                f.write (buffer+'\n')
    485                f.close()
    486            elif index==1:
    487                if self.error is not None:
    488                    f = open (self.error+'/error'+arg['log'], 'a')
    489                else:
    490                    f = open (self.output+'/'+arg['log'], 'a')                   
    491                f.write (buffer+'\n')                   
    492                f.close()                               
    493         if not self.silent:
    494             print buffer
     479      return {'cmd': arg['cmd'], 'package':arg['package']}
     480
    495481             
    496482    # this will be called when an exception occurs within a thread
  • tbroadcast/HEAD/python/threadpool.py

    r236 r508  
    1 """Easy to use object-oriented thread pool framework.
    2 
    3 A thread pool is an object that maintains a pool of worker threads to perform
     1"""
     2
     3@author: Christopher Arndt
     4@version: 1.1
     5
     6@see: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/435883
     7
     8Easy to use object-oriented thread pool framework.
     9
     10A thread pool is a class that maintains a pool of worker threads to perform
    411time consuming operations in parallel. It assigns jobs to the threads
    512by putting them in a work request queue, where they are picked up by the
     
    714background and puts the results in a another queue.
    815
    9 The thread pool object can then collect the results from all threads from
     16The thread pool class can then collect the results from all threads from
    1017this queue as soon as they become available or after all threads have
    1118finished their work. It's also possible, to define callbacks to handle
     
    1623"Threaded Program Architecture". I wrapped the main program logic in the
    1724ThreadPool class, added the WorkRequest class and the callback system and
    18 tweaked the code here and there. Kudos also to Florent Aide for the exception
    19 handling mechanism.
     25tweaked the code here and there.
    2026
    2127Basic usage:
    2228
    23 >>> pool = TreadPool(poolsize)
     29>>> main = ThreadPool(poolsize)
    2430>>> requests = makeRequests(some_callable, list_of_args, callback)
    25 >>> [pool.putRequest(req) for req in requests]
    26 >>> pool.wait()
     31>>> [main.putRequests(req) for req in requests]
     32>>> main.wait()
    2733
    2834See the end of the module code for a brief, annotated usage example.
    29 
    30 Website : http://chrisarndt.de/en/software/python/threadpool/
    3135"""
    3236
    33 __all__ = [
    34   'makeRequests',
    35   'NoResultsPending',
    36   'NoWorkersAvailable',
    37   'ThreadPool',
    38   'WorkRequest',
    39   'WorkerThread'
    40 ]
    41 
    42 __author__ = "Christopher Arndt"
    43 __version__ = "1.2.3"
    44 __revision__ = "$Revision: 1.5 $"
    45 __date__ = "$Date: 2006/06/23 12:32:25 $"
    46 __license__ = 'Python license'
    47 
    48 # standard library modules
    49 import sys
    50 import threading
    51 import Queue
    52 
    53 # exceptions
     37__all__ = ['makeRequests', 'NoResultsPending', 'NoWorkersAvailable',
     38  'ThreadPool', 'WorkRequest', 'WorkerThread']
     39
     40__author__ = 'Christopher Arndt'
     41__version__ = '1.1'
     42__date__ = '2005-07-19'
     43
     44import threading, Queue
     45
    5446class NoResultsPending(Exception):
    5547    """All work requests have been processed."""
    5648    pass
    57 
    5849class NoWorkersAvailable(Exception):
    5950    """No worker threads available to process remaining requests."""
    6051    pass
    6152
    62 # classes
    6353class WorkerThread(threading.Thread):
    6454    """Background thread connected to the requests/results queues.
     
    6858    """
    6959
    70     def __init__(self, requestsQueue, resultsQueue, **kwds):
    71         """Set up thread in daemonic mode and start it immediatedly.
     60    def __init__ (self, requestsQueue, resultsQueue, **kwds):
     61        """Set up thread in damonic mode and start it immediatedly.
    7262
    7363        requestsQueue and resultQueue are instances of Queue.Queue passed
    7464        by the ThreadPool class when it creates a new worker thread.
    7565        """
    76 
    7766        threading.Thread.__init__(self, **kwds)
    7867        self.setDaemon(1)
     
    8372
    8473    def run(self):
    85         """Repeatedly process the job queue until told to exit."""
     74        """Repeatedly process the job queue until told to exit.
     75        """
    8676
    8777        while not self._dismissed.isSet():
     
    8979            request = self.workRequestQueue.get()
    9080            if self._dismissed.isSet():
    91                 # if told to exit, return the work request we just picked up
     81                # return the work request we just picked up
    9282                self.workRequestQueue.put(request)
    9383                break # and exit
    94             try:
    95                 self.resultQueue.put(
    96                     (request, request.callable(*request.args, **request.kwds))
    97                 )
    98             except:
    99                 request.exception = True
    100                 self.resultQueue.put((request, sys.exc_info()))
     84            # XXX catch exceptions here and stick them to request object
     85            self.resultQueue.put(
     86                (request, request.callable(*request.args, **request.kwds))
     87            )
    10188
    10289    def dismiss(self):
     
    11198
    11299    See the module function makeRequests() for the common case
    113     where you want to build several WorkRequests for the same callable
    114     but with different arguments for each call.
    115     """
    116 
    117     def __init__(self, callable, args=None, kwds=None, requestID=None,
    118       callback=None, exc_callback=None):
    119         """Create a work request for a callable and attach callbacks.
    120 
    121         A work request consists of the a callable to be executed by a
     100    where you want to build several work requests for the same callable
     101    but different arguments for each call.
     102    """
     103
     104    def __init__ (self, callable, args=None, kwds=None, requestID=None,
     105      callback=None):
     106        """A work request consists of the a callable to be executed by a
    122107        worker thread, a list of positional arguments, a dictionary
    123108        of keyword arguments.
     
    125110        A callback function can be specified, that is called when the results
    126111        of the request are picked up from the result queue. It must accept
    127         two arguments, the request object and the results of the callable,
    128         in that order. If you want to pass additional information to the
    129         callback, just stick it on the request object.
    130 
    131         You can also give a callback for when an exception occurs. It should
    132         also accept two arguments, the work request and a tuple with the
    133         exception details as returned by sys.exc_info().
    134 
    135         requestID, if given, must be hashable since it is used by the
    136         ThreadPool object to store the results of that work request in a
    137         dictionary. It defaults to the return value of id(self).
    138         """
    139 
     112        two arguments, the request object and it's results in that order.
     113        If you want to pass additional information to the callback, just stick
     114        it on the request object.
     115
     116        requestID, if given, must be hashable as it is used by the ThreadPool
     117        class to store the results of that work request in a dictionary.
     118        It defaults to the return value of id(self).
     119        """
    140120        if requestID is None:
    141121            self.requestID = id(self)
    142122        else:
    143             try:
    144                 hash(requestID)
    145             except TypeError:
    146                 raise TypeError("requestID must be hashable.")
    147123            self.requestID = requestID
    148         self.exception = False
    149124        self.callback = callback
    150         self.exc_callback = exc_callback
    151125        self.callable = callable
    152126        self.args = args or []
     
    160134    """
    161135
    162     def __init__(self, num_workers, q_size=0):
     136    def __init__ (self, num_workers, q_size=0):
    163137        """Set up the thread pool and start num_workers worker threads.
    164138
    165139        num_workers is the number of worker threads to start initialy.
    166         If q_size > 0 the size of the work request queue is limited and
    167         the thread pool blocks when the queue is full and it tries to put
    168         more work requests in it (see putRequest method).
     140        If q_size > 0 the size of the work request is limited and the
     141        thread pool blocks when queue is full and it tries to put more
     142        work requests in it.
    169143        """
    170144
     
    183157
    184158    def dismissWorkers(self, num_workers):
    185         """Tell num_workers worker threads to quit after their current task.
    186         """
     159        """Tell num_workers worker threads to to quit when they're done."""
    187160
    188161        for i in range(min(num_workers, len(self.workers))):
     
    190163            worker.dismiss()
    191164
    192     def putRequest(self, request, block=True, timeout=0):
    193         """Put work request into work queue and save its id for later."""
    194 
    195         assert isinstance(request, WorkRequest)
    196         self.requestsQueue.put(item=request, block=block)#,timeout=timeout)
     165    def putRequest(self, request):
     166        """Put work request into work queue and save for later."""
     167        self.requestsQueue.put(request)
    197168        self.workRequests[request.requestID] = request
    198169
    199170    def poll(self, block=False):
    200171        """Process any new results in the queue."""
    201 
    202         while True:
    203             # still results pending?
    204             if not self.workRequests:
    205                 raise NoResultsPending
    206             # are there still workers to process remaining requests?
    207             elif block and not self.workers:
    208                 raise NoWorkersAvailable
     172        while 1:
    209173            try:
     174                # still results pending?
     175                if not self.workRequests:
     176                    raise NoResultsPending
     177                # are there still workers to process remaining requests?
     178                elif block and not self.workers:
     179                    raise NoWorkersAvailable
    210180                # get back next results
    211181                request, result = self.resultsQueue.get(block=block)
    212                 # has an exception occured?
    213                 if request.exception and request.exc_callback:
    214                     request.exc_callback(request, result)
    215                 # hand results to callback, if any
    216                 if request.callback and not \
    217                   (request.exception and request.exc_callback):
     182                # and hand them to the callback, if any
     183                if request.callback:
    218184                    request.callback(request, result)
    219185                del self.workRequests[request.requestID]
     
    223189    def wait(self):
    224190        """Wait for results, blocking until all have arrived."""
     191
    225192        while 1:
    226193            try:
    227194                self.poll(True)
    228195            except NoResultsPending:
    229                 break               
    230 # helper functions
    231 def makeRequests(callable, args_list, callback=None, exc_callback=None):
    232     """Create several work requests for same callable with different arguments.
    233    
    234     Convenience function for creating several work requests for the same
    235     callable where each invocation of the callable receives different values
    236     for its arguments.
     196                break
     197
     198def makeRequests(callable, args_list, callback=None):
     199    """Convenience function for building several work requests for the same
     200    callable with different arguments for each call.
    237201
    238202    args_list contains the parameters for each invocation of callable.
    239     Each item in 'args_list' should be either a 2-item tuple of the list of
     203    Each item in 'argslist' should be either a 2-item tuple of the list of
    240204    positional arguments and a dictionary of keyword arguments or a single,
    241205    non-tuple argument.
    242206
    243     See docstring for WorkRequest for info on callback and exc_callback.
     207    callback is called when the results arrive in the result queue.
    244208    """
    245209
    246210    requests = []
     211   
    247212    for item in args_list:
    248         if isinstance(item, tuple):
     213       
     214        if item is None:
     215            """no arguments"""
    249216            requests.append(
    250               WorkRequest(callable, item[0], item[1], callback=callback,
    251                 exc_callback=exc_callback)
    252             )
     217              WorkRequest(callable, None, None, callback=callback))
     218           
     219        elif item == isinstance(item, tuple):
     220            """arguments and keywords"""
     221            requests.append(
     222              WorkRequest(callable, item[0], item[1], callback=callback))
    253223        else:
     224            """only keywords"""
    254225            requests.append(
    255               WorkRequest(callable, [item], None, callback=callback,
    256                 exc_callback=exc_callback)
    257             )
     226              WorkRequest(callable, [item], None, callback=callback))
    258227    return requests
    259228
    260229
    261  #------------------------------ EoF ------------------------------#
     230if __name__ == '__main__':
     231    import random
     232    import time
     233
     234    # the work the threads will have to do (rather trivial in our example)
     235    def do_something(data):
     236        time.sleep(random.randint(1,5))
     237        return round(random.random() * data, 5)
     238
     239    # this will be called each time a result is available
     240    def print_result(request, result):
     241        print "Result: %s from request #%s" % (result, request.requestID)
     242
     243    # assemble the arguments for each job to a list...
     244    data = [random.randint(1,10) for i in range(20)]
     245    # ... and build a WorkRequest object for each item in data
     246    requests = makeRequests(do_something, data, print_result)
     247
     248    # we create a pool of 10 worker threads
     249    main = ThreadPool(3)
     250
     251    # then we put the work requests in the queue...
     252    for req in requests:
     253        main.putRequest(req)
     254        print "Work request #%s added." % req.requestID
     255    # or shorter:
     256    # [main.putRequest(req) for req in requests]
     257
     258    # ...and wait for the results to arrive in the result queue
     259    # wait() will return when results for all work requests have arrived
     260    # main.wait()
     261
     262    # alternatively poll for results while doing something else:
     263    i = 0
     264    while 1:
     265        try:
     266            main.poll()
     267            print "Main thread working..."
     268            time.sleep(0.5)
     269            if i == 10:
     270                print "Adding 3 more worker threads..."
     271                main.createWorkers(3)
     272            i += 1
     273        except (KeyboardInterrupt, NoResultsPending):
     274            break
  • tbroadcast/HEAD/scripts/tbroadcast

    r393 r508  
    1111import sys
    1212import string
    13 
    14 from tbroadcast import Scheduler
    1513
    1614def usage():
     
    3634
    3735if __name__ == '__main__':
     36    # check python version
     37    req_version = (2,5)
     38    cur_version = sys.version_info
     39
     40    if not (cur_version[0] > req_version[0] or (cur_version[0] == req_version[0]  and cur_version[1] >= req_version[1])):
     41        raise "must use python 2.5 or greater"
     42
     43    from tbroadcast import Scheduler
     44
    3845    # Default options
    3946    num_worker    = 20
    4047    command       = ''
    41     version       = 'HEAD'
     48    version       = 'v2.0.4'
    4249    test          = False
    4350    check         = False
Note: See TracChangeset for help on using the changeset viewer.