source: tbroadcast/v2/python/tbroadcast.py @ 244

Last change on this file since 244 was 244, checked in by garonne, 18 years ago

MàJ

  • Property svn:executable set to *
File size: 21.2 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17from threading import BoundedSemaphore
18
19from threadpool import WorkRequest
20from threadpool import ThreadPool
21from threadpool import NoResultsPending
22from threadpool import NoWorkersAvailable
23from threadpool import  makeRequests
24from executer   import  exeCommand
25
26class Scheduler:
27
28    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None, error=None, silent = False):
29        self.pool            = ThreadPool(num_workers=num_workers)
30        self.current_package = self.get_current_package()
31        self.current_project = {'name': None, 'path': None, 'version': None}
32        self.packages        = {}
33        self.counter         = 0
34        self.semaphore       = BoundedSemaphore(1)
35        self.local           = local
36        self.ignore_cycles   = ignore_cycles
37        self.output          = output
38        self.error           = error
39        self.silent          = silent
40        if output is not None:
41            if not os.path.exists (output):
42                print "path",output,"no exits"       
43                sys.exit(-1)
44            if not os.path.isdir(output):
45                print "path",output,"no a valid directory"       
46                sys.exit(-1)           
47               
48        self.get_current_project()
49        self.instanciate_packages (file)
50        if self.local: self.get_local_graph()
51        self.check_cycles()
52       
53#        status, output = commands.getstatusoutput("cmt broadcast -local 'echo <package>'")
54#        lignes = string.split(output, '\n')
55#        i = 1
56#        for package in lignes:
57#            if package!='' and package[0] != '#':                           
58#                print i , package
59#                i =i +1
60#                if not self.packages.has_key(package):
61#                                    print package       
62#        print len(self.packages)
63#        sys.exit(-1)
64
65    def get_current_project(self):
66        cmd = 'cmt show projects | grep current'
67        status, output = commands.getstatusoutput (cmd)
68        if status != 0:
69            print output
70            sys.exit(-1)   
71        lines = string.split(output, '\n')
72        for line in lines:           
73            if line!='' and line [0] != '#':                           
74                item  = string.split (line, ' ')
75                self.current_project ['name']    = item[0]
76                self.current_project ['version'] = item[1]
77                self.current_project ['path']    = item[3][:-1]
78                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
79                if  self.current_project ['version'] == version:
80                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
81                #print self.current_project     
82       
83    def get_counter(self):
84        self.semaphore.acquire ()       
85        self.counter = self.counter + 1
86        value = self.counter
87        self.semaphore.release()
88        return value
89       
90    def check_cycles (self):
91        cmd = 'cmt -private show cycles'
92        cycle_found = False
93        status, output = commands.getstatusoutput (cmd)
94        if status != 0:
95            print output
96            sys.exit(-1)   
97        lines = string.split(output, '\n')
98        cycles = list ()
99        for line in lines:           
100            if line!='' and line [0] != '#':                   
101               cycles.append (string.split(line)) 
102        cercles =list()       
103        for cycle in cycles:
104            cycleInProject = True
105            for package in cycle:           
106                if not self.packages.has_key(package):
107                    cycleInProject = False       
108            if cycleInProject: 
109              cercles.append(cycle)
110        if len(cercles):
111            if not self.ignore_cycles:
112                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
113                for cycle in cercles:
114                    loop = ""                   
115                    for package in cycle:
116                        loop = loop + package + ' -> '
117                    print loop + '...'
118                sys.exit(-1)       
119            else:
120                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
121                for cycle in cercles:
122                    loop = ""                   
123                    for package in cycle:
124                        loop = loop + package + ' -> '                     
125                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
126                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
127                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
128#                sys.exit(-1)
129
130    def format_uses (self, content):
131        # format variables
132        lignes  = string.split(content, '\n')
133        lines   = list()
134        for ligne in lignes:
135           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
136               lines.append(ligne)
137        lines.reverse()
138        return lines
139
140    def format_paths (self, content):
141        # format variables
142        lignes  = string.split(content, '\n')
143        lines   = list()
144        for ligne in lignes:
145            if ligne[:4] == "use ":             
146               lines.append(ligne)
147        return lines
148
149    def get_paths (self, content):
150        lines = self.format_paths(content)
151        for line in lines:
152                result  = string.split (line[4:len(line)], ' ')
153                if  self.packages.has_key(result[0]):
154                    if len(result)==4:
155                        name, version, offset, path = string.split (line[4:len(line)], " ")
156                        #print name, version, offset, path
157                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
158                        if path == '(no_auto_imports)':
159                            path   = offset
160                            offset = ''
161                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
162                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
163                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
164                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
165                        else:
166                            print '# error path not found for', name
167                            sys.exit(-1)   
168                    elif len(result)==5:
169                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
170                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
171                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
172                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
173                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
174                        else:
175                            print '# error path not found for', name
176                            sys.exit(-1)                                                                                                   
177                    elif len(result)==3:
178                        name, version, path = string.split (line[4:len(line)], " ")
179                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
180                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
181                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
182                            full_path = path[1:-1] + '/' +name + + '/cmt'
183                        else:
184                            print '# error path not found for', name
185                            sys.exit(-1)
186                    else:
187                        print "error:",line
188                        print str(result)
189                        sys.exit(-1) 
190                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
191                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
192                    if os.path.normpath(commonprefix) == self.current_project ['path']:                   
193                        #print result[0], ' belong to project', self.current_project ['name']
194                        self.packages[result[0]]['current_project'] = True
195
196    def get_uses(self, content):       
197        # initiates variables
198        lignes = self.format_uses(content)
199        if not len(lignes): return
200        self.packages [self.current_package] = {'version': '*', 'client': list(),
201                                                'uses': list(), 'status': 'waiting', 
202                                                'current_project': True, 'path': os.getcwd()}
203        previous_client = self.current_package
204        previous_level  = 0
205        level_stack    = [{'name':previous_client,'level':previous_level},]
206        ligne = lignes.pop()       
207        while len(lignes)!=0:   
208            current_level = string.find(ligne, 'use')
209            while current_level > previous_level:               
210                name    = string.split (ligne)[2]
211                version = string.split (ligne)[3]                             
212                if not self.packages.has_key (name):
213                  self.packages [name] = {'version': version, 'uses': list(), 
214                                          'client': list(), 'status': 'waiting', 
215                                          'current_project': False, 'path': None}
216                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
217                   self.packages[previous_client]['uses'].append (name)               
218                level_stack.append({'name':previous_client,'level':previous_level})
219                previous_client = name 
220                previous_level = current_level                 
221                if len(lignes):
222                    ligne = lignes.pop()
223                    #print ligne
224                    current_level = string.find(ligne, 'use')
225                                           
226            #self.packages [previous_client]['status'] ='queued'
227            # restore the level
228            if len(lignes):
229                if len(level_stack):                       
230                    item = level_stack.pop()               
231                    while item['level'] >= current_level and len(level_stack):
232                             item = level_stack.pop()
233                    previous_client = item['name']
234                    previous_level  = item['level']
235            #print previous_client, '-->',string.split (ligne)[2]
236
237    def instanciate_packages(self, file=None):
238        # We create the schedule of the work units
239        print '# First, we initialize the DAG by parsing "cmt show uses"'
240        if file is None:
241            cmd  = 'cmt show uses'
242        else:   
243            cmd = 'cat ' + file       
244        status, output = commands.getstatusoutput (cmd)
245        if status != 0:
246            print output
247            sys.exit(-1)
248        self.get_uses(output)   
249        self.get_paths(output)
250        #self.check_execution (package=self.current_package)
251        #self.simulate_execution()
252
253    def get_local_graph(self):
254        To_remove = list()
255        for key in self.packages:
256            if self.packages[key]['current_project']== False:
257                for selected in self.packages:
258                    if key in self.packages[selected]['uses']:
259                       self.packages[selected]['uses'].remove(key)
260                To_remove.append (key) 
261        for item in To_remove:
262             del self.packages[item]
263
264    def simulate_execution(self):
265       ok = True
266       indice = 1                     
267       while ok:
268           runnable  = list()
269           for key in self.packages:
270               if  self.packages[key]['status']!='done':
271                   if len(self.packages[key]['uses']) == 0:
272                       runnable.append(key)                                             
273           if len(runnable):
274               print '\n#--------------------------------------------------------------'
275               print "# Execute parallel actions within packages " + str(runnable) 
276           for selected in runnable:       
277               print '#--------------------------------------------------------------'
278               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
279               print '#--------------------------------------------------------------'
280               self.packages[selected]['status']='done'       
281               indice = indice + 1
282               for key in self.packages:
283                   if selected in self.packages[key]['uses']:
284                       self.packages[key]['uses'].remove(selected)                               
285                       #print 'remove', selected, 'from',key
286           if len(runnable)==0:
287                           ok = False       
288               
289    def check_execution(self, package, path=list(), cycles=list()):
290        #print package,'-->',self.packages[package]['uses']
291        #print path
292        if package in path:
293            if path[path.index(package):] not in cycles:
294                print 'Cycles:',path[path.index(package):], package
295                cycles = cycles + path[path.index(package):]
296                sys.exit(-1)
297        path.append(package)
298        for item in self.packages[package]['uses']:
299              self.check_execution(package=item, path=path, cycles=cycles)
300              path.pop()       
301
302    def get_current_package(self):   
303        cmd = 'cmt show macro package'
304        status, output = commands.getstatusoutput (cmd)
305        if status != 0:
306            print output
307            sys.exit(-1)   
308        lines = string.split(output, '\n')
309        for line in lines:
310            if line [0] != '#':
311                start = string.find(line,"'")
312                end   = string.find(line[start+1:len(line)],"'")
313                return line [start+1:start+end+1]
314
315    def get_work_area_path (self, name):       
316        return self.packages [name]['path']
317       
318    def get_package_path (self, name):   
319        #return os.getcwd ()
320        cmd = 'cmt -use='+name+' run pwd'
321        status, output = commands.getstatusoutput (cmd)
322        if status != 0:
323            print output
324            sys.exit(-1)   
325        lines = string.split(output, '\n')
326        for line in lines:
327            if line [0] != '#' and line[:5] != "#CMT>":
328                print line
329                return line
330 
331    def print_dependencies(self):
332        print '# ------------------------' 
333        print '# package --> dependencies' 
334        print '# ------------------------' 
335        for key in self.packages.keys():
336            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
337
338    def print_status(self, status):
339        print '# ------------------------' 
340        print '# package --> dependencies' 
341        print '# ------------------------' 
342        i = 1
343        for key in self.packages.keys():
344            if self.packages[key] ['status'] == status:
345                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
346                i = i + 1
347           
348    def is_work_unit_waiting (self, name):
349        return self.packages[name] ['status'] == 'waiting'
350
351    def set_work_unit_status (self, name, status):
352        self.packages[name] ['status'] = status
353
354    def get_dependencies (self, name):
355        return self.packages[name] ['uses']
356   
357    def get_next_work_units (self):
358        result = list ()
359        for key in self.packages.keys():
360            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
361                result.append(key)
362        return result
363
364    def is_work_units (self):
365        result = list ()
366        for key in self.packages.keys():
367            if self.is_work_unit_waiting(key) :
368                return True
369        return False       
370
371    def suppress_work_unit (self, name):
372        #print '# remove', name, 'from schedule'
373        for key in self.packages.keys():
374            if name in self.packages[key]['uses']:
375                self.packages[key]['uses'].remove(name)
376
377    def add_work_unit (self, name, cmd):
378        if self.is_work_unit_waiting (name):
379            # we create requests
380            arg = {'cmd': cmd , 'package':name}
381            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
382            # then we put the work request in the queue...
383            self.set_work_unit_status (name, 'queued')
384            self.pool.putRequest(req)
385            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
386
387    def execute (self, command):
388        #self.print_dependencies ()
389        packages = self.get_next_work_units()
390        if len(packages) !=0:
391            print '\n#--------------------------------------------------------------'   
392            print '# Execute parallel actions within packages', packages                     
393            for package in packages:
394                self.add_work_unit (package, command)
395
396    def execute_all(self,command):
397        #self.print_dependencies ()
398        self.execute (command)
399        self.wait()
400        #self.print_dependencies ()
401        #self.print_status (status='waiting')       
402        #while self.is_work_units():
403        #self.wait()           
404       
405    def wait (self):
406       self.pool.wait()   
407
408    # this will be called each time a result is available
409    def result_callback(self, request, result):
410      #print "**Result: %s from request #%s" % (str(result), request.requestID)
411      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
412      self.execute (result['cmd'])
413
414    # the work the threads will have to do
415    def do_execute(self, arg):
416      path = self.get_work_area_path (arg['package'])
417      if path == None:
418          raise RuntimeError('Path to package '+ arg['package'] +' not found')
419      self.set_work_unit_status (arg['package'], 'running')     
420      cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
421      os.chdir(path)       
422      print '#--------------------------------------------------------------'
423      print '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path
424      print '#--------------------------------------------------------------'
425      cmd = arg['cmd'] 
426      #status, output= commands.getstatusoutput(cmd)
427      # init output file
428      if self.output is not None:
429           f = open (self.output+'/'+arg['package']+'_output.log', 'w+')
430           f.close()     
431           if self.error is not None:
432               f = open (self.error+'/'+arg['package']+'_error.log', 'w+')
433               f.close()           
434      status, output, error, pythonError  = exeCommand(sCmd=cmd, oLineCallback=self.redirectOutput, arg=arg)#,iTimeout = 3600)
435      self.suppress_work_unit (arg['package'])
436      self.set_work_unit_status (arg['package'], 'done')
437      # status, output= commands.getstatusoutput(cmd)
438      #if status != 0:
439      #   raise RuntimeError(output)
440      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
441
442    def redirectOutput(self, index, buffer, arg):
443        """Filter function to redirect the std output and error of the job
444           executable for real-time debugging
445        """
446        if self.output is not None:
447           if index==0:   
448               f = open (self.output+'/'+arg['package']+'_output.log', 'a')
449               f.write (buffer+'\n')
450               f.close()
451           elif index==1: 
452               if self.error is not None:
453                   f = open (self.error+'/'+arg['package']+'_error.log', 'a')
454               else:
455                   f = open (self.output+'/'+arg['package']+'_output.log', 'a')                   
456               f.write (buffer+'\n')                   
457               f.close()                               
458        if not self.silent: 
459            print buffer
460             
461    # this will be called when an exception occurs within a thread
462    def handle_exception(self, request, exc_info):
463        #traceback.print_stack()
464      print '#--------------------------------------------------------------'       
465      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
466      print "# Exception occured: %s" %(exc_info[1])
467      print '#--------------------------------------------------------------'   
468      sys.exit(-1)
469
470
471#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.