source: tbroadcast/v2/python/tbroadcast.py @ 239

Last change on this file since 239 was 239, checked in by garonne, 18 years ago

bug fixed

  • Property svn:executable set to *
File size: 10.8 KB
RevLine 
[232]1#----------------------------------#
2# -- Author: V.Garonne
3# -- Date: 08/25/2006
4# -- Name: tbroadcast
5# -- Description: mainc class
6#----------------------------------#
7
8import os
9import sys
10import time
11import string
12import random
13import commands
[239]14import traceback
[232]15
16from threadpool import WorkRequest
17from threadpool import ThreadPool
18from threadpool import NoResultsPending
19from threadpool import NoWorkersAvailable
20from threadpool import  makeRequests
21from executer   import  exeCommand
22
23class Scheduler:
24
[236]25    def __init__(self, num_workers=20):
26        self.pool     = ThreadPool(num_workers=num_workers)
[232]27        self.packages = {} 
[238]28        self.check_cycles()
[232]29        self.instanciate_packages ()     
30
[238]31
32    def check_cycles (self):
33        cmd = 'cmt show cycles'
34        cycle_found = False
35        status, output = commands.getstatusoutput (cmd)
36        if status != 0:
37            print output
38            sys.exit(-1)   
39        lines = string.split(output, '\n')
40        for line in lines:           
[239]41            if line!='' and line [0] != '#':           
[238]42                if not cycle_found:
43                    cycle_found = True
44                    print "# Error: cycles found, not possible to execute broadcast with threads. See the followings packages:"
45                print line
46        if cycle_found:
47            sys.exit(-1)
48   
[232]49    def instanciate_packages(self):
50        # We create the schedule of the work units
[238]51        print '# First, we initialize the DAG by parsing cmt show uses (takes a certain time... huk should be improved asap) '
[232]52        cmd = 'cmt show uses'
53        status, output = commands.getstatusoutput (cmd)
54        if status != 0:
55            print output
56            sys.exit(-1)   
57        lines = string.split(output, '\n')
58        current_package = self.get_current_package()
59        self.packages [current_package] = {'version': '*', 'dependencies': list(), 'status': 'waiting', 'path':os.getcwd()}
[237]60        indice  = 1
[232]61        for line in lines:
62            if line [0] == '#' and line[:5] != "#CMT>" and line not in ['# Selection :','#']:
63                name    = string.split (line)[2]
64                version = string.split (line)[3]
65                if name not in self.packages[current_package]['dependencies']:
66                   self.packages[current_package]['dependencies'].append (name)               
[238]67                   #print '\n#', indice,':: add package', name
[237]68                   indice = indice + 1
[232]69                if not self.packages.has_key (name):
[237]70                   self.packages [name] = {'version': version, 'dependencies': list(), 'status': 'waiting', 'path': None}               
71                   found = False
72                   for ligne in lines:
[232]73                    if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:               
74                       if found:
75                           if level < string.find(ligne, 'use'):
76                               if string.split (ligne)[2] not in self.packages[name]['dependencies']:
77                                   self.packages[name]['dependencies'].append (string.split (ligne)[2])
[238]78                                   #print "# add dependency", string.split (ligne)[2], ' to package ',name
[232]79                           else:
80                                found = False                                                                                                   
81                       if name == string.split (ligne)[2]:
82                           level = string.find(ligne, 'use')
83                           found = True   
[237]84            if line[:4] == "use ":             
85                result  = string.split (line[4:len(line)], ' ')
86                if  self.packages.has_key(result[0]):
87                    if len(result)==4:
88                        name, version, offset, path = string.split (line[4:len(line)], " ")
89                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
90                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
91                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
92                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
93                        else:
94                            print '# error path not found for', name
95                            sys.exit(-1)   
96                    elif len(result)==5:
97                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
98                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
99                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
100                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
101                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
102                        else:
103                            print '# error path not found for', name
104                            sys.exit(-1)                                                                                                   
105                    elif len(result)==3:
106                        name, version, path = string.split (line[4:len(line)], " ")
107                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
108                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
109                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
110                            full_path = path[1:-1] + '/' +name + + '/cmt'
111                        else:
112                            print '# error path not found for', name
113                            sys.exit(-1)
114                    else:
115                        print "error:",line
116                        print str(result)
117                        sys.exit(-1) 
118                    self.packages[result[0]]['path'] = full_path
[238]119        print '# really takes a certain time ... '
[232]120
121    def get_current_package(self):   
122        cmd = 'cmt show macro package'
123        status, output = commands.getstatusoutput (cmd)
124        if status != 0:
125            print output
126            sys.exit(-1)   
127        lines = string.split(output, '\n')
128        for line in lines:
129            if line [0] != '#':
130                start = string.find(line,"'")
131                end   = string.find(line[start+1:len(line)],"'")
132                return line [start+1:start+end+1]
133
134    def get_work_area_path (self, name):       
135        return self.packages [name]['path']
136       
137    def get_package_path (self, name):   
[237]138        #return os.getcwd ()
[232]139        cmd = 'cmt -use='+name+' run pwd'
140        status, output = commands.getstatusoutput (cmd)
141        if status != 0:
142            print output
143            sys.exit(-1)   
144        lines = string.split(output, '\n')
145        for line in lines:
[237]146            if line [0] != '#' and line[:5] != "#CMT>":
147                print line
[232]148                return line
149 
150    def print_dependencies(self):
151        print '# ------------------------' 
152        print '# package --> dependencies' 
153        print '# ------------------------' 
154        for key in self.packages.keys():
155            print key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                       
156
157    def is_work_unit_waiting (self, name):
158        return self.packages[name] ['status'] == 'waiting'
159
160    def set_work_unit_status (self, name, status):
161        self.packages[name] ['status'] = status
162
163    def get_dependencies (self, name):
164        return self.packages[name] ['dependencies']
165   
166    def get_next_work_units (self):
167        result = list ()
168        for key in self.packages.keys():
169            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
170                result.append(key)
171        return result
172
[238]173    def is_work_units (self):
174        result = list ()
175        for key in self.packages.keys():
176            if self.is_work_unit_waiting(key) :
177                return True
178        return False       
179
[232]180    def suppress_work_unit (self, name):
[239]181        #print '# remove', name, 'from schedule'
[232]182        for key in self.packages.keys():
183            if name in self.packages[key]['dependencies']:
184                self.packages[key]['dependencies'].remove(name)
185               
186    def add_work_unit (self, name, cmd):
187        if self.is_work_unit_waiting (name):
188            # we create requests
189            arg = {'cmd': cmd , 'package':name}
190            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
191            # then we put the work request in the queue...
192            self.set_work_unit_status (name, 'queued')
193            self.pool.putRequest(req)
194            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
195
196    def execute (self, command):
[238]197        #self.print_dependencies ()
[232]198        packages = self.get_next_work_units()
199        if len(packages) !=0:
200            print '\n# Execute parallel actions within ', packages                     
[238]201            for package in packages:
202                self.add_work_unit (package, command)
[232]203
204    def execute_all(self,command):
205        self.execute (command)
206        self.wait()
[239]207        ##while self.is_work_units():
[232]208       
209    def wait (self):
210       self.pool.wait()   
211
212    # this will be called each time a result is available
213    def result_callback(self, request, result):
214      #print "**Result: %s from request #%s" % (str(result), request.requestID)
[239]215      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
[232]216      self.execute (result['cmd'])
217
218    # the work the threads will have to do
219    def do_execute(self, arg):
[239]220        path = self.get_work_area_path (arg['package'])   
221        self.set_work_unit_status (arg['package'], 'running')     
222        cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
223        os.chdir(path)       
[232]224        print '#--------------------------------------------------------------'
225        print '# Now trying ['+ arg['cmd']+'] in ' + path
226        print '#--------------------------------------------------------------'
227        cmd = arg['cmd']
228        status, output, error, pythonError  = exeCommand(cmd,  iTimeout = 10)     
229        self.suppress_work_unit (arg['package'])
230        self.set_work_unit_status (arg['package'], 'done')
231        # status, output= commands.getstatusoutput(cmd)
232        #print output
233        #if status != 0:
234        #   raise RuntimeError(output)
235        return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
[239]236             
[232]237    # this will be called when an exception occurs within a thread
238    def handle_exception(self, request, exc_info):
[239]239        #traceback.print_stack()
[232]240        print "# Exception occured in request #%s: %s" % \
241          (request.requestID, exc_info[1])
242#--------- EoF --------#   
Note: See TracBrowser for help on using the repository browser.