source: tbroadcast/v2.0.4/python/tbroadcast.py@ 483

Last change on this file since 483 was 393, checked in by garonne, 19 years ago

See C.L 1

  • Property svn:executable set to *
File size: 25.2 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17import exceptions
18from threading import BoundedSemaphore
19
20from threadpool import WorkRequest
21from threadpool import ThreadPool
22from threadpool import NoResultsPending
23from threadpool import NoWorkersAvailable
24from threadpool import makeRequests
25from executer import exeCommand
26
class Scheduler:
    """Run a command in every CMT package, in dependency order, with threads.

    The dependency graph lives in ``self.packages``: a dictionary mapping a
    package name to a record with the keys ``version``, ``uses`` (names of the
    packages it is still waiting for), ``client`` (created empty and never
    filled by this class), ``status`` (``'waiting'``, ``'queued'``,
    ``'running'`` or ``'done'``), ``current_project`` and ``path`` (the
    package ``cmt`` directory).

    The code is restricted to constructs that exist in the Python 2 releases
    this module was written for (no ``with`` statement, no ``str.format``).
    """

    # Separator line printed around every banner.
    _RULE = '#--------------------------------------------------------------'

    def __init__(self, num_workers=20, file=None, ignore_cycles=False,
                 local=False, output=None, error=None, silent=False,
                 perf=False, keep_going=True):
        """Build the dependency graph and prepare the thread pool.

        num_workers   -- number of threads in the pool.
        file          -- if given, file holding a saved ``cmt show uses``
                         output to parse instead of running the command.
        ignore_cycles -- drop one dependency per cycle instead of aborting.
        local         -- keep only the packages of the current project.
        output        -- existing directory receiving one log per package.
        error         -- directory receiving the ``error<log>`` files.
        silent        -- do not echo the output of the commands.
        perf          -- name of a file receiving start/end times per
                         package; any false value disables it.
        keep_going    -- when False a failing command stops the execution.

        Exits with status -1 when ``output`` is not an existing directory or
        when one of the ``cmt`` commands fails.
        """
        self.pool = ThreadPool(num_workers=num_workers)
        self.current_package = self.get_current_package()
        self.current_project = {'name': None, 'path': None, 'version': None}
        self.packages = {}
        self.counter = 0
        self.semaphore = BoundedSemaphore(1)
        self.local = local
        self.ignore_cycles = ignore_cycles
        self.output = output
        self.error = error
        self.silent = silent
        self.perf = perf
        self.keep_going = keep_going
        # Same test as in do_execute, so that perf=None (or '') simply
        # disables the feature instead of crashing in open().
        if self.perf:
            self._write_text(self.perf, 'w+', '')
        if output is not None:
            if not os.path.exists(output):
                print("path %s no exits" % output)
                sys.exit(-1)
            if not os.path.isdir(output):
                print("path %s no a valid directory" % output)
                sys.exit(-1)

        self.get_current_project()
        self.instanciate_packages(file)
        if self.local:
            self.get_local_graph()
        self.check_cycles()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _run(self, cmd):
        """Run a shell command and return its output; exit(-1) if it fails."""
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        return output

    def _write_text(self, filename, mode, text):
        """Open filename with mode, write text and always close the file."""
        handle = open(filename, mode)
        try:
            handle.write(text)
        finally:
            handle.close()

    def _read_file(self, filename):
        """Return the content of filename without its final newline.

        One trailing newline is removed because the command based path
        (commands.getstatusoutput) does the same.  Exits with status -1 when
        the file cannot be opened.
        """
        try:
            handle = open(filename)
        except IOError:
            print(str(sys.exc_info()[1]))
            sys.exit(-1)
        try:
            content = handle.read()
        finally:
            handle.close()
        if content[-1:] == '\n':
            content = content[:-1]
        return content

    def _format_cycle(self, cycle):
        """Return 'a -> b -> ...' for the list of package names cycle."""
        return ''.join([package + ' -> ' for package in cycle]) + '...'

    def _find_cmt_dir(self, root, offset, name, version):
        """Return the existing cmt directory of a package, or None.

        The versioned layout (<name>/<version>/cmt) is tried first, then the
        unversioned one (<name>/cmt).
        """
        parts = [root]
        if offset:
            parts.append(offset)
        parts.append(name)
        base = '/'.join(parts)
        for candidate in (base + '/' + version + '/cmt', base + '/cmt'):
            if os.path.exists(candidate):
                return candidate
        return None

    def _log_name(self, path):
        """Return the log file name derived from the cmt directory path."""
        project_path = (self.current_project['path'] + '/'
                        + self.current_project['version'] + '/')
        log_name = path.replace(project_path, '')
        log_name = log_name.replace('/cmt', '')
        log_name = log_name.replace('/', '_')
        return log_name + '.loglog'

    # ------------------------------------------------------------------
    # Construction of the graph
    # ------------------------------------------------------------------
    def get_current_project(self):
        """Fill self.current_project from 'cmt show projects | grep current'.

        The first line that is neither empty nor a comment is used: field 0
        is the name, field 1 the version and field 3 the path followed by one
        closing character that is dropped.  When the path ends with the
        version, the version is removed from it.
        """
        output = self._run('cmt show projects | grep current')
        for line in output.split('\n'):
            if line == '' or line[0] == '#':
                continue
            item = line.split(' ')
            if len(item) < 4:
                # Malformed line: look at the next one instead of crashing.
                continue
            version = item[1]
            path = item[3][:-1]
            # An empty version would otherwise strip the whole path.
            if version and path.endswith(version):
                path = os.path.normpath(path[:-len(version)])
            self.current_project['name'] = item[0]
            self.current_project['version'] = version
            self.current_project['path'] = path
            return

    def get_counter(self):
        """Increment the shared counter under the lock; return the new value."""
        self.semaphore.acquire()
        try:
            self.counter = self.counter + 1
            return self.counter
        finally:
            self.semaphore.release()

    def check_cycles(self):
        """Detect dependency cycles made only of known packages.

        Cycles are read from 'cmt -private show cycles'.  Without
        ignore_cycles they are printed and the process exits with status -1;
        with it, the dependency of the last package of each cycle on the
        first one is removed from the graph.
        """
        output = self._run('cmt -private show cycles')
        cycles = []
        for line in output.split('\n'):
            if line != '' and line[0] != '#':
                names = line.split()
                if names:
                    # A blank (whitespace only) line is not a cycle.
                    cycles.append(names)

        local_cycles = []
        for cycle in cycles:
            in_project = True
            for package in cycle:
                if package not in self.packages:
                    in_project = False
                    break
            if in_project:
                local_cycles.append(cycle)
        if not local_cycles:
            return

        if not self.ignore_cycles:
            print("# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:")
            for cycle in local_cycles:
                print(self._format_cycle(cycle))
            sys.exit(-1)

        print("# Warning: There are cycles and you have selected the automatic suppress cycles mode")
        for cycle in local_cycles:
            first = cycle[0]
            last = cycle[-1]
            if first in self.packages[last]['uses']:
                print('## In cycle: ' + self._format_cycle(cycle)
                      + ', we suppress the dependency ' + last + '->' + first)
                self.packages[last]['uses'].remove(first)

    def format_uses(self, content):
        """Return the '# use ...' tree lines of content, in reverse order.

        Lines kept start with '#' but are not '#CMT>' messages, '# Required'
        lines, '# Selection :' or a lone '#'.  Empty lines are skipped.
        """
        ignored = ['# Selection :', '#']
        lines = []
        for ligne in content.split('\n'):
            # ligne[:1] instead of ligne[0]: an empty line must not crash.
            if (ligne[:1] == '#' and ligne[:5] != "#CMT>"
                    and ligne[:10] != "# Required" and ligne not in ignored):
                lines.append(ligne)
        lines.reverse()
        return lines

    def format_paths(self, content):
        """Return the lines of content that start with 'use '."""
        return [ligne for ligne in content.split('\n') if ligne[:4] == "use "]

    def get_paths(self, content):
        """Set 'path' and 'current_project' of the known packages.

        Each 'use ' line is 'use <name> <version> [<offset>] <path>
        [<import>]' where <path> is wrapped in one character on each side.
        In the 4 field form the last field may be '(no_auto_imports)', in
        which case the third field is the path and there is no offset.
        Exits with status -1 when a line cannot be parsed or when no cmt
        directory exists for a package.
        """
        for line in self.format_paths(content):
            fields = line[4:].split(' ')
            if fields[0] not in self.packages:
                continue
            if len(fields) == 3:
                name, version, path = fields
                offset = ''
            elif len(fields) == 4:
                name, version, offset, path = fields
                if path == '(no_auto_imports)':
                    path = offset
                    offset = ''
            elif len(fields) == 5:
                name, version, offset, path = fields[:4]
            else:
                print("error: %s" % line)
                print(str(fields))
                sys.exit(-1)

            full_path = self._find_cmt_dir(path[1:-1], offset, name, version)
            if full_path is None:
                print('# error path not found for %s' % name)
                sys.exit(-1)

            record = self.packages[name]
            record['path'] = os.path.normpath(full_path)
            project_path = self.current_project['path']
            if project_path is not None:
                # The character-wise prefix is kept on purpose: the project
                # path may end in the middle of a directory name once the
                # version has been stripped from it.
                prefix = os.path.commonprefix([record['path'], project_path])
                if os.path.normpath(prefix) == project_path:
                    record['current_project'] = True

    def get_uses(self, content):
        """Build self.packages from the indented '# use' tree of content.

        The column of the word 'use' gives the depth of a line; a package is
        recorded as a dependency of the closest previous line that is less
        deep, or of the current package for the top level lines.  Nothing is
        done when content holds no tree line.
        """
        lines = self.format_uses(content)
        if not lines:
            return
        lines.reverse()     # format_uses returns them last line first
        root = self.current_package
        self.packages[root] = {'version': '*', 'client': list(),
                               'uses': list(), 'status': 'waiting',
                               'current_project': True, 'path': os.getcwd()}
        # Chain of (name, level) from the root to the last package seen.
        stack = [(root, 0)]
        for line in lines:
            level = line.find('use')
            fields = line.split()
            if level <= 0 or len(fields) < 4:
                # Not a '# use <name> <version>' line.
                continue
            name = fields[2]
            version = fields[3]
            # Go back up to the closest ancestor (the root is never removed).
            while len(stack) > 1 and stack[-1][1] >= level:
                stack.pop()
            parent = stack[-1][0]
            if name not in self.packages:
                self.packages[name] = {'version': version, 'uses': list(),
                                       'client': list(), 'status': 'waiting',
                                       'current_project': False, 'path': None}
            if name not in self.packages[parent]['uses']:
                self.packages[parent]['uses'].append(name)
            stack.append((name, level))

    def instanciate_packages(self, file=None):
        """Create the graph from 'cmt show uses' or from a saved output.

        file -- name of a file holding the output to parse; when None the
                command is run.  The file is read directly, so its name is
                never interpreted by a shell.
        """
        print('# First, we initialize the DAG by parsing "cmt show uses"')
        if file is None:
            output = self._run('cmt show uses')
        else:
            output = self._read_file(file)
        self.get_uses(output)
        self.get_paths(output)

    def get_local_graph(self):
        """Remove from the graph the packages outside the current project."""
        external = []
        for key in self.packages:
            if self.packages[key]['current_project'] == False:
                external.append(key)
        for key in external:
            for selected in self.packages:
                if key in self.packages[selected]['uses']:
                    self.packages[selected]['uses'].remove(key)
        for key in external:
            del self.packages[key]

    def simulate_execution(self):
        """Print the order in which packages would be processed.

        The graph is consumed: every package ends with the status 'done' and
        the 'uses' lists are emptied as the packages complete.
        """
        indice = 1
        while True:
            runnable = []
            for key in self.packages:
                record = self.packages[key]
                if record['status'] != 'done' and len(record['uses']) == 0:
                    runnable.append(key)
            if not runnable:
                break
            print('\n' + self._RULE)
            print("# Execute parallel actions within packages " + str(runnable))
            for selected in runnable:
                print(self._RULE)
                print('# (' + str(indice) + '/' + str(len(self.packages))
                      + ') Now trying [] in ' + self.packages[selected]['path'])
                print(self._RULE)
                self.packages[selected]['status'] = 'done'
                indice = indice + 1
                for key in self.packages:
                    if selected in self.packages[key]['uses']:
                        self.packages[key]['uses'].remove(selected)

    def check_execution(self, package, path=None, cycles=None):
        """Walk the graph from package; print the first cycle and exit(-1).

        path   -- packages on the way from the starting point; a new list is
                  created for every top level call.
        cycles -- accepted for compatibility with existing callers; it is
                  only passed along, never read.
        """
        if path is None:
            path = []
        if cycles is None:
            cycles = []
        if package in path:
            print('Cycles: %s %s' % (path[path.index(package):], package))
            sys.exit(-1)
        path.append(package)
        for item in self.packages[package]['uses']:
            self.check_execution(package=item, path=path, cycles=cycles)
        path.pop()

    def get_current_package(self):
        """Return the quoted value printed by 'cmt show macro package'.

        The first line that is neither empty nor a comment is used; None is
        returned when there is no such line.
        """
        output = self._run('cmt show macro package')
        for line in output.split('\n'):
            if line != '' and line[0] != '#':
                start = line.find("'")
                end = line[start + 1:].find("'")
                return line[start + 1:start + end + 1]

    def get_work_area_path(self, name):
        """Return the recorded path of the package name."""
        return self.packages[name]['path']

    def get_package_path(self, name):
        """Print and return the first line printed by 'cmt -use=<name> run pwd'.

        Empty lines and lines starting with '#' are skipped; None is returned
        when nothing else is printed.
        """
        output = self._run('cmt -use=' + name + ' run pwd')
        for line in output.split('\n'):
            if line != '' and line[0] != '#':
                print(line)
                return line

    # ------------------------------------------------------------------
    # Inspection of the graph
    # ------------------------------------------------------------------
    def print_dependencies(self):
        """Print every package with its pending dependencies and status."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        for key in self.packages.keys():
            record = self.packages[key]
            print('%s --> %s , %s' % (key, record['uses'], record['status']))

    def print_status(self, status):
        """Print, numbered from 1, the packages whose status is status."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        i = 1
        for key in self.packages.keys():
            record = self.packages[key]
            if record['status'] == status:
                print('%s %s --> %s , %s'
                      % (i, key, record['uses'], record['status']))
                i = i + 1

    def is_work_unit_waiting(self, name):
        """Tell whether the package name still has the status 'waiting'."""
        return self.packages[name]['status'] == 'waiting'

    def set_work_unit_status(self, name, status):
        """Set the status of the package name."""
        self.packages[name]['status'] = status

    def get_dependencies(self, name):
        """Return the list of packages that name is still waiting for."""
        return self.packages[name]['uses']

    def get_next_work_units(self):
        """Return the waiting packages that have no pending dependency."""
        return [key for key in self.packages.keys()
                if len(self.get_dependencies(key)) == 0
                and self.is_work_unit_waiting(key)]

    def is_work_units(self):
        """Tell whether at least one package is still waiting."""
        for key in self.packages.keys():
            if self.is_work_unit_waiting(key):
                return True
        return False

    def suppress_work_unit(self, name):
        """Remove name from the pending dependencies of every package."""
        for key in self.packages.keys():
            if name in self.packages[key]['uses']:
                self.packages[key]['uses'].remove(name)

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------
    def add_work_unit(self, name, cmd):
        """Queue cmd for the package name if it is still waiting."""
        if self.is_work_unit_waiting(name):
            arg = {'cmd': cmd, 'package': name}
            req = WorkRequest(self.do_execute, [arg], None,
                              callback=self.result_callback,
                              exc_callback=self.handle_exception)
            # The status is changed before queuing so that the package
            # cannot be selected a second time.
            self.set_work_unit_status(name, 'queued')
            self.pool.putRequest(req)

    def execute(self, command):
        """Queue command for every package that is ready to run."""
        packages = self.get_next_work_units()
        if len(packages) != 0:
            print('\n' + self._RULE)
            print('# Execute parallel actions within packages %s' % (packages,))
            for package in packages:
                self.add_work_unit(package, command)

    def execute_all(self, command):
        """Queue the ready packages and wait on the pool."""
        self.execute(command)
        self.wait()

    def wait(self):
        """Block until the thread pool has no pending work."""
        self.pool.wait()

    # this will be called each time a result is available
    def result_callback(self, request, result):
        """Queue the packages released by the work unit that just ended."""
        self.execute(result['cmd'])

    # the work the threads will have to do
    def do_execute(self, arg):
        """Run arg['cmd'] in the directory of the package arg['package'].

        arg receives a new key 'log' holding the log file name.  Returns a
        dictionary with the keys 'output', 'cmd' and 'package'.

        Raises RuntimeError when the package path is unknown or missing, and
        SystemExit with the command status when the command fails while
        keep_going is False.
        """
        package = arg['package']
        path = self.get_work_area_path(package)
        if path is None or not os.path.exists(path):
            raise RuntimeError('Path to package ' + package + ' not found')
        self.set_work_unit_status(package, 'running')
        header = self._RULE + '\n'
        header = (header + '# (' + str(self.get_counter()) + '/'
                  + str(len(self.packages)) + ') Now trying [' + arg['cmd']
                  + '] in ' + path + '\n')
        header = header + self._RULE + '\n'
        print(header)
        log_name = self._log_name(path)
        arg['log'] = log_name
        cmd = "cd " + path + ";" + arg['cmd']
        # (re)create the log files of this package
        if self.output is not None:
            self._write_text(self.output + '/' + log_name, 'w+', header)
        if self.error is not None:
            self._write_text(self.error + '/error' + log_name, 'w+', '')
        record = self.packages[package]
        record['startTime'] = time.time()
        status, output, error, pythonError = exeCommand(
            sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)
        if not self.keep_going and status > 0:
            sys.exit(status)

        record['endTime'] = time.time()
        if self.perf:
            # The perf file is shared by all the threads.
            self.semaphore.acquire()
            try:
                self._write_text(self.perf, 'a',
                                 package + " " + str(record['startTime'])
                                 + " " + str(record['endTime']) + '\n')
            finally:
                self.semaphore.release()
        self.suppress_work_unit(package)
        self.set_work_unit_status(package, 'done')
        return {'output': output, 'cmd': arg['cmd'], 'package': package}

    def redirectOutput(self, index, buffer, arg):
        """Filter function to redirect the std output and error of the job
           executable for real-time debugging

        index 0 is appended to the package log; index 1 is appended to the
        error log when an error directory is set, else to the package log.
        Nothing is written when no output directory is set.  The text is also
        printed unless the silent mode is on.
        """
        if self.output is not None:
            target = None
            if index == 0:
                target = self.output + '/' + arg['log']
            elif index == 1:
                if self.error is not None:
                    target = self.error + '/error' + arg['log']
                else:
                    target = self.output + '/' + arg['log']
            if target is not None:
                self._write_text(target, 'a', buffer + '\n')
        if not self.silent:
            print(buffer)

    # this will be called when an exception occurs within a thread
    def handle_exception(self, request, exc_info):
        """Report an exception raised by a work unit.

        A SystemExit (raised by do_execute when keep_going is False) stops
        the process with the same exit code; any other exception is only
        printed.
        """
        print(self._RULE)
        if exc_info[0] == SystemExit:
            print("Stop execution (No_keep_going option enabled): exit code == %s " % (exc_info[1],))
            print(self._RULE)
            # Exit with the code carried by the exception: passing the
            # exception object itself would lose the status.
            sys.exit(getattr(exc_info[1], 'code', exc_info[1]))
        print("# Exception occured: %s" % (exc_info[1],))
        print(exc_info)
        print(self._RULE)

    # ------------------------------------------------------------------
    # Generation of a makefile
    # ------------------------------------------------------------------
    def generate_make(self, file, command):
        """Write in file a makefile running command in every package.

        self.counter is reset to the number of packages and decremented
        while the rules are written.
        """
        makefile = open(file, 'w+')
        try:
            makefile.write('MAKE=make\n')
            self.counter = len(self.packages)
            # A new list for each makefile: nothing is remembered from a
            # previous generation.
            self.recursive_make(self.current_package, command, makefile,
                                len(self.packages), [])
        finally:
            makefile.close()

    def recursive_make(self, package, command, makefile, indice, actions=None):
        """Write the rule of package, then the rules of its dependencies.

        actions -- names of the packages already visited; it is updated in
                   place and a new list is created when it is omitted.
        """
        if actions is None:
            actions = []
        lines = self.generate_action_make(package, command, indice)
        makefile.write(lines)
        for pkg in self.packages[package]['uses']:
            if pkg not in actions:
                actions.append(pkg)
                indice = indice - 1
                self.counter = self.counter - 1
                self.recursive_make(pkg, command, makefile, indice, actions)

    def generate_action_make(self, package, command, indice):
        """Return the makefile rule of package as a string.

        '<package>' is replaced by the package name in command; an empty
        command means '$(MAKE)'.  The number shown in the banner is
        self.counter; indice is not used.
        """
        path = self.packages[package]['path']
        log = ' $(LOCATION)/' + package + '.loglog'
        if command == '':
            newcommand = '$(MAKE)'
        else:
            newcommand = command.replace('<package>', package)
        rule_echo = '\t@echo "' + self._RULE + '"'
        title_echo = ('\t@echo "# (' + str(self.counter) + '/'
                      + str(len(self.packages)) + ') Now trying ['
                      + newcommand + '] in ' + path + '"')
        action = '\t+@cd ' + path + ' && ' + newcommand

        # target and dependencies
        parts = [package + ' :: ']
        for pkg in self.packages[package]['uses']:
            parts.append(' ' + pkg)
        parts.append('\n')
        # the action itself
        parts.append(rule_echo + '\n')
        parts.append(title_echo + '\n')
        parts.append(rule_echo + '\n')
        parts.append('ifdef LOCATION\n')
        parts.append(rule_echo + '>' + log + '\n')
        parts.append(title_echo + '>>' + log + '\n')
        parts.append(rule_echo + '>>' + log + '\n')
        parts.append(action + ' >>' + log + ' 2>&1\n')
        parts.append('else\n')
        parts.append(action + '\n')
        parts.append('endif\n\n')
        return ''.join(parts)
549
550#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.