diff options
author | Guido Günther <agx@sigxcpu.org> | 2011-11-14 19:54:12 +0100 |
---|---|---|
committer | Guido Günther <agx@sigxcpu.org> | 2011-11-20 14:24:24 +0100 |
commit | 917a496fd8ffc7e857571ca747df01051ef12e35 (patch) | |
tree | b724bf364f559f3206e2409fa4b2494392ffcdb3 /gbp/git | |
parent | 8c90aac38b82262449b407062b15509703e7f876 (diff) |
Move git code into submodule
Diffstat (limited to 'gbp/git')
-rw-r--r-- | gbp/git/__init__.py | 1391 |
1 files changed, 1391 insertions, 0 deletions
diff --git a/gbp/git/__init__.py b/gbp/git/__init__.py new file mode 100644 index 00000000..9b461d1c --- /dev/null +++ b/gbp/git/__init__.py @@ -0,0 +1,1391 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2006,2007,2008,2011 Guido Guenther <agx@sigxcpu.org> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +"""Git repository class and helpers""" + +import re +import subprocess +import os.path +from gbp.command_wrappers import (GitCommand, copy_from) +from gbp.errors import GbpError +import gbp.log as log +import dateutil.parser +import calendar + +class GitRepositoryError(Exception): + """Exception thrown by L{GitRepository}""" + pass + + +class GitModifier(object): + """Stores authorship/comitter information""" + def __init__(self, name=None, email=None, date=None): + self.name = name + self.email = email + self.date = date + + def _get_env(self, who): + """Get author or comitter information as env var dictionary""" + who = who.upper() + if who not in ['AUTHOR', 'COMMITTER']: + raise GitRepository("Neither comitter nor author") + + extra_env = {} + if self.name: + extra_env['GIT_%s_NAME' % who] = self.name + if self.email: + extra_env['GIT_%s_EMAIL' % who] = self.email + if self.date: + extra_env['GIT_%s_DATE' % who] = self.date + return extra_env + + def get_author_env(self): + """ + Get env vars for authorship information + + >>> g = GitModifier("foo", "bar") + >>> g.get_author_env() + {'GIT_AUTHOR_EMAIL': 'bar', 'GIT_AUTHOR_NAME': 'foo'} + + @return: Author information suitable to use as environment variables + @rtype: C{dict} + """ + return self._get_env('author') + + def get_committer_env(self): + """ + Get env vars for comitter information + + >>> g = GitModifier("foo", "bar") + >>> g.get_committer_env() + {'GIT_COMMITTER_NAME': 'foo', 'GIT_COMMITTER_EMAIL': 'bar'} + + @return: Commiter information suitable to use as environment variables + @rtype: C{dict} + """ + return self._get_env('committer') + + +class GitCommit(object): + """A git commit""" + sha1_re = re.compile(r'[0-9a-f]{40}$') + + @staticmethod + def is_sha1(value): + """ + Is I{value} a valid 40 digit SHA1? + + >>> GitCommit.is_sha1('asdf') + False + >>> GitCommit.is_sha1('deadbeef') + False + >>> GitCommit.is_sha1('17975594b2d42f2a3d144a9678fdf2c2c1dd96a0') + True + >>> GitCommit.is_sha1('17975594b2d42f2a3d144a9678fdf2c2c1dd96a0toolong') + False + + @param value: the value to check + @type value: C{str} + @return: C{True} if I{value} is a 40 digit SHA1, C{False} otherwise. + @rtype: C{bool} + """ + return True if GitCommit.sha1_re.match(value) else False + + +class GitRepository(object): + """ + Represents a git repository at I{path}. It's currently assumed that the git + repository is stored in a directory named I{.git/} below I{path}. + + @ivar _path: The path to the working tree + @type _path: C{str} + @ivar _bare: Whether this is a bare repository + @type _bare: C{bool} + """ + + def _check_bare(self): + """Check whether this is a bare repository""" + out, ret = self.__git_getoutput('rev-parse', ['--is-bare-repository']) + if ret: + raise GitRepositoryError( + "Failed to get repository state at '%s'" % self.path) + self._bare = False if out[0].strip() != 'true' else True + self._git_dir = '' if self._bare else '.git' + + def __init__(self, path): + self._path = os.path.abspath(path) + self._bare = False + try: + out, ret = self.__git_getoutput('rev-parse', ['--show-cdup']) + if ret or out not in [ ['\n'], [] ]: + raise GitRepositoryError("No git repo at '%s'" % self.path) + except GitRepositoryError: + raise # We already have a useful error message + except: + raise GitRepositoryError("No git repo at '%s'" % self.path) + self._check_bare() + + def __build_env(self, extra_env): + """Prepare environment for subprocess calls""" + env = None + if extra_env is not None: + env = os.environ.copy() + env.update(extra_env) + return env + + def __git_getoutput(self, command, args=[], extra_env=None, cwd=None): + """ + Run a git command and return the output + + @param command: git command to run + @type command: C{str} + @param args: list of arguments + @type args: C{list} + @param extra_env: extra environment variables to pass + @type extra_env: C{dict} + @param cwd: directory to swith to when running the command, defaults to I{self.path} + @type cwd: C{str} + @return: stdout, return code + @rtype: C{tuple} + """ + output = [] + + if not cwd: + cwd = self.path + + env = self.__build_env(extra_env) + cmd = ['git', command] + args + log.debug(cmd) + popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env, cwd=cwd) + while popen.poll() == None: + output += popen.stdout.readlines() + output += popen.stdout.readlines() + return output, popen.returncode + + def __git_inout(self, command, args, input, extra_env=None): + """ + Run a git command with input and return output + + @param command: git command to run + @type command: C{str} + @param input: input to pipe to command + @type input: C{str} + @param args: list of arguments + @type args: C{list} + @param extra_env: extra environment variables to pass + @type extra_env: C{dict} + @return: stdout, stderr, return code + @rtype: C{tuple} + """ + env = self.__build_env(extra_env) + cmd = ['git', command] + args + log.debug(cmd) + popen = subprocess.Popen(cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + env=env, + cwd=self.path) + (stdout, stderr) = popen.communicate(input) + return stdout, stderr, popen.returncode + + def _git_command(self, command, args=[], extra_env=None): + """ + Execute git command with arguments args and environment env + at path. + + @param command: git command + @type command: C{str} + @param args: command line arguments + @type args: C{list} + @param extra_env: extra environment variables to set when running command + @type extra_env: C{dict} + """ + GitCommand(command, args, extra_env=extra_env, cwd=self.path)() + + @property + def path(self): + """The absolute path to the repository""" + return self._path + + @property + def git_dir(self): + """The absolute path to git's metadata""" + return os.path.join(self.path, self._git_dir) + + @property + def bare(self): + """Wheter this is a bare repository""" + return self._bare + + @property + def tags(self): + """List of all tags in the repository""" + return self.get_tags() + + @property + def branch(self): + """The currently checked out branch""" + try: + return self.get_branch() + except GitRepositoryError: + return None + + @property + def head(self): + """return the SHA1 of the current HEAD""" + return self.rev_parse('HEAD') + +#{ Branches and Merging + def create_branch(self, branch, rev=None): + """ + Create a new branch + + @param branch: the branch's name + @param rev: where to start the branch from + + If rev is None the branch starts form the current HEAD. + """ + args = [ branch ] + args += [ rev ] if rev else [] + + self._git_command("branch", args) + + def delete_branch(self, branch, remote=False): + """ + Delete branch I{branch} + + @param branch: name of the branch to delete + @type branch: C{str} + @param remote: delete a remote branch + @param remote: C{bool} + """ + args = [ "-D" ] + args += [ "-r" ] if remote else [] + + if self.branch != branch: + self._git_command("branch", args + [branch]) + else: + raise GitRepositoryError, "Can't delete the branch you're on" + + def get_branch(self): + """ + On what branch is the current working copy + + @return: current branch + @rtype: C{str} + """ + out, ret = self.__git_getoutput('symbolic-ref', [ 'HEAD' ]) + if ret: + raise GitRepositoryError("Currently not on a branch") + + ref = out[0][:-1] + # Check if ref really exists + failed = self.__git_getoutput('show-ref', [ ref ])[1] + if not failed: + return ref[11:] # strip /refs/heads + + def has_branch(self, branch, remote=False): + """ + Check if the repository has branch named I{branch}. + + @param branch: branch to look for + @param remote: only look for remote branches + @type remote: C{bool} + @return: C{True} if the repository has this branch, C{False} otherwise + @rtype: C{bool} + """ + if remote: + ref = 'refs/remotes/%s' % branch + else: + ref = 'refs/heads/%s' % branch + failed = self.__git_getoutput('show-ref', [ ref ])[1] + if failed: + return False + return True + + def set_branch(self, branch): + """ + Switch to branch I{branch} + + @param branch: name of the branch to switch to + @type branch: C{str} + """ + if self.branch == branch: + return + + if self.bare: + self._git_command("symbolic-ref", + [ 'HEAD', 'refs/heads/%s' % branch ]) + else: + self._git_command("checkout", [ branch ]) + + def get_merge_branch(self, branch): + """ + Get the branch we'd merge from + + @return: repo and branch we would merge from + @rtype: C{str} + """ + try: + remote = self.get_config("branch.%s.remote" % branch) + merge = self.get_config("branch.%s.merge" % branch) + except KeyError: + return None + remote += merge.replace("refs/heads","", 1) + return remote + + def merge(self, commit, verbose=False): + """ + Merge changes from the named commit into the current branch + + @param commit: the commit to merge from (usually a branch name) + @type commit: C{str} + """ + args = [ "--summary" ] if verbose else [ "--no-summary" ] + self._git_command("merge", args + [ commit ]) + + def is_fast_forward(self, from_branch, to_branch): + """ + Check if an update I{from from_branch} to I{to_branch} would be a fast + forward or if the branch is up to date already. + + @return: can_fast_forward, up_to_date + @rtype: C{tuple} + """ + has_local = False # local repo has new commits + has_remote = False # remote repo has new commits + out = self.__git_getoutput('rev-list', ["--left-right", + "%s...%s" % (from_branch, to_branch), + "--"])[0] + + if not out: # both branches have the same commits + return True, True + + for line in out: + if line.startswith("<"): + has_local = True + elif line.startswith(">"): + has_remote = True + + if has_local and has_remote: + return False, False + elif has_local: + return False, True + elif has_remote: + return True, False + + def _get_branches(self, remote=False): + """ + Get a list of branches + + @param remote: whether to list local or remote branches + @type remote: C{bool} + @return: local or remote branches + @rtype: C{list} + """ + args = [ '--format=%(refname:short)' ] + args += [ 'refs/remotes/' ] if remote else [ 'refs/heads/' ] + out = self.__git_getoutput('for-each-ref', args)[0] + return [ ref.strip() for ref in out ] + + def get_local_branches(self): + """ + Get a list of local branches + + @return: local branches + @rtype: C{list} + """ + return self._get_branches(remote=False) + + + def get_remote_branches(self): + """ + Get a list of remote branches + + @return: remote branches + @rtype: C{list} + """ + return self._get_branches(remote=True) + + def update_ref(self, ref, new, old=None, msg=None): + """ + Update ref I{ref} to commit I{new} if I{ref} currently points to + I{old} + + @param ref: the ref to update + @type ref: C{str} + @param new: the new value for ref + @type new: C{str} + @param old: the old value of ref + @type old: C{str} + @param msg: the reason for the update + @type msg: C{str} + """ + args = [ ref, new ] + if old: + args += [ old ] + if msg: + args = [ '-m', msg ] + args + self._git_command("update-ref", args) + +#{ Tags + + def create_tag(self, name, msg=None, commit=None, sign=False, keyid=None): + """ + Create a new tag. + + @param name: the tag's name + @type name: C{str} + @param msg: The tag message. + @type msg: C{str} + @param commit: the commit or object to create the tag at, default + is I{HEAD} + @type commit: C{str} + @param sign: Whether to sing the tag + @type sign: C{bool} + @param keyid: the GPG keyid used to sign the tag + @type keyid: C{str} + """ + args = [] + args += [ '-m', msg ] if msg else [] + if sign: + args += [ '-s' ] + args += [ '-u', keyid ] if keyid else [] + args += [ name ] + args += [ commit ] if commit else [] + self._git_command("tag", args) + + def delete_tag(self, tag): + """ + Delete a tag named I{tag} + + @param tag: the tag to delete + @type tag: C{str} + """ + if self.has_tag(tag): + self._git_command("tag", [ "-d", tag ]) + + def move_tag(self, old, new): + self._git_command("tag", [ new, old ]) + self.delete_tag(old) + + def has_tag(self, tag): + """ + Check if the repository has a tag named I{tag}. + + @param tag: tag to look for + @type tag: C{str} + @return: C{True} if the repository has that tag, C{False} otherwise + @rtype: C{bool} + """ + out, ret = self.__git_getoutput('tag', [ '-l', tag ]) + return [ False, True ][len(out)] + + def _build_legacy_tag(self, format, version): + """legacy version numbering""" + if ':' in version: # strip of any epochs + version = version.split(':', 1)[1] + version = version.replace('~', '.') + return format % dict(version=version) + + def find_version(self, format, version): + """ + Check if a certain version is stored in this repo. Return it's SHA1 in + this case. For legacy tags Don't check only the tag but also the + message, since the former wasn't injective until recently. + You only need to use this funciton if you also need to check for legacy + tags. + + @param format: tag pattern + @param version: debian version number + @return: sha1 of the version tag + """ + tag = build_tag(format, version) + legacy_tag = self._build_legacy_tag(format, version) + if self.has_tag(tag): # new tags are injective + return self.rev_parse(tag) + elif self.has_tag(legacy_tag): + out, ret = self.__git_getoutput('cat-file', args=['-p', legacy_tag]) + if ret: + return None + for line in out: + if line.endswith(" %s\n" % version): + return self.rev_parse(legacy_tag) + elif line.startswith('---'): # GPG signature start + return None + return None + + def find_tag(self, commit, pattern=None): + """ + Find the closest tag to a given commit + + @param commit: the commit to describe + @type commit: C{str} + @param pattern: only look for tags matching I{pattern} + @type pattern: C{str} + @return: the found tag + @rtype: C{str} + """ + args = [ '--abbrev=0' ] + if pattern: + args += [ '--match' , pattern ] + args += [ commit ] + + tag, ret = self.__git_getoutput('describe', args) + if ret: + raise GitRepositoryError, "can't find tag for %s" % commit + return tag[0].strip() + + def get_tags(self, pattern=None): + """ + List tags + + @param pattern: only list tags matching I{pattern} + @type pattern: C{str} + @return: tags + @rtype: C{list} of C{str} + """ + args = [ '-l', pattern ] if pattern else [] + return [ line.strip() for line in self.__git_getoutput('tag', args)[0] ] +#} + def force_head(self, commit, hard=False): + """ + Force HEAD to a specific commit + + @param commit: commit to move HEAD to + @param hard: also update the working copy + @type hard: C{bool} + """ + if not GitCommit.is_sha1(commit): + commit = self.rev_parse(commit) + + if self.bare: + ref = "refs/heads/%s" % self.get_branch() + self._git_command("update-ref", [ ref, commit ]) + else: + args = ['--quiet'] + if hard: + args += [ '--hard' ] + args += [ commit, '--' ] + self._git_command("reset", args) + + def is_clean(self): + """ + Does the repository contain any uncommitted modifications? + + @return: C{True} if the repository is clean, C{False} otherwise + and Git's status message + @rtype: C{tuple} + """ + if self.bare: + return (True, '') + + clean_msg = 'nothing to commit' + out, ret = self.__git_getoutput('status') + if ret: + raise GbpError("Can't get repository status") + ret = False + for line in out: + if line.startswith('#'): + continue + if line.startswith(clean_msg): + ret = True + break + return (ret, "".join(out)) + + def is_empty(self): + """ + Is the repository empty? + + @return: True if the repositorydoesn't have any commits, + False otherwise + @rtype: C{bool} + """ + # an empty repo has no branches: + return False if self.branch else True + + def rev_parse(self, name): + """ + Find the SHA1 of a given name + + @param name: the name to look for + @type name: C{str} + @return: the name's sha1 + @rtype: C{str} + """ + args = [ "--quiet", "--verify", name ] + sha, ret = self.__git_getoutput('rev-parse', args) + if ret: + raise GitRepositoryError, "revision '%s' not found" % name + return sha[0].strip() + +#{ Trees + def checkout(self, treeish): + """ + Checkout treeish + + @param treeish: the treeish to check out + @type treeish: C{str} + """ + self._git_command("checkout", ["--quiet", treeish]) + + def has_treeish(self, treeish): + """ + Check if the repository has the treeish object I{treeish}. + + @param treeish: treeish object to look for + @type treeish: C{str} + @return: C{True} if the repository has that tree, C{False} otherwise + @rtype: C{bool} + """ + + out, ret = self.__git_getoutput('ls-tree', [ treeish ]) + return [ True, False ][ret != 0] + + def write_tree(self, index_file=None): + """ + Create a tree object from the current index + + @param index_file: alternate index file to write the current index to + @type index_file: C{str} + @return: the new tree object's sha1 + @rtype: C{str} + """ + if index_file: + extra_env = {'GIT_INDEX_FILE': index_file } + else: + extra_env = None + + tree, ret = self.__git_getoutput('write-tree', extra_env=extra_env) + if ret: + raise GitRepositoryError, "can't write out current index" + return tree[0].strip() +#} + + def get_config(self, name): + """ + Gets the config value associated with I{name} + + @param name: config value to get + @return: fetched config value + @rtype: C{str} + """ + value, ret = self.__git_getoutput('config', [ name ]) + if ret: raise KeyError + return value[0][:-1] # first line with \n ending removed + + def get_author_info(self): + """ + Determine a sane values for author name and author email from git's + config and environment variables. + + @return: name and email + @rtype: C{tuple} + """ + try: + name = self.get_config("user.email") + except KeyError: + name = os.getenv("USER") + try: + email = self.get_config("user.email") + except KeyError: + email = os.getenv("EMAIL") + email = os.getenv("GIT_AUTHOR_EMAIL", email) + name = os.getenv("GIT_AUTHOR_NAME", name) + return (name, email) + +#{ Remote Repositories + + def get_remote_repos(self): + """ + Get all remote repositories + + @return: remote repositories + @rtype: C{list} of C{str} + """ + out = self.__git_getoutput('remote')[0] + return [ remote.strip() for remote in out ] + + def has_remote_repo(self, name): + """ + Do we know about a remote named I{name}? + + @param name: name of the remote repository + @type name: C{str} + @return: C{True} if the remote repositore is known, C{False} otherwise + @rtype: C{bool} + """ + if name in self.get_remote_repos(): + return True + else: + return False + + def add_remote_repo(self, name, url, tags=True, fetch=False): + """ + Add a tracked remote repository + + @param name: the name to use for the remote + @type name: C{str} + @param url: the url to add + @type url: C{str} + @param tags: whether to fetch tags + @type tags: C{bool} + @param fetch: whether to fetch immediately from the remote side + @type fetch: C{bool} + """ + args = [ "add" ] + args += [ '--tags' ] if tags else [ '--no-tags'] + args += [ '--fetch' ] if fetch else [] + args += [ name, url ] + self._git_command("remote", args) + + def fetch(self, repo=None): + """ + Download objects and refs from another repository. + + @param repo: repository to fetch from + @type repo: C{str} + """ + args = [ '--quiet' ] + args += [repo] if repo else [] + + self._git_command("fetch", args) + + def pull(self, repo=None, ff_only=False): + """ + Fetch and merge from another repository + + @param repo: repository to fetch from + @type repo: C{str} + @param ff_only: only merge if this results in a fast forward merge + @type ff_only: C{bool} + """ + args = [] + args += [ '--ff-only' ] if ff_only else [] + args += [ repo ] if repo else [] + self._git_command("pull", args) + +#{ Files + + def add_files(self, paths, force=False, index_file=None, work_tree=None): + """ + Add files to a the repository + + @param paths: list of files to add + @type paths: list or C{str} + @param force: add files even if they would be ignored by .gitignore + @type force: C{bool} + @param index_file: alternative index file to use + @param work_tree: alternative working tree to use + """ + extra_env = {} + + if type(paths) in [type(''), type(u'')]: + paths = [ paths ] + + args = [ '-f' ] if force else [] + + if index_file: + extra_env['GIT_INDEX_FILE'] = index_file + + if work_tree: + extra_env['GIT_WORK_TREE'] = work_tree + + self._git_command("add", args + paths, extra_env) + + def remove_files(self, paths, verbose=False): + """ + Remove files from the repository + + @param paths: list of files to remove + @param paths: C{list} or C{str} + @param verbose: be verbose + @type verbose: C{bool} + """ + if type(paths) in [type(''), type(u'')]: + paths = [ paths ] + + args = [] if verbose else ['--quiet'] + self._git_command("rm", args + paths) + + def list_files(self, types=['cached']): + """ + List files in index and working tree + + @param types: list of types to show + @type types: C{list} + @return: list of files + @rtype: C{list} of C{str} + """ + all_types = [ 'cached', 'deleted', 'others', 'ignored', 'stage' + 'unmerged', 'killed', 'modified' ] + args = [ '-z' ] + + for t in types: + if t in all_types: + args += [ '--%s' % t ] + else: + raise GitRepositoryError("Unknown type '%s'" % t) + out, ret = self.__git_getoutput('ls-files', args) + if ret: + raise GitRepositoryError("Error listing files: '%d'" % ret) + if out: + return [ file for file in out[0].split('\0') if file ] + else: + return [] + +#{ Comitting + + def _commit(self, msg, args=[], author_info=None): + extra_env = author_info.get_author_env() if author_info else None + self._git_command("commit", ['-q', '-m', msg] + args, extra_env=extra_env) + + def commit_staged(self, msg, author_info=None): + """ + Commit currently staged files to the repository + + @param msg: commit message + @type msg: C{str} + @param author_info: authorship information + @type author_info: L{GitModifier} + """ + self._commit(msg=msg, author_info=author_info) + + def commit_all(self, msg, author_info=None): + """ + Commit all changes to the repository + @param msg: commit message + @type msg: C{str} + @param author_info: authorship information + @type author_info: L{GitModifier} + """ + self._commit(msg=msg, args=['-a'], author_info=author_info) + + def commit_files(self, files, msg, author_info=None): + """ + Commit the given files to the repository + + @param files: file or files to commit + @type files: C{str} or C{list} + @param msg: commit message + @type msg: C{str} + @param author_info: authorship information + @type author_info: L{GitModifier} + """ + if type(files) in [type(''), type(u'')]: + files = [ files ] + self._commit(msg=msg, args=files, author_info=author_info) + + def commit_dir(self, unpack_dir, msg, branch, other_parents=None, + author={}, committer={}): + """ + Replace the current tip of branch I{branch} with the contents from I{unpack_dir} + + @param unpack_dir: content to add + @type unpack_dir: C{str} + @param msg: commit message to use + @type msg: C{str} + @param branch: branch to add the contents of unpack_dir to + @type branch: C{str} + @param other_parents: additional parents of this commit + @type other_parents: C{list} of C{str} + @param author: author information to use for commit + @type author: C{dict} with keys I{name}, I{email}, I{date} + @param committer: committer information to use for commit + @type committer: C{dict} with keys I{name}, I{email}, I{date} + """ + + git_index_file = os.path.join(self.path, self._git_dir, 'gbp_index') + try: + os.unlink(git_index_file) + except OSError: + pass + self.add_files('.', force=True, index_file=git_index_file, + work_tree=unpack_dir) + tree = self.write_tree(git_index_file) + + if branch: + cur = self.rev_parse(branch) + else: # emtpy repo + cur = None + branch = 'master' + + # Build list of parents: + parents = [] + if cur: + parents = [ cur ] + if other_parents: + for parent in other_parents: + sha = self.rev_parse(parent) + if sha not in parents: + parents += [ sha ] + + commit = self.commit_tree(tree=tree, msg=msg, parents=parents, + author=author, committer=committer) + if not commit: + raise GbpError, "Failed to commit tree" + self.update_ref("refs/heads/%s" % branch, commit, cur) + return commit + + def commit_tree(self, tree, msg, parents, author={}, committer={}): + """ + Commit a tree with commit msg I{msg} and parents I{parents} + + @param tree: tree to commit + @param msg: commit message + @param parents: parents of this commit + @param author: authorship information + @type author: C{dict} with keys 'name' and 'email' + @param committer: comitter information + @type committer: C{dict} with keys 'name' and 'email' + """ + extra_env = {} + for key, val in author.items(): + if val: + extra_env['GIT_AUTHOR_%s' % key.upper()] = val + for key, val in committer.items(): + if val: + extra_env['GIT_COMMITTER_%s' % key.upper()] = val + + args = [ tree ] + for parent in parents: + args += [ '-p' , parent ] + sha1, stderr, ret = self.__git_inout('commit-tree', args, msg, extra_env) + if not ret: + return sha1.strip() + else: + raise GbpError, "Failed to commit tree: %s" % stderr + +#{ Commit Information + + def get_commits(self, since=None, until=None, paths=None, options=None, + first_parent=False): + """ + Get commits from since to until touching paths + + @param since: commit to start from + @param until: last commit to get + @param paths: only list commits touching paths + @param options: list of options passed to git log + @type options: C{list} of C{str}ings + @param first_parent: only follow first parent when seeing a + merge commit + @type first_parent: C{bool} + """ + + args = ['--pretty=format:%H'] + + if options: + args += options + + if first_parent: + args += [ "--first-parent" ] + + if since and until: + args += ['%s..%s' % (since, until)] + + if paths: + args += [ "--", paths ] + + commits, ret = self.__git_getoutput('log', args) + if ret: + where = " on %s" % paths if paths else "" + raise GitRepositoryError, ("Error getting commits %s..%s%s" % + (since, until, where)) + return [ commit.strip() for commit in commits ] + + def show(self, id): + """git-show id""" + commit, ret = self.__git_getoutput('show', [ "--pretty=medium", id ]) + if ret: + raise GitRepositoryError, "can't get %s" % id + for line in commit: + yield line + + def grep_log(self, regex, where=None): + args = ['--pretty=format:%H'] + args.append("--grep=%s" % regex) + if where: + args.append(where) + args.append('--') + + commits, ret = self.__git_getoutput('log', args) + if ret: + raise GitRepositoryError, "Error grepping log for %s" % regex + return [ commit.strip() for commit in commits[::-1] ] + + def get_subject(self, commit): + """ + Gets the subject of a commit. + + @param commit: the commit to get the subject from + @return: the commit's subject + @rtype: C{str} + """ + out, ret = self.__git_getoutput('log', ['-n1', '--pretty=format:%s', commit]) + if ret: + raise GitRepositoryError, "Error getting subject of commit %s" % commit + return out[0].strip() + + def get_commit_info(self, commit): + """ + Look up data of a specific commit + + @param commit: the commit to inspect + @return: the commit's including id, author, email, subject and body + @rtype: dict + """ + out, ret = self.__git_getoutput('log', + ['--pretty=format:%an%n%ae%n%s%n%b%n', + '-n1', commit]) + if ret: + raise GitRepositoryError, "Unable to retrieve log entry for %s" \ + % commit + return {'id' : commit, + 'author' : out[0].strip(), + 'email' : out[1].strip(), + 'subject' : out[2].rstrip(), + 'body' : [line.rstrip() for line in out[3:]]} + + +#{ Patches + def format_patches(self, start, end, output_dir): + """ + Output the commits between start and end as patches in output_dir + """ + options = [ '-N', '-k', '-o', output_dir, '%s...%s' % (start, end) ] + output, ret = self.__git_getoutput('format-patch', options) + return [ line.strip() for line in output ] + + def apply_patch(self, patch, index=True, context=None, strip=None): + """Apply a patch using git apply""" + args = [] + if context: + args += [ '-C', context ] + if index: + args.append("--index") + if strip: + args += [ '-p', strip ] + args.append(patch) + self._git_command("apply", args) +#} + + def archive(self, format, prefix, output, treeish, **kwargs): + args = [ '--format=%s' % format, '--prefix=%s' % prefix, + '--output=%s' % output, treeish ] + out, ret = self.__git_getoutput('archive', args, **kwargs) + if ret: + raise GitRepositoryError, "unable to archive %s"%(treeish) + + def collect_garbage(self, auto=False): + """ + Cleanup unnecessary files and optimize the local repository + + param auto: only cleanup if required + param auto: C{bool} + """ + args = [ '--auto' ] if auto else [] + self._git_command("gc", args) + +#{ Submodules + + def has_submodules(self): + """ + Does the repo have any submodules? + + @return: C{True} if the repository has any submodules, C{False} + otherwise + @rtype: C{bool} + """ + if os.path.exists('.gitmodules'): + return True + else: + return False + + + def add_submodule(self, repo_path): + """ + Add a submodule + + @param repo_path: path to submodule + @type repo_path: C{str} + """ + self._git_command("submodule", [ "add", repo_path ]) + + + def update_submodules(self, init=True, recursive=True, fetch=False): + """ + Update all submodules + + @param init: whether to initialize the submodule if necessary + @type init: C{bool} + @param recursive: whether to update submodules recursively + @type recursive: C{bool} + @param fetch: whether to fetch new objects + @type fetch: C{bool} + """ + + if not self.has_submodules(): + return + args = [ "update" ] + if recursive: + args.append("--recursive") + if init: + args.append("--init") + if not fetch: + args.append("--no-fetch") + + self._git_command("submodule", args) + + + def get_submodules(self, treeish, path=None, recursive=True): + """ + List the submodules of treeish + + @return: a list of submodule/commit-id tuples + @rtype: list of tuples + """ + # Note that we is lstree instead of submodule commands because + # there's no way to list the submodules of another branch with + # the latter. + submodules = [] + if path is None: + path = "." + + args = [ treeish ] + if recursive: + args += ['-r'] + + out, ret = self.__git_getoutput('ls-tree', args, cwd=path) + for line in out: + mode, objtype, commit, name = line[:-1].split(None, 3) + # A submodules is shown as "commit" object in ls-tree: + if objtype == "commit": + nextpath = os.path.sep.join([path, name]) + submodules.append( (nextpath, commit) ) + if recursive: + submodules += self.get_submodules(commit, path=nextpath, + recursive=recursive) + return submodules + +#{ Repository Creation + + @classmethod + def create(klass, path, description=None, bare=False): + """ + Create a repository at path + + @param path: where to create the repository + @type path: C{str} + @return: git repository object + @rtype: L{GitRepository} + """ + abspath = os.path.abspath(path) + + if bare: + args = [ '--bare' ] + git_dir = '' + else: + args = [] + git_dir = '.git' + + try: + if not os.path.exists(abspath): + os.makedirs(abspath) + GitCommand("init", args, cwd=abspath)() + if description: + with file(os.path.join(abspath, git_dir, "description"), 'w') as f: + description += '\n' if description[-1] != '\n' else '' + f.write(description) + return klass(abspath) + except OSError, err: + raise GitRepositoryError, "Cannot create Git repository at %s: %s " % (abspath, err[1]) + return None + + @classmethod + def clone(klass, path, remote, depth=0, recursive=False, mirror=False, + bare=False, auto_name=True): + """ + Clone a git repository at I{remote} to I{path}. + + @param path: where to clone the repository to + @type path: C{str} + @param remote: URL to clone + @type remote: C{str} + @param depth: create a shallow clone of depth I{depth} + @type depth: C{int} + @param recursive: whether to clone submodules + @type recursive: C{bool} + @param auto_name: If I{True} create a directory below I{path} based on + the I{remote}s name. Otherwise create the repo directly at I{path}. + @type auto_name: C{bool} + @return: git repository object + @rtype: L{GitRepository} + """ + abspath = os.path.abspath(path) + if auto_name: + name = None + else: + abspath, name = abspath.rsplit('/', 1) + + args = [ '--quiet' ] + args += [ '--depth', depth ] if depth else [] + args += [ '--recursive' ] if recursive else [] + args += [ '--mirror' ] if mirror else [] + args += [ '--bare' ] if bare else [] + args += [ remote ] + args += [ name ] if name else [] + try: + if not os.path.exists(abspath): + os.makedirs(abspath) + + GitCommand("clone", args, cwd=abspath)() + if not name: + name = remote.rstrip('/').rsplit('/',1)[1] + if (mirror or bare): + name = "%s.git" % name + elif name.endswith('.git'): + name = name[:-4] + return klass(os.path.join(abspath, name)) + except OSError, err: + raise GitRepositoryError, "Cannot clone Git repository %s to %s: %s " % (remote, abspath, err[1]) + return None +#} + + +class FastImport(object): + """Invoke git-fast-import""" + _bufsize = 1024 + + m_regular = 644 + m_exec = 755 + m_symlink = 120000 + + def __init__(self): + try: + self._fi = subprocess.Popen([ 'git', 'fast-import', '--quiet'], stdin=subprocess.PIPE) + self._out = self._fi.stdin + except OSError as err: + raise GbpError("Error spawning git fast-import: %s" % err) + except ValueError as err: + raise GbpError("Invalid argument when spawning git fast-import: %s" % err) + + def _do_data(self, fd, size): + self._out.write("data %s\n" % size) + while True: + data = fd.read(self._bufsize) + self._out.write(data) + if len(data) != self._bufsize: + break + self._out.write("\n") + + def _do_file(self, filename, mode, fd, size): + name = "/".join(filename.split('/')[1:]) + self._out.write("M %d inline %s\n" % (mode, name)) + self._do_data(fd, size) + + def add_file(self, filename, fd, size): + self._do_file(filename, self.m_regular, fd, size) + + def add_executable(self, filename, fd, size): + self._do_file(filename, self.m_exec, fd, size) + + def add_symlink(self, filename, linkname): + name = "/".join(filename.split('/')[1:]) + self._out.write("M %d inline %s\n" % (self.m_symlink, name)) + self._out.write("data %s\n" % len(linkname)) + self._out.write("%s\n" % linkname) + + def start_commit(self, branch, committer, email, time, msg): + length = len(msg) + self._out.write("""commit refs/heads/%(branch)s +committer %(committer)s <%(email)s> %(time)s +data %(length)s +%(msg)s +from refs/heads/%(branch)s^0 +""" % locals()) + + def do_deleteall(self): + self._out.write("deleteall\n") + + def close(self): + if self._out: + self._out.close() + if self._fi: + self._fi.wait() + + def __del__(self): + self.close() + + +def build_tag(format, version): + """Generate a tag from a given format and a version + + >>> build_tag("debian/%(version)s", "0:0~0") + 'debian/0%0_0' + """ + return format % dict(version=__sanitize_version(version)) + + +def __sanitize_version(version): + """sanitize a version so git accepts it as a tag + + >>> __sanitize_version("0.0.0") + '0.0.0' + >>> __sanitize_version("0.0~0") + '0.0_0' + >>> __sanitize_version("0:0.0") + '0%0.0' + >>> __sanitize_version("0%0~0") + '0%0_0' + """ + return version.replace('~', '_').replace(':', '%') + + +def tag_to_version(tag, format): + """Extract the version from a tag + + >>> tag_to_version("upstream/1%2_3-4", "upstream/%(version)s") + '1:2~3-4' + >>> tag_to_version("foo/2.3.4", "foo/%(version)s") + '2.3.4' + >>> tag_to_version("foo/2.3.4", "upstream/%(version)s") + """ + version_re = format.replace('%(version)s', + '(?P<version>[\w_%+-.]+)') + r = re.match(version_re, tag) + if r: + version = r.group('version').replace('_', '~').replace('%', ':') + return version + return None + + +def rfc822_date_to_git(rfc822_date): + """Parse a date in RFC822 format, and convert to a 'seconds tz' C{str}ing. + + >>> rfc822_date_to_git('Thu, 1 Jan 1970 00:00:01 +0000') + '1 +0000' + >>> rfc822_date_to_git('Thu, 20 Mar 2008 01:12:57 -0700') + '1206000777 -0700' + >>> rfc822_date_to_git('Sat, 5 Apr 2008 17:01:32 +0200') + '1207407692 +0200' + """ + d = dateutil.parser.parse(rfc822_date) + seconds = calendar.timegm(d.utctimetuple()) + tz = d.strftime("%z") + return '%d %s' % (seconds, tz) + +# vim:et:ts=4:sw=4:et:sts=4:ai:set list listchars=tab\:»·,trail\:·: |