Changeset 517 for tbroadcast


Ignore:
Timestamp:
Sep 7, 2009, 2:43:28 PM (15 years ago)
Author:
rybkin
Message:

Version v2.0.6_rc4 from Igor Kachaev

Location:
tbroadcast/HEAD
Files:
2 deleted
5 edited

Legend:

Unmodified
Added
Removed
  • tbroadcast/HEAD/Changelog

    r508 r517  
     12009-08-11 Igor Kachaev <Igor.Katchaev@cern.ch>
     2        * python/threadpool.py update to version 1.2.5 (2008-11-19)
     3        * python/tbroadcast.py implement package sort by use count;
     4                               protect common structures with semaphore
     5        * python/__init__.py   remove from the package
     6        * scripts/tbroadcast   better help; option '-sort' to try sorting
     7
     82009-06-25 Igor Kachaev <Igor.Katchaev@cern.ch>
     9
     10        * scripts/tbroadcast    Check correctness of options supplied;
     11                                do '-help' if empty command line
     12        * python/tbroadcast.py  add wait() after communicate() to prevent loop at the
     13                                end of job; disable -no_keep_going internally as it blocks;
     14                                use local copy of getstatusoutput() - else blocks at nightly
     15        * cmt/requirements      remove python dir from the path
     16
    1172009-05-28 Vincent Garonne <vincent.garonne@cern.ch>
    218
    319        * scripts/tbroadcast    Added the python version checking (>= 2.5)
    420        * python/threadpool.py  Upgraded version to 1.1
    5         * python/tbroadcast.py: Replaced the execute method by communicate from the Popen py25 module to avoid deadlocks
    6                                 due to any of the other OS pipe buffers filling up and blocking the child process
     21        * python/tbroadcast.py: Replaced the execute method by communicate from the Popen py25
     22                                module to avoid deadlocks due to any of the other OS pipe buffers
     23                                filling up and blocking the child process
    724        * python/executer.py    Removed from the package
    825
    9 2007-03-22 Vincent Garonne <garonne@lal.in2p3.fr> 1
     262007-03-22 Vincent Garonne <garonne@lal.in2p3.fr>
    1027
    1128        * scripts/tbroadcast, python/tbroadcast.py: Added the -no_keep_going option
  • tbroadcast/HEAD/cmt/requirements

    r243 r517  
    33author Vincent Garonne <garonne@lal.in2p3.fr>
    44
    5 path_append PATH  "${tbroadcast_root}/python"
     5# path_append PATH  "${tbroadcast_root}/python"
    66
    77path_append PATH  "${tbroadcast_root}/scripts"
  • tbroadcast/HEAD/python/tbroadcast.py

    r508 r517  
    22# -- Author: V.Garonne
    33# -- Mail: garonne@lal.in2p3.fr
    4 # -- Date: 08/25/2006 
     4# -- Date: 08/25/2006
    55# -- Name: tbroadcast
    66# -- Description: main class
    77#----------------------------------#
     8
     9# 21-Jul-2009 compile most used packages first; protect critical sections
    810
    911import os
     
    1113import time
    1214import string
    13 import random
    1415import os.path
    15 import commands
     16# import commands
    1617import traceback
    1718import exceptions
     
    2021from threadpool  import WorkRequest
    2122from threadpool  import ThreadPool
    22 from threadpool  import NoResultsPending
    23 from threadpool  import NoWorkersAvailable
    24 from threadpool  import  makeRequests
    2523
    2624from  subprocess import Popen
     
    2826class Scheduler:
    2927
    30     def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False, keep_going=True):
    31         self.pool            = ThreadPool(num_workers=num_workers)
    32         self.current_package = self.get_current_package()
     28    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, sort=False,
     29                 output=None, error=None, silent = False, perf=False, keep_going=True):
     30        self.pool            = ThreadPool(num_workers=num_workers, poll_timeout=3)
     31        self.num_workers     = num_workers
    3332        self.current_project = {'name': None, 'path': None, 'version': None}
    3433        self.packages        = {}
     
    3635        self.semaphore       = BoundedSemaphore(1)
    3736        self.local           = local
     37        self.sort            = sort
    3838        self.ignore_cycles   = ignore_cycles
    3939        self.output          = output
     
    4242        self.perf            = perf
    4343        self.keep_going      = keep_going
     44        if self.sort:
     45            print "Compile packages sorted according to use count"
    4446        if self.perf is not False:
    4547            f = open (self.perf, 'w+')
     
    4749        if output is not None:
    4850            if not os.path.exists (output):
    49                 print "path",output,"no exits"       
     51                print "path",output,"does not exists"
    5052                sys.exit(-1)
    5153            if not os.path.isdir(output):
    52                 print "path",output,"no a valid directory"       
    53                 sys.exit(-1)           
    54                
     54                print "path",output,"is not a valid directory"
     55                sys.exit(-1)
     56
     57        # init cmt stuff
    5558        self.get_current_project()
     59        self.current_package = self.get_current_package()
    5660        self.instanciate_packages (file)
    5761        if self.local: self.get_local_graph()
    5862        self.check_cycles()
    59        
    60 #        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
    61 #        lignes = string.split(output, '\n')
    62 #        i = 1
    63 #        for package in lignes:
    64 #            if package!='' and package[0] != '#':                           
    65 #                print i , package
    66 #                i =i +1
    67 #                if not self.packages.has_key(package):
    68 #                                    print package       
    69 #        print len(self.packages)
    70 #        sys.exit(-1)
     63        self.get_use_count()
    7164
    7265    def get_current_project(self):
    7366        cmd = 'cmt show projects | grep current'
    74         status, output = commands.getstatusoutput (cmd)
     67        status, output = getstatusoutput (cmd)
     68        #status, output = commands.getstatusoutput (cmd)
    7569        if status != 0:
    7670            print output
    77             sys.exit(-1)   
     71            sys.exit(-1)
    7872        lines = string.split(output, '\n')
    79         for line in lines:           
    80             if line!='' and line [0] != '#':                           
     73        for line in lines:
     74            if line!='' and line [0] != '#':
    8175                item  = string.split (line, ' ')
    8276                self.current_project ['name']    = item[0]
     
    8781                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
    8882                return
    89                 #print self.current_project     
    90        
     83
    9184    def get_counter(self):
    92         self.semaphore.acquire ()       
     85        self.semaphore.acquire ()
    9386        self.counter = self.counter + 1
    94         value = self.counter 
     87        value = self.counter
    9588        self.semaphore.release()
    9689        return value
    97        
     90
    9891    def check_cycles (self):
    9992        cmd = 'cmt -private show cycles'
    10093        cycle_found = False
    101         status, output = commands.getstatusoutput (cmd)
     94        status, output = getstatusoutput (cmd)
     95        #status, output = commands.getstatusoutput (cmd)
    10296        if status != 0:
    10397            print output
    104             sys.exit(-1)   
     98            sys.exit(-1)
    10599        lines = string.split(output, '\n')
    106         cycles = list ()
    107         for line in lines:           
    108             if line!='' and line [0] != '#':                   
    109                cycles.append (string.split(line)) 
    110         cercles =list()       
     100        cycles = list()
     101        for line in lines:
     102            if line!='' and line [0] != '#':
     103               cycles.append (string.split(line))
     104        cercles =list()
    111105        for cycle in cycles:
    112106            cycleInProject = True
    113             for package in cycle:           
     107            for package in cycle:
    114108                if not self.packages.has_key(package):
    115                     cycleInProject = False       
    116             if cycleInProject: 
     109                    cycleInProject = False
     110            if cycleInProject:
    117111              cercles.append(cycle)
    118112        if len(cercles):
     
    120114                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
    121115                for cycle in cercles:
    122                     loop = ""                   
     116                    loop = ""
    123117                    for package in cycle:
    124118                        loop = loop + package + ' -> '
    125119                    print loop + '...'
    126                 sys.exit(-1)       
     120                sys.exit(-1)
    127121            else:
    128122                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
    129123                for cycle in cercles:
    130                     loop = ""                   
     124                    loop = ""
    131125                    for package in cycle:
    132                         loop = loop + package + ' -> '                     
     126                        loop = loop + package + ' -> '
    133127                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
    134128                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
     
    151145        lines   = list()
    152146        for ligne in lignes:
    153             if ligne[:4] == "use ":             
     147            if ligne[:4] == "use ":
    154148               lines.append(ligne)
    155149        return lines
     
    169163                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
    170164                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
    171                         elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
     165                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
    172166                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
    173167                        else:
    174168                            print '# error path not found for', name
    175                             sys.exit(-1)   
     169                            sys.exit(-1)
    176170                    elif len(result)==5:
    177                         name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
     171                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")
    178172                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
    179173                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
    180                         elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
     174                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
    181175                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
    182176                        else:
    183177                            print '# error path not found for', name
    184                             sys.exit(-1)                                                                                                   
     178                            sys.exit(-1)
    185179                    elif len(result)==3:
    186180                        name, version, path = string.split (line[4:len(line)], " ")
    187181                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
    188182                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
    189                         elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
     183                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):
    190184                            full_path = path[1:-1] + '/' +name + + '/cmt'
    191185                        else:
     
    195189                        print "error:",line
    196190                        print str(result)
    197                         sys.exit(-1) 
     191                        sys.exit(-1)
    198192                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
    199193                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
    200                     if os.path.normpath(commonprefix) == self.current_project ['path']:                   
     194                    if os.path.normpath(commonprefix) == self.current_project ['path']:
    201195                        #print result[0], ' belong to project', self.current_project ['name']
    202196                        self.packages[result[0]]['current_project'] = True
    203197
    204     def get_uses(self, content):       
     198    def get_uses(self, content):
    205199        # initiates variables
    206200        lignes = self.format_uses(content)
    207201        if not len(lignes): return
    208         self.packages [self.current_package] = {'version': '*', 'client': list(),
    209                                                 'uses': list(), 'status': 'waiting', 
     202        self.packages [self.current_package] = {'version': '*', 'use_count': 0,
     203                                                'uses': list(), 'status': 'waiting',
    210204                                                'current_project': True, 'path': os.getcwd()}
    211205        previous_client = self.current_package
    212206        previous_level  = 0
    213207        level_stack    = [{'name':previous_client,'level':previous_level},]
    214         ligne = lignes.pop()       
    215         while len(lignes)!=0:   
     208        ligne = lignes.pop()
     209        while len(lignes)!=0:
    216210            current_level = string.find(ligne, 'use')
    217             while current_level > previous_level:               
     211            while current_level > previous_level:
    218212                name    = string.split (ligne)[2]
    219                 version = string.split (ligne)[3]                             
     213                version = string.split (ligne)[3]
    220214                if not self.packages.has_key (name):
    221                   self.packages [name] = {'version': version, 'uses': list(),
    222                                           'client': list(), 'status': 'waiting',
     215                  self.packages [name] = {'version': version, 'use_count': 0,
     216                                          'uses': list(), 'status': 'waiting',
    223217                                          'current_project': False, 'path': None}
    224218                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
    225                    self.packages[previous_client]['uses'].append (name)               
     219                   self.packages[previous_client]['uses'].append (name)
    226220                level_stack.append({'name':previous_client,'level':previous_level})
    227                 previous_client = name 
    228                 previous_level = current_level                 
     221                previous_client = name
     222                previous_level = current_level
    229223                if len(lignes):
    230224                    ligne = lignes.pop()
    231225                    #print ligne
    232226                    current_level = string.find(ligne, 'use')
    233                                            
    234             #self.packages [previous_client]['status'] ='queued'
     227
    235228            # restore the level
    236229            if len(lignes):
    237                 if len(level_stack):                       
    238                     item = level_stack.pop()               
     230                if len(level_stack):
     231                    item = level_stack.pop()
    239232                    while item['level'] >= current_level and len(level_stack):
    240233                             item = level_stack.pop()
     
    248241        if file is None:
    249242            cmd  = 'cmt show uses'
    250         else:   
    251             cmd = 'cat ' + file       
    252         status, output = commands.getstatusoutput (cmd)
     243        else:
     244            cmd = 'cat ' + file
     245        status, output = getstatusoutput (cmd)
     246        #status, output = commands.getstatusoutput (cmd)
    253247        if status != 0:
    254248            print output
    255249            sys.exit(-1)
    256         self.get_uses(output)   
     250        self.get_uses(output)
    257251        self.get_paths(output)
    258         #self.check_execution (package=self.current_package)
    259252        #self.simulate_execution()
     253
     254    def get_use_count(self):
     255        for key in self.packages:
     256            count = 0
     257            for parent in self.packages:
     258                if key in self.packages[parent]['uses']: count += 1
     259            self.packages[key]['use_count'] = count
     260            # print "Package",key,"use_count",count
    260261
    261262    def get_local_graph(self):
     
    266267                    if key in self.packages[selected]['uses']:
    267268                       self.packages[selected]['uses'].remove(key)
    268                 To_remove.append (key) 
     269                To_remove.append (key)
    269270        for item in To_remove:
    270              del self.packages[item]
     271            del self.packages[item]
    271272
    272273    def simulate_execution(self):
    273        ok = True
    274        indice = 1                     
    275        while ok:
    276            runnable  = list()
    277            for key in self.packages:
    278                if  self.packages[key]['status']!='done':
    279                    if len(self.packages[key]['uses']) == 0:
    280                        runnable.append(key)                                             
    281            if len(runnable):
    282                print '\n#--------------------------------------------------------------'
    283                print "# Execute parallel actions within packages " + str(runnable)
    284            for selected in runnable:       
    285                print '#--------------------------------------------------------------'
    286                print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
    287                print '#--------------------------------------------------------------'
    288                self.packages[selected]['status']='done'       
    289                indice = indice + 1
    290                for key in self.packages:
    291                    if selected in self.packages[key]['uses']:
    292                        self.packages[key]['uses'].remove(selected)                               
    293                        #print 'remove', selected, 'from',key
    294            if len(runnable)==0:
    295                            ok = False       
    296                
    297     def check_execution(self, package, path=list(), cycles=list()):
    298         #print package,'-->',self.packages[package]['uses']
    299         #print path
    300         if package in path:
    301             if path[path.index(package):] not in cycles:
    302                 print 'Cycles:',path[path.index(package):], package
    303                 cycles = cycles + path[path.index(package):]
    304                 sys.exit(-1)
    305         path.append(package)
    306         for item in self.packages[package]['uses']:
    307               self.check_execution(package=item, path=path, cycles=cycles)
    308               path.pop()       
    309 
    310     def get_current_package(self):   
     274        while True:
     275            ndone = self.simulate_requests()
     276            if ndone == 0: break
     277
     278    def simulate_requests(self):
     279        runnable = self.get_next_work_units()
     280        if len(runnable):
     281            print '\n#--------------------------------------------------------------'
     282            print "# Execute parallel actions within packages - total", len(runnable)
     283            print '#--------------------------------------------------------------'
     284        for selected in runnable:
     285            use_count = self.packages[selected]['use_count']
     286            path = self.packages[selected]['path']
     287            print '#--------------------------------------------------------------'
     288            print '# (%d/%d %d) Now trying [] in %s' % (self.get_counter(), len(self.packages), use_count, path)
     289            print '#--------------------------------------------------------------'
     290            self.suppress_work_unit(selected)
     291        return len(runnable)
     292
     293    def get_current_package(self):
    311294        cmd            = 'cmt show macro package'
    312         status, output = commands.getstatusoutput (cmd)
     295        status, output = getstatusoutput (cmd)
     296        #status, output = commands.getstatusoutput (cmd)
    313297        if status != 0:
    314298            print output
    315             sys.exit(-1)   
     299            sys.exit(-1)
    316300        lines = string.split(output, '\n')
    317301        for line in lines:
     
    321305                return line [start+1:start+end+1]
    322306
    323     def get_work_area_path (self, name):       
    324         return self.packages [name]['path']
    325        
    326     def get_package_path (self, name):   
    327         #return os.getcwd ()
    328         cmd = 'cmt -use='+name+' run pwd'
    329         status, output = commands.getstatusoutput (cmd)
    330         if status != 0:
    331             print output
    332             sys.exit(-1)   
    333         lines = string.split(output, '\n')
    334         for line in lines:
    335             if line [0] != '#' and line[:5] != "#CMT>":
    336                 print line
    337                 return line
    338  
    339307    def print_dependencies(self):
    340         print '# ------------------------'
    341         print '# package --> dependencies'
    342         print '# ------------------------'
     308        print '# -------------------------------------------'
     309        print '# package --> dependencies, status, use_count'
     310        print '# -------------------------------------------'
    343311        for key in self.packages.keys():
    344             print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
     312            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status'],',', self.packages[key] ['use_count']
    345313
    346314    def print_status(self, status):
    347         print '# ------------------------' 
    348         print '# package --> dependencies' 
    349         print '# ------------------------' 
     315        print '# ------------------------'
     316        print '# package --> dependencies'
     317        print '# ------------------------'
    350318        i = 1
    351319        for key in self.packages.keys():
    352320            if self.packages[key] ['status'] == status:
    353                 print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
     321                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']
    354322                i = i + 1
    355            
     323
    356324    def is_work_unit_waiting (self, name):
    357325        return self.packages[name] ['status'] == 'waiting'
     
    360328        self.packages[name] ['status'] = status
    361329
    362     def get_dependencies (self, name):
    363         return self.packages[name] ['uses']
    364    
    365330    def get_next_work_units (self):
    366         result = list ()
    367         for key in self.packages.keys():
    368             if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
    369                 result.append(key)
     331        # by default returned list is in arbitrary order - may be this is better
     332        # if self.sort is set returned list is sorted - most used packages first
     333        runnable = list()
     334        for key in self.packages:
     335            if len(self.packages[key]['uses']) == 0 and self.packages[key]['status'] == 'waiting':
     336                use_count = self.packages[key]['use_count']
     337                runnable.append((use_count,key))
     338        if self.sort:
     339            runnable.sort()
     340            runnable.reverse()
     341        result = [ pair[1] for pair in runnable ]
    370342        return result
    371343
    372     def is_work_units (self):
    373         result = list ()
    374         for key in self.packages.keys():
    375             if self.is_work_unit_waiting(key) :
    376                 return True
    377         return False       
    378 
    379344    def suppress_work_unit (self, name):
    380         #print '# remove', name, 'from schedule'
     345        #print '# remove', name, 'from schedule'
     346        self.semaphore.acquire()
     347        self.packages[name]['status']='done'
    381348        for key in self.packages.keys():
    382349            if name in self.packages[key]['uses']:
    383350                self.packages[key]['uses'].remove(name)
     351        self.semaphore.release()
    384352
    385353    def add_work_unit (self, name, cmd):
    386354        if self.is_work_unit_waiting (name):
    387355            # we create requests
    388             arg = {'cmd': cmd , 'package':name}
    389             req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback)
     356            arg = {'cmd': cmd , 'package': name}
     357            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback)
     358#           req = WorkRequest(self.do_execute, [arg] , None,
     359#                 callback=self.result_callback, exc_callback=self.handle_exception)
    390360            # then we put the work request in the queue...
    391361            self.set_work_unit_status (name, 'queued')
    392362            self.pool.putRequest(req)
    393             #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
     363            # print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
    394364
    395365    def execute (self, command):
    396366        #self.print_dependencies ()
     367        self.semaphore.acquire()
    397368        packages = self.get_next_work_units()
    398         if len(packages) !=0:
    399             print '\n#--------------------------------------------------------------'   
    400             print '# Execute parallel actions within packages', packages                     
     369        if len(packages):
     370            print '\n#--------------------------------------------------------------'
     371            print '# Execute parallel actions within packages (total',len(packages),')',packages
     372            print '\n#--------------------------------------------------------------'
    401373            for package in packages:
    402374                self.add_work_unit (package, command)
     375            sys.stdout.flush()
     376        self.semaphore.release()
    403377
    404378    def execute_all(self,command):
     
    406380        self.execute (command)
    407381        self.wait()
     382        if self.counter != len(self.packages):
     383            print 'tbroadcast: warning: compiled',self.counter,'out of',len(self.packages),'packages'
     384        self.pool.dismissWorkers(self.num_workers, do_join=True)
    408385        #self.print_dependencies ()
    409         #self.print_status (status='waiting')       
    410         #while self.is_work_units():
    411         #self.wait()           
    412        
     386        #self.print_status (status='waiting')
     387
    413388    def wait (self):
    414        self.pool.wait()   
     389       self.pool.wait()
    415390
    416391    # this will be called each time a result is available
     
    418393      #print "**Result: %s from request #%s" % (str(result), request.requestID)
    419394      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
    420       #if result['package'] == 'CodeCheck':
    421       #    sys.exit(-1)
    422       self.execute (result['cmd'])   
    423 
    424     # the work the threads will have to do
     395      self.execute (result['cmd'])
     396
     397    # the work the threads will have to do
    425398    def do_execute(self, arg):
    426       path = self.get_work_area_path (arg['package'])
     399      package = arg['package']
     400      path = self.packages[package]['path']
    427401      if path == None or not os.path.exists(path):
    428           raise RuntimeError('Path to package '+ arg['package'] +' not found')
    429       self.set_work_unit_status (arg['package'], 'running')     
    430       #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"     
    431       #os.chdir(path)
    432       #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
    433       header = '#--------------------------------------------------------------\n'
    434       header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
     402          raise RuntimeError('Path to package '+ package +' not found')
     403      self.set_work_unit_status(package, 'running')
     404      header =          '#--------------------------------------------------------------\n'
     405      header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+arg['cmd']+'] in '+path+'\n'
    435406      header = header + '#--------------------------------------------------------------\n'
    436407      print header
     408      sys.stdout.flush()
    437409      project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
    438410      log_name     = string.replace(path, project_path, '')
     
    440412      log_name     = string.replace(log_name, '/', '_')
    441413      log_name     = log_name+'.loglog'
    442       arg['log']   = log_name
    443       cmd = "cd "+ path +";"+ arg['cmd'] 
    444       #status, output= commands.getstatusoutput(cmd)
     414      # arg['log']   = log_name
     415      cmd = "cd "+ path +";"+ arg['cmd']
     416      # status, output = commands.getstatusoutput(cmd)
    445417      # init output file
    446418
    447       self.packages[arg['package']] ['startTime'] = time.time ()                           
     419      self.packages[package] ['startTime'] = time.time()
    448420
    449421      if self.output is not None:
    450            f1 = open (self.output+'/'+ log_name, 'w+')           
     422           f1 = open (self.output+'/'+ log_name, 'w+')
    451423           f1.write (header)
     424           f1.flush()
     425           if self.error is not None:
     426               f2 = open (self.error+'/error'+log_name, 'w+')
     427               fp = Popen(cmd, shell=True, stdout=f1, stderr=f2)
     428               fp.communicate()
     429               status = fp.wait()
     430               f2.close()
     431           else:
     432               fp = Popen(cmd, shell=True, stdout=f1, stderr=f1)
     433               fp.communicate()
     434               status = fp.wait()
    452435           f1.close()
    453            f1 = open (self.output+'/'+ log_name, 'a')
    454            if self.error is not None:       
    455                f2 = open (self.error+'/error'+log_name, 'w+')               
    456                Popen(cmd, shell=True, stdout=f1, stderr=f2).communicate()
    457                f2.close() 
    458            else:
    459                Popen(cmd, shell=True, stdout=f1, stderr=f1).communicate()
    460            f1.close()               
    461436      else:
    462           Popen(cmd, shell=True).communicate()
    463      
     437           fp = Popen(cmd, shell=True)
     438           fp.communicate()
     439           status = fp.wait()
     440      sys.stdout.flush()
     441      sys.stderr.flush()
     442
     443      # Error is not handled - exit() is forbidden here
    464444      if not self.keep_going and status > 0:
    465         sys.exit(status)   
    466                      
    467       self.packages[arg['package']] ['endTime'] = time.time ()
     445          print 'Error',status,'for package',package
     446          # sys.exit(status)
     447
     448      self.packages[package] ['endTime'] = time.time()
    468449      if self.perf:
    469           self.semaphore.acquire ()       
     450          self.semaphore.acquire()
    470451          f = open (self.perf, 'a')
    471           f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n') 
     452          f.write (package+" "+str(self.packages[package]['startTime'])+" "+str(self.packages[package]['endTime'] )+'\n')
    472453          f.close()
    473454          self.semaphore.release()
    474       self.suppress_work_unit (arg['package'])
    475       self.set_work_unit_status (arg['package'], 'done')
    476       # status, output= commands.getstatusoutput(cmd)
    477       #if status != 0:
    478       #   raise RuntimeError(output)
     455      self.suppress_work_unit(package)
    479456      return {'cmd': arg['cmd'], 'package':arg['package']}
    480457
    481              
     458
    482459    # this will be called when an exception occurs within a thread
    483460    def handle_exception(self, request, exc_info):
    484         #traceback.print_stack()
    485       print '#--------------------------------------------------------------'       
     461      #traceback.print_stack()
     462      print '#--------------------------------------------------------------'
    486463      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
    487464      if exc_info[0]== exceptions.SystemExit:
    488         print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1]) 
    489         print '#--------------------------------------------------------------'   
     465        print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1])
     466        print '#--------------------------------------------------------------'
    490467        sys.exit(exc_info[1])
    491       print "# Exception occured: %s" %(exc_info[1])     
     468      print "# Exception occured: %s" %(exc_info[1])
    492469      print exc_info
    493       print '#--------------------------------------------------------------'   
     470      print '#--------------------------------------------------------------'
    494471      #sys.exit(-1)
    495472
    496    
     473
    497474    def generate_make (self, file, command):
    498475        makefile = open (file, 'w+')
     
    502479        self.recursive_make (self.current_package, command, makefile, len(self.packages))
    503480        makefile.close ()
    504        
     481
    505482    def recursive_make (self, package, command, makefile, indice,actions=list()):
    506483        lines = self.generate_action_make (package, command, indice)
    507484        makefile.write (lines)
    508         #print lines               
     485        #print lines
    509486        for pkg in self.packages[package] ['uses']:
    510487            if pkg not in actions:
     
    512489                indice = indice - 1
    513490                self.counter = self.counter - 1
    514                 self.recursive_make(pkg, command,makefile, indice, actions)       
    515        
     491                self.recursive_make(pkg, command,makefile, indice, actions)
     492
    516493    def generate_action_make (self, package, command, indice):
    517         lines = package + ' :: '       
     494        lines = package + ' :: '
    518495        # add dependencies
    519496        for pkg in self.packages[package] ['uses']:
    520             lines = lines + ' ' + pkg           
     497            lines = lines + ' ' + pkg
    521498
    522499        # add the action itself
    523         newcommand = string.replace (command, '<package>', package)       
     500        newcommand = string.replace (command, '<package>', package)
    524501        if command =='':
    525502            newcommand='$(MAKE)'
     
    529506        lines = lines +  '\t@echo "#--------------------------------------------------------------"\n'
    530507        lines = lines +  'ifdef LOCATION\n'
    531         lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'       
     508        lines = lines +  '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'
    532509        lines = lines +  '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
    533510        lines = lines +  '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
    534         lines = lines + '\t+@cd ' + self.packages[package]['path']           
     511        lines = lines + '\t+@cd ' + self.packages[package]['path']
    535512        lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
    536513        lines = lines + 'else\n'
    537         lines = lines + '\t+@cd ' + self.packages[package]['path']           
     514        lines = lines + '\t+@cd ' + self.packages[package]['path']
    538515        lines = lines + ' && ' + newcommand + '\n'
    539         lines = lines + 'endif\n\n'               
     516        lines = lines + 'endif\n\n'
    540517        return lines
    541        
     518
     519
     520# own copy of getstatusoutput
     521def getstatusoutput(cmd):
     522    """Return (status, stdout) of executing cmd in a shell.
     523
     524    A trailing line separator is removed from the output string.
     525    The exit status of the command is encoded in the format specified for wait(),
     526    when the exit status is zero (termination without errors), 0 is returned.
     527    """
     528    import os
     529    p = os.popen(cmd, 'r')
     530    out = p.read()
     531    sts = p.close()
     532    if sts is None: sts = 0
     533    if out.endswith(os.linesep):
     534        out = out[:out.rindex(os.linesep)]
     535    return sts, out
     536
    542537#--------- EoF --------#
  • tbroadcast/HEAD/python/threadpool.py

    r508 r517  
    1 """
    2 
    3 @author: Christopher Arndt
    4 @version: 1.1
    5 
    6 @see: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/435883
    7 
    8 Easy to use object-oriented thread pool framework.
    9 
    10 A thread pool is a class that maintains a pool of worker threads to perform
     1# -*- coding: UTF-8 -*-
     2"""Easy to use object-oriented thread pool framework.
     3
     4A thread pool is an object that maintains a pool of worker threads to perform
    115time consuming operations in parallel. It assigns jobs to the threads
    126by putting them in a work request queue, where they are picked up by the
    137next available thread. This then performs the requested operation in the
    14 background and puts the results in a another queue.
    15 
    16 The thread pool class can then collect the results from all threads from
     8background and puts the results in another queue.
     9
     10The thread pool object can then collect the results from all threads from
    1711this queue as soon as they become available or after all threads have
    1812finished their work. It's also possible, to define callbacks to handle
    1913each result as it comes in.
    2014
    21 The basic concept and some code was taken from the book "Python in a Nutshell"
    22 by Alex Martelli, copyright 2003, ISBN 0-596-00188-6, from section 14.5
    23 "Threaded Program Architecture". I wrapped the main program logic in the
     15The basic concept and some code was taken from the book "Python in a Nutshell,
     162nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
     1714.5 "Threaded Program Architecture". I wrapped the main program logic in the
    2418ThreadPool class, added the WorkRequest class and the callback system and
    25 tweaked the code here and there.
    26 
    27 Basic usage:
    28 
    29 >>> main = ThreadPool(poolsize)
    30 >>> requests = makeRequests(some_callable, list_of_args, callback)
    31 >>> [main.putRequests(req) for req in requests]
    32 >>> main.wait()
     19tweaked the code here and there. Kudos also to Florent Aide for the exception
     20handling mechanism.
     21
     22Basic usage::
     23
     24    >>> pool = ThreadPool(poolsize)
     25    >>> requests = makeRequests(some_callable, list_of_args, callback)
     26    >>> [pool.putRequest(req) for req in requests]
     27    >>> pool.wait()
    3328
    3429See the end of the module code for a brief, annotated usage example.
     30
     31Website : http://chrisarndt.de/projects/threadpool/
     32
    3533"""
    36 
    37 __all__ = ['makeRequests', 'NoResultsPending', 'NoWorkersAvailable',
    38   'ThreadPool', 'WorkRequest', 'WorkerThread']
    39 
    40 __author__ = 'Christopher Arndt'
    41 __version__ = '1.1'
    42 __date__ = '2005-07-19'
    43 
    44 import threading, Queue
    45 
     34__docformat__ = "restructuredtext en"
     35
     36__all__ = [
     37    'makeRequests',
     38    'NoResultsPending',
     39    'NoWorkersAvailable',
     40    'ThreadPool',
     41    'WorkRequest',
     42    'WorkerThread'
     43]
     44
     45__author__ = "Christopher Arndt"
     46__version__ = "1.2.5"
     47__revision__ = "$Revision: 354 $"
     48__date__ = "$Date: 2008-11-19 18:34:46 +0100 (Wed, 19 Nov 2008) $"
     49__license__ = 'MIT license'
     50
     51
     52# standard library modules
     53import sys
     54import threading
     55import Queue
     56import traceback
     57
     58
     59# exceptions
    4660class NoResultsPending(Exception):
    4761    """All work requests have been processed."""
    4862    pass
     63
    4964class NoWorkersAvailable(Exception):
    5065    """No worker threads available to process remaining requests."""
    5166    pass
    5267
     68
     69# internal module helper functions
     70def _handle_thread_exception(request, exc_info):
     71    """Default exception handler callback function.
     72
     73    This just prints the exception info via ``traceback.print_exception``.
     74
     75    """
     76    traceback.print_exception(*exc_info)
     77
     78
     79# utility functions
     80def makeRequests(callable_, args_list, callback=None,
     81        exc_callback=_handle_thread_exception):
     82    """Create several work requests for same callable with different arguments.
     83
     84    Convenience function for creating several work requests for the same
     85    callable where each invocation of the callable receives different values
     86    for its arguments.
     87
     88    ``args_list`` contains the parameters for each invocation of callable.
     89    Each item in ``args_list`` should be either a 2-item tuple of the list of
     90    positional arguments and a dictionary of keyword arguments or a single,
     91    non-tuple argument.
     92
     93    See docstring for ``WorkRequest`` for info on ``callback`` and
     94    ``exc_callback``.
     95
     96    """
     97    requests = []
     98    for item in args_list:
     99        if isinstance(item, tuple):
     100            requests.append(
     101                WorkRequest(callable_, item[0], item[1], callback=callback,
     102                    exc_callback=exc_callback)
     103            )
     104        else:
     105            requests.append(
     106                WorkRequest(callable_, [item], None, callback=callback,
     107                    exc_callback=exc_callback)
     108            )
     109    return requests
     110
     111
     112# classes
    53113class WorkerThread(threading.Thread):
    54114    """Background thread connected to the requests/results queues.
     
    56116    A worker thread sits in the background and picks up work requests from
    57117    one queue and puts the results in another until it is dismissed.
     118
    58119    """
    59120
    60     def __init__ (self, requestsQueue, resultsQueue, **kwds):
    61         """Set up thread in damonic mode and start it immediatedly.
    62 
    63         requestsQueue and resultQueue are instances of Queue.Queue passed
    64         by the ThreadPool class when it creates a new worker thread.
     121    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
     122        """Set up thread in daemonic mode and start it immediatedly.
     123
     124        ``requests_queue`` and ``results_queue`` are instances of
     125        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a new
     126        worker thread.
     127
    65128        """
    66129        threading.Thread.__init__(self, **kwds)
    67130        self.setDaemon(1)
    68         self.workRequestQueue = requestsQueue
    69         self.resultQueue = resultsQueue
     131        self._requests_queue = requests_queue
     132        self._results_queue = results_queue
     133        self._poll_timeout = poll_timeout
    70134        self._dismissed = threading.Event()
    71135        self.start()
    72136
    73137    def run(self):
    74         """Repeatedly process the job queue until told to exit.
    75         """
    76 
    77         while not self._dismissed.isSet():
    78             # thread blocks here, if queue empty
    79             request = self.workRequestQueue.get()
     138        """Repeatedly process the job queue until told to exit."""
     139        while True:
    80140            if self._dismissed.isSet():
    81                 # return the work request we just picked up
    82                 self.workRequestQueue.put(request)
    83                 break # and exit
    84             # XXX catch exceptions here and stick them to request object
    85             self.resultQueue.put(
    86                 (request, request.callable(*request.args, **request.kwds))
    87             )
     141                # we are dismissed, break out of loop
     142                break
     143            # get next work request. If we don't get a new request from the
     144            # queue after self._poll_timout seconds, we jump to the start of
     145            # the while loop again, to give the thread a chance to exit.
     146            try:
     147                request = self._requests_queue.get(True, self._poll_timeout)
     148            except Queue.Empty:
     149                continue
     150            else:
     151                if self._dismissed.isSet():
     152                    # we are dismissed, put back request in queue and exit loop
     153                    self._requests_queue.put(request)
     154                    break
     155                try:
     156                    result = request.callable(*request.args, **request.kwds)
     157                    self._results_queue.put((request, result))
     158                except:
     159                    request.exception = True
     160                    self._results_queue.put((request, sys.exc_info()))
    88161
    89162    def dismiss(self):
    90         """Sets a flag to tell the thread to exit when done with current job.
    91         """
    92 
     163        """Sets a flag to tell the thread to exit when done with current job."""
    93164        self._dismissed.set()
    94165
     
    97168    """A request to execute a callable for putting in the request queue later.
    98169
    99     See the module function makeRequests() for the common case
    100     where you want to build several work requests for the same callable
    101     but different arguments for each call.
     170    See the module function ``makeRequests`` for the common case
     171    where you want to build several ``WorkRequest`` objects for the same
     172    callable but with different arguments for each call.
     173
    102174    """
    103175
    104     def __init__ (self, callable, args=None, kwds=None, requestID=None,
    105       callback=None):
    106         """A work request consists of the a callable to be executed by a
     176    def __init__(self, callable_, args=None, kwds=None, requestID=None,
     177            callback=None, exc_callback=_handle_thread_exception):
     178        """Create a work request for a callable and attach callbacks.
     179
     180        A work request consists of the a callable to be executed by a
    107181        worker thread, a list of positional arguments, a dictionary
    108182        of keyword arguments.
    109183
    110         A callback function can be specified, that is called when the results
    111         of the request are picked up from the result queue. It must accept
    112         two arguments, the request object and it's results in that order.
    113         If you want to pass additional information to the callback, just stick
    114         it on the request object.
    115 
    116         requestID, if given, must be hashable as it is used by the ThreadPool
    117         class to store the results of that work request in a dictionary.
    118         It defaults to the return value of id(self).
     184        A ``callback`` function can be specified, that is called when the
     185        results of the request are picked up from the result queue. It must
     186        accept two anonymous arguments, the ``WorkRequest`` object and the
     187        results of the callable, in that order. If you want to pass additional
     188        information to the callback, just stick it on the request object.
     189
     190        You can also give custom callback for when an exception occurs with
     191        the ``exc_callback`` keyword parameter. It should also accept two
     192        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
     193        details as returned by ``sys.exc_info()``. The default implementation
     194        of this callback just prints the exception info via
     195        ``traceback.print_exception``. If you want no exception handler
     196        callback, just pass in ``None``.
     197
     198        ``requestID``, if given, must be hashable since it is used by
     199        ``ThreadPool`` object to store the results of that work request in a
     200        dictionary. It defaults to the return value of ``id(self)``.
     201
    119202        """
    120203        if requestID is None:
    121204            self.requestID = id(self)
    122205        else:
    123             self.requestID = requestID
     206            try:
     207                self.requestID = hash(requestID)
     208            except TypeError:
     209                raise TypeError("requestID must be hashable.")
     210        self.exception = False
    124211        self.callback = callback
    125         self.callable = callable
     212        self.exc_callback = exc_callback
     213        self.callable = callable_
    126214        self.args = args or []
    127215        self.kwds = kwds or {}
    128216
     217    def __str__(self):
     218        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % \
     219            (self.requestID, self.args, self.kwds, self.exception)
    129220
    130221class ThreadPool:
    131222    """A thread pool, distributing work requests and collecting results.
    132223
    133     See the module doctring for more information.
     224    See the module docstring for more information.
     225
    134226    """
    135227
    136     def __init__ (self, num_workers, q_size=0):
     228    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
    137229        """Set up the thread pool and start num_workers worker threads.
    138230
    139         num_workers is the number of worker threads to start initialy.
    140         If q_size > 0 the size of the work request is limited and the
    141         thread pool blocks when queue is full and it tries to put more
    142         work requests in it.
     231        ``num_workers`` is the number of worker threads to start initially.
     232
     233        If ``q_size > 0`` the size of the work *request queue* is limited and
     234        the thread pool blocks when the queue is full and it tries to put
     235        more work requests in it (see ``putRequest`` method), unless you also
     236        use a positive ``timeout`` value for ``putRequest``.
     237
     238        If ``resq_size > 0`` the size of the *results queue* is limited and the
     239        worker threads will block when the queue is full and they try to put
     240        new results in it.
     241
     242        .. warning:
     243            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
     244            the possibilty of a deadlock, when the results queue is not pulled
     245            regularly and too many jobs are put in the work requests queue.
     246            To prevent this, always set ``timeout > 0`` when calling
     247            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.
     248
    143249        """
    144 
    145         self.requestsQueue = Queue.Queue(q_size)
    146         self.resultsQueue = Queue.Queue()
     250        self._requests_queue = Queue.Queue(q_size)
     251        self._results_queue = Queue.Queue(resq_size)
    147252        self.workers = []
     253        self.dismissedWorkers = []
    148254        self.workRequests = {}
    149         self.createWorkers(num_workers)
    150 
    151     def createWorkers(self, num_workers):
    152         """Add num_workers worker threads to the pool."""
    153 
     255        self.createWorkers(num_workers, poll_timeout)
     256
     257    def createWorkers(self, num_workers, poll_timeout=5):
     258        """Add num_workers worker threads to the pool.
     259
     260        ``poll_timout`` sets the interval in seconds (int or float) for how
     261        ofte threads should check whether they are dismissed, while waiting for
     262        requests.
     263
     264        """
    154265        for i in range(num_workers):
    155             self.workers.append(WorkerThread(self.requestsQueue,
    156               self.resultsQueue))
    157 
    158     def dismissWorkers(self, num_workers):
    159         """Tell num_workers worker threads to to quit when they're done."""
    160 
     266            self.workers.append(WorkerThread(self._requests_queue,
     267                self._results_queue, poll_timeout=poll_timeout))
     268
     269    def dismissWorkers(self, num_workers, do_join=False):
     270        """Tell num_workers worker threads to quit after their current task."""
     271        dismiss_list = []
    161272        for i in range(min(num_workers, len(self.workers))):
    162273            worker = self.workers.pop()
    163274            worker.dismiss()
    164 
    165     def putRequest(self, request):
    166         """Put work request into work queue and save for later."""
    167         self.requestsQueue.put(request)
     275            dismiss_list.append(worker)
     276
     277        if do_join:
     278            for worker in dismiss_list:
     279                worker.join()
     280        else:
     281            self.dismissedWorkers.extend(dismiss_list)
     282
     283    def joinAllDismissedWorkers(self):
     284        """Perform Thread.join() on all worker threads that have been dismissed.
     285        """
     286        for worker in self.dismissedWorkers:
     287            worker.join()
     288        self.dismissedWorkers = []
     289
     290    def putRequest(self, request, block=True, timeout=0):
     291        """Put work request into work queue and save its id for later."""
     292        assert isinstance(request, WorkRequest)
     293        # don't reuse old work requests
     294        assert not getattr(request, 'exception', None)
     295        self._requests_queue.put(request, block, timeout)
    168296        self.workRequests[request.requestID] = request
    169297
    170298    def poll(self, block=False):
    171299        """Process any new results in the queue."""
    172         while 1:
     300        while True:
     301            # still results pending?
     302            if not self.workRequests:
     303                raise NoResultsPending
     304            # are there still workers to process remaining requests?
     305            elif block and not self.workers:
     306                raise NoWorkersAvailable
    173307            try:
    174                 # still results pending?
    175                 if not self.workRequests:
    176                     raise NoResultsPending
    177                 # are there still workers to process remaining requests?
    178                 elif block and not self.workers:
    179                     raise NoWorkersAvailable
    180308                # get back next results
    181                 request, result = self.resultsQueue.get(block=block)
    182                 # and hand them to the callback, if any
    183                 if request.callback:
     309                request, result = self._results_queue.get(block=block)
     310                # has an exception occured?
     311                if request.exception and request.exc_callback:
     312                    request.exc_callback(request, result)
     313                # hand results to callback, if any
     314                if request.callback and not \
     315                       (request.exception and request.exc_callback):
    184316                    request.callback(request, result)
    185317                del self.workRequests[request.requestID]
     
    189321    def wait(self):
    190322        """Wait for results, blocking until all have arrived."""
    191 
    192323        while 1:
    193324            try:
     
    196327                break
    197328
    198 def makeRequests(callable, args_list, callback=None):
    199     """Convenience function for building several work requests for the same
    200     callable with different arguments for each call.
    201 
    202     args_list contains the parameters for each invocation of callable.
    203     Each item in 'argslist' should be either a 2-item tuple of the list of
    204     positional arguments and a dictionary of keyword arguments or a single,
    205     non-tuple argument.
    206 
    207     callback is called when the results arrive in the result queue.
    208     """
    209 
    210     requests = []
    211    
    212     for item in args_list:
    213        
    214         if item is None:
    215             """no arguments"""
    216             requests.append(
    217               WorkRequest(callable, None, None, callback=callback))
    218            
    219         elif item == isinstance(item, tuple):
    220             """arguments and keywords"""
    221             requests.append(
    222               WorkRequest(callable, item[0], item[1], callback=callback))
    223         else:
    224             """only keywords"""
    225             requests.append(
    226               WorkRequest(callable, [item], None, callback=callback))
    227     return requests
    228 
     329
     330################
     331# USAGE EXAMPLE
     332################
    229333
    230334if __name__ == '__main__':
     
    235339    def do_something(data):
    236340        time.sleep(random.randint(1,5))
    237         return round(random.random() * data, 5)
     341        result = round(random.random() * data, 5)
     342        # just to show off, we throw an exception once in a while
     343        if result > 5:
     344            raise RuntimeError("Something extraordinary happened!")
     345        return result
    238346
    239347    # this will be called each time a result is available
    240348    def print_result(request, result):
    241         print "Result: %s from request #%s" % (result, request.requestID)
     349        print "**** Result from request #%s: %r" % (request.requestID, result)
     350
     351    # this will be called when an exception occurs within a thread
     352    # this example exception handler does little more than the default handler
     353    def handle_exception(request, exc_info):
     354        if not isinstance(exc_info, tuple):
     355            # Something is seriously wrong...
     356            print request
     357            print exc_info
     358            raise SystemExit
     359        print "**** Exception occured in request #%s: %s" % \
     360          (request.requestID, exc_info)
    242361
    243362    # assemble the arguments for each job to a list...
    244363    data = [random.randint(1,10) for i in range(20)]
    245364    # ... and build a WorkRequest object for each item in data
    246     requests = makeRequests(do_something, data, print_result)
    247 
    248     # we create a pool of 10 worker threads
     365    requests = makeRequests(do_something, data, print_result, handle_exception)
     366    # to use the default exception handler, uncomment next line and comment out
     367    # the preceding one.
     368    #requests = makeRequests(do_something, data, print_result)
     369
     370    # or the other form of args_lists accepted by makeRequests: ((,), {})
     371    data = [((random.randint(1,10),), {}) for i in range(20)]
     372    requests.extend(
     373        makeRequests(do_something, data, print_result, handle_exception)
     374        #makeRequests(do_something, data, print_result)
     375        # to use the default exception handler, uncomment next line and comment
     376        # out the preceding one.
     377    )
     378
     379    # we create a pool of 3 worker threads
     380    print "Creating thread pool with 3 worker threads."
    249381    main = ThreadPool(3)
    250382
     
    257389
    258390    # ...and wait for the results to arrive in the result queue
    259     # wait() will return when results for all work requests have arrived
     391    # by using ThreadPool.wait(). This would block until results for
     392    # all work requests have arrived:
    260393    # main.wait()
    261394
    262     # alternatively poll for results while doing something else:
     395    # instead we can poll for results while doing something else:
    263396    i = 0
    264     while 1:
     397    while True:
    265398        try:
     399            time.sleep(0.5)
    266400            main.poll()
    267             print "Main thread working..."
    268             time.sleep(0.5)
     401            print "Main thread working...",
     402            print "(active worker threads: %i)" % (threading.activeCount()-1, )
    269403            if i == 10:
    270                 print "Adding 3 more worker threads..."
     404                print "**** Adding 3 more worker threads..."
    271405                main.createWorkers(3)
     406            if i == 20:
     407                print "**** Dismissing 2 worker threads..."
     408                main.dismissWorkers(2)
    272409            i += 1
    273         except (KeyboardInterrupt, NoResultsPending):
     410        except KeyboardInterrupt:
     411            print "**** Interrupted!"
    274412            break
     413        except NoResultsPending:
     414            print "**** No pending results."
     415            break
     416    if main.dismissedWorkers:
     417        print "Joining all dismissed worker threads..."
     418        main.joinAllDismissedWorkers()
  • tbroadcast/HEAD/scripts/tbroadcast

    r508 r517  
    33# -- Author: V. Garonne
    44# -- Mail: garonne@lal.in2p3.fr
    5 # -- Date: 08/25/2006 
     5# -- Date: 08/25/2006
    66# -- Name: tbroadcast
    77# -- Description: main program
    88#----------------------------------#
    99
    10 import os
    1110import sys
    12 import string
     11import time
    1312
    1413def usage():
    15     print 'Usage : > tbroadcast [global options] [<command>]'
    16     print '# command :'
    17     print '#   <command>: command to execute'
    18     print '# global options :'
    19     print '#   -f=<file>             : Input file'
    20     print '#   -help                 : Print help'
    21     print '#   -local                : Reach packages only within the current project'
    22     print '#   -global               : Reach packages in all CMTPATH/CMTPROJECTPATH items'   
    23     print '#   -ignore_cycles        : Suppress automatically the cycles'
    24     print '#   -make=<file>          : Generate a recursive Make, [see: http://www.tip.net.au/~millerp/rmch/recu-make-cons-harm.html]'
    25     print '#   -nb=<num_worker>      : Change the total number of threads[default is 20]'
    26     print '#   -no_keep_going        : Exit after the first exit code > 1 found and return it in the shell'
    27     print '#   -output=<location>    : Output directory to store output files with the form <package>_output.log'
    28     print '#   -error=<location>     : Output directory to store error output with the form <package>_error.log'
    29     print '#   -perf=<file>          : Store for each package the time for executing the command in the <file> file'
    30     print '#   -print                : Print dependencies for each package'
    31     print '#   -version              : version of tbroadcast'       
    32     print '#   -silent               : Disable print'     
    33     print '#   -test                 : Simulate execution'
     14    print """
     15Usage: tbroadcast [global options] [<command>]
     16#
     17# <command> is executed in <package>/cmt
     18#
     19# global options :
     20#   -help                 : Print help
     21#   -local                : Reach packages only within the current project
     22#                         : if not specified reach packages in all CMTPATH/CMTPROJECTPATH items
     23#   -ignore[_cycles]      : Suppress automatically the cycles
     24#   -sort                 : Compile packages in order of use count, most significant first
     25#   -nb=<num_worker>      : Change the total number of threads[default is 20]
     26#   -output=<location>    : Output directory to store output files with the form <package>_output.log
     27#   -error=<location>     : Output directory to store error output with the form <package>_error.log
     28#   -perf=<file>          : Store for each package the time for executing the command in the <file> file
     29#   -make=<file>          : Generate a recursive Make, [see: http://www.tip.net.au/~millerp/rmch/recu-make-cons-harm.html]
     30#   -print                : Print dependencies for each package and exit
     31#   -version              : Print version of tbroadcast and exit
     32#   -test                 : Simulate execution and exit
     33#   -                     : is accepted and does nothing
     34#
     35# Example:
     36#   tbroadcast -local -ignore -nb=4 'make -j6'
     37"""
     38
     39# Unused options
     40#   -f=<file>             : Input file (option for debug only)
     41#   -no_keep_going        : Exit after the first exit code > 1 found and return it in the shell
     42#   -silent               : Disable print
    3443
    3544if __name__ == '__main__':
     
    3847    cur_version = sys.version_info
    3948
    40     if not (cur_version[0] > req_version[0] or (cur_version[0] == req_version[0]  and cur_version[1] >= req_version[1])):
    41         raise "must use python 2.5 or greater"
     49    if not (cur_version[0] > req_version[0] or (cur_version[0] == req_version[0] and cur_version[1] >= req_version[1])):
     50        print "tbroadcast: must use python 2.5 or greater"
     51        sys.exit(-1)
    4252
    4353    from tbroadcast import Scheduler
     
    4656    num_worker    = 20
    4757    command       = ''
    48     version       = 'v2.0.4'
     58    version       = 'v2.0.7'
    4959    test          = False
    50     check         = False
    5160    print_graph   = False
    5261    local         = False
    5362    ignore_cycles = False
    5463    silent        = False
    55     perf          = False     
     64    perf          = False
     65    sort          = False
    5666    output        = None
    5767    error         = None
     
    6070    makefile      = 'Makefile'
    6171    keep_going    = True
    62    
     72
    6373    if len(sys.argv) == 1:
    64         test = True
    65     else:       
    66         for arg in sys.argv[1:len(sys.argv)]:           
     74        usage()
     75        sys.exit(-1)
     76    else:
     77        for arg in sys.argv[1:len(sys.argv)]:
     78#            print "Argument is",arg
    6779             if arg[0]=='-':
    68                  option = string.split(arg,'=')[0]
     80                 option = arg.split('=')[0]
    6981                 if option == '-version':
    7082                     print version
    7183                     sys.exit(-1)
    72                  if option == '-nb':
    73                     num_worker = int (string.split(arg,'=')[1])
    74                  if option == '-f':
    75                     file = string.split(arg,'=')[1]
    76                  if option == '-perf':
    77                     perf = string.split(arg,'=')[1]
    78                  if option == '-output':
    79                     output = string.split(arg,'=')[1]
    80                  if option == '-error':
    81                     error = string.split(arg,'=')[1]
    82                  if option == '-local':
    83                      local= True
    84                  if option == '-ignore_cycles':             
     84                 elif option == '-nb':
     85                     num_worker = int (arg.split('=')[1])
     86                 elif option == '-f':
     87                     file = arg.split('=')[1]
     88                 elif option == '-perf':
     89                     perf = arg.split('=')[1]
     90                 elif option == '-output':
     91                     output = arg.split('=')[1]
     92                 elif option == '-error':
     93                     error = arg.split('=')[1]
     94                 elif option == '-local':
     95                     local = True
     96                 elif option == '-sort':
     97                     sort = True
     98                 elif option[:7] == '-ignore':
    8599                     ignore_cycles = True
    86                  if option == '-silent':             
    87                      silent = True       
    88                  if option == '-no_keep_going':             
     100                 elif option == '-silent':
     101                     silent = True
     102                 elif option == '-no_keep_going':
    89103                     keep_going = False
    90                              
    91                  if option == '-help':   
    92                       usage()
    93                       sys.exit(-1)     
    94                                  
    95                  if option == '-test':
    96                          test = True   
    97                  elif option == '-check':
    98                          check = True   
     104                 elif option == '-help':
     105                     usage()
     106                     sys.exit(-1)
     107                 elif option == '-test':
     108                     test = True
    99109                 elif option == '-print':
    100                          print_graph = True   
    101                  elif option == '-make':       
     110                     print_graph = True
     111                 elif option == '-make':
    102112                     make     = True
    103                      makefile = string.split(arg,'=')[1]
     113                     makefile = arg.split('=')[1]
     114                 elif option == '-':
     115                     pass
     116                 else:
     117                     print 'tbroadcast: bad option "%s", use -help for help' % option
     118                     sys.exit(-1)
    104119             else:
    105                   command = arg
     120                 command = arg
    106121
    107     master = Scheduler (num_workers=num_worker, file=file, ignore_cycles=ignore_cycles,
    108                         local=local, output=output, error=error, silent=silent, perf=perf,
    109                         keep_going=keep_going)
     122#   print "End of arguments. Command to execute", command
     123
     124    if not (command or test or print_graph):
     125        print 'tbroadcast: no command specified'
     126        sys.exit(-1)
     127
     128    master = Scheduler (num_workers=num_worker, file=file, ignore_cycles=ignore_cycles,
     129                        local=local, output=output, error=error, silent=silent, perf=perf,
     130                        keep_going=keep_going, sort=sort)
    110131    if test:
    111132        master.simulate_execution()
    112     elif check:
    113         master.check_execution (package=master.get_current_package())   
    114133    elif print_graph:
    115         master.print_dependencies ()
     134        master.print_dependencies()
    116135    elif make:
    117         master.generate_make (makefile, command)   
     136        master.generate_make (makefile, command)
    118137    else:
     138        print 'tbroadcast: start of job at', time.strftime('%d-%b-%Y %T')
    119139        master.execute_all (command)
    120     #sys.exit(-1);
     140        print 'tbroadcast: end of job at', time.strftime('%d-%b-%Y %T')
     141    #sys.exit(-1)
    121142#--------- EoF --------#
Note: See TracChangeset for help on using the changeset viewer.