source: tbroadcast/v2/python/executer.py @ 244

Last change on this file since 244 was 244, checked in by garonne, 18 years ago

MàJ

File size: 8.3 KB
Line 
1# -*- coding: iso-8859-1 -*-
2
3import os
4import sys
5import string
6import popen2
7import stat
8import re
9import time
10import commands
11import select
12
13from os.path import join
14
15############################################################################
16## Error and success return function
17def S_ERROR( sMessage = '' ):
18  return { 'Status': -1, 'OK' : 0, 'Message' : sMessage  }
19   
20def S_OK( sValue = None, sPname = 'Value' ):
21  dResult = { 'Status': 'OK', 'OK' : 1 }
22  if sValue is not None:
23    dResult[ sPname ] = sValue
24  return dResult
25 
26############################################################################ 
27class SubprocessExecuter:
28
29    def __init__( self, iTimeout = False ):
30        self.changeTimeout( iTimeout )
31        self.iBufferLimit = 5242880 # 5MB limit for data
32
33    def changeTimeout( self, iTimeout ):
34        self.iTimeout = iTimeout
35        if self.iTimeout == 0:
36            self.iTimeout = False
37       
38    def __readFromPipe( self, oPipe, iBaseLength = 0 ):
39        sData = ""
40        iMaxSliceLength = 8192
41        iLastSliceLength = 8192
42       
43        while iLastSliceLength == iMaxSliceLength:
44            sReadBuffer = os.read( oPipe, iMaxSliceLength )
45            iLastSliceLength = len( sReadBuffer )
46            sData += sReadBuffer
47            if len( sData ) + iBaseLength > self.iBufferLimit:
48                dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
49                dRetVal[ 'ReadData' ] = sData
50                return dRetVal
51           
52        return S_OK( sData )
53                   
54    def __executePythonFunction( self, oFunc, stArgs, oWritePipe ):
55        try:
56            os.write( oWritePipe, "%s\n" % str( S_OK( oFunc( *stArgs ) ) ) )
57        except Exception, v:
58            os.write( oWritePipe, "%s\n" % str( S_ERROR( str( v ) ) ) )
59        try:
60            os.close( oWritePipe )
61        finally:
62            os._exit(0)
63   
64    def __selectFD( self, lR, iTimeout = False ):
65        if self.iTimeout and not iTimeout:
66            iTimeout = self.iTimeout
67        if not iTimeout: 
68            return select.select( lR , [], [] )[0]
69        else:
70            return select.select( lR , [], [], iTimeout )[0]
71   
72    def pythonCall( self, oFunction, stArgs ):
73        oReadPipe, oWritePipe = os.pipe()
74        iPid = os.fork()
75        if iPid == 0:
76            os.close( oReadPipe )
77            self.__executePythonFunction( oFunction, stArgs, oWritePipe )
78            os.close( oWritePipe )
79        else:
80            os.close( oWritePipe )
81            lReadable = self.__selectFD( [ oReadPipe ] )
82            if len( lReadable ) == 0:
83                os.close( oReadPipe )
84                os.kill( iPid, 9 )
85                os.waitpid( iPid, 0 )
86                return S_ERROR( "%d seconds timeout for '%s' call" % ( self.iTimeout, oFunction.__name__ ) )
87            elif lReadable[0] == oReadPipe:
88                dData = self.__readFromPipe( oReadPipe )
89                os.close( oReadPipe )
90                os.waitpid( iPid, 0 )
91                if dData[ 'OK' ]:
92                    return eval( dData[ 'Value' ] )
93                return dData
94           
95    def __generateSystemCommandError( self, sMessage ):
96        retVal = S_ERROR( sMessage )
97        retVal[ 'stdout' ] = self.lBuffers[0][0]
98        retVal[ 'stderr' ] = self.lBuffers[1][0]
99        return retVal
100       
101    def __readFromFile( self, oFile, iBaseLength, bAll ):
102        try:
103            if bAll:
104                sData = "".join( oFile.readlines() )
105            else:
106                sData = oFile.readline()
107        except Exception, v:
108            pass 
109        if sData == "":
110            #self.checkAlive()
111            self.bAlive = False
112        if len( sData ) + iBaseLength > self.iBufferLimit:
113            dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
114            dRetVal[ 'ReadData' ] = sData
115            return dRetVal
116           
117        return S_OK( sData )
118
119    def __readFromSystemCommandOutput( self, oFile, iDataIndex, bAll = False ):
120        retVal = self.__readFromFile( oFile, len( self.lBuffers[ iDataIndex ][0] ), bAll )
121        if retVal[ 'OK' ]:
122            self.lBuffers[ iDataIndex ][0] += retVal[ 'Value' ]
123            if not self.oCallback == None:
124                while self.__callLineCallback( iDataIndex ):
125                    pass
126            return S_OK()
127        else:
128            self.lBuffers[ iDataIndex ][0] += retVal[ 'ReadData' ]
129            os.kill( self.oChild.pid, 9 )
130            self.oChild.wait()
131            return self.__generateSystemCommandError( "Exceeded maximum buffer size ( %d bytes ) timeout for '%s' call" % ( self.iBufferLimit, self.sCmd ) )
132
133    def systemCall( self, sCmd, oCallbackFunction = None, arg=None ):
134        self.sCmd = sCmd
135        self.oCallback = oCallbackFunction
136        self.arg    = arg
137        self.oChild = popen2.Popen3( self.sCmd, True )
138        self.lBuffers = [ [ "", 0 ], [ "", 0 ] ]
139        iInitialTime = time.time()
140        iExitStatus = self.oChild.poll()
141
142        while iExitStatus == -1:
143            retVal = self.__readFromCommand()
144            if not retVal[ 'OK' ]:
145                return retVal
146            if self.iTimeout and time.time() - iInitialTime > self.iTimeout:
147                os.kill( self.oChild.pid, 9 )
148                self.oChild.wait()
149                self.__readFromCommand( True )
150                self.oChild.fromchild.close()
151                self.oChild.childerr.close()
152                return self.__generateSystemCommandError( "Timeout (%d seconds) for '%s' call" % ( self.iTimeout, sCmd ) )
153            iExitStatus = self.oChild.poll()
154 
155        self.__readFromCommand(True )
156
157        self.oChild.fromchild.close()
158        self.oChild.childerr.close() 
159        return S_OK( ( iExitStatus / 256, self.lBuffers[0][0], self.lBuffers[1][0] ) )
160
161    def __readFromCommand( self, bLast = False ):
162        if bLast:
163            retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0, True )
164            if not retVal[ 'OK' ]:
165                return retVal
166            retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1, True )
167            if not retVal[ 'OK' ]:
168                return retVal
169        else:
170            lReadable = self.__selectFD( [ self.oChild.fromchild, self.oChild.childerr ], 1 )
171            if self.oChild.fromchild in lReadable:
172                retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0 )
173                if not retVal[ 'OK' ]:
174                    return retVal
175            if self.oChild.childerr in lReadable:
176                retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1 )
177                if not retVal[ 'OK' ]:
178                    return retVal
179        return S_OK()
180
181   
182    def __callLineCallback( self, iIndex ):
183        iNextLine = self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: ].find( "\n" )
184        if iNextLine > -1:
185            self.oCallback( iIndex, self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: self.lBuffers[ iIndex ][1] + iNextLine ], arg=self.arg )
186            self.lBuffers[ iIndex ][1] += iNextLine + 1 
187            return True
188        return False
189
190############################################################################   
191def redirectOutput(index, buffer):
192    """Filter function to redirect the std output and error of the job
193       executable for real-time debugging
194    """
195    print buffer
196
197
198############################################################################     
199def exeCommand( sCmd, iTimeout = 0, oLineCallback = redirectOutput, arg=None):
200    """Return ( status, output, error, pythonError ) of executing cmd in a shell."""
201    oSPE = SubprocessExecuter( iTimeout )
202    retVal = oSPE.systemCall( sCmd, oLineCallback, arg=arg)
203    if retVal[ 'OK' ]:
204        return retVal[ 'Value' ][0], retVal[ 'Value' ][1], retVal[ 'Value' ][2], 0
205    else:
206        if re.search("Timeout",retVal['Message']):
207          return 1, retVal['stdout'], retVal['Message'], 2
208        else: 
209          return 1, retVal['stdout'], retVal['stderr'], 1
210   
211#################################################################################
212###################################### EoF ######################################
213#################################################################################
Note: See TracBrowser for help on using the repository browser.