source: tbroadcast/HEAD/python/executer.py @ 478

Last change on this file since 478 was 478, checked in by rybkin, 16 years ago

See C.L. 2

File size: 8.8 KB
Line 
1# -*- coding: iso-8859-1 -*-
2
3import os
4import sys
5import string
6import popen2
7import stat
8import re
9import time
10import commands
11import select
12
13from os.path import join
14
15############################################################################
16## Error and success return function
17def S_ERROR( sMessage = '' ):
18  return { 'Status': -1, 'OK' : 0, 'Message' : sMessage  }
19   
20def S_OK( sValue = None, sPname = 'Value' ):
21  dResult = { 'Status': 'OK', 'OK' : 1 }
22  if sValue is not None:
23    dResult[ sPname ] = sValue
24  return dResult
25 
26############################################################################ 
27class SubprocessExecuter:
28
29    def __init__( self, iTimeout = False ):
30        self.changeTimeout( iTimeout )
31        self.iBufferLimit = 5242880 # 5MB limit for data
32
33    def changeTimeout( self, iTimeout ):
34        self.iTimeout = iTimeout
35        if self.iTimeout == 0:
36            self.iTimeout = False
37       
38    def __readFromPipe( self, oPipe, iBaseLength = 0 ):
39        sData = ""
40        iMaxSliceLength = 8192
41        iLastSliceLength = 8192
42       
43        while iLastSliceLength == iMaxSliceLength:
44            sReadBuffer = os.read( oPipe, iMaxSliceLength )
45            iLastSliceLength = len( sReadBuffer )
46            sData += sReadBuffer
47            if len( sData ) + iBaseLength > self.iBufferLimit:
48                dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
49                dRetVal[ 'ReadData' ] = sData
50                return dRetVal
51           
52        return S_OK( sData )
53                   
54    def __executePythonFunction( self, oFunc, stArgs, oWritePipe ):
55        try:
56            os.write( oWritePipe, "%s\n" % str( S_OK( oFunc( *stArgs ) ) ) )
57        except Exception, v:
58            os.write( oWritePipe, "%s\n" % str( S_ERROR( str( v ) ) ) )
59        try:
60            os.close( oWritePipe )
61        finally:
62            os._exit(0)
63   
64    def __selectFD( self, lR, iTimeout = False ):
65        if self.iTimeout and not iTimeout:
66            iTimeout = self.iTimeout
67        if not iTimeout: 
68            return select.select( lR , [], [] )[0]
69        else:
70            return select.select( lR , [], [], iTimeout )[0]
71   
72    def pythonCall( self, oFunction, stArgs ):
73        oReadPipe, oWritePipe = os.pipe()
74        iPid = os.fork()
75        if iPid == 0:
76            os.close( oReadPipe )
77            self.__executePythonFunction( oFunction, stArgs, oWritePipe )
78            os.close( oWritePipe )
79        else:
80            os.close( oWritePipe )
81            lReadable = self.__selectFD( [ oReadPipe ] )
82            if len( lReadable ) == 0:
83                os.close( oReadPipe )
84                os.kill( iPid, 9 )
85                os.waitpid( iPid, 0 )
86                return S_ERROR( "%d seconds timeout for '%s' call" % ( self.iTimeout, oFunction.__name__ ) )
87            elif lReadable[0] == oReadPipe:
88                dData = self.__readFromPipe( oReadPipe )
89                os.close( oReadPipe )
90                os.waitpid( iPid, 0 )
91                if dData[ 'OK' ]:
92                    return eval( dData[ 'Value' ] )
93                return dData
94           
95    def __generateSystemCommandError( self, sMessage ):
96        retVal = S_ERROR( sMessage )
97        retVal[ 'stdout' ] = self.lBuffers[0][0]
98        retVal[ 'stderr' ] = self.lBuffers[1][0]
99        return retVal
100       
101    def __readFromFile( self, oFile, iBaseLength, bAll ):
102        try:
103            if bAll:
104                sData = "".join( oFile.readlines() )
105            else:
106                sData = oFile.readline()
107        except Exception, v:
108            pass 
109        if sData == "":
110            #self.checkAlive()
111            self.bAlive = False
112        if len( sData ) + iBaseLength > self.iBufferLimit:
113            dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
114            dRetVal[ 'ReadData' ] = sData
115            return dRetVal
116           
117        return S_OK( sData )
118
119    def __readFromSystemCommandOutput( self, oFile, iDataIndex, bAll = False ):
120        retVal = self.__readFromFile( oFile, len( self.lBuffers[ iDataIndex ][0] ), bAll )
121        if retVal[ 'OK' ]:
122            self.lBuffers[ iDataIndex ][0] += retVal[ 'Value' ]
123            if not self.oCallback == None:
124                while self.__callLineCallback( iDataIndex ):
125                    pass
126            return S_OK()
127        else:
128            self.lBuffers[ iDataIndex ][0] += retVal[ 'ReadData' ]
129            os.kill( self.oChild.pid, 9 )
130            self.oChild.wait()
131            return self.__generateSystemCommandError( "Exceeded maximum buffer size ( %d bytes ) timeout for '%s' call" % ( self.iBufferLimit, self.sCmd ) )
132
133    def systemCall( self, sCmd, oCallbackFunction = None, arg=None ):
134        self.sCmd = sCmd
135        self.oCallback = oCallbackFunction
136        self.arg    = arg
137        self.oChild = popen2.Popen3( self.sCmd, True )
138        self.lBuffers = [ [ "", 0 ], [ "", 0 ] ]
139        iInitialTime = time.time()
140        iExitStatus = self.oChild.poll()
141
142        while iExitStatus == -1:
143            retVal = self.__readFromCommand()
144            if not retVal[ 'OK' ]:
145                return retVal
146            if self.iTimeout and time.time() - iInitialTime > self.iTimeout:
147                os.kill( self.oChild.pid, 9 )
148                self.oChild.wait()
149                self.__readFromCommand( True )
150                self.oChild.fromchild.close()
151                self.oChild.childerr.close()
152                return self.__generateSystemCommandError( "Timeout (%d seconds) for '%s' call" % ( self.iTimeout, sCmd ) )
153            iExitStatus = self.oChild.poll()
154 
155        self.__readFromCommand(True )
156
157        self.oChild.fromchild.close()
158        self.oChild.childerr.close() 
159        return S_OK( ( iExitStatus / 256, self.lBuffers[0][0], self.lBuffers[1][0] ) )
160
161    def __readFromCommand( self, bLast = False ):
162        if bLast:
163            retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0, True )
164            if not retVal[ 'OK' ]:
165                return retVal
166            retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1, True )
167            if not retVal[ 'OK' ]:
168                return retVal
169        else:
170            lReadable = self.__selectFD( [ self.oChild.fromchild, self.oChild.childerr ], 1 )
171            if self.oChild.fromchild in lReadable:
172                retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0 )
173                if not retVal[ 'OK' ]:
174                    return retVal
175            if self.oChild.childerr in lReadable:
176                retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1 )
177                if not retVal[ 'OK' ]:
178                    return retVal
179        return S_OK()
180
181   
182    def __callLineCallback( self, iIndex ):
183        iNextLine = self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: ].find( "\n" )
184        if iNextLine > -1:
185            self.oCallback( iIndex, self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: self.lBuffers[ iIndex ][1] + iNextLine ], arg=self.arg )
186            self.lBuffers[ iIndex ][1] += iNextLine + 1 
187            return True
188        return False
189
190############################################################################   
191def redirectOutput(index, buffer):
192    """Filter function to redirect the std output and error of the job
193       executable for real-time debugging
194    """
195    print buffer
196
197
198############################################################################     
199def exeCommand( sCmd, iTimeout = 0, oLineCallback = redirectOutput, arg=None):
200    """Return ( status, output, error, pythonError ) of executing cmd in a shell."""
201    oSPE = SubprocessExecuter( iTimeout )
202    retVal = oSPE.systemCall( sCmd, oLineCallback, arg=arg)
203    if retVal[ 'OK' ]:
204        return retVal[ 'Value' ][0], retVal[ 'Value' ][1], retVal[ 'Value' ][2], 0
205    else:
206        if re.search("Timeout",retVal['Message']):
207          return 1, retVal['stdout'], retVal['Message'], 2
208        else: 
209          return 1, retVal['stdout'], retVal['stderr'], 1
210   
211############################################################################     
212def getstatusoutput(cmd):
213    """Return (status, stdout) of executing cmd in a shell.
214   
215    A trailing line separator is removed from the output string. The exit status of the command is encoded in the format specified for wait(), when the exit status is zero (termination without errors), 0 is returned.
216    """
217    import os
218    p = os.popen(cmd, 'r')
219    out = p.read()
220    sts = p.close()
221    if sts is None: sts = 0
222    if out.endswith(os.linesep):
223        out = out[:out.rindex(os.linesep)]
224    return sts, out
225
226#################################################################################
227###################################### EoF ######################################
228#################################################################################
Note: See TracBrowser for help on using the repository browser.