diff options
author | Guido Günther <agx@sigxcpu.org> | 2017-08-03 17:54:33 -0300 |
---|---|---|
committer | Guido Günther <agx@sigxcpu.org> | 2017-08-03 17:54:33 -0300 |
commit | 702819a4a3fdd9426bd36bbbd91a3d7ba916668a (patch) | |
tree | 154e6ccf02324b00f79d6aba79b3eaad30216db0 | |
parent | 3b99b64630a7c6f165b15905733e007b229d7381 (diff) |
pkg: clear __init__.py
for readability
-rw-r--r-- | gbp/deb/policy.py | 3 | ||||
-rw-r--r-- | gbp/deb/pristinetar.py | 2 | ||||
-rw-r--r-- | gbp/pkg/__init__.py | 447 | ||||
-rw-r--r-- | gbp/pkg/archive.py | 82 | ||||
-rw-r--r-- | gbp/pkg/compressor.py | 75 | ||||
-rw-r--r-- | gbp/pkg/pkgpolicy.py | 167 | ||||
-rw-r--r-- | gbp/pkg/upstreamsource.py | 193 | ||||
-rw-r--r-- | gbp/rpm/__init__.py | 4 | ||||
-rw-r--r-- | gbp/rpm/policy.py | 4 | ||||
-rw-r--r-- | gbp/scripts/common/__init__.py | 4 | ||||
-rwxr-xr-x | gbp/scripts/export_orig.py | 4 | ||||
-rwxr-xr-x | gbp/scripts/import_srpm.py | 4 | ||||
-rwxr-xr-x | gbp/scripts/pq_rpm.py | 4 |
13 files changed, 537 insertions, 456 deletions
diff --git a/gbp/deb/policy.py b/gbp/deb/policy.py index 6ea80fe4..9a118114 100644 --- a/gbp/deb/policy.py +++ b/gbp/deb/policy.py @@ -23,7 +23,8 @@ like allowed characters in version numbers, etc. import os import re -from gbp.pkg import PkgPolicy, Compressor +from gbp.pkg.pkgpolicy import PkgPolicy +from gbp.pkg.compressor import Compressor class DebianPkgPolicy(PkgPolicy): diff --git a/gbp/deb/pristinetar.py b/gbp/deb/pristinetar.py index bb034e77..92c627b1 100644 --- a/gbp/deb/pristinetar.py +++ b/gbp/deb/pristinetar.py @@ -16,7 +16,7 @@ # <http://www.gnu.org/licenses/> """Handle checkin and checkout of archives from the pristine-tar branch""" -from gbp.pkg import Compressor +from gbp.pkg.compressor import Compressor from gbp.pkg.pristinetar import PristineTar from gbp.deb import DebianPkgPolicy diff --git a/gbp/pkg/__init__.py b/gbp/pkg/__init__.py index c3dcde48..e6204e79 100644 --- a/gbp/pkg/__init__.py +++ b/gbp/pkg/__init__.py @@ -17,445 +17,8 @@ # <http://www.gnu.org/licenses/> """Common functionality of the Debian/RPM package helpers""" -import os -import re -import glob - -import six - -import gbp.command_wrappers as gbpc -from gbp.errors import GbpError - - -# compression types, extra options and extensions -class Compressor(object): - # Map frequently used names of compression types to the internal ones: - Aliases = {'bz2': 'bzip2', - 'gz': 'gzip', } - - Opts = {'gzip': '-n', - 'bzip2': '', - 'lzma': '', - 'xz': ''} - - Exts = {'gzip': 'gz', - 'bzip2': 'bz2', - 'lzma': 'lzma', - 'xz': 'xz'} - - def __init__(self, type_, level=None): - self._type = type_ - self._level = int(level) if level not in [None, ''] else None - - def is_known(self): - return self.type in self.Opts.keys() - - @property - def type(self): - return self._type - - @property - def level(self): - return self._level - - @property - def _level_opt(self): - return '-%d' % self.level if self.level is not None else '' - - @property - def _more_opts(self): - return self.Opts.get(self._type, '') - - def cmdline(self, stdout=True): - """ - >>> Compressor('gzip', level=9).cmdline() - 'gzip -9 -n -c' - >>> Compressor('gzip').cmdline(True) - 'gzip -n -c' - """ - return "%s %s %s %s" % (self.type, self._level_opt, self._more_opts, - "-c" if stdout else '') - - def __repr__(self): - """ - >>> Compressor('gzip').__repr__() - "<compressor type='gzip' >" - >>> Compressor('gzip', 9).__repr__() - "<compressor type='gzip' level=9>" - """ - level_str = "level=%s" % self.level if self.level is not None else '' - return "<compressor type='%s' %s>" % (self.type, level_str) - - -# Supported archive formats -archive_formats = ['tar', 'zip'] - -# Map combined file extensions to archive and compression format -archive_ext_aliases = {'tgz': ('tar', 'gzip'), - 'tbz2': ('tar', 'bzip2'), - 'tlz': ('tar', 'lzma'), - 'txz': ('tar', 'xz')} - - - -def parse_archive_filename(filename): - """ - Given a filename return the basename (filename without the - archive and compression extensions), archive format and - compression method used. - - @param filename: the name of the file - @type filename: string - @return: tuple containing basename, archive format and compression method - @rtype: C{tuple} of C{str} - - >>> parse_archive_filename("abc.tar.gz") - ('abc', 'tar', 'gzip') - >>> parse_archive_filename("abc.tar.bz2") - ('abc', 'tar', 'bzip2') - >>> parse_archive_filename("abc.def.tbz2") - ('abc.def', 'tar', 'bzip2') - >>> parse_archive_filename("abc.def.tar.xz") - ('abc.def', 'tar', 'xz') - >>> parse_archive_filename("abc.zip") - ('abc', 'zip', None) - >>> parse_archive_filename("abc.lzma") - ('abc', None, 'lzma') - >>> parse_archive_filename("abc.tar.foo") - ('abc.tar.foo', None, None) - >>> parse_archive_filename("abc") - ('abc', None, None) - """ - (base_name, archive_fmt, compression) = (filename, None, None) - - # Split filename to pieces - split = filename.split(".") - if len(split) > 1: - if split[-1] in archive_ext_aliases: - base_name = ".".join(split[:-1]) - (archive_fmt, compression) = archive_ext_aliases[split[-1]] - elif split[-1] in archive_formats: - base_name = ".".join(split[:-1]) - (archive_fmt, compression) = (split[-1], None) - else: - for (c, ext) in six.iteritems(Compressor.Exts): - if ext == split[-1]: - base_name = ".".join(split[:-1]) - compression = c - if len(split) > 2 and split[-2] in archive_formats: - base_name = ".".join(split[:-2]) - archive_fmt = split[-2] - - return (base_name, archive_fmt, compression) - - -class PkgPolicy(object): - """ - Common helpers for packaging policy. - """ - packagename_re = None - packagename_msg = None - upstreamversion_re = None - upstreamversion_msg = None - - @classmethod - def is_valid_packagename(cls, name): - """ - Is this a valid package name? - - >>> PkgPolicy.is_valid_packagename('doesnotmatter') - Traceback (most recent call last): - ... - NotImplementedError: Class needs to provide packagename_re - """ - if cls.packagename_re is None: - raise NotImplementedError("Class needs to provide packagename_re") - return True if cls.packagename_re.match(name) else False - - @classmethod - def is_valid_upstreamversion(cls, version): - """ - Is this a valid upstream version number? - - >>> PkgPolicy.is_valid_upstreamversion('doesnotmatter') - Traceback (most recent call last): - ... - NotImplementedError: Class needs to provide upstreamversion_re - """ - if cls.upstreamversion_re is None: - raise NotImplementedError("Class needs to provide upstreamversion_re") - return True if cls.upstreamversion_re.match(version) else False - - @staticmethod - def guess_upstream_src_version(filename, extra_regex=r''): - """ - Guess the package name and version from the filename of an upstream - archive. - - @param filename: filename (archive or directory) from which to guess - @type filename: C{string} - @param extra_regex: additional regex to apply, needs a 'package' and a - 'version' group - @return: (package name, version) or ('', '') - @rtype: tuple - - >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.gz') - ('foo-bar', '0.2') - >>> PkgPolicy.guess_upstream_src_version('foo-Bar_0.2.orig.tar.gz') - ('', '') - >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2.tar.gz') - ('git-bar', '0.2') - >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2-rc1.tar.gz') - ('git-bar', '0.2-rc1') - >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2:~-rc1.tar.gz') - ('git-bar', '0.2:~-rc1') - >>> PkgPolicy.guess_upstream_src_version('git-Bar-0A2d:rc1.tar.bz2') - ('git-Bar', '0A2d:rc1') - >>> PkgPolicy.guess_upstream_src_version('git-1.tar.bz2') - ('git', '1') - >>> PkgPolicy.guess_upstream_src_version('kvm_87+dfsg.orig.tar.gz') - ('kvm', '87+dfsg') - >>> PkgPolicy.guess_upstream_src_version('foo-Bar-a.b.tar.gz') - ('', '') - >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.xz') - ('foo-bar', '0.2') - >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.lzma') - ('foo-bar', '0.2') - >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.zip') - ('foo-bar', '0.2') - >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.tlz') - ('foo-bar', '0.2') - >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.tar.gz') - ('foo-bar', '0.2') - """ - version_chars = r'[a-zA-Z\d\.\~\-\:\+]' - basename = parse_archive_filename(os.path.basename(filename))[0] - - version_filters = map( - lambda x: x % version_chars, - ( # Debian upstream tarball: package_'<version>.orig.tar.gz' - r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig', - # Debian native: 'package_<version>.tar.gz' - r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)', - # Upstream 'package-<version>.tar.gz' - # or directory 'package-<version>': - r'^(?P<package>[a-zA-Z\d\.\+\-]+)(-)(?P<version>[0-9]%s*)')) - if extra_regex: - version_filters = extra_regex + version_filters - - for filter in version_filters: - m = re.match(filter, basename) - if m: - return (m.group('package'), m.group('version')) - return ('', '') - - @staticmethod - def has_origs(orig_files, dir): - "Check orig tarball and sub tarballs exists in dir" - for o in orig_files: - if not os.path.exists(os.path.join(dir, o)): - return False - return True - - @classmethod - def has_orig(cls, orig_file, dir): - return cls.has_origs([orig_file], dir) - - @staticmethod - def symlink_origs(orig_files, orig_dir, output_dir, force=False): - """ - symlink orig tarball from orig_dir to output_dir - @return: [] if all links were created, list of - failed links otherwise - """ - orig_dir = os.path.abspath(orig_dir) - output_dir = os.path.abspath(output_dir) - err = [] - - if orig_dir == output_dir: - return [] - - for f in orig_files: - src = os.path.join(orig_dir, f) - dst = os.path.join(output_dir, f) - if not os.access(src, os.F_OK): - err.append(f) - continue - try: - if os.path.exists(dst) and force: - os.unlink(dst) - os.symlink(src, dst) - except OSError: - err.append(f) - return err - - @classmethod - def symlink_orig(cls, orig_file, orig_dir, output_dir, force=False): - return cls.symlink_origs([orig_file], orig_dir, output_dir, force=force) - - -class UpstreamSource(object): - """ - Upstream source. Can be either an unpacked dir, a tarball or another type - of archive - - @cvar _orig: are the upstream sources already suitable as an upstream - tarball - @type _orig: boolean - @cvar _path: path to the upstream sources - @type _path: string - @cvar _unpacked: path to the unpacked source tree - @type _unpacked: string - """ - def __init__(self, name, unpacked=None, pkg_policy=PkgPolicy): - self._orig = False - self._pkg_policy = pkg_policy - self._path = name - self.unpacked = unpacked - - self._check_orig() - if self.is_dir(): - self.unpacked = self.path - - def _check_orig(self): - """ - Check if upstream source format can be used as orig tarball. - This doesn't imply that the tarball is correctly named. - - @return: C{True} if upstream source format is suitable - as upstream tarball, C{False} otherwise. - @rtype: C{bool} - """ - if self.is_dir(): - self._orig = False - return - - parts = self._path.split('.') - try: - if parts[-1] == 'tgz': - self._orig = True - elif parts[-2] == 'tar': - if (parts[-1] in Compressor.Opts or - parts[-1] in Compressor.Aliases): - self._orig = True - except IndexError: - self._orig = False - - def is_orig(self): - """ - @return: C{True} if sources are suitable as orig tarball, - C{False} otherwise - @rtype: C{bool} - """ - return self._orig - - def is_dir(self): - """ - @return: C{True} if if upstream sources are an unpacked directory, - C{False} otherwise - @rtype: C{bool} - """ - return True if os.path.isdir(self._path) else False - - @property - def path(self): - return self._path.rstrip('/') - - def unpack(self, dir, filters=None): - """ - Unpack packed upstream sources into a given directory - (filtering out files specified by filters) and determine the - toplevel of the source tree. - """ - if self.is_dir(): - raise GbpError("Cannot unpack directory %s" % self.path) - - if not filters: - filters = [] - - if not isinstance(filters, list): - raise GbpError("Filters must be a list") - - self._unpack_archive(dir, filters) - self.unpacked = self._unpacked_toplevel(dir) - - def _unpack_archive(self, dir, filters): - """ - Unpack packed upstream sources into a given directory - allowing to filter out files in case of tar archives. - """ - ext = os.path.splitext(self.path)[1] - if ext in [".zip", ".xpi"]: - if filters: - raise GbpError("Can only filter tar archives: %s", (ext, self.path)) - self._unpack_zip(dir) - else: - self._unpack_tar(dir, filters) - - def _unpack_zip(self, dir): - try: - gbpc.UnpackZipArchive(self.path, dir)() - except gbpc.CommandExecFailed: - raise GbpError("Unpacking of %s failed" % self.path) - - def _unpacked_toplevel(self, dir): - """unpacked archives can contain a leading directory or not""" - unpacked = glob.glob('%s/*' % dir) - unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders - # Check that dir contains nothing but a single folder: - if len(unpacked) == 1 and os.path.isdir(unpacked[0]): - return unpacked[0] - else: - return dir - - def _unpack_tar(self, dir, filters): - """ - Unpack a tarball to I{dir} applying a list of I{filters}. Leave the - cleanup to the caller in case of an error. - """ - try: - unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters) - unpackArchive() - except gbpc.CommandExecFailed: - # unpackArchive already printed an error message - raise GbpError - - def pack(self, newarchive, filters=None): - """ - Recreate a new archive from the current one - - @param newarchive: the name of the new archive - @type newarchive: string - @param filters: tar filters to apply - @type filters: array of strings - @return: the new upstream source - @rtype: UpstreamSource - """ - if not self.unpacked: - raise GbpError("Need an unpacked source tree to pack") - - if not filters: - filters = [] - - if not isinstance(filters, list): - raise GbpError("Filters must be a list") - - try: - unpacked = self.unpacked.rstrip('/') - repackArchive = gbpc.PackTarArchive(newarchive, - os.path.dirname(unpacked), - os.path.basename(unpacked), - filters) - repackArchive() - except gbpc.CommandExecFailed: - # repackArchive already printed an error - raise GbpError - return type(self)(newarchive) - - @staticmethod - def known_compressions(): - return Compressor.Exts.values() - - def guess_version(self, extra_regex=r''): - return self._pkg_policy.guess_upstream_src_version(self.path, - extra_regex) +from gbp.pkg.pkgpolicy import PkgPolicy # noqa: F401 +from gbp.pkg.compressor import Compressor # noqa: F401 +from gbp.pkg.archive import Archive # noqa: F401 +from gbp.pkg.upstreamsource import UpstreamSource # noqa: F401 +from gbp.pkg.pristinetar import PristineTar # noqa: F401 diff --git a/gbp/pkg/archive.py b/gbp/pkg/archive.py new file mode 100644 index 00000000..007dfb28 --- /dev/null +++ b/gbp/pkg/archive.py @@ -0,0 +1,82 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2017 Guido Guenther <agx@sigxcpu.org> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, please see +# <http://www.gnu.org/licenses/> + + +import six +from .compressor import Compressor + + +class Archive(object): + # Supported archive formats + Formats = ['tar', 'zip'] + + # Map combined file extensions to archive and compression format + Ext_aliases = {'tgz': ('tar', 'gzip'), + 'tbz2': ('tar', 'bzip2'), + 'tlz': ('tar', 'lzma'), + 'txz': ('tar', 'xz')} + + @staticmethod + def parse_filename(filename): + """ + Given an filename return the basename (filename without the + archive and compression extensions), archive format and + compression method used. + + @param filename: the name of the file + @type filename: string + @return: tuple containing basename, archive format and compression method + @rtype: C{tuple} of C{str} + + >>> Archive.parse_filename("abc.tar.gz") + ('abc', 'tar', 'gzip') + >>> Archive.parse_filename("abc.tar.bz2") + ('abc', 'tar', 'bzip2') + >>> Archive.parse_filename("abc.def.tbz2") + ('abc.def', 'tar', 'bzip2') + >>> Archive.parse_filename("abc.def.tar.xz") + ('abc.def', 'tar', 'xz') + >>> Archive.parse_filename("abc.zip") + ('abc', 'zip', None) + >>> Archive.parse_filename("abc.lzma") + ('abc', None, 'lzma') + >>> Archive.parse_filename("abc.tar.foo") + ('abc.tar.foo', None, None) + >>> Archive.parse_filename("abc") + ('abc', None, None) + """ + (base_name, archive_fmt, compression) = (filename, None, None) + + # Split filename into pieces + split = filename.split(".") + if len(split) > 1: + if split[-1] in Archive.Ext_aliases: + base_name = ".".join(split[:-1]) + (archive_fmt, compression) = Archive.Ext_aliases[split[-1]] + elif split[-1] in Archive.Formats: + base_name = ".".join(split[:-1]) + (archive_fmt, compression) = (split[-1], None) + else: + for (c, ext) in six.iteritems(Compressor.Exts): + if ext == split[-1]: + base_name = ".".join(split[:-1]) + compression = c + if len(split) > 2 and split[-2] in Archive.Formats: + base_name = ".".join(split[:-2]) + archive_fmt = split[-2] + + return (base_name, archive_fmt, compression) diff --git a/gbp/pkg/compressor.py b/gbp/pkg/compressor.py new file mode 100644 index 00000000..b1b795fb --- /dev/null +++ b/gbp/pkg/compressor.py @@ -0,0 +1,75 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2017 Guido Guenther <agx@sigxcpu.org> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, please see +# <http://www.gnu.org/licenses/> + + +class Compressor(object): + # Map frequently used names of compression types to the internal ones: + Aliases = {'bz2': 'bzip2', + 'gz': 'gzip', } + + Opts = {'gzip': '-n', + 'bzip2': '', + 'lzma': '', + 'xz': ''} + + Exts = {'gzip': 'gz', + 'bzip2': 'bz2', + 'lzma': 'lzma', + 'xz': 'xz'} + + def __init__(self, type_, level=None): + self._type = type_ + self._level = int(level) if level not in [None, ''] else None + + def is_known(self): + return self.type in self.Opts.keys() + + @property + def type(self): + return self._type + + @property + def level(self): + return self._level + + @property + def _level_opt(self): + return '-%d' % self.level if self.level is not None else '' + + @property + def _more_opts(self): + return self.Opts.get(self._type, '') + + def cmdline(self, stdout=True): + """ + >>> Compressor('gzip', level=9).cmdline() + 'gzip -9 -n -c' + >>> Compressor('gzip').cmdline(True) + 'gzip -n -c' + """ + return "%s %s %s %s" % (self.type, self._level_opt, self._more_opts, + "-c" if stdout else '') + + def __repr__(self): + """ + >>> Compressor('gzip').__repr__() + "<compressor type='gzip' >" + >>> Compressor('gzip', 9).__repr__() + "<compressor type='gzip' level=9>" + """ + level_str = "level=%s" % self.level if self.level is not None else '' + return "<compressor type='%s' %s>" % (self.type, level_str) diff --git a/gbp/pkg/pkgpolicy.py b/gbp/pkg/pkgpolicy.py new file mode 100644 index 00000000..eb2d72ea --- /dev/null +++ b/gbp/pkg/pkgpolicy.py @@ -0,0 +1,167 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2017 Guido Guenther <agx@sigxcpu.org> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, please see +# <http://www.gnu.org/licenses/> + + +import os +import re + +from gbp.pkg.archive import Archive + + +class PkgPolicy(object): + """ + Common helpers for packaging policy. + """ + packagename_re = None + packagename_msg = None + upstreamversion_re = None + upstreamversion_msg = None + + @classmethod + def is_valid_packagename(cls, name): + """ + Is this a valid package name? + + >>> PkgPolicy.is_valid_packagename('doesnotmatter') + Traceback (most recent call last): + ... + NotImplementedError: Class needs to provide packagename_re + """ + if cls.packagename_re is None: + raise NotImplementedError("Class needs to provide packagename_re") + return True if cls.packagename_re.match(name) else False + + @classmethod + def is_valid_upstreamversion(cls, version): + """ + Is this a valid upstream version number? + + >>> PkgPolicy.is_valid_upstreamversion('doesnotmatter') + Traceback (most recent call last): + ... + NotImplementedError: Class needs to provide upstreamversion_re + """ + if cls.upstreamversion_re is None: + raise NotImplementedError("Class needs to provide upstreamversion_re") + return True if cls.upstreamversion_re.match(version) else False + + @staticmethod + def guess_upstream_src_version(filename, extra_regex=r''): + """ + Guess the package name and version from the filename of an upstream + archive. + + @param filename: filename (archive or directory) from which to guess + @type filename: C{string} + @param extra_regex: additional regex to apply, needs a 'package' and a + 'version' group + @return: (package name, version) or ('', '') + @rtype: tuple + + >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.gz') + ('foo-bar', '0.2') + >>> PkgPolicy.guess_upstream_src_version('foo-Bar_0.2.orig.tar.gz') + ('', '') + >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2.tar.gz') + ('git-bar', '0.2') + >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2-rc1.tar.gz') + ('git-bar', '0.2-rc1') + >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2:~-rc1.tar.gz') + ('git-bar', '0.2:~-rc1') + >>> PkgPolicy.guess_upstream_src_version('git-Bar-0A2d:rc1.tar.bz2') + ('git-Bar', '0A2d:rc1') + >>> PkgPolicy.guess_upstream_src_version('git-1.tar.bz2') + ('git', '1') + >>> PkgPolicy.guess_upstream_src_version('kvm_87+dfsg.orig.tar.gz') + ('kvm', '87+dfsg') + >>> PkgPolicy.guess_upstream_src_version('foo-Bar-a.b.tar.gz') + ('', '') + >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.xz') + ('foo-bar', '0.2') + >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.lzma') + ('foo-bar', '0.2') + >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.zip') + ('foo-bar', '0.2') + >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.tlz') + ('foo-bar', '0.2') + >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.tar.gz') + ('foo-bar', '0.2') + """ + version_chars = r'[a-zA-Z\d\.\~\-\:\+]' + basename = Archive.parse_filename(os.path.basename(filename))[0] + + version_filters = map( + lambda x: x % version_chars, + ( # Debian upstream tarball: package_'<version>.orig.tar.gz' + r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig', + # Debian native: 'package_<version>.tar.gz' + r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)', + # Upstream 'package-<version>.tar.gz' + # or directory 'package-<version>': + r'^(?P<package>[a-zA-Z\d\.\+\-]+)(-)(?P<version>[0-9]%s*)')) + if extra_regex: + version_filters = extra_regex + version_filters + + for filter in version_filters: + m = re.match(filter, basename) + if m: + return (m.group('package'), m.group('version')) + return ('', '') + + @staticmethod + def has_origs(orig_files, dir): + "Check orig tarball and additional tarballs exists in dir" + for o in orig_files: + if not os.path.exists(os.path.join(dir, o)): + return False + return True + + @classmethod + def has_orig(cls, orig_file, dir): + return cls.has_origs([orig_file], dir) + + @staticmethod + def symlink_origs(orig_files, orig_dir, output_dir, force=False): + """ + symlink orig tarball from orig_dir to output_dir + @return: [] if all links were created, list of + failed links otherwise + """ + orig_dir = os.path.abspath(orig_dir) + output_dir = os.path.abspath(output_dir) + err = [] + + if orig_dir == output_dir: + return [] + + for f in orig_files: + src = os.path.join(orig_dir, f) + dst = os.path.join(output_dir, f) + if not os.access(src, os.F_OK): + err.append(f) + continue + try: + if os.path.exists(dst) and force: + os.unlink(dst) + os.symlink(src, dst) + except OSError: + err.append(f) + return err + + @classmethod + def symlink_orig(cls, orig_file, orig_dir, output_dir, force=False): + return cls.symlink_origs([orig_file], orig_dir, output_dir, force=force) diff --git a/gbp/pkg/upstreamsource.py b/gbp/pkg/upstreamsource.py new file mode 100644 index 00000000..9716ed2a --- /dev/null +++ b/gbp/pkg/upstreamsource.py @@ -0,0 +1,193 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2017 Guido Guenther <agx@sigxcpu.org> +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, please see +# <http://www.gnu.org/licenses/> + +import glob +import os + +import gbp.command_wrappers as gbpc + +from gbp.pkg.compressor import Compressor +from gbp.pkg.pkgpolicy import PkgPolicy + +from gbp.errors import GbpError + + +class UpstreamSource(object): + """ + Upstream source. Can be either an unpacked dir, a tarball or another type + of archive + + @cvar _orig: are the upstream sources already suitable as an upstream + tarball + @type _orig: boolean + @cvar _path: path to the upstream sources + @type _path: string + @cvar _unpacked: path to the unpacked source tree + @type _unpacked: string + """ + def __init__(self, name, unpacked=None, pkg_policy=PkgPolicy): + self._orig = False + self._pkg_policy = pkg_policy + self._path = name + self.unpacked = unpacked + + self._check_orig() + if self.is_dir(): + self.unpacked = self.path + + def _check_orig(self): + """ + Check if upstream source format can be used as orig tarball. + This doesn't imply that the tarball is correctly named. + + @return: C{True} if upstream source format is suitable + as upstream tarball, C{False} otherwise. + @rtype: C{bool} + """ + if self.is_dir(): + self._orig = False + return + + parts = self._path.split('.') + try: + if parts[-1] == 'tgz': + self._orig = True + elif parts[-2] == 'tar': + if (parts[-1] in Compressor.Opts or + parts[-1] in Compressor.Aliases): + self._orig = True + except IndexError: + self._orig = False + + def is_orig(self): + """ + @return: C{True} if sources are suitable as orig tarball, + C{False} otherwise + @rtype: C{bool} + """ + return self._orig + + def is_dir(self): + """ + @return: C{True} if if upstream sources are an unpacked directory, + C{False} otherwise + @rtype: C{bool} + """ + return True if os.path.isdir(self._path) else False + + @property + def path(self): + return self._path.rstrip('/') + + def unpack(self, dir, filters=None): + """ + Unpack packed upstream sources into a given directory + (filtering out files specified by filters) and determine the + toplevel of the source tree. + """ + if self.is_dir(): + raise GbpError("Cannot unpack directory %s" % self.path) + + if not filters: + filters = [] + + if not isinstance(filters, list): + raise GbpError("Filters must be a list") + + self._unpack_archive(dir, filters) + self.unpacked = self._unpacked_toplevel(dir) + + def _unpack_archive(self, dir, filters): + """ + Unpack packed upstream sources into a given directory + allowing to filter out files in case of tar archives. + """ + ext = os.path.splitext(self.path)[1] + if ext in [".zip", ".xpi"]: + if filters: + raise GbpError("Can only filter tar archives: %s", (ext, self.path)) + self._unpack_zip(dir) + else: + self._unpack_tar(dir, filters) + + def _unpack_zip(self, dir): + try: + gbpc.UnpackZipArchive(self.path, dir)() + except gbpc.CommandExecFailed: + raise GbpError("Unpacking of %s failed" % self.path) + + def _unpacked_toplevel(self, dir): + """unpacked archives can contain a leading directory or not""" + unpacked = glob.glob('%s/*' % dir) + unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders + # Check that dir contains nothing but a single folder: + if len(unpacked) == 1 and os.path.isdir(unpacked[0]): + return unpacked[0] + else: + return dir + + def _unpack_tar(self, dir, filters): + """ + Unpack a tarball to I{dir} applying a list of I{filters}. Leave the + cleanup to the caller in case of an error. + """ + try: + unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters) + unpackArchive() + except gbpc.CommandExecFailed: + # unpackArchive already printed an error message + raise GbpError + + def pack(self, newarchive, filters=None): + """ + Recreate a new archive from the current one + + @param newarchive: the name of the new archive + @type newarchive: string + @param filters: tar filters to apply + @type filters: array of strings + @return: the new upstream source + @rtype: UpstreamSource + """ + if not self.unpacked: + raise GbpError("Need an unpacked source tree to pack") + + if not filters: + filters = [] + + if not isinstance(filters, list): + raise GbpError("Filters must be a list") + + try: + unpacked = self.unpacked.rstrip('/') + repackArchive = gbpc.PackTarArchive(newarchive, + os.path.dirname(unpacked), + os.path.basename(unpacked), + filters) + repackArchive() + except gbpc.CommandExecFailed: + # repackArchive already printed an error + raise GbpError + return type(self)(newarchive) + + @staticmethod + def known_compressions(): + return Compressor.Exts.values() + + def guess_version(self, extra_regex=r''): + return self._pkg_policy.guess_upstream_src_version(self.path, + extra_regex) diff --git a/gbp/rpm/__init__.py b/gbp/rpm/__init__.py index 253a2e56..38ecbac8 100644 --- a/gbp/rpm/__init__.py +++ b/gbp/rpm/__init__.py @@ -30,7 +30,7 @@ from gbp.errors import GbpError from gbp.git import GitRepositoryError from gbp.patch_series import (PatchSeries, Patch) import gbp.log -from gbp.pkg import (UpstreamSource, parse_archive_filename) +from gbp.pkg import (UpstreamSource, Archive) from gbp.rpm.policy import RpmPkgPolicy from gbp.rpm.linkedlist import LinkedList from gbp.rpm.lib_rpm import librpm, get_librpm_log @@ -788,7 +788,7 @@ class SpecFile(object): src = {'num': num, 'filename': os.path.basename(filename), 'uri': filename} src['filename_base'], src['archive_fmt'], src['compression'] = \ - parse_archive_filename(os.path.basename(filename)) + Archive.parse_filename(os.path.basename(filename)) if (src['filename_base'].startswith(self.name) and src['archive_fmt']): # Take the first archive that starts with pkg name diff --git a/gbp/rpm/policy.py b/gbp/rpm/policy.py index e5be7fef..a027ed99 100644 --- a/gbp/rpm/policy.py +++ b/gbp/rpm/policy.py @@ -18,7 +18,7 @@ import re -from gbp.pkg import PkgPolicy, parse_archive_filename +from gbp.pkg import PkgPolicy, Archive from gbp.scripts.common.pq import parse_gbp_commands @@ -68,7 +68,7 @@ class RpmPkgPolicy(PkgPolicy): >>> RpmPkgPolicy.is_valid_orig_archive("foo.gz") False """ - _base, arch_fmt, _compression = parse_archive_filename(filename) + _base, arch_fmt, _compression = Archive.parse_filename(filename) if arch_fmt: return True return False diff --git a/gbp/scripts/common/__init__.py b/gbp/scripts/common/__init__.py index 44dd9298..e34a0c3c 100644 --- a/gbp/scripts/common/__init__.py +++ b/gbp/scripts/common/__init__.py @@ -20,7 +20,7 @@ import re import os from gbp.errors import GbpError from gbp.deb import DebianPkgPolicy -from gbp.pkg import parse_archive_filename +from gbp.pkg import Archive class ExitCodes(object): @@ -50,7 +50,7 @@ def get_component_tarballs(name, version, tarball, components): tarball. """ tarballs = [] - (_, _, comp_type) = parse_archive_filename(tarball) + (_, _, comp_type) = Archive.parse_filename(tarball) for component in components: cname = DebianPkgPolicy.build_tarball_name(name, version, diff --git a/gbp/scripts/export_orig.py b/gbp/scripts/export_orig.py index e6ef7986..f4fc4a12 100755 --- a/gbp/scripts/export_orig.py +++ b/gbp/scripts/export_orig.py @@ -24,7 +24,7 @@ from gbp.deb.git import GitRepositoryError from gbp.errors import GbpError import gbp.log import gbp.notifications -from gbp.pkg import Compressor, parse_archive_filename +from gbp.pkg import Compressor, Archive # upstream tarball preparation @@ -223,7 +223,7 @@ def guess_comp_type(repo, comp_type, source, tarball_dir): else: commit = repo.pristine_tar_branch tarball = repo.get_commit_info(commit)['subject'] - (base_name, archive_fmt, comp_type) = parse_archive_filename(tarball) + (base_name, archive_fmt, comp_type) = Archive.parse_filename(tarball) gbp.log.debug("Determined compression type '%s'" % comp_type) if not comp_type: comp_type = 'gzip' diff --git a/gbp/scripts/import_srpm.py b/gbp/scripts/import_srpm.py index 302c3422..a2629d80 100755 --- a/gbp/scripts/import_srpm.py +++ b/gbp/scripts/import_srpm.py @@ -39,7 +39,7 @@ from gbp.errors import GbpError from gbp.scripts.common import ExitCodes, is_download from gbp.scripts.common import repo_setup import gbp.log -from gbp.pkg import parse_archive_filename +from gbp.pkg import Archive no_packaging_branch_msg = """ Repository does not have branch '%s' for packaging/distribution sources. @@ -400,7 +400,7 @@ def main(argv): if not options.native: if options.pristine_tar: - archive_fmt = parse_archive_filename(orig_tarball)[1] + archive_fmt = Archive.parse_filename(orig_tarball)[1] if archive_fmt == 'tar': repo.pristine_tar.commit(orig_tarball, 'refs/heads/%s' % diff --git a/gbp/scripts/pq_rpm.py b/gbp/scripts/pq_rpm.py index adf1b4ab..ebce7ff2 100755 --- a/gbp/scripts/pq_rpm.py +++ b/gbp/scripts/pq_rpm.py @@ -33,7 +33,7 @@ from gbp.git.modifier import GitModifier from gbp.command_wrappers import GitCommand, CommandExecFailed from gbp.errors import GbpError from gbp.patch_series import PatchSeries, Patch -from gbp.pkg import parse_archive_filename +from gbp.pkg import Archive from gbp.rpm import (SpecFile, NoSpecError, guess_spec, guess_spec_repo, spec_from_repo) from gbp.scripts.common import ExitCodes @@ -227,7 +227,7 @@ def safe_patches(queue): gbp.log.debug("Saving patches '%s' in '%s'" % (os.path.dirname(queue[0].path), tmpdir)) for patch in queue: - base, _archive_fmt, comp = parse_archive_filename(patch.path) + base, _archive_fmt, comp = Archive.parse_filename(patch.path) uncompressors = {'gzip': gzip.open, 'bzip2': bz2.BZ2File} if comp in uncompressors: gbp.log.debug("Uncompressing '%s'" % os.path.basename(patch.path)) |