source: tbroadcast/HEAD/python/executer.py@ 488

Last change on this file since 488 was 478, checked in by rybkin, 17 years ago

See C.L. 2

File size: 8.8 KB
Line 
1# -*- coding: iso-8859-1 -*-
2
3import os
4import sys
5import string
6import popen2
7import stat
8import re
9import time
10import commands
11import select
12
13from os.path import join
14
15############################################################################
16## Error and success return function
17def S_ERROR( sMessage = '' ):
18 return { 'Status': -1, 'OK' : 0, 'Message' : sMessage }
19
20def S_OK( sValue = None, sPname = 'Value' ):
21 dResult = { 'Status': 'OK', 'OK' : 1 }
22 if sValue is not None:
23 dResult[ sPname ] = sValue
24 return dResult
25
26############################################################################
27class SubprocessExecuter:
28
29 def __init__( self, iTimeout = False ):
30 self.changeTimeout( iTimeout )
31 self.iBufferLimit = 5242880 # 5MB limit for data
32
33 def changeTimeout( self, iTimeout ):
34 self.iTimeout = iTimeout
35 if self.iTimeout == 0:
36 self.iTimeout = False
37
38 def __readFromPipe( self, oPipe, iBaseLength = 0 ):
39 sData = ""
40 iMaxSliceLength = 8192
41 iLastSliceLength = 8192
42
43 while iLastSliceLength == iMaxSliceLength:
44 sReadBuffer = os.read( oPipe, iMaxSliceLength )
45 iLastSliceLength = len( sReadBuffer )
46 sData += sReadBuffer
47 if len( sData ) + iBaseLength > self.iBufferLimit:
48 dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
49 dRetVal[ 'ReadData' ] = sData
50 return dRetVal
51
52 return S_OK( sData )
53
54 def __executePythonFunction( self, oFunc, stArgs, oWritePipe ):
55 try:
56 os.write( oWritePipe, "%s\n" % str( S_OK( oFunc( *stArgs ) ) ) )
57 except Exception, v:
58 os.write( oWritePipe, "%s\n" % str( S_ERROR( str( v ) ) ) )
59 try:
60 os.close( oWritePipe )
61 finally:
62 os._exit(0)
63
64 def __selectFD( self, lR, iTimeout = False ):
65 if self.iTimeout and not iTimeout:
66 iTimeout = self.iTimeout
67 if not iTimeout:
68 return select.select( lR , [], [] )[0]
69 else:
70 return select.select( lR , [], [], iTimeout )[0]
71
72 def pythonCall( self, oFunction, stArgs ):
73 oReadPipe, oWritePipe = os.pipe()
74 iPid = os.fork()
75 if iPid == 0:
76 os.close( oReadPipe )
77 self.__executePythonFunction( oFunction, stArgs, oWritePipe )
78 os.close( oWritePipe )
79 else:
80 os.close( oWritePipe )
81 lReadable = self.__selectFD( [ oReadPipe ] )
82 if len( lReadable ) == 0:
83 os.close( oReadPipe )
84 os.kill( iPid, 9 )
85 os.waitpid( iPid, 0 )
86 return S_ERROR( "%d seconds timeout for '%s' call" % ( self.iTimeout, oFunction.__name__ ) )
87 elif lReadable[0] == oReadPipe:
88 dData = self.__readFromPipe( oReadPipe )
89 os.close( oReadPipe )
90 os.waitpid( iPid, 0 )
91 if dData[ 'OK' ]:
92 return eval( dData[ 'Value' ] )
93 return dData
94
95 def __generateSystemCommandError( self, sMessage ):
96 retVal = S_ERROR( sMessage )
97 retVal[ 'stdout' ] = self.lBuffers[0][0]
98 retVal[ 'stderr' ] = self.lBuffers[1][0]
99 return retVal
100
101 def __readFromFile( self, oFile, iBaseLength, bAll ):
102 try:
103 if bAll:
104 sData = "".join( oFile.readlines() )
105 else:
106 sData = oFile.readline()
107 except Exception, v:
108 pass
109 if sData == "":
110 #self.checkAlive()
111 self.bAlive = False
112 if len( sData ) + iBaseLength > self.iBufferLimit:
113 dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
114 dRetVal[ 'ReadData' ] = sData
115 return dRetVal
116
117 return S_OK( sData )
118
119 def __readFromSystemCommandOutput( self, oFile, iDataIndex, bAll = False ):
120 retVal = self.__readFromFile( oFile, len( self.lBuffers[ iDataIndex ][0] ), bAll )
121 if retVal[ 'OK' ]:
122 self.lBuffers[ iDataIndex ][0] += retVal[ 'Value' ]
123 if not self.oCallback == None:
124 while self.__callLineCallback( iDataIndex ):
125 pass
126 return S_OK()
127 else:
128 self.lBuffers[ iDataIndex ][0] += retVal[ 'ReadData' ]
129 os.kill( self.oChild.pid, 9 )
130 self.oChild.wait()
131 return self.__generateSystemCommandError( "Exceeded maximum buffer size ( %d bytes ) timeout for '%s' call" % ( self.iBufferLimit, self.sCmd ) )
132
133 def systemCall( self, sCmd, oCallbackFunction = None, arg=None ):
134 self.sCmd = sCmd
135 self.oCallback = oCallbackFunction
136 self.arg = arg
137 self.oChild = popen2.Popen3( self.sCmd, True )
138 self.lBuffers = [ [ "", 0 ], [ "", 0 ] ]
139 iInitialTime = time.time()
140 iExitStatus = self.oChild.poll()
141
142 while iExitStatus == -1:
143 retVal = self.__readFromCommand()
144 if not retVal[ 'OK' ]:
145 return retVal
146 if self.iTimeout and time.time() - iInitialTime > self.iTimeout:
147 os.kill( self.oChild.pid, 9 )
148 self.oChild.wait()
149 self.__readFromCommand( True )
150 self.oChild.fromchild.close()
151 self.oChild.childerr.close()
152 return self.__generateSystemCommandError( "Timeout (%d seconds) for '%s' call" % ( self.iTimeout, sCmd ) )
153 iExitStatus = self.oChild.poll()
154
155 self.__readFromCommand(True )
156
157 self.oChild.fromchild.close()
158 self.oChild.childerr.close()
159 return S_OK( ( iExitStatus / 256, self.lBuffers[0][0], self.lBuffers[1][0] ) )
160
161 def __readFromCommand( self, bLast = False ):
162 if bLast:
163 retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0, True )
164 if not retVal[ 'OK' ]:
165 return retVal
166 retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1, True )
167 if not retVal[ 'OK' ]:
168 return retVal
169 else:
170 lReadable = self.__selectFD( [ self.oChild.fromchild, self.oChild.childerr ], 1 )
171 if self.oChild.fromchild in lReadable:
172 retVal = self.__readFromSystemCommandOutput( self.oChild.fromchild, 0 )
173 if not retVal[ 'OK' ]:
174 return retVal
175 if self.oChild.childerr in lReadable:
176 retVal = self.__readFromSystemCommandOutput( self.oChild.childerr, 1 )
177 if not retVal[ 'OK' ]:
178 return retVal
179 return S_OK()
180
181
182 def __callLineCallback( self, iIndex ):
183 iNextLine = self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: ].find( "\n" )
184 if iNextLine > -1:
185 self.oCallback( iIndex, self.lBuffers[ iIndex ][0][ self.lBuffers[ iIndex ][1]: self.lBuffers[ iIndex ][1] + iNextLine ], arg=self.arg )
186 self.lBuffers[ iIndex ][1] += iNextLine + 1
187 return True
188 return False
189
190############################################################################
191def redirectOutput(index, buffer):
192 """Filter function to redirect the std output and error of the job
193 executable for real-time debugging
194 """
195 print buffer
196
197
198############################################################################
199def exeCommand( sCmd, iTimeout = 0, oLineCallback = redirectOutput, arg=None):
200 """Return ( status, output, error, pythonError ) of executing cmd in a shell."""
201 oSPE = SubprocessExecuter( iTimeout )
202 retVal = oSPE.systemCall( sCmd, oLineCallback, arg=arg)
203 if retVal[ 'OK' ]:
204 return retVal[ 'Value' ][0], retVal[ 'Value' ][1], retVal[ 'Value' ][2], 0
205 else:
206 if re.search("Timeout",retVal['Message']):
207 return 1, retVal['stdout'], retVal['Message'], 2
208 else:
209 return 1, retVal['stdout'], retVal['stderr'], 1
210
211############################################################################
212def getstatusoutput(cmd):
213 """Return (status, stdout) of executing cmd in a shell.
214
215 A trailing line separator is removed from the output string. The exit status of the command is encoded in the format specified for wait(), when the exit status is zero (termination without errors), 0 is returned.
216 """
217 import os
218 p = os.popen(cmd, 'r')
219 out = p.read()
220 sts = p.close()
221 if sts is None: sts = 0
222 if out.endswith(os.linesep):
223 out = out[:out.rindex(os.linesep)]
224 return sts, out
225
226#################################################################################
227###################################### EoF ######################################
228#################################################################################
Note: See TracBrowser for help on using the repository browser.