#!/usr/bin/env python

"""cmt_svn_checkout.py: Checkout CMT package(s) from Subversion repository.

Usage: cmt_svn_checkout.py [OPTION]... PATH...
Checkout PATH(s) relative to or being Subversion repository URL(s).

Mandatory arguments to long options are mandatory for short options too.
  -r, --version-tag=REV        checkout version tag REV of PATH
      --version-dir=DIR        use DIR as version directory instead of version tag
  -d, --directory=DIR          checkout into DIR instead of (basename of) PATH
  -o, --offset=OFFSET          checkout PATH at OFFSET relative to repository URL
      --no_config              disable config step upon checkout
      --without_version_directory do not create version directory upon PATH checkout
      --with_version_directory create version directory upon PATH checkout (default)
      --url=URL                checkout PATH from repository URL
      --debug                  print lots of debugging information
  -h, --help     display this help and exit
      --version  output version information and exit

The SVNROOT, SVNTRUNK, SVNTAGS, and SVNBRANCHES (also SVNDEVBRANCHES)
environment variables specify repository URL of PATH, location of PATH trunk,
tags, and branches (also devbranches) (relatively to PATH) respectively.

Report bugs to <CMT-L@IN2P3.FR>.
"""

__version__ = '0.10.0'
__date__ = 'Fri May 27 2016'
__author__ = 'Grigory Rybkin'

import sys
import getopt
import os
import posixpath
import os.path
try:
    import urlparse
except ImportError:
    # Python 3 moved the module; bind it under the name used below.
    import urllib.parse as urlparse
# for Python 2.3 and older
if sys.version_info[0] == 2 and sys.version_info[1] < 4:
    for p in ('svn', 'svn+ssh'):
        if p not in urlparse.uses_netloc:
            urlparse.uses_netloc.append(p)
import tempfile
import re
import logging

self = 'cmt_svn_checkout.py'

# Default logger so that Checkout can be used without going through main();
# main() rebinds it to a logger named after the invoked script.
logger = logging.getLogger(self)

# Force untranslated svn output: it is parsed with regular expressions below.
os.environ['LC_ALL'] = 'C'


def _println(stream, text=''):
    """Write str(text) followed by a newline to stream."""
    stream.write('%s\n' % (text,))


class Utils(object):
    """Helpers running a command line through the shell."""

    def getstatusoutput(cmd):
        """Return (status, stdout + stderr) of executing cmd in a shell.

        A trailing line separator is removed from the output string.
        The exit status of the command is encoded in the format specified
        for wait(), when the exit status is zero (termination without
        errors), 0 is returned.
        """
        p = os.popen('( %s ) 2>&1' % cmd, 'r')
        out = p.read()
        sts = p.close()
        if sts is None:
            sts = 0
        if out.endswith(os.linesep):
            out = out[:out.rindex(os.linesep)]
        elif out[-1:] == '\n':
            out = out[:-1]
        return sts, out
    getstatusoutput = staticmethod(getstatusoutput)

    def getstatuserror(cmd):
        """Return (status, stderr) of executing cmd in a shell.

        stderr of cmd is redirected into a temporary file, which is read
        back and removed (also when running or reading fails).  stdout of
        cmd is left untouched.

        On Unix, the return value is the exit status of the command
        encoded in the format specified for wait().  On Windows, on
        command.com systems (Windows 95, 98 and ME) this is always 0; on
        cmd.exe systems (Windows NT, 2000 and XP) this is the exit status
        of the command run.
        """
        fd, p = tempfile.mkstemp()
        os.close(fd)
        try:
            sc = os.system('( %s ) 2>%s' % (cmd, p))
            f = open(p)
            try:
                e = f.read()
            finally:
                f.close()
        finally:
            os.unlink(p)
        return sc, e
    getstatuserror = staticmethod(getstatuserror)


class ClientContext(object):
    """URL helpers standing in for the Subversion client library."""

    # URL schemes accepted as Subversion repository locations.
    schemes = ('http', 'https', 'svn', 'svn+ssh', 'file')

    def svn_path_canonicalize(self, path):
        """Return a new path (or URL) like path, but transformed such that
        some types of path specification redundancies are removed.

        This involves collapsing redundant "/./" elements, removing
        multiple adjacent separator characters, removing trailing
        separator characters, and possibly other semantically inoperative
        transformations.

        Convert the scheme and hostname to lowercase."""
        scheme, netloc, path, query, fragment = urlparse.urlsplit(path)
        scheme = scheme.lower()
        netloc = netloc.lower()
        if path.startswith('/'):
            b = '/'
        else:
            b = ''
        path = b + '/'.join([s for s in path.split('/') if s and s != '.'])
        return urlparse.urlunsplit((scheme, netloc, path, query, fragment))

    def urljoin(self, *args):
        """Join URL pieces, later pieces being relative to earlier ones.

        The scheme is taken from the last argument carrying one of
        self.schemes, the network location from the last argument (not
        preceding that one) carrying a location, and the path is the
        posixpath.join of the paths from that argument onwards.  Query and
        fragment are dropped.  Return '' when called without arguments.
        """
        urls = [urlparse.urlsplit(arg) for arg in args]
        if not urls:
            return ''

        # Scan from the last argument backwards; index counts from the end.
        schemes = [url[0] for url in urls]
        schemes.reverse()
        for i, s in enumerate(schemes):
            if s and s in self.schemes:
                scheme = s
                index = i
                break
        else:
            scheme = ''
            index = len(urls) - 1

        netlocs = [url[1] for url in urls]
        netlocs.reverse()
        for i, s in enumerate(netlocs[:index + 1]):
            if s:
                netloc = s
                index = i
                break
        else:
            netloc = ''

        path = posixpath.join(*[url[2] for url in urls][len(urls) - 1 - index:])
        query = fragment = ''
        return urlparse.urlunsplit((scheme, netloc, path, query, fragment))

    def last_nonempty(self, list, default = ''):
        """Return the last true item of list, or default if there is none.

        The argument is left unmodified.
        """
        # Iterate over a reversed copy rather than reversing in place.
        for i in list[::-1]:
            if i:
                return i
        return default


def cwd():
    """Return the current directory, preferring the logical $PWD.

    $PWD is returned when it is set, non-empty, and refers to the same
    directory as os.getcwd(); otherwise os.getcwd() is returned.
    """
    _gcwd = os.getcwd()
    try:
        _cwd = os.environ['PWD']
        if _cwd and os.path.samefile(_cwd, _gcwd):
            return _cwd
        else:
            return _gcwd
    except (KeyError, AttributeError, OSError):
        # PWD unset, no os.path.samefile on this platform, or PWD stale.
        return _gcwd


def cd(path):
    """Change directory to path and keep $PWD in step with it."""
    new = os.path.normpath(os.path.join(cwd(), path))
    os.chdir(path)
    _gcwd = os.getcwd()
    try:
        if os.path.samefile(new, _gcwd):
            os.environ['PWD'] = new
        else:
            os.environ['PWD'] = _gcwd
    except AttributeError:
        # No os.path.samefile on this platform: leave PWD as it is.
        pass
    except OSError:
        # The logical path cannot be stat'ed: fall back to the physical one.
        os.environ['PWD'] = _gcwd


def error(instance, location='', file = sys.stderr):
    """Print instance (an exception or any object) as an error message.

    The message is the ': '-joined args of instance when it has args,
    otherwise str(instance) with trailing whitespace removed; it is
    prefixed with 'location: ' when location is given.
    """
    try:
        message = ': '.join([str(arg) for arg in instance.args])
    except AttributeError:
        message = str(instance).rstrip()
    if location:
        location += ': '
    _println(file, "%s%s" % (location, message))


class CmtContext(object):
    """Settings of the CMT configuration step run after a checkout."""

    def __init__(self, config = True, with_version_directory = True,
                 cleanup = False, head_version = None):
        """Store the options; head_version defaults to $CMTHEADVERSION."""
        self.config = config
        self.with_version_directory = with_version_directory
        self.cleanup = cleanup
        if head_version:
            self.set_head_version(head_version)
        else:
            self.set_head_version(os.getenv('CMTHEADVERSION', ''))

    def set_head_version(self, version):
        """Set the head version template and the head_version_tag flag.

        A leading '!' is stripped and clears head_version_tag; an empty
        version clears it as well; any other value sets it.
        """
        if not version:
            self.head_version = ''
            self.head_version_tag = False
        elif version.startswith('!'):
            # strip the leading '!'
            # require synchronization with C++ code
            self.head_version = version[1:]
            self.head_version_tag = False
        else:
            self.head_version = version
            self.head_version_tag = True

    def eval_head_version(self, m):
        """Return the head version of module m, or 'HEAD' if it is empty.

        <package>, <PACKAGE> and <revision> in the template are replaced
        with m.package, its upper-case form and m.info.last_changed_rev.
        """
        return str(self.head_version).replace('<package>', '%(package)s').replace('<PACKAGE>', '%(PACKAGE)s').replace('<revision>', '%(revision)i') % \
               {'package' : m.package,
                'PACKAGE' : m.package.upper(),
                'revision' : m.info.last_changed_rev} \
                or 'HEAD'

    def write(self, p, version):
        """Make file p contain version plus a newline.

        An existing file is rewritten only when its contents differ.
        Return 0 on success; print the error and return 1 on IOError.
        """
        try:
            t = version + '\n'
            if os.path.exists(p):
                f = open(p, 'r+')
                try:
                    b = f.tell()
                    v = f.read()
                    if v != t:
                        f.seek(b)
                        f.write(t)
                        f.truncate()
                finally:
                    f.close()
            else:
                f = open(p, 'w')
                try:
                    f.write(t)
                finally:
                    f.close()
        except IOError:
            _println(sys.stderr, sys.exc_info()[1])
            return 1
        return 0

    def generate(self, p):
        """Run 'cmt ... config' in directory p, then change directory back.

        Always return 0.
        """
        # NOTE(review): the status of cmt is computed but not returned;
        # the original has 'return sc' commented out, so this is kept.
        curdir = cwd()
        cmd = 'cmt -q'
        if self.with_version_directory:
            cmd += ' -with_version_directory'
        else:
            cmd += ' -without_version_directory'
        if self.cleanup:
            cmd += ' -cleanup'
        else:
            cmd += ' -no_cleanup'
        cmd += ' config'
        cd(p)
        try:
            sc = os.system(cmd)
            if sc != 0:
                sc = 1
        finally:
            cd(curdir)
        return 0

    def configure(self, path, version):
        """Configure the package checked out into path.

        Only the first existing directory of path/cmt and path/mgr is
        handled.  Without version directory, version is written to its
        version.cmt; with version directory, an existing version.cmt is
        renamed to version.cmt.orig.  The config step is then run if
        enabled.  Return the number of failures (0 if neither directory
        exists).
        """
        sc = 0
        for d in ('cmt', 'mgr'):
            p = os.path.join(path, d)
            if os.path.isdir(p):
                version_cmt = os.path.join(p, 'version.cmt')
                if not self.with_version_directory:
                    sc += self.write(version_cmt, version)
                elif os.path.exists(version_cmt):
                    _println(sys.stderr, 'Warning: file %s normally should not be under version control. Please, consider removing' % os.path.join(d,'version.cmt'))
                    try:
                        os.rename(version_cmt, os.path.join(p, 'version.cmt.orig'))
                        _println(sys.stderr, 'renamed %s -> %s' %
                                 (repr(version_cmt),
                                  repr(os.path.join(p, 'version.cmt.orig'))))
                    except (IOError, os.error):
                        error(sys.exc_info()[1])

                if self.config:
                    sc += self.generate(p)
                return sc
        return sc


class Module(object):
    """A package to check out, as named on the command line."""

    def __init__(self, module):
        """Remember the module name; the rest is set by Checkout.initialize."""
        self.module = module
        self.init = False
        # Placeholder until the last changed revision of the trunk is known.
        class info(object):
            last_changed_rev = 0
        self.info = info


class Checkout(object):
    """Repository layout, options and the list of modules to check out."""

    def __init__(self, url = None, trunk = None, tags = None, branches = None,
                 devbranches = None, version = None, version_dir = None,
                 directory = None, offset = '', modules = None):
        """Store the settings; unset layout items come from the environment.

        modules defaults to a new empty list for each instance.
        """
        self.url = url
        self.trunk = trunk
        self.tags = tags
        self.branches = branches
        self.devbranches = devbranches
        self.version = version
        self.version_dir = version_dir
        self.directory = directory
        self.offset = offset
        if modules is None:
            modules = []
        self.modules = modules
        self.reposLayout()

    def reposLayout(self):
        """Fill layout items that are None from SVNROOT, SVNTRUNK, SVNTAGS,
        SVNBRANCHES and SVNDEVBRANCHES, or their built-in defaults."""
        if self.url is None:
            self.url = os.getenv('SVNROOT', '')
        if self.trunk is None:
            self.trunk = os.getenv('SVNTRUNK', 'trunk')
        if self.tags is None:
            self.tags = os.getenv('SVNTAGS', 'tags')
        if self.branches is None:
            self.branches = os.getenv('SVNBRANCHES', 'branches')
        if self.devbranches is None:
            self.devbranches = os.getenv('SVNDEVBRANCHES', 'devbranches')

    def cmtRepos(self):
        """Switch to the URL and layout hard-coded for the CMT repository."""
        self.url = 'https://svn.lal.in2p3.fr/projects/CMT'
        self.trunk = 'HEAD'
        self.tags= '.'
        self.branches = '.'
        self.devbranches = '.'

    def add(self, module):
        """Append module to the list of modules to check out."""
        self.modules.append(module)

    def info_receiver(self, path, info, pool):
        """Record path and info on self, and info on pool as well."""
        self.path = path
        self.info = info
        pool.info = info

    def cmp(self, path1, path2, client_context):
        """Compare path1 and path2 with 'svn diff'.

        Return 0 if the diff is empty, 1 if it is not, 2 if svn failed.
        """
        cmd = 'svn diff %s %s' % (path1, path2)
        sc, out = Utils.getstatusoutput(cmd)
        if sc != 0:
            return 2
        if out:
            return 1
        else:
            return 0

    def _svn(self, cmd):
        """Run cmd, log it with its output at debug level, return (status, output)."""
        sc, out = Utils.getstatusoutput(cmd)
        logger.debug('%s\n%s' % (cmd, out))
        return sc, out

    def trunk_tag(self, module, client_context):
        """Attempt to determine the tag of the module's trunk.

        Return the tag if its files are copied from the trunk and their
        last_changed_rev numbers are the same as those of the trunk files.
        Return None if there is no such tag or an svn command fails.

        As a side effect, the revision listed for the trunk directory
        itself is stored through info_receiver on self and on module.
        """
        trunk = posixpath.join(module.url, self.trunk)
        sc, out = self._svn('svn ls -vR %s' % trunk)
        if sc != 0:
            return None
        # Each listing line is split on whitespace: the first field is
        # taken as the revision, the last one as the entry name.
        trunk_dirent = [line.split() for line in out.splitlines()
                        if line.strip()]
        # Files only: names ending with the separator are directories.
        trunk_revs = dict([(line[-1], int(line[0])) for line in trunk_dirent
                           if not line[-1].endswith(posixpath.sep)])
        logger.debug('%s' % trunk_revs)

        curdir = posixpath.curdir + posixpath.sep
        for line in trunk_dirent:
            if line[-1] == curdir:
                class info(object): pass
                info.last_changed_rev = int(line[0])
                self.info_receiver(trunk, info, module)
                logger.debug('last_changed_rev: %d' % info.last_changed_rev)
                break

        tags = posixpath.join(module.url, self.tags)
        sc, out = self._svn('svn ls -vR %s' % tags)
        if sc != 0:
            return None
        tags_dirent = [line.split() for line in out.splitlines()
                       if line.strip()]
        # tag -> {path relative to the tag: revision}
        tags_revs = dict()
        for ent in tags_dirent:
            try:
                tag, path = ent[-1].split(posixpath.sep, 1)
            except ValueError:
                # An entry directly under the tags directory.
                continue
            if tag not in tags_revs:
                tags_revs[tag] = dict()
            if path and not path.endswith(posixpath.sep):
                # assume there are no empty directories in the tag
                tags_revs[tag][path] = int(ent[0])
        logger.debug('%s' % tags_revs)

        sc, out = self._svn('svn info %s' % trunk)
        if sc != 0:
            return None
        p = r'repository\s+root:\s+(?P<root>%s://\S+)' % \
            ('(?:' + '|'.join(map(re.escape, client_context.schemes)) + ')')
        m = re.search(p, out, re.I)
        logger.debug('pattern: %r' % (p))
        if m:
            root = m.group('root')
        else:
            return None
        logger.debug('root: %s' % root)
        # Paths relative to the repository root, as printed by 'svn log -v'.
        trunk_path = trunk[len(root):]
        tags_path = tags[len(root):]
        logger.debug('trunk_path: %s' % trunk_path)
        logger.debug('tags_path: %s' % tags_path)
        # Length of trunk_path plus a separator: where the path relative
        # to the trunk starts in a copy source.
        offset = len(trunk) - len(root) + len(posixpath.sep)

        # Usually, a tag is created as a server-side copy.
        # Sometimes, a tag is created as a copy of WC (working copy)
        # after commit.
        # Below, we try to take into account the latter case.
        sc, out = self._svn('svn log -v -q %s' % tags)
        if sc != 0:
            return None
        # One log entry adding <tags>/<tag> as a copy of the trunk,
        # optionally followed by R/A/D lines for paths inside that tag.
        p = re.compile(
            (r'^-{5,}$\s^r\d+.+$\s^Changed paths:$'
             r'\s^\s+A\s+%s%s(?P<tag>[^%s]+)\s+\(from %s:\d+\)$'
             r'(?P<replaced>(?:\s^\s+(?:R|A|D)\s+%s%s(?P=tag)%s(?P<path>.+)'
             r'(?:\s+\(from %s%s(?P=path):\d+\))?$)*)') %
            (re.escape(tags_path), posixpath.sep,
             posixpath.sep,
             re.escape(trunk_path),
             re.escape(tags_path), posixpath.sep, posixpath.sep,
             re.escape(trunk_path), posixpath.sep)
            , re.M
            )
        tags_copied = list()
        tags_replaced_revs = dict()
        for m in p.finditer(out):
            logger.debug('tag: %s replaced: %r' %
                         (m.group('tag'), m.group('replaced')))
            tags_copied.append(m.group('tag'))
            for line in m.group('replaced').strip().splitlines():
                l = line.split()
                if len(l) == 2: continue # action code D - deleted paths
                # l[3] is '<source path>:<rev>)' of '(from ...)'.
                repl = l[3].rstrip(')')
                i = repl.rindex(':')
                path = repl[offset:i]
                rev = int(repl[i + 1:])
                logger.debug('path: %s rev: %d' % (path, rev))

                if m.group('tag') not in tags_replaced_revs:
                    tags_replaced_revs[m.group('tag')] = dict()
                if path and not path.endswith(posixpath.sep):
                    # assume there are no empty directories in the tag
                    tags_replaced_revs[m.group('tag')][path] = rev

        logger.debug('copied: %s' % tags_copied)
        logger.debug('replaced: %s' % tags_replaced_revs)

        # Iterate over a copy of the keys: entries are deleted in the loop.
        for t in list(tags_revs.keys()):
            if t not in tags_copied:
                del tags_revs[t]
                logger.debug('%s: Not a trunk copy' % t)

        for t in tags_replaced_revs:
            if t in tags_revs:
                tags_revs[t].update(tags_replaced_revs[t])

        for tag in tags_revs:
            logger.debug('Compare: %s -> %s' % (tag, tags_revs[tag]))
            if trunk_revs == tags_revs[tag]:
                return tag

        return None

    def initialize(self, cmt_context, client_context):
        """Work out URL(s), local path and version of every module.

        For each module, set url, package, path, head, version and URL
        (the list of candidate URLs to check out) and mark it init.  With
        no requested version, the version is the trunk tag or the
        evaluated head version, and the trunk is checked out; otherwise
        the version is looked for under tags, branches and devbranches.
        Return the number of modules that could not be initialized.
        """
        sc = 0
        # canonicalize:
        self.url = client_context.svn_path_canonicalize(self.url)
        self.offset = client_context.svn_path_canonicalize(self.offset)

        for m in self.modules:
            m.module = client_context.svn_path_canonicalize(m.module)
            m.url = client_context.urljoin(self.url, self.offset, m.module)

            if urlparse.urlparse(m.url)[0] not in client_context.schemes:
                error('%s: Not a valid Subversion URL' % m.url)
                sc += 1; continue

            m.package = posixpath.basename(m.url)

            if self.directory is not None:
                m.path = os.path.normpath(self.directory)
            else:
                scheme, netloc, path, query, fragment = urlparse.urlsplit(m.module)
                if not scheme and not netloc:
                    # A relative module name: mirror it as a local path.
                    m.path = os.path.normpath(os.path.join(*path.split(posixpath.sep)))
                else:
                    m.path = posixpath.basename(m.url)

            if self.version is None:
                m.head = True
                if cmt_context.head_version_tag:
                    m.version = self.trunk_tag(m, client_context) or \
                                cmt_context.eval_head_version(m)
                else:
                    trunk = posixpath.join(m.url, self.trunk)
                    cmd = 'svn info %s' % trunk
                    scc, out = Utils.getstatusoutput(cmd)
                    logger.debug('%s\n%s' % (cmd, out))
                    if scc != 0:
                        logger.error('%s\n%s' % (cmd, out))
                        sc += 1; continue
                    p = r'last\s+changed\s+rev:\s+(?P<rev>\d+)'
                    M = re.search(p, out, re.I)
                    if M:
                        class info(object): pass
                        info.last_changed_rev = int(M.group('rev'))
                        self.info_receiver(trunk, info, m)
                        logger.debug('last_changed_rev: %d' % info.last_changed_rev)
                    else:
                        logger.warning('%s: last_changed_rev: Not found' % trunk)
                    m.version = cmt_context.eval_head_version(m)
                logger.debug('set version: %s' % m.version)
            else:
                m.head = False
                m.version = self.version

            if m.head:
                m.URL = [posixpath.join(m.url, self.trunk)]
            else:
                m.URL = [posixpath.join(m.url, p, m.version)
                         for p in (self.tags, self.branches, self.devbranches)]
            m.URL = [client_context.svn_path_canonicalize(url) for url in m.URL]

            if cmt_context.with_version_directory:
                if self.version_dir is None:
                    m.path = os.path.join(m.path, m.version)
                else:
                    m.path = os.path.join(m.path, self.version_dir)

            m.init = True

        return sc

    def execute(self, cmt_context, client_context):
        """Check out and configure every initialized module.

        The candidate URLs of a module are tried in order until one
        'svn checkout' succeeds; if none does, the collected stderr
        outputs are printed.  Return the number of failed checkouts plus
        the failure counts returned by configure.
        """
        sce = 0
        for m in self.modules:
            if not m.init: continue
            done = False
            err = []
            for url in m.URL:
                cmd = 'svn checkout %s %s' % (url, m.path)
                sc, e = Utils.getstatuserror(cmd)
                if 0 == sc:
                    done = True
                    break
                else:
                    err.append(e)
                    continue

            if not done:
                for e in err:
                    error(e)
                sce += 1

            # NOTE(review): configure is run even when the checkout failed,
            # as in the original; it does nothing unless path/cmt or
            # path/mgr exists.
            scc = cmt_context.configure(m.path, m.version)
            if scc != 0:
                _println(sys.stderr,
                         '%s %s: configure returned %i.' % (m.path, m.version, scc))
            sce += scc

        return sce


def main(argv=[__name__]):
    """Command line entry point.

    argv[0] is the program name used in messages and as the logger name;
    argv[1:] are the options and paths described in the module docstring.
    Return 0 on success (also after --help and --version) and 1 on usage
    error, initialization failure or checkout failure.
    """
    global logger

    self = os.path.basename(argv[0])
    try:
        opts, args = getopt.getopt(argv[1:],
                                   "hr:d:o:",
                                   ["help", "version", "version-tag=",
                                    "version-dir=", "directory=",
                                    "offset=", "no_config",
                                    "with_version_directory",
                                    "without_version_directory", "url=", "debug"])
    except getopt.error:
        e = sys.exc_info()[1]
        _println(sys.stderr, '%s: %s' % (self, str(e)))
        _println(sys.stderr, "Try '%s --help' for more information." % self)
        return 1

    logging.basicConfig()
    logger = logging.getLogger(self)
    logger.setLevel(logging.INFO)
    if os.getenv('SVNDEBUG'):
        logger.setLevel(logging.DEBUG)

    cmt_context = CmtContext()
    checkout = Checkout()

    for o, v in opts:
        if o in ("-h", "--help"):
            _println(sys.stdout, sys.modules[__name__].__doc__)
            return 0
        elif o in ("--version",):
            _println(sys.stdout, '%s %s (%s)' % (self, __version__, __date__))
            _println(sys.stdout, '%sWritten by %s.' % (os.linesep, __author__))
            return 0
        elif o in ("-r", "--version-tag"):
            checkout.version = v
        elif o in ("--version-dir",):
            checkout.version_dir = v
        elif o in ("-d", "--directory"):
            checkout.directory = v
        elif o in ("-o", "--offset"):
            checkout.offset = v
        elif o in ("--no_config",):
            cmt_context.config = False
        elif o in ("--without_version_directory",):
            cmt_context.with_version_directory = False
        elif o in ("--with_version_directory",):
            cmt_context.with_version_directory = True
        elif o in ("--url",):
            checkout.url = v
        elif o in ("--debug",):
            logger.setLevel(logging.DEBUG)

    if not args:
        _println(sys.stderr, '%s: missing path argument' % self)
        _println(sys.stderr, "Try '%s --help' for more information." % self)
        return 1

    for arg in args:
        checkout.add(Module(arg))

    client_context = ClientContext()
    sci = checkout.initialize(cmt_context, client_context)
    sce = checkout.execute(cmt_context, client_context)

    if sci != 0 or sce != 0:
        return 1
    else:
        return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))