source: tbroadcast/HEAD/python/tbroadcast.py@ 664

Last change on this file since 664 was 573, checked in by rybkin, 14 years ago

See C.L. 6

  • Property svn:executable set to *
File size: 23.7 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9# 21-Jul-2009 compile most used packages first; protect critical sections
10
11import os
12import sys
13import time
14import string
15import os.path
16# import commands
17import traceback
18import exceptions
19from threading import BoundedSemaphore
20
21from threadpool import WorkRequest
22from threadpool import ThreadPool
23
24from subprocess import Popen
25
class Scheduler:
    """Run a command over the packages of a CMT project in dependency order.

    The dependency graph is built by parsing ``cmt show uses`` (or a file
    holding its output). A package becomes runnable once every package it
    uses has been marked 'done'. Runnable packages are submitted as
    WorkRequests to a ThreadPool, and each finished request schedules the
    packages it unblocked.
    """

    # Banner separator used in console, log and makefile output.
    _RULE = '#--------------------------------------------------------------'

    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, sort=False,
                 output=None, error=None, silent=False, perf=False, keep_going=True):
        """Build the package graph for the current CMT package.

        num_workers   -- size of the ThreadPool
        file          -- optional file with saved ``cmt show uses`` output
        ignore_cycles -- drop the closing edge of each cycle instead of exiting
        local         -- keep only packages located inside the current project
        sort          -- schedule most-used packages first
        output        -- existing directory for per-package ``.loglog`` files
        error         -- directory for separate stderr logs (used only with output)
        silent        -- stored on the instance; not otherwise used in this class
        perf          -- path of a timing file (truncated here, appended per package)
        keep_going    -- when False, a failing package is reported on stdout

        Exits the process (status -1) on invalid ``output`` or CMT failures.
        """
        self.pool = ThreadPool(num_workers=num_workers, poll_timeout=3)
        self.num_workers = num_workers
        self.current_project = {'name': None, 'path': None, 'version': None}
        self.packages = {}
        self.counter = 0
        # Guards self.counter, the 'uses' lists and the perf file.
        self.semaphore = BoundedSemaphore(1)
        self.local = local
        self.sort = sort
        self.ignore_cycles = ignore_cycles
        self.output = output
        self.error = error
        self.silent = silent
        self.perf = perf
        self.keep_going = keep_going
        if self.sort:
            print("Compile packages sorted according to use count")
        # Same truthiness test as do_execute, so perf=None/'' no longer crashes.
        if self.perf:
            open(self.perf, 'w+').close()  # truncate the timing file
        if output is not None:
            if not os.path.exists(output):
                print("path %s does not exists" % output)
                sys.exit(-1)
            if not os.path.isdir(output):
                print("path %s is not a valid directory" % output)
                sys.exit(-1)

        # init cmt stuff
        self.get_current_project()
        self.current_package = self.get_current_package()
        self.instanciate_packages(file)
        if self.local:
            self.get_local_graph()
        self.check_cycles()
        self.get_use_count()

    def get_current_project(self):
        """Fill self.current_project from ``cmt show projects | grep current``.

        Only the first non-comment line is used. Its space-separated fields
        are read as name (0), version (1) and path (3, last character
        dropped). A trailing version component is stripped from the path.
        Exits if cmt fails.
        """
        status, output = getstatusoutput('cmt show projects | grep current')
        if status != 0:
            print("WARNING: CMT exited with non-zero status!")
            print(output)
            sys.exit(-1)
        for line in output.split('\n'):
            if not line or line.startswith('#'):
                continue
            item = line.split(' ')
            self.current_project['name'] = item[0]
            self.current_project['version'] = item[1]
            self.current_project['path'] = item[3][:-1]
            version = self.current_project['version']
            path = self.current_project['path']
            # Guard on a non-empty version: path[:-0] would yield '' -> '.'.
            if version and path.endswith(version):
                self.current_project['path'] = os.path.normpath(path[:-len(version)])
            return

    def get_counter(self):
        """Atomically increment and return the progress counter."""
        self.semaphore.acquire()
        try:
            self.counter += 1
            return self.counter
        finally:
            self.semaphore.release()

    @staticmethod
    def _format_cycle(cycle):
        """Render a cycle as 'a -> b -> ' (the caller appends the tail)."""
        return ''.join(package + ' -> ' for package in cycle)

    def check_cycles(self):
        """Detect dependency cycles reported by ``cmt -private show cycles``.

        Only cycles whose members are all in self.packages are considered.
        Without ignore_cycles they are printed and the process exits.
        Otherwise the edge from the last member back to the first is removed.
        """
        status, output = getstatusoutput('cmt -private show cycles')
        if status != 0:
            print("WARNING: CMT exited with non-zero status!")
            print(output)
            sys.exit(-1)
        cycles = []
        for line in output.split('\n'):
            if line and not line.startswith('#'):
                members = line.split()
                if members:  # whitespace-only lines are not cycles
                    cycles.append(members)
        in_project = [cycle for cycle in cycles
                      if all(package in self.packages for package in cycle)]
        if not in_project:
            return
        if not self.ignore_cycles:
            print("# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:")
            for cycle in in_project:
                print(self._format_cycle(cycle) + '...')
            sys.exit(-1)
        print("# Warning: There are cycles and you have selected the automatic suppress cycles mode")
        for cycle in in_project:
            first, last = cycle[0], cycle[-1]
            if first in self.packages[last]['uses']:
                print('## In cycle: ' + self._format_cycle(cycle) + '..., we suppress the dependency ' + last + '->' + first)
                self.packages[last]['uses'].remove(first)

    def format_uses(self, content):
        """Return the '#'-prefixed 'use' lines of ``cmt show uses``, reversed.

        CMT banners ('#CMT>', '# Required...', '# Selection :', '#') are
        dropped. The list is reversed so that pop() yields lines in
        original order. Empty lines are skipped.
        """
        lines = [line for line in content.split('\n')
                 if line.startswith('#')
                 and line[:5] != "#CMT>"
                 and line[:10] != "# Required"
                 and line not in ('# Selection :', '#')]
        lines.reverse()
        return lines

    def format_paths(self, content):
        """Return the lines of *content* that start with 'use '."""
        return [line for line in content.split('\n') if line.startswith("use ")]

    def _locate_cmt_dir(self, root, name, version, offset=None):
        """Return the existing cmt directory of package *name* under *root*.

        Tries <root>/[<offset>/]<name>/<version>/cmt, then
        <root>/[<offset>/]<name>/cmt. Exits if neither exists.
        """
        base = root + '/'
        if offset is not None:
            base = base + offset + '/'
        base = base + name
        for candidate in (base + '/' + version + '/cmt', base + '/cmt'):
            if os.path.exists(candidate):
                return candidate
        print('# error path not found for %s' % name)
        sys.exit(-1)

    @staticmethod
    def _is_inside(path, directory):
        """True if *path* equals *directory* or lies below it (by path component)."""
        if not directory:
            return False
        directory = os.path.normpath(directory)
        return path == directory or path.startswith(directory.rstrip(os.sep) + os.sep)

    def get_paths(self, content):
        """Record the cmt directory of every known package from 'use' lines.

        Accepted shapes after 'use ': 'name version (path)',
        'name version offset (path)' (or 'name version (path) (no_auto_imports)'),
        and 'name version offset (path) importation'. Any other shape exits.
        Packages located inside the current project get current_project=True.
        """
        for line in self.format_paths(content):
            result = line[4:].split(' ')
            if result[0] not in self.packages:
                continue
            if len(result) == 4:
                name, version, offset, path = result
                if path == '(no_auto_imports)':
                    path, offset = offset, ''
                full_path = self._locate_cmt_dir(path[1:-1], name, version, offset)
            elif len(result) == 5:
                name, version, offset, path, _importation = result
                full_path = self._locate_cmt_dir(path[1:-1], name, version, offset)
            elif len(result) == 3:
                name, version, path = result
                full_path = self._locate_cmt_dir(path[1:-1], name, version)
            else:
                print("error: %s" % line)
                print(str(result))
                sys.exit(-1)
            package_path = os.path.normpath(full_path)
            self.packages[name]['path'] = package_path
            # Component-wise check: os.path.commonprefix is character-based
            # and would treat '/a/projX' as inside '/a/proj'.
            if self._is_inside(package_path, self.current_project['path']):
                self.packages[name]['current_project'] = True

    def get_uses(self, content):
        """Build the dependency DAG from the indented 'use' lines.

        The column of 'use' in each line gives its nesting level. A deeper
        line is a dependency of the previous client, and level_stack
        restores the parent when the level decreases. The current package
        is the root of the graph.
        """
        lignes = self.format_uses(content)
        if not lignes:
            return
        self.packages[self.current_package] = {'version': '*', 'use_count': 0,
                                               'uses': list(), 'status': 'waiting',
                                               'current_project': True, 'path': os.getcwd()}
        previous_client = self.current_package
        previous_level = 0
        level_stack = [{'name': previous_client, 'level': previous_level}]
        ligne = lignes.pop()
        current_level = ligne.find('use')
        while True:
            # Descend: each deeper line is used by previous_client.
            while current_level > previous_level:
                fields = ligne.split()
                name = fields[2]
                version = fields[3]
                if name not in self.packages:
                    self.packages[name] = {'version': version, 'use_count': 0,
                                           'uses': list(), 'status': 'waiting',
                                           'current_project': False, 'path': None}
                if name not in self.packages[previous_client]['uses']:
                    self.packages[previous_client]['uses'].append(name)
                level_stack.append({'name': previous_client, 'level': previous_level})
                previous_client = name
                previous_level = current_level
                if lignes:
                    ligne = lignes.pop()
                    current_level = ligne.find('use')
                else:
                    return

            # Restore the client whose level is shallower than the current line.
            if level_stack:
                item = level_stack.pop()
                while item['level'] >= current_level and level_stack:
                    item = level_stack.pop()
                previous_client = item['name']
                previous_level = item['level']

            if current_level <= previous_level:
                return

    def instanciate_packages(self, file=None):
        """Parse ``cmt show uses`` (or ``cat file``) into self.packages.

        A non-zero exit status only prints a warning; parsing continues.
        """
        print('# First, we initialize the DAG by parsing "cmt show uses"')
        if file is None:
            cmd = 'cmt show uses'
        else:
            cmd = 'cat ' + file
        status, output = getstatusoutput(cmd)
        if status != 0:
            print("WARNING: CMT exited with non-zero status!")
        self.get_uses(output)
        self.get_paths(output)

    def get_use_count(self):
        """Set each package's use_count to the number of packages that use it."""
        counts = {}
        for info in self.packages.values():
            for used in set(info['uses']):  # count each parent once
                counts[used] = counts.get(used, 0) + 1
        for key, info in self.packages.items():
            info['use_count'] = counts.get(key, 0)

    def get_local_graph(self):
        """Drop packages outside the current project and every edge to them."""
        outsiders = [key for key, info in self.packages.items()
                     if not info['current_project']]
        for key in outsiders:
            for info in self.packages.values():
                if key in info['uses']:
                    info['uses'].remove(key)
        for key in outsiders:
            del self.packages[key]

    def simulate_execution(self):
        """Dry run: repeatedly 'execute' runnable packages until none remain."""
        while self.simulate_requests():
            pass

    def simulate_requests(self):
        """Dry-run one wave of runnable packages; return how many there were."""
        runnable = self.get_next_work_units()
        if runnable:
            print('\n' + self._RULE)
            print("# Execute parallel actions within packages - total %d" % len(runnable))
            print(self._RULE)
        for selected in runnable:
            use_count = self.packages[selected]['use_count']
            path = self.packages[selected]['path']
            print(self._RULE)
            print('# (%d/%d %d) Now trying [] in %s' % (self.get_counter(), len(self.packages), use_count, path))
            print(self._RULE)
            self.suppress_work_unit(selected)
        return len(runnable)

    def get_current_package(self):
        """Return the quoted value of ``cmt show macro package``, or None.

        Exits if cmt fails. The first non-empty, non-comment line is used.
        """
        status, output = getstatusoutput('cmt show macro package')
        if status != 0:
            print("WARNING: CMT exited with non-zero status!")
            print(output)
            sys.exit(-1)
        for line in output.split('\n'):
            if line and not line.startswith('#'):
                start = line.find("'")
                end = line[start + 1:].find("'")
                return line[start + 1:start + end + 1]
        return None

    def print_dependencies(self):
        """Print every package with its remaining uses, status and use count."""
        print('# -------------------------------------------')
        print('# package --> dependencies, status, use_count')
        print('# -------------------------------------------')
        for key, info in self.packages.items():
            print('%s --> %s , %s , %s' % (key, info['uses'], info['status'], info['use_count']))

    def print_status(self, status):
        """Print a numbered list of packages whose status equals *status*."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        index = 1
        for key, info in self.packages.items():
            if info['status'] == status:
                print('%s %s --> %s , %s' % (index, key, info['uses'], info['status']))
                index += 1

    def is_work_unit_waiting(self, name):
        """True if package *name* has not been queued yet."""
        return self.packages[name]['status'] == 'waiting'

    def set_work_unit_status(self, name, status):
        """Set the scheduling status of package *name*."""
        self.packages[name]['status'] = status

    def get_next_work_units(self):
        """Return waiting packages with no remaining dependencies.

        Order is dict order, unless self.sort is set; then it is by
        descending use_count (most used first).
        """
        runnable = [(info['use_count'], key) for key, info in self.packages.items()
                    if len(info['uses']) == 0 and info['status'] == 'waiting']
        if self.sort:
            runnable.sort(reverse=True)
        return [key for _count, key in runnable]

    def suppress_work_unit(self, name):
        """Mark *name* done and remove it from every package's 'uses'."""
        self.semaphore.acquire()
        try:
            self.packages[name]['status'] = 'done'
            for info in self.packages.values():
                if name in info['uses']:
                    info['uses'].remove(name)
        finally:
            self.semaphore.release()

    def add_work_unit(self, name, cmd):
        """Queue *cmd* for package *name* in the pool if it is still waiting."""
        if not self.is_work_unit_waiting(name):
            return
        arg = {'cmd': cmd, 'package': name}
        req = WorkRequest(self.do_execute, [arg], None, callback=self.result_callback)
        self.set_work_unit_status(name, 'queued')
        self.pool.putRequest(req)

    def execute(self, command):
        """Queue *command* for every currently runnable package."""
        self.semaphore.acquire()
        try:
            packages = self.get_next_work_units()
            if packages:
                print('\n' + self._RULE)
                print('# Execute parallel actions within packages (total %d ) %s' % (len(packages), packages))
                print('\n' + self._RULE)
            for package in packages:
                self.add_work_unit(package, command)
            sys.stdout.flush()
        finally:
            # Always release, otherwise one failure would deadlock every later call.
            self.semaphore.release()

    def execute_all(self, command):
        """Run *command* over the whole graph, wait, then stop the workers."""
        self.execute(command)
        self.wait()
        if self.counter != len(self.packages):
            print('tbroadcast: warning: compiled %s out of %s packages' % (self.counter, len(self.packages)))
        self.pool.dismissWorkers(self.num_workers, do_join=True)

    def wait(self):
        """Block until the pool has processed all requests."""
        self.pool.wait()

    def result_callback(self, request, result):
        """Pool callback for a finished request: schedule newly runnable packages."""
        self.execute(result['cmd'])

    def do_execute(self, arg):
        """Run arg['cmd'] in the cmt directory of arg['package'].

        Output goes to the console, or to <output>/<log_name> (stderr to
        <error>/error<log_name> when an error directory is set). Records
        start/end times, appends them to the perf file if requested, then
        marks the package done.
        Raises RuntimeError when the package path is unknown or missing.
        """
        package = arg['package']
        path = self.packages[package]['path']
        if path is None or not os.path.exists(path):
            raise RuntimeError('Path to package ' + package + ' not found')
        self.set_work_unit_status(package, 'running')
        header = (self._RULE + '\n'
                  + '# (' + str(self.get_counter()) + '/' + str(len(self.packages))
                  + ') Now trying [' + arg['cmd'] + '] in ' + path + '\n'
                  + self._RULE + '\n')
        print(header)
        sys.stdout.flush()
        project_path = self.current_project['path'] + '/' + self.current_project['version'] + '/'
        log_name = path.replace(project_path, '').replace('/cmt', '').replace('/', '_') + '.loglog'
        cmd = "cd " + path + ";" + arg['cmd']

        self.packages[package]['startTime'] = time.time()

        out_file = err_file = None
        try:
            if self.output is not None:
                out_file = open(self.output + '/' + log_name, 'w+')
                out_file.write(header)
                out_file.flush()
                if self.error is not None:
                    err_file = open(self.error + '/error' + log_name, 'w+')
                else:
                    err_file = out_file
            # With no output directory both handles are None: inherit the console.
            proc = Popen(cmd, shell=True, stdout=out_file, stderr=err_file)
            proc.communicate()
            status = proc.wait()
        finally:
            if err_file is not None and err_file is not out_file:
                err_file.close()
            if out_file is not None:
                out_file.close()
        sys.stdout.flush()
        sys.stderr.flush()

        # Error is only reported - exit() is forbidden in a worker thread.
        # Non-zero (not just positive): signals give a negative return code.
        if not self.keep_going and status != 0:
            print('Error %s for package %s' % (status, package))

        self.packages[package]['endTime'] = time.time()
        if self.perf:
            self.semaphore.acquire()
            try:
                perf_file = open(self.perf, 'a')
                try:
                    perf_file.write(package + " " + str(self.packages[package]['startTime'])
                                    + " " + str(self.packages[package]['endTime']) + '\n')
                finally:
                    perf_file.close()
            finally:
                self.semaphore.release()
        self.suppress_work_unit(package)
        return {'cmd': arg['cmd'], 'package': arg['package']}

    def handle_exception(self, request, exc_info):
        """Report an exception raised in a worker; re-exit on SystemExit."""
        print(self._RULE)
        if exc_info[0] is SystemExit:
            print("Stop execution (No_keep_going option enabled): exit code == %s " % (exc_info[1],))
            print(self._RULE)
            sys.exit(exc_info[1])
        print("# Exception occured: %s" % (exc_info[1],))
        print(str(exc_info))
        print(self._RULE)

    def generate_make(self, file, command):
        """Write a Makefile with one rule per package into *file*."""
        makefile = open(file, 'w+')
        try:
            makefile.write('MAKE=make\n')
            self.counter = len(self.packages)
            self.recursive_make(self.current_package, command, makefile, len(self.packages))
        finally:
            makefile.close()

    def recursive_make(self, package, command, makefile, indice, actions=None):
        """Write the rule for *package*, then depth-first for unvisited uses.

        *actions* collects packages already written. A fresh list is made
        per top-level call; a shared default list would skip every package
        on a second call.
        """
        if actions is None:
            actions = []
        makefile.write(self.generate_action_make(package, command, indice))
        for pkg in self.packages[package]['uses']:
            if pkg not in actions:
                actions.append(pkg)
                indice = indice - 1
                self.counter = self.counter - 1
                self.recursive_make(pkg, command, makefile, indice, actions)

    def generate_action_make(self, package, command, indice):
        """Return the make rule text for *package*.

        '<package>' in *command* is replaced by the package name; an empty
        command means '$(MAKE)'. When LOCATION is defined, output is logged
        to $(LOCATION)/<package>.loglog.
        """
        path = self.packages[package]['path']
        newcommand = command.replace('<package>', package)
        if command == '':
            newcommand = '$(MAKE)'
        log = ' $(LOCATION)/' + package + '.loglog'
        banner = ('# (' + str(self.counter) + '/' + str(len(self.packages)) + ') Now trying ['
                  + newcommand + '] in ' + path)
        parts = [package + ' :: ']
        parts.extend(' ' + pkg for pkg in self.packages[package]['uses'])
        parts.append('\n')
        parts.append('\t@echo "' + self._RULE + '"\n')
        parts.append('\t@echo "' + banner + '"\n')
        parts.append('\t@echo "' + self._RULE + '"\n')
        parts.append('ifdef LOCATION\n')
        parts.append('\t@echo "' + self._RULE + '">' + log + '\n')
        parts.append('\t@echo "' + banner + '">>' + log + '\n')
        parts.append('\t@echo "' + self._RULE + '">>' + log + '\n')
        parts.append('\t+@cd ' + path + ' && ' + newcommand + ' >>' + log + ' 2>&1\n')
        parts.append('else\n')
        parts.append('\t+@cd ' + path + ' && ' + newcommand + '\n')
        parts.append('endif\n\n')
        return ''.join(parts)
518 lines = lines + '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
519 lines = lines + '\t+@cd ' + self.packages[package]['path']
520 lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
521 lines = lines + 'else\n'
522 lines = lines + '\t+@cd ' + self.packages[package]['path']
523 lines = lines + ' && ' + newcommand + '\n'
524 lines = lines + 'endif\n\n'
525 return lines
526
527
528# own copy of getstatusoutput
529def getstatusoutput(cmd):
530 """Return (status, stdout) of executing cmd in a shell.
531
532 A trailing line separator is removed from the output string.
533 The exit status of the command is encoded in the format specified for wait(),
534 when the exit status is zero (termination without errors), 0 is returned.
535 """
536 import os
537 p = os.popen(cmd, 'r')
538 out = p.read()
539 sts = p.close()
540 if sts is None: sts = 0
541 if out.endswith(os.linesep):
542 out = out[:out.rindex(os.linesep)]
543 return sts, out
544
545#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.