source: tbroadcast/v2/python/tbroadcast.py @ 238

Last change on this file since 238 was 238, checked in by garonne, 18 years ago

check cycles

  • Property svn:executable set to *
File size: 10.8 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Date: 08/25/2006
4# -- Name: tbroadcast
5# -- Description: mainc class
6#----------------------------------#
7
8import os
9import sys
10import time
11import string
12import random
13import commands
14
15from threadpool import WorkRequest
16from threadpool import ThreadPool
17from threadpool import NoResultsPending
18from threadpool import NoWorkersAvailable
19from threadpool import  makeRequests
20from executer   import  exeCommand
21
22class Scheduler:
23
24    def __init__(self, num_workers=20):
25        self.pool     = ThreadPool(num_workers=num_workers)
26        self.packages = {} 
27        self.check_cycles()
28        self.instanciate_packages ()     
29
30
31    def check_cycles (self):
32        cmd = 'cmt show cycles'
33        cycle_found = False
34        status, output = commands.getstatusoutput (cmd)
35        if status != 0:
36            print output
37            sys.exit(-1)   
38        lines = string.split(output, '\n')
39        for line in lines:           
40            if line [0] != '#':           
41                if not cycle_found:
42                    cycle_found = True
43                    print "# Error: cycles found, not possible to execute broadcast with threads. See the followings packages:"
44                print line
45        if cycle_found:
46            sys.exit(-1)
47   
48    def instanciate_packages(self):
49        # We create the schedule of the work units
50        print '# First, we initialize the DAG by parsing cmt show uses (takes a certain time... huk should be improved asap) '
51        cmd = 'cmt show uses'
52        status, output = commands.getstatusoutput (cmd)
53        if status != 0:
54            print output
55            sys.exit(-1)   
56        lines = string.split(output, '\n')
57        current_package = self.get_current_package()
58        self.packages [current_package] = {'version': '*', 'dependencies': list(), 'status': 'waiting', 'path':os.getcwd()}
59        indice  = 1
60        for line in lines:
61            if line [0] == '#' and line[:5] != "#CMT>" and line not in ['# Selection :','#']:
62                name    = string.split (line)[2]
63                version = string.split (line)[3]
64                if name not in self.packages[current_package]['dependencies']:
65                   self.packages[current_package]['dependencies'].append (name)               
66                   #print '\n#', indice,':: add package', name
67                   indice = indice + 1
68                if not self.packages.has_key (name):
69                   self.packages [name] = {'version': version, 'dependencies': list(), 'status': 'waiting', 'path': None}               
70                   found = False
71                   for ligne in lines:
72                    if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:               
73                       if found:
74                           if level < string.find(ligne, 'use'):
75                               if string.split (ligne)[2] not in self.packages[name]['dependencies']:
76                                   self.packages[name]['dependencies'].append (string.split (ligne)[2])
77                                   #print "# add dependency", string.split (ligne)[2], ' to package ',name
78                           else:
79                                found = False                                                                                                   
80                       if name == string.split (ligne)[2]:
81                           level = string.find(ligne, 'use')
82                           found = True   
83            if line[:4] == "use ":             
84                result  = string.split (line[4:len(line)], ' ')
85                if  self.packages.has_key(result[0]):
86                    if len(result)==4:
87                        name, version, offset, path = string.split (line[4:len(line)], " ")
88                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
89                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
90                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
91                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
92                        else:
93                            print '# error path not found for', name
94                            sys.exit(-1)   
95                    elif len(result)==5:
96                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
97                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
98                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
99                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
100                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
101                        else:
102                            print '# error path not found for', name
103                            sys.exit(-1)                                                                                                   
104                    elif len(result)==3:
105                        name, version, path = string.split (line[4:len(line)], " ")
106                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
107                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
108                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
109                            full_path = path[1:-1] + '/' +name + + '/cmt'
110                        else:
111                            print '# error path not found for', name
112                            sys.exit(-1)
113                    else:
114                        print "error:",line
115                        print str(result)
116                        sys.exit(-1) 
117                    self.packages[result[0]]['path'] = full_path
118        print '# really takes a certain time ... '
119
120    def get_current_package(self):   
121        cmd = 'cmt show macro package'
122        status, output = commands.getstatusoutput (cmd)
123        if status != 0:
124            print output
125            sys.exit(-1)   
126        lines = string.split(output, '\n')
127        for line in lines:
128            if line [0] != '#':
129                start = string.find(line,"'")
130                end   = string.find(line[start+1:len(line)],"'")
131                return line [start+1:start+end+1]
132
133    def get_work_area_path (self, name):       
134        return self.packages [name]['path']
135       
136    def get_package_path (self, name):   
137        #return os.getcwd ()
138        cmd = 'cmt -use='+name+' run pwd'
139        status, output = commands.getstatusoutput (cmd)
140        if status != 0:
141            print output
142            sys.exit(-1)   
143        lines = string.split(output, '\n')
144        for line in lines:
145            if line [0] != '#' and line[:5] != "#CMT>":
146                print line
147                return line
148 
149    def print_dependencies(self):
150        print '# ------------------------' 
151        print '# package --> dependencies' 
152        print '# ------------------------' 
153        for key in self.packages.keys():
154            print key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                       
155
156    def is_work_unit_waiting (self, name):
157        return self.packages[name] ['status'] == 'waiting'
158
159    def set_work_unit_status (self, name, status):
160        self.packages[name] ['status'] = status
161
162    def get_dependencies (self, name):
163        return self.packages[name] ['dependencies']
164   
165    def get_next_work_units (self):
166        result = list ()
167        for key in self.packages.keys():
168            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
169                result.append(key)
170        return result
171
172    def is_work_units (self):
173        result = list ()
174        for key in self.packages.keys():
175            if self.is_work_unit_waiting(key) :
176                return True
177        return False       
178
179    def suppress_work_unit (self, name):
180        print '# remove', name, 'from schedule' 
181        for key in self.packages.keys():
182            if name in self.packages[key]['dependencies']:
183                self.packages[key]['dependencies'].remove(name)
184               
185    def add_work_unit (self, name, cmd):
186        if self.is_work_unit_waiting (name):
187            # we create requests
188            arg = {'cmd': cmd , 'package':name}
189            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
190            # then we put the work request in the queue...
191            self.set_work_unit_status (name, 'queued')
192            self.pool.putRequest(req)
193            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
194
195    def execute (self, command):
196        #self.print_dependencies ()
197        packages = self.get_next_work_units()
198        if len(packages) !=0:
199            print '\n# Execute parallel actions within ', packages                     
200            for package in packages:
201                self.add_work_unit (package, command)
202
203    def execute_all(self,command):
204        self.execute (command)
205        self.wait()
206        while self.is_work_units(): 
207            self.wait()   
208       
209    def wait (self):
210       self.pool.wait()   
211
212    # this will be called each time a result is available
213    def result_callback(self, request, result):
214      #print "**Result: %s from request #%s" % (str(result), request.requestID)
215      print "# Result: %s from request #%s" % (result['package'], request.requestID)
216      self.execute (result['cmd'])
217
218    # the work the threads will have to do
219    def do_execute(self, arg):
220        path = self.get_work_area_path (arg['package'])
221        print '#--------------------------------------------------------------'
222        print '# Now trying ['+ arg['cmd']+'] in ' + path
223        print '#--------------------------------------------------------------'
224        self.set_work_unit_status (arg['package'], 'running')     
225        cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
226        os.chdir(path)
227        cmd = arg['cmd']
228        status, output, error, pythonError  = exeCommand(cmd,  iTimeout = 10)     
229        self.suppress_work_unit (arg['package'])
230        self.set_work_unit_status (arg['package'], 'done')
231        # status, output= commands.getstatusoutput(cmd)
232        #print output
233        #if status != 0:
234        #   raise RuntimeError(output)
235        return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
236               
237    # this will be called when an exception occurs within a thread
238    def handle_exception(self, request, exc_info):
239        print "# Exception occured in request #%s: %s" % \
240          (request.requestID, exc_info[1])
241#--------- EoF --------#   
Note: See TracBrowser for help on using the repository browser.