source: tbroadcast/v2.0.1/python/tbroadcast.py@ 291

Last change on this file since 291 was 248, checked in by garonne, 19 years ago

Mise au propre option + script generateGantt

  • Property svn:executable set to *
File size: 21.9 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17from threading import BoundedSemaphore
18
19from threadpool import WorkRequest
20from threadpool import ThreadPool
21from threadpool import NoResultsPending
22from threadpool import NoWorkersAvailable
23from threadpool import makeRequests
24from executer import exeCommand
25
class Scheduler:
    """Run a shell command in every package of a CMT dependency graph.

    The graph is built by parsing the output of ``cmt show uses`` (or of a
    file holding that output).  Packages whose dependency list is empty are
    handed to a thread pool; when one finishes it is removed from the
    dependency lists of the others, which makes new packages runnable.

    ``self.packages`` maps a package name to a dictionary with the keys
    ``version``, ``uses`` (names of the packages it still depends on),
    ``client``, ``status`` (``'waiting'``, ``'queued'``, ``'running'`` or
    ``'done'``), ``current_project`` and ``path``.  ``do_execute`` adds
    ``startTime`` and ``endTime``.
    """

    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False):
        """Build the dependency graph and check it for cycles.

        num_workers   -- size of the thread pool.
        file          -- optional file to parse instead of running
                         ``cmt show uses``.
        ignore_cycles -- when true, cycles are broken instead of aborting.
        local         -- when true, keep only packages of the current project.
        output        -- directory receiving ``<package>_output.log`` files.
        error         -- directory receiving ``<package>_error.log`` files.
        silent        -- when true, command output is not echoed.
        perf          -- path of a file receiving per-package timings, or a
                         false value to disable timing.

        Exits the process (``sys.exit(-1)``) when a log directory is missing
        or when a ``cmt`` command fails.
        """
        self.pool = ThreadPool(num_workers=num_workers)
        self.current_package = self.get_current_package()
        self.current_project = {'name': None, 'path': None, 'version': None}
        self.packages = {}
        self.counter = 0
        self.semaphore = BoundedSemaphore(1)
        self.local = local
        self.ignore_cycles = ignore_cycles
        self.output = output
        self.error = error
        self.silent = silent
        self.perf = perf
        # Same truth test as do_execute, so a None/empty value disables timing
        # instead of crashing in open().
        if self.perf:
            open(self.perf, 'w+').close()
        # Both log directories are used the same way by do_execute, so both
        # are validated before any work is scheduled.
        self._check_directory(output)
        self._check_directory(error)

        self.get_current_project()
        self.instanciate_packages(file)
        if self.local:
            self.get_local_graph()
        self.check_cycles()

    def _check_directory(self, path):
        """Exit unless *path* is None or names an existing directory."""
        if path is None:
            return
        if not os.path.exists(path):
            print("path " + str(path) + " no exits")
            sys.exit(-1)
        if not os.path.isdir(path):
            print("path " + str(path) + " no a valid directory")
            sys.exit(-1)

    def get_current_project(self):
        """Fill ``self.current_project`` from ``cmt show projects``."""
        cmd = 'cmt show projects | grep current'
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        project = self.current_project
        for line in output.split('\n'):
            if line == '' or line[0] == '#':
                continue
            item = line.split(' ')
            project['name'] = item[0]
            project['version'] = item[1]
            # The fourth field loses its last character; presumably a closing
            # parenthesis -- confirm against the cmt output format.
            project['path'] = item[3][:-1]
            version = project['version']
            # Drop a trailing version directory from the project path.  The
            # emptiness test avoids slicing with [:-0], which would blank
            # the whole path.
            if version and project['path'].endswith(version):
                project['path'] = os.path.normpath(project['path'][:-len(version)])

    def get_counter(self):
        """Increment the shared counter under the semaphore and return it."""
        self.semaphore.acquire()
        try:
            self.counter = self.counter + 1
            return self.counter
        finally:
            self.semaphore.release()

    def check_cycles(self):
        """Abort on, or break, dependency cycles reported by cmt.

        Only cycles whose packages are all present in ``self.packages`` are
        considered.  Without ``ignore_cycles`` they are printed and the
        process exits; with it, the dependency from the last package of each
        cycle to the first one is removed.
        """
        cmd = 'cmt -private show cycles'
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        cycles = []
        for line in output.split('\n'):
            if line != '' and line[0] != '#':
                members = line.split()
                # A whitespace-only line yields no package names.
                if members:
                    cycles.append(members)
        cercles = []
        for cycle in cycles:
            cycle_in_project = True
            for package in cycle:
                if package not in self.packages:
                    cycle_in_project = False
            if cycle_in_project:
                cercles.append(cycle)
        if not len(cercles):
            return
        if not self.ignore_cycles:
            print("# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:")
            for cycle in cercles:
                print(''.join([package + ' -> ' for package in cycle]) + '...')
            sys.exit(-1)
        print("# Warning: There are cycles and you have selected the automatic suppress cycles mode")
        for cycle in cercles:
            loop = ''.join([package + ' -> ' for package in cycle])
            first = cycle[0]
            last = cycle[len(cycle) - 1]
            if first in self.packages[last]['uses']:
                print('## In cycle: ' + loop + '..., we suppress the dependency ' + last + '->' + first)
                self.packages[last]['uses'].remove(first)

    def format_uses(self, content):
        """Return the dependency-tree lines of *content*, in reverse order.

        Kept lines start with ``#`` and are neither ``#CMT>`` messages,
        ``# Required`` lines, ``# Selection :`` nor a bare ``#``.
        """
        ignored = ['# Selection :', '#']
        lines = []
        for ligne in content.split('\n'):
            # Slicing instead of indexing: blank lines must not raise.
            if ligne[:1] != '#':
                continue
            if ligne[:5] == "#CMT>" or ligne[:10] == "# Required":
                continue
            if ligne in ignored:
                continue
            lines.append(ligne)
        lines.reverse()
        return lines

    def format_paths(self, content):
        """Return the lines of *content* that start with ``use ``."""
        return [ligne for ligne in content.split('\n') if ligne[:4] == "use "]

    def _find_cmt_directory(self, root, offset, name, version):
        """Return the ``cmt`` directory of a package, or exit if none exists.

        The versioned layout ``<root>/<offset>/<name>/<version>/cmt`` is tried
        first, then ``<root>/<offset>/<name>/cmt``.
        """
        if offset:
            base = root + '/' + offset + '/' + name
        else:
            base = root + '/' + name
        for candidate in (base + '/' + version + '/cmt', base + '/cmt'):
            if os.path.exists(candidate):
                return candidate
        print('# error path not found for ' + name)
        sys.exit(-1)

    def get_paths(self, content):
        """Record the ``cmt`` directory of every known package.

        Parses the ``use <name> <version> [<offset>] (<path>) [<import>]``
        lines of *content*, stores the normalised directory in
        ``self.packages[name]['path']`` and sets ``current_project`` to True
        for packages located under the current project path.
        """
        for line in self.format_paths(content):
            result = line[4:].split(' ')
            name = result[0]
            if name not in self.packages:
                continue
            if len(result) == 3:
                version, path = result[1:]
                offset = ''
            elif len(result) == 4:
                version, offset, path = result[1:]
                if path == '(no_auto_imports)':
                    # No offset was given: the third field is the path.
                    path = offset
                    offset = ''
            elif len(result) == 5:
                version, offset, path = result[1:4]
            else:
                print("error: " + line)
                print(str(result))
                sys.exit(-1)
            # path is written "(<dir>)": strip the surrounding characters.
            full_path = self._find_cmt_directory(path[1:-1], offset, name, version)
            package = self.packages[name]
            package['path'] = os.path.normpath(full_path)
            # NOTE(review): commonprefix compares character by character, so
            # a sibling directory sharing a name prefix with the project
            # directory also matches -- confirm this is acceptable.
            commonprefix = os.path.commonprefix([package['path'], self.current_project['path']])
            if os.path.normpath(commonprefix) == self.current_project['path']:
                package['current_project'] = True

    def get_uses(self, content):
        """Build ``self.packages`` from the indented ``# use ...`` tree.

        The column of the word ``use`` gives the nesting level: a line is a
        dependency of the closest preceding line with a smaller level, or of
        the current package when there is none.
        """
        lignes = self.format_uses(content)
        if not len(lignes):
            return
        self.packages[self.current_package] = {'version': '*', 'client': list(),
                                               'uses': list(), 'status': 'waiting',
                                               'current_project': True, 'path': os.getcwd()}
        # format_uses returns the lines last-first; walk them first-to-last.
        lignes.reverse()
        # Chain of (name, level) from the current package down to the most
        # recently seen package.
        ancestors = [(self.current_package, 0)]
        for ligne in lignes:
            level = ligne.find('use')
            if level < 0:
                # Not a dependency line.
                continue
            # Climb back to the closest entry that is strictly shallower; the
            # current package (level 0) always stays at the bottom.
            while len(ancestors) > 1 and ancestors[-1][1] >= level:
                ancestors.pop()
            client = ancestors[-1][0]
            fields = ligne.split()
            name = fields[2]
            version = fields[3]
            if name not in self.packages:
                self.packages[name] = {'version': version, 'uses': list(),
                                       'client': list(), 'status': 'waiting',
                                       'current_project': False, 'path': None}
            if name not in self.packages[client]['uses']:
                self.packages[client]['uses'].append(name)
            ancestors.append((name, level))

    def instanciate_packages(self, file=None):
        """Create the package graph from ``cmt show uses`` or from *file*."""
        print('# First, we initialize the DAG by parsing "cmt show uses"')
        if file is None:
            cmd = 'cmt show uses'
        else:
            cmd = 'cat ' + file
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        self.get_uses(output)
        self.get_paths(output)

    def get_local_graph(self):
        """Drop every package that is not part of the current project."""
        to_remove = []
        for key in self.packages:
            if not self.packages[key]['current_project']:
                for selected in self.packages:
                    if key in self.packages[selected]['uses']:
                        self.packages[selected]['uses'].remove(key)
                to_remove.append(key)
        # Deleted afterwards: the dictionary cannot shrink while iterated.
        for item in to_remove:
            del self.packages[item]

    def simulate_execution(self):
        """Print the order in which packages would run, without running.

        Consumes the graph: statuses become ``'done'`` and dependency lists
        are emptied as packages are "executed".
        """
        indice = 1
        while True:
            runnable = []
            for key in self.packages:
                if self.packages[key]['status'] != 'done' and len(self.packages[key]['uses']) == 0:
                    runnable.append(key)
            if len(runnable) == 0:
                break
            print('\n#--------------------------------------------------------------')
            print("# Execute parallel actions within packages " + str(runnable))
            for selected in runnable:
                print('#--------------------------------------------------------------')
                print('# (' + str(indice) + '/' + str(len(self.packages)) + ') Now trying [] in ' + self.packages[selected]['path'])
                print('#--------------------------------------------------------------')
                self.packages[selected]['status'] = 'done'
                indice = indice + 1
                for key in self.packages:
                    if selected in self.packages[key]['uses']:
                        self.packages[key]['uses'].remove(selected)

    def check_execution(self, package, path=None, cycles=None):
        """Walk the dependencies of *package* depth-first, exiting on a cycle.

        path   -- packages on the current branch (used by the recursion).
        cycles -- cycles already reported (used by the recursion).
        """
        # Fresh lists per top-level call: shared defaults would keep the
        # branch of a previous, aborted walk.
        if path is None:
            path = []
        if cycles is None:
            cycles = []
        if package in path:
            if path[path.index(package):] not in cycles:
                print('Cycles: ' + str(path[path.index(package):]) + ' ' + str(package))
                cycles = cycles + path[path.index(package):]
                sys.exit(-1)
        path.append(package)
        for item in self.packages[package]['uses']:
            self.check_execution(package=item, path=path, cycles=cycles)
        path.pop()

    def get_current_package(self):
        """Return the quoted value printed by ``cmt show macro package``.

        Returns None when the output has no non-comment line.
        """
        cmd = 'cmt show macro package'
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        for line in output.split('\n'):
            if line != '' and line[0] != '#':
                start = line.find("'")
                end = line[start + 1:].find("'")
                return line[start + 1:start + end + 1]

    def get_work_area_path(self, name):
        """Return the recorded ``cmt`` directory of package *name*."""
        return self.packages[name]['path']

    def get_package_path(self, name):
        """Print and return the first non-comment line of ``cmt -use=<name> run pwd``."""
        cmd = 'cmt -use=' + name + ' run pwd'
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        for line in output.split('\n'):
            if line != '' and line[0] != '#':
                print(line)
                return line

    def print_dependencies(self):
        """Print every package with its remaining dependencies and status."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        for key in self.packages.keys():
            print("%s --> %s , %s" % (key, self.packages[key]['uses'], self.packages[key]['status']))

    def print_status(self, status):
        """Print, numbered, the packages whose status equals *status*."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        i = 1
        for key in self.packages.keys():
            if self.packages[key]['status'] == status:
                print("%s %s --> %s , %s" % (i, key, self.packages[key]['uses'], self.packages[key]['status']))
                i = i + 1

    def is_work_unit_waiting(self, name):
        """Return True when package *name* has status ``'waiting'``."""
        return self.packages[name]['status'] == 'waiting'

    def set_work_unit_status(self, name, status):
        """Set the status of package *name*."""
        self.packages[name]['status'] = status

    def get_dependencies(self, name):
        """Return the list of packages *name* still depends on."""
        return self.packages[name]['uses']

    def get_next_work_units(self):
        """Return the waiting packages that have no remaining dependency."""
        return [key for key in self.packages.keys()
                if len(self.get_dependencies(key)) == 0 and self.is_work_unit_waiting(key)]

    def is_work_units(self):
        """Return True when at least one package is still waiting."""
        for key in self.packages.keys():
            if self.is_work_unit_waiting(key):
                return True
        return False

    def suppress_work_unit(self, name):
        """Remove *name* from the dependency list of every package."""
        for key in self.packages.keys():
            if name in self.packages[key]['uses']:
                self.packages[key]['uses'].remove(name)

    def add_work_unit(self, name, cmd):
        """Queue *cmd* for package *name* if the package is still waiting."""
        if self.is_work_unit_waiting(name):
            arg = {'cmd': cmd, 'package': name}
            req = WorkRequest(self.do_execute, [arg], None, callback=self.result_callback, exc_callback=self.handle_exception)
            # Marked before queuing so the package is not submitted twice.
            self.set_work_unit_status(name, 'queued')
            self.pool.putRequest(req)

    def execute(self, command):
        """Queue *command* for every package that is currently runnable."""
        packages = self.get_next_work_units()
        if len(packages) != 0:
            print('\n#--------------------------------------------------------------')
            print('# Execute parallel actions within packages ' + str(packages))
            for package in packages:
                self.add_work_unit(package, command)

    def execute_all(self, command):
        """Queue the runnable packages, then wait on the pool."""
        self.execute(command)
        self.wait()

    def wait(self):
        """Block on the thread pool's ``wait``."""
        self.pool.wait()

    # this will be called each time a result is available
    def result_callback(self, request, result):
        """Schedule the packages made runnable by a finished request."""
        self.execute(result['cmd'])

    def _log_file(self, directory, package, suffix):
        """Return the path of the log file of *package* inside *directory*."""
        return directory + '/' + package + suffix

    # the work the threads will have to do
    def do_execute(self, arg):
        """Run ``arg['cmd']`` in the directory of ``arg['package']``.

        Returns a dictionary with the keys ``output``, ``cmd`` and
        ``package``.  Raises RuntimeError when the package directory is
        unknown or missing.
        """
        package = arg['package']
        path = self.get_work_area_path(package)
        if path is None or not os.path.exists(path):
            raise RuntimeError('Path to package ' + package + ' not found')
        self.set_work_unit_status(package, 'running')
        print('#--------------------------------------------------------------')
        print('# (' + str(self.get_counter()) + '/' + str(len(self.packages)) + ') Now trying [' + arg['cmd'] + '] in ' + path)
        print('#--------------------------------------------------------------')
        cmd = "cd " + path + ";" + arg['cmd']
        # Start from empty log files; redirectOutput appends to them.
        if self.output is not None:
            open(self._log_file(self.output, package, '_output.log'), 'w+').close()
        if self.error is not None:
            open(self._log_file(self.error, package, '_error.log'), 'w+').close()
        self.packages[package]['startTime'] = time.time()
        status, output, error, pythonError = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)
        self.packages[package]['endTime'] = time.time()
        if self.perf:
            # One timing line per package, written under the semaphore.
            self.semaphore.acquire()
            try:
                f = open(self.perf, 'a')
                try:
                    f.write(package + " " + str(self.packages[package]['startTime']) + " " + str(self.packages[package]['endTime']) + '\n')
                finally:
                    f.close()
            finally:
                self.semaphore.release()
        self.suppress_work_unit(package)
        self.set_work_unit_status(package, 'done')
        return {'output': output, 'cmd': arg['cmd'], 'package': package}

    def redirectOutput(self, index, buffer, arg):
        """Append one line of command output to the package log files.

        index  -- 0 for standard output, 1 for standard error.
        buffer -- the line to record.
        arg    -- the request dictionary; only ``arg['package']`` is read.

        Standard error goes to the error directory when one is set, else to
        the output log.  Unless ``silent`` is set the line is also printed.
        """
        target = None
        if index == 0:
            if self.output is not None:
                target = self._log_file(self.output, arg['package'], '_output.log')
        elif index == 1:
            # The error log does not depend on an output directory being set.
            if self.error is not None:
                target = self._log_file(self.error, arg['package'], '_error.log')
            elif self.output is not None:
                target = self._log_file(self.output, arg['package'], '_output.log')
        if target is not None:
            f = open(target, 'a')
            try:
                f.write(buffer + '\n')
            finally:
                f.close()
        if not self.silent:
            print(buffer)

    # this will be called when an exception occurs within a thread
    def handle_exception(self, request, exc_info):
        """Print the exception raised by a work request."""
        print('#--------------------------------------------------------------')
        print("# Exception occured: %s" % (exc_info[1],))
        print(exc_info)
        print('#--------------------------------------------------------------')
485
486#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.