summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuido Günther <agx@sigxcpu.org>2014-05-02 17:29:48 +0200
committerGuido Günther <agx@sigxcpu.org>2014-05-02 17:29:48 +0200
commit27ad89007ecba9e49e3a0e72cef232b5e5f4f689 (patch)
tree12be1932a519a2c6391f9a3d641c8ab267694a01
parent721b0bf59d40990b3170a1ded2ad1739001b8d6d (diff)
parentedc021d52d205bce99d9bd49a7de971e114acc80 (diff)
Merge tag 'v0.0.6' into debian/master
whatmaps 0.0.6
-rw-r--r--.gitignore1
-rw-r--r--setup.cfg2
-rw-r--r--setup.py14
-rw-r--r--tests/context.py13
-rw-r--r--tests/test_debiandistro.py100
-rw-r--r--tests/test_debianpkg.py29
-rw-r--r--tests/test_distro.py40
-rw-r--r--tests/test_pkg.py102
-rw-r--r--tests/test_process.py21
-rw-r--r--tests/test_redhatdistro.py58
-rw-r--r--tests/test_rpmpkg.py28
-rw-r--r--tox.ini13
-rwxr-xr-xwhatmaps/command.py372
-rw-r--r--whatmaps/debiandistro.py164
-rw-r--r--whatmaps/debianpkg.py42
-rw-r--r--whatmaps/distro.py70
-rw-r--r--whatmaps/pkg.py81
-rw-r--r--whatmaps/process.py2
-rw-r--r--whatmaps/redhatdistro.py57
-rw-r--r--whatmaps/rpmpkg.py41
20 files changed, 888 insertions, 362 deletions
diff --git a/.gitignore b/.gitignore
index e9b90d1..e440596 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
.coverage
+.tox
*.pyc
*~
build/
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..ee9ac0e
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[nosetests]
+cover-package=whatmaps
diff --git a/setup.py b/setup.py
index 46b0d81..7e7ed00 100644
--- a/setup.py
+++ b/setup.py
@@ -1,5 +1,19 @@
#!/usr/bin/python
# vim: set fileencoding=utf-8 :
+#
+# (C) 2010,2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
from setuptools import setup
diff --git a/tests/context.py b/tests/context.py
index d859c2f..5878771 100644
--- a/tests/context.py
+++ b/tests/context.py
@@ -1,5 +1,18 @@
# this context.py should be included by all tests
# idea from http://kennethreitz.com/repository-structure-and-python.html
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import shutil
diff --git a/tests/test_debiandistro.py b/tests/test_debiandistro.py
new file mode 100644
index 0000000..c01efd8
--- /dev/null
+++ b/tests/test_debiandistro.py
@@ -0,0 +1,100 @@
+# vim: set fileencoding=utf-8 :
+# (C) 2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Test L{whatmaps.process} config"""
+
+import unittest
+from mock import patch
+
+try:
+ import apt_pkg
+ have_apt_pkg=True
+except ImportError:
+ have_apt_pkg=False
+
+from whatmaps.debiandistro import DebianDistro
+from whatmaps.debianpkg import DebianPkg
+
+class TestDebianDistro(unittest.TestCase):
+ def test_vars(self):
+ """Check Debian distro vars"""
+ self.assertEqual(DebianDistro.id, 'Debian')
+ self.assertIsNotNone(DebianDistro._pkg_services)
+ self.assertIsNotNone(DebianDistro._pkg_service_blacklist)
+ self.assertIsNotNone(DebianDistro.service_blacklist)
+ self.assertEqual(DebianDistro.restart_service_cmd('aservice'),
+ ['invoke-rc.d', 'aservice', 'restart'])
+ self.assertTrue(DebianDistro.has_apt())
+
+ def test_pkg_by_file(self):
+ with patch('subprocess.Popen') as mock:
+ PopenMock = mock.return_value
+ PopenMock.returncode = 0
+ PopenMock.communicate.return_value = ['apackage']
+
+ pkg = DebianDistro.pkg_by_file('afile')
+ self.assertIsInstance(pkg, DebianPkg)
+ self.assertEqual(pkg.name, 'apackage')
+ PopenMock.communicate.assert_called_once_with()
+ mock.assert_called_once_with(['dpkg-query', '-S', 'afile'],
+ stderr=-1, stdout=-1)
+
+ def test_pkg_by_file_failure(self):
+ """Test if None is returned on subcommand erros"""
+ with patch('subprocess.Popen') as mock:
+ PopenMock = mock.return_value
+ PopenMock.returncode = 1
+ PopenMock.communicate.return_value = ['apackage']
+
+ pkg = DebianDistro.pkg_by_file('afile')
+ self.assertIsNone(pkg)
+ PopenMock.communicate.assert_called_once_with()
+ mock.assert_called_once_with(['dpkg-query', '-S', 'afile'],
+ stderr=-1, stdout=-1)
+
+ def test_read_apt_pipeline(self):
+ """Test our interaction with the apt pipeline"""
+ class AptPipelineMock(object):
+ def __init__(self):
+ self.iter = self.lines()
+
+ def lines(self):
+ for line in ['VERSION 2', 'Whatmaps::Enable-Restart=1', '\n']:
+ yield line
+
+ def readlines(self):
+ return ['pkg1 0.0 c 1.0 **CONFIGURE**',
+ 'pkg2 - c 1.0 **CONFIGURE**',
+ '']
+
+ def readline(self):
+ return next(self.iter)
+
+ with patch('sys.stdin', new_callable=AptPipelineMock):
+ pkgs = DebianDistro.read_apt_pipeline()
+ self.assertEqual(len(pkgs), 1)
+ self.assertIn('pkg1', pkgs)
+ self.assertTrue(pkgs['pkg1'].name, 'pkg1')
+
+ @patch('apt_pkg.init')
+ @patch('apt_pkg.Acquire')
+ @unittest.skipUnless(have_apt_pkg, "apt_pkg not installed")
+ def test_filter_security_updates(self, apt_pkg_acquire, apt_pkg_init):
+ pkgs = {'pkg1': DebianPkg('pkg1'),
+ 'pkg2': DebianPkg('pkg2'),
+ }
+ with patch('apt_pkg.Cache') as mock:
+ DebianDistro.filter_security_updates(pkgs)
+ apt_pkg_init.assert_called_once_with()
+ apt_pkg_acquire.assert_called_once_with()
diff --git a/tests/test_debianpkg.py b/tests/test_debianpkg.py
new file mode 100644
index 0000000..72ce8fb
--- /dev/null
+++ b/tests/test_debianpkg.py
@@ -0,0 +1,29 @@
+# vim: set fileencoding=utf-8 :
+# (C) 2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Test L{whatmaps.process} config"""
+
+import unittest
+
+from mock import patch
+
+from whatmaps.debianpkg import DebianPkg
+
+class TestDebianPkg(unittest.TestCase):
+ def test_services(self):
+ with patch('whatmaps.pkg.Pkg._get_contents') as mock:
+ mock.return_value = ['/etc/init.d/aservice', '/usr/bin/afile']
+ p = DebianPkg('doesnotmatter')
+ self.assertEqual(p.services, ['aservice'])
+
diff --git a/tests/test_distro.py b/tests/test_distro.py
new file mode 100644
index 0000000..e22da68
--- /dev/null
+++ b/tests/test_distro.py
@@ -0,0 +1,40 @@
+# vim: set fileencoding=utf-8 :
+# (C) 2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Test L{whatmaps.process} config"""
+
+import unittest
+
+from whatmaps.distro import Distro
+
+from . import context
+
+class Pkg(object):
+ name = 'doesnotmatter'
+
+class TestDistro(unittest.TestCase):
+ def test_abstract(self):
+ """Check abstract method signatures"""
+ # Variables
+ self.assertEqual(Distro.service_blacklist, set())
+ self.assertIsNone(Distro.id)
+ # Pure virtual methods
+ self.assertRaises(Distro.pkg, None, None, NotImplementedError)
+ self.assertRaises(Distro.pkg_by_file, None, NotImplementedError)
+ self.assertRaises(Distro.restart_service_cmd, None, NotImplementedError)
+ self.assertRaises(Distro.restart_service, None, NotImplementedError)
+ # Lookup methods
+ self.assertEqual(Distro.pkg_services(Pkg), [])
+ self.assertEqual(Distro.pkg_service_blacklist(Pkg), [])
+ self.assertFalse(Distro.has_apt())
diff --git a/tests/test_pkg.py b/tests/test_pkg.py
new file mode 100644
index 0000000..8006b19
--- /dev/null
+++ b/tests/test_pkg.py
@@ -0,0 +1,102 @@
+# vim: set fileencoding=utf-8 :
+# (C) 2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Test L{whatmaps.process} config"""
+
+import unittest
+from mock import patch
+
+from whatmaps.pkg import Pkg, PkgError
+
+from . import context
+
+class TestPkg(unittest.TestCase):
+ def setUp(self):
+ self.tmpdir = context.new_tmpdir(__name__)
+
+ def test_abstract(self):
+ """Check abstract method signatures"""
+ self.assertIsNone(Pkg.type)
+ self.assertIsNone(Pkg.services)
+
+ def test_repr(self):
+ p = Pkg('apckage')
+ self.assertEqual(str(p), "<None Pkg object name:'apckage'>")
+
+ def test_list_contents(self):
+ with patch('subprocess.Popen') as mock:
+ p = Pkg('doesnotmatter')
+ p._list_contents = '/does/not/matter'
+ PopenMock = mock.return_value
+ PopenMock.communicate.return_value = [
+ '/package/content',
+ '/more/package/content',
+ ]
+ PopenMock.returncode = 0
+ result = p._get_contents()
+ self.assertIn('/package/content', result)
+ self.assertNotIn('/more/package/content', result)
+
+ # We want to check that we don't invoke Popen on
+ # a second call so let it fail
+ PopenMock.returncode = 1
+
+ result = p._get_contents()
+ self.assertIn('/package/content', result)
+ self.assertNotIn('/more/package/content', result)
+
+ def test_shared_objects(self):
+ """Test that we properly match shared objects"""
+ with patch('subprocess.Popen') as mock:
+ p = Pkg('doesnotmatter')
+ p._list_contents = '/does/not/matter'
+ PopenMock = mock.return_value
+ PopenMock.communicate.return_value = ['\n'.join([
+ '/lib/foo.so.1',
+ '/lib/bar.so',
+ '/not/a/shared/object',
+ '/not/a/shared/object.soeither',
+ ])]
+ PopenMock.returncode = 0
+ result = p.shared_objects
+ self.assertIn('/lib/foo.so.1', result)
+ self.assertIn('/lib/bar.so', result)
+ self.assertNotIn('/not/a/shred/object', result)
+ self.assertNotIn('/not/a/shred/object.soeither', result)
+
+ # We want to check that we don't invoke Popen on
+ # a second call so let it fail.
+ PopenMock.returncode = 1
+ result = p._get_contents()
+ self.assertIn('/lib/foo.so.1', result)
+ self.assertNotIn('/not/a/shred/object', result)
+
+ def test_shared_object_error(self):
+ """Test that we raise PkgError"""
+ with patch('subprocess.Popen') as mock:
+ p = Pkg('doesnotmatter')
+ p._list_contents = '/does/not/matter'
+ PopenMock = mock.return_value
+ PopenMock.communicate.return_value = ['']
+ PopenMock.returncode = 1
+ try:
+ p.shared_objects
+ self.fail("PkgError exception not raised")
+ except PkgError:
+ pass
+ except Exception as e:
+ self.fail("Raised '%s is not PkgError" % e)
+
+ def tearDown(self):
+ context.teardown()
diff --git a/tests/test_process.py b/tests/test_process.py
index da47ebd..91f5431 100644
--- a/tests/test_process.py
+++ b/tests/test_process.py
@@ -1,18 +1,17 @@
# vim: set fileencoding=utf-8 :
# (C) 2014 Guido Günther <agx@sigxcpu.org>
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Test L{whatmaps.process} config"""
import os
diff --git a/tests/test_redhatdistro.py b/tests/test_redhatdistro.py
new file mode 100644
index 0000000..438adf8
--- /dev/null
+++ b/tests/test_redhatdistro.py
@@ -0,0 +1,58 @@
+# vim: set fileencoding=utf-8 :
+# (C) 2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Test L{whatmaps.process} config"""
+
+import unittest
+from mock import patch
+
+from whatmaps.redhatdistro import RedHatDistro
+from whatmaps.rpmpkg import RpmPkg
+
+class TestRedHatDistro(unittest.TestCase):
+ def test_vars(self):
+ """Check RedHat distro vars"""
+ self.assertEqual(RedHatDistro.id, None)
+ self.assertIsNotNone(RedHatDistro._pkg_services)
+ self.assertIsNotNone(RedHatDistro._pkg_service_blacklist)
+ self.assertIsNotNone(RedHatDistro.service_blacklist)
+ self.assertEqual(RedHatDistro.restart_service_cmd('aservice'),
+ ['service', 'aservice', 'restart'])
+ self.assertFalse(RedHatDistro.has_apt())
+
+ def test_pkg_by_file(self):
+ with patch('subprocess.Popen') as mock:
+ PopenMock = mock.return_value
+ PopenMock.returncode = 0
+ PopenMock.communicate.return_value = ['apackage']
+
+ pkg = RedHatDistro.pkg_by_file('afile')
+ self.assertIsInstance(pkg, RpmPkg)
+ self.assertEqual(pkg.name, 'apackage')
+ PopenMock.communicate.assert_called_once_with()
+ mock.assert_called_once_with(['rpm', '-qf', 'afile'],
+ stderr=-1, stdout=-1)
+
+ def test_pkg_by_file_failure(self):
+ """Test if None is returned on subcommand erros"""
+ with patch('subprocess.Popen') as mock:
+ PopenMock = mock.return_value
+ PopenMock.returncode = 1
+ PopenMock.communicate.return_value = ['apackage']
+
+ pkg = RedHatDistro.pkg_by_file('afile')
+ self.assertIsNone(pkg)
+ PopenMock.communicate.assert_called_once_with()
+ mock.assert_called_once_with(['rpm', '-qf', 'afile'],
+ stderr=-1, stdout=-1)
diff --git a/tests/test_rpmpkg.py b/tests/test_rpmpkg.py
new file mode 100644
index 0000000..064f059
--- /dev/null
+++ b/tests/test_rpmpkg.py
@@ -0,0 +1,28 @@
+# vim: set fileencoding=utf-8 :
+# (C) 2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+"""Test L{whatmaps.process} config"""
+
+import unittest
+
+from mock import patch
+
+from whatmaps.rpmpkg import RpmPkg
+
+class TestRpmPkg(unittest.TestCase):
+ def test_services(self):
+ with patch('whatmaps.pkg.Pkg._get_contents') as mock:
+ mock.return_value = ['/etc/rc.d/init.d/aservice', '/usr/bin/afile']
+ p = RpmPkg('doesnotmatter')
+ self.assertEqual(p.services, ['aservice'])
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..68a25f8
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,13 @@
+# Tox (http://tox.testrun.org/) is a tool for running tests
+# in multiple virtualenvs. This configuration file will run the
+# test suite on all supported python versions. To use it, "pip install tox"
+# and then run "tox" from this directory.
+
+[tox]
+envlist = py27, py33, py34
+
+[testenv]
+commands = python setup.py nosetests
+deps =
+ nose
+ mock
diff --git a/whatmaps/command.py b/whatmaps/command.py
index 502f7d3..3225f54 100755
--- a/whatmaps/command.py
+++ b/whatmaps/command.py
@@ -16,6 +16,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
+
import glob
import os
import logging
@@ -26,347 +28,16 @@ import sys
import errno
from optparse import OptionParser
try:
- import apt_pkg
-except ImportError:
- apt_pkg = None
-try:
import lsb_release
except ImportError:
lsb_release = None
-from whatmaps.process import Process
-
-
-class PkgError(Exception):
- pass
-
-
-class Distro(object):
- """
- A distribution
- @cvar id: distro id as returned by lsb-release
- """
- id = None
- service_blacklist = set()
- _pkg_services = {}
- _pkg_blacklist = {}
-
- @classmethod
- def pkg(klass, name):
- """Return package object named name"""
- raise NotImplementedError
-
- @classmethod
- def pkg_by_file(klass, path):
- """Return package object that contains path"""
- raise NotImplementedError
-
- @classmethod
- def restart_service_cmd(klass, service):
- """Command to restart service"""
- raise NotImplementedError
-
- @classmethod
- def restart_service(klass, service):
- """Restart a service"""
- subprocess.call(klass.restart_service_cmd(service))
-
- @classmethod
- def pkg_services(klass, pkg):
- """
- List of services that package pkg needs restarted that aren't part
- of pkg itself
- """
- try:
- return klass._pkg_services[pkg.name]
- except KeyError:
- return []
-
- @classmethod
- def pkg_service_blacklist(klass, pkg):
- """
- List of services in pkg that we don't want to be restarted even when
- a binary from this package maps a shared lib that changed.
- """
- try:
- return klass._pkg_service_blacklist[pkg.name]
- except KeyError:
- return []
-
- @classmethod
- def has_apt(klass):
- """Does the distribution use apt"""
- return False
-
-
-class Pkg(object):
- """
- A package in a distribution
- @var services: list of services provided by package
- @var shared_objects: list of shared objects shipped in this package
- @cvar type: package type (e.g. RPM or Debian)
- @cvar _so_regex: regex that matches shared objects in the list returned by
- _get_contents
- @cvar _list_contents: command to list contents of a package, will be passed
- to subprocess. "$pkg_name" will be replaced by the package
- name.
- """
-
- type = None
- services = None
- shared_objects = None
- _so_regex = re.compile(r'(?P<so>/.*\.so(\.[^/]*)$)')
- _list_contents = None
-
- def __init__(self, name):
- self.name = name
- self._services = None
- self._shared_objects = None
- self._contents = None
-
- def __repr__(self):
- return "<%s Pkg object name:'%s'>" % (self.type, self.name)
-
- def _get_contents(self):
- """List of files in the package"""
- if self._contents:
- return self._contents
- else:
- cmd = [ string.Template(arg).substitute(arg, pkg_name=self.name)
- for arg in self._list_contents ]
- list_contents = subprocess.Popen(cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- output = list_contents.communicate()[0]
- if list_contents.returncode:
- raise PkgError
- self.contents = output.split('\n')
- return self.contents
-
-
-class DebianDistro(Distro):
- "Debian (dpkg) based distribution"""
- id = 'Debian'
-
- _pkg_services = { 'apache2-mpm-worker': [ 'apache2' ],
- 'apache2-mpm-prefork': [ 'apache2' ],
- 'apache2.2-bin': [ 'apache2' ],
- 'dovecot-imapd': [ 'dovecot' ],
- 'dovecot-pop3d': [ 'dovecot' ],
- 'exim4-daemon-light': [ 'exim4' ],
- 'exim4-daemon-heavy': [ 'exim4' ],
- 'qemu-system-x86_64': [ 'libvirt-guests' ],
- }
-
- # Per package blacklist
- _pkg_service_blacklist = { 'libvirt-bin': [ 'libvirt-guests' ] }
-
- # Per distro blacklist
- service_blacklist = set(['kvm', 'qemu-kvm', 'qemu-system-x86'])
-
- @classmethod
- def pkg(klass, name):
- return DebianPkg(name)
-
- @classmethod
- def pkg_by_file(klass, path):
- find_file = subprocess.Popen(['dpkg-query', '-S', path],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- output = find_file.communicate()[0]
- if find_file.returncode:
- return None
- pkg = output.split(':')[0]
- return DebianPkg(pkg)
-
- @classmethod
- def restart_service_cmd(klass, name):
- return ['invoke-rc.d', name, 'restart']
-
- @classmethod
- def has_apt(klass):
- return True
-
- @staticmethod
- def read_apt_pipeline():
- whatmaps_enabled = False
-
- version = sys.stdin.readline().rstrip()
- if version != "VERSION 2":
- logging.error("Wrong or missing VERSION from apt pipeline\n"
- "(is Dpkg::Tools::Options::/usr/bin/whatmaps::Version set to 2?)")
- raise PkgError
-
- while 1:
- aptconfig = sys.stdin.readline()
- if not aptconfig or aptconfig == '\n':
- break
- if aptconfig.startswith('Whatmaps::Enable-Restart=') and \
- aptconfig.strip().split('=', 1)[1].lower() in ["true", "1"]:
- logging.debug("Service restarts enabled")
- whatmaps_enabled = True
-
- if not whatmaps_enabled:
- return None
-
- pkgs = {}
- for line in sys.stdin.readlines():
- if not line:
- break
- (pkgname, oldversion, compare, newversion, filename) = line.split()
-
- if filename == '**CONFIGURE**':
- if oldversion != '-': # Updates only
- pkgs[pkgname] = DebianPkg(pkgname)
- pkgs[pkgname].version = newversion
- return pkgs
-
-
- @classmethod
- def _security_update_origins(klass):
- "Determine security update origins from apt configuration"
- codename = lsb_release.get_distro_information()['CODENAME']
- def _subst(line):
- mapping = {'distro_codename' : codename,
- 'distro_id' : klass.id, }
- return string.Template(line).substitute(mapping)
-
- origins = []
- for s in apt_pkg.config.value_list('Whatmaps::Security-Update-Origins'):
- (distro_id, distro_codename) = s.split()
- origins.append((_subst(distro_id),
- _subst(distro_codename)))
- logging.debug("Security Update Origins: %s", origins)
- return origins
-
-
- @classmethod
- def filter_security_updates(klass, pkgs):
- """Filter on security updates"""
-
- apt_pkg.init()
- acquire = apt_pkg.Acquire()
- cache = apt_pkg.Cache()
-
- security_update_origins = klass._security_update_origins()
- security_updates = {}
-
- for pkg in pkgs.values():
- cache_pkg = cache[pkg.name]
- for cache_version in cache_pkg.version_list:
- if pkg.version == cache_version.ver_str:
- for pfile, _ in cache_version.file_list:
- for origin in security_update_origins:
- if pfile.origin == origin[0] and \
- pfile.archive == origin[1]:
- security_updates[pkg] = pkg
- break
- return security_updates
-
-
-class DebianPkg(Pkg):
- type = 'Debian'
- _init_script_re = re.compile('/etc/init.d/[\w\-\.]')
- _list_contents = ['dpkg-query', '-L', '${pkg_name}' ]
-
- def __init__(self, name):
- Pkg.__init__(self, name)
-
- @property
- def shared_objects(self):
- if self._shared_objects != None:
- return self._shared_objects
-
- self._shared_objects = []
- contents = self._get_contents()
-
- for line in contents:
- m = self._so_regex.match(line)
- if m:
- self._shared_objects.append(m.group('so'))
- return self._shared_objects
-
- @property
- def services(self):
- if self._services != None:
- return self._services
-
- self._services = []
- contents = self._get_contents()
- # Only supports sysvinit so far:
- for line in contents:
- if self._init_script_re.match(line):
- self._services.append(os.path.basename(line.strip()))
- return self._services
-
-
-class RedHatDistro(Distro):
- "RPM based distribution"""
- _pkg_re = re.compile(r'(?P<pkg>[\w\-\+]+)-(?P<ver>[\w\.]+)'
- '-(?P<rel>[\w\.]+)\.(?P<arch>.+)')
-
- @classmethod
- def pkg(klass, name):
- return RpmPkg(name)
-
- @classmethod
- def pkg_by_file(klass, path):
- find_file = subprocess.Popen(['rpm', '-qf', path],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- output = find_file.communicate()[0]
- if find_file.returncode:
- return None
- m = klass._pkg_re.match(output.strip())
- if m:
- pkg = m.group('pkg')
- else:
- pkg = output.strip()
- return RpmPkg(pkg)
-
- @classmethod
- def restart_service_cmd(klass, name):
- return ['service', name, 'restart']
-
-
-class FedoraDistro(RedHatDistro):
- id = 'Fedora'
-
-
-class RpmPkg(Pkg):
- type = 'RPM'
- _init_script_re = re.compile('/etc/rc.d/init.d/[\w\-\.]')
- _list_contents = [ 'rpm', '-ql', '$pkg_name' ]
-
- def __init__(self, name):
- Pkg.__init__(self, name)
-
- @property
- def shared_objects(self):
- if self._shared_objects != None:
- return self._shared_objects
-
- self._shared_objects = []
- contents = self._get_contents()
-
- for line in contents:
- m = self._so_regex.match(line)
- if m:
- self._shared_objects.append(m.group('so'))
- return self._shared_objects
-
- @property
- def services(self):
- if self._services != None:
- return self._services
-
- self._services = []
- contents = self._get_contents()
- # Only supports sysvinit so far:
- for line in contents:
- if self._init_script_re.match(line):
- self._services.append(os.path.basename(line.strip()))
- return self._services
+from . process import Process
+from . debiandistro import DebianDistro
+from . redhatdistro import FedoraDistro
+from . pkg import Pkg, PkgError
+from . debianpkg import DebianPkg
+from . rpmpkg import RpmPkg
def check_maps(procs, shared_objects):
@@ -374,7 +45,7 @@ def check_maps(procs, shared_objects):
for proc in procs:
for so in shared_objects:
if proc.maps(so):
- if restart_procs.has_key(proc.exe):
+ if proc.exe in restart_procs:
restart_procs[proc.exe] += [ proc ]
else:
restart_procs[proc.exe] = [ proc ]
@@ -427,13 +98,13 @@ def detect_distro():
def write_cmd_file(services, cmd_file, distro):
"Write out commands needed to restart the services to a file"
- out = file(cmd_file, 'w')
- print >>out, '#! /bin/sh'
+ out = open(cmd_file, 'w')
+ print('#! /bin/sh', file=out)
for service in services:
logging.debug("Need to restart %s", service)
- print >>out, " ".join(distro.restart_service_cmd(service))
+ print(" ".join(distro.restart_service_cmd(service)), file=out)
out.close()
- os.chmod(cmd_file, 0755)
+ os.chmod(cmd_file, 0o755)
def main(argv):
@@ -494,13 +165,14 @@ def main(argv):
logging.error("Cannot parse contents of %s" % pkg.name)
return 1
logging.debug("Found shared objects:")
- map(lambda x: logging.debug(" %s", x), shared_objects)
+ for so in shared_objects:
+ logging.debug(" %s", so)
# Find processes that map them
restart_procs = check_maps(get_all_pids(), shared_objects)
logging.debug("Processes that map them:")
- map(lambda (x, y): logging.debug(" Exe: %s Pids: %s", x, y),
- restart_procs.items())
+ for exe, pids in restart_procs.items():
+ logging.debug(" Exe: %s Pids: %s", exe, pids),
# Find packages that contain the binaries of these processes
pkgs = {}
@@ -509,15 +181,15 @@ def main(argv):
if not pkg:
logging.warning("No package found for '%s' - restart manually" % proc)
else:
- if pkgs.has_key(pkg.name):
+ if pkg.name in pkgs:
pkgs[pkg.name].procs.append(proc)
else:
pkg.procs = [ proc ]
pkgs[pkg.name] = pkg
logging.info("Packages that ship the affected binaries:")
- map(lambda x: logging.info(" Pkg: %s, binaries: %s" % (x.name, x.procs)),
- pkgs.values())
+ for pkg in pkgs.values():
+ logging.info(" Pkg: %s, binaries: %s" % (pkg.name, pkg.procs))
all_services = set()
try:
@@ -548,9 +220,9 @@ def main(argv):
logging.info("Restarting %s" % service)
distro.restart_service(service)
elif all_services:
- print "Services that possibly need to be restarted:"
+ print("Services that possibly need to be restarted:")
for s in all_services:
- print s
+ print(s)
return 0
diff --git a/whatmaps/debiandistro.py b/whatmaps/debiandistro.py
new file mode 100644
index 0000000..54c706a
--- /dev/null
+++ b/whatmaps/debiandistro.py
@@ -0,0 +1,164 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2010,2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public Licnese for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+try:
+ import apt_pkg
+except ImportError:
+ apt_pkg = None
+
+try:
+ import lsb_release
+except ImportError:
+ lsb_release = None
+
+import logging
+import subprocess
+import sys
+
+from . distro import Distro
+from . debianpkg import DebianPkg
+from . pkg import PkgError
+
+class DebianDistro(Distro):
+ "Debian (dpkg) based distribution"""
+ id = 'Debian'
+
+ _pkg_services = { 'apache2-mpm-worker': [ 'apache2' ],
+ 'apache2-mpm-prefork': [ 'apache2' ],
+ 'apache2.2-bin': [ 'apache2' ],
+ 'dovecot-imapd': [ 'dovecot' ],
+ 'dovecot-pop3d': [ 'dovecot' ],
+ 'exim4-daemon-light': [ 'exim4' ],
+ 'exim4-daemon-heavy': [ 'exim4' ],
+ 'qemu-system-x86_64': [ 'libvirt-guests' ],
+ 'openjdk-6-jre-headless': ['jenkins', 'tomcat7'],
+ 'openjdk-7-jre-headless': ['jenkins', 'tomcat7'],
+ }
+
+ # Per package blacklist
+ _pkg_service_blacklist = { 'libvirt-bin': [ 'libvirt-guests' ] }
+
+ # Per distro blacklist
+ service_blacklist = set(['kvm', 'qemu-kvm', 'qemu-system-x86'])
+
+ @classmethod
+ def pkg(klass, name):
+ return DebianPkg(name)
+
+ @classmethod
+ def pkg_by_file(klass, path):
+ find_file = subprocess.Popen(['dpkg-query', '-S', path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ output = find_file.communicate()[0]
+ if find_file.returncode:
+ return None
+ pkg = output.split(':')[0]
+ return DebianPkg(pkg)
+
+ @classmethod
+ def restart_service_cmd(klass, name):
+ return ['invoke-rc.d', name, 'restart']
+
+ @classmethod
+ def has_apt(klass):
+ return True
+
+ @staticmethod
+ def read_apt_pipeline():
+ whatmaps_enabled = False
+
+ version = sys.stdin.readline().rstrip()
+ if version != "VERSION 2":
+ err = "Wrong or missing VERSION from apt pipeline"
+ logging.error("%s\n"
+ "(is Dpkg::Tools::Options::/usr/bin/whatmaps::Version set to 2?)"
+ % err)
+ raise PkgError(err)
+
+ while 1:
+ aptconfig = sys.stdin.readline()
+ if not aptconfig or aptconfig == '\n':
+ break
+ if aptconfig.startswith('Whatmaps::Enable-Restart=') and \
+ aptconfig.strip().split('=', 1)[1].lower() in ["true", "1"]:
+ logging.debug("Service restarts enabled")
+ whatmaps_enabled = True
+
+ if not whatmaps_enabled:
+ return None
+
+ pkgs = {}
+ for line in sys.stdin.readlines():
+ if not line:
+ break
+ (pkgname, oldversion, compare, newversion, filename) = line.split()
+
+ if filename == '**CONFIGURE**':
+ if oldversion != '-': # Updates only
+ pkgs[pkgname] = DebianPkg(pkgname)
+ pkgs[pkgname].version = newversion
+ return pkgs
+
+
+ @classmethod
+ def _security_update_origins(klass):
+ "Determine security update origins from apt configuration"
+
+ if lsb_release is None:
+ raise PkgError("lsb_release not found, can't determine security updates")
+
+ codename = lsb_release.get_distro_information()['CODENAME']
+ def _subst(line):
+ mapping = {'distro_codename' : codename,
+ 'distro_id' : klass.id, }
+ return string.Template(line).substitute(mapping)
+
+ origins = []
+ for s in apt_pkg.config.value_list('Whatmaps::Security-Update-Origins'):
+ (distro_id, distro_codename) = s.split()
+ origins.append((_subst(distro_id),
+ _subst(distro_codename)))
+ logging.debug("Security Update Origins: %s", origins)
+ return origins
+
+
+ @classmethod
+ def filter_security_updates(klass, pkgs):
+ """Filter on security updates"""
+
+ if apt_pkg is None:
+ raise PkgError("apt_pkg not installed, can't determine security updates")
+
+ apt_pkg.init()
+ acquire = apt_pkg.Acquire()
+ cache = apt_pkg.Cache()
+
+ security_update_origins = klass._security_update_origins()
+ security_updates = {}
+
+ for pkg in pkgs.values():
+ cache_pkg = cache[pkg.name]
+ for cache_version in cache_pkg.version_list:
+ if pkg.version == cache_version.ver_str:
+ for pfile, _ in cache_version.file_list:
+ for origin in security_update_origins:
+ if pfile.origin == origin[0] and \
+ pfile.archive == origin[1]:
+ security_updates[pkg] = pkg
+ break
+ return security_updates
diff --git a/whatmaps/debianpkg.py b/whatmaps/debianpkg.py
new file mode 100644
index 0000000..8a3349d
--- /dev/null
+++ b/whatmaps/debianpkg.py
@@ -0,0 +1,42 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2010,2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import re
+
+from . pkg import Pkg
+
+class DebianPkg(Pkg):
+ type = 'Debian'
+ _init_script_re = re.compile('/etc/init.d/[\w\-\.]')
+ _list_contents = ['dpkg-query', '-L', '${pkg_name}' ]
+
+ def __init__(self, name):
+ Pkg.__init__(self, name)
+
+ @property
+ def services(self):
+ if self._services != None:
+ return self._services
+
+ self._services = []
+ contents = self._get_contents()
+ # Only supports sysvinit so far:
+ for line in contents:
+ if self._init_script_re.match(line):
+ self._services.append(os.path.basename(line.strip()))
+ return self._services
+
diff --git a/whatmaps/distro.py b/whatmaps/distro.py
new file mode 100644
index 0000000..e4f417f
--- /dev/null
+++ b/whatmaps/distro.py
@@ -0,0 +1,70 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2010,2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import subprocess
+
+class Distro(object):
+ """
+ A distribution
+ @cvar id: distro id as returned by lsb-release
+ """
+ id = None
+ service_blacklist = set()
+ _pkg_services = {}
+ _pkg_blacklist = {}
+ _pkg_service_blacklist = {}
+
+ @classmethod
+ def pkg(klass, name):
+ """Return package object named name"""
+ raise NotImplementedError
+
+ @classmethod
+ def pkg_by_file(klass, path):
+ """Return package object that contains path"""
+ raise NotImplementedError
+
+ @classmethod
+ def restart_service_cmd(klass, service):
+ """Command to restart service"""
+ raise NotImplementedError
+
+ @classmethod
+ def restart_service(klass, service):
+ """Restart a service"""
+ subprocess.call(klass.restart_service_cmd(service))
+
+ @classmethod
+ def pkg_services(klass, pkg):
+ """
+ List of services that package pkg needs restarted that aren't part
+ of pkg itself
+ """
+ return klass._pkg_services.get(pkg.name, [])
+
+ @classmethod
+ def pkg_service_blacklist(klass, pkg):
+ """
+ List of services in pkg that we don't want to be restarted even when
+ a binary from this package maps a shared lib that changed.
+ """
+ return klass._pkg_service_blacklist.get(pkg.name, [])
+
+ @classmethod
+ def has_apt(klass):
+ """Does the distribution use apt"""
+ return False
diff --git a/whatmaps/pkg.py b/whatmaps/pkg.py
new file mode 100644
index 0000000..ab9088d
--- /dev/null
+++ b/whatmaps/pkg.py
@@ -0,0 +1,81 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2010,2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import re
+import string
+import subprocess
+
+class PkgError(Exception):
+ pass
+
+
+class Pkg(object):
+ """
+ A package in a distribution
+ @var services: list of services provided by package
+ @var shared_objects: list of shared objects shipped in this package
+ @cvar type: package type (e.g. RPM or Debian)
+ @cvar _so_regex: regex that matches shared objects in the list returned by
+ _get_contents
+ @cvar _list_contents: command to list contents of a package, will be passed
+ to subprocess. "$pkg_name" will be replaced by the package
+ name.
+ """
+
+ type = None
+ services = None
+ _so_regex = re.compile(r'(?P<so>/.*\.so(\.[^/]*)?$)')
+ _list_contents = None
+
+ def __init__(self, name):
+ self.name = name
+ self._services = None
+ self._shared_objects = None
+ self._contents = None
+
+ def __repr__(self):
+ return "<%s Pkg object name:'%s'>" % (self.type, self.name)
+
+ def _get_contents(self):
+ """List of files in the package"""
+ if self._contents:
+ return self._contents
+ else:
+ cmd = [ string.Template(arg).substitute(arg, pkg_name=self.name)
+ for arg in self._list_contents ]
+ list_contents = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ output = list_contents.communicate()[0]
+ if list_contents.returncode:
+ raise PkgError("Failed to list package contents for '%s'" % self.name)
+ self._contents = output.split('\n')
+ return self._contents
+
+ @property
+ def shared_objects(self):
+ if self._shared_objects is not None:
+ return self._shared_objects
+
+ self._shared_objects = []
+ contents = self._get_contents()
+
+ for line in contents:
+ m = self._so_regex.match(line)
+ if m:
+ self._shared_objects.append(m.group('so'))
+ return self._shared_objects
diff --git a/whatmaps/process.py b/whatmaps/process.py
index 3fe23f3..f3d5dcd 100644
--- a/whatmaps/process.py
+++ b/whatmaps/process.py
@@ -53,7 +53,7 @@ class Process(object):
def _read_maps(self):
"""Read the SOs from /proc/<pid>/maps"""
try:
- f = file(self._procpath('%d/maps' % self.pid))
+ f = open(self._procpath('%d/maps' % self.pid))
except IOError as e:
# ignore killed process
if e.errno != errno.ENOENT:
diff --git a/whatmaps/redhatdistro.py b/whatmaps/redhatdistro.py
new file mode 100644
index 0000000..4d9f5cf
--- /dev/null
+++ b/whatmaps/redhatdistro.py
@@ -0,0 +1,57 @@
+#!/usr/bin/python -u
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2010,2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import re
+import subprocess
+
+from . distro import Distro
+from . pkg import PkgError
+from . rpmpkg import RpmPkg
+
+class RedHatDistro(Distro):
+ "RPM based distribution"""
+ _pkg_re = re.compile(r'(?P<pkg>[\w\-\+]+)-(?P<ver>[\w\.]+)'
+ '-(?P<rel>[\w\.]+)\.(?P<arch>.+)')
+
+ @classmethod
+ def pkg(klass, name):
+ return RpmPkg(name)
+
+ @classmethod
+ def pkg_by_file(klass, path):
+ find_file = subprocess.Popen(['rpm', '-qf', path],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ output = find_file.communicate()[0]
+ if find_file.returncode:
+ return None
+ m = klass._pkg_re.match(output.strip())
+ if m:
+ pkg = m.group('pkg')
+ else:
+ pkg = output.strip()
+ return RpmPkg(pkg)
+
+ @classmethod
+ def restart_service_cmd(klass, name):
+ return ['service', name, 'restart']
+
+
+class FedoraDistro(RedHatDistro):
+ id = 'Fedora'
+
diff --git a/whatmaps/rpmpkg.py b/whatmaps/rpmpkg.py
new file mode 100644
index 0000000..595edd2
--- /dev/null
+++ b/whatmaps/rpmpkg.py
@@ -0,0 +1,41 @@
+# vim: set fileencoding=utf-8 :
+#
+# (C) 2010,2014 Guido Günther <agx@sigxcpu.org>
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import re
+
+from . pkg import Pkg
+
+class RpmPkg(Pkg):
+ type = 'RPM'
+ _init_script_re = re.compile('/etc/rc.d/init.d/[\w\-\.]')
+ _list_contents = [ 'rpm', '-ql', '$pkg_name' ]
+
+ def __init__(self, name):
+ Pkg.__init__(self, name)
+
+ @property
+ def services(self):
+ if self._services != None:
+ return self._services
+
+ self._services = []
+ contents = self._get_contents()
+ # Only supports sysvinit so far:
+ for line in contents:
+ if self._init_script_re.match(line):
+ self._services.append(os.path.basename(line.strip()))
+ return self._services