1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# vim: set fileencoding=utf-8 :
from .. import context # noqa: F401
import os
import shutil
import subprocess
import tarfile
import tempfile
import unittest
import zipfile
import gbp.log
import gbp.errors
from gbp.deb.changelog import ChangeLog
from . gbplogtester import GbpLogTester
from . debiangittestrepo import DebianGitTestRepo
from . capture import capture_stdout, capture_stderr
from . popen import patch_popen
__all__ = ['GbpLogTester', 'DebianGitTestRepo', 'OsReleaseFile',
'MockedChangeLog', 'get_dch_default_urgency',
'capture_stderr', 'capture_stdout',
'ls_dir', 'ls_tar', 'ls_zip',
'patch_popen', 'have_cmd', 'skip_without_cmd']
class OsReleaseFile(object):
"""Repesents a simple file with key-value pairs"""
def __init__(self, filename="/etc/os-release"):
self._values = {}
try:
with open(filename, 'r') as filed:
for line in filed.readlines():
try:
key, value = line.split('=', 1)
except ValueError:
pass
else:
self._values[key] = value.strip()
except IOError as err:
gbp.log.info('Failed to read OS release file %s: %s' %
(filename, err))
def __getitem__(self, key):
if key in self._values:
return self._values[key]
return None
def __contains__(self, key):
return key in self._values
def __str__(self):
return str(self._values)
def __repr__(self):
return repr(self._values)
class MockedChangeLog(ChangeLog):
contents = """foo (%s) experimental; urgency=low
%s
-- Debian Maintainer <maint@debian.org> Sat, 01 Jan 2012 00:00:00 +0100"""
def __init__(self, version, changes="a important change"):
ChangeLog.__init__(self,
contents=self.contents % (version, changes))
def get_dch_default_urgency():
"""Determine the default urgency level used by dch"""
urgency = 'medium'
tempdir = tempfile.mkdtemp()
tmp_dch_name = os.path.join(tempdir, 'changelog')
try:
dch_cmd = ['debchange', '--create', '--empty', '--changelog', tmp_dch_name,
'--package=foo', '--newversion=1',
'--distribution=UNRELEASED']
ret = subprocess.Popen(dch_cmd).wait()
except OSError:
pass
else:
if ret == 0:
with open(tmp_dch_name, encoding='utf-8') as dchfile:
header = dchfile.readline().strip()
urgency = header.split()[-1].replace('urgency=', '')
finally:
if os.path.isdir(tempdir):
shutil.rmtree(tempdir)
return urgency
def ls_dir(directory, directories=True):
"""List the contents of directory, recurse to subdirectories"""
contents = set()
for root, dirs, files in os.walk(directory):
prefix = ''
if root != directory:
prefix = os.path.relpath(root, directory) + '/'
contents.update(['%s%s' % (prefix, fname) for fname in files])
if directories:
contents.update(['%s%s' % (prefix, dname) for dname in dirs])
return contents
def ls_tar(tarball, directories=True):
"""List the contents of tar archive"""
tmpdir = tempfile.mkdtemp()
try:
tarobj = tarfile.open(tarball, 'r')
tarobj.extractall(tmpdir)
return ls_dir(tmpdir, directories)
finally:
shutil.rmtree(tmpdir)
def ls_zip(archive, directories=True):
"""List the contents of zip file"""
tmpdir = tempfile.mkdtemp()
try:
zipobj = zipfile.ZipFile(archive, 'r')
zipobj.extractall(tmpdir)
return ls_dir(tmpdir, directories)
finally:
shutil.rmtree(tmpdir)
def have_cmd(cmd):
"""Check if a command is available"""
return True if shutil.which(cmd) else False
def skip_without_cmd(cmd):
"""Skip if a command is not available"""
if have_cmd(cmd):
return lambda func: func
else:
return unittest.skip("Command '%s' not found" % cmd)
|