source: tbroadcast/v2/python/executer.py@ 236

Last change on this file since 236 was 232, checked in by garonne, 19 years ago

add version 2 in python

File size: 8.2 KB
Line 
1# -*- coding: iso-8859-1 -*-
2
3import os
4import sys
5import string
6import popen2
7import stat
8import re
9import time
10import commands
11import select
12
13from os.path import join
14
15############################################################################
16## Error and success return function
def S_ERROR( sMessage = '' ):
    """Build the failure result dictionary used throughout this module.

    sMessage -- text stored under the 'Message' key (default: empty string).

    Returns a dict with 'Status' set to -1, 'OK' set to 0 and the message.
    """
    dFailure = { 'Status': -1, 'OK' : 0 }
    dFailure[ 'Message' ] = sMessage
    return dFailure
19
def S_OK( sValue = None, sPname = 'Value' ):
    """Build the success result dictionary used throughout this module.

    sValue -- optional payload; it is only stored when it is not None.
    sPname -- key under which the payload is stored (default: 'Value').

    Returns a dict with 'Status' set to 'OK' and 'OK' set to 1, plus the
    payload entry when one was given.
    """
    if sValue is None:
        return { 'Status': 'OK', 'OK' : 1 }
    return { 'Status': 'OK', 'OK' : 1, sPname : sValue }
25
26############################################################################
class SubprocessExecuter:
    """Run a Python callable or a shell command in a child process.

    Results follow the S_OK / S_ERROR dictionary convention of this module.
    An optional timeout (in seconds) bounds both kinds of call, and the
    amount of data collected from the child is capped at iBufferLimit bytes.
    """

    def __init__( self, iTimeout = False ):
        """Create an executer.

        iTimeout -- timeout in seconds; 0 or False means "no timeout".
        """
        self.changeTimeout( iTimeout )
        self.iBufferLimit = 5242880 # 5MB limit for data

    def changeTimeout( self, iTimeout ):
        """Set the timeout in seconds; a value equal to 0 is stored as False."""
        self.iTimeout = iTimeout
        if self.iTimeout == 0:
            self.iTimeout = False

    def __readFromPipe( self, oPipe, iBaseLength = 0 ):
        """Read the data available on the file descriptor oPipe.

        oPipe       -- OS-level file descriptor to read from.
        iBaseLength -- number of bytes already collected elsewhere, counted
                       against iBufferLimit.

        Returns S_OK( data ), or an S_ERROR carrying the data read so far
        under 'ReadData' once the buffer limit is exceeded.
        """
        sData = ""
        iMaxSliceLength = 8192
        iLastSliceLength = 8192

        # NOTE(review): a slice shorter than iMaxSliceLength is taken to mean
        # "nothing more to read". A short read on a pipe is not necessarily
        # end-of-data, so very large replies could presumably be cut short;
        # left unchanged because reading until EOF could block - confirm.
        while iLastSliceLength == iMaxSliceLength:
            sReadBuffer = os.read( oPipe, iMaxSliceLength )
            iLastSliceLength = len( sReadBuffer )
            sData += sReadBuffer
            if len( sData ) + iBaseLength > self.iBufferLimit:
                dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
                dRetVal[ 'ReadData' ] = sData
                return dRetVal

        return S_OK( sData )

    def __executePythonFunction( self, oFunc, stArgs, oWritePipe ):
        """Child-side half of pythonCall; this method never returns.

        Calls oFunc( *stArgs ), writes the textual form of S_OK( result ) -
        or of S_ERROR( exception text ) when the call raises - followed by a
        newline to the descriptor oWritePipe, then terminates the process
        with os._exit( 0 ).
        """
        # The outer finally guarantees the forked child always terminates
        # here, whatever happens while calling the function or writing the
        # reply; otherwise the child would carry on running the caller's code.
        try:
            try:
                sResult = str( S_OK( oFunc( *stArgs ) ) )
            except Exception as v:
                sResult = str( S_ERROR( str( v ) ) )
            os.write( oWritePipe, "%s\n" % sResult )
            os.close( oWritePipe )
        finally:
            os._exit( 0 )

    def __selectFD( self, lR, iTimeout = False ):
        """Wait until one of the objects in lR is readable.

        lR       -- list of descriptors / file objects handed to select.
        iTimeout -- timeout in seconds for this call; when it is false the
                    instance timeout is used, and when that is false too the
                    call blocks until something is readable.

        Returns the list of readable objects (empty when the wait timed out).
        """
        if self.iTimeout and not iTimeout:
            iTimeout = self.iTimeout
        if not iTimeout:
            return select.select( lR , [], [] )[0]
        else:
            return select.select( lR , [], [], iTimeout )[0]

    def pythonCall( self, oFunction, stArgs ):
        """Run oFunction( *stArgs ) in a forked child process.

        oFunction -- callable to execute in the child.
        stArgs    -- sequence of positional arguments for it.

        Returns the dictionary sent back by the child (S_OK with the return
        value under 'Value', or S_ERROR with the exception text), an S_ERROR
        when nothing was received within the timeout (the child is then
        killed), or the S_ERROR produced when the reply exceeds the buffer
        limit.
        """
        oReadPipe, oWritePipe = os.pipe()
        iPid = os.fork()
        if iPid == 0:
            # Child process: __executePythonFunction ends with os._exit and
            # therefore never comes back here.
            os.close( oReadPipe )
            self.__executePythonFunction( oFunction, stArgs, oWritePipe )

        # Parent process from here on.
        os.close( oWritePipe )
        lReadable = self.__selectFD( [ oReadPipe ] )
        if len( lReadable ) == 0:
            # Nothing arrived in time: get rid of the child and reap it.
            os.close( oReadPipe )
            os.kill( iPid, 9 )
            os.waitpid( iPid, 0 )
            return S_ERROR( "%d seconds timeout for '%s' call" % ( self.iTimeout, oFunction.__name__ ) )

        dData = self.__readFromPipe( oReadPipe )
        os.close( oReadPipe )
        os.waitpid( iPid, 0 )
        if dData[ 'OK' ]:
            # NOTE(review): the reply is the str() of a dictionary and is
            # rebuilt with eval. That only round-trips values whose repr is
            # valid Python, and eval executes whatever text the child sent.
            return eval( dData[ 'Value' ] )
        return dData

    def __generateSystemCommandError( self, sMessage ):
        """Return S_ERROR( sMessage ) extended with the captured output.

        The stdout and stderr text collected so far is added under the
        'stdout' and 'stderr' keys.
        """
        retVal = S_ERROR( sMessage )
        retVal[ 'stdout' ] = self.lBuffers[0][0]
        retVal[ 'stderr' ] = self.lBuffers[1][0]
        return retVal

    def __readFromFile( self, oFile, iBaseLength, bAll ):
        """Read from the file object oFile.

        oFile       -- file object to read from.
        iBaseLength -- number of bytes already buffered for this stream,
                       counted against iBufferLimit.
        bAll        -- True to read everything up to EOF, False to read a
                       single line.

        Returns S_OK( data ), or an S_ERROR carrying the data under
        'ReadData' when the buffer limit would be exceeded.
        """
        # Start from an empty string so that a failing read is handled like
        # "no data" instead of leaving sData unbound.
        sData = ""
        try:
            if bAll:
                sData = oFile.read()
            else:
                sData = oFile.readline()
        except Exception:
            # Best effort: a read error is treated as an exhausted stream.
            pass
        if sData == "":
            #self.checkAlive()
            # Flag is only set here; nothing in this class reads it.
            self.bAlive = False
        if len( sData ) + iBaseLength > self.iBufferLimit:
            dRetVal = S_ERROR( "Reached maximum allowed length (%d bytes) for called function return value" % self.iBufferLimit )
            dRetVal[ 'ReadData' ] = sData
            return dRetVal

        return S_OK( sData )

    def __readFromSystemCommandOutput( self, oFile, iDataIndex, bAll = False ):
        """Append new output from oFile to buffer iDataIndex.

        oFile      -- file object connected to the command.
        iDataIndex -- 0 for the stdout buffer, 1 for the stderr buffer.
        bAll       -- True to read up to EOF, False to read one line.

        Every complete new line is passed to the line callback when one is
        set. Returns S_OK(), or the command error dictionary when the buffer
        limit is exceeded (the command is killed in that case).
        """
        retVal = self.__readFromFile( oFile, len( self.lBuffers[ iDataIndex ][0] ), bAll )
        if retVal[ 'OK' ]:
            self.lBuffers[ iDataIndex ][0] += retVal[ 'Value' ]
            if self.oCallback is not None:
                while self.__callLineCallback( iDataIndex ):
                    pass
            return S_OK()

        self.lBuffers[ iDataIndex ][0] += retVal[ 'ReadData' ]
        # Only signal the child while it has not been reaped yet. During the
        # final drain it is already gone, and killing its old pid would
        # raise OSError or hit an unrelated process.
        if self.oChild.poll() == -1:
            os.kill( self.oChild.pid, 9 )
            self.oChild.wait()
        return self.__generateSystemCommandError( "Exceeded maximum buffer size ( %d bytes ) timeout for '%s' call" % ( self.iBufferLimit, self.sCmd ) )

    def systemCall( self, sCmd, oCallbackFunction = None ):
        """Run the shell command sCmd and collect its output.

        sCmd              -- command line handed to popen2.Popen3.
        oCallbackFunction -- optional callable invoked as
                             callback( streamIndex, line ) for every complete
                             output line (index 0: stdout, 1: stderr).

        Returns S_OK( ( exitStatus // 256, stdout, stderr ) ) when the
        command finishes, or an S_ERROR that also carries 'stdout' and
        'stderr' when the timeout expires or the buffer limit is exceeded.
        """
        self.sCmd = sCmd
        self.oCallback = oCallbackFunction
        self.oChild = popen2.Popen3( self.sCmd, True )
        self.lBuffers = [ [ "", 0 ], [ "", 0 ] ]
        iInitialTime = time.time()

        # The finally clause releases the pipes on every exit path,
        # including the early error returns.
        try:
            iExitStatus = self.oChild.poll()

            while iExitStatus == -1:
                retVal = self.__readFromCommand()
                if not retVal[ 'OK' ]:
                    return retVal
                if self.iTimeout and time.time() - iInitialTime > self.iTimeout:
                    os.kill( self.oChild.pid, 9 )
                    self.oChild.wait()
                    # Collect whatever is left; the timeout is reported
                    # regardless of how this last read goes.
                    self.__readFromCommand( True )
                    return self.__generateSystemCommandError( "Timeout (%d seconds) for '%s' call" % ( self.iTimeout, sCmd ) )
                iExitStatus = self.oChild.poll()

            retVal = self.__readFromCommand( True )
            if not retVal[ 'OK' ]:
                return retVal
        finally:
            self.oChild.tochild.close()
            self.oChild.fromchild.close()
            self.oChild.childerr.close()

        # NOTE(review): only the high byte of the wait status is reported, so
        # a command terminated by a signal yields exit code 0 - confirm that
        # callers do not need to tell that case apart.
        return S_OK( ( iExitStatus // 256, self.lBuffers[0][0], self.lBuffers[1][0] ) )

    def __readFromCommand( self, bLast = False ):
        """Read pending stdout / stderr output of the running command.

        bLast -- True to drain both streams up to EOF; False to wait up to
                 one second and read one line from each readable stream.

        Returns S_OK(), or the first error dictionary reported while reading.
        """
        lStreams = ( ( self.oChild.fromchild, 0 ), ( self.oChild.childerr, 1 ) )
        if bLast:
            lReadable = [ self.oChild.fromchild, self.oChild.childerr ]
        else:
            lReadable = self.__selectFD( [ self.oChild.fromchild, self.oChild.childerr ], 1 )
        for oFile, iDataIndex in lStreams:
            if oFile in lReadable:
                retVal = self.__readFromSystemCommandOutput( oFile, iDataIndex, bLast )
                if not retVal[ 'OK' ]:
                    return retVal
        return S_OK()


    def __callLineCallback( self, iIndex ):
        """Hand the next complete line of buffer iIndex to the callback.

        Each entry of lBuffers is [ text, position ]; position marks the
        start of the first line not yet delivered. The line is passed
        without its trailing newline.

        Returns True when a line was delivered, False when no complete line
        is pending.
        """
        sBuffer, iStart = self.lBuffers[ iIndex ]
        # Search from the stored position instead of slicing the buffer, so
        # already delivered text is not copied for every line.
        iNextLine = sBuffer.find( "\n", iStart )
        if iNextLine == -1:
            return False
        self.oCallback( iIndex, sBuffer[ iStart:iNextLine ] )
        self.lBuffers[ iIndex ][1] = iNextLine + 1
        return True
188
189############################################################################
def redirectOutput(index, buffer):
    """Line callback that echoes one line of command output to stdout.

    Used as the default line callback of exeCommand, for real-time
    debugging of the executed job.

    index  -- index of the stream the line came from; not used here.
    buffer -- the line to print.
    """
    # Parenthesised form: same output as the print statement for a single
    # argument on Python 2, and also valid on Python 3.
    print(buffer)
195
196
197############################################################################
def exeCommand( sCmd, iTimeout = 0, oLineCallback = redirectOutput):
    """Execute sCmd in a shell and return ( status, output, error, pythonError ).

    sCmd          -- command line to run.
    iTimeout      -- timeout in seconds passed to SubprocessExecuter.
    oLineCallback -- callable receiving ( streamIndex, line ) for each
                     complete output line.

    pythonError is 0 when the command ran to completion, 2 when the failure
    message mentions "Timeout" (the message is then returned in place of
    stderr) and 1 for any other failure. On failure the status is 1.
    """
    dOutcome = SubprocessExecuter( iTimeout ).systemCall( sCmd, oLineCallback )

    if not dOutcome[ 'OK' ]:
        if re.search("Timeout", dOutcome['Message']):
            return 1, dOutcome['stdout'], dOutcome['Message'], 2
        return 1, dOutcome['stdout'], dOutcome['stderr'], 1

    tValue = dOutcome[ 'Value' ]
    return tValue[0], tValue[1], tValue[2], 0
209
210#################################################################################
211###################################### EoF ######################################
212#################################################################################
Note: See TracBrowser for help on using the repository browser.