diff options
author | Markus Lehtonen <markus.lehtonen@linux.intel.com> | 2012-02-08 14:42:00 +0200 |
---|---|---|
committer | Guido Günther <agx@sigxcpu.org> | 2012-05-01 22:29:29 +0200 |
commit | f495df9857d9738cb7ed2200d3124fca6372f990 (patch) | |
tree | f84fba137005eab9f5eb89c2648fe99823db7ed5 /gbp/pkg | |
parent | 082679da5004790d5d5bf19ede8a657f6c2b8769 (diff) |
Refactor deb helpers: move UpstreamSource class
to pkg base module. This refactor is preparation to the upcoming rpm
support.
Diffstat (limited to 'gbp/pkg')
-rw-r--r-- | gbp/pkg/__init__.py | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/gbp/pkg/__init__.py b/gbp/pkg/__init__.py index a3f02bc6..540c22cf 100644 --- a/gbp/pkg/__init__.py +++ b/gbp/pkg/__init__.py @@ -102,3 +102,208 @@ class PkgPolicy(object): return False return True + +class UpstreamSource(object): + """ + Upstream source. Can be either an unpacked dir, a tarball or another type + of archive + + @cvar _orig: are the upstream sources already suitable as an upstream + tarball + @type _orig: boolean + @cvar _path: path to the upstream sources + @type _path: string + @cvar _unpacked: path to the unpacked source tree + @type _unpacked: string + """ + def __init__(self, name, unpacked=None): + self._orig = False + self._path = name + self.unpacked = unpacked + + self._check_orig() + if self.is_dir(): + self.unpacked = self.path + + def _check_orig(self): + """ + Check if upstream source format can be used as orig tarball. + This doesn't imply that the tarball is correctly named. + + @return: C{True} if upstream source format is suitable + as upstream tarball, C{False} otherwise. + @rtype: C{bool} + """ + if self.is_dir(): + self._orig = False + return + + parts = self._path.split('.') + try: + if parts[-1] == 'tgz': + self._orig = True + elif parts[-2] == 'tar': + if (parts[-1] in compressor_opts or + parts[-1] in compressor_aliases): + self._orig = True + except IndexError: + self._orig = False + + def is_orig(self): + """ + @return: C{True} if sources are suitable as upstream source, + C{False} otherwise + @rtype: C{bool} + """ + return self._orig + + def is_dir(self): + """ + @return: C{True} if if upstream sources are an unpacked directory, + C{False} otherwise + @rtype: C{bool} + """ + return True if os.path.isdir(self._path) else False + + @property + def path(self): + return self._path.rstrip('/') + + def unpack(self, dir, filters=[]): + """ + Unpack packed upstream sources into a given directory + and determine the toplevel of the source tree. + """ + if self.is_dir(): + raise GbpError, "Cannot unpack directory %s" % self.path + + if not filters: + filters = [] + + if type(filters) != type([]): + raise GbpError, "Filters must be a list" + + self._unpack_archive(dir, filters) + self.unpacked = self._unpacked_toplevel(dir) + + def _unpack_archive(self, dir, filters): + """ + Unpack packed upstream sources into a given directory. + """ + ext = os.path.splitext(self.path)[1] + if ext in [ ".zip", ".xpi" ]: + self._unpack_zip(dir) + else: + self._unpack_tar(dir, filters) + + def _unpack_zip(self, dir): + try: + gbpc.UnpackZipArchive(self.path, dir)() + except gbpc.CommandExecFailed: + raise GbpError, "Unpacking of %s failed" % self.path + + def _unpacked_toplevel(self, dir): + """unpacked archives can contain a leading directory or not""" + unpacked = glob.glob('%s/*' % dir) + unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders + # Check that dir contains nothing but a single folder: + if len(unpacked) == 1 and os.path.isdir(unpacked[0]): + return unpacked[0] + else: + return dir + + def _unpack_tar(self, dir, filters): + """ + Unpack a tarball to I{dir} applying a list of I{filters}. Leave the + cleanup to the caller in case of an error. + """ + try: + unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters) + unpackArchive() + except gbpc.CommandExecFailed: + # unpackArchive already printed an error message + raise GbpError + + def pack(self, newarchive, filters=[]): + """ + Recreate a new archive from the current one + + @param newarchive: the name of the new archive + @type newarchive: string + @param filters: tar filters to apply + @type filters: array of strings + @return: the new upstream source + @rtype: UpstreamSource + """ + if not self.unpacked: + raise GbpError, "Need an unpacked source tree to pack" + + if not filters: + filters = [] + + if type(filters) != type([]): + raise GbpError, "Filters must be a list" + + try: + unpacked = self.unpacked.rstrip('/') + repackArchive = gbpc.PackTarArchive(newarchive, + os.path.dirname(unpacked), + os.path.basename(unpacked), + filters) + repackArchive() + except gbpc.CommandExecFailed: + # repackArchive already printed an error + raise GbpError + return UpstreamSource(newarchive) + + @staticmethod + def known_compressions(): + return [ args[1][-1] for args in compressor_opts.items() ] + + def guess_version(self, extra_regex=r''): + """ + Guess the package name and version from the filename of an upstream + archive. + + >>> UpstreamSource('foo-bar_0.2.orig.tar.gz').guess_version() + ('foo-bar', '0.2') + >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version() + >>> UpstreamSource('git-bar-0.2.tar.gz').guess_version() + ('git-bar', '0.2') + >>> UpstreamSource('git-bar-0.2-rc1.tar.gz').guess_version() + ('git-bar', '0.2-rc1') + >>> UpstreamSource('git-bar-0.2:~-rc1.tar.gz').guess_version() + ('git-bar', '0.2:~-rc1') + >>> UpstreamSource('git-Bar-0A2d:rc1.tar.bz2').guess_version() + ('git-Bar', '0A2d:rc1') + >>> UpstreamSource('git-1.tar.bz2').guess_version() + ('git', '1') + >>> UpstreamSource('kvm_87+dfsg.orig.tar.gz').guess_version() + ('kvm', '87+dfsg') + >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version() + >>> UpstreamSource('foo-Bar-a.b.tar.gz').guess_version() + >>> UpstreamSource('foo-bar_0.2.orig.tar.xz').guess_version() + ('foo-bar', '0.2') + >>> UpstreamSource('foo-bar_0.2.orig.tar.lzma').guess_version() + ('foo-bar', '0.2') + + @param extra_regex: additional regex to apply, needs a 'package' and a + 'version' group + @return: (package name, version) or None. + @rtype: tuple + """ + version_chars = r'[a-zA-Z\d\.\~\-\:\+]' + extensions = r'\.tar\.(%s)' % "|".join(self.known_compressions()) + + version_filters = map ( lambda x: x % (version_chars, extensions), + ( # Debian package_<version>.orig.tar.gz: + r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig%s', + # Upstream package-<version>.tar.gz: + r'^(?P<package>[a-zA-Z\d\.\+\-]+)-(?P<version>[0-9]%s*)%s')) + if extra_regex: + version_filters = extra_regex + version_filters + + for filter in version_filters: + m = re.match(filter, os.path.basename(self.path)) + if m: + return (m.group('package'), m.group('version')) |