source: tbroadcast/v2/python/executer.py @ 232

Last change on this file since 232 was 232, checked in by garonne, 18 years ago

add version 2 in python

File size: 8.2 KB
Line 
1# -*- coding: iso-8859-1 -*-
2
3import os
4import sys
5import string
6import popen2
7import stat
8import re
9import time
10import commands
11import select
12
13from os.path import join
14
15############################################################################
16## Error and success return function
17def S_ERROR( sMessage = '' ):
18  return { 'Status': -1, 'OK' : 0, 'Message' : sMessage  }
19   
20def S_OK( sValue = None, sPname = 'Value' ):
21  dResult = { 'Status': 'OK', 'OK' : 1 }
22  if sValue is not None:
23    dResult[ sPname ] = sValue
24  return dResult
25 
26############################################################################ 
27class SubprocessExecuter:
28
29    def __init__( self, iTimeout = False ):
30        self.changeTimeout( iTimeout )
31        self.iBufferLimit = 5242880 # 5MB limit for data
32
33    def changeTimeout( self, iTimeout ):
34        self.iTimeout = iTimeout
35        if self.iTimeout == 0:
36            self.iTimeout = False
37       
38    def __readFromPipe( self, oPipe, iBaseLength = 0 ):
39        sData = ""
40        iMaxSliceLength = 8192
41        iLastSliceLength = 8192
42       
43        while iLastSliceLength == iMaxSliceLength:
44            sReadBuffer = os.read( oPipe, iMaxSliceLength )
45            iLastSliceLength = len( sReadBuffer )
46            sData += sReadBuffer
47            if len( sData ) + iBaseLength > self.iBufferLimit:
48                dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
49                dRetVal[ 'ReadData' ] = sData
50                return dRetVal
51           
52        return S_OK( sData )
53                   
54    def __executePythonFunction( self, oFunc, stArgs, oWritePipe ):
55        try:
56            os.write( oWritePipe, "%s\n" % str( S_OK( oFunc( *stArgs ) ) ) )
57        except Exception, v:
58            os.write( oWritePipe, "%s\n" % str( S_ERROR( str( v ) ) ) )
59        try:
60            os.close( oWritePipe )
61        finally:
62            os._exit(0)
63   
64    def __selectFD( self, lR, iTimeout = False ):
65        if self.iTimeout and not iTimeout:
66            iTimeout = self.iTimeout
67        if not iTimeout: 
68            return select.select( lR , [], [] )[0]
69        else:
70            return select.select( lR , [], [], iTimeout )[0]
71   
72    def pythonCall( self, oFunction, stArgs ):
73        oReadPipe, oWritePipe = os.pipe()
74        iPid = os.fork()
75        if iPid == 0:
76            os.close( oReadPipe )
77            self.__executePythonFunction( oFunction, stArgs, oWritePipe )
78            os.close( oWritePipe )
79        else:
80            os.close( oWritePipe )
81            lReadable = self.__selectFD( [ oReadPipe ] )
82            if len( lReadable ) == 0:
83                os.close( oReadPipe )
84                os.kill( iPid, 9 )
85                os.waitpid( iPid, 0 )
86                return S_ERROR( "%d seconds timeout for '%s' call" % ( self.iTimeout, oFunction.__name__ ) )
87            elif lReadable[0] == oReadPipe:
88                dData = self.__readFromPipe( oReadPipe )
89                os.close( oReadPipe )
90                os.waitpid( iPid, 0 )
91                if dData[ 'OK' ]:
92                    return eval( dData[ 'Value' ] )
93                return dData
94           
95    def __generateSystemCommandError( self, sMessage ):
96        retVal = S_ERROR( sMessage )
97        retVal[ 'stdout' ] = self.lBuffers[0][0]
98        retVal[ 'stderr' ] = self.lBuffers[1][0]
99        return retVal
100       
101    def __readFromFile( self, oFile, iBaseLength, bAll ):
102        try:
103            if bAll:
104                sData = "".join( oFile.readlines() )
105            else:
106                sData = oFile.readline()
107        except Exception, v:
108            pass 
109        if sData == "":
110            #self.checkAlive()
111            self.bAlive = False
112        if len( sData ) + iBaseLength > self.iBufferLimit:
113            dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
114            dRetVal[ 'ReadData' ] = sData
115            return dRetVal
116           
117        return S_OK( sData )
118
119    def __readFromSystemCommandOutput( self, oFile, iDataIndex, bAll = False ):
120        retVal = self.__readFromFile( oFile, len( self.lBuffers[ iDataIndex ][0] ), bAll )
121        if retVal[ 'OK' ]:
122            self.lBuffers[ iDataIndex ][0] += retVal[ 'Value' ]
123            if not self.oCallback == None:
124                while self.__callLineCallback( iDataIndex ):
125                    pass
126            return S_OK()
127        else:
128            self.lBuffers[ iDataIndex ][0] += retVal[ 'ReadData' ]
129            os.kill( self.oChild.pid, 9 )
130            self.oChild.wait()
131            return self.__generateSystemCommandError( "Exceeded maximum buffer size ( %d bytes ) timeout for '%s' call" % ( self.iBufferLimit, self.sCmd ) )
132
133    def systemCall( self, sCmd, oCallbackFunction = None ):
134        self.sCmd = sCmd
135        self.oCallback = oCallbackFunction
136        self.oChild = popen2.Popen3( self.sCmd, True )
137        self.lBuffers = [ [ "", 0 ], [ "", 0 ] ]
138        iInitialTime = time.time()
139        iExitStatus = self.oChild.poll()
140
141        while iExitStatus == -1:
142            retVal = self.__readFromCommand()
143            if not retVal[ 'OK' ]:
144                return retVal
145            if self.iTimeout and time.time() - iInitialTime > self.iTimeout:
146                os.kill( self.oChild.pid, 9 )
147                self.oChild.wait()
148                self.__readFromCommand( True )
149                self.oChild.fromchild.close()
150                self.oChild.childerr.close()
151                return self.__generateSystemCommandError( "Timeout (%d seconds) for '%s' call" % ( self.iTimeout, sCmd ) )
152            iExitStatus = self.oChild.poll()
153 
154        self.__readFromCommand(True )
155
156        self.oChild.fromchild.close()
157        self.oChild.childerr.close() 
158        return S_OK( ( iExitStatus / 256, self.lBuffers[0][0], self.lBuffers[1][0] ) )
159
160    def __readFromCommand( self, bLast = False ):
161        if bLast:
162            retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0, True )
163            if not retVal[ 'OK' ]:
164                return retVal
165            retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1, True )
166            if not retVal[ 'OK' ]:
167                return retVal
168        else:
169            lReadable = self.__selectFD( [ self.oChild.fromchild, self.oChild.childerr ], 1 )
170            if self.oChild.fromchild in lReadable:
171                retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0 )
172                if not retVal[ 'OK' ]:
173                    return retVal
174            if self.oChild.childerr in lReadable:
175                retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1 )
176                if not retVal[ 'OK' ]:
177                    return retVal
178        return S_OK()
179
180   
181    def __callLineCallback( self, iIndex ):
182        iNextLine = self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: ].find( "\n" )
183        if iNextLine > -1:
184            self.oCallback( iIndex, self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: self.lBuffers[ iIndex ][1] + iNextLine ] )
185            self.lBuffers[ iIndex ][1] += iNextLine + 1 
186            return True
187        return False
188
189############################################################################   
190def redirectOutput(index, buffer):
191    """Filter function to redirect the std output and error of the job
192       executable for real-time debugging
193    """
194    print buffer
195
196
197############################################################################     
198def exeCommand( sCmd, iTimeout = 0, oLineCallback = redirectOutput):
199    """Return ( status, output, error, pythonError ) of executing cmd in a shell."""
200    oSPE = SubprocessExecuter( iTimeout )
201    retVal = oSPE.systemCall( sCmd, oLineCallback )
202    if retVal[ 'OK' ]:
203        return retVal[ 'Value' ][0], retVal[ 'Value' ][1], retVal[ 'Value' ][2], 0
204    else:
205        if re.search("Timeout",retVal['Message']):
206          return 1, retVal['stdout'], retVal['Message'], 2
207        else: 
208          return 1, retVal['stdout'], retVal['stderr'], 1
209   
210#################################################################################
211###################################### EoF ######################################
212#################################################################################
Note: See TracBrowser for help on using the repository browser.