source: tbroadcast/v2.0.6/python/tbroadcast.py@ 577

Last change on this file since 577 was 508, checked in by garonne, 16 years ago

Commit changes

  • Property svn:executable set to *
File size: 24.6 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17import exceptions
18from threading import BoundedSemaphore
19
20from threadpool import WorkRequest
21from threadpool import ThreadPool
22from threadpool import NoResultsPending
23from threadpool import NoWorkersAvailable
24from threadpool import makeRequests
25
26from subprocess import Popen
27
28class Scheduler:
29
30 def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False, keep_going=True):
31 self.pool = ThreadPool(num_workers=num_workers)
32 self.current_package = self.get_current_package()
33 self.current_project = {'name': None, 'path': None, 'version': None}
34 self.packages = {}
35 self.counter = 0
36 self.semaphore = BoundedSemaphore(1)
37 self.local = local
38 self.ignore_cycles = ignore_cycles
39 self.output = output
40 self.error = error
41 self.silent = silent
42 self.perf = perf
43 self.keep_going = keep_going
44 if self.perf is not False:
45 f = open (self.perf, 'w+')
46 f.close()
47 if output is not None:
48 if not os.path.exists (output):
49 print "path",output,"no exits"
50 sys.exit(-1)
51 if not os.path.isdir(output):
52 print "path",output,"no a valid directory"
53 sys.exit(-1)
54
55 self.get_current_project()
56 self.instanciate_packages (file)
57 if self.local: self.get_local_graph()
58 self.check_cycles()
59
60# status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
61# lignes = string.split(output, '\n')
62# i = 1
63# for package in lignes:
64# if package!='' and package[0] != '#':
65# print i , package
66# i =i +1
67# if not self.packages.has_key(package):
68# print package
69# print len(self.packages)
70# sys.exit(-1)
71
72 def get_current_project(self):
73 cmd = 'cmt show projects | grep current'
74 status, output = commands.getstatusoutput (cmd)
75 if status != 0:
76 print output
77 sys.exit(-1)
78 lines = string.split(output, '\n')
79 for line in lines:
80 if line!='' and line [0] != '#':
81 item = string.split (line, ' ')
82 self.current_project ['name'] = item[0]
83 self.current_project ['version'] = item[1]
84 self.current_project ['path'] = item[3][:-1]
85 version = self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
86 if self.current_project ['version'] == version:
87 self.current_project ['path'] = os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
88 return
89 #print self.current_project
90
91 def get_counter(self):
92 self.semaphore.acquire ()
93 self.counter = self.counter + 1
94 value = self.counter
95 self.semaphore.release()
96 return value
97
98 def check_cycles (self):
99 cmd = 'cmt -private show cycles'
100 cycle_found = False
101 status, output = commands.getstatusoutput (cmd)
102 if status != 0:
103 print output
104 sys.exit(-1)
105 lines = string.split(output, '\n')
106 cycles = list ()
107 for line in lines:
108 if line!='' and line [0] != '#':
109 cycles.append (string.split(line))
110 cercles =list()
111 for cycle in cycles:
112 cycleInProject = True
113 for package in cycle:
114 if not self.packages.has_key(package):
115 cycleInProject = False
116 if cycleInProject:
117 cercles.append(cycle)
118 if len(cercles):
119 if not self.ignore_cycles:
120 print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
121 for cycle in cercles:
122 loop = ""
123 for package in cycle:
124 loop = loop + package + ' -> '
125 print loop + '...'
126 sys.exit(-1)
127 else:
128 print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
129 for cycle in cercles:
130 loop = ""
131 for package in cycle:
132 loop = loop + package + ' -> '
133 if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
134 print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
135 self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
136# sys.exit(-1)
137
138 def format_uses (self, content):
139 # format variables
140 lignes = string.split(content, '\n')
141 lines = list()
142 for ligne in lignes:
143 if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
144 lines.append(ligne)
145 lines.reverse()
146 return lines
147
148 def format_paths (self, content):
149 # format variables
150 lignes = string.split(content, '\n')
151 lines = list()
152 for ligne in lignes:
153 if ligne[:4] == "use ":
154 lines.append(ligne)
155 return lines
156
157 def get_paths (self, content):
158 lines = self.format_paths(content)
159 for line in lines:
160 result = string.split (line[4:len(line)], ' ')
161 if self.packages.has_key(result[0]):
162 if len(result)==4:
163 name, version, offset, path = string.split (line[4:len(line)], " ")
164 #print name, version, offset, path
165 #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
166 if path == '(no_auto_imports)':
167 path = offset
168 offset = ''
169 if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
170 full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
171 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
172 full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
173 else:
174 print '# error path not found for', name
175 sys.exit(-1)
176 elif len(result)==5:
177 name, version, offset, path, importation = string.split (line[4:len(line)], " ")
178 if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
179 full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
180 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
181 full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
182 else:
183 print '# error path not found for', name
184 sys.exit(-1)
185 elif len(result)==3:
186 name, version, path = string.split (line[4:len(line)], " ")
187 if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
188 full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
189 elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):
190 full_path = path[1:-1] + '/' +name + + '/cmt'
191 else:
192 print '# error path not found for', name
193 sys.exit(-1)
194 else:
195 print "error:",line
196 print str(result)
197 sys.exit(-1)
198 self.packages[result[0]]['path'] = os.path.normpath(full_path)
199 commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
200 if os.path.normpath(commonprefix) == self.current_project ['path']:
201 #print result[0], ' belong to project', self.current_project ['name']
202 self.packages[result[0]]['current_project'] = True
203
204 def get_uses(self, content):
205 # initiates variables
206 lignes = self.format_uses(content)
207 if not len(lignes): return
208 self.packages [self.current_package] = {'version': '*', 'client': list(),
209 'uses': list(), 'status': 'waiting',
210 'current_project': True, 'path': os.getcwd()}
211 previous_client = self.current_package
212 previous_level = 0
213 level_stack = [{'name':previous_client,'level':previous_level},]
214 ligne = lignes.pop()
215 while len(lignes)!=0:
216 current_level = string.find(ligne, 'use')
217 while current_level > previous_level:
218 name = string.split (ligne)[2]
219 version = string.split (ligne)[3]
220 if not self.packages.has_key (name):
221 self.packages [name] = {'version': version, 'uses': list(),
222 'client': list(), 'status': 'waiting',
223 'current_project': False, 'path': None}
224 if name not in self.packages[previous_client]['uses']:# and name != previous_client:
225 self.packages[previous_client]['uses'].append (name)
226 level_stack.append({'name':previous_client,'level':previous_level})
227 previous_client = name
228 previous_level = current_level
229 if len(lignes):
230 ligne = lignes.pop()
231 #print ligne
232 current_level = string.find(ligne, 'use')
233
234 #self.packages [previous_client]['status'] ='queued'
235 # restore the level
236 if len(lignes):
237 if len(level_stack):
238 item = level_stack.pop()
239 while item['level'] >= current_level and len(level_stack):
240 item = level_stack.pop()
241 previous_client = item['name']
242 previous_level = item['level']
243 #print previous_client, '-->',string.split (ligne)[2]
244
245 def instanciate_packages(self, file=None):
246 # We create the schedule of the work units
247 print '# First, we initialize the DAG by parsing "cmt show uses"'
248 if file is None:
249 cmd = 'cmt show uses'
250 else:
251 cmd = 'cat ' + file
252 status, output = commands.getstatusoutput (cmd)
253 if status != 0:
254 print output
255 sys.exit(-1)
256 self.get_uses(output)
257 self.get_paths(output)
258 #self.check_execution (package=self.current_package)
259 #self.simulate_execution()
260
261 def get_local_graph(self):
262 To_remove = list()
263 for key in self.packages:
264 if self.packages[key]['current_project']== False:
265 for selected in self.packages:
266 if key in self.packages[selected]['uses']:
267 self.packages[selected]['uses'].remove(key)
268 To_remove.append (key)
269 for item in To_remove:
270 del self.packages[item]
271
272 def simulate_execution(self):
273 ok = True
274 indice = 1
275 while ok:
276 runnable = list()
277 for key in self.packages:
278 if self.packages[key]['status']!='done':
279 if len(self.packages[key]['uses']) == 0:
280 runnable.append(key)
281 if len(runnable):
282 print '\n#--------------------------------------------------------------'
283 print "# Execute parallel actions within packages " + str(runnable)
284 for selected in runnable:
285 print '#--------------------------------------------------------------'
286 print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
287 print '#--------------------------------------------------------------'
288 self.packages[selected]['status']='done'
289 indice = indice + 1
290 for key in self.packages:
291 if selected in self.packages[key]['uses']:
292 self.packages[key]['uses'].remove(selected)
293 #print 'remove', selected, 'from',key
294 if len(runnable)==0:
295 ok = False
296
297 def check_execution(self, package, path=list(), cycles=list()):
298 #print package,'-->',self.packages[package]['uses']
299 #print path
300 if package in path:
301 if path[path.index(package):] not in cycles:
302 print 'Cycles:',path[path.index(package):], package
303 cycles = cycles + path[path.index(package):]
304 sys.exit(-1)
305 path.append(package)
306 for item in self.packages[package]['uses']:
307 self.check_execution(package=item, path=path, cycles=cycles)
308 path.pop()
309
310 def get_current_package(self):
311 cmd = 'cmt show macro package'
312 status, output = commands.getstatusoutput (cmd)
313 if status != 0:
314 print output
315 sys.exit(-1)
316 lines = string.split(output, '\n')
317 for line in lines:
318 if line [0] != '#':
319 start = string.find(line,"'")
320 end = string.find(line[start+1:len(line)],"'")
321 return line [start+1:start+end+1]
322
323 def get_work_area_path (self, name):
324 return self.packages [name]['path']
325
326 def get_package_path (self, name):
327 #return os.getcwd ()
328 cmd = 'cmt -use='+name+' run pwd'
329 status, output = commands.getstatusoutput (cmd)
330 if status != 0:
331 print output
332 sys.exit(-1)
333 lines = string.split(output, '\n')
334 for line in lines:
335 if line [0] != '#' and line[:5] != "#CMT>":
336 print line
337 return line
338
339 def print_dependencies(self):
340 print '# ------------------------'
341 print '# package --> dependencies'
342 print '# ------------------------'
343 for key in self.packages.keys():
344 print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']
345
346 def print_status(self, status):
347 print '# ------------------------'
348 print '# package --> dependencies'
349 print '# ------------------------'
350 i = 1
351 for key in self.packages.keys():
352 if self.packages[key] ['status'] == status:
353 print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']
354 i = i + 1
355
356 def is_work_unit_waiting (self, name):
357 return self.packages[name] ['status'] == 'waiting'
358
359 def set_work_unit_status (self, name, status):
360 self.packages[name] ['status'] = status
361
362 def get_dependencies (self, name):
363 return self.packages[name] ['uses']
364
365 def get_next_work_units (self):
366 result = list ()
367 for key in self.packages.keys():
368 if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
369 result.append(key)
370 return result
371
372 def is_work_units (self):
373 result = list ()
374 for key in self.packages.keys():
375 if self.is_work_unit_waiting(key) :
376 return True
377 return False
378
379 def suppress_work_unit (self, name):
380 #print '# remove', name, 'from schedule'
381 for key in self.packages.keys():
382 if name in self.packages[key]['uses']:
383 self.packages[key]['uses'].remove(name)
384
385 def add_work_unit (self, name, cmd):
386 if self.is_work_unit_waiting (name):
387 # we create requests
388 arg = {'cmd': cmd , 'package':name}
389 req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback)
390 # then we put the work request in the queue...
391 self.set_work_unit_status (name, 'queued')
392 self.pool.putRequest(req)
393 #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
394
395 def execute (self, command):
396 #self.print_dependencies ()
397 packages = self.get_next_work_units()
398 if len(packages) !=0:
399 print '\n#--------------------------------------------------------------'
400 print '# Execute parallel actions within packages', packages
401 for package in packages:
402 self.add_work_unit (package, command)
403
404 def execute_all(self,command):
405 #self.print_dependencies ()
406 self.execute (command)
407 self.wait()
408 #self.print_dependencies ()
409 #self.print_status (status='waiting')
410 #while self.is_work_units():
411 #self.wait()
412
413 def wait (self):
414 self.pool.wait()
415
416 # this will be called each time a result is available
417 def result_callback(self, request, result):
418 #print "**Result: %s from request #%s" % (str(result), request.requestID)
419 #print "# Result: %s from request #%s" % (result['package'], request.requestID)
420 #if result['package'] == 'CodeCheck':
421 # sys.exit(-1)
422 self.execute (result['cmd'])
423
424 # the work the threads will have to do
425 def do_execute(self, arg):
426 path = self.get_work_area_path (arg['package'])
427 if path == None or not os.path.exists(path):
428 raise RuntimeError('Path to package '+ arg['package'] +' not found')
429 self.set_work_unit_status (arg['package'], 'running')
430 #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
431 #os.chdir(path)
432 #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
433 header = '#--------------------------------------------------------------\n'
434 header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
435 header = header + '#--------------------------------------------------------------\n'
436 print header
437 project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
438 log_name = string.replace(path, project_path, '')
439 log_name = string.replace(log_name, '/cmt', '')
440 log_name = string.replace(log_name, '/', '_')
441 log_name = log_name+'.loglog'
442 arg['log'] = log_name
443 cmd = "cd "+ path +";"+ arg['cmd']
444 #status, output= commands.getstatusoutput(cmd)
445 # init output file
446
447 self.packages[arg['package']] ['startTime'] = time.time ()
448
449 if self.output is not None:
450 f1 = open (self.output+'/'+ log_name, 'w+')
451 f1.write (header)
452 f1.close()
453 f1 = open (self.output+'/'+ log_name, 'a')
454 if self.error is not None:
455 f2 = open (self.error+'/error'+log_name, 'w+')
456 Popen(cmd, shell=True, stdout=f1, stderr=f2).communicate()
457 f2.close()
458 else:
459 Popen(cmd, shell=True, stdout=f1, stderr=f1).communicate()
460 f1.close()
461 else:
462 Popen(cmd, shell=True).communicate()
463
464 if not self.keep_going and status > 0:
465 sys.exit(status)
466
467 self.packages[arg['package']] ['endTime'] = time.time ()
468 if self.perf:
469 self.semaphore.acquire ()
470 f = open (self.perf, 'a')
471 f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n')
472 f.close()
473 self.semaphore.release()
474 self.suppress_work_unit (arg['package'])
475 self.set_work_unit_status (arg['package'], 'done')
476 # status, output= commands.getstatusoutput(cmd)
477 #if status != 0:
478 # raise RuntimeError(output)
479 return {'cmd': arg['cmd'], 'package':arg['package']}
480
481
482 # this will be called when an exception occurs within a thread
483 def handle_exception(self, request, exc_info):
484 #traceback.print_stack()
485 print '#--------------------------------------------------------------'
486 #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
487 if exc_info[0]== exceptions.SystemExit:
488 print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1])
489 print '#--------------------------------------------------------------'
490 sys.exit(exc_info[1])
491 print "# Exception occured: %s" %(exc_info[1])
492 print exc_info
493 print '#--------------------------------------------------------------'
494 #sys.exit(-1)
495
496
497 def generate_make (self, file, command):
498 makefile = open (file, 'w+')
499 makefile.write ('MAKE=make\n')
500 #MFLAGS= -j10
501 self.counter = len(self.packages)
502 self.recursive_make (self.current_package, command, makefile, len(self.packages))
503 makefile.close ()
504
505 def recursive_make (self, package, command, makefile, indice,actions=list()):
506 lines = self.generate_action_make (package, command, indice)
507 makefile.write (lines)
508 #print lines
509 for pkg in self.packages[package] ['uses']:
510 if pkg not in actions:
511 actions.append(pkg)
512 indice = indice - 1
513 self.counter = self.counter - 1
514 self.recursive_make(pkg, command,makefile, indice, actions)
515
516 def generate_action_make (self, package, command, indice):
517 lines = package + ' :: '
518 # add dependencies
519 for pkg in self.packages[package] ['uses']:
520 lines = lines + ' ' + pkg
521
522 # add the action itself
523 newcommand = string.replace (command, '<package>', package)
524 if command =='':
525 newcommand='$(MAKE)'
526 lines = lines + '\n'
527 lines = lines + '\t@echo "#--------------------------------------------------------------"\n'
528 lines = lines + '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
529 lines = lines + '\t@echo "#--------------------------------------------------------------"\n'
530 lines = lines + 'ifdef LOCATION\n'
531 lines = lines + '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'
532 lines = lines + '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
533 lines = lines + '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
534 lines = lines + '\t+@cd ' + self.packages[package]['path']
535 lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
536 lines = lines + 'else\n'
537 lines = lines + '\t+@cd ' + self.packages[package]['path']
538 lines = lines + ' && ' + newcommand + '\n'
539 lines = lines + 'endif\n\n'
540 return lines
541
542#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.