source: tbroadcast/v2.0.5/python/tbroadcast.py@ 488

Last change on this file since 488 was 478, checked in by rybkin, 17 years ago

See C.L. 2

  • Property svn:executable set to *
File size: 25.5 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17import exceptions
18from threading import BoundedSemaphore
19
20from threadpool import WorkRequest
21from threadpool import ThreadPool
22from threadpool import NoResultsPending
23from threadpool import NoWorkersAvailable
24from threadpool import makeRequests
25from executer import exeCommand
26from executer import getstatusoutput
27
28class Scheduler:
29
30 def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False, perf=False, keep_going=True):
31 self.pool = ThreadPool(num_workers=num_workers)
32 self.current_package = self.get_current_package()
33 self.current_project = {'name': None, 'path': None, 'version': None}
34 self.packages = {}
35 self.counter = 0
36 self.semaphore = BoundedSemaphore(1)
37 self.local = local
38 self.ignore_cycles = ignore_cycles
39 self.output = output
40 self.error = error
41 self.silent = silent
42 self.perf = perf
43 self.keep_going = keep_going
44 if self.perf is not False:
45 f = open (self.perf, 'w+')
46 f.close()
47 if output is not None:
48 if not os.path.exists (output):
49 print "path",output,"no exits"
50 sys.exit(-1)
51 if not os.path.isdir(output):
52 print "path",output,"no a valid directory"
53 sys.exit(-1)
54
55 self.get_current_project()
56 self.instanciate_packages (file)
57 if self.local: self.get_local_graph()
58 self.check_cycles()
59
60# status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
61# lignes = string.split(output, '\n')
62# i = 1
63# for package in lignes:
64# if package!='' and package[0] != '#':
65# print i , package
66# i =i +1
67# if not self.packages.has_key(package):
68# print package
69# print len(self.packages)
70# sys.exit(-1)
71
72 def get_current_project(self):
73 cmd = 'cmt show projects | grep current'
74 status, output = getstatusoutput (cmd)
75# status, output = commands.getstatusoutput (cmd)
76 if status != 0:
77 print output
78 sys.exit(-1)
79 lines = string.split(output, '\n')
80 for line in lines:
81 if line!='' and line [0] != '#':
82 item = string.split (line, ' ')
83 self.current_project ['name'] = item[0]
84 self.current_project ['version'] = item[1]
85 self.current_project ['path'] = item[3][:-1]
86 version = self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
87 if self.current_project ['version'] == version:
88 self.current_project ['path'] = os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
89 return
90 #print self.current_project
91
92 def get_counter(self):
93 self.semaphore.acquire ()
94 self.counter = self.counter + 1
95 value = self.counter
96 self.semaphore.release()
97 return value
98
99 def check_cycles (self):
100 cmd = 'cmt -private show cycles'
101 cycle_found = False
102 status, output = getstatusoutput (cmd)
103# status, output = commands.getstatusoutput (cmd)
104 if status != 0:
105 print output
106 sys.exit(-1)
107 lines = string.split(output, '\n')
108 cycles = list ()
109 for line in lines:
110 if line!='' and line [0] != '#':
111 cycles.append (string.split(line))
112 cercles =list()
113 for cycle in cycles:
114 cycleInProject = True
115 for package in cycle:
116 if not self.packages.has_key(package):
117 cycleInProject = False
118 if cycleInProject:
119 cercles.append(cycle)
120 if len(cercles):
121 if not self.ignore_cycles:
122 print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
123 for cycle in cercles:
124 loop = ""
125 for package in cycle:
126 loop = loop + package + ' -> '
127 print loop + '...'
128 sys.exit(-1)
129 else:
130 print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
131 for cycle in cercles:
132 loop = ""
133 for package in cycle:
134 loop = loop + package + ' -> '
135 if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
136 print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
137 self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
138# sys.exit(-1)
139
140 def format_uses (self, content):
141 # format variables
142 lignes = string.split(content, '\n')
143 lines = list()
144 for ligne in lignes:
145 if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
146 lines.append(ligne)
147 lines.reverse()
148 return lines
149
150 def format_paths (self, content):
151 # format variables
152 lignes = string.split(content, '\n')
153 lines = list()
154 for ligne in lignes:
155 if ligne[:4] == "use ":
156 lines.append(ligne)
157 return lines
158
159 def get_paths (self, content):
160 lines = self.format_paths(content)
161 for line in lines:
162 result = string.split (line[4:len(line)], ' ')
163 if self.packages.has_key(result[0]):
164 if len(result)==4:
165 name, version, offset, path = string.split (line[4:len(line)], " ")
166 #print name, version, offset, path
167 #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
168 if path == '(no_auto_imports)':
169 path = offset
170 offset = ''
171 if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
172 full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
173 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
174 full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
175 else:
176 print '# error path not found for', name
177 sys.exit(-1)
178 elif len(result)==5:
179 name, version, offset, path, importation = string.split (line[4:len(line)], " ")
180 if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
181 full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
182 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
183 full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
184 else:
185 print '# error path not found for', name
186 sys.exit(-1)
187 elif len(result)==3:
188 name, version, path = string.split (line[4:len(line)], " ")
189 if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
190 full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
191 elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):
192 full_path = path[1:-1] + '/' +name + + '/cmt'
193 else:
194 print '# error path not found for', name
195 sys.exit(-1)
196 else:
197 print "error:",line
198 print str(result)
199 sys.exit(-1)
200 self.packages[result[0]]['path'] = os.path.normpath(full_path)
201 commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
202 if os.path.normpath(commonprefix) == self.current_project ['path']:
203 #print result[0], ' belong to project', self.current_project ['name']
204 self.packages[result[0]]['current_project'] = True
205
206 def get_uses(self, content):
207 # initiates variables
208 lignes = self.format_uses(content)
209 if not len(lignes): return
210 self.packages [self.current_package] = {'version': '*', 'client': list(),
211 'uses': list(), 'status': 'waiting',
212 'current_project': True, 'path': os.getcwd()}
213 previous_client = self.current_package
214 previous_level = 0
215 level_stack = [{'name':previous_client,'level':previous_level},]
216 ligne = lignes.pop()
217 while len(lignes)!=0:
218 current_level = string.find(ligne, 'use')
219 while current_level > previous_level:
220 name = string.split (ligne)[2]
221 version = string.split (ligne)[3]
222 if not self.packages.has_key (name):
223 self.packages [name] = {'version': version, 'uses': list(),
224 'client': list(), 'status': 'waiting',
225 'current_project': False, 'path': None}
226 if name not in self.packages[previous_client]['uses']:# and name != previous_client:
227 self.packages[previous_client]['uses'].append (name)
228 level_stack.append({'name':previous_client,'level':previous_level})
229 previous_client = name
230 previous_level = current_level
231 if len(lignes):
232 ligne = lignes.pop()
233 #print ligne
234 current_level = string.find(ligne, 'use')
235
236 #self.packages [previous_client]['status'] ='queued'
237 # restore the level
238 if len(lignes):
239 if len(level_stack):
240 item = level_stack.pop()
241 while item['level'] >= current_level and len(level_stack):
242 item = level_stack.pop()
243 previous_client = item['name']
244 previous_level = item['level']
245 #print previous_client, '-->',string.split (ligne)[2]
246
247 def instanciate_packages(self, file=None):
248 # We create the schedule of the work units
249 print '# First, we initialize the DAG by parsing "cmt show uses"'
250 if file is None:
251 cmd = 'cmt show uses'
252 else:
253 cmd = 'cat ' + file
254 status, output = getstatusoutput (cmd)
255# status, output = commands.getstatusoutput (cmd)
256 if status != 0:
257 print output
258 sys.exit(-1)
259 self.get_uses(output)
260 self.get_paths(output)
261 #self.check_execution (package=self.current_package)
262 #self.simulate_execution()
263
264 def get_local_graph(self):
265 To_remove = list()
266 for key in self.packages:
267 if self.packages[key]['current_project']== False:
268 for selected in self.packages:
269 if key in self.packages[selected]['uses']:
270 self.packages[selected]['uses'].remove(key)
271 To_remove.append (key)
272 for item in To_remove:
273 del self.packages[item]
274
275 def simulate_execution(self):
276 ok = True
277 indice = 1
278 while ok:
279 runnable = list()
280 for key in self.packages:
281 if self.packages[key]['status']!='done':
282 if len(self.packages[key]['uses']) == 0:
283 runnable.append(key)
284 if len(runnable):
285 print '\n#--------------------------------------------------------------'
286 print "# Execute parallel actions within packages " + str(runnable)
287 for selected in runnable:
288 print '#--------------------------------------------------------------'
289 print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
290 print '#--------------------------------------------------------------'
291 self.packages[selected]['status']='done'
292 indice = indice + 1
293 for key in self.packages:
294 if selected in self.packages[key]['uses']:
295 self.packages[key]['uses'].remove(selected)
296 #print 'remove', selected, 'from',key
297 if len(runnable)==0:
298 ok = False
299
300 def check_execution(self, package, path=list(), cycles=list()):
301 #print package,'-->',self.packages[package]['uses']
302 #print path
303 if package in path:
304 if path[path.index(package):] not in cycles:
305 print 'Cycles:',path[path.index(package):], package
306 cycles = cycles + path[path.index(package):]
307 sys.exit(-1)
308 path.append(package)
309 for item in self.packages[package]['uses']:
310 self.check_execution(package=item, path=path, cycles=cycles)
311 path.pop()
312
313 def get_current_package(self):
314 cmd = 'cmt show macro package'
315 status, output = getstatusoutput (cmd)
316# status, output = commands.getstatusoutput (cmd)
317 if status != 0:
318 print output
319 sys.exit(-1)
320 lines = string.split(output, '\n')
321 for line in lines:
322 if line [0] != '#':
323 start = string.find(line,"'")
324 end = string.find(line[start+1:len(line)],"'")
325 return line [start+1:start+end+1]
326
327 def get_work_area_path (self, name):
328 return self.packages [name]['path']
329
330 def get_package_path (self, name):
331 #return os.getcwd ()
332 cmd = 'cmt -use='+name+' run pwd'
333 status, output = getstatusoutput (cmd)
334# status, output = commands.getstatusoutput (cmd)
335 if status != 0:
336 print output
337 sys.exit(-1)
338 lines = string.split(output, '\n')
339 for line in lines:
340 if line [0] != '#' and line[:5] != "#CMT>":
341 print line
342 return line
343
344 def print_dependencies(self):
345 print '# ------------------------'
346 print '# package --> dependencies'
347 print '# ------------------------'
348 for key in self.packages.keys():
349 print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']
350
351 def print_status(self, status):
352 print '# ------------------------'
353 print '# package --> dependencies'
354 print '# ------------------------'
355 i = 1
356 for key in self.packages.keys():
357 if self.packages[key] ['status'] == status:
358 print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']
359 i = i + 1
360
361 def is_work_unit_waiting (self, name):
362 return self.packages[name] ['status'] == 'waiting'
363
364 def set_work_unit_status (self, name, status):
365 self.packages[name] ['status'] = status
366
367 def get_dependencies (self, name):
368 return self.packages[name] ['uses']
369
370 def get_next_work_units (self):
371 result = list ()
372 for key in self.packages.keys():
373 if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
374 result.append(key)
375 return result
376
377 def is_work_units (self):
378 result = list ()
379 for key in self.packages.keys():
380 if self.is_work_unit_waiting(key) :
381 return True
382 return False
383
384 def suppress_work_unit (self, name):
385 #print '# remove', name, 'from schedule'
386 for key in self.packages.keys():
387 if name in self.packages[key]['uses']:
388 self.packages[key]['uses'].remove(name)
389
390 def add_work_unit (self, name, cmd):
391 if self.is_work_unit_waiting (name):
392 # we create requests
393 arg = {'cmd': cmd , 'package':name}
394 req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception)
395 # then we put the work request in the queue...
396 self.set_work_unit_status (name, 'queued')
397 self.pool.putRequest(req)
398 #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
399
400 def execute (self, command):
401 #self.print_dependencies ()
402 packages = self.get_next_work_units()
403 if len(packages) !=0:
404 print '\n#--------------------------------------------------------------'
405 print '# Execute parallel actions within packages', packages
406 for package in packages:
407 self.add_work_unit (package, command)
408
409 def execute_all(self,command):
410 #self.print_dependencies ()
411 self.execute (command)
412 self.wait()
413 #self.print_dependencies ()
414 #self.print_status (status='waiting')
415 #while self.is_work_units():
416 #self.wait()
417
418 def wait (self):
419 self.pool.wait()
420
421 # this will be called each time a result is available
422 def result_callback(self, request, result):
423 #print "**Result: %s from request #%s" % (str(result), request.requestID)
424 #print "# Result: %s from request #%s" % (result['package'], request.requestID)
425 #if result['package'] == 'CodeCheck':
426 # sys.exit(-1)
427 self.execute (result['cmd'])
428
429 # the work the threads will have to do
430 def do_execute(self, arg):
431 path = self.get_work_area_path (arg['package'])
432 if path == None or not os.path.exists(path):
433 raise RuntimeError('Path to package '+ arg['package'] +' not found')
434 self.set_work_unit_status (arg['package'], 'running')
435 #cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
436 #os.chdir(path)
437 #arg['cmd'] = "cd "+ path +";"+ arg['cmd']
438 header = '#--------------------------------------------------------------\n'
439 header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path + '\n'
440 header = header + '#--------------------------------------------------------------\n'
441 print header
442 project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
443 log_name = string.replace(path, project_path, '')
444 log_name = string.replace(log_name, '/cmt', '')
445 log_name = string.replace(log_name, '/', '_')
446 log_name = log_name+'.loglog'
447 arg['log'] = log_name
448 cmd = "cd "+ path +";"+ arg['cmd']
449 #status, output= commands.getstatusoutput(cmd)
450 # init output file
451 if self.output is not None:
452 f = open (self.output+'/'+ log_name, 'w+')
453 f.write (header)
454 f.close()
455 if self.error is not None:
456 f = open (self.error+'/error'+log_name, 'w+')
457 f.close()
458 self.packages[arg['package']] ['startTime'] = time.time ()
459 status, output, error, pythonError = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600)
460 if not self.keep_going and status > 0:
461 sys.exit(status)
462
463 self.packages[arg['package']] ['endTime'] = time.time ()
464 if self.perf:
465 self.semaphore.acquire ()
466 f = open (self.perf, 'a')
467 f.write (arg['package']+" "+str(self.packages[arg['package']] ['startTime'])+" "+str(self.packages[arg['package']] ['endTime'] )+'\n')
468 f.close()
469 self.semaphore.release()
470 self.suppress_work_unit (arg['package'])
471 self.set_work_unit_status (arg['package'], 'done')
472 # status, output= commands.getstatusoutput(cmd)
473 #if status != 0:
474 # raise RuntimeError(output)
475 return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
476
477 def redirectOutput(self, index, buffer, arg):
478 """Filter function to redirect the std output and error of the job
479 executable for real-time debugging
480 """
481 if self.output is not None:
482 if index==0:
483 f = open (self.output+'/'+arg['log'], 'a')
484 f.write (buffer+'\n')
485 f.close()
486 elif index==1:
487 if self.error is not None:
488 f = open (self.error+'/error'+arg['log'], 'a')
489 else:
490 f = open (self.output+'/'+arg['log'], 'a')
491 f.write (buffer+'\n')
492 f.close()
493 if not self.silent:
494 print buffer
495
496 # this will be called when an exception occurs within a thread
497 def handle_exception(self, request, exc_info):
498 #traceback.print_stack()
499 print '#--------------------------------------------------------------'
500 #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
501 if exc_info[0]== exceptions.SystemExit:
502 print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1])
503 print '#--------------------------------------------------------------'
504 sys.exit(exc_info[1])
505 print "# Exception occured: %s" %(exc_info[1])
506 print exc_info
507 print '#--------------------------------------------------------------'
508 #sys.exit(-1)
509
510
511 def generate_make (self, file, command):
512 makefile = open (file, 'w+')
513 makefile.write ('MAKE=make\n')
514 #MFLAGS= -j10
515 self.counter = len(self.packages)
516 self.recursive_make (self.current_package, command, makefile, len(self.packages))
517 makefile.close ()
518
519 def recursive_make (self, package, command, makefile, indice,actions=list()):
520 lines = self.generate_action_make (package, command, indice)
521 makefile.write (lines)
522 #print lines
523 for pkg in self.packages[package] ['uses']:
524 if pkg not in actions:
525 actions.append(pkg)
526 indice = indice - 1
527 self.counter = self.counter - 1
528 self.recursive_make(pkg, command,makefile, indice, actions)
529
530 def generate_action_make (self, package, command, indice):
531 lines = package + ' :: '
532 # add dependencies
533 for pkg in self.packages[package] ['uses']:
534 lines = lines + ' ' + pkg
535
536 # add the action itself
537 newcommand = string.replace (command, '<package>', package)
538 if command =='':
539 newcommand='$(MAKE)'
540 lines = lines + '\n'
541 lines = lines + '\t@echo "#--------------------------------------------------------------"\n'
542 lines = lines + '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
543 lines = lines + '\t@echo "#--------------------------------------------------------------"\n'
544 lines = lines + 'ifdef LOCATION\n'
545 lines = lines + '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'
546 lines = lines + '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
547 lines = lines + '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
548 lines = lines + '\t+@cd ' + self.packages[package]['path']
549 lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
550 lines = lines + 'else\n'
551 lines = lines + '\t+@cd ' + self.packages[package]['path']
552 lines = lines + ' && ' + newcommand + '\n'
553 lines = lines + 'endif\n\n'
554 return lines
555
556#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.