source: tbroadcast/v2/python/tbroadcast.py @ 232

Last change on this file since 232 was 232, checked in by garonne, 18 years ago

add version 2 in python

  • Property svn:executable set to *
File size: 7.0 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Date: 08/25/2006
4# -- Name: tbroadcast
5# -- Description: mainc class
6#----------------------------------#
7
8import os
9import sys
10import time
11import string
12import random
13import commands
14
15from threadpool import WorkRequest
16from threadpool import ThreadPool
17from threadpool import NoResultsPending
18from threadpool import NoWorkersAvailable
19from threadpool import  makeRequests
20from executer   import  exeCommand
21
22class Scheduler:
23
24    def __init__(self, num_worker=20):
25        self.pool     = ThreadPool(num_worker)
26        self.packages = {} 
27        self.instanciate_packages ()     
28
29    def instanciate_packages(self):
30        # We create the schedule of the work units
31        cmd = 'cmt show uses'
32        status, output = commands.getstatusoutput (cmd)
33        if status != 0:
34            print output
35            sys.exit(-1)   
36        lines = string.split(output, '\n')
37        current_package = self.get_current_package()
38        self.packages [current_package] = {'version': '*', 'dependencies': list(), 'status': 'waiting', 'path':os.getcwd()}
39        for line in lines:
40            if line [0] == '#' and line[:5] != "#CMT>" and line not in ['# Selection :','#']:
41                name    = string.split (line)[2]
42                version = string.split (line)[3]
43                if name not in self.packages[current_package]['dependencies']:
44                   self.packages[current_package]['dependencies'].append (name)               
45                if not self.packages.has_key (name):
46                   self.packages [name] = {'version': version, 'dependencies': list(), 'status': 'waiting', 'path':self.get_package_path(name)}               
47                found = False
48                for ligne in lines:
49                    if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:               
50                       if found:
51                           if level < string.find(ligne, 'use'):
52                               if string.split (ligne)[2] not in self.packages[name]['dependencies']:
53                                   self.packages[name]['dependencies'].append (string.split (ligne)[2])
54                           else:
55                                found = False                                                                                                   
56                       if name == string.split (ligne)[2]:
57                           level = string.find(ligne, 'use')
58                           found = True   
59
60    def get_current_package(self):   
61        cmd = 'cmt show macro package'
62        status, output = commands.getstatusoutput (cmd)
63        if status != 0:
64            print output
65            sys.exit(-1)   
66        lines = string.split(output, '\n')
67        for line in lines:
68            if line [0] != '#':
69                start = string.find(line,"'")
70                end   = string.find(line[start+1:len(line)],"'")
71                return line [start+1:start+end+1]
72
73    def get_work_area_path (self, name):       
74        return self.packages [name]['path']
75       
76    def get_package_path (self, name):   
77        cmd = 'cmt -use='+name+' run pwd'
78        status, output = commands.getstatusoutput (cmd)
79        if status != 0:
80            print output
81            sys.exit(-1)   
82        lines = string.split(output, '\n')
83        for line in lines:
84            if line [0] != '#':
85                return line
86 
87    def print_dependencies(self):
88        print '# ------------------------' 
89        print '# package --> dependencies' 
90        print '# ------------------------' 
91        for key in self.packages.keys():
92            print key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                       
93
94    def is_work_unit_waiting (self, name):
95        return self.packages[name] ['status'] == 'waiting'
96
97    def set_work_unit_status (self, name, status):
98        self.packages[name] ['status'] = status
99
100    def get_dependencies (self, name):
101        return self.packages[name] ['dependencies']
102   
103    def get_next_work_units (self):
104        result = list ()
105        for key in self.packages.keys():
106            if len(self.get_dependencies (key)) == 0 and self.is_work_unit_waiting(key) :
107                result.append(key)
108        return result
109
110    def suppress_work_unit (self, name):
111        print '# remove', name, 'from schedule' 
112        for key in self.packages.keys():
113            if name in self.packages[key]['dependencies']:
114                self.packages[key]['dependencies'].remove(name)
115               
116    def add_work_unit (self, name, cmd):
117        if self.is_work_unit_waiting (name):
118            # we create requests
119            arg = {'cmd': cmd , 'package':name}
120            req = WorkRequest(self.do_execute, [arg] , None, callback=self.result_callback, exc_callback=self.handle_exception) 
121            # then we put the work request in the queue...
122            self.set_work_unit_status (name, 'queued')
123            self.pool.putRequest(req)
124            #print "# Work request #%s added on %s." % (req.requestID, str(arg['package']))
125
126    def execute (self, command):
127        self.print_dependencies ()
128        packages = self.get_next_work_units()
129        if len(packages) !=0:
130            print '\n# Execute parallel actions within ', packages                     
131        for package in packages:
132            self.add_work_unit (package, command)
133
134    def execute_all(self,command):
135        self.execute (command)
136        self.wait()
137       
138    def wait (self):
139       self.pool.wait()   
140
141    # this will be called each time a result is available
142    def result_callback(self, request, result):
143      #print "**Result: %s from request #%s" % (str(result), request.requestID)
144      print "# Result: %s from request #%s" % (result['package'], request.requestID)
145      self.execute (result['cmd'])
146
147    # the work the threads will have to do
148    def do_execute(self, arg):
149        path = self.get_work_area_path (arg['package'])
150        print '#--------------------------------------------------------------'
151        print '# Now trying ['+ arg['cmd']+'] in ' + path
152        print '#--------------------------------------------------------------'
153        self.set_work_unit_status (arg['package'], 'running')     
154        cmd = "cmt -use="+ arg['package'] + " run '"+ arg['cmd'] + "'"
155        os.chdir(path)
156        cmd = arg['cmd']
157        status, output, error, pythonError  = exeCommand(cmd,  iTimeout = 10)     
158        self.suppress_work_unit (arg['package'])
159        self.set_work_unit_status (arg['package'], 'done')
160        # status, output= commands.getstatusoutput(cmd)
161        #print output
162        #if status != 0:
163        #   raise RuntimeError(output)
164        return {'output':output, 'cmd': arg['cmd'], 'package':arg['package']}
165               
166    # this will be called when an exception occurs within a thread
167    def handle_exception(self, request, exc_info):
168        print "# Exception occured in request #%s: %s" % \
169          (request.requestID, exc_info[1])
170#--------- EoF --------#   
Note: See TracBrowser for help on using the repository browser.