Merge Switch handling of include rules from filelist to IncludeRule

This patch series moves include rule handling away from the `filelist` hasher to using IncludeRule and IncludeRuleset. This means that only variable handling is left in `filelist`.

As usual, check the individual commits for details.

PR: https://gitlab.com/apparmor/apparmor/-/merge_requests/537
Acked-by: John Johansen <john.johansen@canonical.com>
This commit is contained in:
John Johansen 2020-05-21 08:52:49 +00:00
commit 4b79a58c25
12 changed files with 278 additions and 222 deletions

View file

@ -23,7 +23,6 @@ import apparmor.cleanprofile as cleanprofile
import apparmor.ui as aaui
from apparmor.common import AppArmorException
from apparmor.regex import re_match_include
# setup exception handling
@ -130,38 +129,6 @@ class Merge(object):
log_dict = {'merge': other.aa}
apparmor.aa.loadincludes()
done = False
#Add the file-wide includes from the other profile to the user profile
options = []
for inc in other.filelist[other.filename]['include'].keys():
if not inc in self.user.filelist[self.user.filename]['include'].keys():
if inc.startswith('/'):
options.append('#include "%s"' %inc)
else:
options.append('#include <%s>' %inc)
default_option = 1
q = aaui.PromptQuestion()
q.options = options
q.selected = default_option - 1
q.headers = [_('File includes'), _('Select the ones you wish to add')]
q.functions = ['CMD_ALLOW', 'CMD_IGNORE_ENTRY', 'CMD_ABORT', 'CMD_FINISHED']
q.default = 'CMD_ALLOW'
while not done and options:
ans, selected = q.promptUser()
if ans == 'CMD_IGNORE_ENTRY':
done = True
elif ans == 'CMD_ALLOW':
selection = options[selected]
inc = re_match_include(selection)
self.user.filelist[self.user.filename]['include'][inc] = True
options.pop(selected)
aaui.UI_Info(_('Adding %s to the file.') % selection)
elif ans == 'CMD_FINISHED':
return
if not apparmor.aa.sev_db:
apparmor.aa.sev_db = apparmor.severity.Severity(apparmor.aa.CONFDIR + '/severity.db', _('unknown'))

View file

@ -32,7 +32,7 @@ from copy import deepcopy
from apparmor.aare import AARE
from apparmor.common import (AppArmorException, AppArmorBug, open_file_read, valid_path, hasher,
from apparmor.common import (AppArmorException, AppArmorBug, is_skippable_file, open_file_read, valid_path, hasher,
split_name, open_file_write, DebugLogger)
import apparmor.ui as aaui
@ -50,7 +50,7 @@ from apparmor.regex import (RE_PROFILE_START, RE_PROFILE_END,
from apparmor.profile_list import ProfileList
from apparmor.profile_storage import (ProfileStorage, add_or_remove_flag, ruletypes,
write_includes, write_list_vars )
write_list_vars )
import apparmor.rules as aarules
@ -442,7 +442,7 @@ def create_new_profile(localfile, is_stub=False):
local_profile = hasher()
local_profile[localfile] = ProfileStorage('NEW', localfile, 'create_new_profile()')
local_profile[localfile]['flags'] = 'complain'
local_profile[localfile]['include']['abstractions/base'] = 1
local_profile[localfile]['inc_ie'].add(IncludeRule('abstractions/base', False, True))
if os.path.exists(localfile) and os.path.isfile(localfile):
interpreter_path, abstraction = get_interpreter_and_abstraction(localfile)
@ -452,7 +452,7 @@ def create_new_profile(localfile, is_stub=False):
local_profile[localfile]['file'].add(FileRule(interpreter_path, None, 'ix', FileRule.ALL, owner=False))
if abstraction:
local_profile[localfile]['include'][abstraction] = True
local_profile[localfile]['inc_ie'].add(IncludeRule(abstraction, False, True))
handle_binfmt(local_profile[localfile], interpreter_path)
else:
@ -570,7 +570,7 @@ def autodep(bin_name, pname=''):
if os.path.isfile(profile_dir + '/tunables/global'):
if not filelist.get(file, False):
filelist[file] = hasher()
filelist[file]['include']['tunables/global'] = True
active_profiles.add_inc_ie(file, IncludeRule('tunables/global', False, True))
write_profile_ui_feedback(pname)
def get_profile_flags(filename, program):
@ -945,7 +945,7 @@ def ask_exec(hashlog):
aa[profile][hat]['file'].add(FileRule(interpreter_path, None, 'ix', FileRule.ALL, owner=False))
if abstraction:
aa[profile][hat]['include'][abstraction] = True
aa[profile][hat]['inc_ie'].add(IncludeRule(abstraction, False, True))
handle_binfmt(aa[profile][hat], interpreter_path)
@ -1084,42 +1084,6 @@ def ask_the_questions(log_dict):
aa[profile][hat] = ProfileStorage(profile, hat, 'mergeprof ask_the_questions() - missing hat')
aa[profile][hat]['profile'] = False
#Add the includes from the other profile to the user profile
done = False
options = []
for inc in log_dict[aamode][profile][hat]['include'].keys():
if not inc in aa[profile][hat]['include'].keys():
if inc.startswith('/'):
options.append('#include "%s"' %inc)
else:
options.append('#include <%s>' %inc)
default_option = 1
q = aaui.PromptQuestion()
q.options = options
q.selected = default_option - 1
q.headers = [_('File includes'), _('Select the ones you wish to add')]
q.functions = ['CMD_ALLOW', 'CMD_IGNORE_ENTRY', 'CMD_ABORT', 'CMD_FINISHED']
q.default = 'CMD_ALLOW'
while not done and options:
ans, selected = q.promptUser()
if ans == 'CMD_IGNORE_ENTRY':
done = True
elif ans == 'CMD_ALLOW':
selection = options[selected]
inc = re_match_include(selection)
deleted = delete_all_duplicates(aa[profile][hat], inc, ruletypes)
aa[profile][hat]['include'][inc] = True
options.pop(selected)
aaui.UI_Info(_('Adding %s to the file.') % selection)
if deleted:
aaui.UI_Info(_('Deleted %s previous matching profile entries.') % deleted)
elif ans == 'CMD_FINISHED':
return
# check for and ask about conflicting exec modes
ask_conflict_mode(profile, hat, aa[profile][hat], log_dict[aamode][profile][hat])
@ -1222,7 +1186,7 @@ def ask_rule_questions(prof_events, profile_name, the_profile, r_types):
if inc:
deleted = delete_all_duplicates(the_profile, inc, r_types)
the_profile['include'][inc] = True
the_profile['inc_ie'].add(IncludeRule.parse(selection))
aaui.UI_Info(_('Adding %s to profile.') % selection)
if deleted:
@ -1426,8 +1390,14 @@ def match_includes(profile, rule_type, rule_obj):
newincludes = []
for incname in include.keys():
# TODO: improve/fix logic to honor magic vs. quoted include paths
if incname.startswith('/'):
is_magic = False
else:
is_magic = True
# never propose includes that are already in the profile (shouldn't happen because of is_known_rule())
if profile and profile['include'].get(incname, False):
if profile and profile['inc_ie'].is_covered(IncludeRule(incname, False, is_magic)):
continue
# XXX type check should go away once we init all profiles correctly
@ -1581,6 +1551,11 @@ def collapse_log(hashlog, ignore_null_profiles=True):
profile, hat = split_name(hashlog[aamode][full_profile]['final_name']) # XXX limited to two levels to avoid an Exception on nested child profiles or nested null-*
# TODO: support nested child profiles
# used to avoid to accidently initialize aa[profile][hat] or calling is_known_rule() on events for a non-existing profile
hat_exists = False
if aa.get(profile) and aa[profile].get(hat):
hat_exists = True
if True:
if not log_dict[aamode][profile].get(hat):
# with execs in ix mode, we already have ProfileStorage initialized and should keep the content it already has
@ -1596,13 +1571,13 @@ def collapse_log(hashlog, ignore_null_profiles=True):
file_event = FileRule(path, mode, None, FileRule.ALL, owner=owner, log_event=True)
if not is_known_rule(aa[profile][hat], 'file', file_event):
if not hat_exists or not is_known_rule(aa[profile][hat], 'file', file_event):
log_dict[aamode][profile][hat]['file'].add(file_event)
# TODO: check for existing rules with this path, and merge them into one rule
for cap in hashlog[aamode][full_profile]['capability'].keys():
cap_event = CapabilityRule(cap, log_event=True)
if not is_known_rule(aa[profile][hat], 'capability', cap_event):
if not hat_exists or not is_known_rule(aa[profile][hat], 'capability', cap_event):
log_dict[aamode][profile][hat]['capability'].add(cap_event)
dbus = hashlog[aamode][full_profile]['dbus']
@ -1625,20 +1600,21 @@ def collapse_log(hashlog, ignore_null_profiles=True):
else:
raise AppArmorBug('unexpected dbus access: %s')
log_dict[aamode][profile][hat]['dbus'].add(dbus_event)
if not hat_exists or not is_known_rule(aa[profile][hat], 'dbus', dbus_event):
log_dict[aamode][profile][hat]['dbus'].add(dbus_event)
nd = hashlog[aamode][full_profile]['network']
for family in nd.keys():
for sock_type in nd[family].keys():
net_event = NetworkRule(family, sock_type, log_event=True)
if not is_known_rule(aa[profile][hat], 'network', net_event):
if not hat_exists or not is_known_rule(aa[profile][hat], 'network', net_event):
log_dict[aamode][profile][hat]['network'].add(net_event)
ptrace = hashlog[aamode][full_profile]['ptrace']
for peer in ptrace.keys():
for access in ptrace[peer].keys():
ptrace_event = PtraceRule(access, peer, log_event=True)
if not is_known_rule(aa[profile][hat], 'ptrace', ptrace_event):
if not hat_exists or not is_known_rule(aa[profile][hat], 'ptrace', ptrace_event):
log_dict[aamode][profile][hat]['ptrace'].add(ptrace_event)
sig = hashlog[aamode][full_profile]['signal']
@ -1646,27 +1622,11 @@ def collapse_log(hashlog, ignore_null_profiles=True):
for access in sig[peer].keys():
for signal in sig[peer][access].keys():
signal_event = SignalRule(access, signal, peer, log_event=True)
if not is_known_rule(aa[profile][hat], 'signal', signal_event):
if not hat_exists or not is_known_rule(aa[profile][hat], 'signal', signal_event):
log_dict[aamode][profile][hat]['signal'].add(signal_event)
return log_dict
def is_skippable_file(path):
"""Returns True if filename matches something to be skipped (rpm or dpkg backup files, hidden files etc.)
The list of skippable files needs to be synced with apparmor initscript and libapparmor _aa_is_blacklisted()
path: filename (with or without directory)"""
basename = os.path.basename(path)
if not basename or basename[0] == '.' or basename == 'README':
return True
skippable_suffix = ('.dpkg-new', '.dpkg-old', '.dpkg-dist', '.dpkg-bak', '.dpkg-remove', '.pacsave', '.pacnew', '.rpmnew', '.rpmsave', '.orig', '.rej', '~')
if basename.endswith(skippable_suffix):
return True
return False
def is_skippable_dir(path):
if re.search('^(.*/)?(disable|cache|cache\.d|force-complain|lxc|\.git)/?$', path):
return True
@ -1949,31 +1909,19 @@ def parse_profile_data(data, file, do_include):
else:
active_profiles.add_abi(file, AbiRule.parse(line))
elif re_match_include(line):
# Include files
include_name = re_match_include(line)
if profile:
profile_data[profile][hat]['include'][include_name] = True
else:
if not filelist.get(file):
filelist[file] = hasher()
filelist[file]['include'][include_name] = True
# If include is a directory
if os.path.isdir(get_include_path(include_name)):
for file_name in include_dir_filelist(profile_dir, include_name):
if not include.get(file_name, False):
load_include(file_name)
else:
if not include.get(include_name, False):
load_include(include_name)
# IncludeRule can handle 'include' and 'include if exists' - place it after the "old" 'include' handling so that it only catches 'include if exists' for now
elif IncludeRule.match(line):
rule_obj = IncludeRule.parse(line)
if profile:
profile_data[profile][hat]['inc_ie'].add(IncludeRule.parse(line))
profile_data[profile][hat]['inc_ie'].add(rule_obj)
else:
active_profiles.add_inc_ie(file, IncludeRule.parse(line))
active_profiles.add_inc_ie(file, rule_obj)
for incname in rule_obj.get_full_paths(profile_dir):
# include[] keys can be a) 'abstractions/foo' and b) '/full/path'
if incname.startswith(profile_dir):
incname = incname.replace('%s/' % profile_dir, '')
load_include(incname)
elif NetworkRule.match(line):
if not profile:
@ -2295,12 +2243,10 @@ def serialize_profile(profile_data, name, options):
else:
prof_filename = get_profile_filename_from_profile_name(name, True)
if filelist.get(prof_filename, False):
data += active_profiles.get_clean_first(prof_filename, 0)
data += write_list_vars(filelist[prof_filename], 0)
data += write_includes(filelist[prof_filename], 0)
data += active_profiles.get_clean_first(prof_filename, 0)
data += write_list_vars(filelist[prof_filename], 0)
data += active_profiles.get_clean(prof_filename, 0)
data += active_profiles.get_clean(prof_filename, 0)
#Here should be all the profiles from the files added write after global/common stuff
for prof in sorted(active_profiles.profiles_in_file(prof_filename)):
@ -2357,28 +2303,45 @@ def write_profile(profile, is_attachment=False):
original_aa[profile] = deepcopy(aa[profile])
def include_list_recursive(profile):
''' get a list of all includes in a profile and its included files '''
includelist = profile['inc_ie'].get_all_full_paths(profile_dir)
full_list = []
while includelist:
incname = includelist.pop(0)
if incname in full_list:
continue
full_list.append(incname)
# include[] keys can be a) 'abstractions/foo' and b) '/full/path'
if incname.startswith(profile_dir):
incname = incname.replace('%s/' % profile_dir, '')
for childinc in include[incname][incname]['inc_ie'].rules:
for childinc_file in childinc.get_full_paths(profile_dir):
if childinc_file not in full_list:
includelist += [childinc_file]
return full_list
def is_known_rule(profile, rule_type, rule_obj):
# XXX get rid of get() checks after we have a proper function to initialize a profile
if profile.get(rule_type, False):
if profile[rule_type].is_covered(rule_obj, False):
return True
includelist = list(profile['include'].keys())
checked = []
includelist = include_list_recursive(profile)
while includelist:
incname = includelist.pop(0)
checked.append(incname)
for incname in includelist:
# include[] keys can be a) 'abstractions/foo' and b) '/full/path'
if incname.startswith(profile_dir):
incname = incname.replace('%s/' % profile_dir, '')
if os.path.isdir(get_include_path(incname)):
includelist += include_dir_filelist(profile_dir, incname)
else:
if include[incname][incname][rule_type].is_covered(rule_obj, False):
return True
for childinc in include[incname][incname]['include'].keys():
if childinc not in checked:
includelist += [childinc]
if include[incname][incname][rule_type].is_covered(rule_obj, False):
return True
return False
@ -2387,35 +2350,25 @@ def get_file_perms(profile, path, audit, deny):
perms = profile['file'].get_perms_for_path(path, audit, deny)
includelist = list(profile['include'].keys())
checked = []
includelist = include_list_recursive(profile)
while includelist:
incname = includelist.pop(0)
for incname in includelist:
# include[] keys can be a) 'abstractions/foo' and b) '/full/path'
if incname.startswith(profile_dir):
incname = incname.replace('%s/' % profile_dir, '')
if incname in checked:
continue
checked.append(incname)
incperms = include[incname][incname]['file'].get_perms_for_path(path, audit, deny)
if os.path.isdir(get_include_path(incname)):
includelist += include_dir_filelist(profile_dir, incname)
else:
incperms = include[incname][incname]['file'].get_perms_for_path(path, audit, deny)
for allow_or_deny in ['allow', 'deny']:
for owner_or_all in ['all', 'owner']:
for perm in incperms[allow_or_deny][owner_or_all]:
perms[allow_or_deny][owner_or_all].add(perm)
for allow_or_deny in ['allow', 'deny']:
for owner_or_all in ['all', 'owner']:
for perm in incperms[allow_or_deny][owner_or_all]:
perms[allow_or_deny][owner_or_all].add(perm)
if 'a' in perms[allow_or_deny][owner_or_all] and 'w' in perms[allow_or_deny][owner_or_all]:
perms[allow_or_deny][owner_or_all].remove('a') # a is a subset of w, so remove it
if 'a' in perms[allow_or_deny][owner_or_all] and 'w' in perms[allow_or_deny][owner_or_all]:
perms[allow_or_deny][owner_or_all].remove('a') # a is a subset of w, so remove it
for incpath in incperms['paths']:
perms['paths'].add(incpath)
for childinc in include[incname][incname]['include'].keys():
if childinc not in checked:
includelist += [childinc]
for incpath in incperms['paths']:
perms['paths'].add(incpath)
return perms

View file

@ -32,12 +32,8 @@ class CleanProf(object):
def compare_profiles(self):
deleted = 0
other_file_includes = list(self.other.filelist[self.other.filename]['include'].keys())
#Remove the duplicate file-level includes from other
for rule in self.profile.filelist[self.profile.filename]['include'].keys():
if rule in other_file_includes:
self.other.filelist[self.other.filename]['include'].pop(rule)
deleted += self.other.active_profiles.delete_preamble_duplicates(self.other.filename)
for profile in self.profile.aa.keys():
deleted += self.remove_duplicate_rules(profile)
@ -46,23 +42,22 @@ class CleanProf(object):
def remove_duplicate_rules(self, program):
#Process the profile of the program
#Process every hat in the profile individually
file_includes = list(self.profile.filelist[self.profile.filename]['include'].keys())
deleted = 0
for hat in sorted(self.profile.aa[program].keys()):
#The combined list of includes from profile and the file
includes = list(self.profile.aa[program][hat]['include'].keys()) + file_includes
#If different files remove duplicate includes in the other profile
if not self.same_file:
if self.other.aa[program].get(hat): # carefully avoid to accidently initialize self.other.aa[program][hat]
for inc in includes:
if self.other.aa[program][hat]['include'].get(inc, False):
self.other.aa[program][hat]['include'].pop(inc)
deleted += 1
deleted = 0
# remove duplicate rules from the preamble
deleted += self.profile.active_profiles.delete_preamble_duplicates(self.profile.filename)
#Process every hat in the profile individually
for hat in sorted(self.profile.aa[program].keys()):
includes = self.profile.aa[program][hat]['inc_ie'].get_all_full_paths(apparmor.profile_dir)
#Clean up superfluous rules from includes in the other profile
for inc in includes:
# apparmor.include[] keys can be a) 'abstractions/foo' and b) '/full/path'
if inc.startswith(apparmor.profile_dir):
inc = inc.replace('%s/' % apparmor.profile_dir, '')
if not self.profile.include.get(inc, {}).get(inc, False):
apparmor.load_include(inc)
if self.other.aa[program].get(hat): # carefully avoid to accidently initialize self.other.aa[program][hat]

View file

@ -172,6 +172,22 @@ def get_directory_contents(path):
files.sort()
return files
def is_skippable_file(path):
"""Returns True if filename matches something to be skipped (rpm or dpkg backup files, hidden files etc.)
The list of skippable files needs to be synced with apparmor initscript and libapparmor _aa_is_blacklisted()
path: filename (with or without directory)"""
basename = os.path.basename(path)
if not basename or basename[0] == '.' or basename == 'README':
return True
skippable_suffix = ('.dpkg-new', '.dpkg-old', '.dpkg-dist', '.dpkg-bak', '.dpkg-remove', '.pacsave', '.pacnew', '.rpmnew', '.rpmsave', '.orig', '.rej', '~')
if basename.endswith(skippable_suffix):
return True
return False
def open_file_read(path, encoding='UTF-8'):
'''Open specified file read-only'''
return open_file_anymode('r', path, encoding)

View file

@ -47,7 +47,6 @@ class ProfileList:
self.files[filename] = {
'abi': AbiRuleset(),
'alias': {},
'include': {}, # not filled, but avoids errors in is_known_rule() and some other functions when aa-mergeprof asks about the preamble
'inc_ie': IncludeRuleset(),
'profiles': [],
}
@ -116,6 +115,19 @@ class ProfileList:
self.files[filename]['inc_ie'].add(inc_rule)
def delete_preamble_duplicates(self, filename):
''' Delete duplicates in the preamble of the given profile file '''
if not self.files.get(filename):
raise AppArmorBug('%s not listed in ProfileList files' % filename)
deleted = 0
for r_type in ['abi', 'inc_ie']: # TODO: don't hardcode
deleted += self.files[filename][r_type].delete_duplicates(None) # None means not to check includes -- TODO check if this makes sense for all preamble rule types
return deleted
def get_raw(self, filename, depth=0):
''' Get the preamble for the given profile filename (in original formatting) '''
if not self.files.get(filename):

View file

@ -61,7 +61,6 @@ class ProfileStorage:
for rule in ruletypes:
data[rule] = ruletypes[rule]['ruleset']()
data['include'] = dict()
data['lvar'] = dict()
data['filename'] = ''
@ -141,7 +140,6 @@ class ProfileStorage:
# "old" write functions for rule types not implemented as *Rule class yet
write_functions = {
'include': write_includes,
'lvar': write_list_vars,
'mount': write_mount,
'pivot_root': write_pivot_root,
@ -151,7 +149,6 @@ class ProfileStorage:
write_order = [
'abi',
'lvar',
'include',
'inc_ie',
'rlimit',
'capability',
@ -218,23 +215,6 @@ def write_list_vars(ref, depth):
return data
def write_includes(prof_data, depth):
pre = ' ' * depth
data = []
for key in sorted(prof_data['include'].keys()):
if key.startswith('/'):
qkey = '"%s"' % key
else:
qkey = '<%s>' % quote_if_needed(key)
data.append('%s#include %s' % (pre, qkey))
if data:
data.append('')
return data
def var_transform(ref):
data = []
for value in sorted(ref):

View file

@ -13,8 +13,9 @@
# ----------------------------------------------------------------------
from apparmor.regex import RE_INCLUDE, re_match_include_parse
from apparmor.common import AppArmorBug, AppArmorException, type_is_str
from apparmor.common import AppArmorBug, AppArmorException, is_skippable_file, type_is_str
from apparmor.rule import BaseRule, BaseRuleset, parse_comment
import os
# setup module translations
from apparmor.translations import init_translation
@ -124,7 +125,43 @@ class IncludeRule(BaseRule):
_('Include'), self.get_clean(),
]
def get_full_paths(self, profile_dir):
''' get list of full paths of an include (can contain multiple paths if self.path is a directory) '''
# currently this section is based on aa.py get_include_path() (with variable names changed)
# TODO: improve/fix logic to honor magic vs. quoted include paths
if self.path.startswith('/'):
full_path = self.path
else:
full_path = os.path.join(profile_dir, self.path)
files = []
if os.path.isdir(full_path):
for path in os.listdir(full_path):
if is_skippable_file(path):
continue
file_name = os.path.join(full_path, path)
if os.path.isfile(file_name): # only add files, but not subdirectories etc.
files.append(file_name)
elif os.path.exists(full_path):
files.append(full_path)
elif self.ifexists == False:
files.append(full_path) # add full_path even if it doesn't exist on disk. Might cause a 'file not found' error later.
return files
class IncludeRuleset(BaseRuleset):
'''Class to handle and store a collection of include rules'''
pass
def get_all_full_paths(self, profile_dir):
''' get full path of all includes '''
paths = []
for rule_obj in self.rules:
paths += rule_obj.get_full_paths(profile_dir)
return paths

View file

@ -3,6 +3,8 @@
#include if exists <tunables/nothing>
#include if exists <tunables/global>
include if exists <tunables/global>
alias /foo -> /bar ,
@ -17,6 +19,8 @@
#include <abstractions/base>
#include if exists <foo>
#include if exists <abstractions/base>
include <abstractions/base>
capability sys_admin,
audit capability,

View file

@ -5,8 +5,7 @@ alias /foo -> /bar,
@{asdf} = "" foo
@{xy} = x y
#include <tunables/global>
include <tunables/global>
include if exists <tunables/nothing>
# A simple test comment which will persist
@ -15,8 +14,7 @@ include if exists <tunables/nothing>
/usr/bin/a/simple/cleanprof/test/profile {
abi "abi/4.20",
#include <abstractions/base>
include <abstractions/base>
include if exists <foo>
set rlimit nofile <= 256,

View file

@ -25,6 +25,7 @@ from apparmor.aa import (check_for_apparmor, get_output, get_reqs, get_interpret
from apparmor.aare import AARE
from apparmor.common import AppArmorException, AppArmorBug
from apparmor.rule.file import FileRule
from apparmor.rule.include import IncludeRule
class AaTestWithTempdir(AATest):
def AASetup(self):
@ -149,9 +150,9 @@ class AaTest_create_new_profile(AATest):
self.assertEqual(set(profile[program][program]['file'].get_clean()), {'%s mr,' % program, ''})
if exp_abstraction:
self.assertEqual(set(profile[program][program]['include'].keys()), {exp_abstraction, 'abstractions/base'})
self.assertEqual(profile[program][program]['inc_ie'].get_clean(), ['include <abstractions/base>', 'include <%s>' % exp_abstraction, ''])
else:
self.assertEqual(set(profile[program][program]['include'].keys()), {'abstractions/base'})
self.assertEqual(profile[program][program]['inc_ie'].get_clean(), ['include <abstractions/base>', ''])
class AaTest_get_interpreter_and_abstraction(AATest):
tests = [
@ -829,9 +830,9 @@ class AaTest_get_file_perms_2(AATest):
apparmor.aa.load_include('abstractions/aspell')
profile = apparmor.aa.ProfileStorage('/test', '/test', 'test-aa.py')
profile['include']['abstractions/base'] = True
profile['include']['abstractions/bash'] = True
profile['include']['abstractions/enchant'] = True # includes abstractions/aspell
profile['inc_ie'].add(IncludeRule.parse('include <abstractions/base>'))
profile['inc_ie'].add(IncludeRule.parse('include <abstractions/bash>'))
profile['inc_ie'].add(IncludeRule.parse('include <abstractions/enchant>'))
profile['file'].add(FileRule.parse('owner /usr/share/common-licenses/** w,'))
profile['file'].add(FileRule.parse('owner /usr/share/common-licenses/what/ever a,')) # covered by the above 'w' rule, so 'a' should be ignored
@ -871,9 +872,10 @@ class AaTest_propose_file_rules(AATest):
apparmor.aa.user_globs['/no/thi*ng'] = AARE('/no/thi*ng', True)
profile = apparmor.aa.ProfileStorage('/test', '/test', 'test-aa.py')
profile['include']['abstractions/base'] = True
profile['include']['abstractions/bash'] = True
profile['include']['abstractions/enchant'] = True # includes abstractions/aspell
profile['inc_ie'].add(IncludeRule.parse('include <abstractions/base>'))
profile['inc_ie'].add(IncludeRule.parse('include <abstractions/bash>'))
profile['inc_ie'].add(IncludeRule.parse('include <abstractions/enchant>'))
profile['file'].add(FileRule.parse('owner /usr/share/common-licenses/** w,'))
profile['file'].add(FileRule.parse('/dev/null rwk,'))
@ -915,10 +917,10 @@ class AaTest_propose_file_rules_with_absolute_includes(AATest):
apparmor.aa.load_include(abs_include3)
profile = apparmor.aa.ProfileStorage('/test', '/test', 'test-aa.py')
profile['include']['abstractions/base'] = False
profile['include'][abs_include1] = False
profile['include'][abs_include2] = False
profile['include'][abs_include3] = False
profile['inc_ie'].add(IncludeRule.parse('include <abstractions/base>'))
profile['inc_ie'].add(IncludeRule.parse('include "%s"' % abs_include1))
profile['inc_ie'].add(IncludeRule.parse('include "%s"' % abs_include2))
profile['inc_ie'].add(IncludeRule.parse('include "%s"' % abs_include3))
rule_obj = FileRule(params[0], params[1], None, FileRule.ALL, owner=False, log_event=True)
proposals = propose_file_rules(profile, rule_obj)

View file

@ -15,7 +15,10 @@
import unittest
from collections import namedtuple
from common_test import AATest, setup_all_loops
from common_test import AATest, setup_all_loops, write_file
import os
import shutil
from apparmor.rule.include import IncludeRule, IncludeRuleset
#from apparmor.rule import BaseRule
@ -336,9 +339,57 @@ class IncludeLogprofHeaderTest(AATest):
obj = IncludeRule._parse(params)
self.assertEqual(obj.logprof_header(), expected)
class IncludeFullPathsTest(AATest):
def AASetup(self):
self.createTmpdir()
#copy the local profiles to the test directory
self.profile_dir = '%s/profiles' % self.tmpdir
shutil.copytree('../../profiles/apparmor.d/', self.profile_dir, symlinks=True)
inc_dir = os.path.join(self.profile_dir, 'abstractions/inc.d')
os.mkdir(inc_dir, 0o755)
write_file(inc_dir, 'incfoo', '/incfoo r,')
write_file(inc_dir, 'incbar', '/incbar r,')
write_file(inc_dir, 'README', '# README') # gets skipped
sub_dir = os.path.join(self.profile_dir, 'abstractions/inc.d/subdir') # gets skipped
os.mkdir(sub_dir, 0o755)
empty_dir = os.path.join(self.profile_dir, 'abstractions/empty.d')
os.mkdir(empty_dir, 0o755)
tests = [
# @@ will be replaced with self.profile_dir
('include <abstractions/base>', ['@@/abstractions/base'] ),
# ('include "foo"', ['@@/foo'] ), # TODO: adjust logic to honor quoted vs. magic paths (and allow quoted relative paths in re_match_include_parse())
('include "/foo/bar"', ['/foo/bar'] ),
('include <abstractions/inc.d>', ['@@/abstractions/inc.d/incfoo', '@@/abstractions/inc.d/incbar'] ),
('include <abstractions/empty.d>', [] ),
('include <abstractions/not_found>', ['@@/abstractions/not_found'] ),
('include if exists <abstractions/not_found>', [] ),
]
def _run_test(self, params, expected):
exp2 = []
for path in expected:
exp2.append(path.replace('@@', self.profile_dir))
obj = IncludeRule._parse(params)
self.assertEqual(obj.get_full_paths(self.profile_dir), exp2)
## --- tests for IncludeRuleset --- #
class IncludeRulesTest(AATest):
def AASetup(self):
self.createTmpdir()
#copy the local profiles to the test directory
self.profile_dir = '%s/profiles' % self.tmpdir
shutil.copytree('../../profiles/apparmor.d/', self.profile_dir, symlinks=True)
write_file(self.profile_dir, 'baz', '/baz r,')
def test_empty_ruleset(self):
ruleset = IncludeRuleset()
ruleset_2 = IncludeRuleset()
@ -347,6 +398,7 @@ class IncludeRulesTest(AATest):
self.assertEqual([], ruleset_2.get_raw(2))
self.assertEqual([], ruleset_2.get_clean(2))
self.assertEqual([], ruleset_2.get_clean_unsorted(2))
self.assertEqual([], ruleset.get_all_full_paths(self.profile_dir))
def test_ruleset_1(self):
ruleset = IncludeRuleset()
@ -373,12 +425,18 @@ class IncludeRulesTest(AATest):
'',
]
expected_fullpaths = [
os.path.join(self.profile_dir, 'foo'),
'/bar'
]
for rule in rules:
ruleset.add(IncludeRule.parse(rule))
self.assertEqual(expected_raw, ruleset.get_raw())
self.assertEqual(expected_clean, ruleset.get_clean())
self.assertEqual(expected_clean_unsorted, ruleset.get_clean_unsorted())
self.assertEqual(expected_fullpaths, ruleset.get_all_full_paths(self.profile_dir))
def test_ruleset_2(self):
ruleset = IncludeRuleset()
@ -413,12 +471,19 @@ class IncludeRulesTest(AATest):
'',
]
expected_fullpaths = [
os.path.join(self.profile_dir, 'baz'),
os.path.join(self.profile_dir, 'foo'),
'/bar',
]
for rule in rules:
ruleset.add(IncludeRule.parse(rule))
self.assertEqual(expected_raw, ruleset.get_raw())
self.assertEqual(expected_clean, ruleset.get_clean())
self.assertEqual(expected_clean_unsorted, ruleset.get_clean_unsorted())
self.assertEqual(expected_fullpaths, ruleset.get_all_full_paths(self.profile_dir))
class IncludeGlobTestAATest(AATest):
def setUp(self):

View file

@ -149,6 +149,21 @@ class TestAdd_inc_ie(AATest):
self.pl.add_inc_ie('/etc/apparmor.d/bin.foo', 'tunables/global') # str insteadd of IncludeRule
self.assertEqual(list(self.pl.files.keys()), [])
def test_dedup_inc_ie_1(self):
self.pl.add_inc_ie('/etc/apparmor.d/bin.foo', IncludeRule.parse('include <tunables/global>'))
self.pl.add_inc_ie('/etc/apparmor.d/bin.foo', IncludeRule.parse('#include if exists <tunables/global> # comment'))
self.pl.add_inc_ie('/etc/apparmor.d/bin.foo', IncludeRule.parse(' #include <tunables/global> '))
deleted = self.pl.delete_preamble_duplicates('/etc/apparmor.d/bin.foo')
self.assertEqual(deleted, 2)
self.assertEqual(list(self.pl.files.keys()), ['/etc/apparmor.d/bin.foo'])
self.assertEqual(self.pl.get_clean('/etc/apparmor.d/bin.foo'), ['include <tunables/global>', ''])
self.assertEqual(self.pl.get_raw('/etc/apparmor.d/bin.foo'), ['include <tunables/global>', ''])
def test_dedup_error_1(self):
with self.assertRaises(AppArmorBug):
self.pl.delete_preamble_duplicates('/file/not/found')
self.assertEqual(list(self.pl.files.keys()), [])
class TestAdd_abi(AATest):
def AASetup(self):
self.pl = ProfileList()
@ -173,6 +188,15 @@ class TestAdd_abi(AATest):
self.pl.add_abi('/etc/apparmor.d/bin.foo', 'abi/4.19') # str insteadd of AbiRule
self.assertEqual(list(self.pl.files.keys()), [])
def test_dedup_abi_1(self):
self.pl.add_abi('/etc/apparmor.d/bin.foo', AbiRule.parse('abi <abi/4.19>,'))
self.pl.add_abi('/etc/apparmor.d/bin.foo', AbiRule.parse(' abi <abi/4.19> , # comment'))
self.assertEqual(list(self.pl.files.keys()), ['/etc/apparmor.d/bin.foo'])
deleted = self.pl.delete_preamble_duplicates('/etc/apparmor.d/bin.foo')
self.assertEqual(deleted, 1)
self.assertEqual(self.pl.get_clean_first('/etc/apparmor.d/bin.foo'), ['abi <abi/4.19>,', '']) # TODO switch to get_clean() once merged
self.assertEqual(self.pl.get_raw('/etc/apparmor.d/bin.foo'), ['abi <abi/4.19>,', ''])
class TestAdd_alias(AATest):
def AASetup(self):
self.pl = ProfileList()
@ -210,6 +234,9 @@ class TestAdd_alias(AATest):
self.pl.add_alias('/etc/apparmor.d/bin.foo', '/foo', None) # target None insteadd of str
self.assertEqual(list(self.pl.files.keys()), [])
# def test_dedup_alias_1(self):
# TODO: implement after fixing alias handling (when a profile has two aliases with the same path on the left side)
class TestGet(AATest):
def AASetup(self):
self.pl = ProfileList()