source: tbroadcast/v2.0.7/python/tbroadcast.py@ 577

Last change on this file since 577 was 517, checked in by rybkin, 16 years ago

Version v2.0.6_rc4 from Igor Kachaev

  • Property svn:executable set to *
File size: 23.4 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9# 21-Jul-2009 compile most used packages first; protect critical sections
10
11import os
12import sys
13import time
14import string
15import os.path
16# import commands
17import traceback
18import exceptions
19from threading import BoundedSemaphore
20
21from threadpool import WorkRequest
22from threadpool import ThreadPool
23
24from subprocess import Popen
25
26class Scheduler:
27
28 def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, sort=False,
29 output=None, error=None, silent = False, perf=False, keep_going=True):
30 self.pool = ThreadPool(num_workers=num_workers, poll_timeout=3)
31 self.num_workers = num_workers
32 self.current_project = {'name': None, 'path': None, 'version': None}
33 self.packages = {}
34 self.counter = 0
35 self.semaphore = BoundedSemaphore(1)
36 self.local = local
37 self.sort = sort
38 self.ignore_cycles = ignore_cycles
39 self.output = output
40 self.error = error
41 self.silent = silent
42 self.perf = perf
43 self.keep_going = keep_going
44 if self.sort:
45 print "Compile packages sorted according to use count"
46 if self.perf is not False:
47 f = open (self.perf, 'w+')
48 f.close()
49 if output is not None:
50 if not os.path.exists (output):
51 print "path",output,"does not exists"
52 sys.exit(-1)
53 if not os.path.isdir(output):
54 print "path",output,"is not a valid directory"
55 sys.exit(-1)
56
57 # init cmt stuff
58 self.get_current_project()
59 self.current_package = self.get_current_package()
60 self.instanciate_packages (file)
61 if self.local: self.get_local_graph()
62 self.check_cycles()
63 self.get_use_count()
64
65 def get_current_project(self):
66 cmd = 'cmt show projects | grep current'
67 status, output = getstatusoutput (cmd)
68 #status, output = commands.getstatusoutput (cmd)
69 if status != 0:
70 print output
71 sys.exit(-1)
72 lines = string.split(output, '\n')
73 for line in lines:
74 if line!='' and line [0] != '#':
75 item = string.split (line, ' ')
76 self.current_project ['name'] = item[0]
77 self.current_project ['version'] = item[1]
78 self.current_project ['path'] = item[3][:-1]
79 version = self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
80 if self.current_project ['version'] == version:
81 self.current_project ['path'] = os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
82 return
83
84 def get_counter(self):
85 self.semaphore.acquire ()
86 self.counter = self.counter + 1
87 value = self.counter
88 self.semaphore.release()
89 return value
90
91 def check_cycles (self):
92 cmd = 'cmt -private show cycles'
93 cycle_found = False
94 status, output = getstatusoutput (cmd)
95 #status, output = commands.getstatusoutput (cmd)
96 if status != 0:
97 print output
98 sys.exit(-1)
99 lines = string.split(output, '\n')
100 cycles = list()
101 for line in lines:
102 if line!='' and line [0] != '#':
103 cycles.append (string.split(line))
104 cercles =list()
105 for cycle in cycles:
106 cycleInProject = True
107 for package in cycle:
108 if not self.packages.has_key(package):
109 cycleInProject = False
110 if cycleInProject:
111 cercles.append(cycle)
112 if len(cercles):
113 if not self.ignore_cycles:
114 print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
115 for cycle in cercles:
116 loop = ""
117 for package in cycle:
118 loop = loop + package + ' -> '
119 print loop + '...'
120 sys.exit(-1)
121 else:
122 print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
123 for cycle in cercles:
124 loop = ""
125 for package in cycle:
126 loop = loop + package + ' -> '
127 if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
128 print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
129 self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
130# sys.exit(-1)
131
132 def format_uses (self, content):
133 # format variables
134 lignes = string.split(content, '\n')
135 lines = list()
136 for ligne in lignes:
137 if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
138 lines.append(ligne)
139 lines.reverse()
140 return lines
141
142 def format_paths (self, content):
143 # format variables
144 lignes = string.split(content, '\n')
145 lines = list()
146 for ligne in lignes:
147 if ligne[:4] == "use ":
148 lines.append(ligne)
149 return lines
150
151 def get_paths (self, content):
152 lines = self.format_paths(content)
153 for line in lines:
154 result = string.split (line[4:len(line)], ' ')
155 if self.packages.has_key(result[0]):
156 if len(result)==4:
157 name, version, offset, path = string.split (line[4:len(line)], " ")
158 #print name, version, offset, path
159 #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
160 if path == '(no_auto_imports)':
161 path = offset
162 offset = ''
163 if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
164 full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
165 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
166 full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
167 else:
168 print '# error path not found for', name
169 sys.exit(-1)
170 elif len(result)==5:
171 name, version, offset, path, importation = string.split (line[4:len(line)], " ")
172 if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
173 full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
174 elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):
175 full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
176 else:
177 print '# error path not found for', name
178 sys.exit(-1)
179 elif len(result)==3:
180 name, version, path = string.split (line[4:len(line)], " ")
181 if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
182 full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
183 elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):
184 full_path = path[1:-1] + '/' +name + + '/cmt'
185 else:
186 print '# error path not found for', name
187 sys.exit(-1)
188 else:
189 print "error:",line
190 print str(result)
191 sys.exit(-1)
192 self.packages[result[0]]['path'] = os.path.normpath(full_path)
193 commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
194 if os.path.normpath(commonprefix) == self.current_project ['path']:
195 #print result[0], ' belong to project', self.current_project ['name']
196 self.packages[result[0]]['current_project'] = True
197
198 def get_uses(self, content):
199 # initiates variables
200 lignes = self.format_uses(content)
201 if not len(lignes): return
202 self.packages [self.current_package] = {'version': '*', 'use_count': 0,
203 'uses': list(), 'status': 'waiting',
204 'current_project': True, 'path': os.getcwd()}
205 previous_client = self.current_package
206 previous_level = 0
207 level_stack = [{'name':previous_client,'level':previous_level},]
208 ligne = lignes.pop()
209 while len(lignes)!=0:
210 current_level = string.find(ligne, 'use')
211 while current_level > previous_level:
212 name = string.split (ligne)[2]
213 version = string.split (ligne)[3]
214 if not self.packages.has_key (name):
215 self.packages [name] = {'version': version, 'use_count': 0,
216 'uses': list(), 'status': 'waiting',
217 'current_project': False, 'path': None}
218 if name not in self.packages[previous_client]['uses']:# and name != previous_client:
219 self.packages[previous_client]['uses'].append (name)
220 level_stack.append({'name':previous_client,'level':previous_level})
221 previous_client = name
222 previous_level = current_level
223 if len(lignes):
224 ligne = lignes.pop()
225 #print ligne
226 current_level = string.find(ligne, 'use')
227
228 # restore the level
229 if len(lignes):
230 if len(level_stack):
231 item = level_stack.pop()
232 while item['level'] >= current_level and len(level_stack):
233 item = level_stack.pop()
234 previous_client = item['name']
235 previous_level = item['level']
236 #print previous_client, '-->',string.split (ligne)[2]
237
238 def instanciate_packages(self, file=None):
239 # We create the schedule of the work units
240 print '# First, we initialize the DAG by parsing "cmt show uses"'
241 if file is None:
242 cmd = 'cmt show uses'
243 else:
244 cmd = 'cat ' + file
245 status, output = getstatusoutput (cmd)
246 #status, output = commands.getstatusoutput (cmd)
247 if status != 0:
248 print output
249 sys.exit(-1)
250 self.get_uses(output)
251 self.get_paths(output)
252 #self.simulate_execution()
253
254 def get_use_count(self):
255 for key in self.packages:
256 count = 0
257 for parent in self.packages:
258 if key in self.packages[parent]['uses']: count += 1
259 self.packages[key]['use_count'] = count
260 # print "Package",key,"use_count",count
261
262 def get_local_graph(self):
263 To_remove = list()
264 for key in self.packages:
265 if self.packages[key]['current_project']== False:
266 for selected in self.packages:
267 if key in self.packages[selected]['uses']:
268 self.packages[selected]['uses'].remove(key)
269 To_remove.append (key)
270 for item in To_remove:
271 del self.packages[item]
272
273 def simulate_execution(self):
274 while True:
275 ndone = self.simulate_requests()
276 if ndone == 0: break
277
278 def simulate_requests(self):
279 runnable = self.get_next_work_units()
280 if len(runnable):
281 print '\n#--------------------------------------------------------------'
282 print "# Execute parallel actions within packages - total", len(runnable)
283 print '#--------------------------------------------------------------'
284 for selected in runnable:
285 use_count = self.packages[selected]['use_count']
286 path = self.packages[selected]['path']
287 print '#--------------------------------------------------------------'
288 print '# (%d/%d %d) Now trying [] in %s' % (self.get_counter(), len(self.packages), use_count, path)
289 print '#--------------------------------------------------------------'
290 self.suppress_work_unit(selected)
291 return len(runnable)
292
293 def get_current_package(self):
294 cmd = 'cmt show macro package'
295 status, output = getstatusoutput (cmd)
296 #status, output = commands.getstatusoutput (cmd)
297 if status != 0:
298 print output
299 sys.exit(-1)
300 lines = string.split(output, '\n')
301 for line in lines:
302 if line [0] != '#':
303 start = string.find(line,"'")
304 end = string.find(line[start+1:len(line)],"'")
305 return line [start+1:start+end+1]
306
307 def print_dependencies(self):
308 print '# -------------------------------------------'
309 print '# package --> dependencies, status, use_count'
310 print '# -------------------------------------------'
311 for key in self.packages.keys():
312 print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status'],',', self.packages[key] ['use_count']
313
314 def print_status(self, status):
315 print '# ------------------------'
316 print '# package --> dependencies'
317 print '# ------------------------'
318 i = 1
319 for key in self.packages.keys():
320 if self.packages[key] ['status'] == status:
321 print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']
322 i = i + 1
323
324 def is_work_unit_waiting (self, name):
325 return self.packages[name] ['status'] == 'waiting'
326
327 def set_work_unit_status (self, name, status):
328 self.packages[name] ['status'] = status
329
330 def get_next_work_units (self):
331 # by default returned list is in arbitrary order - may be this is better
332 # if self.sort is set returned list is sorted - most used packages first
333 runnable = list()
334 for key in self.packages:
335 if len(self.packages[key]['uses']) == 0 and self.packages[key]['status'] == 'waiting':
336 use_count = self.packages[key]['use_count']
337 runnable.append((use_count,key))
338 if self.sort:
339 runnable.sort()
340 runnable.reverse()
341 result = [ pair[1] for pair in runnable ]
342 return result
343
344 def suppress_work_unit (self, name):
345 #print '# remove', name, 'from schedule'
346 self.semaphore.acquire()
347 self.packages[name]['status']='done'
348 for key in self.packages.keys():
349 if name in self.packages[key]['uses']:
350 self.packages[key]['uses'].remove(name)
351 self.semaphore.release()
352
353 def add_work_unit (self, name, cmd):
354 if self.is_work_unit_waiting (name):
355 # we create requests
356 arg = {'cmd': cmd , 'package': name}
357 req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback)
358# req = WorkRequest(self.do_execute, [arg] , None,
359# callback=self.result_callback, exc_callback=self.handle_exception)
360 # then we put the work request in the queue...
361 self.set_work_unit_status (name, 'queued')
362 self.pool.putRequest(req)
363 # print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
364
365 def execute (self, command):
366 #self.print_dependencies ()
367 self.semaphore.acquire()
368 packages = self.get_next_work_units()
369 if len(packages):
370 print '\n#--------------------------------------------------------------'
371 print '# Execute parallel actions within packages (total',len(packages),')',packages
372 print '\n#--------------------------------------------------------------'
373 for package in packages:
374 self.add_work_unit (package, command)
375 sys.stdout.flush()
376 self.semaphore.release()
377
378 def execute_all(self,command):
379 #self.print_dependencies ()
380 self.execute (command)
381 self.wait()
382 if self.counter != len(self.packages):
383 print 'tbroadcast: warning: compiled',self.counter,'out of',len(self.packages),'packages'
384 self.pool.dismissWorkers(self.num_workers, do_join=True)
385 #self.print_dependencies ()
386 #self.print_status (status='waiting')
387
388 def wait (self):
389 self.pool.wait()
390
391 # this will be called each time a result is available
392 def result_callback(self, request, result):
393 #print "**Result: %s from request #%s" % (str(result), request.requestID)
394 #print "# Result: %s from request #%s" % (result['package'], request.requestID)
395 self.execute (result['cmd'])
396
397 # the work the threads will have to do
398 def do_execute(self, arg):
399 package = arg['package']
400 path = self.packages[package]['path']
401 if path == None or not os.path.exists(path):
402 raise RuntimeError('Path to package '+ package +' not found')
403 self.set_work_unit_status(package, 'running')
404 header = '#--------------------------------------------------------------\n'
405 header = header + '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+arg['cmd']+'] in '+path+'\n'
406 header = header + '#--------------------------------------------------------------\n'
407 print header
408 sys.stdout.flush()
409 project_path = self.current_project['path']+'/'+self.current_project['version']+'/'
410 log_name = string.replace(path, project_path, '')
411 log_name = string.replace(log_name, '/cmt', '')
412 log_name = string.replace(log_name, '/', '_')
413 log_name = log_name+'.loglog'
414 # arg['log'] = log_name
415 cmd = "cd "+ path +";"+ arg['cmd']
416 # status, output = commands.getstatusoutput(cmd)
417 # init output file
418
419 self.packages[package] ['startTime'] = time.time()
420
421 if self.output is not None:
422 f1 = open (self.output+'/'+ log_name, 'w+')
423 f1.write (header)
424 f1.flush()
425 if self.error is not None:
426 f2 = open (self.error+'/error'+log_name, 'w+')
427 fp = Popen(cmd, shell=True, stdout=f1, stderr=f2)
428 fp.communicate()
429 status = fp.wait()
430 f2.close()
431 else:
432 fp = Popen(cmd, shell=True, stdout=f1, stderr=f1)
433 fp.communicate()
434 status = fp.wait()
435 f1.close()
436 else:
437 fp = Popen(cmd, shell=True)
438 fp.communicate()
439 status = fp.wait()
440 sys.stdout.flush()
441 sys.stderr.flush()
442
443 # Error is not handled - exit() is forbidden here
444 if not self.keep_going and status > 0:
445 print 'Error',status,'for package',package
446 # sys.exit(status)
447
448 self.packages[package] ['endTime'] = time.time()
449 if self.perf:
450 self.semaphore.acquire()
451 f = open (self.perf, 'a')
452 f.write (package+" "+str(self.packages[package]['startTime'])+" "+str(self.packages[package]['endTime'] )+'\n')
453 f.close()
454 self.semaphore.release()
455 self.suppress_work_unit(package)
456 return {'cmd': arg['cmd'], 'package':arg['package']}
457
458
459 # this will be called when an exception occurs within a thread
460 def handle_exception(self, request, exc_info):
461 #traceback.print_stack()
462 print '#--------------------------------------------------------------'
463 #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
464 if exc_info[0]== exceptions.SystemExit:
465 print "Stop execution (No_keep_going option enabled): exit code == %s " %(exc_info[1])
466 print '#--------------------------------------------------------------'
467 sys.exit(exc_info[1])
468 print "# Exception occured: %s" %(exc_info[1])
469 print exc_info
470 print '#--------------------------------------------------------------'
471 #sys.exit(-1)
472
473
474 def generate_make (self, file, command):
475 makefile = open (file, 'w+')
476 makefile.write ('MAKE=make\n')
477 #MFLAGS= -j10
478 self.counter = len(self.packages)
479 self.recursive_make (self.current_package, command, makefile, len(self.packages))
480 makefile.close ()
481
482 def recursive_make (self, package, command, makefile, indice,actions=list()):
483 lines = self.generate_action_make (package, command, indice)
484 makefile.write (lines)
485 #print lines
486 for pkg in self.packages[package] ['uses']:
487 if pkg not in actions:
488 actions.append(pkg)
489 indice = indice - 1
490 self.counter = self.counter - 1
491 self.recursive_make(pkg, command,makefile, indice, actions)
492
493 def generate_action_make (self, package, command, indice):
494 lines = package + ' :: '
495 # add dependencies
496 for pkg in self.packages[package] ['uses']:
497 lines = lines + ' ' + pkg
498
499 # add the action itself
500 newcommand = string.replace (command, '<package>', package)
501 if command =='':
502 newcommand='$(MAKE)'
503 lines = lines + '\n'
504 lines = lines + '\t@echo "#--------------------------------------------------------------"\n'
505 lines = lines + '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'"\n'
506 lines = lines + '\t@echo "#--------------------------------------------------------------"\n'
507 lines = lines + 'ifdef LOCATION\n'
508 lines = lines + '\t@echo "#--------------------------------------------------------------"> $(LOCATION)/'+ package +'.loglog\n'
509 lines = lines + '\t@echo "# ('+str(self.counter)+'/'+str(len(self.packages))+') Now trying ['+newcommand+'] in '+ self.packages[package]['path']+'">> $(LOCATION)/'+ package +'.loglog\n'
510 lines = lines + '\t@echo "#--------------------------------------------------------------">> $(LOCATION)/'+ package +'.loglog\n'
511 lines = lines + '\t+@cd ' + self.packages[package]['path']
512 lines = lines + ' && ' + newcommand + ' >> $(LOCATION)/'+ package +'.loglog 2>&1\n'
513 lines = lines + 'else\n'
514 lines = lines + '\t+@cd ' + self.packages[package]['path']
515 lines = lines + ' && ' + newcommand + '\n'
516 lines = lines + 'endif\n\n'
517 return lines
518
519
520# own copy of getstatusoutput
521def getstatusoutput(cmd):
522 """Return (status, stdout) of executing cmd in a shell.
523
524 A trailing line separator is removed from the output string.
525 The exit status of the command is encoded in the format specified for wait(),
526 when the exit status is zero (termination without errors), 0 is returned.
527 """
528 import os
529 p = os.popen(cmd, 'r')
530 out = p.read()
531 sts = p.close()
532 if sts is None: sts = 0
533 if out.endswith(os.linesep):
534 out = out[:out.rindex(os.linesep)]
535 return sts, out
536
537#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.