From b067bf9d5cbab91da407b05ba75d3ba8ef72d18b Mon Sep 17 00:00:00 2001 From: Guido Günther Date: Fri, 18 Apr 2014 16:31:56 +0200 Subject: Split out and test DebianDistro --- tests/test_debiandistro.py | 94 ++++++++++++++++++++++++++ whatmaps/command.py | 124 ---------------------------------- whatmaps/debiandistro.py | 162 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 256 insertions(+), 124 deletions(-) create mode 100644 tests/test_debiandistro.py create mode 100644 whatmaps/debiandistro.py diff --git a/tests/test_debiandistro.py b/tests/test_debiandistro.py new file mode 100644 index 0000000..10586f9 --- /dev/null +++ b/tests/test_debiandistro.py @@ -0,0 +1,94 @@ +# vim: set fileencoding=utf-8 : +# (C) 2014 Guido Günther +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +"""Test L{whatmaps.process} config""" + +import unittest +from mock import patch + +from whatmaps.debiandistro import DebianDistro +from whatmaps.debianpkg import DebianPkg + +class TestDebianDistro(unittest.TestCase): + def test_vars(self): + """Check Debian distro vars""" + self.assertEqual(DebianDistro.id, 'Debian') + self.assertIsNotNone(DebianDistro._pkg_services) + self.assertIsNotNone(DebianDistro._pkg_service_blacklist) + self.assertIsNotNone(DebianDistro.service_blacklist) + self.assertEqual(DebianDistro.restart_service_cmd('aservice'), + ['invoke-rc.d', 'aservice', 'restart']) + self.assertTrue(DebianDistro.has_apt()) + + def test_pkg_by_file(self): + with patch('subprocess.Popen') as mock: + PopenMock = mock.return_value + PopenMock.returncode = 0 + PopenMock.communicate.return_value = ['apackage'] + + pkg = DebianDistro.pkg_by_file('afile') + self.assertIsInstance(pkg, DebianPkg) + self.assertEqual(pkg.name, 'apackage') + PopenMock.communicate.assert_called_once_with() + mock.assert_called_once_with(['dpkg-query', '-S', 'afile'], + stderr=-1, stdout=-1) + + def test_pkg_by_file_failure(self): + """Test if None is returned on subcommand erros""" + with patch('subprocess.Popen') as mock: + PopenMock = mock.return_value + PopenMock.returncode = 1 + PopenMock.communicate.return_value = ['apackage'] + + pkg = DebianDistro.pkg_by_file('afile') + self.assertIsNone(pkg) + PopenMock.communicate.assert_called_once_with() + mock.assert_called_once_with(['dpkg-query', '-S', 'afile'], + stderr=-1, stdout=-1) + + def test_read_apt_pipeline(self): + """Test our interaction with the apt pipeline""" + class AptPipelineMock(object): + def __init__(self): + self.iter = self.lines() + + def lines(self): + for line in ['VERSION 2', 'Whatmaps::Enable-Restart=1', '\n']: + yield line + + def readlines(self): + return ['pkg1 0.0 c 1.0 **CONFIGURE**', + 'pkg2 - c 1.0 **CONFIGURE**', + ''] + + def readline(self): + return self.iter.next() + + with patch('sys.stdin', new_callable=AptPipelineMock): + pkgs = DebianDistro.read_apt_pipeline() + self.assertEqual(len(pkgs), 1) + self.assertTrue(pkgs.has_key('pkg1')) + self.assertTrue(pkgs['pkg1'].name, 'pkg1') + + @patch('apt_pkg.init') + @patch('apt_pkg.Acquire') + def test_filter_security_updates(self, apt_pkg_acquire, apt_pkg_init): + pkgs = {'pkg1': DebianPkg('pkg1'), + 'pkg2': DebianPkg('pkg2'), + } + with patch('apt_pkg.Cache') as mock: + DebianDistro.filter_security_updates(pkgs) + apt_pkg_init.assert_called_once_with() + apt_pkg_acquire.assert_called_once_with() diff --git a/whatmaps/command.py b/whatmaps/command.py index 461b8e4..46e758b 100755 --- a/whatmaps/command.py +++ b/whatmaps/command.py @@ -25,10 +25,6 @@ import subprocess import sys import errno from optparse import OptionParser -try: - import apt_pkg -except ImportError: - apt_pkg = None try: import lsb_release except ImportError: @@ -40,126 +36,6 @@ from . distro import Distro from . pkg import Pkg, PkgError -class DebianDistro(Distro): - "Debian (dpkg) based distribution""" - id = 'Debian' - - _pkg_services = { 'apache2-mpm-worker': [ 'apache2' ], - 'apache2-mpm-prefork': [ 'apache2' ], - 'apache2.2-bin': [ 'apache2' ], - 'dovecot-imapd': [ 'dovecot' ], - 'dovecot-pop3d': [ 'dovecot' ], - 'exim4-daemon-light': [ 'exim4' ], - 'exim4-daemon-heavy': [ 'exim4' ], - 'qemu-system-x86_64': [ 'libvirt-guests' ], - } - - # Per package blacklist - _pkg_service_blacklist = { 'libvirt-bin': [ 'libvirt-guests' ] } - - # Per distro blacklist - service_blacklist = set(['kvm', 'qemu-kvm', 'qemu-system-x86']) - - @classmethod - def pkg(klass, name): - return DebianPkg(name) - - @classmethod - def pkg_by_file(klass, path): - find_file = subprocess.Popen(['dpkg-query', '-S', path], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - output = find_file.communicate()[0] - if find_file.returncode: - return None - pkg = output.split(':')[0] - return DebianPkg(pkg) - - @classmethod - def restart_service_cmd(klass, name): - return ['invoke-rc.d', name, 'restart'] - - @classmethod - def has_apt(klass): - return True - - @staticmethod - def read_apt_pipeline(): - whatmaps_enabled = False - - version = sys.stdin.readline().rstrip() - if version != "VERSION 2": - logging.error("Wrong or missing VERSION from apt pipeline\n" - "(is Dpkg::Tools::Options::/usr/bin/whatmaps::Version set to 2?)") - raise PkgError - - while 1: - aptconfig = sys.stdin.readline() - if not aptconfig or aptconfig == '\n': - break - if aptconfig.startswith('Whatmaps::Enable-Restart=') and \ - aptconfig.strip().split('=', 1)[1].lower() in ["true", "1"]: - logging.debug("Service restarts enabled") - whatmaps_enabled = True - - if not whatmaps_enabled: - return None - - pkgs = {} - for line in sys.stdin.readlines(): - if not line: - break - (pkgname, oldversion, compare, newversion, filename) = line.split() - - if filename == '**CONFIGURE**': - if oldversion != '-': # Updates only - pkgs[pkgname] = DebianPkg(pkgname) - pkgs[pkgname].version = newversion - return pkgs - - - @classmethod - def _security_update_origins(klass): - "Determine security update origins from apt configuration" - codename = lsb_release.get_distro_information()['CODENAME'] - def _subst(line): - mapping = {'distro_codename' : codename, - 'distro_id' : klass.id, } - return string.Template(line).substitute(mapping) - - origins = [] - for s in apt_pkg.config.value_list('Whatmaps::Security-Update-Origins'): - (distro_id, distro_codename) = s.split() - origins.append((_subst(distro_id), - _subst(distro_codename))) - logging.debug("Security Update Origins: %s", origins) - return origins - - - @classmethod - def filter_security_updates(klass, pkgs): - """Filter on security updates""" - - apt_pkg.init() - acquire = apt_pkg.Acquire() - cache = apt_pkg.Cache() - - security_update_origins = klass._security_update_origins() - security_updates = {} - - for pkg in pkgs.values(): - cache_pkg = cache[pkg.name] - for cache_version in cache_pkg.version_list: - if pkg.version == cache_version.ver_str: - for pfile, _ in cache_version.file_list: - for origin in security_update_origins: - if pfile.origin == origin[0] and \ - pfile.archive == origin[1]: - security_updates[pkg] = pkg - break - return security_updates - - class DebianPkg(Pkg): type = 'Debian' _init_script_re = re.compile('/etc/init.d/[\w\-\.]') diff --git a/whatmaps/debiandistro.py b/whatmaps/debiandistro.py new file mode 100644 index 0000000..629de06 --- /dev/null +++ b/whatmaps/debiandistro.py @@ -0,0 +1,162 @@ +# vim: set fileencoding=utf-8 : +# +# (C) 2010,2014 Guido Günther +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public Licnese for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +try: + import apt_pkg +except ImportError: + apt_pkg = None + +try: + import lsb_release +except ImportError: + lsb_release = None + +import logging +import subprocess +import sys + +from . distro import Distro +from . debianpkg import DebianPkg +from . pkg import PkgError + +class DebianDistro(Distro): + "Debian (dpkg) based distribution""" + id = 'Debian' + + _pkg_services = { 'apache2-mpm-worker': [ 'apache2' ], + 'apache2-mpm-prefork': [ 'apache2' ], + 'apache2.2-bin': [ 'apache2' ], + 'dovecot-imapd': [ 'dovecot' ], + 'dovecot-pop3d': [ 'dovecot' ], + 'exim4-daemon-light': [ 'exim4' ], + 'exim4-daemon-heavy': [ 'exim4' ], + 'qemu-system-x86_64': [ 'libvirt-guests' ], + } + + # Per package blacklist + _pkg_service_blacklist = { 'libvirt-bin': [ 'libvirt-guests' ] } + + # Per distro blacklist + service_blacklist = set(['kvm', 'qemu-kvm', 'qemu-system-x86']) + + @classmethod + def pkg(klass, name): + return DebianPkg(name) + + @classmethod + def pkg_by_file(klass, path): + find_file = subprocess.Popen(['dpkg-query', '-S', path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + output = find_file.communicate()[0] + if find_file.returncode: + return None + pkg = output.split(':')[0] + return DebianPkg(pkg) + + @classmethod + def restart_service_cmd(klass, name): + return ['invoke-rc.d', name, 'restart'] + + @classmethod + def has_apt(klass): + return True + + @staticmethod + def read_apt_pipeline(): + whatmaps_enabled = False + + version = sys.stdin.readline().rstrip() + if version != "VERSION 2": + err = "Wrong or missing VERSION from apt pipeline" + logging.error("%s\n" + "(is Dpkg::Tools::Options::/usr/bin/whatmaps::Version set to 2?)" + % err) + raise PkgError(err) + + while 1: + aptconfig = sys.stdin.readline() + if not aptconfig or aptconfig == '\n': + break + if aptconfig.startswith('Whatmaps::Enable-Restart=') and \ + aptconfig.strip().split('=', 1)[1].lower() in ["true", "1"]: + logging.debug("Service restarts enabled") + whatmaps_enabled = True + + if not whatmaps_enabled: + return None + + pkgs = {} + for line in sys.stdin.readlines(): + if not line: + break + (pkgname, oldversion, compare, newversion, filename) = line.split() + + if filename == '**CONFIGURE**': + if oldversion != '-': # Updates only + pkgs[pkgname] = DebianPkg(pkgname) + pkgs[pkgname].version = newversion + return pkgs + + + @classmethod + def _security_update_origins(klass): + "Determine security update origins from apt configuration" + + if lsb_release is None: + raise PkgError("lsb_release not found, can't determine security updates") + + codename = lsb_release.get_distro_information()['CODENAME'] + def _subst(line): + mapping = {'distro_codename' : codename, + 'distro_id' : klass.id, } + return string.Template(line).substitute(mapping) + + origins = [] + for s in apt_pkg.config.value_list('Whatmaps::Security-Update-Origins'): + (distro_id, distro_codename) = s.split() + origins.append((_subst(distro_id), + _subst(distro_codename))) + logging.debug("Security Update Origins: %s", origins) + return origins + + + @classmethod + def filter_security_updates(klass, pkgs): + """Filter on security updates""" + + if apt_pkg is None: + raise PkgError("apt_pkg not installed, can't determine security updates") + + apt_pkg.init() + acquire = apt_pkg.Acquire() + cache = apt_pkg.Cache() + + security_update_origins = klass._security_update_origins() + security_updates = {} + + for pkg in pkgs.values(): + cache_pkg = cache[pkg.name] + for cache_version in cache_pkg.version_list: + if pkg.version == cache_version.ver_str: + for pfile, _ in cache_version.file_list: + for origin in security_update_origins: + if pfile.origin == origin[0] and \ + pfile.archive == origin[1]: + security_updates[pkg] = pkg + break + return security_updates -- cgit v1.2.3