source: tbroadcast/HEAD/python/tbroadcast.py@ 664

Last change on this file since 664 was 573, checked in by rybkin, 14 years ago

See C.L. 6

  • Property svn:executable set to *
File size: 23.7 KB
RevLine 
[232]1#----------------------------------#
2# -- Author: V.Garonne
[242]3# -- Mail: garonne@lal.in2p3.fr
[517]4# -- Date: 08/25/2006
[232]5# -- Name: tbroadcast
[242]6# -- Description: main class
[232]7#----------------------------------#
8
[517]9# 21-Jul-2009 compile most used packages first; protect critical sections
10
[232]11import os
12import sys
13import time
14import string
[243]15import os.path
[517]16# import commands
[239]17import traceback
[393]18import exceptions
[241]19from threading import BoundedSemaphore
[232]20
[508]21from threadpool import WorkRequest
22from threadpool import ThreadPool
[232]23
[508]24from subprocess import Popen
25
class Scheduler:
    """Run a shell command in every package of a CMT dependency graph.

    The graph is built by parsing the output of ``cmt show uses`` (or a file
    holding such output).  ``self.packages`` maps each package name to a
    dictionary with the keys ``version``, ``use_count``, ``uses``,
    ``status``, ``current_project`` and ``path``.  ``status`` goes through
    ``'waiting'`` -> ``'queued'`` -> ``'running'`` -> ``'done'``.  A package
    becomes runnable once its ``uses`` list is empty; finishing a package
    removes it from the ``uses`` list of every other package.
    """

    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, sort=False,
                 output=None, error=None, silent = False, perf=False, keep_going=True):
        """Create the thread pool and build the package graph.

        num_workers   -- number of threads of the pool
        file          -- file holding a saved ``cmt show uses`` output; when
                         None the command itself is run
        ignore_cycles -- drop one dependency per cycle instead of exiting
        local         -- keep only the packages of the current project
        sort          -- schedule the most used packages first
        output        -- existing directory receiving one log file per package
        error         -- directory receiving the stderr logs (only used when
                         ``output`` is set)
        silent        -- stored on the instance; not read in this class
        perf          -- file name receiving start/end times, or a false
                         value to disable timing
        keep_going    -- when false, a failing package is reported

        Exits the process with -1 when ``output`` is not an existing
        directory or when a CMT command fails.
        """
        self.pool = ThreadPool(num_workers=num_workers, poll_timeout=3)
        self.num_workers = num_workers
        self.current_project = {'name': None, 'path': None, 'version': None}
        self.packages = {}
        self.counter = 0
        self.semaphore = BoundedSemaphore(1)
        self.local = local
        self.sort = sort
        self.ignore_cycles = ignore_cycles
        self.output = output
        self.error = error
        self.silent = silent
        self.perf = perf
        self.keep_going = keep_going
        if self.sort:
            print("Compile packages sorted according to use count")
        # Same truth test as do_execute(): None or '' disable timing instead
        # of being handed to open().
        if self.perf:
            # Truncate (or create) the timing file; do_execute() appends to it.
            open(self.perf, 'w+').close()
        if output is not None:
            if not os.path.exists(output):
                print("path %s does not exists" % (output,))
                sys.exit(-1)
            if not os.path.isdir(output):
                print("path %s is not a valid directory" % (output,))
                sys.exit(-1)

        # init cmt stuff
        self.get_current_project()
        self.current_package = self.get_current_package()
        self.instanciate_packages(file)
        if self.local:
            self.get_local_graph()
        self.check_cycles()
        self.get_use_count()

    def get_current_project(self):
        """Fill ``self.current_project`` from ``cmt show projects``.

        Only the first non-comment line of the ``grep current`` output is
        used.  When the reported path ends with the version, that suffix is
        stripped and the remainder normalised.  Exits with -1 when the
        command fails.
        """
        status, output = getstatusoutput('cmt show projects | grep current')
        if status != 0:
            print("WARNING: CMT exited with non-zero status!")
            print(output)
            sys.exit(-1)
        for line in output.split('\n'):
            if line == '' or line[0] == '#':
                continue
            item = line.split(' ')
            version = item[1]
            path = item[3][:-1]  # drop the trailing character of the field
            self.current_project['name'] = item[0]
            self.current_project['version'] = version
            self.current_project['path'] = path
            # An empty version would make path[:-0] erase the whole path.
            if version and path.endswith(version):
                self.current_project['path'] = os.path.normpath(path[:-len(version)])
            return

    def get_counter(self):
        """Increment the shared progress counter and return its new value."""
        self.semaphore.acquire()
        try:
            self.counter = self.counter + 1
            return self.counter
        finally:
            self.semaphore.release()

    def check_cycles(self):
        """Detect dependency cycles among the known packages.

        Runs ``cmt -private show cycles``; only cycles whose packages are all
        in ``self.packages`` are considered.  Without ``ignore_cycles`` the
        cycles are printed and the process exits with -1.  With it, the
        dependency from the last package of each cycle to the first one is
        removed.
        """
        status, output = getstatusoutput('cmt -private show cycles')
        if status != 0:
            print("WARNING: CMT exited with non-zero status!")
            print(output)
            sys.exit(-1)

        cycles = []
        for line in output.split('\n'):
            if line != '' and line[0] != '#':
                members = line.split()
                # A whitespace-only line yields no package at all.
                if members:
                    cycles.append(members)

        cercles = []
        for cycle in cycles:
            if not [pkg for pkg in cycle if pkg not in self.packages]:
                cercles.append(cycle)
        if not cercles:
            return

        if not self.ignore_cycles:
            print("# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:")
            for cycle in cercles:
                print(''.join([pkg + ' -> ' for pkg in cycle]) + '...')
            sys.exit(-1)

        print("# Warning: There are cycles and you have selected the automatic suppress cycles mode")
        for cycle in cercles:
            loop = ''.join([pkg + ' -> ' for pkg in cycle])
            first, last = cycle[0], cycle[-1]
            if first in self.packages[last]['uses']:
                print('## In cycle: ' + loop + '..., we suppress the dependency ' + last + '->' + first)
                self.packages[last]['uses'].remove(first)

    def format_uses(self, content):
        """Return the dependency-tree comment lines of ``content``, reversed.

        Kept are the lines starting with ``#`` except ``#CMT>`` lines,
        ``# Required`` lines, ``# Selection :`` and a bare ``#``.  The list
        is reversed so that callers can ``pop()`` lines in original order.
        """
        lines = []
        for ligne in content.split('\n'):
            # startswith() is safe on empty lines, unlike ligne[0].
            if not ligne.startswith('#'):
                continue
            if ligne[:5] == "#CMT>" or ligne[:10] == "# Required":
                continue
            if ligne in ['# Selection :', '#']:
                continue
            lines.append(ligne)
        lines.reverse()
        return lines

    def format_paths(self, content):
        """Return the lines of ``content`` that start with ``use ``."""
        return [ligne for ligne in content.split('\n') if ligne[:4] == "use "]

    def _find_cmt_dir(self, components, name, version):
        """Return the existing ``cmt`` directory of a package.

        ``components`` are the path pieces preceding the package name.  The
        versioned layout ``.../name/version/cmt`` is tried first, then
        ``.../name/cmt``.  Exits with -1 when neither exists.
        """
        base = '/'.join(components + [name])
        for candidate in (base + '/' + version + '/cmt', base + '/cmt'):
            if os.path.exists(candidate):
                return candidate
        print('# error path not found for %s' % (name,))
        sys.exit(-1)

    def _in_current_project(self, package_path):
        """Tell whether the normalised ``package_path`` is under the project.

        The comparison is done on whole path components:
        os.path.commonprefix() works character by character and would accept
        ``/a/proj2`` as being inside ``/a/proj``.
        """
        project_path = self.current_project['path']
        if not project_path:
            return False
        root = os.path.normpath(project_path)
        if package_path == root:
            return True
        return package_path.startswith(root.rstrip('/') + '/')

    def get_paths(self, content):
        """Record the ``cmt`` directory of every known package.

        Each ``use`` line is ``use name version [offset] (path) [import]``.
        For packages present in ``self.packages`` the ``path`` entry is set
        and ``current_project`` is set to True when that path lies inside the
        current project.  Exits with -1 on an unparsable line or a missing
        directory.
        """
        for line in self.format_paths(content):
            result = line[4:].split(' ')
            if result[0] not in self.packages:
                continue
            if len(result) == 4:
                name, version, offset, path = result
                if path == '(no_auto_imports)':
                    # No offset was given: the third field is the path.
                    path = offset
                    offset = ''
                full_path = self._find_cmt_dir([path[1:-1], offset], name, version)
            elif len(result) == 5:
                name, version, offset, path, importation = result
                full_path = self._find_cmt_dir([path[1:-1], offset], name, version)
            elif len(result) == 3:
                name, version, path = result
                full_path = self._find_cmt_dir([path[1:-1]], name, version)
            else:
                print("error: %s" % (line,))
                print(str(result))
                sys.exit(-1)
            package = self.packages[result[0]]
            package['path'] = os.path.normpath(full_path)
            if self._in_current_project(package['path']):
                package['current_project'] = True

    def get_uses(self, content):
        """Build ``self.packages`` from the indented ``# use`` tree.

        The column of the word ``use`` gives the nesting level of a line; a
        deeper line is a dependency of the closest shallower line above it.
        The current package is the root of the tree (level 0).
        """
        pending = self.format_uses(content)
        if not len(pending):
            return
        self.packages[self.current_package] = {'version': '*', 'use_count': 0,
                                               'uses': list(), 'status': 'waiting',
                                               'current_project': True, 'path': os.getcwd()}
        client = self.current_package
        client_level = 0
        stack = [{'name': client, 'level': client_level}]
        entry = pending.pop()
        level = entry.find('use')
        while True:
            # Go down: each deeper line is used by the current client.
            while level > client_level:
                fields = entry.split()
                name = fields[2]
                version = fields[3]
                if name not in self.packages:
                    self.packages[name] = {'version': version, 'use_count': 0,
                                           'uses': list(), 'status': 'waiting',
                                           'current_project': False, 'path': None}
                if name not in self.packages[client]['uses']:
                    self.packages[client]['uses'].append(name)
                stack.append({'name': client, 'level': client_level})
                client = name
                client_level = level
                if not len(pending):
                    return
                entry = pending.pop()
                level = entry.find('use')

            # restore the level: climb back to the closest shallower client
            if len(stack):
                item = stack.pop()
                while item['level'] >= level and len(stack):
                    item = stack.pop()
                client = item['name']
                client_level = item['level']

            # No shallower client is left: stop instead of looping forever.
            if level <= client_level:
                return

    def instanciate_packages(self, file=None):
        """Parse ``cmt show uses`` (or the saved output in ``file``).

        A failing command only produces a warning; whatever output was
        obtained is still parsed.
        """
        # We create the schedule of the work units
        print('# First, we initialize the DAG by parsing "cmt show uses"')
        if file is None:
            cmd = 'cmt show uses'
        else:
            # NOTE(review): the file name is pasted unquoted into a shell
            # command; names with spaces or shell metacharacters will break.
            cmd = 'cat ' + file
        status, output = getstatusoutput(cmd)
        if status != 0:
            print("WARNING: CMT exited with non-zero status!")
        self.get_uses(output)
        self.get_paths(output)

    def get_use_count(self):
        """Store in each package the number of packages that use it."""
        for key in self.packages:
            users = [parent for parent in self.packages
                     if key in self.packages[parent]['uses']]
            self.packages[key]['use_count'] = len(users)

    def get_local_graph(self):
        """Remove the packages that do not belong to the current project."""
        foreign = []
        for key in self.packages:
            if self.packages[key]['current_project'] == False:
                for selected in self.packages:
                    if key in self.packages[selected]['uses']:
                        self.packages[selected]['uses'].remove(key)
                foreign.append(key)
        # Deleted afterwards: the dictionary must not change while iterated.
        for key in foreign:
            del self.packages[key]

    def simulate_execution(self):
        """Print the schedule, wave after wave, without running anything."""
        while self.simulate_requests() != 0:
            pass

    def simulate_requests(self):
        """Print and mark as done the runnable packages; return their count."""
        runnable = self.get_next_work_units()
        if len(runnable):
            print('\n#--------------------------------------------------------------')
            print("# Execute parallel actions within packages - total %s" % (len(runnable),))
            print('#--------------------------------------------------------------')
        for selected in runnable:
            use_count = self.packages[selected]['use_count']
            path = self.packages[selected]['path']
            print('#--------------------------------------------------------------')
            print('# (%d/%d %d) Now trying [] in %s' % (self.get_counter(), len(self.packages), use_count, path))
            print('#--------------------------------------------------------------')
            self.suppress_work_unit(selected)
        return len(runnable)

    def get_current_package(self):
        """Return the value of the CMT macro ``package``.

        The value is the text between the first two single quotes of the
        first non-comment line of ``cmt show macro package``.  Returns None
        when there is no such line; exits with -1 when the command fails.
        """
        status, output = getstatusoutput('cmt show macro package')
        if status != 0:
            print("WARNING: CMT exited with non-zero status!")
            print(output)
            sys.exit(-1)
        for line in output.split('\n'):
            # Empty lines (or an empty output) carry no macro value.
            if line == '' or line[0] == '#':
                continue
            start = line.find("'")
            end = line[start + 1:].find("'")
            return line[start + 1:start + end + 1]

    def print_dependencies(self):
        """Print every package with its dependencies, status and use count."""
        print('# -------------------------------------------')
        print('# package --> dependencies, status, use_count')
        print('# -------------------------------------------')
        for key in self.packages.keys():
            package = self.packages[key]
            print('%s --> %s , %s , %s' % (key, package['uses'], package['status'], package['use_count']))

    def print_status(self, status):
        """Print a numbered list of the packages being in ``status``."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        i = 1
        for key in self.packages.keys():
            package = self.packages[key]
            if package['status'] == status:
                print('%s %s --> %s , %s' % (i, key, package['uses'], package['status']))
                i = i + 1

    def is_work_unit_waiting(self, name):
        """Tell whether package ``name`` has not been queued yet."""
        return self.packages[name]['status'] == 'waiting'

    def set_work_unit_status(self, name, status):
        """Set the status of package ``name``."""
        self.packages[name]['status'] = status

    def get_next_work_units(self):
        """Return the waiting packages that have no pending dependency.

        By default the list is in arbitrary (dictionary) order; when
        ``self.sort`` is set the most used packages come first.
        """
        runnable = []
        for key in self.packages:
            package = self.packages[key]
            if len(package['uses']) == 0 and package['status'] == 'waiting':
                runnable.append((package['use_count'], key))
        if self.sort:
            runnable.sort()
            runnable.reverse()
        return [pair[1] for pair in runnable]

    def suppress_work_unit(self, name):
        """Mark ``name`` as done and remove it from all ``uses`` lists."""
        self.semaphore.acquire()
        try:
            self.packages[name]['status'] = 'done'
            for key in self.packages.keys():
                if name in self.packages[key]['uses']:
                    self.packages[key]['uses'].remove(name)
        finally:
            self.semaphore.release()

    def add_work_unit(self, name, cmd):
        """Queue ``cmd`` for package ``name`` unless it was already queued."""
        if self.is_work_unit_waiting(name):
            # we create requests
            arg = {'cmd': cmd, 'package': name}
            req = WorkRequest(self.do_execute, [arg], None, callback=self.result_callback)
            # then we put the work request in the queue...
            self.set_work_unit_status(name, 'queued')
            self.pool.putRequest(req)

    def execute(self, command):
        """Queue ``command`` for every package that is currently runnable."""
        self.semaphore.acquire()
        try:
            packages = self.get_next_work_units()
            if len(packages):
                print('\n#--------------------------------------------------------------')
                print('# Execute parallel actions within packages (total %s ) %s' % (len(packages), packages))
                print('\n#--------------------------------------------------------------')
            for package in packages:
                self.add_work_unit(package, command)
            sys.stdout.flush()
        finally:
            # Always released: a leaked semaphore would block every worker.
            self.semaphore.release()

    def execute_all(self, command):
        """Run ``command`` in all packages, wait, then dismiss the workers."""
        self.execute(command)
        self.wait()
        if self.counter != len(self.packages):
            print('tbroadcast: warning: compiled %s out of %s packages' % (self.counter, len(self.packages)))
        self.pool.dismissWorkers(self.num_workers, do_join=True)

    def wait(self):
        """Block until the thread pool has processed its requests."""
        self.pool.wait()

    # this will be called each time a result is available
    def result_callback(self, request, result):
        """Schedule the packages unlocked by the request that just ended."""
        self.execute(result['cmd'])

    # the work the threads will have to do
    def do_execute(self, arg):
        """Run ``arg['cmd']`` in the directory of package ``arg['package']``.

        The command output goes to a ``.loglog`` file in ``self.output`` when
        that directory is set (stderr going to ``self.error`` when set too),
        otherwise to the inherited stdout/stderr.  Returns a dictionary with
        the keys ``cmd`` and ``package``.  Raises RuntimeError when the
        package path is unknown or missing.
        """
        package = arg['package']
        path = self.packages[package]['path']
        if path is None or not os.path.exists(path):
            raise RuntimeError('Path to package ' + package + ' not found')
        self.set_work_unit_status(package, 'running')
        header = '#--------------------------------------------------------------\n'
        header = header + '# (' + str(self.get_counter()) + '/' + str(len(self.packages)) + ') Now trying [' + arg['cmd'] + '] in ' + path + '\n'
        header = header + '#--------------------------------------------------------------\n'
        print(header)
        sys.stdout.flush()

        # The log name is the package path relative to the project, with the
        # trailing /cmt removed and the slashes turned into underscores.
        project_path = self.current_project['path'] + '/' + self.current_project['version'] + '/'
        log_name = path.replace(project_path, '')
        log_name = log_name.replace('/cmt', '')
        log_name = log_name.replace('/', '_')
        log_name = log_name + '.loglog'
        cmd = "cd " + path + ";" + arg['cmd']

        self.packages[package]['startTime'] = time.time()

        out_file = None
        err_file = None
        try:
            if self.output is not None:
                out_file = open(self.output + '/' + log_name, 'w+')
                out_file.write(header)
                out_file.flush()
                if self.error is not None:
                    err_file = open(self.error + '/error' + log_name, 'w+')
                    fp = Popen(cmd, shell=True, stdout=out_file, stderr=err_file)
                else:
                    fp = Popen(cmd, shell=True, stdout=out_file, stderr=out_file)
            else:
                fp = Popen(cmd, shell=True)
            fp.communicate()
            status = fp.wait()
        finally:
            # The log files are closed even when the command cannot start.
            if err_file is not None:
                err_file.close()
            if out_file is not None:
                out_file.close()
        sys.stdout.flush()
        sys.stderr.flush()

        # Error is not handled - exit() is forbidden here
        if not self.keep_going and status > 0:
            print('Error %s for package %s' % (status, package))

        self.packages[package]['endTime'] = time.time()
        if self.perf:
            # The timing file is shared by all the workers.
            self.semaphore.acquire()
            try:
                f = open(self.perf, 'a')
                try:
                    f.write(package + " " + str(self.packages[package]['startTime']) + " " + str(self.packages[package]['endTime']) + '\n')
                finally:
                    f.close()
            finally:
                self.semaphore.release()
        self.suppress_work_unit(package)
        return {'cmd': arg['cmd'], 'package': arg['package']}

    # this will be called when an exception occurs within a thread
    def handle_exception(self, request, exc_info):
        """Report an exception raised by a worker.

        ``exc_info`` is a ``(type, value, traceback)`` triple.  A SystemExit
        is propagated with ``sys.exit``; anything else is only printed.
        """
        print('#--------------------------------------------------------------')
        if exc_info[0] == SystemExit:
            # One-element tuples keep the formatting valid for any value.
            print("Stop execution (No_keep_going option enabled): exit code == %s " % (exc_info[1],))
            print('#--------------------------------------------------------------')
            sys.exit(exc_info[1])
        print("# Exception occured: %s" % (exc_info[1],))
        print(exc_info)
        print('#--------------------------------------------------------------')

    def generate_make(self, file, command):
        """Write to ``file`` a makefile running ``command`` in each package."""
        makefile = open(file, 'w+')
        try:
            makefile.write('MAKE=make\n')
            self.counter = len(self.packages)
            self.recursive_make(self.current_package, command, makefile, len(self.packages))
        finally:
            makefile.close()

    def recursive_make(self, package, command, makefile, indice, actions=None):
        """Write the rule of ``package``, then those of its dependencies.

        ``actions`` lists the packages whose rule is already written; it is
        shared by the recursive calls.  A new list is created for each
        top-level call so that successive makefiles are all complete.
        """
        if actions is None:
            actions = []
        makefile.write(self.generate_action_make(package, command, indice))
        for pkg in self.packages[package]['uses']:
            if pkg not in actions:
                actions.append(pkg)
                indice = indice - 1
                self.counter = self.counter - 1
                self.recursive_make(pkg, command, makefile, indice, actions)

    def generate_action_make(self, package, command, indice):
        """Return the makefile rule of ``package``.

        ``<package>`` in ``command`` is replaced by the package name and an
        empty command means ``$(MAKE)``.  When the make variable LOCATION is
        defined the output is logged in ``$(LOCATION)/<package>.loglog``.
        ``indice`` is not used.
        """
        newcommand = command.replace('<package>', package)
        if command == '':
            newcommand = '$(MAKE)'
        path = self.packages[package]['path']
        log = ' $(LOCATION)/' + package + '.loglog'
        rule = '\t@echo "#--------------------------------------------------------------"'
        title = ('\t@echo "# (' + str(self.counter) + '/' + str(len(self.packages))
                 + ') Now trying [' + newcommand + '] in ' + path + '"')
        action = '\t+@cd ' + path + ' && ' + newcommand

        # target and dependencies
        parts = [package + ' :: ']
        for pkg in self.packages[package]['uses']:
            parts.append(' ' + pkg)
        parts.append('\n')
        # the action itself
        parts.append(rule + '\n')
        parts.append(title + '\n')
        parts.append(rule + '\n')
        parts.append('ifdef LOCATION\n')
        parts.append(rule + '>' + log + '\n')
        parts.append(title + '>>' + log + '\n')
        parts.append(rule + '>>' + log + '\n')
        parts.append(action + ' >>' + log + ' 2>&1\n')
        parts.append('else\n')
        parts.append(action + '\n')
        parts.append('endif\n\n')
        return ''.join(parts)
[517]526
527
528# own copy of getstatusoutput
def getstatusoutput(cmd):
    """Run cmd in a shell and return the pair (status, stdout).

    One trailing line separator, if present, is stripped from the output.
    The status is what closing the pipe reports (encoded as for wait());
    a command ending without error gives 0.
    """
    import os
    pipe = os.popen(cmd, 'r')
    text = pipe.read()
    # close() answers None when the command succeeded
    status = pipe.close()
    if status is None:
        status = 0
    eol = os.linesep
    if text.endswith(eol):
        text = text[:len(text) - len(eol)]
    return status, text
544
[478]545#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.