source: tbroadcast/v2/python/tbroadcast.py @ 241

Last change on this file since 241 was 241, checked in by garonne, 18 years ago

add semaphores

  • Property svn:executable set to *
File size: 13.8 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Date: 08/25/2006
4# -- Name: tbroadcast
5# -- Description: mainc class
6#----------------------------------#
7
8import os
9import sys
10import time
11import string
12import random
13import commands
14import traceback
15from threading import BoundedSemaphore
16
17from threadpool import WorkRequest
18from threadpool import ThreadPool
19from threadpool import NoResultsPending
20from threadpool import NoWorkersAvailable
21from threadpool import  makeRequests
22from executer   import  exeCommand
23
24class Scheduler:
25
26    def __init__(self, num_workers=20):
27        self.pool            = ThreadPool(num_workers=num_workers)
28        self.current_package = self.get_current_package()
29        self.packages        = {} 
30        self.counter         = 0
31        self.semaphore       = BoundedSemaphore(1)
32        self.check_cycles()
33        self.instanciate_packages ()     
34        self.reduce_graph ()
35       
36    def get_counter(self):
37        self.semaphore.acquire ()       
38        self.counter = self.counter + 1
39        value = self.counter
40        self.semaphore.release()
41        return value
42       
43    def check_cycles (self):
44        cmd = 'cmt show cycles'
45        cycle_found = False
46        status, output = commands.getstatusoutput (cmd)
47        if status != 0:
48            print output
49            sys.exit(-1)   
50        lines = string.split(output, '\n')
51        for line in lines:           
52            if line!='' and line [0] != '#':           
53                if not cycle_found:
54                    cycle_found = True
55                    print "# Error: cycles found, not possible to execute broadcast with threads. See the followings packages:"
56                print line
57        if cycle_found:
58            sys.exit(-1)
59   
60    def instanciate_packages(self):
61        # We create the schedule of the work units
62        print '# First, we initialize the DAG by parsing cmt show uses'
63        cmd = 'cmt show uses'
64        status, output = commands.getstatusoutput (cmd)
65        if status != 0:
66            print output
67            sys.exit(-1)   
68        lines = string.split(output, '\n')
69        self.packages [self.current_package] = {'version': '*', 'dependencies': list(), 'status': 'waiting', 'path':os.getcwd(), 'indice':1}
70        indice  = 1
71        for line in lines:
72            if line [0] == '#' and line[:5] != "#CMT>" and line not in ['# Selection :','#']:
73                name    = string.split (line)[2]
74                version = string.split (line)[3]
75                if name not in self.packages[self.current_package]['dependencies']:
76                   self.packages[self.current_package]['dependencies'].append (name)               
77                   #print '\n#', indice,':: add package', name
78                   indice = indice + 1
79                if not self.packages.has_key (name):
80                   self.packages [name] = {'version': version, 'dependencies': list(), 'status': 'waiting', 'path': None, 'indice':indice}               
81                   indice = indice + 1
82                   found = False
83                   for ligne in lines:
84                    if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:               
85                       if found:
86                           if level < string.find(ligne, 'use'):
87                               if string.split (ligne)[2] not in self.packages[name]['dependencies']:
88                                   self.packages[name]['dependencies'].append (string.split (ligne)[2])
89                                   #print "# add dependency", string.split (ligne)[2], ' to package ',name
90                           else:
91                                found = False                                                                                                   
92                       if name == string.split (ligne)[2]:
93                           level = string.find(ligne, 'use')
94                           found = True   
95            if line[:4] == "use ":             
96                result  = string.split (line[4:len(line)], ' ')
97                if  self.packages.has_key(result[0]):
98                    if len(result)==4:
99                        name, version, offset, path = string.split (line[4:len(line)], " ")
100                        #print name, version, offset, path
101                        #print path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
102                        if path == '(no_auto_imports)':
103                            path   = offset
104                            offset = ''
105                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
106                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
107                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
108                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
109                        else:
110                            print '# error path not found for', name
111                            sys.exit(-1)   
112                    elif len(result)==5:
113                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
114                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
115                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
116                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
117                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
118                        else:
119                            print '# error path not found for', name
120                            sys.exit(-1)                                                                                                   
121                    elif len(result)==3:
122                        name, version, path = string.split (line[4:len(line)], " ")
123                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
124                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
125                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
126                            full_path = path[1:-1] + '/' +name + + '/cmt'
127                        else:
128                            print '# error path not found for', name
129                            sys.exit(-1)
130                    else:
131                        print "error:",line
132                        print str(result)
133                        sys.exit(-1) 
134                    self.packages[result[0]]['path'] = full_path
135        print '# Sometimes takes a certain time (should be improved asap)'
136
137    def get_current_package(self):   
138        cmd = 'cmt show macro package'
139        status, output = commands.getstatusoutput (cmd)
140        if status != 0:
141            print output
142            sys.exit(-1)   
143        lines = string.split(output, '\n')
144        for line in lines:
145            if line [0] != '#':
146                start = string.find(line,"'")
147                end   = string.find(line[start+1:len(line)],"'")
148                return line [start+1:start+end+1]
149
150    def get_work_area_path (self, name):       
151        return self.packages [name]['path']
152       
153    def get_package_path (self, name):   
154        #return os.getcwd ()
155        cmd = 'cmt -use='+name+' run pwd'
156        status, output = commands.getstatusoutput (cmd)
157        if status != 0:
158            print output
159            sys.exit(-1)   
160        lines = string.split(output, '\n')
161        for line in lines:
162            if line [0] != '#' and line[:5] != "#CMT>":
163                print line
164                return line
165 
166    def print_dependencies(self):
167        print '# ------------------------' 
168        print '# package --> dependencies' 
169        print '# ------------------------' 
170        for key in self.packages.keys():
171            print key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                       
172
173    def print_status(self, status):
174        print '# ------------------------' 
175        print '# package --> dependencies' 
176        print '# ------------------------' 
177        i = 1
178        for key in self.packages.keys():
179            if self.packages[key] ['status'] == status:
180                print i , key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                 
181                i = i + 1
182               
183    def reduce_graph(self):
184        packages     = {}
185        dependencies = list ()               
186        dependencies.append (self.current_package)               
187        space = ' '
188        #print '-->', self.current_package
189        packages[self.current_package] = list()
190        for package in self.packages[self.current_package]['dependencies']:
191            if package not in dependencies:
192                dependencies.append(package)           
193                packages[self.current_package].append(package)
194                packages[package] = list()                       
195                #print '  -->', package
196                limit = True
197                for paquetage in self.packages[package]['dependencies']:
198                    if paquetage not in dependencies:                       
199                        space = space + ' '
200                        #print space, paquetage
201                        dependencies.append (paquetage)
202                        packages[package].append (paquetage)
203            else:
204                packages[package] = list()                               
205                space = ''       
206        for key in packages:
207            #print key, packages[key]
208            self.packages[key]['dependencies'] = packages[key]
209           
210    def is_work_unit_waiting (self, name):
211        return self.packages[name] ['status'] == 'waiting'
212
213    def set_work_unit_status (self, name, status):
214        self.packages[name] ['status'] = status
215
216    def get_dependencies (self, name):
217        return self.packages[name] ['dependencies']
218   
219    def get_next_work_units (self):
220        result = list ()
221        for key in self.packages.keys():
222            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
223                result.append(key)
224        return result
225
226    def is_work_units (self):
227        result = list ()
228        for key in self.packages.keys():
229            if self.is_work_unit_waiting(key) :
230                return True
231        return False       
232
233    def suppress_work_unit (self, name):
234        #print '# remove', name, 'from schedule'
235        for key in self.packages.keys():
236            if name in self.packages[key]['dependencies']:
237                self.packages[key]['dependencies'].remove(name)
238
239    def add_work_unit (self, name, cmd):
240        if self.is_work_unit_waiting (name):
241            # we create requests
242            arg = {'cmd': cmd , 'package':name}
243            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
244            # then we put the work request in the queue...
245            self.set_work_unit_status (name, 'queued')
246            self.pool.putRequest(req)
247            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
248
249    def execute (self, command):
250        #self.print_dependencies ()
251        packages = self.get_next_work_units()
252        if len(packages) !=0:
253            print '\n#--------------------------------------------------------------'   
254            print '# Execute parallel actions within packages', packages                     
255            for package in packages:
256                self.add_work_unit (package, command)
257
258    def execute_all(self,command):
259        #self.print_dependencies ()
260        self.execute (command)
261        self.wait()
262        #self.print_dependencies ()
263        #self.print_status (status='waiting')       
264        #print self.is_work_units()
265        #while self.is_work_units():
266        #            self.wait()           
267       
268    def wait (self):
269       self.pool.wait()   
270
271    # this will be called each time a result is available
272    def result_callback(self, request, result):
273      #print "**Result: %s from request #%s" % (str(result), request.requestID)
274      #print "# Result: %s from request #%s" % (result['package'], request.requestID)
275      self.execute (result['cmd'])
276
277    # the work the threads will have to do
278    def do_execute(self, arg):
279      path = self.get_work_area_path (arg['package'])
280      if path == None:
281          raise RuntimeError('Path to package '+ arg['package'] +' not found')
282      self.set_work_unit_status (arg['package'], 'running')     
283      cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
284      os.chdir(path)       
285      print '#--------------------------------------------------------------'
286      print '# ('+str(self.get_counter())+'/'+str(len(self.packages))+') Now trying ['+ arg['cmd']+'] in ' + path
287      print '#--------------------------------------------------------------'
288      cmd = arg['cmd']
289      status, output, error, pythonError  = exeCommand(cmd,  iTimeout = 10)     
290      self.suppress_work_unit (arg['package'])
291      self.set_work_unit_status (arg['package'], 'done')
292      # status, output= commands.getstatusoutput(cmd)
293      #if status != 0:
294      #   raise RuntimeError(output)
295      return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
296             
297    # this will be called when an exception occurs within a thread
298    def handle_exception(self, request, exc_info):
299        #traceback.print_stack()
300      print '#--------------------------------------------------------------'       
301      #print "# Exception occured in request #%s: %s" %(request.requestID, exc_info[1])
302      print "# Exception occured: %s" %(exc_info[1])
303      print '#--------------------------------------------------------------'   
304      sys.exit(-1)
305#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.