From 2b32130280d53497a1e369faabe5b0a2557fffd7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maxime=20B=C3=A9lair?= Date: Tue, 17 Sep 2024 09:17:23 +0000 Subject: [PATCH] aa-notify: Simplify user interfaces and update man page --- utils/aa-notify | 172 +++++++++++++++++++++---------- utils/aa-notify.pod | 28 ++++- utils/apparmor/aa.py | 6 +- utils/apparmor/config.py | 11 +- utils/apparmor/gui.py | 105 +++++-------------- utils/apparmor/logparser.py | 17 +-- utils/apparmor/rule/file.py | 32 ++++-- utils/apparmor/update_profile.py | 2 +- utils/notify.conf | 3 + utils/test/test-file.py | 33 +++++- 10 files changed, 242 insertions(+), 167 deletions(-) diff --git a/utils/aa-notify b/utils/aa-notify index ffa39dc2e..30b71c369 100755 --- a/utils/aa-notify +++ b/utils/aa-notify @@ -49,12 +49,13 @@ import apparmor.ui as aaui import apparmor.config as aaconfig import apparmor.update_profile as update_profile import LibAppArmor # C-library to parse one log line -from apparmor.common import DebugLogger, open_file_read, AppArmorException +from apparmor.common import DebugLogger, open_file_read from apparmor.fail import enable_aa_exception_handler from apparmor.notify import get_last_login_timestamp from apparmor.translations import init_translation from apparmor.logparser import ReadLog -from apparmor.gui import UsernsGUI, AddProfileGUI, ErrorGUI, ShowMoreGUI, set_interface_theme +from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, set_interface_theme +from apparmor.rule.file import FileRule from dbus import DBusException import gi @@ -420,6 +421,13 @@ def compile_filter_regex(filters): return filters +def can_allow_rule(ev, special_profiles): + if customized_message['userns']['cond'](ev, special_profiles): + return not aa.get_profile_filename_from_profile_name(ev['comm']) + else: + return aa.get_profile_filename_from_profile_name(ev['profile']) is not None + + def is_special_profile_userns(ev, special_profiles): if not special_profiles or ev['profile'] not in special_profiles: return False # We don't use special profiles or there is already a profile defined: we don't ask to add userns @@ -430,23 +438,41 @@ def is_special_profile_userns(ev, special_profiles): return True -def ask_for_user_ns_denied(path, name, profile_path): - gui = UsernsGUI(name, path) - ans = gui.show() - if ans in {'allow', 'deny'}: - update_profile_path = update_profile.__file__ - local_template_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default_unconfined.template') - if os.path.exists(local_template_path): # We are using local aa-notify -> we use local template - template_path = local_template_path - else: - template_path = aa.CONFDIR + '/default_unconfined.template' +def create_userns_profile(name, path, ans): + update_profile_path = update_profile.__file__ - command = ['pkexec', '--keep-cwd', update_profile_path, 'create_userns', template_path, name, path, profile_path, ans] - try: - subprocess.run(command, check=True) - except subprocess.CalledProcessError as e: - if e.returncode != 126: # return code 126 means the user cancelled the request - UsernsGUI.show_error_cannot_reload_profile(profile_path, e.returncode) + local_template_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default_unconfined.template') + if os.path.exists(local_template_path): # We are using local aa-notify -> we use local template + template_path = local_template_path + else: + template_path = aa.CONFDIR + '/default_unconfined.template' + + if path is None: + UsernsGUI.show_error_cannot_find_execpath(name, template_path) + return + + profile_path = aa.get_profile_filename_from_profile_name(name, True) + if not profile_path: + ErrorGUI(_('Cannot get profile path for {}.').format(name), False).show() + return + + command = ['pkexec', '--keep-cwd', update_profile_path, 'create_userns', template_path, name, path, profile_path, ans] + + try: + subprocess.run(command, check=True) + except subprocess.CalledProcessError as e: + if e.returncode != 126: # return code 126 means the user cancelled the request + UsernsGUI.show_error_cannot_reload_profile(profile_path, e.returncode) + + +def ask_for_user_ns_denied(path, name, interactive=True): + if interactive: + gui = UsernsGUI(name, path) + ans = gui.show() + else: + ans = 'allow' + if ans in {'allow', 'deny'}: + create_userns_profile(name, path, ans) else: debug_logger.debug('No action from the user for {}'.format(path)) @@ -470,24 +496,20 @@ def prompt_userns(ev, special_profiles): # but if execpath is not supported by the kernel it could also mean that we inferred a bad path # So we do nothing beyond showing this error. ErrorGUI( - _('Application {0} tried to create an user namespace, but a profile already exists with this name.\n' - 'This is likely because there is several binaries named {0} thus the path inferred by AppArmor ({1}) is not correct.\n' - 'You should review your profiles (in {2}).').format(ev['comm'], ev['execpath'], aa.profile_dir), + _('Application {profile} tried to create an user namespace, but a profile already exists with this name.\n' + 'This is likely because there is several binaries named {profile} thus the path inferred by AppArmor ({inferred_path}) is not correct.\n' + 'You should review your profiles (in {profile_dir}).').format(profile=ev['comm'], inferred_path=ev['execpath'], profile_dir=aa.profile_dir), False).show() return True - profile_path = aa.get_profile_filename_from_profile_name(ev['comm'], True) - if not profile_path: - return False - - ask_for_user_ns_denied(ev['execpath'], ev['comm'], profile_path) + ask_for_user_ns_denied(ev['execpath'], ev['comm']) return True # TODO reuse more code from aa-logprof in callbacks def cb_more_info(notification, action, _args): - (raw_ev, rl) = _args + (raw_ev, rl, special_profiles) = _args notification.close() parsed_event = rl.parse_record(raw_ev) @@ -501,19 +523,54 @@ def cb_more_info(notification, action, _args): rule = rl.create_rule_from_ev(parsed_event) + # Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules + if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC: + rule.exec_perms = 'Pix' + if rule: aa.update_profiles() - profile_path = aa.get_profile_filename_from_profile_name(parsed_event['profile']) - if profile_path: - out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path) - out += rule.get_clean() + if customized_message['userns']['cond'](parsed_event, special_profiles): + profile_path = None + out += _('You may allow it through a dedicated unconfined profile for {}.').format(parsed_event['comm']) else: - out += _('However {0} is not in {1}\nIt is likely that the profile was not stored in {1} or was removed.\n').format(parsed_event['profile'], aa.profile_dir) + profile_path = aa.get_profile_filename_from_profile_name(parsed_event['profile']) + if profile_path: + out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path) + out += rule.get_clean() + else: + out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=parsed_event['profile'], profile_dir=aa.profile_dir) else: # Should not happen out += _('ERROR: Could not create rule from event.') return - ShowMoreGUI(profile_path, out, profile_path is not None).show() + ans = ShowMoreGUI(profile_path, out, rule.get_clean(), parsed_event['profile'], profile_path is not None).show() + if ans == 'add_rule': + add_to_profile(rule.get_clean(), parsed_event['profile']) + elif ans in {'allow', 'deny'}: + create_userns_profile(parsed_event['comm'], parsed_event['execpath'], ans) + + +def add_to_profile(rule, profile_name): + # We get update_profile.py through this import so that it works in all cases + profile_path = aa.get_profile_filename_from_profile_name(profile_name) + + if not profile_path: + ErrorGUI( + _( + 'Cannot find profile for {}\n\n' + 'It is likely that the profile was not stored in {} or was removed.' + ).format(profile_name, aa.profile_dir), + False + ).show() + return + + update_profile_path = update_profile.__file__ + command = ['pkexec', '--keep-cwd', update_profile_path, 'add_rule', rule, profile_name] + try: + subprocess.run(command, check=True) + except subprocess.CalledProcessError as e: + if e.returncode != 126: # return code 126 means the user cancelled the request + ErrorGUI(_('Failed to add rule {rule} to {profile}\nError code = {retcode}').format(rule=rule, profile=profile_name, retcode=e.returncode), False).show() def cb_add_to_profile(notification, action, _args): @@ -523,26 +580,20 @@ def cb_add_to_profile(notification, action, _args): rule = rl.create_rule_from_ev(parsed_event) + # Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules + if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC: + rule.exec_perms = 'Pix' + if not rule: - ErrorGUI.show(_('ERROR: Could not create rule from event.')) + ErrorGUI(_('ERROR: Could not create rule from event.'), False).show() return aa.update_profiles() if customized_message['userns']['cond'](parsed_event, special_profiles): - if parsed_event['execpath'] is None: - UsernsGUI.show_error_cannot_find_execpath(parsed_event['comm'], os.path.dirname(os.path.abspath(__file__)) + '/default_unconfined.template') - return - profile_path = aa.get_profile_filename_from_profile_name(parsed_event['comm'], True) - if not profile_path: - ErrorGUI(_('Cannot get profile path for {}.').format(parsed_event['comm']), False).show() - return - ask_for_user_ns_denied(parsed_event['execpath'], parsed_event['comm'], profile_path) + ask_for_user_ns_denied(parsed_event['execpath'], parsed_event['comm'], False) else: - try: - AddProfileGUI(rule.get_clean(), parsed_event['profile']).show() - except AppArmorException: - AddProfileGUI.show_error_cannot_find_profile(parsed_event['profile']) + add_to_profile(rule.get_clean(), parsed_event['profile']) customized_message = { @@ -646,15 +697,6 @@ def main(): else: # fallback to env variable (or None if not set) confdir = os.getenv('__AA_CONFDIR') - # Todo: add more kinds of notifications - supported_prompt_filter = {'userns'} - - if args.prompt_filter: - args.prompt_filter = set(args.prompt_filter.strip().split(',')) - unsupported = args.prompt_filter - supported_prompt_filter - if unsupported: - sys.exit(_('ERROR: using an unsupported prompt filter: {}\nSupported values: {}').format(', '.join(unsupported), ', '.join(supported_prompt_filter))) - aa.init_aa(confdir=confdir) # Initialize aa.logfile @@ -693,6 +735,7 @@ def main(): - userns_special_profiles - ignore_denied_capability - interface_theme + - prompt_filter - filter.profile, - filter.operation, - filter.name, @@ -709,6 +752,7 @@ def main(): 'userns_special_profiles', 'ignore_denied_capability', 'interface_theme', + 'prompt_filter', 'show_notifications', 'message_body', 'message_footer', @@ -798,6 +842,16 @@ def main(): else: set_interface_theme('ubuntu') + # Todo: add more kinds of notifications + supported_prompt_filter = {'userns'} + if not args.prompt_filter and 'prompt_filter' in config['']: + args.prompt_filter = config['']['prompt_filter'] + if args.prompt_filter: + args.prompt_filter = set(args.prompt_filter.strip().split(',')) + unsupported = args.prompt_filter - supported_prompt_filter + if unsupported: + sys.exit(_('ERROR: using an unsupported prompt filter: {}\nSupported values: {}').format(', '.join(unsupported), ', '.join(supported_prompt_filter))) + if args.file: logfile = args.file elif os.path.isfile('/var/run/auditd.pid') and os.path.isfile('/var/log/audit/audit.log'): @@ -831,6 +885,9 @@ def main(): # Required to parse_record. rl = ReadLog('', '', '') + # Initialize the list of profiles for can_allow_rule + aa.read_profiles() + # At this point this script needs to be able to read 'logfile' but once # the for loop starts, privileges can be dropped since the file descriptor # has been opened and access granted. Further reads of the file will not @@ -865,8 +922,9 @@ def main(): 'gtk-dialog-warning' ) - n.add_action('clicked', 'Allow', cb_add_to_profile, (event, rl, userns_special_profiles)) - n.add_action('more_clicked', 'Show More', cb_more_info, (event, rl)) + if can_allow_rule(ev, userns_special_profiles): + n.add_action('clicked', 'Allow', cb_add_to_profile, (event, rl, userns_special_profiles)) + n.add_action('more_clicked', 'Show More', cb_more_info, (event, rl, userns_special_profiles)) n.show() diff --git a/utils/aa-notify.pod b/utils/aa-notify.pod index 591385c75..fea8a25ee 100644 --- a/utils/aa-notify.pod +++ b/utils/aa-notify.pod @@ -86,11 +86,24 @@ displays a short usage statement. System-wide configuration for B is done via /etc/apparmor/notify.conf: - # set to 'yes' to enable AppArmor DENIED notifications + # Set to 'no' to disable AppArmor notifications globally show_notifications="yes" - # only people in use_group can use aa-notify - use_group="admin" + # Special profiles used to remove privileges for unconfined binaries using user namespaces. If unsure, leave as is. + userns_special_profiles="unconfined,unprivileged_userns" + + # Theme for aa-notify GUI. See https://ttkthemes.readthedocs.io/en/latest/themes.html for available themes. + interface_theme="ubuntu" + + # Binaries for which we ignore userns-related capability denials + ignore_denied_capability="sudo,su" + + # OPTIONAL - kind of operations which display a popup prompt. + prompt_filter="userns" + + # OPTIONAL - restrict using aa-notify to users in the given group + # (if not set, everybody who has permissions to read the logfile can use it) + # use_group="admin" # OPTIONAL - custom notification message body message_body="This is a custom notification message." @@ -98,6 +111,15 @@ System-wide configuration for B is done via # OPTIONAL - custom notification message footer message_footer="For more information visit https://foo.com" + # OPTIONAL - custom notification filtering + # Filters are used to reduce the output of information to only those entries that will match the filter. Filters use Python's regular expression syntax. + filter.profile="^(foo|bar)$" # Match the profile: Only shows notifications for profiles "foo" or "bar" + filter.operation="^open$" # Match the operation: Only shows notifications for "open" operation + filter.name="^(?!/usr/lib/)" # Match the name: Excludes notifications for names starting by "/usr/lib/" + filter.denied="^r$" # Match the denied_mask: Only shows notifications where "r", and only "r", was denied + filter.family="^inet$" # Match the network family: Only shows notifications for "inet" family + filter.socket="stream" # Match the network socket type: Only shows notifications for "stream" sockets + Per-user configuration is done via $XDG_CONFIG_HOME/apparmor/notify.conf (or the deprecated ~/.apparmor/notify.conf if it exists): diff --git a/utils/apparmor/aa.py b/utils/apparmor/aa.py index 5885659f0..386708fc5 100644 --- a/utils/apparmor/aa.py +++ b/utils/apparmor/aa.py @@ -1602,9 +1602,9 @@ def collapse_log(hashlog, ignore_null_profiles=True): log_dict[aamode][final_name] = ProfileStorage(profile, hat, 'collapse_log()') for ev_type, ev_class in ReadLog.ruletypes.items(): - for event in ev_class.from_hashlog(hashlog[aamode][full_profile][ev_type]): - if not hat_exists or not is_known_rule(aa[profile][hat], ev_type, event): - log_dict[aamode][final_name][ev_type].add(event) + for rule in ev_class.from_hashlog(hashlog[aamode][full_profile][ev_type]): + if not hat_exists or not is_known_rule(aa[profile][hat], ev_type, rule): + log_dict[aamode][final_name][ev_type].add(rule) return log_dict diff --git a/utils/apparmor/config.py b/utils/apparmor/config.py index 7226819f0..04dd38b0b 100644 --- a/utils/apparmor/config.py +++ b/utils/apparmor/config.py @@ -19,6 +19,11 @@ from configparser import ConfigParser from tempfile import NamedTemporaryFile from apparmor.common import AppArmorException, open_file_read # , warn, msg, +import apparmor.ui as aaui + +from apparmor.translations import init_translation + +_ = init_translation() # CFG = None @@ -103,7 +108,11 @@ class Config: config = {'': dict()} with open_file_read(filepath) as conf_file: for line in conf_file: - result = shlex.split(line, True) + try: + result = shlex.split(line, True) + except ValueError as e: + aaui.UI_Important(_('Warning! invalid line \'{line}\' in config file: {err}').format(line=line[:-1], err=e)) + continue # If not a comment of empty line if result: # option="value" or option=value type diff --git a/utils/apparmor/gui.py b/utils/apparmor/gui.py index 2cefc07e7..9fbf265ec 100644 --- a/utils/apparmor/gui.py +++ b/utils/apparmor/gui.py @@ -1,13 +1,10 @@ import os import tkinter as tk import tkinter.ttk as ttk -import tkinter.font as font import subprocess import ttkthemes import apparmor.aa as aa -import apparmor.update_profile as update_profile -from apparmor.common import AppArmorException from apparmor.translations import init_translation @@ -28,6 +25,7 @@ class GUI: print(_('ERROR: Cannot initialize Tkinter. Please check that your terminal can use a graphical interface')) os._exit(1) + self.result = None style = ttkthemes.ThemedStyle(self.master) style.theme_use(interface_theme) self.bg_color = style.lookup('TLabel', 'background') @@ -40,73 +38,19 @@ class GUI: self.button_frame = ttk.Frame(self.master, padding=(0, 10)) self.button_frame.pack() - -class AddProfileGUI(GUI): - def __init__(self, rule, profile_name, dbg=None): - self.dbg = dbg - self.rule = rule - self.profile_name = profile_name - self.profile_path = aa.get_profile_filename_from_profile_name(profile_name) - - if not self.profile_path: - raise AppArmorException('Cannot find profile for {}'.format(self.profile_name)) - super().__init__() - - self.master.title(_('AppArmor - Add rule to profile')) - - self.profile_label = ttk.Label(self.label_frame, text=_('Profile for: {}').format(self.profile_name)) - self.profile_label.pack() - - self.entry_frame = ttk.Frame(self.master) - self.entry_frame.pack() - - self.rule = tk.StringVar(value=self.rule) - self.rule_entry = ttk.Entry(self.entry_frame, font=font.nametofont("TkDefaultFont"), width=50, textvariable=self.rule) - self.rule_entry.pack(side=tk.LEFT) - - self.button_frame = ttk.Frame(self.master, padding=(10, 10)) - self.button_frame.pack() - - self.add_to_profile_button = ttk.Button(self.button_frame, text=_('Add to Profile'), command=self.add_to_profile) - self.add_to_profile_button.pack(side=tk.LEFT) - - self.show_profile_button = ttk.Button(self.button_frame, text=_('Show Current Profile'), command=lambda: open_with_default_editor(self.profile_path)) - self.show_profile_button.pack(side=tk.LEFT) - - self.cancel_button = ttk.Button(self.button_frame, text=_('Cancel'), command=self.master.destroy) - self.cancel_button.pack(side=tk.LEFT) - - def add_to_profile(self): - if self.dbg: - self.dbg.debug('Adding rule \'{}\' to profile at: {}'.format(self.rule.get(), self.profile_name)) - - # We get update_profile.py through this import so that it works in all cases - update_profile_path = update_profile.__file__ - command = ['pkexec', '--keep-cwd', update_profile_path, 'add_rule', self.rule.get(), self.profile_name] - try: - subprocess.run(command, check=True) - except subprocess.CalledProcessError as e: - if e.returncode != 126: # return code 126 means the user cancelled the request - ErrorGUI(_('Failed to add rule {} to {}\n Error code = {}').format(self.rule.get(), self.profile_name, e.returncode), False).show() - - self.master.destroy() - def show(self): self.master.mainloop() + return self.result - @staticmethod - def show_error_cannot_find_profile(profile_name): - ErrorGUI( - _( - 'Cannot find profile for {}\n\n' - 'It is likely that the profile was not stored in {} or was removed.' - ).format(profile_name, aa.profile_dir), - False - ).show() + def set_result(self, result): + self.result = result + self.master.destroy() class ShowMoreGUI(GUI): - def __init__(self, profile_path, msg, profile_found=True): + def __init__(self, profile_path, msg, rule, profile_name, profile_found=True): + self.rule = rule + self.profile_name = profile_name self.profile_path = profile_path self.msg = msg self.profile_found = profile_found @@ -121,17 +65,25 @@ class ShowMoreGUI(GUI): if self.profile_found: self.show_profile_button = ttk.Button(self.button_frame, text=_('Show Current Profile'), command=lambda: open_with_default_editor(self.profile_path)) - self.show_profile_button.pack() + self.show_profile_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) - def show(self): - self.master.mainloop() + self.add_to_profile_button = ttk.Button(self.button_frame, text=_('Allow'), command=lambda: self.set_result('add_rule')) + self.add_to_profile_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) + elif rule == 'userns create,': + self.add_policy_button = ttk.Button(self.master, text=_('Allow'), command=lambda: self.set_result('allow')) + self.add_policy_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) + + self.never_ask_button = ttk.Button(self.master, text=_('Deny'), command=lambda: self.set_result('deny')) + self.never_ask_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) + + self.do_nothing_button = ttk.Button(self.master, text=_('Do nothing'), command=self.master.destroy) + self.do_nothing_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) class UsernsGUI(GUI): def __init__(self, name, path): self.name = name self.path = path - self.result = None super().__init__() @@ -144,10 +96,10 @@ class UsernsGUI(GUI): link.pack() link.bind('', self.more_info) - self.add_policy_button = ttk.Button(self.master, text=_('Allow'), command=lambda: self.set_result("allow")) + self.add_policy_button = ttk.Button(self.master, text=_('Allow'), command=lambda: self.set_result('allow')) self.add_policy_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) - self.never_ask_button = ttk.Button(self.master, text=_('Deny'), command=lambda: self.set_result("deny")) + self.never_ask_button = ttk.Button(self.master, text=_('Deny'), command=lambda: self.set_result('deny')) self.never_ask_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) self.do_nothing_button = ttk.Button(self.master, text=_('Do nothing'), command=self.master.destroy) @@ -162,17 +114,10 @@ However, this feature also introduces security risks, (e.g. privilege escalation This dialog allows you to choose whether you want to enable user namespaces for this application. The application path is {}""".format(self.path)) - more_gui = ShowMoreGUI(self.path, more_info_text, profile_found=False) + # Rule=None so we don't show redundant buttons in ShowMoreGUI. + more_gui = ShowMoreGUI(self.path, more_info_text, None, self.name, profile_found=False) more_gui.show() - def set_result(self, result): - self.result = result - self.master.destroy() - - def show(self): - self.master.mainloop() - return self.result - @staticmethod def show_error_cannot_reload_profile(profile_path, error): ErrorGUI(_('Failed to create or load profile {}\n Error code = {}').format(profile_path, error), False).show() @@ -203,7 +148,7 @@ class ErrorGUI(GUI): self.label.pack() # Create a button to close the dialog - self.button = ttk.Button(self.button_frame, text="OK", command=self.destroy) + self.button = ttk.Button(self.button_frame, text='OK', command=self.destroy) self.button.pack() def destroy(self): diff --git a/utils/apparmor/logparser.py b/utils/apparmor/logparser.py index 305d6361d..2316f7fa1 100644 --- a/utils/apparmor/logparser.py +++ b/utils/apparmor/logparser.py @@ -114,8 +114,7 @@ class ReadLog: def get_event_type(self, e): if e['operation'] == 'exec': - return 'exec' - + return 'file' elif e['class'] and e['class'] == 'namespace': if e['denied_mask'] and e['denied_mask'].startswith('userns_'): return 'userns' @@ -141,7 +140,6 @@ class ReadLog: return 'change_hat' elif e['operation'] == 'change_profile': return 'change_profile' - elif e['operation'] == 'ptrace': return 'ptrace' elif e['operation'] == 'signal': @@ -306,17 +304,10 @@ class ReadLog: if profile != 'null-complain-profile' and not self.profile_exists(profile): return - if e['operation'] == 'exec': - if not e['name']: - raise AppArmorException('exec without executed binary') - - if not e['name2']: - e['name2'] = '' # exec events in enforce mode don't have target=... - - self.hashlog[aamode][full_profile]['exec'][e['name']][e['name2']] = True - return - # TODO: replace all the if conditions with a loop over 'ruletypes' + if e['operation'] == 'exec': + FileRule.hashlog_from_event(self.hashlog[aamode][full_profile]['exec'], e) + return elif e['class'] and e['class'] == 'namespace': if e['denied_mask'].startswith('userns_'): diff --git a/utils/apparmor/rule/file.py b/utils/apparmor/rule/file.py index 267a05f8e..94c1d076d 100644 --- a/utils/apparmor/rule/file.py +++ b/utils/apparmor/rule/file.py @@ -427,6 +427,17 @@ class FileRule(BaseRule): @staticmethod def hashlog_from_event(hl, e): + # FileRule can be of two types, "file" or "exec" + if e['operation'] == 'exec': + if not e['name']: + raise AppArmorException('exec without executed binary') + + if not e['name2']: + e['name2'] = '' # exec events in enforce mode don't have target=... + + hl[e['name']][e['name2']] = True + return + # Map c (create) and d (delete) to w (logging is more detailed than the profile language) dmask = e['denied_mask'] dmask = dmask.replace('c', 'w') @@ -461,13 +472,20 @@ class FileRule(BaseRule): @classmethod def from_hashlog(cls, hl): - for path, owner in BaseRule.generate_rules_from_hashlog(hl, 2): - mode = set(hl[path][owner].keys()) - # logparser sums up multiple log events, so both 'a' and 'w' can be present - if 'a' in mode and 'w' in mode: - mode.remove('a') - yield cls(path, mode, None, FileRule.ALL, owner=owner, log_event=True) - # TODO: check for existing rules with this path, and merge them into one rule + for h1, h2 in BaseRule.generate_rules_from_hashlog(hl, 2): + # FileRule can be either a 'normal' or an 'exec' file rule. These rules are encoded in hashlog differently. + if hl[h1][h2] is True: # Exec Rule + name = h1 + yield FileRule(name, None, FileRule.ANY_EXEC, FileRule.ALL, owner=False, log_event=True) + else: + path = h1 + owner = h2 + mode = set(hl[path][owner].keys()) + # logparser sums up multiple log events, so both 'a' and 'w' can be present + if 'a' in mode and 'w' in mode: + mode.remove('a') + yield cls(path, mode, None, FileRule.ALL, owner=owner, log_event=True) + # TODO: check for existing rules with this path, and merge them into one rule class FileRuleset(BaseRuleset): diff --git a/utils/apparmor/update_profile.py b/utils/apparmor/update_profile.py index 56072c2cb..8d54becc5 100755 --- a/utils/apparmor/update_profile.py +++ b/utils/apparmor/update_profile.py @@ -15,7 +15,7 @@ def create_userns(template_path, name, bin_path, profile_path, decision): with open(template_path, 'r') as f: profile_template = f.read() - rule = 'userns' if decision == 'allow' else 'audit deny userns' + rule = 'userns create' if decision == 'allow' else 'audit deny userns create' profile = profile_template.format(rule=rule, name=name, path=bin_path) with open(profile_path, 'w') as file: diff --git a/utils/notify.conf b/utils/notify.conf index fefb4ad76..1524a45d4 100644 --- a/utils/notify.conf +++ b/utils/notify.conf @@ -13,12 +13,15 @@ show_notifications="yes" # Special profiles used to remove privileges for unconfined binaries using user namespaces. If unsure, leave as is. userns_special_profiles="unconfined,unprivileged_userns" + # Theme to use for aa-notify GUI themes. See https://ttkthemes.readthedocs.io/en/latest/themes.html for available themes. interface_theme="ubuntu" # Binaries for which we ignore userns-related capability denials ignore_denied_capability="sudo,su" +# OPTIONAL - kind of operations which display a popup prompt. +# prompt_filter="userns" # OPTIONAL - restrict using aa-notify to users in the given group # (if not set, everybody who has permissions to read the logfile can use it) diff --git a/utils/test/test-file.py b/utils/test/test-file.py index dcf424d6a..bb7656883 100644 --- a/utils/test/test-file.py +++ b/utils/test/test-file.py @@ -1219,8 +1219,27 @@ class FileGetExecConflictRules_1(AATest): self. assertEqual(conflicts.get_clean(), expected) -class FileModeTest(AATest): - def test_write_append(self): +class FileCreateFromEvent(AATest): + + def test_exec_rule(self): + parser = ReadLog('', '', '') + raw_ev = '[258945.534540] audit: type=1400 audit(1725865139.443:401): apparmor="DENIED" operation="exec" class="file" profile="foo" name="/usr/bin/ls" pid=130888 comm="foo" requested_mask="x" denied_mask="x" fsuid=1000 ouid=0' + hl = hasher() + ev = parser.parse_event(raw_ev) + FileRule.hashlog_from_event(hl, ev) + + expected = {'/usr/bin/ls': {'': True}} + self.assertEqual(hl, expected) + + fr = FileRule.from_hashlog(hl) + + expected = FileRule('/usr/bin/ls', None, FileRule.ANY_EXEC, FileRule.ALL, False) + + self.assertTrue(expected.is_equal(next(fr))) + with self.assertRaises(StopIteration): + next(fr) + + def test_filemode_write_append(self): parser = ReadLog('', '', '') events = [ '[ 9614.885136] audit: type=1400 audit(1720429924.397:191): apparmor="DENIED" operation="open" class="file" profile="/home/user/test/a" name="/home/user/test/foo" pid=24460 comm="a" requested_mask="w" denied_mask="w" fsuid=1000 ouid=1000', @@ -1243,6 +1262,16 @@ class FileModeTest(AATest): next(fr) +class FileInvalidCreateFromEvent(AATest): + def test_write_append(self): + parser = ReadLog('', '', '') + raw_ev = '[258145.094974] audit: type=1400 audit(1725864339.004:396): apparmor="DENIED" operation="exec" class="file" profile="foo" name="" pid=125456 comm="foo" requested_mask="x" denied_mask="x" fsuid=1000 ouid=1000' # Name is empty + hl = hasher() + ev = parser.parse_event(raw_ev) + with self.assertRaises(AppArmorException): + FileRule.hashlog_from_event(hl, ev) + + setup_all_loops(__name__) if __name__ == '__main__': unittest.main(verbosity=1)