aboutsummaryrefslogtreecommitdiffhomepage
path: root/tests/testutils/__init__.py
blob: 2072e0df2392c44146946f32d3131dc7b4a74d55 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# vim: set fileencoding=utf-8 :

from .. import context  # noqa: F401

import os
import shutil
import subprocess
import tarfile
import tempfile
import unittest
import zipfile

import gbp.log
import gbp.errors
from gbp.deb.changelog import ChangeLog

from . gbplogtester import GbpLogTester
from . debiangittestrepo import DebianGitTestRepo
from . capture import capture_stdout, capture_stderr
from . popen import patch_popen

__all__ = ['GbpLogTester', 'DebianGitTestRepo', 'OsReleaseFile',
           'MockedChangeLog', 'get_dch_default_urgency',
           'capture_stderr', 'capture_stdout',
           'ls_dir', 'ls_tar', 'ls_zip',
           'patch_popen', 'have_cmd', 'skip_without_cmd']


class OsReleaseFile(object):
    """Represents a simple file with key-value pairs"""

    def __init__(self, filename="/etc/os-release"):
        self._values = {}

        try:
            with open(filename, 'r') as filed:
                for line in filed.readlines():
                    try:
                        key, value = line.split('=', 1)
                    except ValueError:
                        pass
                    else:
                        self._values[key] = value.strip()
        except IOError as err:
            gbp.log.info('Failed to read OS release file %s: %s' %
                         (filename, err))

    def __getitem__(self, key):
        if key in self._values:
            return self._values[key]
        return None

    def __contains__(self, key):
        return key in self._values

    def __str__(self):
        return str(self._values)

    def __repr__(self):
        return repr(self._values)


class MockedChangeLog(ChangeLog):
    contents = """foo (%s) experimental; urgency=low

  %s

 -- Debian Maintainer <maint@debian.org>  Sat, 01 Jan 2012 00:00:00 +0100"""

    def __init__(self, version, changes="a important change"):
        ChangeLog.__init__(self,
                           contents=self.contents % (version, changes))


def get_dch_default_urgency():
    """Determine the default urgency level used by dch"""
    urgency = 'medium'
    tempdir = tempfile.mkdtemp()
    tmp_dch_name = os.path.join(tempdir, 'changelog')
    try:
        dch_cmd = ['debchange', '--create', '--empty', '--changelog', tmp_dch_name,
                   '--package=foo', '--newversion=1',
                   '--distribution=UNRELEASED']
        ret = subprocess.Popen(dch_cmd).wait()
    except OSError:
        pass
    else:
        if ret == 0:
            with open(tmp_dch_name, encoding='utf-8') as dchfile:
                header = dchfile.readline().strip()
                urgency = header.split()[-1].replace('urgency=', '')
    finally:
        if os.path.isdir(tempdir):
            shutil.rmtree(tempdir)
    return urgency


def ls_dir(directory, directories=True):
    """List the contents of directory, recurse to subdirectories"""
    contents = set()
    for root, dirs, files in os.walk(directory):
        prefix = ''
        if root != directory:
            prefix = os.path.relpath(root, directory) + '/'
        contents.update(['%s%s' % (prefix, fname) for fname in files])
        if directories:
            contents.update(['%s%s' % (prefix, dname) for dname in dirs])
    return contents


def ls_tar(tarball, directories=True):
    """List the contents of tar archive"""
    tmpdir = tempfile.mkdtemp()
    try:
        tarobj = tarfile.open(tarball, 'r')
        tarobj.extractall(tmpdir)
        return ls_dir(tmpdir, directories)
    finally:
        shutil.rmtree(tmpdir)


def ls_zip(archive, directories=True):
    """List the contents of zip file"""
    tmpdir = tempfile.mkdtemp()
    try:
        zipobj = zipfile.ZipFile(archive, 'r')
        zipobj.extractall(tmpdir)
        return ls_dir(tmpdir, directories)
    finally:
        shutil.rmtree(tmpdir)


def have_cmd(cmd):
    """Check if a command is available"""
    return True if shutil.which(cmd) else False


def skip_without_cmd(cmd):
    """Skip if a command is not available"""
    if have_cmd(cmd):
        return lambda func: func
    else:
        return unittest.skip("Command '%s' not found" % cmd)