source: tbroadcast/v2/python/tbroadcast.py @ 243

Last change on this file since 243 was 243, checked in by garonne, 18 years ago

MàJ

  • Property svn:executable set to *
File size: 19.6 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Mail: garonne@lal.in2p3.fr
4# -- Date: 08/25/2006
5# -- Name: tbroadcast
6# -- Description: main class
7#----------------------------------#
8
9import os
10import sys
11import time
12import string
13import random
14import os.path
15import commands
16import traceback
17from threading import BoundedSemaphore
18
19from threadpool import WorkRequest
20from threadpool import ThreadPool
21from threadpool import NoResultsPending
22from threadpool import NoWorkersAvailable
23from threadpool import  makeRequests
24from executer   import  exeCommand
25
26class Scheduler:
27
28    def __init__(self, num_workers=20, file=None, ignore_cycles=False, local=False, output=None):
29        self.pool            = ThreadPool(num_workers=num_workers)
30        self.current_package = self.get_current_package()
31        self.current_project = {'name': None, 'path': None, 'version': None}
32        self.packages        = {} 
33        self.counter         = 0
34        self.semaphore       = BoundedSemaphore(1)
35        self.local           = local
36        self.ignore_cycles   = ignore_cycles
37        self.output          = output
38        if output is not None:
39            if not os.path.exists (output):
40                print "path",output,"no exits"       
41                sys.exit(-1)
42            if not os.path.isdir(output):
43                print "path",output,"no a valid directory"       
44                sys.exit(-1)           
45               
46        self.get_current_project()
47        self.instanciate_packages (file)
48        if self.local: self.get_local_graph()
49        self.check_cycles()
50
51    def get_current_project(self):
52        cmd = 'cmt show projects | grep current'
53        status, output = commands.getstatusoutput (cmd)
54        if status != 0:
55            print output
56            sys.exit(-1)   
57        lines = string.split(output, '\n')
58        for line in lines:           
59            if line!='' and line [0] != '#':                           
60                item  = string.split (line, ' ')
61                self.current_project ['name']    = item[0]
62                self.current_project ['version'] = item[1]
63                self.current_project ['path']    = item[3][:-1]
64                version =  self.current_project ['path'][len(self.current_project ['path'])-len(self.current_project ['version'] ):]
65                if  self.current_project ['version'] == version:
66                    self.current_project ['path'] =  os.path.normpath(self.current_project ['path'][:-len(self.current_project ['version'] )])
67                #print self.current_project     
68       
69    def get_counter(self):
70        self.semaphore.acquire ()       
71        self.counter = self.counter + 1
72        value = self.counter
73        self.semaphore.release()
74        return value
75       
76    def check_cycles (self):
77        cmd = 'cmt -private show cycles'
78        cycle_found = False
79        status, output = commands.getstatusoutput (cmd)
80        if status != 0:
81            print output
82            sys.exit(-1)   
83        lines = string.split(output, '\n')
84        cycles = list ()
85        for line in lines:           
86            if line!='' and line [0] != '#':                   
87               cycles.append (string.split(line)) 
88        cercles =list()       
89        for cycle in cycles:
90            cycleInProject = True
91            for package in cycle:           
92                if not self.packages.has_key(package):
93                    cycleInProject = False       
94            if cycleInProject: 
95              cercles.append(cycle)
96        if len(cercles):
97            if not self.ignore_cycles:
98                print "# Error: cycles found, not possible to execute broadcast with threads. Please correct the following cycles:"
99                for cycle in cercles:
100                    loop = ""                   
101                    for package in cycle:
102                        loop = loop + package + ' -> '
103                    print loop + '...'
104                sys.exit(-1)       
105            else:
106                print "# Warning: There are cycles and you have selected the automatic suppress cycles mode"
107                for cycle in cercles:
108                    loop = ""                   
109                    for package in cycle:
110                        loop = loop + package + ' -> '                     
111                    if cycle[0] in self.packages[cycle[len(cycle)-1]]['uses']:
112                        print '## In cycle: '+loop + '..., we suppress the dependency '+ cycle[len(cycle)-1]+'->'+cycle[0]
113                        self.packages[cycle[len(cycle)-1]]['uses'].remove(cycle[0])
114#                sys.exit(-1)
115
116    def format_uses (self, content):
117        # format variables
118        lignes  = string.split(content, '\n')
119        lines   = list()
120        for ligne in lignes:
121           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne[:10] != "# Required" and ligne not in ['# Selection :','#']:
122               lines.append(ligne)
123        lines.reverse()
124        return lines
125
126    def format_paths (self, content):
127        # format variables
128        lignes  = string.split(content, '\n')
129        lines   = list()
130        for ligne in lignes:
131            if ligne[:4] == "use ":             
132               lines.append(ligne)
133        return lines
134
135    def get_paths (self, content):
136        lines = self.format_paths(content)
137        for line in lines:
138                result  = string.split (line[4:len(line)], ' ')
139                if  self.packages.has_key(result[0]):
140                    if len(result)==4:
141                        name, version, offset, path = string.split (line[4:len(line)], " ")
142                        #print name, version, offset, path
143                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
144                        if path == '(no_auto_imports)':
145                            path   = offset
146                            offset = ''
147                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
148                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
149                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
150                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
151                        else:
152                            print '# error path not found for', name
153                            sys.exit(-1)   
154                    elif len(result)==5:
155                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
156                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
157                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
158                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
159                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
160                        else:
161                            print '# error path not found for', name
162                            sys.exit(-1)                                                                                                   
163                    elif len(result)==3:
164                        name, version, path = string.split (line[4:len(line)], " ")
165                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
166                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
167                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
168                            full_path = path[1:-1] + '/' +name + + '/cmt'
169                        else:
170                            print '# error path not found for', name
171                            sys.exit(-1)
172                    else:
173                        print "error:",line
174                        print str(result)
175                        sys.exit(-1) 
176                    self.packages[result[0]]['path'] = os.path.normpath(full_path)
177                    commonprefix = os.path.commonprefix([self.packages[result[0]]['path'], self.current_project ['path']])
178                    if os.path.normpath(commonprefix) == self.current_project ['path']:                   
179                        #print result[0], ' belong to project', self.current_project ['name']
180                        self.packages[result[0]]['current_project'] = True
181
182    def get_uses(self, content):       
183        # initiates variables
184        lignes = self.format_uses(content)
185        if not len(lignes): return
186        self.packages [self.current_package] = {'version': '*', 'client': list(),
187                                                'uses': list(), 'status': 'waiting', 
188                                                'current_project': True, 'path': os.getcwd()}
189        previous_client = self.current_package
190        previous_level  = 0
191        level_stack    = [{'name':previous_client,'level':previous_level},]
192        ligne = lignes.pop()       
193        while len(lignes)!=0:   
194            current_level = string.find(ligne, 'use')
195            while current_level > previous_level:               
196                name    = string.split (ligne)[2]
197                version = string.split (ligne)[3]                             
198                if not self.packages.has_key (name):
199                  self.packages [name] = {'version': version, 'uses': list(), 
200                                          'client': list(), 'status': 'waiting', 
201                                          'current_project': False, 'path': None}
202                if name not in self.packages[previous_client]['uses']:# and name != previous_client:
203                   self.packages[previous_client]['uses'].append (name)               
204                level_stack.append({'name':previous_client,'level':previous_level})
205                previous_client = name 
206                previous_level = current_level                 
207                if len(lignes):
208                    ligne = lignes.pop()
209                    #print ligne
210                    current_level = string.find(ligne, 'use')
211                                           
212            #self.packages [previous_client]['status'] ='queued'
213            # restore the level
214            if len(lignes):
215                if len(level_stack):                       
216                    item = level_stack.pop()               
217                    while item['level'] >= current_level and len(level_stack):
218                             item = level_stack.pop()
219                    previous_client = item['name']
220                    previous_level  = item['level']
221            #print previous_client, '-->',string.split (ligne)[2]
222
223    def instanciate_packages(self, file=None):
224        # We create the schedule of the work units
225        print '# First, we initialize the DAG by parsing "cmt show uses"'
226        if file is None:
227            cmd  = 'cmt show uses'
228        else:   
229            cmd = 'cat ' + file       
230        status, output = commands.getstatusoutput (cmd)
231        if status != 0:
232            print output
233            sys.exit(-1)
234        self.get_uses(output)   
235        self.get_paths(output)
236        #self.check_execution (package=self.current_package)
237        #self.simulate_execution()
238
239    def get_local_graph(self):
240        To_remove = list()
241        for key in self.packages:
242            if self.packages[key]['current_project']== False:
243                for selected in self.packages:
244                    if key in self.packages[selected]['uses']:
245                       self.packages[selected]['uses'].remove(key)
246                To_remove.append (key) 
247        for item in To_remove:
248            self.packages.pop(item)
249
250    def simulate_execution(self):
251       ok = True
252       indice = 1                     
253       while ok:
254           runnable  = list()
255           for key in self.packages:
256               if  self.packages[key]['status']!='done':
257                   if len(self.packages[key]['uses']) == 0:
258                       runnable.append(key)                                             
259           if len(runnable):
260               print '\n#--------------------------------------------------------------'
261               print "# Execute parallel actions within packages " + str(runnable) 
262           for selected in runnable:       
263               print '#--------------------------------------------------------------'
264               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
265               print '#--------------------------------------------------------------'
266               self.packages[selected]['status']='done'       
267               indice = indice + 1
268               for key in self.packages:
269                   if selected in self.packages[key]['uses']:
270                       self.packages[key]['uses'].remove(selected)                               
271                       #print 'remove', selected, 'from',key
272           if len(runnable)==0:
273                           ok = False       
274               
275    def check_execution(self, package, path=list(), cycles=list()):
276        #print package,'-->',self.packages[package]['uses']
277        #print path
278        if package in path:
279            if path[path.index(package):] not in cycles:
280                print 'Cycles:',path[path.index(package):], package
281                cycles = cycles + path[path.index(package):]
282                sys.exit(-1)
283        path.append(package)
284        for item in self.packages[package]['uses']:
285              self.check_execution(package=item, path=path, cycles=cycles)
286              path.pop()       
287
288    def get_current_package(self):   
289        cmd = 'cmt show macro package'
290        status, output = commands.getstatusoutput (cmd)
291        if status != 0:
292            print output
293            sys.exit(-1)   
294        lines = string.split(output, '\n')
295        for line in lines:
296            if line [0] != '#':
297                start = string.find(line,"'")
298                end   = string.find(line[start+1:len(line)],"'")
299                return line [start+1:start+end+1]
300
301    def get_work_area_path (self, name):       
302        return self.packages [name]['path']
303       
304    def get_package_path (self, name):   
305        #return os.getcwd ()
306        cmd = 'cmt -use='+name+' run pwd'
307        status, output = commands.getstatusoutput (cmd)
308        if status != 0:
309            print output
310            sys.exit(-1)   
311        lines = string.split(output, '\n')
312        for line in lines:
313            if line [0] != '#' and line[:5] != "#CMT>":
314                print line
315                return line
316 
317    def print_dependencies(self):
318        print '# ------------------------' 
319        print '# package --> dependencies' 
320        print '# ------------------------' 
321        for key in self.packages.keys():
322            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
323
324    def print_status(self, status):
325        print '# ------------------------' 
326        print '# package --> dependencies' 
327        print '# ------------------------' 
328        i = 1
329        for key in self.packages.keys():
330            if self.packages[key] ['status'] == status:
331                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
332                i = i + 1
333           
334    def is_work_unit_waiting (self, name):
335        return self.packages[name] ['status'] == 'waiting'
336
337    def set_work_unit_status (self, name, status):
338        self.packages[name] ['status'] = status
339
340    def get_dependencies (self, name):
341        return self.packages[name] ['uses']
342   
343    def get_next_work_units (self):
344        result = list ()
345        for key in self.packages.keys():
346            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
347                result.append(key)
348        return result
349
350    def is_work_units (self):
351        result = list ()
352        for key in self.packages.keys():
353            if self.is_work_unit_waiting(key) :
354                return True
355        return False       
356
357    def suppress_work_unit (self, name):
358        #print '# remove', name, 'from schedule'
359        for key in self.packages.keys():
360            if name in self.packages[key]['uses']:
361                self.packages[key]['uses'].remove(name)
362
363    def add_work_unit (self, name, cmd):
364        if self.is_work_unit_waiting (name):
365            # we create requests
366            arg = {'cmd': cmd , 'package':name}
367            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
368            # then we put the work request in the queue...
369            self.set_work_unit_status (name, 'queued')
370            self.pool.putRequest(req)
371            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
372
373    def execute (self, command):
374        #self.print_dependencies ()
375        packages = self.get_next_work_units()
376        if len(packages) !=0:
377            print '\n#--------------------------------------------------------------'   
378            print '# Execute parallel actions within packages', packages                     
379            for package in packages:
380                self.add_work_unit (package, command)
381
382    def execute_all(self,command):
383        #self.print_dependencies ()
384        self.execute (command)
385        self.wait()
386        #self.print_dependencies ()
387        #self.print_status (status='waiting')       
388        #while self.is_work_units():
389        #self.wait()           
390       
391    def wait (self):
392       self.pool.wait()   
393
394    # this will be called each time a result is available
395    def result_callback(self, request, result):
396      #print "**Result: %s from request #%s" % (str(result), request.requestID)
397      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
398      self.execute (result['cmd'])
399
400    # the work the threads will have to do
401    def do_execute(self, arg):
402      path = self.get_work_area_path (arg['package'])
403      if path == None:
404          raise RuntimeError('Path to package '+ arg['package'] +' not found')
405      self.set_work_unit_status (arg['package'], 'running')     
406      cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
407      os.chdir(path)       
408      print '#--------------------------------------------------------------'
409      print '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path
410      print '#--------------------------------------------------------------'
411      cmd = arg['cmd']
412      status, output, error, pythonError  = exeCommand(cmd)#,iTimeout = 3600)
413      if self.output is not None:
414         f = open (self.output+'/'+arg['package']+'_output.log', 'w+')
415         f.write (output)
416         f.close()
417         f = open (self.output+'/'+arg['package']+'_error.log', 'w+') 
418         f.write (str(error))
419         f.close() 
420      self.suppress_work_unit (arg['package'])
421      self.set_work_unit_status (arg['package'], 'done')
422      # status, output= commands.getstatusoutput(cmd)
423      #if status != 0:
424      #   raise RuntimeError(output)
425      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
426             
427    # this will be called when an exception occurs within a thread
428    def handle_exception(self, request, exc_info):
429        #traceback.print_stack()
430      print '#--------------------------------------------------------------'       
431      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
432      print "# Exception occured: %s" %(exc_info[1])
433      print '#--------------------------------------------------------------'   
434      sys.exit(-1)
435#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.