source: tbroadcast/v2/python/executer.py@ 245

Last change on this file since 245 was 244, checked in by garonne, 19 years ago

MàJ

File size: 8.3 KB
Line 
1# -*- coding: iso-8859-1 -*-
2
3import os
4import sys
5import string
6import popen2
7import stat
8import re
9import time
10import commands
11import select
12
13from os.path import join
14
15############################################################################
16## Error and success return function
def S_ERROR( sMessage = '' ):
    """Build the standard failure dictionary.

    The result always carries 'Status' = -1 and 'OK' = 0, plus the given
    text under 'Message' (an empty string by default).
    """
    dResult = dict( Status = -1, OK = 0 )
    dResult[ 'Message' ] = sMessage
    return dResult
19
def S_OK( sValue = None, sPname = 'Value' ):
    """Build the standard success dictionary.

    The result always carries 'Status' = 'OK' and 'OK' = 1.  When sValue is
    not None it is stored as well, under the key named by sPname
    ('Value' by default).
    """
    if sValue is None:
        return { 'Status': 'OK', 'OK' : 1 }
    return { 'Status': 'OK', 'OK' : 1, sPname : sValue }
25
26############################################################################
class SubprocessExecuter:
    """Run a Python callable or a shell command in a child process.

    Every public call reports its outcome with the S_OK / S_ERROR
    dictionaries defined in this file.  self.iTimeout (seconds, False when
    disabled) limits the waiting time and self.iBufferLimit caps the number
    of bytes accepted from the child.
    """

    def __init__( self, iTimeout = False ):
        """Store the timeout (0 or False disables it) and the 5 MB data cap."""
        self.changeTimeout( iTimeout )
        self.iBufferLimit = 5242880 # 5MB limit for data

    def changeTimeout( self, iTimeout ):
        """Set the timeout in seconds; a value equal to 0 is stored as False."""
        self.iTimeout = iTimeout
        if self.iTimeout == 0:
            self.iTimeout = False

    def __readFromPipe( self, oPipe, iBaseLength = 0 ):
        """Read everything available on the file descriptor oPipe until EOF.

        Returns S_OK( data ), or an S_ERROR whose 'ReadData' key holds what
        was read so far once len( data ) + iBaseLength exceeds iBufferLimit.
        """
        iMaxSliceLength = 8192
        lChunks = []
        iRead = 0
        # os.read may legitimately return fewer bytes than requested while
        # the writer is still producing data, so a short read is not a
        # reliable end marker: only an empty read (EOF) ends the loop.
        while True:
            sReadBuffer = os.read( oPipe, iMaxSliceLength )
            if not sReadBuffer:
                break
            lChunks.append( sReadBuffer )
            iRead += len( sReadBuffer )
            if iRead + iBaseLength > self.iBufferLimit:
                dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
                dRetVal[ 'ReadData' ] = "".join( lChunks )
                return dRetVal

        return S_OK( "".join( lChunks ) )

    def __executePythonFunction( self, oFunc, stArgs, oWritePipe ):
        """Child-side half of pythonCall; never returns.

        Calls oFunc( *stArgs ) and writes str( S_OK( result ) ) followed by a
        newline to oWritePipe, or str( S_ERROR( message ) ) if an exception
        was raised.  The pipe is then closed and the process ends through
        os._exit( 0 ).
        """
        try:
            os.write( oWritePipe, "%s\n" % str( S_OK( oFunc( *stArgs ) ) ) )
        except Exception:
            # sys.exc_info() keeps this valid on every Python version.
            os.write( oWritePipe, "%s\n" % str( S_ERROR( str( sys.exc_info()[1] ) ) ) )
        try:
            os.close( oWritePipe )
        finally:
            os._exit(0)

    def __selectFD( self, lR, iTimeout = False ):
        """Return the members of lR that select() reports as readable.

        An explicit iTimeout wins; otherwise self.iTimeout is used.  When
        neither is set the call blocks until something is readable.
        """
        if self.iTimeout and not iTimeout:
            iTimeout = self.iTimeout
        if not iTimeout:
            return select.select( lR , [], [] )[0]
        return select.select( lR , [], [], iTimeout )[0]

    def pythonCall( self, oFunction, stArgs ):
        """Run oFunction( *stArgs ) in a forked child and return its result.

        The child sends back the textual form of an S_OK / S_ERROR
        dictionary, which is rebuilt here.  If nothing becomes readable
        within the timeout the child is killed and an S_ERROR is returned.
        """
        oReadPipe, oWritePipe = os.pipe()
        iPid = os.fork()
        if iPid == 0:
            os.close( oReadPipe )
            # Ends the child with os._exit(); control never comes back here.
            self.__executePythonFunction( oFunction, stArgs, oWritePipe )
        else:
            os.close( oWritePipe )
            lReadable = self.__selectFD( [ oReadPipe ] )
            if len( lReadable ) == 0:
                os.close( oReadPipe )
                os.kill( iPid, 9 )
                os.waitpid( iPid, 0 )
                sName = getattr( oFunction, '__name__', repr( oFunction ) )
                return S_ERROR( "%d seconds timeout for '%s' call" % ( self.iTimeout, sName ) )
            dData = self.__readFromPipe( oReadPipe )
            os.close( oReadPipe )
            os.waitpid( iPid, 0 )
            if dData[ 'OK' ]:
                # NOTE(review): eval() of the text produced by the child.  It
                # only round-trips values whose repr is a Python literal and
                # executes whatever that text contains; consider a real
                # serialisation format if oFunction results are not trusted.
                return eval( dData[ 'Value' ] )
            return dData

    def __generateSystemCommandError( self, sMessage ):
        """Return S_ERROR( sMessage ) extended with the captured stdout/stderr."""
        retVal = S_ERROR( sMessage )
        retVal[ 'stdout' ] = self.lBuffers[0][0]
        retVal[ 'stderr' ] = self.lBuffers[1][0]
        return retVal

    def __readFromFile( self, oFile, iBaseLength, bAll ):
        """Read one line (or, with bAll, everything left) from oFile.

        Returns S_OK( data ), or an S_ERROR carrying 'ReadData' when
        len( data ) + iBaseLength exceeds iBufferLimit.  A failed read is
        treated like an empty one.
        """
        # Pre-set so that a failing read below cannot leave the name unbound.
        sData = ""
        try:
            if bAll:
                sData = "".join( oFile.readlines() )
            else:
                sData = oFile.readline()
        except Exception:
            # Deliberate best effort: an unreadable stream yields no data.
            pass
        if sData == "":
            #self.checkAlive()
            self.bAlive = False
        if len( sData ) + iBaseLength > self.iBufferLimit:
            dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
            dRetVal[ 'ReadData' ] = sData
            return dRetVal

        return S_OK( sData )

    def __readFromSystemCommandOutput( self, oFile, iDataIndex, bAll = False ):
        """Append fresh output of oFile to buffer iDataIndex (0 stdout, 1 stderr).

        Complete lines are handed to the line callback when one is set.  If
        the buffer limit is exceeded the command is killed and an S_ERROR
        with the captured output is returned.
        """
        retVal = self.__readFromFile( oFile, len( self.lBuffers[ iDataIndex ][0] ), bAll )
        if retVal[ 'OK' ]:
            self.lBuffers[ iDataIndex ][0] += retVal[ 'Value' ]
            if self.oCallback is not None:
                while self.__callLineCallback( iDataIndex ):
                    pass
            return S_OK()
        self.lBuffers[ iDataIndex ][0] += retVal[ 'ReadData' ]
        # Only signal a child that has not been reaped yet: this method is
        # also reached after the command finished or was already killed, and
        # os.kill() on a reaped pid fails (or hits an unrelated process).
        if self.oChild.poll() == -1:
            os.kill( self.oChild.pid, 9 )
        self.oChild.wait()
        return self.__generateSystemCommandError( "Exceeded maximum buffer size ( %d bytes ) for '%s' call" % ( self.iBufferLimit, self.sCmd ) )

    def __closeChildPipes( self ):
        """Close the stdout and stderr pipes of the current command."""
        self.oChild.fromchild.close()
        self.oChild.childerr.close()

    def systemCall( self, sCmd, oCallbackFunction = None, arg=None ):
        """Run the shell command sCmd and collect its output.

        oCallbackFunction, when given, is called as
        callback( index, line, arg = arg ) for every newline-terminated line
        (index 0 for stdout, 1 for stderr); text after the last newline is
        kept in the buffers but not passed to the callback.

        Returns S_OK( ( exit code, stdout, stderr ) ) on completion, or an
        S_ERROR with 'stdout' and 'stderr' keys on timeout or when the
        output exceeds iBufferLimit.
        """
        self.sCmd = sCmd
        self.oCallback = oCallbackFunction
        self.arg = arg
        self.oChild = popen2.Popen3( self.sCmd, True )
        self.lBuffers = [ [ "", 0 ], [ "", 0 ] ]
        iInitialTime = time.time()
        iExitStatus = self.oChild.poll()

        # Popen3.poll() returns -1 for as long as the command is running.
        while iExitStatus == -1:
            retVal = self.__readFromCommand()
            if not retVal[ 'OK' ]:
                self.__closeChildPipes()
                return retVal
            if self.iTimeout and time.time() - iInitialTime > self.iTimeout:
                os.kill( self.oChild.pid, 9 )
                self.oChild.wait()
                # Drain what is left; the timeout is the error that is reported.
                self.__readFromCommand( True )
                self.__closeChildPipes()
                return self.__generateSystemCommandError( "Timeout (%d seconds) for '%s' call" % ( self.iTimeout, sCmd ) )
            iExitStatus = self.oChild.poll()

        retVal = self.__readFromCommand( True )
        self.__closeChildPipes()
        if not retVal[ 'OK' ]:
            return retVal
        # The exit code is the high byte of the wait status; floor division
        # keeps the result an integer on every Python version.
        return S_OK( ( iExitStatus // 256, self.lBuffers[0][0], self.lBuffers[1][0] ) )

    def __readFromCommand( self, bLast = False ):
        """Collect output from the running command.

        With bLast everything remaining on stdout and then stderr is read.
        Otherwise select() waits up to one second and one line is read from
        each stream that became readable.  The first failing read is
        returned; S_OK() otherwise.
        """
        if bLast:
            for oFile, iDataIndex in ( ( self.oChild.fromchild, 0 ), ( self.oChild.childerr, 1 ) ):
                retVal = self.__readFromSystemCommandOutput( oFile, iDataIndex, True )
                if not retVal[ 'OK' ]:
                    return retVal
            return S_OK()

        lReadable = self.__selectFD( [ self.oChild.fromchild, self.oChild.childerr ], 1 )
        if self.oChild.fromchild in lReadable:
            retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0 )
            if not retVal[ 'OK' ]:
                return retVal
        if self.oChild.childerr in lReadable:
            retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1 )
            if not retVal[ 'OK' ]:
                return retVal
        return S_OK()

    def __callLineCallback( self, iIndex ):
        """Pass the next complete line of buffer iIndex to the callback.

        Each buffer entry is [ text, offset of the first undelivered char ].
        Returns True when a line was delivered (the offset then moves past
        its newline), False when no complete line is pending.
        """
        sBuffer, iStart = self.lBuffers[ iIndex ]
        # Searching from the offset avoids copying the buffer tail each time.
        iNewline = sBuffer.find( "\n", iStart )
        if iNewline == -1:
            return False
        self.oCallback( iIndex, sBuffer[ iStart:iNewline ], arg=self.arg )
        self.lBuffers[ iIndex ][1] = iNewline + 1
        return True
189
190############################################################################
def redirectOutput(index, buffer, arg=None):
    """Default line callback: print each output line as it arrives.

    index is 0 for standard output and 1 for standard error; buffer is the
    line without its trailing newline.  arg is accepted and ignored: the
    executer always calls its callback with an ``arg`` keyword, so the
    callback must take it.
    """
    # Parenthesised form prints the same text under Python 2 and Python 3.
    print(buffer)
196
197
198############################################################################
def exeCommand( sCmd, iTimeout = 0, oLineCallback = redirectOutput, arg=None):
    """Return ( status, output, error, pythonError ) of executing cmd in a shell.

    sCmd is run through SubprocessExecuter.systemCall with the given timeout
    (seconds, 0 disables it); oLineCallback receives every output line
    together with arg.

    pythonError is 0 when the command ran to completion, 2 when it timed
    out (error then holds the timeout message) and 1 for any other failure
    (error then holds the captured standard error).  status is 1 whenever
    pythonError is not 0.
    """
    oSPE = SubprocessExecuter( iTimeout )
    retVal = oSPE.systemCall( sCmd, oLineCallback, arg=arg )
    if retVal[ 'OK' ]:
        iStatus, sOutput, sError = retVal[ 'Value' ]
        return iStatus, sOutput, sError, 0
    # Error messages embed the command text, so only the start of the
    # message is checked: a command that merely contains the word "Timeout"
    # must not turn another failure into a reported timeout.
    if retVal[ 'Message' ].startswith( "Timeout" ):
        return 1, retVal[ 'stdout' ], retVal[ 'Message' ], 2
    return 1, retVal[ 'stdout' ], retVal[ 'stderr' ], 1
210
211#################################################################################
212###################################### EoF ######################################
213#################################################################################
Note: See TracBrowser for help on using the repository browser.