Move and rename write_header() to ProfileStorage.get_header()

Also adjust the calling code to use get_header() instead of
write_header().

Finally, move the tests to test-profile-storage.py and do a few
adjustments needed by the change. Part of these adjustments is to hand
over empty params with the correct type instead of just "None".
This commit is contained in:
Christian Boltz 2021-04-11 14:55:24 +02:00
parent 4ef975fb97
commit 6a170ddaa1
Failed to generate hash of commit
4 changed files with 120 additions and 114 deletions

View file

@ -61,7 +61,6 @@ from apparmor.rule.include import IncludeRule
from apparmor.rule.network import NetworkRule
from apparmor.rule.ptrace import PtraceRule
from apparmor.rule.signal import SignalRule
from apparmor.rule import quote_if_needed
# setup module translations
from apparmor.translations import init_translation
@ -654,7 +653,7 @@ def change_profile_flags(prof_filename, program, flag, set_flag):
prof_storage['header_comment'] = matches['comment'] or ''
prof_storage['xattrs'] = matches['xattrs']
line = write_header(prof_storage, len(space)/2, profile, False, True)
line = prof_storage.get_header(len(space)/2, profile, False, True)
line = '%s\n' % line[0]
elif RE_PROFILE_HAT_DEF.search(line):
matches = RE_PROFILE_HAT_DEF.search(line)
@ -2107,35 +2106,6 @@ def parse_unix_rule(line):
# XXX Do real parsing here
return aarules.Raw_Unix_Rule(line)
def write_header(prof_data, depth, name, embedded_hat, write_flags):
pre = ' ' * int(depth * 2)
data = []
unquoted_name = name
name = quote_if_needed(name)
attachment = ''
if prof_data['attachment']:
attachment = ' %s' % quote_if_needed(prof_data['attachment'])
comment = ''
if prof_data['header_comment']:
comment = ' %s' % prof_data['header_comment']
if (not embedded_hat and not unquoted_name.startswith('/')) or (embedded_hat and not unquoted_name.startswith('^')) or prof_data['attachment'] or prof_data['profile_keyword']:
name = 'profile %s%s' % (name, attachment)
xattrs = ''
if prof_data['xattrs']:
xattrs = ' xattrs=(%s)' % prof_data['xattrs']
flags = ''
if write_flags and prof_data['flags']:
flags = ' flags=(%s)' % prof_data['flags']
data.append('%s%s%s%s {%s' % (pre, name, xattrs, flags, comment))
return data
def write_piece(profile_data, depth, name, nhat, write_flags):
pre = ' ' * depth
data = []
@ -2147,7 +2117,7 @@ def write_piece(profile_data, depth, name, nhat, write_flags):
wname = name + '//' + nhat
name = nhat
inhat = True
data += write_header(profile_data[name], depth, wname, False, write_flags)
data += profile_data[name].get_header(depth, wname, False, write_flags)
data += profile_data[name].get_rules_clean(depth + 1)
pre2 = ' ' * (depth + 1)
@ -2158,9 +2128,9 @@ def write_piece(profile_data, depth, name, nhat, write_flags):
if not profile_data[hat]['external']:
data.append('')
if profile_data[hat]['profile']:
data += write_header(profile_data[hat], depth + 1, hat, True, write_flags)
data += profile_data[hat].get_header(depth + 1, hat, True, write_flags)
else:
data += write_header(profile_data[hat], depth + 1, '^' + hat, True, write_flags)
data += profile_data[hat].get_header(depth + 1, '^' + hat, True, write_flags)
data += profile_data[hat].get_rules_clean(depth + 2)

View file

@ -130,6 +130,35 @@ class ProfileStorage:
else:
raise AppArmorBug('attempt to read unknown key %s' % key)
def get_header(self, depth, name, embedded_hat, write_flags):
pre = ' ' * int(depth * 2)
data = []
unquoted_name = name
name = quote_if_needed(name)
attachment = ''
if self.data['attachment']:
attachment = ' %s' % quote_if_needed(self.data['attachment'])
comment = ''
if self.data['header_comment']:
comment = ' %s' % self.data['header_comment']
if (not embedded_hat and not unquoted_name.startswith('/')) or (embedded_hat and not unquoted_name.startswith('^')) or self.data['attachment'] or self.data['profile_keyword']:
name = 'profile %s%s' % (name, attachment)
xattrs = ''
if self.data['xattrs']:
xattrs = ' xattrs=(%s)' % self.data['xattrs']
flags = ''
if write_flags and self.data['flags']:
flags = ' flags=(%s)' % self.data['flags']
data.append('%s%s%s%s {%s' % (pre, name, xattrs, flags, comment))
return data
def get_rules_clean(self, depth):
'''return all clean rules of a profile (with default formatting, and leading whitespace as specified in the depth parameter)

View file

@ -20,7 +20,7 @@ import sys
import apparmor.aa # needed to set global vars in some tests
from apparmor.aa import (check_for_apparmor, get_output, get_reqs, get_interpreter_and_abstraction, create_new_profile,
get_profile_flags, change_profile_flags, set_options_audit_mode, set_options_owner_mode, is_skippable_file,
parse_profile_start, parse_profile_start_to_storage, parse_profile_data, write_header,
parse_profile_start, parse_profile_start_to_storage, parse_profile_data,
get_file_perms, propose_file_rules)
from apparmor.aare import AARE
from apparmor.common import AppArmorException, AppArmorBug
@ -581,85 +581,6 @@ class AaTest_parse_profile_data(AATest):
d = '/foo flags=(complain) xattrs=(user.bar=bar) {\n}\n'
parse_profile_data(d.split(), 'somefile', False, False)
class AaTest_write_header(AATest):
tests = [
# name embedded_hat write_flags depth flags attachment prof.keyw. comment expected
(['/foo', False, True, 1, 'complain', None, None, None ], ' /foo flags=(complain) {'),
(['/foo', True, True, 1, 'complain', None, None, None ], ' profile /foo flags=(complain) {'),
(['/foo sp', False, False, 2, 'complain', None, None, None ], ' "/foo sp" {'),
(['/foo' ,False, False, 2, 'complain', None, None, None ], ' /foo {'),
(['/foo', True, False, 2, 'complain', None, None, None ], ' profile /foo {'),
(['/foo', False, True, 0, None, None, None, None ], '/foo {'),
(['/foo', True, True, 0, None, None, None, None ], 'profile /foo {'),
(['/foo', False, False, 0, None, None, None, None ], '/foo {'),
(['/foo', True, False, 0, None, None, None, None ], 'profile /foo {'),
(['bar', False, True, 1, 'complain', None, None, None ], ' profile bar flags=(complain) {'),
(['bar', False, True, 1, 'complain', '/foo', None, None ], ' profile bar /foo flags=(complain) {'),
(['bar', True, True, 1, 'complain', '/foo', None, None ], ' profile bar /foo flags=(complain) {'),
(['bar baz', False, True, 1, None, '/foo', None, None ], ' profile "bar baz" /foo {'),
(['bar', True, True, 1, None, '/foo', None, None ], ' profile bar /foo {'),
(['bar baz', False, True, 1, 'complain', '/foo sp', None, None ], ' profile "bar baz" "/foo sp" flags=(complain) {'),
(['^foo', False, True, 1, 'complain', None, None, None ], ' profile ^foo flags=(complain) {'),
(['^foo', True, True, 1, 'complain', None, None, None ], ' ^foo flags=(complain) {'),
(['^foo', True, True, 1.5, 'complain', None, None, None ], ' ^foo flags=(complain) {'),
(['^foo', True, True, 1.3, 'complain', None, None, None ], ' ^foo flags=(complain) {'),
(['/foo', False, True, 1, 'complain', None, 'profile', None ], ' profile /foo flags=(complain) {'),
(['/foo', True, True, 1, 'complain', None, 'profile', None ], ' profile /foo flags=(complain) {'),
(['/foo', False, True, 1, 'complain', None, None, '# x' ], ' /foo flags=(complain) { # x'),
(['/foo', True, True, 1, None, None, None, '# x' ], ' profile /foo { # x'),
(['/foo', False, True, 1, None, None, 'profile', '# x' ], ' profile /foo { # x'),
(['/foo', True, True, 1, 'complain', None, 'profile', '# x' ], ' profile /foo flags=(complain) { # x'),
]
def _run_test(self, params, expected):
name = params[0]
embedded_hat = params[1]
write_flags = params[2]
depth = params[3]
prof_data = { 'flags': params[4], 'attachment': params[5], 'profile_keyword': params[6], 'header_comment': params[7], 'xattrs': '' }
result = write_header(prof_data, depth, name, embedded_hat, write_flags)
self.assertEqual(result, [expected])
class AaTest_write_header_01(AATest):
tests = [
(
{'name': '/foo', 'write_flags': True, 'depth': 1, 'flags': 'complain'},
' /foo flags=(complain) {',
),
(
{'name': '/foo', 'write_flags': True, 'depth': 1, 'flags': 'complain', 'profile_keyword': 'profile'},
' profile /foo flags=(complain) {',
),
(
{'name': '/foo', 'write_flags': True, 'flags': 'complain'},
'/foo flags=(complain) {',
),
(
{'name': '/foo', 'xattrs': 'user.foo=bar', 'write_flags': True, 'flags': 'complain'},
'/foo xattrs=(user.foo=bar) flags=(complain) {',
),
(
{'name': '/foo', 'xattrs': 'user.foo=bar', 'embedded_hat': True},
'profile /foo xattrs=(user.foo=bar) {',
),
]
def _run_test(self, params, expected):
name = params['name']
embedded_hat = params.get('embedded_hat', False)
write_flags = params.get('write_flags', False)
depth = params.get('depth', 0)
prof_data = {
'xattrs': params.get('xattrs', None),
'flags': params.get('flags', None),
'attachment': params.get('attachment', None),
'profile_keyword': params.get('profile_keyword', None),
'header_comment': params.get('header_comment', None),
}
result = write_header(prof_data, depth, name, embedded_hat, write_flags)
self.assertEqual(result, [expected])
class AaTest_get_file_perms_1(AATest):
tests = [
('/usr/share/common-licenses/foo/bar', {'allow': {'all': set(), 'owner': {'w'} }, 'deny': {'all':set(), 'owner': set()}, 'paths': {'/usr/share/common-licenses/**'} }),

View file

@ -35,6 +35,92 @@ class TestUnknownKey(AATest):
with self.assertRaises(AppArmorBug):
self.storage['foo'] = 'bar'
class AaTest_get_header(AATest):
tests = [
# name embedded_hat write_flags depth flags attachment prof.keyw. comment expected
(['/foo', False, True, 1, 'complain', '', False, '' ], ' /foo flags=(complain) {'),
(['/foo', True, True, 1, 'complain', '', False, '' ], ' profile /foo flags=(complain) {'),
(['/foo sp', False, False, 2, 'complain', '', False, '' ], ' "/foo sp" {'),
(['/foo' ,False, False, 2, 'complain', '', False, '' ], ' /foo {'),
(['/foo', True, False, 2, 'complain', '', False, '' ], ' profile /foo {'),
(['/foo', False, True, 0, None, '', False, '' ], '/foo {'),
(['/foo', True, True, 0, None, '', False, '' ], 'profile /foo {'),
(['/foo', False, False, 0, None, '', False, '' ], '/foo {'),
(['/foo', True, False, 0, None, '', False, '' ], 'profile /foo {'),
(['bar', False, True, 1, 'complain', '', False, '' ], ' profile bar flags=(complain) {'),
(['bar', False, True, 1, 'complain', '/foo', False, '' ], ' profile bar /foo flags=(complain) {'),
(['bar', True, True, 1, 'complain', '/foo', False, '' ], ' profile bar /foo flags=(complain) {'),
(['bar baz', False, True, 1, None, '/foo', False, '' ], ' profile "bar baz" /foo {'),
(['bar', True, True, 1, None, '/foo', False, '' ], ' profile bar /foo {'),
(['bar baz', False, True, 1, 'complain', '/foo sp', False, '' ], ' profile "bar baz" "/foo sp" flags=(complain) {'),
(['^foo', False, True, 1, 'complain', '', False, '' ], ' profile ^foo flags=(complain) {'),
(['^foo', True, True, 1, 'complain', '', False, '' ], ' ^foo flags=(complain) {'),
(['^foo', True, True, 1.5, 'complain', '', False, '' ], ' ^foo flags=(complain) {'),
(['^foo', True, True, 1.3, 'complain', '', False, '' ], ' ^foo flags=(complain) {'),
(['/foo', False, True, 1, 'complain', '', True, '' ], ' profile /foo flags=(complain) {'),
(['/foo', True, True, 1, 'complain', '', True, '' ], ' profile /foo flags=(complain) {'),
(['/foo', False, True, 1, 'complain', '', False, '# x' ], ' /foo flags=(complain) { # x'),
(['/foo', True, True, 1, None, '', False, '# x' ], ' profile /foo { # x'),
(['/foo', False, True, 1, None, '', True, '# x' ], ' profile /foo { # x'),
(['/foo', True, True, 1, 'complain', '', True, '# x' ], ' profile /foo flags=(complain) { # x'),
]
def _run_test(self, params, expected):
name = params[0]
embedded_hat = params[1]
write_flags = params[2]
depth = params[3]
prof_storage = ProfileStorage(name, '', 'test')
prof_storage['flags'] = params[4]
prof_storage['attachment'] = params[5]
prof_storage['profile_keyword'] = params[6]
prof_storage['header_comment'] = params[7]
prof_storage['xattrs'] = ''
result = prof_storage.get_header(depth, name, embedded_hat, write_flags)
self.assertEqual(result, [expected])
class AaTest_get_header_01(AATest):
tests = [
(
{'name': '/foo', 'write_flags': True, 'depth': 1, 'flags': 'complain'},
' /foo flags=(complain) {',
),
(
{'name': '/foo', 'write_flags': True, 'depth': 1, 'flags': 'complain', 'profile_keyword': True},
' profile /foo flags=(complain) {',
),
(
{'name': '/foo', 'write_flags': True, 'flags': 'complain'},
'/foo flags=(complain) {',
),
(
{'name': '/foo', 'xattrs': 'user.foo=bar', 'write_flags': True, 'flags': 'complain'},
'/foo xattrs=(user.foo=bar) flags=(complain) {',
),
(
{'name': '/foo', 'xattrs': 'user.foo=bar', 'embedded_hat': True},
'profile /foo xattrs=(user.foo=bar) {',
),
]
def _run_test(self, params, expected):
name = params['name']
embedded_hat = params.get('embedded_hat', False)
write_flags = params.get('write_flags', False)
depth = params.get('depth', 0)
prof_storage = ProfileStorage(name, '', 'test')
for param in ['flags', 'attachment', 'profile_keyword', 'header_comment', 'xattrs']:
if params.get(param) is not None:
prof_storage[param] = params[param]
result = prof_storage.get_header(depth, name, embedded_hat, write_flags)
self.assertEqual(result, [expected])
class AaTest_add_or_remove_flag(AATest):
tests = [
# existing flag(s) flag to change add or remove? expected flags