diff --git a/utils/aa-notify b/utils/aa-notify index 30b71c369..938450981 100755 --- a/utils/aa-notify +++ b/utils/aa-notify @@ -37,9 +37,11 @@ import os import pwd import re import sys +import tempfile import time import subprocess +from collections import defaultdict import notify2 import psutil @@ -54,7 +56,7 @@ from apparmor.fail import enable_aa_exception_handler from apparmor.notify import get_last_login_timestamp from apparmor.translations import init_translation from apparmor.logparser import ReadLog -from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, set_interface_theme +from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, ShowMoreGUIAggregated, set_interface_theme from apparmor.rule.file import FileRule from dbus import DBusException @@ -121,7 +123,7 @@ def is_event_in_filter(event, filters): return True -def notify_about_new_entries(logfile, filters, wait=0): +def daemonize(): """Run the notification daemon in the background.""" # Kill other instances of aa-notify if already running for process in psutil.process_iter(): @@ -144,19 +146,9 @@ def notify_about_new_entries(logfile, filters, wait=0): except DBusException: sys.exit(_('Cannot initialize notify2. Please check that your terminal can use a graphical interface')) - try: - thread = threading.Thread(target=start_glib_loop) - thread.daemon = True - thread.start() - - for event in follow_apparmor_events(logfile, wait): - if not is_event_in_filter(event, filters): - continue - debug_logger.info(format_event(event, logfile)) - yield event, format_event(event, logfile) - except PermissionError: - sys.exit(_("ERROR: Cannot read {}. Please check permissions.").format(logfile)) - + thread = threading.Thread(target=start_glib_loop) + thread.daemon = True + thread.start() else: print(_('Notification emitter started in the background')) # pids = (os.getpid(), newpid) @@ -164,6 +156,17 @@ def notify_about_new_entries(logfile, filters, wait=0): os._exit(0) # Exit child without calling exit handlers etc +def notify_about_new_entries(logfile, filters, wait=0): + try: + for event in follow_apparmor_events(logfile, wait): + if not is_event_in_filter(event, filters): + continue + debug_logger.info(format_event(event, logfile)) + yield event, format_event(event, logfile) + except PermissionError: + sys.exit(_("ERROR: Cannot read {}. Please check permissions.").format(logfile)) + + def show_entries_since_epoch(logfile, epoch_since, filters): """Show AppArmor notifications since given timestamp.""" count = 0 @@ -292,6 +295,22 @@ def reopen_logfile_if_needed(logfile, logdata, log_inode, log_size): return (logdata, log_inode, log_size) +def get_apparmor_events_return(logfile, since=0): + """Read audit events from log source and return all relevant events.""" + out = [] + # Get logdata from file + # @TODO Implement more log sources in addition to just the logfile + try: + with open_file_read(logfile) as logdata: + for event in parse_logdata(logdata): + if event.epoch > since: + out.append(event) + + return out + except PermissionError: + sys.exit(_("ERROR: Cannot read {}. Please check permissions.".format(logfile))) + + def get_apparmor_events(logfile, since=0): """Read audit events from log source and yield all relevant events.""" @@ -378,6 +397,10 @@ def drop_privileges(): os.setegid(int(next_gid)) os.seteuid(int(next_uid)) + # sudo does not preserve DBUS address, so we need to guess it based on UID + if 'DBUS_SESSION_BUS_ADDRESS' not in os.environ: + os.environ['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=/run/user/{}/bus'.format(os.geteuid()) + def raise_privileges(): """Raise privileges of process. @@ -477,77 +500,94 @@ def ask_for_user_ns_denied(path, name, interactive=True): debug_logger.debug('No action from the user for {}'.format(path)) -def prompt_userns(ev, special_profiles): - """If the user namespace creation denial was generated by an unconfined binary, displays a graphical notification. - Creates a new profile to allow userns if the user wants it. Returns whether a notification was displayed to the user - """ - if not is_special_profile_userns(ev, special_profiles): - return False - +def can_leverage_userns_event(ev): if ev['execpath'] is None: - UsernsGUI.show_error_cannot_find_execpath(ev['comm'], os.path.dirname(os.path.abspath(__file__)) + '/default_unconfined.template') - return True + return 'error_cannot_find_path' aa.update_profiles() if aa.get_profile_filename_from_profile_name(ev['comm']): - # There is already a profile with this name: we show an error to the user. - # We could use the full path as profile name like for the old profiles if we want to handle this case - # but if execpath is not supported by the kernel it could also mean that we inferred a bad path - # So we do nothing beyond showing this error. - ErrorGUI( - _('Application {profile} tried to create an user namespace, but a profile already exists with this name.\n' - 'This is likely because there is several binaries named {profile} thus the path inferred by AppArmor ({inferred_path}) is not correct.\n' - 'You should review your profiles (in {profile_dir}).').format(profile=ev['comm'], inferred_path=ev['execpath'], profile_dir=aa.profile_dir), - False).show() - return True + return 'error_userns_profile_exists' + return 'ok' - ask_for_user_ns_denied(ev['execpath'], ev['comm']) - return True +def prompt_userns(ev): + """If the user namespace creation denial was generated by an unconfined binary, displays a graphical notification. + Creates a new profile to allow userns if the user wants it. Returns whether a notification was displayed to the user + """ + match can_leverage_userns_event(ev): + case 'error_cannot_find_path': + UsernsGUI.show_error_cannot_find_execpath(ev['comm'], os.path.dirname(os.path.abspath(__file__)) + '/default_unconfined.template') + case 'error_userns_profile_exists': + # There is already a profile with this name: we show an error to the user. + # We could use the full path as profile name like for the old profiles if we want to handle this case + # but if execpath is not supported by the kernel it could also mean that we inferred a bad path + # So we do nothing beyond showing this error. + ErrorGUI( + _('Application {profile} tried to create an user namespace, but a profile already exists with this name.\n' + 'This is likely because there is several binaries named {profile} thus the path inferred by AppArmor ({inferred_path}) is not correct.\n' + 'You should review your profiles (in {profile_dir}).').format(profile=ev['comm'], inferred_path=ev['execpath'], profile_dir=aa.profile_dir), + False).show() + case 'ok': + ask_for_user_ns_denied(ev['execpath'], ev['comm']) + + +def get_more_info_about_event(rl, ev, special_profiles, header='', get_clean_rule=False): + out = header + clean_rule = None + + for key, value in ev.items(): + if value: + out += '\t{} = {}\n'.format(_(key), value) + + out += _('\nThe software that declined this operation is {}\n').format(ev['profile']) + + rule = rl.create_rule_from_ev(ev) + + if rule: + if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC: + rule.exec_perms = 'Pix' + aa.update_profiles() + if customized_message['userns']['cond'](ev, special_profiles): + profile_path = None + out += _('You may allow it through a dedicated unconfined profile for {}.').format(ev['comm']) + match can_leverage_userns_event(ev): + case 'error_cannot_find_path': + clean_rule = _('# You may allow it through a dedicated unconfined profile for {0}. However, apparmor cannot find {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm']) + case 'error_userns_profile_exists': + clean_rule = _('# You may allow it through a dedicated unconfined profile for {} ({}). However, a profile already exists with this name. If you want to allow it, please create a profile for it manually.').format(ev['comm'], ev['execpath']) + case 'ok': + clean_rule = _('# You may allow it through a dedicated unconfined profile for {} ({})').format(ev['comm'], ev['execpath']) + else: + profile_path = aa.get_profile_filename_from_profile_name(ev['profile']) + clean_rule = rule.get_clean() + if profile_path: + out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path) + out += clean_rule + else: + out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=ev['profile'], profile_dir=aa.profile_dir) + else: # Should not happen + out += _('ERROR: Could not create rule from event.') + profile_path = None + + if get_clean_rule: + return out, profile_path, clean_rule + else: + return out, profile_path # TODO reuse more code from aa-logprof in callbacks def cb_more_info(notification, action, _args): - (raw_ev, rl, special_profiles) = _args + (ev, rl, special_profiles) = _args notification.close() - parsed_event = rl.parse_record(raw_ev) - out = _('Operation denied by AppArmor\n\n') + out, profile_path, clean_rule = get_more_info_about_event(rl, ev, special_profiles, _('Operation denied by AppArmor\n\n'), get_clean_rule=True) - for key, value in parsed_event.items(): - if value: - out += '\t{} = {}\n'.format(_(key), value) - - out += _('\nThe software that declined this operation is {}\n').format(parsed_event['profile']) - - rule = rl.create_rule_from_ev(parsed_event) - - # Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules - if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC: - rule.exec_perms = 'Pix' - - if rule: - aa.update_profiles() - if customized_message['userns']['cond'](parsed_event, special_profiles): - profile_path = None - out += _('You may allow it through a dedicated unconfined profile for {}.').format(parsed_event['comm']) - else: - profile_path = aa.get_profile_filename_from_profile_name(parsed_event['profile']) - if profile_path: - out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path) - out += rule.get_clean() - else: - out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=parsed_event['profile'], profile_dir=aa.profile_dir) - else: # Should not happen - out += _('ERROR: Could not create rule from event.') - return - - ans = ShowMoreGUI(profile_path, out, rule.get_clean(), parsed_event['profile'], profile_path is not None).show() + ans = ShowMoreGUI(profile_path, out, clean_rule, ev['profile'], profile_path is not None).show() if ans == 'add_rule': - add_to_profile(rule.get_clean(), parsed_event['profile']) + add_to_profile(clean_rule, ev['profile']) elif ans in {'allow', 'deny'}: - create_userns_profile(parsed_event['comm'], parsed_event['execpath'], ans) + create_userns_profile(ev['comm'], ev['execpath'], ans) def add_to_profile(rule, profile_name): @@ -573,12 +613,67 @@ def add_to_profile(rule, profile_name): ErrorGUI(_('Failed to add rule {rule} to {profile}\nError code = {retcode}').format(rule=rule, profile=profile_name, retcode=e.returncode), False).show() -def cb_add_to_profile(notification, action, _args): - (raw_ev, rl, special_profiles) = _args - notification.close() - parsed_event = rl.parse_record(raw_ev) +def create_from_file(file_path): + update_profile_path = update_profile.__file__ + command = ['pkexec', '--keep-cwd', update_profile_path, 'from_file', file_path] + try: + subprocess.run(command, check=True) + except subprocess.CalledProcessError as e: + if e.returncode != 126: # return code 126 means the user cancelled the request + ErrorGUI(_('Failed to add some rules'), False).show() - rule = rl.create_rule_from_ev(parsed_event) + +def allow_all(clean_rules): + local_template_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default_unconfined.template') + if os.path.exists(local_template_path): # We are using local aa-notify -> we use local template + template_path = local_template_path + else: + template_path = aa.CONFDIR + '/default_unconfined.template' + + tmp = tempfile.NamedTemporaryFile() + with open(tmp.name, mode='w') as f: + profile = None + for line in clean_rules.splitlines(): + if line == '': + continue + elif line[0] == '#': + profile = None + pass + elif line[0] != '\t': + profile = line[8:-1] # 8:-1 is to remove 'profile ' and ':' + else: + if line[1] == '#': # Add to userns + if line[-1] == '.': # '.' <==> There is an error: we cannot add the profile automatically + continue + profile_name = line.split()[-2] # line always finishes by + bin_path = line.split()[-1][1:-1] # 1:-1 to remove the parenthesis + profile_path = aa.get_profile_filename_from_profile_name(profile_name, True) + if not profile_path: + ErrorGUI(_('Cannot get profile path for {}.').format(profile_name), False).show() + continue + f.write('create_userns\t{}\t{}\t{}\t{}\t{}\n'.format(template_path, profile_name, bin_path, profile_path, 'allow')) + else: + if profile is not None: + f.write('add_rule\t{}\t{}\n'.format(line[1:], profile)) + else: + print(_("Rule {} cannot be added automatically").format(line[1:]), file=sys.stdout) + + create_from_file(tmp.name) + + +# TODO reuse more code from aa-logprof in callbacks +def cb_more_info_aggregated(notification, action, _args): + (to_display, aggregated, clean_rules) = _args + res = ShowMoreGUIAggregated(to_display, aggregated, clean_rules).show() + if res == 'allow_all': + allow_all(clean_rules) + + +def cb_add_to_profile(notification, action, _args): + (ev, rl, special_profiles) = _args + notification.close() + + rule = rl.create_rule_from_ev(ev) # Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC: @@ -590,10 +685,10 @@ def cb_add_to_profile(notification, action, _args): aa.update_profiles() - if customized_message['userns']['cond'](parsed_event, special_profiles): - ask_for_user_ns_denied(parsed_event['execpath'], parsed_event['comm'], False) + if customized_message['userns']['cond'](ev, special_profiles): + ask_for_user_ns_denied(ev['execpath'], ev['comm'], False) else: - add_to_profile(rule.get_clean(), parsed_event['profile']) + add_to_profile(rule.get_clean(), ev['profile']) customized_message = { @@ -616,6 +711,83 @@ def start_glib_loop(): loop.run() +def aggregate_event(agg, ev, keys_to_aggregate): + profile = ev['profile'] + agg[profile]['count'] += 1 + agg[profile]['events'].append(ev) + + for key in keys_to_aggregate: + if key in ev: + value = ev[key] + agg[profile]['values'][key][value] += 1 + + return agg + + +def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles): + notification = '' + summary = '' + more_info = '' + clean_rules = '' + summary = _('Notifications were raised for profiles: {}\n').format(', '.join(list(agg.keys()))) + + sorted_profiles = sorted(agg.items(), key=lambda item: item[1]['count'], reverse=True) + for profile, data in sorted_profiles: + profile_notif = _('profile: {} — {} events\n').format(profile, data['count']) + notification += profile_notif + summary += profile_notif + if len(agg) <= max_nb_profiles: + for key in keys_to_aggregate: + if key in data['values']: + total_key_events = sum(data['values'][key].values()) + sorted_values = sorted(data['values'][key].items(), key=lambda item: item[1], reverse=True) + for value, count in sorted_values: + percent = (count / total_key_events) * 100 + if percent >= 20: # We exclude rare cases for clarity. 20% is arbitrary + summary += _('\t{} was {} {:.1f}% of the time\n').format(key, value, percent) + summary += '\n' + + more_info += _('profile {}, {} events\n').format(profile, data['count']) + rules_for_profiles = set() + found_profile = True + for i, ev in enumerate(data['events']): + more_info_rule, profile_path, clean_rule = get_more_info_about_event(rl, ev, special_profiles, _(' - Event {} -\n').format(i + 1), get_clean_rule=True) + if i != 0: + more_info += '\n\n' + if not profile_path: + found_profile = False + more_info += more_info_rule + if clean_rule: + rules_for_profiles.add(clean_rule) + if rules_for_profiles != set(): + if profile not in special_profiles: + if found_profile: + clean_rules += _('profile {}:').format(profile) + else: + clean_rules += _('# Unknown profile {}').format(profile) + else: + clean_rules += _('# unprivileged userns denials ({}):').format(profile) + clean_rules += '\n\t' + '\n\t'.join(rules_for_profiles) + '\n' + return notification, summary, more_info, clean_rules + + +def display_notification(ev, rl, message, userns_special_profiles): + message = customize_notification_message(ev, message, userns_special_profiles) + n = notify2.Notification(_('AppArmor security notice'), message, 'gtk-dialog-warning') + if can_allow_rule(ev, userns_special_profiles): + n.add_action('clicked', 'Allow', cb_add_to_profile, (ev, rl, userns_special_profiles)) + n.add_action('more_clicked', 'Show More', cb_more_info, (ev, rl, userns_special_profiles)) + + n.show() + + +def display_aggregated_notification(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, special_profiles): + notification, summary, more_info, clean_rules = get_aggregated(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, special_profiles) + n = notify2.Notification(_('AppArmor security notice'), notification, 'gtk-dialog-warning') + n.add_action('more_aggregated_clicked', 'Show More Info', cb_more_info_aggregated, (summary, more_info, clean_rules)) + n.show() + + def main(): """Run aa-notify. @@ -652,6 +824,7 @@ def main(): parser.add_argument('-v', '--verbose', action='store_true', help=_('show messages with stats')) parser.add_argument('-u', '--user', type=str, help=_('user to drop privileges to when not using sudo')) parser.add_argument('-w', '--wait', type=int, metavar=('NUM'), help=_('wait NUM seconds before displaying notifications (with -p)')) + parser.add_argument('-m', '--merge-notifications', action='store_true', help=_('Merge notification for improved readability (with -p)')) parser.add_argument('--prompt-filter', type=str, metavar=('PF'), help=_('kind of operations which display a popup prompt')) parser.add_argument('--debug', action='store_true', help=_('debug mode')) parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS) @@ -736,6 +909,8 @@ def main(): - ignore_denied_capability - interface_theme - prompt_filter + - maximum_number_notification_profiles + - keys_to_aggregate - filter.profile, - filter.operation, - filter.name, @@ -756,6 +931,8 @@ def main(): 'show_notifications', 'message_body', 'message_footer', + 'maximum_number_notification_profiles', + 'keys_to_aggregate', 'filter.profile', 'filter.operation', 'filter.name', @@ -852,6 +1029,16 @@ def main(): if unsupported: sys.exit(_('ERROR: using an unsupported prompt filter: {}\nSupported values: {}').format(', '.join(unsupported), ', '.join(supported_prompt_filter))) + if 'maximum_number_notification_profiles' in config['']: + maximum_number_notification_profiles = int(config['']['maximum_number_notification_profiles'].strip()) + else: + maximum_number_notification_profiles = 2 + + if 'keys_to_aggregate' in config['']: + keys_to_aggregate = config['']['keys_to_aggregate'].strip().split(',') + else: + keys_to_aggregate = {'operation', 'class', 'name', 'denied', 'target'} + if args.file: logfile = args.file elif os.path.isfile('/var/run/auditd.pid') and os.path.isfile('/var/log/audit/audit.log'): @@ -888,49 +1075,65 @@ def main(): # Initialize the list of profiles for can_allow_rule aa.read_profiles() - # At this point this script needs to be able to read 'logfile' but once - # the for loop starts, privileges can be dropped since the file descriptor - # has been opened and access granted. Further reads of the file will not - # trigger any new permission checks. - # @TODO Plan to catch PermissionError here or..? - for (event, message) in notify_about_new_entries(logfile, filters, args.wait): - ev = rl.parse_record(event) + drop_privileges() + daemonize() + raise_privileges() - # @TODO redo special behaviours with a more regular function - # We ignore capability denials for binaries in ignore_denied_capability - if ev['operation'] == 'capable' and ev['comm'] in ignore_denied_capability: - continue + if args.merge_notifications: + if not args.wait or args.wait == 0: + args.wait = 5 - # Special behaivor for userns: - if args.prompt_filter and 'userns' in args.prompt_filter and customized_message['userns']['cond'](ev, userns_special_profiles): - if prompt_userns(ev, userns_special_profiles): + old_time = int(time.time()) + while True: + + raw_evs = get_apparmor_events_return(logfile, old_time) + drop_privileges() + + if len(raw_evs) == 1: # Single event: we handle it without aggregation + raw_ev = raw_evs[0] + ev = rl.parse_record(raw_ev) + display_notification(ev, rl, format_event(raw_ev, logfile), userns_special_profiles) + elif len(raw_evs) > 1: + aggregated = defaultdict(lambda: {'count': 0, 'values': defaultdict(lambda: defaultdict(int)), 'events': []}) + for raw_ev in raw_evs: + ev = rl.parse_record(raw_ev) + aggregate_event(aggregated, ev, keys_to_aggregate) + display_aggregated_notification(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, userns_special_profiles) + + old_time = int(time.time()) + + # When notification is sent, raise privileged back to root if the + # original effective user id was zero (to be able to read AppArmor logs) + raise_privileges() + time.sleep(args.wait) + else: + # At this point this script needs to be able to read 'logfile' but once + # the for loop starts, privileges can be dropped since the file descriptor + # has been opened and access granted. Further reads of the file will not + # trigger any new permission checks. + # @TODO Plan to catch PermissionError here or..? + for (event, message) in notify_about_new_entries(logfile, filters, args.wait): + ev = rl.parse_record(event) + + # @TODO redo special behaviours with a more regular function + # We ignore capability denials for binaries in ignore_denied_capability + if ev['operation'] == 'capable' and ev['comm'] in ignore_denied_capability: + continue + + # Special behaivor for userns: + if args.prompt_filter and 'userns' in args.prompt_filter and customized_message['userns']['cond'](ev, userns_special_profiles): + prompt_userns(ev) continue # Notification already displayed for this event, we go to the next one. - # Notifications should not be run as root, since root probably is - # the wrong desktop user and not the one getting the notifications. - drop_privileges() + # Notifications should not be run as root, since root probably is + # the wrong desktop user and not the one getting the notifications. + drop_privileges() - # sudo does not preserve DBUS address, so we need to guess it based on UID - if 'DBUS_SESSION_BUS_ADDRESS' not in os.environ: - os.environ['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=/run/user/{}/bus'.format(os.geteuid()) + display_notification(ev, rl, message, userns_special_profiles) - message = customize_notification_message(ev, message, userns_special_profiles) - - n = notify2.Notification( - _('AppArmor security notice'), - message, - 'gtk-dialog-warning' - ) - - if can_allow_rule(ev, userns_special_profiles): - n.add_action('clicked', 'Allow', cb_add_to_profile, (event, rl, userns_special_profiles)) - n.add_action('more_clicked', 'Show More', cb_more_info, (event, rl, userns_special_profiles)) - - n.show() - - # When notification is sent, raise privileged back to root if the - # original effective user id was zero (to be able to read AppArmor logs) - raise_privileges() + # When notification is sent, raise privileged back to root if the + # original effective user id was zero (to be able to read AppArmor logs) + raise_privileges() elif args.since_last: show_entries_since_last_login(logfile, filters) diff --git a/utils/apparmor/gui.py b/utils/apparmor/gui.py index 9fbf265ec..b3522d50c 100644 --- a/utils/apparmor/gui.py +++ b/utils/apparmor/gui.py @@ -2,7 +2,12 @@ import os import tkinter as tk import tkinter.ttk as ttk import subprocess -import ttkthemes + +try: # We use tk without themes as a fallback which makes the GUI uglier but functional. + import ttkthemes +except ImportError: + ttkthemes = None + import apparmor.aa as aa @@ -26,17 +31,17 @@ class GUI: os._exit(1) self.result = None - style = ttkthemes.ThemedStyle(self.master) - style.theme_use(interface_theme) - self.bg_color = style.lookup('TLabel', 'background') - self.fg_color = style.lookup('TLabel', 'foreground') - self.master.configure(background=self.bg_color) - + if ttkthemes: + style = ttkthemes.ThemedStyle(self.master) + style.theme_use(interface_theme) + self.bg_color = style.lookup('TLabel', 'background') + self.fg_color = style.lookup('TLabel', 'foreground') + self.master.configure(background=self.bg_color) self.label_frame = ttk.Frame(self.master, padding=(20, 10)) self.label_frame.pack() - self.button_frame = ttk.Frame(self.master, padding=(0, 10)) - self.button_frame.pack() + self.button_frame = ttk.Frame(self.master, padding=(10, 10)) + self.button_frame.pack(fill='x', expand=True) def show(self): self.master.mainloop() @@ -59,8 +64,9 @@ class ShowMoreGUI(GUI): self.master.title(_('AppArmor - More info')) - self.label = tk.Label(self.label_frame, background=self.bg_color, foreground=self.fg_color, - text=self.msg, anchor='w', justify=tk.LEFT, wraplength=460) + self.label = tk.Label(self.label_frame, text=self.msg, anchor='w', justify=tk.LEFT, wraplength=460) + if ttkthemes: + self.label.configure(background=self.bg_color, foreground=self.fg_color) self.label.pack(pady=(0, 10) if not self.profile_found else (0, 0)) if self.profile_found: @@ -80,6 +86,75 @@ class ShowMoreGUI(GUI): self.do_nothing_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5) +class ShowMoreGUIAggregated(GUI): + def __init__(self, summary, detailed_text, clean_rules): + self.summary = summary + self.detailed_text = detailed_text + self.clean_rules = clean_rules + + self.states = { + 'summary': { + 'msg': self.summary, + 'btn_left': _('Show more details'), + 'btn_right': _('Show rules only') + }, + 'detailed': { + 'msg': self.detailed_text, + 'btn_left': _('Show summary'), + 'btn_right': _('Show rules only') + }, + 'rules_only': { + 'msg': self.clean_rules, + 'btn_left': _('Show more details'), + 'btn_right': _('Show summary') + } + } + + self.state = 'rules_only' + + super().__init__() + + self.master.title(_('AppArmor - More info')) + + self.text_display = tk.Text(self.label_frame, height=40, width=100, wrap='word') + if ttkthemes: + self.text_display.configure(background=self.bg_color, foreground=self.fg_color) + self.text_display.insert('1.0', self.states[self.state]['msg']) + self.text_display['state'] = 'disabled' + + self.scrollbar = ttk.Scrollbar(self.label_frame, command=self.text_display.yview) + self.text_display['yscrollcommand'] = self.scrollbar.set + + self.scrollbar.pack(side='right', fill='y') + self.text_display.pack(side='left', fill='both', expand=True) + + self.btn_left = ttk.Button(self.button_frame, text=self.states[self.state]['btn_left'], width=1, command=lambda: self.change_view('btn_left')) + self.btn_left.grid(row=0, column=0, padx=5, pady=5, sticky="ew") + + self.btn_right = ttk.Button(self.button_frame, text=self.states[self.state]['btn_right'], width=1, command=lambda: self.change_view('btn_right')) + self.btn_right.grid(row=0, column=1, padx=5, pady=5, sticky="ew") + + self.btn_allow_all = ttk.Button(self.button_frame, text="Allow All", width=1, command=lambda: self.set_result('allow_all')) + self.btn_allow_all.grid(row=0, column=2, padx=5, pady=5, sticky="ew") + + for i in range(3): + self.button_frame.grid_columnconfigure(i, weight=1) + + def change_view(self, action): + + if action == 'btn_left': + self.state = 'detailed' if self.state != 'detailed' else 'summary' + elif action == 'btn_right': + self.state = 'rules_only' if self.state != 'rules_only' else 'summary' + + self.btn_left['text'] = self.states[self.state]['btn_left'] + self.btn_right['text'] = self.states[self.state]['btn_right'] + self.text_display['state'] = 'normal' + self.text_display.delete('1.0', 'end') + self.text_display.insert('1.0', self.states[self.state]['msg']) + self.text_display['state'] = 'disabled' + + class UsernsGUI(GUI): def __init__(self, name, path): self.name = name @@ -143,11 +218,11 @@ class ErrorGUI(GUI): self.master.title('AppArmor Error') - # Create label to display the error message - self.label = ttk.Label(self.label_frame, background=self.bg_color, text=self.msg, wraplength=460) + self.label = ttk.Label(self.label_frame, text=self.msg, wraplength=460) + if ttkthemes: + self.label.configure(background=self.bg_color) self.label.pack() - # Create a button to close the dialog self.button = ttk.Button(self.button_frame, text='OK', command=self.destroy) self.button.pack() diff --git a/utils/apparmor/update_profile.py b/utils/apparmor/update_profile.py index 8d54becc5..5c688e150 100755 --- a/utils/apparmor/update_profile.py +++ b/utils/apparmor/update_profile.py @@ -29,7 +29,7 @@ def create_userns(template_path, name, bin_path, profile_path, decision): def add_to_profile(rule, profile_name): aa.init_aa() - aa.read_profiles() + aa.update_profiles() rule_type, rule_class = ReadLog('', '', '').get_rule_type(rule) @@ -48,32 +48,51 @@ def usage(is_help): print('This tool is a low level tool - do not use it directly') print('{} create_userns '.format(sys.argv[0])) print('{} add_rule '.format(sys.argv[0])) - + print('{} from_file '.format(sys.argv[0])) if is_help: exit(0) else: exit(1) +def create_from_file(file_path): + with open(file_path) as file: + for line in file: + args = line[:-1].split('\t') + if len(args) > 1: + command = args[0] + else: + command = None # Handle the case where no command is provided + do_command(command, args) + + +def do_command(command, args): + match command: + case 'from_file': + if not len(args) == 2: + usage(False) + create_from_file(args[1]) + case 'create_userns': + if not len(args) == 6: + usage(False) + create_userns(args[1], args[2], args[3], args[4], args[5]) + case 'add_rule': + if not len(args) == 3: + usage(False) + add_to_profile(args[1], args[2]) + case 'help': + usage(True) + case _: + usage(False) + + def main(): if len(sys.argv) > 1: command = sys.argv[1] else: command = None # Handle the case where no command is provided - match command: - case 'create_userns': - if not len(sys.argv) == 7: - usage(False) - create_userns(sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5], sys.argv[6]) - case 'add_rule': - if not len(sys.argv) == 4: - usage(False) - add_to_profile(sys.argv[2], sys.argv[3]) - case 'help': - usage(True) - case _: - usage(False) + do_command(command, sys.argv[1:]) if __name__ == '__main__': diff --git a/utils/com.ubuntu.pkexec.aa-notify.policy b/utils/com.ubuntu.pkexec.aa-notify.policy index 1b0d19501..8920a2962 100644 --- a/utils/com.ubuntu.pkexec.aa-notify.policy +++ b/utils/com.ubuntu.pkexec.aa-notify.policy @@ -26,5 +26,16 @@ {LIB_PATH}apparmor/update_profile.py create_userns + + AppArmor: Modifying profile from file + To modify an AppArmor security profile from file, you need to authenticate. + + auth_admin + auth_admin + auth_admin + + {LIB_PATH}apparmor/update_profile.py + from_file + diff --git a/utils/notify.conf b/utils/notify.conf index 1524a45d4..a6145c643 100644 --- a/utils/notify.conf +++ b/utils/notify.conf @@ -23,6 +23,12 @@ ignore_denied_capability="sudo,su" # OPTIONAL - kind of operations which display a popup prompt. # prompt_filter="userns" +# OPTIONAL - Maximum number of profile that can send notification before they are merged +# maximum_number_notification_profiles=2 + +# OPTIONAL - Keys to aggregate when merging events +# keys_to_aggregate="operation,class,name,denied,target" + # OPTIONAL - restrict using aa-notify to users in the given group # (if not set, everybody who has permissions to read the logfile can use it) # use_group="admin" diff --git a/utils/test/test-aa-notify.py b/utils/test/test-aa-notify.py index d706edf2e..e6a1b8fba 100644 --- a/utils/test/test-aa-notify.py +++ b/utils/test/test-aa-notify.py @@ -184,7 +184,7 @@ class AANotifyTest(AANotifyBase): expected_return_code = 0 expected_output_1 = \ '''usage: aa-notify [-h] [-p] [--display DISPLAY] [-f FILE] [-l] [-s NUM] [-v] - [-u USER] [-w NUM] [--prompt-filter PF] [--debug] + [-u USER] [-w NUM] [-m] [--prompt-filter PF] [--debug] [--filter.profile PROFILE] [--filter.operation OPERATION] [--filter.name NAME] [--filter.denied DENIED] [--filter.family FAMILY] [--filter.socket SOCKET] @@ -207,6 +207,8 @@ Display AppArmor notifications or messages for DENIED entries. -u USER, --user USER user to drop privileges to when not using sudo -w NUM, --wait NUM wait NUM seconds before displaying notifications (with -p) + -m, --merge-notifications + Merge notification for improved readability (with -p) --prompt-filter PF kind of operations which display a popup prompt --debug debug mode