source: tbroadcast/v2/python/tbroadcast.py@ 236

Last change on this file since 236 was 236, checked in by garonne, 19 years ago

MàJ

  • Property svn:executable set to *
File size: 7.0 KB
Line 
1#----------------------------------#
2# -- Author: V.Garonne
3# -- Date: 08/25/2006
4# -- Name: tbroadcast
5# -- Description: main class
6#----------------------------------#
7
8import os
9import sys
10import time
11import string
12import random
13import commands
14
15from threadpool import WorkRequest
16from threadpool import ThreadPool
17from threadpool import NoResultsPending
18from threadpool import NoWorkersAvailable
19from threadpool import makeRequests
20from executer import exeCommand
21
class Scheduler:
    """Run a command over the current CMT package and the packages it uses.

    ``self.packages`` maps a package name to a dict with the keys
    ``'version'``, ``'dependencies'`` (names of packages that still have to
    be processed before this one), ``'status'`` (``'waiting'``, ``'queued'``,
    ``'running'`` or ``'done'``) and ``'path'`` (directory in which the
    command is run).  A package becomes eligible for execution once its
    dependency list is empty and its status is still ``'waiting'``.
    """

    def __init__(self, num_workers=20):
        """Create the thread pool and build the package schedule.

        num_workers is forwarded to ``ThreadPool``.  Building the schedule
        runs several ``cmt`` commands and exits the process if one fails.
        """
        self.pool = ThreadPool(num_workers=num_workers)
        self.packages = {}
        self.instanciate_packages()

    # ------------------------------------------------------------------
    # Helpers for reading ``cmt`` output
    # ------------------------------------------------------------------
    @staticmethod
    def _cmt_output_lines(cmd):
        """Run *cmd* and return its output split into lines.

        On a non-zero exit status the output is printed and the process
        exits with ``sys.exit(-1)``.
        """
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            print(output)
            sys.exit(-1)
        return output.split('\n')

    @staticmethod
    def _first_non_comment(lines):
        """Return the first non-empty line not starting with ``#``, or None."""
        for line in lines:
            # The emptiness check avoids the IndexError that indexing
            # ``line[0]`` raised on blank output lines.
            if line and not line.startswith('#'):
                return line
        return None

    @staticmethod
    def _parse_uses(lines):
        """Extract the use entries from ``cmt show uses`` output lines.

        Only lines starting with ``#`` are considered; ``#CMT>`` lines, the
        bare ``#`` line and the ``# Selection :`` line are ignored, as are
        empty lines.  Returns a list of ``(indent, name, version)`` tuples in
        output order, where indent is the column of the first ``use`` in the
        line and name/version are the third and fourth whitespace-separated
        fields.
        """
        uses = []
        for line in lines:
            if (line.startswith('#')
                    and not line.startswith('#CMT>')
                    and line not in ('# Selection :', '#')):
                fields = line.split()
                uses.append((line.find('use'), fields[2], fields[3]))
        return uses

    @staticmethod
    def _nested_uses(uses, name):
        """Return the packages listed below *name* in the use tree.

        uses is the list produced by ``_parse_uses``.  After each entry for
        *name*, every following entry indented deeper than it is collected
        (without duplicates) until an entry at the same or a shallower
        indent is met.
        """
        nested = []
        inside = False
        level = -1
        for indent, used, _version in uses:
            if inside:
                if level < indent:
                    if used not in nested:
                        nested.append(used)
                else:
                    inside = False
            # Checked after the block above so that an entry closing one
            # subtree can itself open a new one.
            if used == name:
                level = indent
                inside = True
        return nested

    # ------------------------------------------------------------------
    # Schedule construction
    # ------------------------------------------------------------------
    def instanciate_packages(self):
        """Fill ``self.packages`` from the output of ``cmt show uses``.

        The current package is registered with version ``'*'``, the current
        working directory as path, and every used package as a dependency.
        Each used package is registered once, with the packages nested
        below it in the use tree as its dependencies.
        """
        # We create the schedule of the work units
        lines = self._cmt_output_lines('cmt show uses')
        current_package = self.get_current_package()
        self.packages[current_package] = {
            'version': '*',
            'dependencies': list(),
            'status': 'waiting',
            'path': os.getcwd(),
        }
        uses = self._parse_uses(lines)
        top_level = self.packages[current_package]['dependencies']
        for _indent, name, version in uses:
            if name not in top_level:
                top_level.append(name)
            if name not in self.packages:
                self.packages[name] = {
                    'version': version,
                    'dependencies': self._nested_uses(uses, name),
                    'status': 'waiting',
                    'path': self.get_package_path(name),
                }

    def get_current_package(self):
        """Return the text between the first pair of single quotes in the
        first non-comment line of ``cmt show macro package``.

        Returns None when the output has no such line.
        """
        line = self._first_non_comment(
            self._cmt_output_lines('cmt show macro package'))
        if line is None:
            return None
        start = line.find("'")
        length = line[start + 1:].find("'")
        return line[start + 1:start + 1 + length]

    def get_work_area_path(self, name):
        """Return the recorded path of package *name*."""
        return self.packages[name]['path']

    def get_package_path(self, name):
        """Return the first non-comment line printed by
        ``cmt -use=<name> run pwd``, or None when there is none.
        """
        return self._first_non_comment(
            self._cmt_output_lines('cmt -use=' + name + ' run pwd'))

    # ------------------------------------------------------------------
    # Schedule inspection and update
    # ------------------------------------------------------------------
    def print_dependencies(self):
        """Print every package with its remaining dependencies and status."""
        print('# ------------------------')
        print('# package --> dependencies')
        print('# ------------------------')
        for key in self.packages.keys():
            print('%s --> %s , %s' % (key,
                                      self.packages[key]['dependencies'],
                                      self.packages[key]['status']))

    def is_work_unit_waiting(self, name):
        """Tell whether package *name* still has the status ``'waiting'``."""
        return self.packages[name]['status'] == 'waiting'

    def set_work_unit_status(self, name, status):
        """Record *status* for package *name*."""
        self.packages[name]['status'] = status

    def get_dependencies(self, name):
        """Return the list of remaining dependencies of package *name*."""
        return self.packages[name]['dependencies']

    def get_next_work_units(self):
        """Return the waiting packages that have no remaining dependency."""
        return [key for key in self.packages.keys()
                if not self.get_dependencies(key)
                and self.is_work_unit_waiting(key)]

    def suppress_work_unit(self, name):
        """Remove *name* from the dependency list of every package."""
        print('# remove %s from schedule' % (name,))
        for key in self.packages.keys():
            if name in self.packages[key]['dependencies']:
                self.packages[key]['dependencies'].remove(name)

    # ------------------------------------------------------------------
    # Execution
    # ------------------------------------------------------------------
    def add_work_unit(self, name, cmd):
        """Queue *cmd* for package *name* if the package is still waiting."""
        if self.is_work_unit_waiting(name):
            # we create requests
            arg = {'cmd': cmd, 'package': name}
            req = WorkRequest(self.do_execute, [arg], None,
                              callback=self.result_callback,
                              exc_callback=self.handle_exception)
            # then we put the work request in the queue; the status is
            # changed first so the package is not picked up a second time.
            self.set_work_unit_status(name, 'queued')
            self.pool.putRequest(req)

    def execute(self, command):
        """Print the schedule and queue *command* for every ready package."""
        self.print_dependencies()
        packages = self.get_next_work_units()
        if packages:
            print('\n# Execute parallel actions within  %s' % (packages,))
            for package in packages:
                self.add_work_unit(package, command)

    def execute_all(self, command):
        """Queue *command* for the ready packages and wait for the pool."""
        self.execute(command)
        self.wait()

    def wait(self):
        """Block on ``self.pool.wait()``."""
        self.pool.wait()

    # this will be called each time a result is available
    def result_callback(self, request, result):
        """Report a finished request and queue the packages it unblocked.

        result is the dict returned by ``do_execute``.
        """
        print("# Result: %s from request #%s" % (result['package'],
                                                 request.requestID))
        self.execute(result['cmd'])

    # the work the threads will have to do
    def do_execute(self, arg):
        """Run ``arg['cmd']`` in the work area of ``arg['package']``.

        arg is a dict with the keys ``'cmd'`` and ``'package'``.  The package
        is marked ``'running'``, the command is run through ``exeCommand``
        with ``iTimeout=10``, then the package is removed from every
        dependency list and marked ``'done'``.  Returns a dict with the keys
        ``'output'``, ``'cmd'`` and ``'package'``.
        """
        path = self.get_work_area_path(arg['package'])
        print('#--------------------------------------------------------------')
        print('# Now trying [' + arg['cmd'] + '] in ' + path)
        print('#--------------------------------------------------------------')
        self.set_work_unit_status(arg['package'], 'running')
        # NOTE(review): os.chdir changes the working directory of the whole
        # process; if several pool workers run this method concurrently a
        # command may start in another package's directory -- confirm how
        # exeCommand picks its working directory.
        os.chdir(path)
        cmd = arg['cmd']
        # NOTE(review): the exit status and error streams are ignored, so a
        # failing command still marks the package as done.
        _status, output, _error, _python_error = exeCommand(cmd, iTimeout=10)
        self.suppress_work_unit(arg['package'])
        self.set_work_unit_status(arg['package'], 'done')
        return {'output': output, 'cmd': arg['cmd'], 'package': arg['package']}

    # this will be called when an exception occurs within a thread
    def handle_exception(self, request, exc_info):
        """Print the exception value (``exc_info[1]``) of a failed request."""
        print("# Exception occured in request #%s: %s" %
              (request.requestID, exc_info[1]))
170#--------- EoF --------#
Note: See TracBrowser for help on using the repository browser.