Changeset 244 for tbroadcast


Ignore:
Timestamp:
Sep 7, 2006, 10:31:17 AM (18 years ago)
Author:
garonne
Message:

MàJ

Location:
tbroadcast/v2
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • tbroadcast/v2/python/executer.py

    r232 r244  
    131131            return self.__generateSystemCommandError( "Exceeded maximum buffer size ( %d bytes ) timeout for '%s' call" % ( self.iBufferLimit, self.sCmd ) )
    132132
    133     def systemCall( self, sCmd, oCallbackFunction = None ):
     133    def systemCall( self, sCmd, oCallbackFunction = None, arg=None ):
    134134        self.sCmd = sCmd
    135135        self.oCallback = oCallbackFunction
     136        self.arg    = arg
    136137        self.oChild = popen2.Popen3( self.sCmd, True )
    137138        self.lBuffers = [ [ "", 0 ], [ "", 0 ] ]
     
    182183        iNextLine = self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: ].find( "\n" )
    183184        if iNextLine > -1:
    184             self.oCallback( iIndex, self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: self.lBuffers[ iIndex ][1] + iNextLine ] )
     185            self.oCallback( iIndex, self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: self.lBuffers[ iIndex ][1] + iNextLine ], arg=self.arg )
    185186            self.lBuffers[ iIndex ][1] += iNextLine + 1
    186187            return True
     
    196197
    197198############################################################################     
    198 def exeCommand( sCmd, iTimeout = 0, oLineCallback = redirectOutput):
     199def exeCommand( sCmd, iTimeout = 0, oLineCallback = redirectOutput, arg=None):
    199200    """Return ( status, output, error, pythonError ) of executing cmd in a shell."""
    200201    oSPE = SubprocessExecuter( iTimeout )
    201     retVal = oSPE.systemCall( sCmd, oLineCallback )
     202    retVal = oSPE.systemCall( sCmd, oLineCallback, arg=arg)
    202203    if retVal[ 'OK' ]:
    203204        return retVal[ 'Value' ][0], retVal[ 'Value' ][1], retVal[ 'Value' ][2], 0
  • tbroadcast/v2/python/tbroadcast.py

    r243 r244  
    2626class Scheduler:
    2727
    28     def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None):
     28    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False):
    2929        self.pool            = ThreadPool(num_workers=num_workers)
    3030        self.current_package = self.get_current_package()
    3131        self.current_project = {'name': None, 'path': None, 'version': None}
    32         self.packages        = {} 
     32        self.packages        = {}
    3333        self.counter         = 0
    3434        self.semaphore       = BoundedSemaphore(1)
     
    3636        self.ignore_cycles   = ignore_cycles
    3737        self.output          = output
     38        self.error           = error
     39        self.silent          = silent
    3840        if output is not None:
    3941            if not os.path.exists (output):
     
    4850        if self.local: self.get_local_graph()
    4951        self.check_cycles()
     52       
     53#        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
     54#        lignes = string.split(output, '\n')
     55#        i = 1
     56#        for package in lignes:
     57#            if package!='' and package[0] != '#':                           
     58#                print i , package
     59#                i =i +1
     60#                if not self.packages.has_key(package):
     61#                                    print package       
     62#        print len(self.packages)
     63#        sys.exit(-1)
    5064
    5165    def get_current_project(self):
     
    246260                To_remove.append (key)
    247261        for item in To_remove:
    248             self.packages.pop(item)
     262             del self.packages[item]
    249263
    250264    def simulate_execution(self):
     
    409423      print '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path
    410424      print '#--------------------------------------------------------------'
    411       cmd = arg['cmd']
    412       status, output, error, pythonError  = exeCommand(cmd)#,iTimeout = 3600)
     425      cmd = arg['cmd']
     426      #status, output= commands.getstatusoutput(cmd)
     427      # init output file
    413428      if self.output is not None:
    414          f = open (self.output+'/'+arg['package']+'_output.log', 'w+')
    415          f.write (output)
    416          f.close()
    417          f = open (self.output+'/'+arg['package']+'_error.log', 'w+')
    418          f.write (str(error))
    419          f.close()
     429           f = open (self.output+'/'+arg['package']+'_output.log', 'w+')
     430           f.close()     
     431           if self.error is not None:
     432               f = open (self.error+'/'+arg['package']+'_error.log', 'w+')
     433               f.close()           
     434      status, output, error, pythonError  = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600)
    420435      self.suppress_work_unit (arg['package'])
    421436      self.set_work_unit_status (arg['package'], 'done')
     
    424439      #   raise RuntimeError(output)
    425440      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
     441
     442    def redirectOutput(self, index, buffer, arg):
     443        """Filter function to redirect the std output and error of the job
     444           executable for real-time debugging
     445        """
     446        if self.output is not None:
     447           if index==0:   
     448               f = open (self.output+'/'+arg['package']+'_output.log', 'a')
     449               f.write (buffer+'\n')
     450               f.close()
     451           elif index==1:
     452               if self.error is not None:
     453                   f = open (self.error+'/'+arg['package']+'_error.log', 'a')
     454               else:
     455                   f = open (self.output+'/'+arg['package']+'_output.log', 'a')                   
     456               f.write (buffer+'\n')                   
     457               f.close()                               
     458        if not self.silent:
     459            print buffer
    426460             
    427461    # this will be called when an exception occurs within a thread
     
    433467      print '#--------------------------------------------------------------'   
    434468      sys.exit(-1)
     469
     470
    435471#--------- EoF --------#
  • tbroadcast/v2/scripts/tbroadcast

    r243 r244  
    2525    print '#   -ignore_cycles        : '
    2626    print '#   -nb=<num_worker>      : Total number of threads'
    27     print '#   -output=<location>    : Output directory to store output files with the form <package>.log'
     27    print '#   -output=<location>    : Output directory to store output files with the form <package>_output.log'
     28    print '#   -error=<location>     : Output directory to store error output with the form <package>_error.log'
    2829    print '#   -print                : Print dependency graph'
    2930    print '#   -test                 : Simulate execution'
     
    3839    local         = False
    3940    ignore_cycles = False
     41    silent        = False
    4042    output        = None
     43    error         = None
    4144    file          = None
    4245    for arg in sys.argv[1:len(sys.argv)]:           
     
    4952             if option == '-output':
    5053                output = string.split(arg,'=')[1]
     54             if option == '-error':
     55                error = string.split(arg,'=')[1]
    5156             if option == '-local':
    5257                 local= True
    5358             if option == '-ignore_cycles':             
    5459                 ignore_cycles = True
     60             if option == '-silent':             
     61                 silent = True
     62                 
    5563             if option == '-help':   
    5664                  usage()
     
    6674              command = arg
    6775
    68     master = Scheduler (num_workers=num_worker, file=file, ignore_cycles=ignore_cycles, local=local, output=output)
     76    master = Scheduler (num_workers=num_worker, file=file, ignore_cycles=ignore_cycles, local=local, output=output, error=error, silent=silent)
    6977    if test:
    7078        master.simulate_execution()
Note: See TracChangeset for help on using the changeset viewer.