1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
# vim: set fileencoding=utf-8 :
from .. import context
import os
import shutil
import subprocess
import tarfile
import tempfile
import zipfile
import gbp.log
import gbp.errors
from gbp.deb.changelog import ChangeLog
from . gbplogtester import GbpLogTester
from . debiangittestrepo import DebianGitTestRepo
from . capture import capture_stderr
__all__ = ['GbpLogTester', 'DebianGitTestRepo', 'OsReleaseFile',
'MockedChangeLog', 'get_dch_default_urgency', 'capture_stderr',
'ls_dir', 'ls_tar', 'ls_zip']
class OsReleaseFile(object):
"""Repesents a simple file with key-value pairs"""
def __init__(self, filename):
self._values = {}
try:
with open(filename, 'r') as filed:
for line in filed.readlines():
try:
key, value = line.split('=', 1)
except ValueError:
pass
else:
self._values[key] = value.strip()
except IOError as err:
gbp.log.info('Failed to read OS release file %s: %s' %
(filename, err))
def __getitem__(self, key):
if key in self._values:
return self._values[key]
return None
def __contains__(self, key):
return key in self._values
def __str__(self):
return str(self._values)
def __repr__(self):
return repr(self._values)
class MockedChangeLog(ChangeLog):
contents = """foo (%s) experimental; urgency=low
%s
-- Debian Maintainer <maint@debian.org> Sat, 01 Jan 2012 00:00:00 +0100"""
def __init__(self, version, changes = "a important change"):
ChangeLog.__init__(self,
contents=self.contents % (version, changes))
def get_dch_default_urgency():
"""Determine the default urgency level used by dch"""
urgency = 'medium'
tempdir = tempfile.mkdtemp()
tmp_dch_name = os.path.join(tempdir, 'changelog')
try:
dch_cmd = ['debchange', '--create', '--empty', '--changelog', tmp_dch_name,
'--package=foo', '--newversion=1',
'--distribution=UNRELEASED']
ret = subprocess.Popen(dch_cmd).wait()
except OSError:
pass
else:
if ret == 0:
with open(tmp_dch_name, encoding='utf-8') as dchfile:
header = dchfile.readline().strip()
urgency = header.split()[-1].replace('urgency=', '')
finally:
if os.path.isdir(tempdir):
shutil.rmtree(tempdir)
return urgency
def ls_dir(directory, directories=True):
"""List the contents of directory, recurse to subdirectories"""
contents = set()
for root, dirs, files in os.walk(directory):
prefix = ''
if root != directory:
prefix = os.path.relpath(root, directory) + '/'
contents.update(['%s%s' % (prefix, fname) for fname in files])
if directories:
contents.update(['%s%s' % (prefix, dname) for dname in dirs])
return contents
def ls_tar(tarball, directories=True):
"""List the contents of tar archive"""
tmpdir = tempfile.mkdtemp()
try:
tarobj = tarfile.open(tarball, 'r')
tarobj.extractall(tmpdir)
return ls_dir(tmpdir, directories)
finally:
shutil.rmtree(tmpdir)
def ls_zip(archive, directories=True):
"""List the contents of zip file"""
tmpdir = tempfile.mkdtemp()
try:
zipobj = zipfile.ZipFile(archive, 'r')
zipobj.extractall(tmpdir)
return ls_dir(tmpdir, directories)
finally:
shutil.rmtree(tmpdir)
|