source: tbroadcast/v2/python/tbroadcast.py @ 237

Last change on this file since 237 was 237, checked in by garonne, 18 years ago

corrige initialisation

  • Property svn:executable set to *
File size: 9.9 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Date: 08/25/2006
4# -- Name: tbroadcast
5# -- Description: mainc class
6#----------------------------------#
7
8import os
9import sys
10import time
11import string
12import random
13import commands
14
15from threadpool import WorkRequest
16from threadpool import ThreadPool
17from threadpool import NoResultsPending
18from threadpool import NoWorkersAvailable
19from threadpool import  makeRequests
20from executer   import  exeCommand
21
22class Scheduler:
23
24    def __init__(self, num_workers=20):
25        self.pool     = ThreadPool(num_workers=num_workers)
26        self.packages = {} 
27        self.instanciate_packages ()     
28
29    def instanciate_packages(self):
30        # We create the schedule of the work units
31        print '# first, we initialize the DAG (takes a long time... huk should be improved asap) '
32        cmd = 'cmt show uses'
33        status, output = commands.getstatusoutput (cmd)
34        if status != 0:
35            print output
36            sys.exit(-1)   
37        lines = string.split(output, '\n')
38        current_package = self.get_current_package()
39        self.packages [current_package] = {'version': '*', 'dependencies': list(), 'status': 'waiting', 'path':os.getcwd()}
40        indice  = 1
41        for line in lines:
42            if line [0] == '#' and line[:5] != "#CMT>" and line not in ['# Selection :','#']:
43                name    = string.split (line)[2]
44                version = string.split (line)[3]
45                if name not in self.packages[current_package]['dependencies']:
46                   self.packages[current_package]['dependencies'].append (name)               
47                   print '\n#', indice,':: add package', name
48                   indice = indice + 1
49                if not self.packages.has_key (name):
50                   self.packages [name] = {'version': version, 'dependencies': list(), 'status': 'waiting', 'path': None}               
51                   found = False
52                   for ligne in lines:
53                    if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:               
54                       if found:
55                           if level < string.find(ligne, 'use'):
56                               if string.split (ligne)[2] not in self.packages[name]['dependencies']:
57                                   self.packages[name]['dependencies'].append (string.split (ligne)[2])
58                                   print "# add dependency", string.split (ligne)[2], ' to package ',name
59                           else:
60                                found = False                                                                                                   
61                       if name == string.split (ligne)[2]:
62                           level = string.find(ligne, 'use')
63                           found = True   
64            if line[:4] == "use ":             
65                result  = string.split (line[4:len(line)], ' ')
66                if  self.packages.has_key(result[0]):
67                    if len(result)==4:
68                        name, version, offset, path = string.split (line[4:len(line)], " ")
69                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
70                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
71                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
72                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
73                        else:
74                            print '# error path not found for', name
75                            sys.exit(-1)   
76                    elif len(result)==5:
77                        name, version, offset, path, importation = string.split (line[4:len(line)], " ")                                       
78                        if os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'):
79                            full_path = path[1:-1] + '/' + offset + '/' +name + '/' + version + '/cmt'
80                        elif os.path.exists(path[1:-1] + '/' + offset + '/' +name + '/cmt'):   
81                            full_path = path[1:-1] + '/' + offset + '/' +name + '/cmt'
82                        else:
83                            print '# error path not found for', name
84                            sys.exit(-1)                                                                                                   
85                    elif len(result)==3:
86                        name, version, path = string.split (line[4:len(line)], " ")
87                        if os.path.exists(path[1:-1] + '/' +name + '/' + version + '/cmt'):
88                            full_path = path[1:-1] + '/' +name + '/' + version + '/cmt'
89                        elif os.path.exists(path[1:-1] + '/' +name + + '/cmt'):   
90                            full_path = path[1:-1] + '/' +name + + '/cmt'
91                        else:
92                            print '# error path not found for', name
93                            sys.exit(-1)
94                    else:
95                        print "error:",line
96                        print str(result)
97                        sys.exit(-1) 
98                    self.packages[result[0]]['path'] = full_path
99        print '# really takes a long time... '
100
101    def get_current_package(self):   
102        cmd = 'cmt show macro package'
103        status, output = commands.getstatusoutput (cmd)
104        if status != 0:
105            print output
106            sys.exit(-1)   
107        lines = string.split(output, '\n')
108        for line in lines:
109            if line [0] != '#':
110                start = string.find(line,"'")
111                end   = string.find(line[start+1:len(line)],"'")
112                return line [start+1:start+end+1]
113
114    def get_work_area_path (self, name):       
115        return self.packages [name]['path']
116       
117    def get_package_path (self, name):   
118        #return os.getcwd ()
119        cmd = 'cmt -use='+name+' run pwd'
120        status, output = commands.getstatusoutput (cmd)
121        if status != 0:
122            print output
123            sys.exit(-1)   
124        lines = string.split(output, '\n')
125        for line in lines:
126            if line [0] != '#' and line[:5] != "#CMT>":
127                print line
128                return line
129 
130    def print_dependencies(self):
131        print '# ------------------------' 
132        print '# package --> dependencies' 
133        print '# ------------------------' 
134        for key in self.packages.keys():
135            print key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                       
136
137    def is_work_unit_waiting (self, name):
138        return self.packages[name] ['status'] == 'waiting'
139
140    def set_work_unit_status (self, name, status):
141        self.packages[name] ['status'] = status
142
143    def get_dependencies (self, name):
144        return self.packages[name] ['dependencies']
145   
146    def get_next_work_units (self):
147        result = list ()
148        for key in self.packages.keys():
149            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
150                result.append(key)
151        return result
152
153    def suppress_work_unit (self, name):
154        print '# remove', name, 'from schedule' 
155        for key in self.packages.keys():
156            if name in self.packages[key]['dependencies']:
157                self.packages[key]['dependencies'].remove(name)
158               
159    def add_work_unit (self, name, cmd):
160        if self.is_work_unit_waiting (name):
161            # we create requests
162            arg = {'cmd': cmd , 'package':name}
163            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
164            # then we put the work request in the queue...
165            self.set_work_unit_status (name, 'queued')
166            self.pool.putRequest(req)
167            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
168
169    def execute (self, command):
170        self.print_dependencies ()
171        packages = self.get_next_work_units()
172        if len(packages) !=0:
173            print '\n# Execute parallel actions within ', packages                     
174        for package in packages:
175            self.add_work_unit (package, command)
176
177    def execute_all(self,command):
178        self.execute (command)
179        self.wait()
180       
181    def wait (self):
182       self.pool.wait()   
183
184    # this will be called each time a result is available
185    def result_callback(self, request, result):
186      #print "**Result: %s from request #%s" % (str(result), request.requestID)
187      print "# Result: %s from request #%s" % (result['package'], request.requestID)
188      self.execute (result['cmd'])
189
190    # the work the threads will have to do
191    def do_execute(self, arg):
192        path = self.get_work_area_path (arg['package'])
193        print '#--------------------------------------------------------------'
194        print '# Now trying ['+ arg['cmd']+'] in ' + path
195        print '#--------------------------------------------------------------'
196        self.set_work_unit_status (arg['package'], 'running')     
197        cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
198        os.chdir(path)
199        cmd = arg['cmd']
200        status, output, error, pythonError  = exeCommand(cmd,  iTimeout = 10)     
201        self.suppress_work_unit (arg['package'])
202        self.set_work_unit_status (arg['package'], 'done')
203        # status, output= commands.getstatusoutput(cmd)
204        #print output
205        #if status != 0:
206        #   raise RuntimeError(output)
207        return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
208               
209    # this will be called when an exception occurs within a thread
210    def handle_exception(self, request, exc_info):
211        print "# Exception occured in request #%s: %s" % \
212          (request.requestID, exc_info[1])
213#--------- EoF --------#   
Note: See TracBrowser for help on using the repository browser.