#!/usr/bin/env python

"""cmt_svn_checkout.py: Checkout CMT package(s) from Subversion repository.

Usage: cmt_svn_checkout.py [OPTION]... PATH...
Checkout PATH(s) relative to or being Subversion repository URL(s).

Mandatory arguments to long options are mandatory for short options too.
  -r, --version-tag=REV        checkout version tag REV of PATH
      --version-dir=DIR        use DIR as version directory instead of version tag
  -d, --directory=DIR          checkout into DIR instead of (basename of) PATH
  -o, --offset=OFFSET          checkout PATH at OFFSET relative to repository URL
      --no_config              disable config step upon checkout
      --without_version_directory do not create version directory upon PATH checkout
      --with_version_directory create version directory upon PATH checkout (default)
      --url=URL                checkout PATH from repository URL
  -h, --help                   display this help and exit
      --version                output version information and exit

The SVNROOT, SVNTRUNK, SVNTAGS, and SVNBRANCHES environment variables specify
repository URL of PATH, location of PATH trunk, tags, and branches
(relatively to PATH) respectively.

Report bugs to .
"""

# NOTE(review): the address after "Report bugs to" in the usage text above
# appears to have been lost; restore it from the upstream distribution.

__version__ = '0.6.0'
__date__ = 'Fri Mar 09 2012'
__author__ = 'Grigory Rybkin'

import sys
import getopt
import os
import posixpath
import os.path
try:
    import urlparse
except ImportError:
    # Python 3: the same functions live in urllib.parse.
    from urllib import parse as urlparse
# for Python 2.3 and older
if sys.version_info[0] == 2 and sys.version_info[1] < 4:
    for p in ('svn', 'svn+ssh'):
        if p not in urlparse.uses_netloc:
            urlparse.uses_netloc.append(p)
import tempfile
import re

# The code below deliberately sticks to constructs that are valid from
# Python 2.3 up to Python 3 (no decorators, no "with", no "except ... as").

self = 'cmt_svn_checkout.py'

# An earlier implementation used the Subversion Python bindings
# (svn.client / svn.core); the svn command line client is used instead.

# Force the C locale so that the output of svn can be parsed reliably.
os.environ['LC_ALL'] = 'C'


class Utils(object):
    """Helpers for running shell commands."""

    def getstatusoutput(cmd):
        """Return (status, stdout + stderr) of executing cmd in a shell.

        A trailing line separator is removed from the output string.
        The exit status of the command is encoded in the format specified
        for wait(); when the exit status is zero (termination without
        errors), 0 is returned.
        """
        p = os.popen('( %s ) 2>&1' % cmd, 'r')
        out = p.read()
        sts = p.close()
        if sts is None:
            sts = 0
        if out.endswith(os.linesep):
            out = out[:out.rindex(os.linesep)]
        elif out[-1:] == '\n':
            out = out[:-1]
        return sts, out
    getstatusoutput = staticmethod(getstatusoutput)

    def getstatuserror(cmd):
        """Return (status, stderr) of executing cmd in a shell.

        On Unix, the return value is the exit status of the command
        encoded in the format specified for wait(). On Windows, on
        command.com systems (Windows 95, 98 and ME) this is always 0; on
        cmd.exe systems (Windows NT, 2000 and XP) this is the exit status
        of the command run.

        stdout of the command is not captured. stderr is collected through
        a temporary file, which is removed even if running the command or
        reading the file fails.
        """
        fd, p = tempfile.mkstemp()
        os.close(fd)
        try:
            sc = os.system('( %s ) 2>%s' % (cmd, p))
            f = open(p)
            try:
                e = f.read()
            finally:
                f.close()
        finally:
            os.unlink(p)
        return sc, e
    getstatuserror = staticmethod(getstatuserror)


class ClientContext(object):
    """URL helpers for the URL schemes accepted as Subversion URLs."""

    schemes = ('http', 'https', 'svn', 'svn+ssh', 'file')

    def svn_path_canonicalize(self, path):
        """Return a new path (or URL) like path, but transformed such that
        some types of path specification redundancies are removed.

        This involves collapsing redundant "/./" elements, removing
        multiple adjacent separator characters and removing trailing
        separator characters.

        Convert the scheme and hostname to lowercase."""
        scheme, netloc, path, query, fragment = urlparse.urlsplit(path)
        scheme = scheme.lower()
        netloc = netloc.lower()
        if path.startswith('/'):
            b = '/'
        else:
            b = ''
        path = b + '/'.join([s for s in path.split('/') if s and s != '.'])
        return urlparse.urlunsplit((scheme, netloc, path, query, fragment))

    def urljoin(self, *args):
        """Join URL pieces given in args into a single URL.

        The scheme is that of the last piece having a scheme listed in
        self.schemes; the network location is that of the last piece, at or
        after the piece providing the scheme, having one. The path is the
        posixpath.join of the paths of the pieces starting from the piece
        selected that way. Query and fragment are dropped.

        Return '' when called without arguments.
        """
        urls = [urlparse.urlsplit(arg) for arg in args]
        if not urls:
            return ''

        # Both searches walk the pieces last to first; index is therefore
        # a position counted from the end of urls.
        schemes = [url[0] for url in urls]
        schemes.reverse()
        for i, s in enumerate(schemes):
            if s and s in self.schemes:
                scheme = s
                index = i
                break
        else:
            scheme = ''
            index = len(urls) - 1

        netlocs = [url[1] for url in urls]
        netlocs.reverse()
        for i, s in enumerate(netlocs[:index + 1]):
            if s:
                netloc = s
                index = i
                break
        else:
            netloc = ''

        path = posixpath.join(*[url[2] for url in urls][len(urls) - 1 - index:])
        query = fragment = ''
        return urlparse.urlunsplit((scheme, netloc, path, query, fragment))

    def last_nonempty(self, list, default = ''):
        """Return the last true item of list, or default if there is none.

        The argument is left unmodified.
        """
        for i in list[::-1]:
            if i:
                return i
        return default


def cwd():
    """Return the current working directory.

    The value of the PWD environment variable is preferred when it
    designates the same directory as os.getcwd(); otherwise, or when that
    cannot be determined (PWD unset or stale, os.path.samefile not
    available), os.getcwd() is returned.
    """
    _gcwd = os.getcwd()
    try:
        _cwd = os.environ['PWD']
        if _cwd and os.path.samefile(_cwd, _gcwd):
            return _cwd
        else:
            return _gcwd
    except (KeyError, AttributeError, os.error):
        # os.error: PWD names a directory that cannot be stat'ed
        return _gcwd


def cd(path):
    """Change the current working directory to path and update PWD.

    PWD is set to the normalized join of the previous directory and path
    when that designates the directory actually entered, and to
    os.getcwd() otherwise. PWD is left untouched where os.path.samefile is
    not available.
    """
    new = os.path.normpath(os.path.join(cwd(), path))
    os.chdir(path)
    _gcwd = os.getcwd()
    try:
        same = os.path.samefile(new, _gcwd)
    except AttributeError:
        return
    except os.error:
        # new cannot be stat'ed, so it is not the directory entered
        same = False
    if same:
        os.environ['PWD'] = new
    else:
        os.environ['PWD'] = _gcwd


def error(instance, location='', file = sys.stderr):
    """Write an error message for instance to file.

    instance is either an exception, whose args are joined with ': ', or
    any other object, whose string form (trailing whitespace removed) is
    used. A non-empty location is prepended, followed by ': '.
    """
    try:
        message = ': '.join([str(arg) for arg in instance.args])
    except AttributeError:
        message = str(instance).rstrip()
    if location:
        location += ': '
    file.write("%s%s\n" % (location, message))


class CmtContext(object):
    """Settings of the CMT configuration step run after a checkout."""

    def __init__(self, config = True, with_version_directory = True,
                 cleanup = False, head_version = None):
        self.config = config
        self.with_version_directory = with_version_directory
        self.cleanup = cleanup
        self.head_version = head_version
        self.env()

    def env(self):
        """Take the head version from CMTHEADVERSION (default 'HEAD')
        unless one has been set already."""
        if self.head_version is None:
            self.set_head_version(os.getenv('CMTHEADVERSION', 'HEAD'))

    def set_head_version(self, version):
        """Set the head version template.

        The <package>, <PACKAGE> and <revision> placeholders in version
        are turned into the %-format fields filled in by
        eval_head_version.
        """
        self.head_version = str(version).replace('<package>', '%(package)s').replace('<PACKAGE>', '%(PACKAGE)s').replace('<revision>', '%(revision)i')

    def eval_head_version(self, m):
        """Return the head version of module m: the template with the
        package name, its upper-case form and the last changed revision
        filled in."""
        return self.head_version % \
               {'package' : m.package,
                'PACKAGE' : m.package.upper(),
                'revision' : m.info.last_changed_rev}

    def write(self, p, version):
        """Make file p contain version followed by a newline.

        An existing file is rewritten only when its contents differ.
        Return 0 on success, 1 on I/O error (reported on stderr).
        """
        try:
            t = version + '\n'
            if os.path.exists(p):
                f = open(p, 'r+')
                try:
                    b = f.tell()
                    v = f.read()
                    if v != t:
                        f.seek(b)
                        f.write(t)
                        f.truncate()
                finally:
                    f.close()
            else:
                f = open(p, 'w')
                try:
                    f.write(t)
                finally:
                    f.close()
        except IOError:
            sys.stderr.write('%s\n' % (sys.exc_info()[1],))
            return 1
        return 0

    def generate(self, p):
        """Run 'cmt config' in directory p, then return to the directory
        that was current before the call.

        Always return 0: the status of the cmt command is deliberately not
        propagated.
        """
        curdir = cwd()
        cmd = 'cmt -q'
        if self.with_version_directory:
            cmd += ' -with_version_directory'
        else:
            cmd += ' -without_version_directory'
        if self.cleanup:
            cmd += ' -cleanup'
        else:
            cmd += ' -no_cleanup'
        cmd += ' config'
        cd(p)
        sc = os.system(cmd)
        if sc != 0: sc = 1
        cd(curdir)
        return 0

    def configure(self, path, version):
        """Configure the package checked out into path.

        Only the first existing of the cmt and mgr subdirectories of path
        is handled. Without version directory, version is written into its
        version.cmt file; with version directory, an existing version.cmt
        is renamed to version.cmt.orig with a warning. The config step is
        then run if enabled.

        Return the number of errors (0 if neither subdirectory exists).
        """
        sc = 0
        for d in ('cmt', 'mgr'):
            p = os.path.join(path, d)
            if os.path.isdir(p):
                version_cmt = os.path.join(p, 'version.cmt')
                if not self.with_version_directory:
                    sc += self.write(version_cmt, version)
                elif os.path.exists(version_cmt):
                    sys.stderr.write('Warning: file %s normally should not be under version control. Please, consider removing\n' % os.path.join(d, 'version.cmt'))
                    try:
                        os.rename(version_cmt, version_cmt + '.orig')
                        sys.stderr.write('renamed %s -> %s\n' % (repr(version_cmt), repr(version_cmt + '.orig')))
                    except (IOError, os.error):
                        error(sys.exc_info()[1])

                if self.config:
                    sc += self.generate(p)
                return sc
        return sc


class Module(object):
    """A package to check out, as named on the command line."""

    def __init__(self, module):
        self.module = module
        # set to True by Checkout.initialize once url, path, ... are known
        self.init = False
        class info(object):
            last_changed_rev = 0
        self.info = info


class Checkout(object):
    """Repository layout, checkout options and the modules to check out."""

    def __init__(self, url = None, trunk = None, tags = None, branches = None,
                 version = None, version_dir = None, directory = None,
                 offset = '', modules = None):
        self.url = url
        self.trunk = trunk
        self.tags = tags
        self.branches = branches
        self.version = version
        self.version_dir = version_dir
        self.directory = directory
        self.offset = offset
        # A fresh list per instance: a shared default list would make
        # add() leak modules from one Checkout into another.
        if modules is None:
            modules = []
        self.modules = modules
        self.reposLayout()

    def reposLayout(self):
        """Fill in unset layout settings from the SVNROOT, SVNTRUNK,
        SVNTAGS and SVNBRANCHES environment variables or their defaults."""
        if self.url is None:
            self.url = os.getenv('SVNROOT', '')
        if self.trunk is None:
            self.trunk = os.getenv('SVNTRUNK', 'trunk')
        if self.tags is None:
            self.tags = os.getenv('SVNTAGS', 'tags')
        if self.branches is None:
            self.branches = os.getenv('SVNBRANCHES', 'branches')

    def cmtRepos(self):
        """Switch to the URL and layout of the CMT repository."""
        self.url = 'https://svn.lal.in2p3.fr/projects/CMT'
        self.trunk = 'HEAD'
        self.tags= '.'
        self.branches = '.'

    def add(self, module):
        """Append module to the modules to check out."""
        self.modules.append(module)

    def info_receiver(self, path, info, pool):
        """Record path and info on self, and info on pool as well."""
        self.path = path
        self.info = info
        pool.info = info

    def cmp(self, path1, path2, client_context):
        """Compare path1 and path2 with 'svn diff'.

        Return 0 if there is no difference, 1 if there is, 2 if the
        command failed. client_context is not used.
        """
        cmd = 'svn diff %s %s' % (path1, path2)
        sc, out = Utils.getstatusoutput(cmd)
        if sc != 0:
            return 2
        if out:
            return 1
        else:
            return 0

    def _last_changed_rev(out):
        """Return the 'Last Changed Rev' number found in 'svn info' output
        out, or None if there is none."""
        m = re.search(r'last\s+changed\s+rev:\s+(?P<rev>\d+)', out, re.I)
        if m:
            return int(m.group('rev'))
        return None
    _last_changed_rev = staticmethod(_last_changed_rev)

    def _tag_revisions(out):
        """Map revision -> name for the directory entries (names ending
        with '/') of 'svn ls -v' output out."""
        rev_tag = {}
        for line in out.splitlines():
            fields = line.split()
            # blank lines are skipped; the revision is the first field
            if fields and fields[-1].endswith(posixpath.sep):
                rev_tag[int(fields[0])] = fields[-1].rstrip(posixpath.sep)
        return rev_tag
    _tag_revisions = staticmethod(_tag_revisions)

    def trunk_tag(self, module, client_context):
        """Attempt to determine the tag of the module's trunk.

        Tags are examined from the highest revision down, stopping at the
        first one whose revision is lower than the last changed revision
        of the trunk. Return the first tag examined whose contents are the
        same as those of the trunk, None otherwise (or if svn fails).

        As a side effect, the last changed revision of the trunk is
        recorded in self.info and module.info.
        """
        trunk = posixpath.join(module.url, self.trunk)
        sc, out = Utils.getstatusoutput('svn info %s' % trunk)
        if sc != 0:
            return None
        last_changed_rev = self._last_changed_rev(out)
        if last_changed_rev is None:
            return None
        class info(object): pass
        info.last_changed_rev = last_changed_rev
        self.info_receiver(trunk, info, module)

        tags = posixpath.join(module.url, self.tags)
        sc, out = Utils.getstatusoutput('svn ls -v %s' % tags)
        if sc != 0:
            return None
        rev_tag = self._tag_revisions(out)
        revs = list(rev_tag.keys())
        revs.sort()
        revs.reverse()
        for rev in revs:
            if rev < self.info.last_changed_rev:
                break
            tag = posixpath.join(tags, rev_tag[rev])
            if 0 == self.cmp(trunk, tag, client_context):
                return rev_tag[rev]
        return None

    def initialize(self, cmt_context, client_context):
        """Work out url, package, path, version and URL of every module.

        Modules whose URL does not have one of client_context.schemes are
        reported and left uninitialized (init stays False).

        Return the number of such modules.
        """
        sc = 0

        # canonicalize:
        self.url = client_context.svn_path_canonicalize(self.url)
        self.offset = client_context.svn_path_canonicalize(self.offset)

        for m in self.modules:
            m.module = client_context.svn_path_canonicalize(m.module)
            m.url = client_context.urljoin(self.url, self.offset, m.module)

            if urlparse.urlparse(m.url)[0] not in client_context.schemes:
                error('%s: Not a valid Subversion URL' % m.url)
                sc += 1
                continue

            m.package = posixpath.basename(m.url)

            if self.directory is not None:
                m.path = os.path.normpath(self.directory)
            else:
                scheme, netloc, path, query, fragment = urlparse.urlsplit(m.module)
                if not scheme and not netloc:
                    # module given as a relative path: mirror it locally
                    m.path = os.path.normpath(os.path.join(*path.split(posixpath.sep)))
                else:
                    m.path = posixpath.basename(m.url)

            if self.version is None:
                m.head = True
                m.version = self.trunk_tag(m, client_context) or \
                            cmt_context.eval_head_version(m)
            else:
                m.head = False
                m.version = self.version

            if m.head:
                m.URL = [posixpath.join(m.url, self.trunk)]
            else:
                # the version may be either a tag or a branch
                m.URL = [posixpath.join(m.url, p, m.version)
                         for p in (self.tags, self.branches)]
            m.URL = [client_context.svn_path_canonicalize(url) for url in m.URL]

            if cmt_context.with_version_directory:
                if self.version_dir is None:
                    m.path = os.path.join(m.path, m.version)
                else:
                    m.path = os.path.join(m.path, self.version_dir)

            m.init = True

        return sc

    def execute(self, cmt_context, client_context):
        """Check out and configure every initialized module.

        The URLs of a module are tried in turn until a checkout succeeds;
        the errors of svn are reported only if all of them fail.

        Return the number of errors.
        """
        sce = 0

        for m in self.modules:
            if not m.init:
                continue
            done = False
            err = []
            for url in m.URL:
                # NOTE(review): url and path are interpolated unquoted into
                # a shell command; values with spaces or shell
                # metacharacters will not work as intended.
                cmd = 'svn checkout %s %s' % (url, m.path)
                sc, e = Utils.getstatuserror(cmd)
                if 0 == sc:
                    done = True
                    break
                else:
                    err.append(e)
                    continue

            if not done:
                for e in err:
                    error(e)
                sce += 1

            # NOTE(review): the configure step is run even after a failed
            # checkout - confirm this is intended.
            scc = cmt_context.configure(m.path, m.version)
            if scc != 0:
                sys.stderr.write('%s %s: configure returned %i.\n' % (m.path, m.version, scc))
            sce += scc

        return sce


def main(argv=[__name__]):
    """Parse the command line argv, then check out the requested paths.

    Return 0 on success, 1 on error.
    """
    self = os.path.basename(argv[0])
    try:
        opts, args = getopt.getopt(argv[1:],
                                   "hr:d:o:",
                                   ["help", "version", "version-tag=",
                                    "version-dir=", "directory=",
                                    "offset=", "no_config",
                                    "with_version_directory",
                                    "without_version_directory", "url="])
    except getopt.error:
        sys.stderr.write('%s: %s\n' % (self, str(sys.exc_info()[1])))
        sys.stderr.write("Try '%s --help' for more information.\n" % self)
        return 1

    cmt_context = CmtContext()
    checkout = Checkout()

    for o, v in opts:
        if o in ("-h", "--help"):
            sys.stdout.write('%s\n' % sys.modules[__name__].__doc__)
            return 0
        elif o in ("--version",):
            sys.stdout.write('%s %s (%s)\n' % (self, __version__, __date__))
            sys.stdout.write('%sWritten by %s.\n' % (os.linesep, __author__))
            return 0
        elif o in ("-r", "--version-tag"):
            checkout.version = v
        elif o in ("--version-dir",):
            checkout.version_dir = v
        elif o in ("-d", "--directory"):
            checkout.directory = v
        elif o in ("-o", "--offset"):
            checkout.offset = v
        elif o in ("--no_config",):
            cmt_context.config = False
        elif o in ("--without_version_directory",):
            cmt_context.with_version_directory = False
        elif o in ("--with_version_directory",):
            cmt_context.with_version_directory = True
        elif o in ("--url",):
            checkout.url = v

    if not args:
        sys.stderr.write('%s: missing path argument\n' % self)
        sys.stderr.write("Try '%s --help' for more information.\n" % self)
        return 1

    for arg in args:
        checkout.add(Module(arg))

    client_context = ClientContext()
    sci = checkout.initialize(cmt_context, client_context)
    sce = checkout.execute(cmt_context, client_context)

    if sci != 0 or sce != 0:
        return 1
    else:
        return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))