Changeset 242 for tbroadcast


Ignore:
Timestamp:
Aug 30, 2006, 12:31:45 PM (18 years ago)
Author:
garonne
Message:

enhance severals things

Location:
tbroadcast/v2
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • tbroadcast/v2/python/tbroadcast.py

    r241 r242  
    11#----------------------------------#
    22# -- Author: V.Garonne
     3# -- Mail: garonne@lal.in2p3.fr
    34# -- Date: 08/25/2006
    45# -- Name: tbroadcast
    5 # -- Description: mainc class
     6# -- Description: main class
    67#----------------------------------#
    78
     
    3132        self.semaphore       = BoundedSemaphore(1)
    3233        self.check_cycles()
    33         self.instanciate_packages ()     
    34         self.reduce_graph ()
     34        self.instanciate_packages ()
    3535       
    3636    def get_counter(self):
     
    5757        if cycle_found:
    5858            sys.exit(-1)
    59    
    60     def instanciate_packages(self):
    61         # We create the schedule of the work units
    62         print '# First, we initialize the DAG by parsing cmt show uses'
    63         cmd = 'cmt show uses'
    64         status, output = commands.getstatusoutput (cmd)
    65         if status != 0:
    66             print output
    67             sys.exit(-1)   
    68         lines = string.split(output, '\n')
    69         self.packages [self.current_package] = {'version': '*', 'dependencies': list(), 'status': 'waiting', 'path':os.getcwd(), 'indice':1}
    70         indice  = 1
     59
     60    def format_uses (self, content):
     61        # format variables
     62        lignes  = string.split(content, '\n')
     63        lines   = list()
     64        for ligne in lignes:
     65           if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:
     66               lines.append(ligne)
     67        lines.reverse()
     68        return lines
     69
     70    def format_paths (self, content):
     71        # format variables
     72        lignes  = string.split(content, '\n')
     73        lines   = list()
     74        for ligne in lignes:
     75            if ligne[:4] == "use ":             
     76               lines.append(ligne)
     77        return lines
     78
     79    def get_paths (self, content):
     80        lines = self.format_paths(content)
    7181        for line in lines:
    72             if line [0] == '#' and line[:5] != "#CMT>" and line not in ['# Selection :','#']:
    73                 name    = string.split (line)[2]
    74                 version = string.split (line)[3]
    75                 if name not in self.packages[self.current_package]['dependencies']:
    76                    self.packages[self.current_package]['dependencies'].append (name)               
    77                    #print '\n#', indice,':: add package', name
    78                    indice = indice + 1
    79                 if not self.packages.has_key (name):
    80                    self.packages [name] = {'version': version, 'dependencies': list(), 'status': 'waiting', 'path': None, 'indice':indice}               
    81                    indice = indice + 1
    82                    found = False
    83                    for ligne in lines:
    84                     if ligne [0] == '#' and ligne[:5] != "#CMT>" and ligne not in ['# Selection :','#']:               
    85                        if found:
    86                            if level < string.find(ligne, 'use'):
    87                                if string.split (ligne)[2] not in self.packages[name]['dependencies']:
    88                                    self.packages[name]['dependencies'].append (string.split (ligne)[2])
    89                                    #print "# add dependency", string.split (ligne)[2], ' to package ',name
    90                            else:
    91                                 found = False                                                                                                   
    92                        if name == string.split (ligne)[2]:
    93                            level = string.find(ligne, 'use')
    94                            found = True   
    95             if line[:4] == "use ":             
    9682                result  = string.split (line[4:len(line)], ' ')
    9783                if  self.packages.has_key(result[0]):
     
    133119                        sys.exit(-1)
    134120                    self.packages[result[0]]['path'] = full_path
    135         print '# Sometimes takes a certain time (should be improved asap)'
     121   
     122    def get_uses(self, content):       
     123        # initiates variables
     124        lignes = self.format_uses(content)
     125        self.packages [self.current_package] = {'version': '*', 'client': list(),'uses': list(), 'status': 'waiting', 'path': os.getcwd()}
     126        previous_client = self.current_package
     127        previous_level  = 0
     128        level_stack    = [{'name':previous_client,'level':previous_level},]
     129        ligne = lignes.pop()       
     130        while len(lignes)!=0:   
     131            current_level = string.find(ligne, 'use')
     132            while current_level > previous_level:               
     133                name    = string.split (ligne)[2]
     134                version = string.split (ligne)[3]                             
     135                if not self.packages.has_key (name):
     136                  self.packages [name] = {'version': version, 'uses': list(), 'client': list(), 'status': 'waiting', 'path': None}
     137                if name not in self.packages[previous_client]['uses']:
     138                   self.packages[previous_client]['uses'].append (name)               
     139                level_stack.append({'name':previous_client,'level':previous_level})
     140                previous_client = name 
     141                previous_level = current_level                 
     142                if len(lignes):
     143                    ligne = lignes.pop()
     144                    #print ligne
     145                    current_level = string.find(ligne, 'use')
     146                       
     147            #self.packages [previous_client]['status'] ='queued'
     148            # restore the level
     149            item = level_stack.pop()               
     150            while item['level'] >= current_level:
     151                 item = level_stack.pop()
     152            previous_client = item['name']
     153            previous_level  = item['level']
     154            #print previous_client, '-->',string.split (ligne)[2]
     155
     156    def instanciate_packages(self, file=None):
     157        # We create the schedule of the work units
     158        print '# First, we initialize the DAG by parsing cmt show uses'
     159        if file is None:
     160            cmd  = 'cmt show uses'
     161        else:   
     162            cmd = 'cat ' + file       
     163        status, output = commands.getstatusoutput (cmd)
     164        if status != 0:
     165            print output
     166            sys.exit(-1)
     167        self.get_uses(output)   
     168        self.get_paths(output)
     169        #self.check_execution (package=self.current_package)
     170        #self.simulate_execution()
     171
     172    def simulate_execution(self):
     173       ok = True
     174       indice = 1
     175       while ok:
     176           runnable  = list()
     177           checkable = list()
     178           for key in self.packages:
     179               if  self.packages[key]['status']!='done':
     180                   if len(self.packages[key]['uses']) == 0:
     181                       runnable.append(key)
     182           if len(runnable):
     183               print '\n#--------------------------------------------------------------'
     184               print "# Execute parallel actions within packages " + str(runnable)
     185           #print 'checkable:',checkable
     186           for selected in runnable:       
     187               print '#--------------------------------------------------------------'
     188               print '# ('+str(indice)+'/'+str(len(self.packages))+') Now trying [] in '+ self.packages[selected]['path']
     189               print '#--------------------------------------------------------------'
     190               self.packages[selected]['status']='done'       
     191               indice = indice + 1
     192               for key in self.packages:
     193                   if selected in self.packages[key]['uses']:
     194                       self.packages[key]['uses'].remove(selected)                               
     195                       #print 'remove', selected, 'from',key
     196           if len(runnable)==0:
     197                           ok = False       
     198               
     199    def check_execution(self, package, path=list(), cycles=list()):
     200        #print package,'-->',self.packages[package]['uses']
     201        #print path
     202        if package in path:
     203            if path[path.index(package):] not in cycles:
     204                print 'Cycles:',path[path.index(package):], package
     205                cycles = cycles + path[path.index(package):]
     206                sys.exit(-1)
     207        path.append(package)
     208        for item in self.packages[package]['uses']:
     209              self.check_execution(package=item, path=path, cycles=cycles)
     210              path.pop()       
    136211
    137212    def get_current_package(self):   
     
    169244        print '# ------------------------'
    170245        for key in self.packages.keys():
    171             print key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                       
     246            print key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                       
    172247
    173248    def print_status(self, status):
     
    178253        for key in self.packages.keys():
    179254            if self.packages[key] ['status'] == status:
    180                 print i , key, '-->', self.packages[key] ['dependencies'],',', self.packages[key] ['status']                 
     255                print i , key, '-->', self.packages[key] ['uses'],',', self.packages[key] ['status']                 
    181256                i = i + 1
    182                
    183     def reduce_graph(self):
    184         packages     = {}
    185         dependencies = list ()               
    186         dependencies.append (self.current_package)               
    187         space = ' '
    188         #print '-->', self.current_package
    189         packages[self.current_package] = list()
    190         for package in self.packages[self.current_package]['dependencies']:
    191             if package not in dependencies:
    192                 dependencies.append(package)           
    193                 packages[self.current_package].append(package)
    194                 packages[package] = list()                       
    195                 #print '  -->', package
    196                 limit = True
    197                 for paquetage in self.packages[package]['dependencies']:
    198                     if paquetage not in dependencies:                       
    199                         space = space + ' '
    200                         #print space, paquetage
    201                         dependencies.append (paquetage)
    202                         packages[package].append (paquetage)
    203             else:
    204                 packages[package] = list()                               
    205                 space = ''       
    206         for key in packages:
    207             #print key, packages[key]
    208             self.packages[key]['dependencies'] = packages[key]
    209257           
    210258    def is_work_unit_waiting (self, name):
     
    215263
    216264    def get_dependencies (self, name):
    217         return self.packages[name] ['dependencies']
     265        return self.packages[name] ['uses']
    218266   
    219267    def get_next_work_units (self):
     
    234282        #print '# remove', name, 'from schedule'
    235283        for key in self.packages.keys():
    236             if name in self.packages[key]['dependencies']:
    237                 self.packages[key]['dependencies'].remove(name)
     284            if name in self.packages[key]['uses']:
     285                self.packages[key]['uses'].remove(name)
    238286
    239287    def add_work_unit (self, name, cmd):
     
    262310        #self.print_dependencies ()
    263311        #self.print_status (status='waiting')       
    264         #print self.is_work_units()
    265312        #while self.is_work_units():
    266         #            self.wait()           
     313        #self.wait()           
    267314       
    268315    def wait (self):
  • tbroadcast/v2/scripts/tbroadcast

    r241 r242  
    11#!/usr/bin/env python
    22#----------------------------------#
    3 # -- Author: V.Garonne
     3# -- Author: V. Garonne
     4# -- Mail: garonne@lal.in2p3.fr
    45# -- Date: 08/25/2006
    56# -- Name: tbroadcast
     
    1819    print '#   <command>: command to execute'
    1920    print '# global options :'
    20     print '#   -nb=<num_worker>]                  : Total number of threads'
    21     print '#   -help]                             : Print help'
     21    print '#   -nb=<num_worker>]     : Total number of threads'
     22    print '#   -test                 : Simulate execution'
     23    print '#   -check                : Check execution (deadlocks)'
     24    print '#   -print                : Print dependency graph'   
     25    print '#   -help]                : Print help'
    2226
    2327if __name__ == '__main__':
    2428    # Default options
    25     num_worker = 20
    26     command    = ''   
     29    num_worker  = 20
     30    command     = ''
     31    test        = False
     32    check       = False
     33    print_graph = False
    2734    for arg in sys.argv[1:len(sys.argv)]:           
    2835         if arg[0]=='-':
     
    3037             if option == '-nb':
    3138                num_worker = int (string.split(arg,'=')[1])         
     39             elif option == '-test':
     40                 test = True   
     41             elif option == '-check':
     42                 check = True   
     43             elif option == '-print':
     44                 print_graph = True   
    3245             else:
    3346                usage()
    34                 sys.exit(-1)   
    35                
     47                sys.exit(-1)     
    3648         else:
    3749              command = arg
    3850
    39     master = Scheduler (num_workers=num_worker)
    40     master.execute_all (command)
    41 
     51    master = Scheduler (num_workers=num_worker)   
     52    if test:
     53        master.simulate_execution()
     54    elif check:
     55        master.check_execution (package=master.get_current_package())   
     56    elif print_graph:
     57        master.print_dependencies ()
     58    else:
     59        master.execute_all (command)
    4260    sys.exit(-1);
    43 #--------- EoF --------#   
     61#--------- EoF --------#
Note: See TracChangeset for help on using the changeset viewer.