author     Guido Günther <agx@sigxcpu.org>  2017-08-03 17:54:33 -0300
committer  Guido Günther <agx@sigxcpu.org>  2017-08-03 17:54:33 -0300
commit     702819a4a3fdd9426bd36bbbd91a3d7ba916668a (patch)
tree       154e6ccf02324b00f79d6aba79b3eaad30216db0
parent     3b99b64630a7c6f165b15905733e007b229d7381 (diff)
pkg: clear __init__.py
for readability
-rw-r--r--   gbp/deb/policy.py                  3
-rw-r--r--   gbp/deb/pristinetar.py             2
-rw-r--r--   gbp/pkg/__init__.py              447
-rw-r--r--   gbp/pkg/archive.py                82
-rw-r--r--   gbp/pkg/compressor.py             75
-rw-r--r--   gbp/pkg/pkgpolicy.py             167
-rw-r--r--   gbp/pkg/upstreamsource.py        193
-rw-r--r--   gbp/rpm/__init__.py                4
-rw-r--r--   gbp/rpm/policy.py                  4
-rw-r--r--   gbp/scripts/common/__init__.py     4
-rwxr-xr-x   gbp/scripts/export_orig.py         4
-rwxr-xr-x   gbp/scripts/import_srpm.py         4
-rwxr-xr-x   gbp/scripts/pq_rpm.py              4
13 files changed, 537 insertions, 456 deletions
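
This commit splits the monolithic gbp/pkg/__init__.py into per-concern modules (archive.py, compressor.py, pkgpolicy.py, upstreamsource.py) and updates the callers accordingly. As the hunks below show, consumers now import from the concrete module, while gbp/pkg/__init__.py keeps re-exporting the names, so both spellings resolve to the same classes. A minimal sketch of the two equivalent import styles after this change:

    # Preferred after this commit: import from the concrete module
    from gbp.pkg.pkgpolicy import PkgPolicy
    from gbp.pkg.compressor import Compressor

    # Still works via the re-exports kept in gbp/pkg/__init__.py
    from gbp.pkg import PkgPolicy, Compressor, Archive

Note that the module-level helper parse_archive_filename() is not re-exported; callers below are switched to the Archive.parse_filename() static method instead.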
diff --git a/gbp/deb/policy.py b/gbp/deb/policy.py
index 6ea80fe4..9a118114 100644
--- a/gbp/deb/policy.py
+++ b/gbp/deb/policy.py
@@ -23,7 +23,8 @@ like allowed characters in version numbers, etc.
import os
import re
-from gbp.pkg import PkgPolicy, Compressor
+from gbp.pkg.pkgpolicy import PkgPolicy
+from gbp.pkg.compressor import Compressor
class DebianPkgPolicy(PkgPolicy):
diff --git a/gbp/deb/pristinetar.py b/gbp/deb/pristinetar.py
index bb034e77..92c627b1 100644
--- a/gbp/deb/pristinetar.py
+++ b/gbp/deb/pristinetar.py
@@ -16,7 +16,7 @@
# <http://www.gnu.org/licenses/>
"""Handle checkin and checkout of archives from the pristine-tar branch"""
-from gbp.pkg import Compressor
+from gbp.pkg.compressor import Compressor
from gbp.pkg.pristinetar import PristineTar
from gbp.deb import DebianPkgPolicy
diff --git a/gbp/pkg/__init__.py b/gbp/pkg/__init__.py
index c3dcde48..e6204e79 100644
--- a/gbp/pkg/__init__.py
+++ b/gbp/pkg/__init__.py
@@ -17,445 +17,8 @@
# <http://www.gnu.org/licenses/>
"""Common functionality of the Debian/RPM package helpers"""
-import os
-import re
-import glob
-
-import six
-
-import gbp.command_wrappers as gbpc
-from gbp.errors import GbpError
-
-
-# compression types, extra options and extensions
-class Compressor(object):
- # Map frequently used names of compression types to the internal ones:
- Aliases = {'bz2': 'bzip2',
- 'gz': 'gzip', }
-
- Opts = {'gzip': '-n',
- 'bzip2': '',
- 'lzma': '',
- 'xz': ''}
-
- Exts = {'gzip': 'gz',
- 'bzip2': 'bz2',
- 'lzma': 'lzma',
- 'xz': 'xz'}
-
- def __init__(self, type_, level=None):
- self._type = type_
- self._level = int(level) if level not in [None, ''] else None
-
- def is_known(self):
- return self.type in self.Opts.keys()
-
- @property
- def type(self):
- return self._type
-
- @property
- def level(self):
- return self._level
-
- @property
- def _level_opt(self):
- return '-%d' % self.level if self.level is not None else ''
-
- @property
- def _more_opts(self):
- return self.Opts.get(self._type, '')
-
- def cmdline(self, stdout=True):
- """
- >>> Compressor('gzip', level=9).cmdline()
- 'gzip -9 -n -c'
- >>> Compressor('gzip').cmdline(True)
- 'gzip -n -c'
- """
- return "%s %s %s %s" % (self.type, self._level_opt, self._more_opts,
- "-c" if stdout else '')
-
- def __repr__(self):
- """
- >>> Compressor('gzip').__repr__()
- "<compressor type='gzip' >"
- >>> Compressor('gzip', 9).__repr__()
- "<compressor type='gzip' level=9>"
- """
- level_str = "level=%s" % self.level if self.level is not None else ''
- return "<compressor type='%s' %s>" % (self.type, level_str)
-
-
-# Supported archive formats
-archive_formats = ['tar', 'zip']
-
-# Map combined file extensions to archive and compression format
-archive_ext_aliases = {'tgz': ('tar', 'gzip'),
- 'tbz2': ('tar', 'bzip2'),
- 'tlz': ('tar', 'lzma'),
- 'txz': ('tar', 'xz')}
-
-
-
-def parse_archive_filename(filename):
- """
- Given a filename return the basename (filename without the
- archive and compression extensions), archive format and
- compression method used.
-
- @param filename: the name of the file
- @type filename: string
- @return: tuple containing basename, archive format and compression method
- @rtype: C{tuple} of C{str}
-
- >>> parse_archive_filename("abc.tar.gz")
- ('abc', 'tar', 'gzip')
- >>> parse_archive_filename("abc.tar.bz2")
- ('abc', 'tar', 'bzip2')
- >>> parse_archive_filename("abc.def.tbz2")
- ('abc.def', 'tar', 'bzip2')
- >>> parse_archive_filename("abc.def.tar.xz")
- ('abc.def', 'tar', 'xz')
- >>> parse_archive_filename("abc.zip")
- ('abc', 'zip', None)
- >>> parse_archive_filename("abc.lzma")
- ('abc', None, 'lzma')
- >>> parse_archive_filename("abc.tar.foo")
- ('abc.tar.foo', None, None)
- >>> parse_archive_filename("abc")
- ('abc', None, None)
- """
- (base_name, archive_fmt, compression) = (filename, None, None)
-
- # Split filename to pieces
- split = filename.split(".")
- if len(split) > 1:
- if split[-1] in archive_ext_aliases:
- base_name = ".".join(split[:-1])
- (archive_fmt, compression) = archive_ext_aliases[split[-1]]
- elif split[-1] in archive_formats:
- base_name = ".".join(split[:-1])
- (archive_fmt, compression) = (split[-1], None)
- else:
- for (c, ext) in six.iteritems(Compressor.Exts):
- if ext == split[-1]:
- base_name = ".".join(split[:-1])
- compression = c
- if len(split) > 2 and split[-2] in archive_formats:
- base_name = ".".join(split[:-2])
- archive_fmt = split[-2]
-
- return (base_name, archive_fmt, compression)
-
-
-class PkgPolicy(object):
- """
- Common helpers for packaging policy.
- """
- packagename_re = None
- packagename_msg = None
- upstreamversion_re = None
- upstreamversion_msg = None
-
- @classmethod
- def is_valid_packagename(cls, name):
- """
- Is this a valid package name?
-
- >>> PkgPolicy.is_valid_packagename('doesnotmatter')
- Traceback (most recent call last):
- ...
- NotImplementedError: Class needs to provide packagename_re
- """
- if cls.packagename_re is None:
- raise NotImplementedError("Class needs to provide packagename_re")
- return True if cls.packagename_re.match(name) else False
-
- @classmethod
- def is_valid_upstreamversion(cls, version):
- """
- Is this a valid upstream version number?
-
- >>> PkgPolicy.is_valid_upstreamversion('doesnotmatter')
- Traceback (most recent call last):
- ...
- NotImplementedError: Class needs to provide upstreamversion_re
- """
- if cls.upstreamversion_re is None:
- raise NotImplementedError("Class needs to provide upstreamversion_re")
- return True if cls.upstreamversion_re.match(version) else False
-
- @staticmethod
- def guess_upstream_src_version(filename, extra_regex=r''):
- """
- Guess the package name and version from the filename of an upstream
- archive.
-
- @param filename: filename (archive or directory) from which to guess
- @type filename: C{string}
- @param extra_regex: additional regex to apply, needs a 'package' and a
- 'version' group
- @return: (package name, version) or ('', '')
- @rtype: tuple
-
- >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.gz')
- ('foo-bar', '0.2')
- >>> PkgPolicy.guess_upstream_src_version('foo-Bar_0.2.orig.tar.gz')
- ('', '')
- >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2.tar.gz')
- ('git-bar', '0.2')
- >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2-rc1.tar.gz')
- ('git-bar', '0.2-rc1')
- >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2:~-rc1.tar.gz')
- ('git-bar', '0.2:~-rc1')
- >>> PkgPolicy.guess_upstream_src_version('git-Bar-0A2d:rc1.tar.bz2')
- ('git-Bar', '0A2d:rc1')
- >>> PkgPolicy.guess_upstream_src_version('git-1.tar.bz2')
- ('git', '1')
- >>> PkgPolicy.guess_upstream_src_version('kvm_87+dfsg.orig.tar.gz')
- ('kvm', '87+dfsg')
- >>> PkgPolicy.guess_upstream_src_version('foo-Bar-a.b.tar.gz')
- ('', '')
- >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.xz')
- ('foo-bar', '0.2')
- >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.lzma')
- ('foo-bar', '0.2')
- >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.zip')
- ('foo-bar', '0.2')
- >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.tlz')
- ('foo-bar', '0.2')
- >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.tar.gz')
- ('foo-bar', '0.2')
- """
- version_chars = r'[a-zA-Z\d\.\~\-\:\+]'
- basename = parse_archive_filename(os.path.basename(filename))[0]
-
- version_filters = map(
- lambda x: x % version_chars,
- ( # Debian upstream tarball: package_'<version>.orig.tar.gz'
- r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig',
- # Debian native: 'package_<version>.tar.gz'
- r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)',
- # Upstream 'package-<version>.tar.gz'
- # or directory 'package-<version>':
- r'^(?P<package>[a-zA-Z\d\.\+\-]+)(-)(?P<version>[0-9]%s*)'))
- if extra_regex:
- version_filters = extra_regex + version_filters
-
- for filter in version_filters:
- m = re.match(filter, basename)
- if m:
- return (m.group('package'), m.group('version'))
- return ('', '')
-
- @staticmethod
- def has_origs(orig_files, dir):
- "Check orig tarball and sub tarballs exists in dir"
- for o in orig_files:
- if not os.path.exists(os.path.join(dir, o)):
- return False
- return True
-
- @classmethod
- def has_orig(cls, orig_file, dir):
- return cls.has_origs([orig_file], dir)
-
- @staticmethod
- def symlink_origs(orig_files, orig_dir, output_dir, force=False):
- """
- symlink orig tarball from orig_dir to output_dir
- @return: [] if all links were created, list of
- failed links otherwise
- """
- orig_dir = os.path.abspath(orig_dir)
- output_dir = os.path.abspath(output_dir)
- err = []
-
- if orig_dir == output_dir:
- return []
-
- for f in orig_files:
- src = os.path.join(orig_dir, f)
- dst = os.path.join(output_dir, f)
- if not os.access(src, os.F_OK):
- err.append(f)
- continue
- try:
- if os.path.exists(dst) and force:
- os.unlink(dst)
- os.symlink(src, dst)
- except OSError:
- err.append(f)
- return err
-
- @classmethod
- def symlink_orig(cls, orig_file, orig_dir, output_dir, force=False):
- return cls.symlink_origs([orig_file], orig_dir, output_dir, force=force)
-
-
-class UpstreamSource(object):
- """
- Upstream source. Can be either an unpacked dir, a tarball or another type
- of archive
-
- @cvar _orig: are the upstream sources already suitable as an upstream
- tarball
- @type _orig: boolean
- @cvar _path: path to the upstream sources
- @type _path: string
- @cvar _unpacked: path to the unpacked source tree
- @type _unpacked: string
- """
- def __init__(self, name, unpacked=None, pkg_policy=PkgPolicy):
- self._orig = False
- self._pkg_policy = pkg_policy
- self._path = name
- self.unpacked = unpacked
-
- self._check_orig()
- if self.is_dir():
- self.unpacked = self.path
-
- def _check_orig(self):
- """
- Check if upstream source format can be used as orig tarball.
- This doesn't imply that the tarball is correctly named.
-
- @return: C{True} if upstream source format is suitable
- as upstream tarball, C{False} otherwise.
- @rtype: C{bool}
- """
- if self.is_dir():
- self._orig = False
- return
-
- parts = self._path.split('.')
- try:
- if parts[-1] == 'tgz':
- self._orig = True
- elif parts[-2] == 'tar':
- if (parts[-1] in Compressor.Opts or
- parts[-1] in Compressor.Aliases):
- self._orig = True
- except IndexError:
- self._orig = False
-
- def is_orig(self):
- """
- @return: C{True} if sources are suitable as orig tarball,
- C{False} otherwise
- @rtype: C{bool}
- """
- return self._orig
-
- def is_dir(self):
- """
- @return: C{True} if if upstream sources are an unpacked directory,
- C{False} otherwise
- @rtype: C{bool}
- """
- return True if os.path.isdir(self._path) else False
-
- @property
- def path(self):
- return self._path.rstrip('/')
-
- def unpack(self, dir, filters=None):
- """
- Unpack packed upstream sources into a given directory
- (filtering out files specified by filters) and determine the
- toplevel of the source tree.
- """
- if self.is_dir():
- raise GbpError("Cannot unpack directory %s" % self.path)
-
- if not filters:
- filters = []
-
- if not isinstance(filters, list):
- raise GbpError("Filters must be a list")
-
- self._unpack_archive(dir, filters)
- self.unpacked = self._unpacked_toplevel(dir)
-
- def _unpack_archive(self, dir, filters):
- """
- Unpack packed upstream sources into a given directory
- allowing to filter out files in case of tar archives.
- """
- ext = os.path.splitext(self.path)[1]
- if ext in [".zip", ".xpi"]:
- if filters:
- raise GbpError("Can only filter tar archives: %s", (ext, self.path))
- self._unpack_zip(dir)
- else:
- self._unpack_tar(dir, filters)
-
- def _unpack_zip(self, dir):
- try:
- gbpc.UnpackZipArchive(self.path, dir)()
- except gbpc.CommandExecFailed:
- raise GbpError("Unpacking of %s failed" % self.path)
-
- def _unpacked_toplevel(self, dir):
- """unpacked archives can contain a leading directory or not"""
- unpacked = glob.glob('%s/*' % dir)
- unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders
- # Check that dir contains nothing but a single folder:
- if len(unpacked) == 1 and os.path.isdir(unpacked[0]):
- return unpacked[0]
- else:
- return dir
-
- def _unpack_tar(self, dir, filters):
- """
- Unpack a tarball to I{dir} applying a list of I{filters}. Leave the
- cleanup to the caller in case of an error.
- """
- try:
- unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters)
- unpackArchive()
- except gbpc.CommandExecFailed:
- # unpackArchive already printed an error message
- raise GbpError
-
- def pack(self, newarchive, filters=None):
- """
- Recreate a new archive from the current one
-
- @param newarchive: the name of the new archive
- @type newarchive: string
- @param filters: tar filters to apply
- @type filters: array of strings
- @return: the new upstream source
- @rtype: UpstreamSource
- """
- if not self.unpacked:
- raise GbpError("Need an unpacked source tree to pack")
-
- if not filters:
- filters = []
-
- if not isinstance(filters, list):
- raise GbpError("Filters must be a list")
-
- try:
- unpacked = self.unpacked.rstrip('/')
- repackArchive = gbpc.PackTarArchive(newarchive,
- os.path.dirname(unpacked),
- os.path.basename(unpacked),
- filters)
- repackArchive()
- except gbpc.CommandExecFailed:
- # repackArchive already printed an error
- raise GbpError
- return type(self)(newarchive)
-
- @staticmethod
- def known_compressions():
- return Compressor.Exts.values()
-
- def guess_version(self, extra_regex=r''):
- return self._pkg_policy.guess_upstream_src_version(self.path,
- extra_regex)
+from gbp.pkg.pkgpolicy import PkgPolicy # noqa: F401
+from gbp.pkg.compressor import Compressor # noqa: F401
+from gbp.pkg.archive import Archive # noqa: F401
+from gbp.pkg.upstreamsource import UpstreamSource # noqa: F401
+from gbp.pkg.pristinetar import PristineTar # noqa: F401
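
The re-exports above keep existing imports working; a quick sketch (assuming gbp is importable) verifying that both import paths name the same object:

    from gbp.pkg import Compressor as C1
    from gbp.pkg.compressor import Compressor as C2
    assert C1 is C2   # the __init__.py shim only re-exports, it does not copy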
diff --git a/gbp/pkg/archive.py b/gbp/pkg/archive.py
new file mode 100644
index 00000000..007dfb28
--- /dev/null
+++ b/gbp/pkg/archive.py
@@ -0,0 +1,82 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2017 Guido Guenther <agx@sigxcpu.org>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, please see
+# <http://www.gnu.org/licenses/>
+
+
+import six
+from .compressor import Compressor
+
+
+class Archive(object):
+ # Supported archive formats
+ Formats = ['tar', 'zip']
+
+ # Map combined file extensions to archive and compression format
+ Ext_aliases = {'tgz': ('tar', 'gzip'),
+ 'tbz2': ('tar', 'bzip2'),
+ 'tlz': ('tar', 'lzma'),
+ 'txz': ('tar', 'xz')}
+
+ @staticmethod
+ def parse_filename(filename):
+ """
+ Given a filename return the basename (filename without the
+ archive and compression extensions), archive format and
+ compression method used.
+
+ @param filename: the name of the file
+ @type filename: string
+ @return: tuple containing basename, archive format and compression method
+ @rtype: C{tuple} of C{str}
+
+ >>> Archive.parse_filename("abc.tar.gz")
+ ('abc', 'tar', 'gzip')
+ >>> Archive.parse_filename("abc.tar.bz2")
+ ('abc', 'tar', 'bzip2')
+ >>> Archive.parse_filename("abc.def.tbz2")
+ ('abc.def', 'tar', 'bzip2')
+ >>> Archive.parse_filename("abc.def.tar.xz")
+ ('abc.def', 'tar', 'xz')
+ >>> Archive.parse_filename("abc.zip")
+ ('abc', 'zip', None)
+ >>> Archive.parse_filename("abc.lzma")
+ ('abc', None, 'lzma')
+ >>> Archive.parse_filename("abc.tar.foo")
+ ('abc.tar.foo', None, None)
+ >>> Archive.parse_filename("abc")
+ ('abc', None, None)
+ """
+ (base_name, archive_fmt, compression) = (filename, None, None)
+
+ # Split filename into pieces
+ split = filename.split(".")
+ if len(split) > 1:
+ if split[-1] in Archive.Ext_aliases:
+ base_name = ".".join(split[:-1])
+ (archive_fmt, compression) = Archive.Ext_aliases[split[-1]]
+ elif split[-1] in Archive.Formats:
+ base_name = ".".join(split[:-1])
+ (archive_fmt, compression) = (split[-1], None)
+ else:
+ for (c, ext) in six.iteritems(Compressor.Exts):
+ if ext == split[-1]:
+ base_name = ".".join(split[:-1])
+ compression = c
+ if len(split) > 2 and split[-2] in Archive.Formats:
+ base_name = ".".join(split[:-2])
+ archive_fmt = split[-2]
+
+ return (base_name, archive_fmt, compression)
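
Archive.parse_filename() replaces the old module-level parse_archive_filename(); the behaviour documented in the doctests above can be exercised directly, for example:

    from gbp.pkg.archive import Archive

    # Combined extension alias: tbz2 -> tar + bzip2
    print(Archive.parse_filename("abc.def.tbz2"))   # ('abc.def', 'tar', 'bzip2')
    # Unknown extension: nothing is split off
    print(Archive.parse_filename("abc.tar.foo"))    # ('abc.tar.foo', None, None)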
diff --git a/gbp/pkg/compressor.py b/gbp/pkg/compressor.py
new file mode 100644
index 00000000..b1b795fb
--- /dev/null
+++ b/gbp/pkg/compressor.py
@@ -0,0 +1,75 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2017 Guido Guenther <agx@sigxcpu.org>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, please see
+# <http://www.gnu.org/licenses/>
+
+
+class Compressor(object):
+ # Map frequently used names of compression types to the internal ones:
+ Aliases = {'bz2': 'bzip2',
+ 'gz': 'gzip', }
+
+ Opts = {'gzip': '-n',
+ 'bzip2': '',
+ 'lzma': '',
+ 'xz': ''}
+
+ Exts = {'gzip': 'gz',
+ 'bzip2': 'bz2',
+ 'lzma': 'lzma',
+ 'xz': 'xz'}
+
+ def __init__(self, type_, level=None):
+ self._type = type_
+ self._level = int(level) if level not in [None, ''] else None
+
+ def is_known(self):
+ return self.type in self.Opts.keys()
+
+ @property
+ def type(self):
+ return self._type
+
+ @property
+ def level(self):
+ return self._level
+
+ @property
+ def _level_opt(self):
+ return '-%d' % self.level if self.level is not None else ''
+
+ @property
+ def _more_opts(self):
+ return self.Opts.get(self._type, '')
+
+ def cmdline(self, stdout=True):
+ """
+ >>> Compressor('gzip', level=9).cmdline()
+ 'gzip -9 -n -c'
+ >>> Compressor('gzip').cmdline(True)
+ 'gzip -n -c'
+ """
+ return "%s %s %s %s" % (self.type, self._level_opt, self._more_opts,
+ "-c" if stdout else '')
+
+ def __repr__(self):
+ """
+ >>> Compressor('gzip').__repr__()
+ "<compressor type='gzip' >"
+ >>> Compressor('gzip', 9).__repr__()
+ "<compressor type='gzip' level=9>"
+ """
+ level_str = "level=%s" % self.level if self.level is not None else ''
+ return "<compressor type='%s' %s>" % (self.type, level_str)
diff --git a/gbp/pkg/pkgpolicy.py b/gbp/pkg/pkgpolicy.py
new file mode 100644
index 00000000..eb2d72ea
--- /dev/null
+++ b/gbp/pkg/pkgpolicy.py
@@ -0,0 +1,167 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2017 Guido Guenther <agx@sigxcpu.org>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, please see
+# <http://www.gnu.org/licenses/>
+
+
+import os
+import re
+
+from gbp.pkg.archive import Archive
+
+
+class PkgPolicy(object):
+ """
+ Common helpers for packaging policy.
+ """
+ packagename_re = None
+ packagename_msg = None
+ upstreamversion_re = None
+ upstreamversion_msg = None
+
+ @classmethod
+ def is_valid_packagename(cls, name):
+ """
+ Is this a valid package name?
+
+ >>> PkgPolicy.is_valid_packagename('doesnotmatter')
+ Traceback (most recent call last):
+ ...
+ NotImplementedError: Class needs to provide packagename_re
+ """
+ if cls.packagename_re is None:
+ raise NotImplementedError("Class needs to provide packagename_re")
+ return True if cls.packagename_re.match(name) else False
+
+ @classmethod
+ def is_valid_upstreamversion(cls, version):
+ """
+ Is this a valid upstream version number?
+
+ >>> PkgPolicy.is_valid_upstreamversion('doesnotmatter')
+ Traceback (most recent call last):
+ ...
+ NotImplementedError: Class needs to provide upstreamversion_re
+ """
+ if cls.upstreamversion_re is None:
+ raise NotImplementedError("Class needs to provide upstreamversion_re")
+ return True if cls.upstreamversion_re.match(version) else False
+
+ @staticmethod
+ def guess_upstream_src_version(filename, extra_regex=r''):
+ """
+ Guess the package name and version from the filename of an upstream
+ archive.
+
+ @param filename: filename (archive or directory) from which to guess
+ @type filename: C{string}
+ @param extra_regex: additional regex to apply, needs a 'package' and a
+ 'version' group
+ @return: (package name, version) or ('', '')
+ @rtype: tuple
+
+ >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.gz')
+ ('foo-bar', '0.2')
+ >>> PkgPolicy.guess_upstream_src_version('foo-Bar_0.2.orig.tar.gz')
+ ('', '')
+ >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2.tar.gz')
+ ('git-bar', '0.2')
+ >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2-rc1.tar.gz')
+ ('git-bar', '0.2-rc1')
+ >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2:~-rc1.tar.gz')
+ ('git-bar', '0.2:~-rc1')
+ >>> PkgPolicy.guess_upstream_src_version('git-Bar-0A2d:rc1.tar.bz2')
+ ('git-Bar', '0A2d:rc1')
+ >>> PkgPolicy.guess_upstream_src_version('git-1.tar.bz2')
+ ('git', '1')
+ >>> PkgPolicy.guess_upstream_src_version('kvm_87+dfsg.orig.tar.gz')
+ ('kvm', '87+dfsg')
+ >>> PkgPolicy.guess_upstream_src_version('foo-Bar-a.b.tar.gz')
+ ('', '')
+ >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.xz')
+ ('foo-bar', '0.2')
+ >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.lzma')
+ ('foo-bar', '0.2')
+ >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.zip')
+ ('foo-bar', '0.2')
+ >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.tlz')
+ ('foo-bar', '0.2')
+ >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.tar.gz')
+ ('foo-bar', '0.2')
+ """
+ version_chars = r'[a-zA-Z\d\.\~\-\:\+]'
+ basename = Archive.parse_filename(os.path.basename(filename))[0]
+
+ version_filters = map(
+ lambda x: x % version_chars,
+ ( # Debian upstream tarball: package_'<version>.orig.tar.gz'
+ r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig',
+ # Debian native: 'package_<version>.tar.gz'
+ r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)',
+ # Upstream 'package-<version>.tar.gz'
+ # or directory 'package-<version>':
+ r'^(?P<package>[a-zA-Z\d\.\+\-]+)(-)(?P<version>[0-9]%s*)'))
+ if extra_regex:
+ version_filters = extra_regex + version_filters
+
+ for filter in version_filters:
+ m = re.match(filter, basename)
+ if m:
+ return (m.group('package'), m.group('version'))
+ return ('', '')
+
+ @staticmethod
+ def has_origs(orig_files, dir):
+ "Check orig tarball and additional tarballs exists in dir"
+ for o in orig_files:
+ if not os.path.exists(os.path.join(dir, o)):
+ return False
+ return True
+
+ @classmethod
+ def has_orig(cls, orig_file, dir):
+ return cls.has_origs([orig_file], dir)
+
+ @staticmethod
+ def symlink_origs(orig_files, orig_dir, output_dir, force=False):
+ """
+ symlink orig tarball from orig_dir to output_dir
+ @return: [] if all links were created, list of
+ failed links otherwise
+ """
+ orig_dir = os.path.abspath(orig_dir)
+ output_dir = os.path.abspath(output_dir)
+ err = []
+
+ if orig_dir == output_dir:
+ return []
+
+ for f in orig_files:
+ src = os.path.join(orig_dir, f)
+ dst = os.path.join(output_dir, f)
+ if not os.access(src, os.F_OK):
+ err.append(f)
+ continue
+ try:
+ if os.path.exists(dst) and force:
+ os.unlink(dst)
+ os.symlink(src, dst)
+ except OSError:
+ err.append(f)
+ return err
+
+ @classmethod
+ def symlink_orig(cls, orig_file, orig_dir, output_dir, force=False):
+ return cls.symlink_origs([orig_file], orig_dir, output_dir, force=force)
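
PkgPolicy deliberately leaves packagename_re and upstreamversion_re unset; concrete policies such as DebianPkgPolicy and RpmPkgPolicy (imported elsewhere in this diff) fill them in. A minimal illustrative subclass follows; the regular expressions here are made up for the example and are not the real Debian or RPM rules:

    import re
    from gbp.pkg.pkgpolicy import PkgPolicy

    class ToyPkgPolicy(PkgPolicy):
        # Hypothetical rules, only for demonstration
        packagename_re = re.compile(r'^[a-z][a-z0-9.+-]+$')
        packagename_msg = "lowercase letters, digits, '.', '+', '-'"
        upstreamversion_re = re.compile(r'^[0-9][a-zA-Z0-9.+~-]*$')
        upstreamversion_msg = "must start with a digit"

    print(ToyPkgPolicy.is_valid_packagename('foo-bar'))        # True
    print(ToyPkgPolicy.is_valid_upstreamversion('0.2~rc1'))    # True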
diff --git a/gbp/pkg/upstreamsource.py b/gbp/pkg/upstreamsource.py
new file mode 100644
index 00000000..9716ed2a
--- /dev/null
+++ b/gbp/pkg/upstreamsource.py
@@ -0,0 +1,193 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2017 Guido Guenther <agx@sigxcpu.org>
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, please see
+# <http://www.gnu.org/licenses/>
+
+import glob
+import os
+
+import gbp.command_wrappers as gbpc
+
+from gbp.pkg.compressor import Compressor
+from gbp.pkg.pkgpolicy import PkgPolicy
+
+from gbp.errors import GbpError
+
+
+class UpstreamSource(object):
+ """
+ Upstream source. Can be either an unpacked dir, a tarball or another type
+ of archive
+
+ @cvar _orig: are the upstream sources already suitable as an upstream
+ tarball
+ @type _orig: boolean
+ @cvar _path: path to the upstream sources
+ @type _path: string
+ @cvar _unpacked: path to the unpacked source tree
+ @type _unpacked: string
+ """
+ def __init__(self, name, unpacked=None, pkg_policy=PkgPolicy):
+ self._orig = False
+ self._pkg_policy = pkg_policy
+ self._path = name
+ self.unpacked = unpacked
+
+ self._check_orig()
+ if self.is_dir():
+ self.unpacked = self.path
+
+ def _check_orig(self):
+ """
+ Check if upstream source format can be used as orig tarball.
+ This doesn't imply that the tarball is correctly named.
+
+ @return: C{True} if upstream source format is suitable
+ as upstream tarball, C{False} otherwise.
+ @rtype: C{bool}
+ """
+ if self.is_dir():
+ self._orig = False
+ return
+
+ parts = self._path.split('.')
+ try:
+ if parts[-1] == 'tgz':
+ self._orig = True
+ elif parts[-2] == 'tar':
+ if (parts[-1] in Compressor.Opts or
+ parts[-1] in Compressor.Aliases):
+ self._orig = True
+ except IndexError:
+ self._orig = False
+
+ def is_orig(self):
+ """
+ @return: C{True} if sources are suitable as orig tarball,
+ C{False} otherwise
+ @rtype: C{bool}
+ """
+ return self._orig
+
+ def is_dir(self):
+ """
+ @return: C{True} if upstream sources are an unpacked directory,
+ C{False} otherwise
+ @rtype: C{bool}
+ """
+ return True if os.path.isdir(self._path) else False
+
+ @property
+ def path(self):
+ return self._path.rstrip('/')
+
+ def unpack(self, dir, filters=None):
+ """
+ Unpack packed upstream sources into a given directory
+ (filtering out files specified by filters) and determine the
+ toplevel of the source tree.
+ """
+ if self.is_dir():
+ raise GbpError("Cannot unpack directory %s" % self.path)
+
+ if not filters:
+ filters = []
+
+ if not isinstance(filters, list):
+ raise GbpError("Filters must be a list")
+
+ self._unpack_archive(dir, filters)
+ self.unpacked = self._unpacked_toplevel(dir)
+
+ def _unpack_archive(self, dir, filters):
+ """
+ Unpack packed upstream sources into a given directory
+ allowing to filter out files in case of tar archives.
+ """
+ ext = os.path.splitext(self.path)[1]
+ if ext in [".zip", ".xpi"]:
+ if filters:
+ raise GbpError("Can only filter tar archives: %s", (ext, self.path))
+ self._unpack_zip(dir)
+ else:
+ self._unpack_tar(dir, filters)
+
+ def _unpack_zip(self, dir):
+ try:
+ gbpc.UnpackZipArchive(self.path, dir)()
+ except gbpc.CommandExecFailed:
+ raise GbpError("Unpacking of %s failed" % self.path)
+
+ def _unpacked_toplevel(self, dir):
+ """unpacked archives can contain a leading directory or not"""
+ unpacked = glob.glob('%s/*' % dir)
+ unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders
+ # Check that dir contains nothing but a single folder:
+ if len(unpacked) == 1 and os.path.isdir(unpacked[0]):
+ return unpacked[0]
+ else:
+ return dir
+
+ def _unpack_tar(self, dir, filters):
+ """
+ Unpack a tarball to I{dir} applying a list of I{filters}. Leave the
+ cleanup to the caller in case of an error.
+ """
+ try:
+ unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters)
+ unpackArchive()
+ except gbpc.CommandExecFailed:
+ # unpackArchive already printed an error message
+ raise GbpError
+
+ def pack(self, newarchive, filters=None):
+ """
+ Recreate a new archive from the current one
+
+ @param newarchive: the name of the new archive
+ @type newarchive: string
+ @param filters: tar filters to apply
+ @type filters: array of strings
+ @return: the new upstream source
+ @rtype: UpstreamSource
+ """
+ if not self.unpacked:
+ raise GbpError("Need an unpacked source tree to pack")
+
+ if not filters:
+ filters = []
+
+ if not isinstance(filters, list):
+ raise GbpError("Filters must be a list")
+
+ try:
+ unpacked = self.unpacked.rstrip('/')
+ repackArchive = gbpc.PackTarArchive(newarchive,
+ os.path.dirname(unpacked),
+ os.path.basename(unpacked),
+ filters)
+ repackArchive()
+ except gbpc.CommandExecFailed:
+ # repackArchive already printed an error
+ raise GbpError
+ return type(self)(newarchive)
+
+ @staticmethod
+ def known_compressions():
+ return Compressor.Exts.values()
+
+ def guess_version(self, extra_regex=r''):
+ return self._pkg_policy.guess_upstream_src_version(self.path,
+ extra_regex)
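
UpstreamSource behaves as it did before the move; a short usage sketch (the tarball path and target directory are hypothetical):

    from gbp.pkg.upstreamsource import UpstreamSource

    src = UpstreamSource('../foo-bar_0.2.orig.tar.gz')   # assumed example path
    print(src.is_orig())          # True for a .tar.gz archive
    print(src.guess_version())    # ('foo-bar', '0.2'), via PkgPolicy.guess_upstream_src_version
    # Unpacking into an existing scratch directory sets self.unpacked:
    # src.unpack('/tmp/work', filters=[])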
diff --git a/gbp/rpm/__init__.py b/gbp/rpm/__init__.py
index 253a2e56..38ecbac8 100644
--- a/gbp/rpm/__init__.py
+++ b/gbp/rpm/__init__.py
@@ -30,7 +30,7 @@ from gbp.errors import GbpError
from gbp.git import GitRepositoryError
from gbp.patch_series import (PatchSeries, Patch)
import gbp.log
-from gbp.pkg import (UpstreamSource, parse_archive_filename)
+from gbp.pkg import (UpstreamSource, Archive)
from gbp.rpm.policy import RpmPkgPolicy
from gbp.rpm.linkedlist import LinkedList
from gbp.rpm.lib_rpm import librpm, get_librpm_log
@@ -788,7 +788,7 @@ class SpecFile(object):
src = {'num': num, 'filename': os.path.basename(filename),
'uri': filename}
src['filename_base'], src['archive_fmt'], src['compression'] = \
- parse_archive_filename(os.path.basename(filename))
+ Archive.parse_filename(os.path.basename(filename))
if (src['filename_base'].startswith(self.name) and
src['archive_fmt']):
# Take the first archive that starts with pkg name
diff --git a/gbp/rpm/policy.py b/gbp/rpm/policy.py
index e5be7fef..a027ed99 100644
--- a/gbp/rpm/policy.py
+++ b/gbp/rpm/policy.py
@@ -18,7 +18,7 @@
import re
-from gbp.pkg import PkgPolicy, parse_archive_filename
+from gbp.pkg import PkgPolicy, Archive
from gbp.scripts.common.pq import parse_gbp_commands
@@ -68,7 +68,7 @@ class RpmPkgPolicy(PkgPolicy):
>>> RpmPkgPolicy.is_valid_orig_archive("foo.gz")
False
"""
- _base, arch_fmt, _compression = parse_archive_filename(filename)
+ _base, arch_fmt, _compression = Archive.parse_filename(filename)
if arch_fmt:
return True
return False
diff --git a/gbp/scripts/common/__init__.py b/gbp/scripts/common/__init__.py
index 44dd9298..e34a0c3c 100644
--- a/gbp/scripts/common/__init__.py
+++ b/gbp/scripts/common/__init__.py
@@ -20,7 +20,7 @@ import re
import os
from gbp.errors import GbpError
from gbp.deb import DebianPkgPolicy
-from gbp.pkg import parse_archive_filename
+from gbp.pkg import Archive
class ExitCodes(object):
@@ -50,7 +50,7 @@ def get_component_tarballs(name, version, tarball, components):
tarball.
"""
tarballs = []
- (_, _, comp_type) = parse_archive_filename(tarball)
+ (_, _, comp_type) = Archive.parse_filename(tarball)
for component in components:
cname = DebianPkgPolicy.build_tarball_name(name,
version,
diff --git a/gbp/scripts/export_orig.py b/gbp/scripts/export_orig.py
index e6ef7986..f4fc4a12 100755
--- a/gbp/scripts/export_orig.py
+++ b/gbp/scripts/export_orig.py
@@ -24,7 +24,7 @@ from gbp.deb.git import GitRepositoryError
from gbp.errors import GbpError
import gbp.log
import gbp.notifications
-from gbp.pkg import Compressor, parse_archive_filename
+from gbp.pkg import Compressor, Archive
# upstream tarball preparation
@@ -223,7 +223,7 @@ def guess_comp_type(repo, comp_type, source, tarball_dir):
else:
commit = repo.pristine_tar_branch
tarball = repo.get_commit_info(commit)['subject']
- (base_name, archive_fmt, comp_type) = parse_archive_filename(tarball)
+ (base_name, archive_fmt, comp_type) = Archive.parse_filename(tarball)
gbp.log.debug("Determined compression type '%s'" % comp_type)
if not comp_type:
comp_type = 'gzip'
diff --git a/gbp/scripts/import_srpm.py b/gbp/scripts/import_srpm.py
index 302c3422..a2629d80 100755
--- a/gbp/scripts/import_srpm.py
+++ b/gbp/scripts/import_srpm.py
@@ -39,7 +39,7 @@ from gbp.errors import GbpError
from gbp.scripts.common import ExitCodes, is_download
from gbp.scripts.common import repo_setup
import gbp.log
-from gbp.pkg import parse_archive_filename
+from gbp.pkg import Archive
no_packaging_branch_msg = """
Repository does not have branch '%s' for packaging/distribution sources.
@@ -400,7 +400,7 @@ def main(argv):
if not options.native:
if options.pristine_tar:
- archive_fmt = parse_archive_filename(orig_tarball)[1]
+ archive_fmt = Archive.parse_filename(orig_tarball)[1]
if archive_fmt == 'tar':
repo.pristine_tar.commit(orig_tarball,
'refs/heads/%s' %
diff --git a/gbp/scripts/pq_rpm.py b/gbp/scripts/pq_rpm.py
index adf1b4ab..ebce7ff2 100755
--- a/gbp/scripts/pq_rpm.py
+++ b/gbp/scripts/pq_rpm.py
@@ -33,7 +33,7 @@ from gbp.git.modifier import GitModifier
from gbp.command_wrappers import GitCommand, CommandExecFailed
from gbp.errors import GbpError
from gbp.patch_series import PatchSeries, Patch
-from gbp.pkg import parse_archive_filename
+from gbp.pkg import Archive
from gbp.rpm import (SpecFile, NoSpecError, guess_spec, guess_spec_repo,
spec_from_repo)
from gbp.scripts.common import ExitCodes
@@ -227,7 +227,7 @@ def safe_patches(queue):
gbp.log.debug("Saving patches '%s' in '%s'" %
(os.path.dirname(queue[0].path), tmpdir))
for patch in queue:
- base, _archive_fmt, comp = parse_archive_filename(patch.path)
+ base, _archive_fmt, comp = Archive.parse_filename(patch.path)
uncompressors = {'gzip': gzip.open, 'bzip2': bz2.BZ2File}
if comp in uncompressors:
gbp.log.debug("Uncompressing '%s'" % os.path.basename(patch.path))