diff --git a/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.err b/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.err new file mode 100644 index 000000000..e69de29bb diff --git a/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.in b/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.in new file mode 100644 index 000000000..17dce4df8 --- /dev/null +++ b/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.in @@ -0,0 +1 @@ +type=AVC msg=audit(1709108389.303:12383): apparmor="DENIED" operation="mount" class="mount" info="failed mntpnt match" error=-13 profile="/home/user/test/testmount" name="/tmp/foo/" pid=14155 comm="testmount" flags="ro, remount" diff --git a/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.out b/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.out new file mode 100644 index 000000000..74773be57 --- /dev/null +++ b/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.out @@ -0,0 +1,15 @@ +START +File: testcase_remount_01.in +Event type: AA_RECORD_DENIED +Audit ID: 1709108389.303:12383 +Operation: mount +Profile: /home/user/test/testmount +Name: /tmp/foo/ +Command: testmount +Info: failed mntpnt match +ErrorCode: 13 +PID: 14155 +Flags: ro, remount +Class: mount +Epoch: 1709108389 +Audit subid: 12383 diff --git a/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.profile b/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.profile new file mode 100644 index 000000000..56495d0f5 --- /dev/null +++ b/libraries/libapparmor/testsuite/test_multi/testcase_remount_01.profile @@ -0,0 +1,4 @@ +/home/user/test/testmount { + mount options=(remount, ro) -> /tmp/foo/, + +} diff --git a/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.err b/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.err new file mode 100644 index 000000000..e69de29bb diff --git a/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.in b/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.in new file mode 100644 index 000000000..36eea0cf3 --- /dev/null +++ b/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.in @@ -0,0 +1 @@ +type=AVC msg=audit(1709025786.045:43147): apparmor="DENIED" operation="umount" class="mount" profile="/home/user/test/testmount" name="/mnt/a/" pid=26697 comm="testmount" diff --git a/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.out b/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.out new file mode 100644 index 000000000..dd6669afd --- /dev/null +++ b/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.out @@ -0,0 +1,12 @@ +START +File: testcase_umount_01.in +Event type: AA_RECORD_DENIED +Audit ID: 1709025786.045:43147 +Operation: umount +Profile: /home/user/test/testmount +Name: /mnt/a/ +Command: testmount +PID: 26697 +Class: mount +Epoch: 1709025786 +Audit subid: 43147 diff --git a/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.profile b/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.profile new file mode 100644 index 000000000..39febcc24 --- /dev/null +++ b/libraries/libapparmor/testsuite/test_multi/testcase_umount_01.profile @@ -0,0 +1,4 @@ +/home/user/test/testmount { + umount /mnt/a/, + +} diff --git a/utils/apparmor/aa.py b/utils/apparmor/aa.py index d726f7a2b..045e7117e 100644 --- a/utils/apparmor/aa.py +++ b/utils/apparmor/aa.py @@ -53,6 +53,7 @@ from apparmor.rule.signal import SignalRule from apparmor.rule.userns import UserNamespaceRule from apparmor.rule.mqueue import MessageQueueRule from apparmor.rule.io_uring import IOUringRule +from apparmor.rule.mount import MountRule from apparmor.translations import init_translation _ = init_translation() @@ -1757,6 +1758,19 @@ def collapse_log(hashlog, ignore_null_profiles=True): if not hat_exists or not is_known_rule(aa[profile][hat], 'io_uring', io_uring_event): log_dict[aamode][final_name]['io_uring'].add(io_uring_event) + mount = hashlog[aamode][full_profile]['mount'] + for operation, operation_val in mount.items(): + for options, options_val in operation_val.items(): + for fstype, fstype_value in options_val.items(): + for dest, dest_value in fstype_value.items(): + for source, source_value in dest_value.items(): + _options = (options[0], options[1].split(', ')) if options is not None else MountRule.ALL + _fstype = (fstype[0], fstype[1].split(', ')) if fstype is not None else MountRule.ALL + _source = source if source is not None else MountRule.ALL + _dest = dest if dest is not None else MountRule.ALL + mount_event = MountRule(operation=operation, fstype=_fstype, options=_options, source=_source, dest=_dest) + if not hat_exists or not is_known_rule(aa[profile][hat], 'mount', mount_event): + log_dict[aamode][final_name]['mount'].add(mount_event) return log_dict @@ -2136,6 +2150,7 @@ def match_line_against_rule_classes(line, profile, file, lineno, in_preamble): 'userns', 'mqueue', 'io_uring', + 'mount', ): if rule_name in ruletypes: diff --git a/utils/apparmor/logparser.py b/utils/apparmor/logparser.py index b9de2fbab..fb3bb759d 100644 --- a/utils/apparmor/logparser.py +++ b/utils/apparmor/logparser.py @@ -60,6 +60,7 @@ class ReadLog: 'userns': hasher(), 'mqueue': hasher(), 'io_uring': hasher(), + 'mount': hasher(), } def prefetch_next_log_entry(self): @@ -116,6 +117,13 @@ class ReadLog: ev['peer'] = event.peer elif ev['operation'] and ev['operation'] == 'ptrace': ev['peer'] = event.peer + elif ev['operation'] and ev['operation'] == 'mount': + ev['flags'] = event.flags + ev['fs_type'] = event.fs_type + ev['src_name'] = event.src_name + elif ev['operation'] and (ev['operation'] == 'umount'): + ev['flags'] = event.flags + ev['fs_type'] = event.fs_type elif ev['operation'] and ev['operation'].startswith('dbus_'): ev['peer_profile'] = event.peer_profile ev['bus'] = event.dbus_bus @@ -204,6 +212,19 @@ class ReadLog: self.hashlog[aamode][full_profile]['io_uring'][e['denied_mask']][e['peer_profile']] = True return + elif e['class'] and e['class'] == 'mount': + if e['flags'] != None: + e['flags'] = ('=', e['flags']) + if e['fs_type'] != None: + e['fs_type'] = ('=', e['fs_type']) + + + if e['operation'] == 'mount': + self.hashlog[aamode][full_profile]['mount'][e['operation']][e['flags']][e['fs_type']][e['name']][e['src_name']] = True + else: # Umount + self.hashlog[aamode][full_profile]['mount'][e['operation']][e['flags']][e['fs_type']][e['name']][None] = True + return + elif self.op_type(e) == 'file': # Map c (create) and d (delete) to w (logging is more detailed than the profile language) dmask = e['denied_mask'] diff --git a/utils/apparmor/profile_storage.py b/utils/apparmor/profile_storage.py index 5fbe13328..165168cc1 100644 --- a/utils/apparmor/profile_storage.py +++ b/utils/apparmor/profile_storage.py @@ -31,6 +31,8 @@ from apparmor.rule.signal import SignalRule, SignalRuleset from apparmor.rule.userns import UserNamespaceRule, UserNamespaceRuleset from apparmor.rule.mqueue import MessageQueueRule, MessageQueueRuleset from apparmor.rule.io_uring import IOUringRule, IOUringRuleset +from apparmor.rule.mount import MountRule, MountRuleset + from apparmor.translations import init_translation _ = init_translation() @@ -50,6 +52,8 @@ ruletypes = { 'userns': {'rule': UserNamespaceRule, 'ruleset': UserNamespaceRuleset}, 'mqueue': {'rule': MessageQueueRule, 'ruleset': MessageQueueRuleset}, 'io_uring': {'rule': IOUringRule, 'ruleset': IOUringRuleset}, + 'mount': {'rule': MountRule, 'ruleset': MountRuleset}, + } @@ -84,9 +88,7 @@ class ProfileStorage: data['allow'] = dict() data['deny'] = dict() - # mount, pivot_root, unix have a .get() fallback to list() - initialize them nevertheless - data['allow']['mount'] = [] - data['deny']['mount'] = [] + # pivot_root, unix have a .get() fallback to list() - initialize them nevertheless data['allow']['pivot_root'] = [] data['deny']['pivot_root'] = [] data['allow']['unix'] = [] @@ -181,7 +183,6 @@ class ProfileStorage: # "old" write functions for rule types not implemented as *Rule class yet write_functions = { - 'mount': write_mount, 'pivot_root': write_pivot_root, 'unix': write_unix, } @@ -308,27 +309,6 @@ def var_transform(ref): data.append(quote_if_needed(value)) return ' '.join(data) - -def write_mount_rules(prof_data, depth, allow): - pre = ' ' * depth - data = [] - - # no mount rules, so return - if not prof_data[allow].get('mount', False): - return data - - for mount_rule in prof_data[allow]['mount']: - data.append('%s%s' % (pre, mount_rule.serialize())) - data.append('') - return data - - -def write_mount(prof_data, depth): - data = write_mount_rules(prof_data, depth, 'deny') - data.extend(write_mount_rules(prof_data, depth, 'allow')) - return data - - def write_pivot_root_rules(prof_data, depth, allow): pre = ' ' * depth data = [] diff --git a/utils/apparmor/regex.py b/utils/apparmor/regex.py index 01fde43fe..a784ccc91 100644 --- a/utils/apparmor/regex.py +++ b/utils/apparmor/regex.py @@ -47,7 +47,7 @@ RE_PROFILE_NETWORK = re.compile(RE_AUDIT_DENY + r'network(?P
\s+.*)?' + RE_PROFILE_CHANGE_HAT = re.compile(r'^\s*\^("??.+?"??)' + RE_COMMA_EOL) RE_PROFILE_HAT_DEF = re.compile(r'^(?P\s*)(?P\^|hat\s+)(?P"??[^)]+?"??)' + RE_FLAGS + r'\s*\{' + RE_EOL) RE_PROFILE_DBUS = re.compile(RE_AUDIT_DENY + r'(dbus\s*,|dbus(?P
\s+[^#]*)\s*,)' + RE_EOL) -RE_PROFILE_MOUNT = re.compile(RE_AUDIT_DENY + r'((mount|remount|umount|unmount)(\s+[^#]*)?\s*,)' + RE_EOL) +RE_PROFILE_MOUNT = re.compile(RE_AUDIT_DENY + r'((?Pmount|remount|umount|unmount)(?P
\s+[^#]*)?\s*,)' + RE_EOL) RE_PROFILE_SIGNAL = re.compile(RE_AUDIT_DENY + r'(signal\s*,|signal(?P
\s+[^#]*)\s*,)' + RE_EOL) RE_PROFILE_PTRACE = re.compile(RE_AUDIT_DENY + r'(ptrace\s*,|ptrace(?P
\s+[^#]*)\s*,)' + RE_EOL) RE_PROFILE_PIVOT_ROOT = re.compile(RE_AUDIT_DENY + r'(pivot_root\s*,|pivot_root\s+[^#]*\s*,)' + RE_EOL) diff --git a/utils/apparmor/rule/mount.py b/utils/apparmor/rule/mount.py new file mode 100644 index 000000000..a0dc387db --- /dev/null +++ b/utils/apparmor/rule/mount.py @@ -0,0 +1,287 @@ +# ---------------------------------------------------------------------- +# Copyright (C) 2024 Canonical, Ltd. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of version 2 of the GNU General Public +# License as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# ---------------------------------------------------------------------- +import re + +from apparmor.common import AppArmorBug, AppArmorException + +from apparmor.regex import RE_PROFILE_MOUNT, RE_PROFILE_PATH_OR_VAR, strip_parenthesis +#from apparmor.rule import AARE +from apparmor.rule import BaseRule, BaseRuleset, parse_modifiers, logprof_value_or_all, check_and_split_list + +from apparmor.translations import init_translation + +_ = init_translation() + +# TODO : +# - match correctly AARE on every field +# - Find the actual list of supported filesystems. This one comes from /proc/filesystems +# - Support path that begin by { (e.g. {,/usr}/lib/...) This syntax is not a valid AARE but is used by usr.lib.snapd.snap-confine.real in Ubuntu and will currently raise an error in genprof if these lines are not modified. +# - Apparmor remount logs are displayed as mount (with remount flag). Profiles generated with aa-genprof are therefore mount rules. It could be interesting to make them remount rules. + +valid_fs = [ + 'sysfs', 'tmpfs', 'bdevfs', 'procfs', 'cgroup', 'cgroup2', 'cpuset', 'devtmpfs', 'configfs', 'debugfs', 'tracefs', + 'securityfs', 'sockfs', 'bpf', 'npipefs', 'ramfs', 'hugetlbfs', 'devpts', 'ext3', 'ext2', 'ext4', 'squashfs', + 'vfat', 'ecryptfs', 'fuseblk', 'fuse', 'fusectl', 'efivarfs', 'mqueue', 'store', 'autofs', 'binfmt_misc', 'overlay', + 'none', 'bdev', 'proc', 'pipefs', 'pstore', 'brtfs', 'xfs', +] +flags_keywords = [ + 'ro', 'rw', 'nosuid', 'suid', 'nodev', 'dev', 'noexec', 'exec', 'sync', 'async', 'remount', 'mand', 'nomand', + 'dirsync', 'noatime', 'atime', 'nodiratime', 'diratime', 'bind', 'rbind', 'move', 'verbose', 'silent', 'loud', + 'acl', 'noacl', 'unbindable', 'runbindable', 'private', 'rprivate', 'slave', 'rslave', 'shared', 'rshared', + 'relatime', 'norelatime', 'iversion', 'noiversion', 'strictatime', 'nostrictatime', 'lazytime', 'nolazytime', + 'nouser', 'user', 'symfollow', 'nosymfollow', '([A-Za-z0-9]|AARE)', # TODO: handle AARE +] +join_valid_flags = '|'.join(flags_keywords) +join_valid_fs = '|'.join(valid_fs) + +sep = r"\s*[\s,]\s*" + +fs_type_pattern = r"\b(?Pfstype|vfstype)\b\s*(?P=|in)\s*"\ + r"(?P\(\s*(" + join_valid_fs + ")(" + sep + "(" + join_valid_fs + "))*\s*\)|"\ + r"\{\s*(" + join_valid_fs + ")(" + sep + "(" + join_valid_fs + r"))*\s*\}|(\s*" + join_valid_fs + "))"\ + + +option_pattern = r"\s*(\boption(s?)\b\s*(?P=|in)\s*"\ + r"(?P\(\s*(" + join_valid_flags + ")(" + sep + "(" + join_valid_flags + r"))*\s*\)|" \ + "(\s*" + join_valid_flags + ")"\ + "))?" +mount_condition_pattern = rf"({fs_type_pattern})?\s*({option_pattern})?" + +# Source can either be +# - A path : /foo +# - A filesystem : sysfs (sudo mount -t tmpfs tmpfs /tmp/bar) +# - Any label : mntlabel (sudo mount -t tmpfs mntlabel /tmp/bar) +source_fileglob_pattern = r"(\s*" + (RE_PROFILE_PATH_OR_VAR % "source_file")[:-1] + "|" + r"\w+" + "))" +dest_fileglob_pattern = r"(\s*" + RE_PROFILE_PATH_OR_VAR % "dest_file" + ')' + +RE_MOUNT_DETAILS = re.compile(r"^\s*" + mount_condition_pattern + rf"(\s+{source_fileglob_pattern})?" + rf"(\s+->\s+{dest_fileglob_pattern})?\s*" +"$") +RE_UMOUNT_DETAILS = re.compile(r"^\s*" + mount_condition_pattern + rf"(\s+{dest_fileglob_pattern})?\s*" +"$") + +class MountRule(BaseRule): + '''Class to handle and store a single mount rule''' + + # Nothing external should reference this class, all external users + # should reference the class field MountRule.ALL + class __MountAll(object): + pass + + ALL = __MountAll + + rule_name = 'mount' + _match_re = RE_PROFILE_MOUNT + + def __init__(self, operation, fstype, options, source, dest, audit=False, deny=False, allow_keyword=False, comment='', log_event=None): + + super().__init__(audit=audit, deny=deny, + allow_keyword=allow_keyword, + comment=comment, + log_event=log_event) + + self.operation = operation + + self.fstype, self.all_fstype, unknown_items = check_and_split_list(fstype[1] if fstype != self.ALL else fstype, valid_fs, self.ALL, type(self).__name__, 'fstype') + self.is_fstype_equal = fstype[0] if not self.all_fstype else None + + self.options, self.all_options, unknown_items = check_and_split_list(options[1] if options != self.ALL else options, flags_keywords, self.ALL, type(self).__name__, 'options') + self.is_options_equal = options[0] if not self.all_options else None + + if source != self.ALL and source[0].isalpha(): + self.source = source + self.all_source = False + self.source_is_path = False + else: + self.source_is_path = True + self.source, self.all_source = self._aare_or_all(source, 'source', is_path=self.source_is_path, log_event=log_event) + + if not self.all_fstype and self.is_fstype_equal != "=" and self.is_fstype_equal != "in": + raise AppArmorBug(f'Invalid is_fstype_equal : {self.is_fstype_equal}') + if not self.all_options and self.is_options_equal != "=" and self.is_options_equal != "in": + raise AppArmorBug(f'Invalid is_options_equal : {self.is_options_equal}') + if self.operation != 'mount' and not self.all_source: + raise AppArmorException(f'Operation {self.operation} cannot have a source') + + flags_forbidden_with_source = {'remount', 'unbindable', 'shared', 'private', 'slave', 'runbindable', 'rshared', 'rprivate', 'rslave'} + if self.operation == 'mount' and not self.all_source and not self.all_options and flags_forbidden_with_source & self.options != set(): + raise AppArmorException(f'Operation {flags_forbidden_with_source & self.options} cannot have a source. Source = {self.source}') + + self.dest, self.all_dest = self._aare_or_all(dest, 'dest', is_path=True, log_event=log_event) + + self.can_glob = not self.all_source and not self.all_dest and not self.all_options + + + @classmethod + def _create_instance(cls, raw_rule, matches): + '''parse raw_rule and return instance of this class''' + + audit, deny, allow_keyword, comment = parse_modifiers(matches) + + operation = matches.group('operation') + + rule_details = '' + if matches.group('details'): + rule_details = matches.group('details') + + if operation == "mount": + parsed = RE_MOUNT_DETAILS.search(rule_details) + else: + parsed = RE_UMOUNT_DETAILS.search(rule_details) + + r = parsed.groupdict() if parsed else None + if not r: + raise AppArmorException('Can\'t parse mount rule ' + raw_rule) + + if r['fstype'] is not None: + is_fstype_equal = r['fstype_equals_or_in'] + fstype = strip_parenthesis(r['fstype']).replace(',', ' ').split() + else: + is_fstype_equal = None + fstype = cls.ALL + + if r['options'] is not None: + is_options_equal = r['options_equals_or_in'] + options = strip_parenthesis(r['options']).replace(',', ' ').split() + else: + is_options_equal =None + options = cls.ALL + + if operation == 'mount' and r['source_file'] is not None: # Umount cannot have a source + source = r['source_file'] + else: + source = cls.ALL + + if r['dest_file'] is not None: + dest = r['dest_file'] + else: + dest = cls.ALL + + else: + is_fstype_equal = None + is_options_equal = None + fstype = cls.ALL + options = cls.ALL + source = cls.ALL + dest = cls.ALL + + return cls(operation=operation, fstype=(is_fstype_equal, fstype), options=(is_options_equal, options), source=source, dest=dest, audit=audit, deny=deny, allow_keyword=allow_keyword, comment=comment) + + def get_clean(self, depth=0): + space = ' ' * depth + + fstype = ' fstype%s(%s)' % (self.is_fstype_equal, ', '.join(sorted(self.fstype))) if not self.all_fstype else '' + options = ' options%s(%s)' % (self.is_options_equal, ', '.join(sorted(self.options))) if not self.all_options else '' + + if self.operation == 'mount': + return ('%s%s%s%s%s%s%s,%s' % ( self.modifiers_str(), + space, + self.operation, + fstype, + options, + " " + str(self.source.regex) if not self.all_source else '', + " -> " + str(self.dest.regex) if not self.all_dest else '', + self.comment, + )) + else: + return ('%s%s%s%s%s%s,%s' % ( self.modifiers_str(), + space, + self.operation, + fstype, + options, + " " + str(self.dest.regex) if not self.all_dest else '', + self.comment, + )) + + def _is_covered_localvars(self, other_rule): + if self.operation != other_rule.operation: + return False + if self.is_fstype_equal != other_rule.is_fstype_equal: + return False + if self.is_options_equal != other_rule.is_options_equal: + return False + if not self._is_covered_list(self.fstype, self.all_fstype, other_rule.fstype, other_rule.all_fstype, 'fstype'): + return False + if not self._is_covered_list(self.options, self.all_options, other_rule.options, other_rule.all_options, 'options'): + return False + if not self.source_is_path and not other_rule.source_is_path: + if self.source != other_rule.source: + return False + elif self.source_is_path != other_rule.source_is_path: + return False + elif not self._is_covered_aare(self.source, self.all_source, other_rule.source, other_rule.all_source, 'source'): + return False + if not self._is_covered_aare(self.dest, self.all_dest, other_rule.dest, other_rule.all_dest, 'dest'): + return False + + return True + + def _is_equal_localvars(self, rule_obj, strict): + if self.operation != rule_obj.operation: + return False + if self.is_fstype_equal != rule_obj.is_fstype_equal: + return False + if self.is_options_equal != rule_obj.is_options_equal: + return False + if self.fstype != rule_obj.fstype or self.options != rule_obj.options: + return False + if not self.source_is_path and not rule_obj.source_is_path: + if self.source != rule_obj.source: + return False + elif self.source_is_path != rule_obj.source_is_path: + return False + elif not self._is_equal_aare(self.source, self.all_source, rule_obj.source, rule_obj.all_source, 'source'): + return False + if not self._is_equal_aare(self.dest, self.all_dest, rule_obj.dest, rule_obj.all_dest, 'dest'): + return False + + return True + + def glob(self): + '''Change path to next possible glob''' + if self.all_source and self.all_options: + return + + if not self.all_dest: + self.all_dest = True + self.dest = self.ALL + elif not self.all_source and type(self.source) is not str: + self.source = self.source.glob_path() + if self.source.is_equal('/**/'): + self.all_source= True + self.source=self.ALL + + else: + self.options = self.ALL + self.all_options = True + self.raw_rule = None + + def _logprof_header_localvars(self): + operation = self.operation + fstype = logprof_value_or_all(self.fstype, self.all_fstype) + options = logprof_value_or_all(self.options, self.all_options) + source = logprof_value_or_all(self.source, self.all_source) + dest = logprof_value_or_all(self.dest, self.all_dest) + + return ( + _('Operation'), operation, + _('Fstype'), (self.is_fstype_equal, fstype) if fstype != 'ALL' else fstype , + _('Options'), (self.is_options_equal, options) if options != 'ALL' else options , + _('Source'), source, + _('Destination'), dest, + + ) + +class MountRuleset(BaseRuleset): + '''Class to handle and store a collection of Mount rules''' + + + diff --git a/utils/test/cleanprof_test.out b/utils/test/cleanprof_test.out index d9865d7dc..41e875a07 100644 --- a/utils/test/cleanprof_test.out +++ b/utils/test/cleanprof_test.out @@ -28,7 +28,7 @@ $bar = true dbus send bus=session, - mount options=(rw,suid) /c -> /3, + mount options=(rw, suid) /c -> /3, signal set=(abrt alrm bus chld fpe hup ill int kill pipe quit segv stkflt term trap usr1 usr2), diff --git a/utils/test/test-mount.py b/utils/test/test-mount.py new file mode 100644 index 000000000..df3fa168f --- /dev/null +++ b/utils/test/test-mount.py @@ -0,0 +1,225 @@ +#!/usr/bin/python3 +# ---------------------------------------------------------------------- +# Copyright (C) 2024 Canonical, Ltd. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of version 2 of the GNU General Public +# License as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# ---------------------------------------------------------------------- + +import unittest +from collections import namedtuple +from common_test import AATest, setup_all_loops + +from apparmor.common import AppArmorException, AppArmorBug +from apparmor.translations import init_translation + +from apparmor.rule.mount import MountRule, valid_fs + +_ = init_translation() + + +class MountTestParse(AATest): + + tests = ( + # Rule Operation Filesystem Options Source Destination Audit Deny Allow Comment + ('mount fstype=bpf options=rw bpf -> /sys/fs/bpf/,', MountRule('mount', ('=',('bpf')), ('=',('rw')), "bpf", "/sys/fs/bpf/", False, False, False, '' )), + ('mount fstype=bpf options=(rw) random_label -> /sys/fs/bpf/,', MountRule('mount', ('=',('bpf')), ('=',('rw')), "random_label", "/sys/fs/bpf/", False, False, False, '' )), + ('mount,', MountRule('mount', MountRule.ALL, MountRule.ALL, MountRule.ALL, MountRule.ALL, False, False, False, '' )), + ('mount fstype=(ext3, ext4),', MountRule('mount', ('=',('ext3', 'ext4')), MountRule.ALL, MountRule.ALL, MountRule.ALL, False, False, False, '' )), + ('mount bpf,', MountRule('mount', MountRule.ALL, MountRule.ALL, "bpf", MountRule.ALL, False, False, False, '' )), + ('mount none,', MountRule('mount', MountRule.ALL, MountRule.ALL, "none", MountRule.ALL, False, False, False, '' )), + ('mount fstype=(ext3, ext4) options=(ro),', MountRule('mount', ('=',('ext3', 'ext4')), ('=',('ro')), MountRule.ALL, MountRule.ALL, False, False, False, '' )), + ('mount @{mntpnt},', MountRule('mount', MountRule.ALL, MountRule.ALL, "@{mntpnt}", MountRule.ALL, False, False, False, '' )), + ('mount /a,', MountRule('mount', MountRule.ALL, MountRule.ALL, "/a", MountRule.ALL, False, False, False, '' )), + ('mount fstype=(ext3, ext4) /a -> /b,', MountRule('mount', ('=',('ext3', 'ext4')), MountRule.ALL, "/a", "/b", False, False, False, '' )), + ('mount fstype=(ext3, ext4) options=(ro, rbind) /a -> /b,', MountRule('mount', ('=',('ext3', 'ext4')), ('=',('ro', 'rbind')), "/a", "/b", False, False, False, '' )), + ('mount fstype=(ext3, ext4) options=(ro, rbind) /a -> /b, #cmt', MountRule('mount', ('=',('ext3', 'ext4')), ('=',('ro', 'rbind')), "/a", "/b", False, False, False, ' #cmt')), + ('mount fstype=(ext3, ext4) options in (ro, rbind) /a -> /b,', MountRule('mount', ('=',('ext3', 'ext4')), ('in',('ro', 'rbind')), "/a", "/b", False, False, False, '' )), + ('mount fstype in (ext3, ext4) options=(ro, rbind) /a -> /b, #cmt', MountRule('mount', ('in',('ext3', 'ext4')),('=',('ro', 'rbind')), "/a", "/b", False, False, False, ' #cmt')), + ('mount fstype in (ext3, ext4) option in (ro, rbind) /a, #cmt', MountRule('mount', ('in',('ext3', 'ext4')),('in',('ro', 'rbind')), "/a", MountRule.ALL, False, False, False, ' #cmt')), + ('mount fstype=(ext3, ext4) option=(ro, rbind) /a -> /b, #cmt', MountRule('mount', ('=',('ext3', 'ext4')), ('=', ('ro', 'rbind')), "/a", "/b", False, False, False, ' #cmt')), + ('mount options=(rw, rbind) /usr/lib{,32,64,x32}/modules/ -> /tmp/snap.rootfs_*{,/usr}/lib/modules/,', + MountRule('mount', MountRule.ALL, ('=',('rw', 'rbind')), "/usr/lib{,32,64,x32}/modules/", + "/tmp/snap.rootfs_*{,/usr}/lib/modules/", + False, False, False, '' )), + ('umount,', MountRule('umount', MountRule.ALL, MountRule.ALL, MountRule.ALL, MountRule.ALL, False, False, False, '' )), + ('umount fstype=ext3,', MountRule('umount', ('=',('ext3')), MountRule.ALL, MountRule.ALL, MountRule.ALL, False, False, False, '' )), + ('umount /a,', MountRule('umount', MountRule.ALL, MountRule.ALL, MountRule.ALL, "/a", False, False, False, '' )), + + ('remount,', MountRule('remount',MountRule.ALL, MountRule.ALL, MountRule.ALL, MountRule.ALL, False, False, False, '' )), + ('remount fstype=ext4,', MountRule('remount',('=',('ext4')), MountRule.ALL, MountRule.ALL, MountRule.ALL, False, False, False, '' )), + ('remount /b,', MountRule('remount',MountRule.ALL, MountRule.ALL, MountRule.ALL, "/b", False, False, False, '' )), + + + ) + + def _run_test(self, rawrule, expected): + self.assertTrue(MountRule.match(rawrule)) + obj = MountRule.create_instance(rawrule) + expected.raw_rule = rawrule.strip() + self.assertTrue(obj.is_equal(expected, True)) + + +class MountTestParseInvalid(AATest): + tests = ( + ('mount fstype=,', AppArmorException), + ('mount fstype=(foo),', AppArmorException), + ('mount fstype=(),', AppArmorException), + ('mount options=(),', AppArmorException), + ('mount option=(invalid),', AppArmorException), + ('mount option=(ext3ext4),',AppArmorException), + ) + + def _run_test(self, rawrule, expected): + self.assertTrue(MountRule.match(rawrule)) # the above invalid rules still match the main regex! + with self.assertRaises(expected): + MountRule.create_instance(rawrule) + + def test_parse_fail(self): + with self.assertRaises(AppArmorException): + MountRule.create_instance('foo,') + + def test_diff_non_mountrule(self): + exp = namedtuple('exp', ('audit', 'deny')) + obj = MountRule('mount',("=", '(ext4)'), MountRule.ALL, MountRule.ALL, MountRule.ALL) + with self.assertRaises(AppArmorBug): + obj.is_equal(exp(False, False), False) + + def test_diff_invalid_fstype_equals_or_in(self): + with self.assertRaises(AppArmorBug): + MountRule('mount', ('ext3', '(ext4)'), MountRule.ALL, MountRule.ALL, MountRule.ALL) # fstype[0] should be '=' or 'in' + + def test_diff_invalid_options_equals_or_in(self): + with self.assertRaises(AppArmorBug): + MountRule('mount', MountRule.ALL, ('rbind', '(rw)'), MountRule.ALL, MountRule.ALL) # fstype[0] should be '=' or 'in' + + def test_diff_fstype(self): + obj1 = MountRule('mount',("=", '(ext4)'), MountRule.ALL, MountRule.ALL, MountRule.ALL) + obj2 = MountRule('mount',MountRule.ALL, MountRule.ALL, MountRule.ALL, MountRule.ALL) + self.assertFalse(obj1.is_equal(obj2, False)) + + def test_diff_source(self): + obj1 = MountRule('mount',MountRule.ALL, MountRule.ALL, "/foo", MountRule.ALL) + obj2 = MountRule('mount',MountRule.ALL, MountRule.ALL, "/bar", MountRule.ALL) + self.assertFalse(obj1.is_equal(obj2, False)) + + def test_invalid_umount_with_source(self): + with self.assertRaises(AppArmorException): + MountRule('umount', MountRule.ALL, MountRule.ALL, "/foo", MountRule.ALL) # Umount and remount shall not have a source + + def test_invalid_remount_with_source(self): + with self.assertRaises(AppArmorException): + MountRule('remount', MountRule.ALL, MountRule.ALL, "/foo", MountRule.ALL) + + +class MountTestFilesystems(AATest): + def test_fs(self): + with open("/proc/filesystems") as f: + for line in f: + self.assertTrue(line.split()[-1] in valid_fs) + + +class MountTestGlob(AATest): + def test_glob(self): + globList = [( + "mount options=(bind, rw) /home/user/Downloads/ -> /mnt/a/,", + "mount options=(bind, rw) /home/user/Downloads/,", + "mount options=(bind, rw) /home/user/*/,", + "mount options=(bind, rw) /home/**/,", + "mount options=(bind, rw),", + "mount,", + "mount,", + )] + for globs in globList: + for i in range(len(globs)-1): + rule = MountRule.create_instance(globs[i]) + rule.glob() + self.assertEqual(rule.get_clean(), globs[i+1]) + + +class MountTestClean(AATest): + tests = ( + # raw rule clean rule + (' mount , # foo ', 'mount, # foo'), + (' mount fstype = ( sysfs ) , ', 'mount fstype=(sysfs),'), + (' mount fstype = ( sysfs , procfs ) , ', 'mount fstype=(procfs, sysfs),'), + (' mount options = ( rw ) , ', 'mount options=(rw),'), + (' mount options = ( rw , noatime ) , ', 'mount options=(noatime, rw),'), + (' umount /foo , ', 'umount /foo,'), + (' remount /foo , ', 'remount /foo,'), + ) + + def _run_test(self, rawrule, expected): + self.assertTrue(MountRule.match(rawrule)) + obj = MountRule.create_instance(rawrule) + clean = obj.get_clean() + raw = obj.get_raw() + + self.assertEqual(expected, clean, 'unexpected clean rule') + self.assertEqual(rawrule.strip(), raw, 'unexpected raw rule') + +class MountLogprofHeaderTest(AATest): + tests = ( + ('mount,', [_('Operation'), _('mount'), _('Fstype'), _('ALL'), _('Options'), _('ALL'), _('Source'), _('ALL'), _('Destination'), _('ALL')]), + ('mount options=(ro, nosuid) /a,', [_('Operation'), _('mount'), _('Fstype'), _('ALL'), _('Options'), ("=", _('nosuid ro')),_('Source'), _('/a'), _('Destination'), _('ALL')]), + ('mount fstype=(ext3, ext4) options=(ro, nosuid) /a -> /b,', [_('Operation'), _('mount'), _('Fstype'), ("=", _('ext3 ext4')),_('Options'), ("=", _('nosuid ro')),_('Source'), _('/a'), _('Destination'), _('/b')]) + ) + + def _run_test(self, params, expected): + obj = MountRule.create_instance(params) + self.assertEqual(obj.logprof_header(), expected) + + +class MountIsCoveredTest(AATest): + def test_is_covered(self): + obj = MountRule("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "/foo/b*", "/b*") + tests = [ + ("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "/foo/b", "/bar"), + ("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "/foo/bar", "/b") + ] + for test in tests: + self.assertTrue(obj.is_covered(MountRule(*test))) + self.assertFalse(obj.is_equal(MountRule(*test))) + + def test_is_covered_fs_source(self): + obj = MountRule("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "tmpfs", MountRule.ALL) + self.assertTrue(obj.is_covered(MountRule("mount", ("=", ('ext3')), ("=", ('ro')), "tmpfs", MountRule.ALL))) + self.assertFalse(obj.is_equal(MountRule("mount", ("=", ('ext3')), ("=", ('ro')), "tmpfs", MountRule.ALL))) + + def test_is_notcovered(self): + obj = MountRule("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "/foo/b*", "/b*") + tests = [ + ("mount", ("in", ('ext3', 'ext4')), ("=", ('ro')), "/foo/bar", "/bar" ), + ("mount", ("=", ('procfs, ext4')), ("=", ('ro')), "/foo/bar", "/bar" ), + ("mount", ("=", ('ext3')), ("=", ('rw')), "/foo/bar", "/bar" ), + ("mount", ("=", ('ext3', 'ext4')), MountRule.ALL, "/foo/b*", "/bar" ), + ("mount", MountRule.ALL, ("=", ('ro')), "/foo/b*", "/bar" ), + ("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "/invalid/bar", "/bar" ), + ("umount", MountRule.ALL, MountRule.ALL, MountRule.ALL, "/bar" ), + ("remount", MountRule.ALL, MountRule.ALL, MountRule.ALL, "/bar" ), + ("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "tmpfs", "/bar" ), + ("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "/foo/b*", "/invalid" ), + ] + for test in tests: + self.assertFalse(obj.is_covered(MountRule(*test))) + self.assertFalse(obj.is_equal(MountRule(*test))) + + + def test_is_not_covered_fs_source(self): + obj = MountRule("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "tmpfs", MountRule.ALL) + test = ("mount", ("=", ('ext3', 'ext4')), ("=", ('ro')), "procfs", MountRule.ALL) + self.assertFalse(obj.is_covered(MountRule(*test))) + self.assertFalse(obj.is_equal(MountRule(*test))) + + + +setup_all_loops(__name__) +if __name__ == '__main__': + unittest.main(verbosity=1) diff --git a/utils/test/test-parser-simple-tests.py b/utils/test/test-parser-simple-tests.py index 234f9f482..efde47eba 100644 --- a/utils/test/test-parser-simple-tests.py +++ b/utils/test/test-parser-simple-tests.py @@ -47,6 +47,13 @@ exception_not_raised = ( # interesting[tm] profile name 'change_hat/bad_parsing.sd', + 'dbus/bad_regex_04.sd', + 'dbus/bad_regex_05.sd', + 'dbus/bad_regex_06.sd', + 'file/bad_re_brace_1.sd', + 'file/bad_re_brace_2.sd', + 'file/bad_re_brace_3.sd', + # The tools don't detect conflicting change_profile exec modes 'change_profile/onx_conflict_unsafe1.sd', 'change_profile/onx_conflict_unsafe2.sd', @@ -70,49 +77,14 @@ exception_not_raised = ( 'file/bad_re_brace_1.sd', 'file/bad_re_brace_2.sd', 'file/bad_re_brace_3.sd', - 'mount/bad_1.sd', - 'mount/bad_2.sd', - 'mount/bad_3.sd', - 'mount/bad_4.sd', - 'mount/bad_opt_10.sd', - 'mount/bad_opt_11.sd', - 'mount/bad_opt_12.sd', - 'mount/bad_opt_13.sd', - 'mount/bad_opt_14.sd', - 'mount/bad_opt_15.sd', - 'mount/bad_opt_16.sd', - 'mount/bad_opt_17.sd', - 'mount/bad_opt_18.sd', - 'mount/bad_opt_19.sd', - 'mount/bad_opt_1.sd', - 'mount/bad_opt_20.sd', - 'mount/bad_opt_21.sd', - 'mount/bad_opt_22.sd', - 'mount/bad_opt_23.sd', - 'mount/bad_opt_24.sd', - 'mount/bad_opt_2.sd', - 'mount/bad_opt_3.sd', - 'mount/bad_opt_4.sd', - 'mount/bad_opt_5.sd', - 'mount/bad_opt_6.sd', - 'mount/bad_opt_7.sd', - 'mount/bad_opt_8.sd', - 'mount/bad_opt_9.sd', - 'mount/bad_opt_25.sd', - 'mount/bad_opt_26.sd', - 'mount/bad_opt_27.sd', - 'mount/bad_opt_28.sd', + + # We do not check that options are compatible 'mount/bad_opt_29.sd', 'mount/bad_opt_30.sd', 'mount/bad_opt_31.sd', - 'mount/bad_opt_32.sd', - 'mount/bad_opt_35.sd', - 'mount/bad_opt_36.sd', - 'mount/bad_opt_37.sd', - 'mount/bad_opt_38.sd', - 'mount/bad_opt_39.sd', - 'mount/bad_opt_40.sd', - 'mount/bad_opt_41.sd', + 'mount/bad_1.sd', + 'mount/bad_2.sd', + 'profile/flags/flags_bad10.sd', 'profile/flags/flags_bad11.sd', 'profile/flags/flags_bad12.sd', @@ -341,6 +313,48 @@ unknown_line = ( 'bare_include_tests/ok_84.sd', 'bare_include_tests/ok_85.sd', 'bare_include_tests/ok_86.sd', + + # option = make-${valid-option} (e.g. make-private) is not supported + 'mount/ok_opt_48.sd', + 'mount/ok_opt_49.sd', + 'mount/ok_opt_50.sd', + 'mount/ok_opt_51.sd', + 'mount/ok_opt_52.sd', + 'mount/ok_opt_53.sd', + 'mount/ok_opt_54.sd', + 'mount/ok_opt_55.sd', + + # Mount with flags in {remount, [r]unbindable, [r]shared, [r]private, [r]slave} does not support a source + 'mount/ok_opt_68.sd', + 'mount/ok_opt_69.sd', + 'mount/ok_opt_70.sd', + 'mount/ok_opt_71.sd', + 'mount/ok_opt_72.sd', + 'mount/ok_opt_73.sd', + 'mount/ok_opt_74.sd', + 'mount/ok_opt_75.sd', + + # option = make-${valid-option} (e.g. make-private) is not supported + 'mount/ok_opt_76.sd', + 'mount/ok_opt_77.sd', + 'mount/ok_opt_78.sd', + 'mount/ok_opt_79.sd', + 'mount/ok_opt_80.sd', + 'mount/ok_opt_81.sd', + 'mount/ok_opt_82.sd', + 'mount/ok_opt_83.sd', + 'mount/ok_opt_84.sd', + + # According to spec mount should be in the form fstype=... options=... and NOT in the form options=... fstype=... + 'mount/ok_opt_combo_3.sd', + 'mount/ok_opt_combo_2.sd', + 'mount/ok_opt_combo_1.sd', + 'mount/ok_opt_combo_4.sd', + + # Invalid keyword: read-only --> Should be ro + 'mount/ok_opt_3.sd', + # Options should be comma separated + 'mount/in_4.sd', # also order option then fstype is invalid ) # testcases with various unexpected failures