source: tbroadcast/v2.0.3/python/tbroadcast.py@ 577

Last change on this file since 577 was 316, checked in by garonne, 19 years ago

fixed a bug

  • Property svn:executable set to *
File size: 24.8 KB
Line 
#----------------------------------#
# -- Author: V.Garonne
# -- Mail: garonne@lal.in2p3.fr
# -- Date: 08/25/2006
# -- Name: tbroadcast
# -- Description: main class
#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17from threading import BoundedSemaphore
18
19from threadpool import WorkRequest
20from threadpool import ThreadPool
21from threadpool import NoResultsPending
22from threadpool import NoWorkersAvailable
23from threadpool import makeRequests
24from executer import exeCommand
25
class Scheduler:
    """Run a shell command in every used CMT package, in dependency order.

    The dependency graph is built from the output of ``cmt show uses`` (or
    from a file holding such output).  ``self.packages`` maps a package name
    to a dict with the keys ``version``, ``uses`` (names of the packages it
    still waits for), ``client``, ``status`` (``'waiting'``, ``'queued'``,
    ``'running'`` or ``'done'``), ``current_project`` and ``path``.  A
    package is handed to the thread pool once its ``uses`` list is empty;
    when it finishes, its name is removed from every other ``uses`` list.
    """

    # Separator line printed around every progress banner.
    _RULE = '#--------------------------------------------------------------'

    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent=False, perf=False):
        """Build the dependency graph and prepare the thread pool.

        num_workers   -- number of worker threads given to ThreadPool.
        file          -- optional file holding ``cmt show uses`` output; when
                         None the command itself is run.
        ignore_cycles -- when true, cycles are broken by dropping one
                         dependency instead of aborting.
        local         -- when true, packages outside the current project are
                         removed from the graph.
        output        -- existing directory receiving one log per package
                         (None disables logging to files).
        error         -- directory receiving the error logs; only used when
                         ``output`` is set.
        silent        -- when true, command output is not echoed.
        perf          -- file receiving start/end times per package; a false
                         value disables it.

        Exits the process with -1 when ``output`` is not an existing
        directory or when one of the ``cmt`` commands fails.
        """
        self.pool = ThreadPool(num_workers=num_workers)
        self.current_package = self.get_current_package()
        self.current_project = {'name': None, 'path': None, 'version': None}
        self.packages = {}
        self.counter = 0
        self.semaphore = BoundedSemaphore(1)
        self.local = local
        self.ignore_cycles = ignore_cycles
        self.output = output
        self.error = error
        self.silent = silent
        self.perf = perf
        # Same truth test as do_execute, so perf=None no longer ends up in
        # open(None).  The file is truncated here and appended to later.
        if self.perf:
            self._write_file(self.perf, 'w+', '')
        if output is not None:
            if not os.path.exists(output):
                print("path %s no exits" % output)
                sys.exit(-1)
            if not os.path.isdir(output):
                print("path %s no a valid directory" % output)
                sys.exit(-1)

        self.get_current_project()
        self.instanciate_packages(file)
        if self.local:
            self.get_local_graph()
        self.check_cycles()

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _getoutput_or_exit(self, cmd):
        """Run ``cmd`` in a shell and return its output; print it and exit(-1) on failure."""
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        return output

    def _write_file(self, filename, mode, text):
        """Open ``filename`` with ``mode``, write ``text`` and always close the file."""
        handle = open(filename, mode)
        try:
            handle.write(text)
        finally:
            handle.close()

    def _find_cmt_dir(self, root, offset, name, version):
        """Return the first existing ``cmt`` directory of a package, or None.

        The versioned layout (<root>/[<offset>/]<name>/<version>/cmt) is
        tried before the unversioned one (<root>/[<offset>/]<name>/cmt).
        ``offset`` is None when the use line carries no offset field.
        """
        if offset is None:
            base = root + '/' + name
        else:
            base = root + '/' + offset + '/' + name
        for candidate in (base + '/' + version + '/cmt', base + '/cmt'):
            if os.path.exists(candidate):
                return candidate
        return None

    def _banner(self, indice, command, path):
        """Return the '# (i/n) Now trying [command] in path' progress line."""
        return '# (' + str(indice) + '/' + str(len(self.packages)) + ') Now trying [' + command + '] in ' + path

    # ------------------------------------------------------------------
    # graph construction
    # ------------------------------------------------------------------

    def get_current_project(self):
        """Fill ``self.current_project`` from ``cmt show projects | grep current``.

        Only the first non-empty, non-comment line is used.  When the path
        ends with the version, that suffix is stripped from the path.
        """
        output = self._getoutput_or_exit('cmt show projects | grep current')
        for line in output.split('\n'):
            if line == '' or line[0] == '#':
                continue
            item = line.split(' ')
            name = item[0]
            version = item[1]
            path = item[3][:-1]  # drop the trailing character of the 4th field
            # An empty version must not strip anything: path[:-0] would be ''.
            if version and path.endswith(version):
                path = os.path.normpath(path[:-len(version)])
            self.current_project['name'] = name
            self.current_project['version'] = version
            self.current_project['path'] = path
            return

    def get_counter(self):
        """Increment the shared progress counter under the semaphore and return it."""
        self.semaphore.acquire()
        try:
            self.counter = self.counter + 1
            return self.counter
        finally:
            self.semaphore.release()

    def check_cycles(self):
        """Detect dependency cycles reported by ``cmt -private show cycles``.

        Only cycles whose packages are all present in ``self.packages`` are
        considered.  Without ``ignore_cycles`` they are printed and the
        process exits with -1; otherwise the dependency of the last package
        of each cycle on the first one is removed.
        """
        output = self._getoutput_or_exit('cmt -private show cycles')
        cercles = []
        for line in output.split('\n'):
            if line == '' or line[0] == '#':
                continue
            cycle = line.split()
            # A whitespace-only line gives an empty cycle: skip it rather
            # than fail later on cycle[0].
            if cycle and not [pkg for pkg in cycle if pkg not in self.packages]:
                cercles.append(cycle)
        if not cercles:
            return
        if not self.ignore_cycles:
            print("# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:")
            for cycle in cercles:
                print(''.join([pkg + ' -> ' for pkg in cycle]) + '...')
            sys.exit(-1)
        print("# Warning: There are cycles and you have selected the automatic suppress cycles mode")
        for cycle in cercles:
            first, last = cycle[0], cycle[-1]
            if first in self.packages[last]['uses']:
                loop = ''.join([pkg + ' -> ' for pkg in cycle])
                print('## In cycle: ' + loop + '..., we suppress the dependency ' + last + '->' + first)
                self.packages[last]['uses'].remove(first)

    def format_uses(self, content):
        """Return the commented use-tree lines of ``content``, in reverse order.

        Kept lines start with '#' but are neither '#CMT>' lines, '# Required'
        lines, '# Selection :' nor a bare '#'.  Blank lines are ignored.
        """
        lines = []
        for ligne in content.split('\n'):
            if not ligne.startswith('#'):
                continue
            if ligne.startswith("#CMT>") or ligne.startswith("# Required"):
                continue
            if ligne in ('# Selection :', '#'):
                continue
            lines.append(ligne)
        lines.reverse()
        return lines

    def format_paths(self, content):
        """Return the lines of ``content`` that start with 'use '."""
        return [ligne for ligne in content.split('\n') if ligne[:4] == "use "]

    def get_paths(self, content):
        """Record the ``cmt`` directory of every known package listed in ``content``.

        Each 'use ' line is split on single spaces into 3, 4 or 5 fields
        (name, version, [offset,] path [, import flag]).  The path field has
        its first and last characters removed before use.  The package is
        flagged ``current_project`` when its path lies under the current
        project path.  Exits with -1 when a line has another field count or
        when no ``cmt`` directory exists.
        """
        for line in self.format_paths(content):
            result = line[4:].split(' ')
            if result[0] not in self.packages:
                continue
            if len(result) == 4:
                name, version, offset, path = result
                if path == '(no_auto_imports)':
                    # No offset was given: the third field is the path.
                    path = offset
                    offset = ''
            elif len(result) == 5:
                name, version, offset, path, importation = result
            elif len(result) == 3:
                name, version, path = result
                offset = None
            else:
                print("error: %s" % line)
                print(str(result))
                sys.exit(-1)
            full_path = self._find_cmt_dir(path[1:-1], offset, name, version)
            if full_path is None:
                print('# error path not found for %s' % name)
                sys.exit(-1)
            entry = self.packages[name]
            entry['path'] = os.path.normpath(full_path)
            commonprefix = os.path.commonprefix([entry['path'], self.current_project['path']])
            if os.path.normpath(commonprefix) == self.current_project['path']:
                entry['current_project'] = True

    def get_uses(self, content):
        """Build ``self.packages`` and the ``uses`` lists from the use tree.

        The column of the 'use' keyword gives the nesting level of a line: a
        package is a dependency of the closest preceding line with a smaller
        level, or of the current package for top-level lines.  Lines without
        the keyword or without a version field are skipped.  Nothing is done
        when ``content`` holds no use-tree line.
        """
        lignes = self.format_uses(content)
        if not lignes:
            return
        self.packages[self.current_package] = {'version': '*', 'client': list(),
                                               'uses': list(), 'status': 'waiting',
                                               'current_project': True, 'path': os.getcwd()}
        # Stack of (name, level) for the chain of clients leading to the
        # current line; the current package sits at the bottom with level 0.
        ancestors = [(self.current_package, 0)]
        lignes.reverse()  # format_uses returns them last-first
        for ligne in lignes:
            level = ligne.find('use')
            tokens = ligne.split()
            if level <= 0 or len(tokens) < 4:
                continue
            name, version = tokens[2], tokens[3]
            # Unwind to the closest client with a strictly smaller level.
            while len(ancestors) > 1 and ancestors[-1][1] >= level:
                ancestors.pop()
            client = ancestors[-1][0]
            if name not in self.packages:
                self.packages[name] = {'version': version, 'uses': list(),
                                       'client': list(), 'status': 'waiting',
                                       'current_project': False, 'path': None}
            if name not in self.packages[client]['uses']:
                self.packages[client]['uses'].append(name)
            ancestors.append((name, level))

    def instanciate_packages(self, file=None):
        """Create the work units from ``cmt show uses`` output.

        When ``file`` is given its content is parsed instead of running the
        command.  Exits with -1 when the command fails or the file cannot be
        read.
        """
        print('# First, we initialize the DAG by parsing "cmt show uses"')
        if file is None:
            output = self._getoutput_or_exit('cmt show uses')
        else:
            # Read the file directly instead of shelling out to 'cat', which
            # broke on names containing spaces or shell metacharacters.
            try:
                handle = open(file)
                try:
                    output = handle.read()
                finally:
                    handle.close()
            except (IOError, OSError):
                print(sys.exc_info()[1])
                sys.exit(-1)
            # getstatusoutput dropped one trailing newline; do the same.
            if output[-1:] == '\n':
                output = output[:-1]
        self.get_uses(output)
        self.get_paths(output)

    def get_local_graph(self):
        """Remove every package not flagged ``current_project`` from the graph."""
        foreign = [key for key in self.packages
                   if self.packages[key]['current_project'] == False]
        for key in foreign:
            for selected in self.packages:
                if key in self.packages[selected]['uses']:
                    self.packages[selected]['uses'].remove(key)
        for key in foreign:
            del self.packages[key]

    def simulate_execution(self):
        """Print the order in which packages would run, consuming the graph.

        Every printed package is marked 'done' and removed from the ``uses``
        lists of the others; the loop ends when no package is runnable.
        """
        indice = 1
        while True:
            runnable = [key for key in self.packages
                        if self.packages[key]['status'] != 'done'
                        and len(self.packages[key]['uses']) == 0]
            if not runnable:
                break
            print('\n' + self._RULE)
            print("# Execute parallel actions within packages " + str(runnable))
            for selected in runnable:
                print(self._RULE)
                print(self._banner(indice, '', self.packages[selected]['path']))
                print(self._RULE)
                self.packages[selected]['status'] = 'done'
                indice = indice + 1
                for key in self.packages:
                    if selected in self.packages[key]['uses']:
                        self.packages[key]['uses'].remove(selected)

    def check_execution(self, package, path=None, cycles=None):
        """Walk the ``uses`` graph depth-first and exit(-1) on the first cycle.

        ``path`` is the chain of packages currently being visited and
        ``cycles`` a list of cycles to ignore.  Both default to fresh empty
        lists, so no state is shared between calls.
        """
        if path is None:
            path = []
        if cycles is None:
            cycles = []
        if package in path:
            cycle = path[path.index(package):]
            if cycle not in cycles:
                print('Cycles: %s %s' % (cycle, package))
                sys.exit(-1)
        path.append(package)
        for item in self.packages[package]['uses']:
            self.check_execution(package=item, path=path, cycles=cycles)
        path.pop()

    def get_current_package(self):
        """Return the quoted value printed by ``cmt show macro package``.

        The first non-empty, non-comment line is used; None is returned when
        there is none.
        """
        output = self._getoutput_or_exit('cmt show macro package')
        for line in output.split('\n'):
            if line == '' or line[0] == '#':
                continue
            start = line.find("'")
            end = line[start + 1:].find("'")
            return line[start + 1:start + end + 1]

    def get_work_area_path(self, name):
        """Return the recorded ``cmt`` directory of package ``name``."""
        return self.packages[name]['path']

    def get_package_path(self, name):
        """Print and return the first non-comment line of ``cmt -use=<name> run pwd``."""
        output = self._getoutput_or_exit('cmt -use=' + name + ' run pwd')
        for line in output.split('\n'):
            if line and line[0] != '#':
                print(line)
                return line

    # ------------------------------------------------------------------
    # reporting
    # ------------------------------------------------------------------

    def print_dependencies(self):
        """Print every package with its remaining dependencies and status."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        for key in self.packages.keys():
            entry = self.packages[key]
            print('%s --> %s , %s' % (key, entry['uses'], entry['status']))

    def print_status(self, status):
        """Print, numbered from 1, the packages whose status equals ``status``."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        i = 1
        for key in self.packages.keys():
            entry = self.packages[key]
            if entry['status'] == status:
                print('%s %s --> %s , %s' % (i, key, entry['uses'], entry['status']))
                i = i + 1

    # ------------------------------------------------------------------
    # scheduling
    # ------------------------------------------------------------------

    def is_work_unit_waiting(self, name):
        """Return True when package ``name`` has status 'waiting'."""
        return self.packages[name]['status'] == 'waiting'

    def set_work_unit_status(self, name, status):
        """Set the status of package ``name``."""
        self.packages[name]['status'] = status

    def get_dependencies(self, name):
        """Return the list of packages ``name`` still waits for."""
        return self.packages[name]['uses']

    def get_next_work_units(self):
        """Return the waiting packages that have no remaining dependency."""
        return [key for key in self.packages.keys()
                if len(self.get_dependencies(key)) == 0 and self.is_work_unit_waiting(key)]

    def is_work_units(self):
        """Return True when at least one package is still 'waiting'."""
        for key in self.packages.keys():
            if self.is_work_unit_waiting(key):
                return True
        return False

    def suppress_work_unit(self, name):
        """Remove ``name`` from the dependency list of every package."""
        for key in self.packages.keys():
            if name in self.packages[key]['uses']:
                self.packages[key]['uses'].remove(name)

    def add_work_unit(self, name, cmd):
        """Queue ``cmd`` for package ``name`` if that package is still waiting."""
        if self.is_work_unit_waiting(name):
            arg = {'cmd': cmd, 'package': name}
            req = WorkRequest(self.do_execute, [arg], None, callback=self.result_callback, exc_callback=self.handle_exception)
            # Mark it before queueing so it cannot be submitted twice.
            self.set_work_unit_status(name, 'queued')
            self.pool.putRequest(req)

    def execute(self, command):
        """Queue ``command`` for every package that is currently runnable."""
        packages = self.get_next_work_units()
        if len(packages) != 0:
            print('\n' + self._RULE)
            print('# Execute parallel actions within packages %s' % (packages,))
            for package in packages:
                self.add_work_unit(package, command)

    def execute_all(self, command):
        """Queue the runnable packages, then wait for the pool."""
        self.execute(command)
        self.wait()

    def wait(self):
        """Block on the thread pool's ``wait``."""
        self.pool.wait()

    def result_callback(self, request, result):
        """Pool callback for a finished request: queue the packages that became runnable."""
        self.execute(result['cmd'])

    def do_execute(self, arg):
        """Run ``arg['cmd']`` in the directory of ``arg['package']`` (worker thread).

        Sets ``arg['log']`` to the log file name, records 'startTime' and
        'endTime' on the package, appends them to the perf file when enabled,
        then removes the package from the other ``uses`` lists and marks it
        'done'.  Returns a dict with the keys 'output', 'cmd' and 'package'.
        Raises RuntimeError when the package path is unknown or missing.
        """
        package = arg['package']
        path = self.get_work_area_path(package)
        if path is None or not os.path.exists(path):
            raise RuntimeError('Path to package ' + package + ' not found')
        self.set_work_unit_status(package, 'running')
        header = self._RULE + '\n'
        header = header + self._banner(self.get_counter(), arg['cmd'], path) + '\n'
        header = header + self._RULE + '\n'
        print(header)
        # The log name is the package path relative to the project, with the
        # '/cmt' part removed and the remaining '/' turned into '_'.
        project_path = self.current_project['path'] + '/' + self.current_project['version'] + '/'
        log_name = path.replace(project_path, '').replace('/cmt', '').replace('/', '_') + '.loglog'
        arg['log'] = log_name
        cmd = "cd " + path + ";" + arg['cmd']
        if self.output is not None:
            self._write_file(self.output + '/' + log_name, 'w+', header)
            if self.error is not None:
                # Start from an empty error log.
                self._write_file(self.error + '/error' + log_name, 'w+', '')
        entry = self.packages[package]
        entry['startTime'] = time.time()
        status, output, error, pythonError = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)
        entry['endTime'] = time.time()
        if self.perf:
            # The perf file is shared by all workers: serialise the appends.
            self.semaphore.acquire()
            try:
                self._write_file(self.perf, 'a', package + " " + str(entry['startTime']) + " " + str(entry['endTime']) + '\n')
            finally:
                self.semaphore.release()
        self.suppress_work_unit(package)
        self.set_work_unit_status(package, 'done')
        return {'output': output, 'cmd': arg['cmd'], 'package': package}

    def redirectOutput(self, index, buffer, arg):
        """Line callback given to exeCommand: log and echo one output line.

        With an output directory configured, ``index`` 0 is appended to the
        package log and ``index`` 1 to the error log (or to the package log
        when no error directory is set).  The line is also printed unless
        ``silent`` is set.
        """
        if self.output is not None:
            target = None
            if index == 0:
                target = self.output + '/' + arg['log']
            elif index == 1:
                if self.error is not None:
                    target = self.error + '/error' + arg['log']
                else:
                    target = self.output + '/' + arg['log']
            if target is not None:
                self._write_file(target, 'a', buffer + '\n')
        if not self.silent:
            print(buffer)

    def handle_exception(self, request, exc_info):
        """Pool callback for a request that raised: print the exception."""
        print(self._RULE)
        print("# Exception occured: %s" % (exc_info[1],))
        print(exc_info)
        print(self._RULE)

    # ------------------------------------------------------------------
    # makefile generation
    # ------------------------------------------------------------------

    def generate_make(self, file, command):
        """Write to ``file`` a makefile running ``command`` in every package.

        The current package is the first target; dependencies follow
        recursively.  ``self.counter`` is reset to the number of packages.
        """
        makefile = open(file, 'w+')
        try:
            makefile.write('MAKE=make\n')
            self.counter = len(self.packages)
            self.recursive_make(self.current_package, command, makefile, len(self.packages))
        finally:
            makefile.close()

    def recursive_make(self, package, command, makefile, indice, actions=None):
        """Write the rule of ``package``, then of each dependency not yet written.

        ``actions`` lists the dependencies already emitted; it defaults to a
        fresh list for each top-level call so that repeated generations do
        not skip packages.  ``self.counter`` is decremented for every
        dependency emitted.
        """
        if actions is None:
            actions = []
        makefile.write(self.generate_action_make(package, command, indice))
        for pkg in self.packages[package]['uses']:
            if pkg not in actions:
                actions.append(pkg)
                indice = indice - 1
                self.counter = self.counter - 1
                self.recursive_make(pkg, command, makefile, indice, actions)

    def generate_action_make(self, package, command, indice):
        """Return the makefile rule running ``command`` in ``package``.

        '<package>' in ``command`` is replaced by the package name and an
        empty command becomes '$(MAKE)'.  When the make variable LOCATION is
        defined, the banner and the command output go to
        $(LOCATION)/<package>.loglog.  ``indice`` is accepted but not used;
        the banner shows ``self.counter``.
        """
        path = self.packages[package]['path']
        if command == '':
            newcommand = '$(MAKE)'
        else:
            newcommand = command.replace('<package>', package)
        rule = '\t@echo "' + self._RULE + '"'
        banner = '\t@echo "' + self._banner(self.counter, newcommand, path) + '"'
        log = ' $(LOCATION)/' + package + '.loglog'
        # '+' tells make to run the line even under -n and to share the
        # jobserver with a recursive make.
        action = '\t+@cd ' + path + ' && ' + newcommand
        pieces = [package + ' :: ']
        pieces.extend([' ' + pkg for pkg in self.packages[package]['uses']])
        pieces.extend([
            '\n',
            rule + '\n',
            banner + '\n',
            rule + '\n',
            'ifdef LOCATION\n',
            rule + '>' + log + '\n',
            banner + '>>' + log + '\n',
            rule + '>>' + log + '\n',
            action + ' >>' + log + ' 2>&1\n',
            'else\n',
            action + '\n',
            'endif\n\n',
        ])
        return ''.join(pieces)
541#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.