source: tbroadcast/v2/python/tbroadcast.py@ 238

Last change on this file since 238 was 238, checked in by garonne, 19 years ago

check cycles

  • Property svn:executable set to *
File size: 10.8 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Date: 08/25/2006
4# -- Name: tbroadcast
5# -- Description: main class
6#----------------------------------#
7
8import os
9import sys
10import time
11import string
12import random
13import commands
14
15from threadpool import WorkRequest
16from threadpool import ThreadPool
17from threadpool import NoResultsPending
18from threadpool import NoWorkersAvailable
19from threadpool import makeRequests
20from executer import exeCommand
21
class Scheduler:
    """Run a command over a CMT package and the packages it uses.

    The package graph is read from ``cmt show uses`` and stored in
    ``self.packages``, a dictionary mapping a package name to a record with
    the keys ``'version'``, ``'dependencies'`` (list of package names),
    ``'status'`` (``'waiting'``, ``'queued'``, ``'running'`` or ``'done'``)
    and ``'path'``.  A package is handed to the thread pool once its
    dependency list is empty; finishing a package removes it from every
    other dependency list.
    """

    def __init__(self, num_workers=20):
        """Create the thread pool and build the package schedule.

        num_workers -- value forwarded to ``ThreadPool(num_workers=...)``.

        The process exits through ``check_cycles`` or
        ``instanciate_packages`` when a cmt command fails or cycles are
        reported.
        """
        self.pool = ThreadPool(num_workers=num_workers)
        self.packages = {}
        self.check_cycles()
        self.instanciate_packages()

    # ------------------------------------------------------------------
    # Pure parsing helpers (no cmt invocation, no instance state)
    # ------------------------------------------------------------------

    @staticmethod
    def _cycle_lines(output):
        """Return the lines of ``cmt show cycles`` output that report a cycle.

        Every non-empty line that does not start with '#' counts as a
        report.  Empty lines are skipped so that empty output does not
        crash on ``line[0]``.
        """
        return [line for line in output.split('\n')
                if line and not line.startswith('#')]

    @staticmethod
    def _is_use_comment(line):
        """Tell whether ``line`` is a '#'-prefixed entry of the use tree.

        '#CMT>' lines, the bare '#' line and the '# Selection :' line are
        excluded; an empty line yields False.
        """
        return (line.startswith('#')
                and not line.startswith('#CMT>')
                and line not in ('# Selection :', '#'))

    @staticmethod
    def _parse_macro_value(output):
        """Extract the single-quoted value from ``cmt show macro`` output.

        The first non-empty line that does not start with '#' is used.
        Returns None when there is no such line.
        """
        for line in output.split('\n'):
            if line and not line.startswith('#'):
                start = line.find("'")
                # Position of the closing quote, relative to start + 1.
                end = line[start + 1:].find("'")
                return line[start + 1:start + end + 1]
        return None

    @staticmethod
    def _find_cmt_directory(root, offset, name, version):
        """Return the existing 'cmt' directory of a package, or None.

        root    -- package area (already stripped of its parentheses)
        offset  -- sub-directory below root, or None when there is none
        name    -- package name
        version -- package version

        ``root[/offset]/name/version/cmt`` is tried first, then
        ``root[/offset]/name/cmt``.
        """
        base = [root]
        if offset is not None:
            base.append(offset)
        base.append(name)
        for parts in (base + [version, 'cmt'], base + ['cmt']):
            candidate = '/'.join(parts)
            if os.path.exists(candidate):
                return candidate
        return None

    def _collect_dependencies(self, name, lines):
        """Fill the dependency list of ``name`` from the use tree.

        Starting at a tree entry naming ``name``, every following entry
        whose 'use' keyword sits at a deeper column is recorded as a
        dependency; the first entry at the same or a shallower column ends
        the run.
        """
        dependencies = self.packages[name]['dependencies']
        inside = False
        level = -1
        for entry in lines:
            if not self._is_use_comment(entry):
                continue
            used = entry.split()[2]
            column = entry.find('use')
            if inside:
                if level < column:
                    if used not in dependencies:
                        dependencies.append(used)
                else:
                    inside = False
            if name == used:
                level = column
                inside = True

    # ------------------------------------------------------------------
    # Schedule construction
    # ------------------------------------------------------------------

    def check_cycles(self):
        """Run ``cmt show cycles`` and exit(-1) if any cycle is reported.

        Also exits with -1 (after printing the command output) when the
        command itself returns a non-zero status.
        """
        status, output = commands.getstatusoutput('cmt show cycles')
        if status != 0:
            print(output)
            sys.exit(-1)
        offending = self._cycle_lines(output)
        if offending:
            print("# Error: cycles found, not possible to execute broadcast with threads. See the followings packages:")
            for line in offending:
                print(line)
            sys.exit(-1)

    def instanciate_packages(self):
        """Populate ``self.packages`` from the output of ``cmt show uses``.

        The '#'-prefixed tree entries provide names, versions and
        dependencies; the 'use ...' lines provide the directory in which
        each package is executed.  Exits with -1 when the command fails,
        when a 'use' line has an unexpected number of fields, or when no
        'cmt' directory can be found for a package.
        """
        # We create the schedule of the work units
        print('# First, we initialize the DAG by parsing cmt show uses (takes a certain time... huk should be improved asap) ')
        status, output = commands.getstatusoutput('cmt show uses')
        if status != 0:
            print(output)
            sys.exit(-1)
        lines = output.split('\n')
        current_package = self.get_current_package()
        self.packages[current_package] = {'version': '*', 'dependencies': list(), 'status': 'waiting', 'path': os.getcwd()}
        top_level = self.packages[current_package]['dependencies']
        for line in lines:
            if self._is_use_comment(line):
                tokens = line.split()
                name = tokens[2]
                version = tokens[3]
                # Every package of the tree is a dependency of the current one.
                if name not in top_level:
                    top_level.append(name)
                if name not in self.packages:
                    self.packages[name] = {'version': version, 'dependencies': list(), 'status': 'waiting', 'path': None}
                    self._collect_dependencies(name, lines)
            if line[:4] == "use ":
                fields = line[4:].split(' ')
                if fields[0] in self.packages:
                    count = len(fields)
                    if count == 3:
                        name, version, path = fields
                        offset = None
                    elif count == 4:
                        name, version, offset, path = fields
                    elif count == 5:
                        # The fifth field is not needed to locate the package.
                        name, version, offset, path = fields[:4]
                    else:
                        print("error: %s" % line)
                        print(str(fields))
                        sys.exit(-1)
                    # path[1:-1] drops the first and last character of the field
                    # (presumably surrounding parentheses -- TODO confirm).
                    full_path = self._find_cmt_directory(path[1:-1], offset, name, version)
                    if full_path is None:
                        print('# error path not found for %s' % name)
                        sys.exit(-1)
                    self.packages[fields[0]]['path'] = full_path
        print('# really takes a certain time ... ')

    def get_current_package(self):
        """Return the value of the cmt macro 'package'.

        Runs ``cmt show macro package``; exits with -1 if it fails.
        Returns None when the output holds no usable line.
        """
        status, output = commands.getstatusoutput('cmt show macro package')
        if status != 0:
            print(output)
            sys.exit(-1)
        return self._parse_macro_value(output)

    def get_work_area_path(self, name):
        """Return the directory recorded for package ``name``."""
        return self.packages[name]['path']

    def get_package_path(self, name):
        """Return the directory printed by ``cmt -use=<name> run pwd``.

        The first non-empty output line that does not start with '#' is
        printed and returned; None is returned if there is none.  Exits
        with -1 when the command fails.
        """
        status, output = commands.getstatusoutput('cmt -use=' + name + ' run pwd')
        if status != 0:
            print(output)
            sys.exit(-1)
        for line in output.split('\n'):
            if line and not line.startswith('#'):
                print(line)
                return line
        return None

    # ------------------------------------------------------------------
    # Schedule queries and updates
    # ------------------------------------------------------------------

    def print_dependencies(self):
        """Print every package with its remaining dependencies and status."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        for key in self.packages.keys():
            record = self.packages[key]
            print('%s --> %s , %s' % (key, record['dependencies'], record['status']))

    def is_work_unit_waiting(self, name):
        """Tell whether package ``name`` still has the status 'waiting'."""
        return self.packages[name]['status'] == 'waiting'

    def set_work_unit_status(self, name, status):
        """Set the status of package ``name``."""
        self.packages[name]['status'] = status

    def get_dependencies(self, name):
        """Return the (live) list of dependencies of package ``name``."""
        return self.packages[name]['dependencies']

    def get_next_work_units(self):
        """Return the waiting packages that have no dependency left."""
        return [key for key in self.packages.keys()
                if not self.get_dependencies(key) and self.is_work_unit_waiting(key)]

    def is_work_units(self):
        """Tell whether at least one package is still 'waiting'."""
        for key in self.packages.keys():
            if self.is_work_unit_waiting(key):
                return True
        return False

    def suppress_work_unit(self, name):
        """Remove ``name`` from the dependency list of every package."""
        print('# remove %s from schedule' % name)
        for record in self.packages.values():
            if name in record['dependencies']:
                record['dependencies'].remove(name)

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------

    def add_work_unit(self, name, cmd):
        """Queue ``cmd`` for package ``name`` if the package is waiting.

        The package is marked 'queued' before the request is put in the
        pool, so it is not queued a second time.
        """
        if self.is_work_unit_waiting(name):
            # we create requests
            arg = {'cmd': cmd, 'package': name}
            req = WorkRequest(self.do_execute, [arg], None, callback=self.result_callback, exc_callback=self.handle_exception)
            # then we put the work request in the queue...
            self.set_work_unit_status(name, 'queued')
            self.pool.putRequest(req)

    def execute(self, command):
        """Queue ``command`` for every package that is ready to run."""
        packages = self.get_next_work_units()
        if len(packages) != 0:
            print('\n# Execute parallel actions within  %s' % (packages,))
            for package in packages:
                self.add_work_unit(package, command)

    def execute_all(self, command):
        """Run ``command`` over the whole schedule.

        NOTE(review): follow-up packages are only queued from
        result_callback.  If a request fails, handle_exception merely
        prints, so the package stays 'running' and its dependents keep the
        status 'waiting' -- this loop may then never end.  Confirm against
        the behaviour of ThreadPool.wait().
        """
        self.execute(command)
        self.wait()
        while self.is_work_units():
            self.wait()

    def wait(self):
        """Delegate to ``self.pool.wait()``."""
        self.pool.wait()

    # this will be called each time a result is available
    def result_callback(self, request, result):
        """Report a finished request and queue the packages it unblocked.

        result -- the dictionary returned by do_execute.
        """
        print("# Result: %s from request #%s" % (result['package'], request.requestID))
        self.execute(result['cmd'])

    # the work the threads will have to do
    def do_execute(self, arg):
        """Run ``arg['cmd']`` in the directory of ``arg['package']``.

        arg -- dictionary with the keys 'cmd' and 'package'.

        Returns a dictionary with the keys 'output', 'cmd' and 'package'.
        After the command the package is removed from all dependency lists
        and marked 'done'; the command's status is not checked.
        """
        path = self.get_work_area_path(arg['package'])
        print('#--------------------------------------------------------------')
        print('# Now trying [' + arg['cmd'] + '] in ' + path)
        print('#--------------------------------------------------------------')
        self.set_work_unit_status(arg['package'], 'running')
        # NOTE(review): os.chdir changes the working directory of the whole
        # process, so workers running at the same time can disturb each
        # other; consider giving the directory to exeCommand instead.
        os.chdir(path)
        cmd = arg['cmd']
        status, output, error, pythonError = exeCommand(cmd, iTimeout=10)
        self.suppress_work_unit(arg['package'])
        self.set_work_unit_status(arg['package'], 'done')
        return {'output': output, 'cmd': arg['cmd'], 'package': arg['package']}

    # this will be called when an exception occurs within a thread
    def handle_exception(self, request, exc_info):
        """Print the exception raised while processing ``request``."""
        print("# Exception occured in request #%s: %s" %
              (request.requestID, exc_info[1]))
241#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.