aa-notify: Adding support for merging notification.

This commit is contained in:
Maxime Bélair 2024-11-26 18:35:37 +00:00 committed by Georgia Garcia
parent e9d6e0ba14
commit f63fcdc8d2
6 changed files with 461 additions and 145 deletions

View file

@ -37,9 +37,11 @@ import os
import pwd
import re
import sys
import tempfile
import time
import subprocess
from collections import defaultdict
import notify2
import psutil
@ -54,7 +56,7 @@ from apparmor.fail import enable_aa_exception_handler
from apparmor.notify import get_last_login_timestamp
from apparmor.translations import init_translation
from apparmor.logparser import ReadLog
from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, set_interface_theme
from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, ShowMoreGUIAggregated, set_interface_theme
from apparmor.rule.file import FileRule
from dbus import DBusException
@ -121,7 +123,7 @@ def is_event_in_filter(event, filters):
return True
def notify_about_new_entries(logfile, filters, wait=0):
def daemonize():
"""Run the notification daemon in the background."""
# Kill other instances of aa-notify if already running
for process in psutil.process_iter():
@ -144,11 +146,18 @@ def notify_about_new_entries(logfile, filters, wait=0):
except DBusException:
sys.exit(_('Cannot initialize notify2. Please check that your terminal can use a graphical interface'))
try:
thread = threading.Thread(target=start_glib_loop)
thread.daemon = True
thread.start()
else:
print(_('Notification emitter started in the background'))
# pids = (os.getpid(), newpid)
# print("parent: %d, child: %d\n" % pids)
os._exit(0) # Exit child without calling exit handlers etc
def notify_about_new_entries(logfile, filters, wait=0):
try:
for event in follow_apparmor_events(logfile, wait):
if not is_event_in_filter(event, filters):
continue
@ -157,12 +166,6 @@ def notify_about_new_entries(logfile, filters, wait=0):
except PermissionError:
sys.exit(_("ERROR: Cannot read {}. Please check permissions.").format(logfile))
else:
print(_('Notification emitter started in the background'))
# pids = (os.getpid(), newpid)
# print("parent: %d, child: %d\n" % pids)
os._exit(0) # Exit child without calling exit handlers etc
def show_entries_since_epoch(logfile, epoch_since, filters):
"""Show AppArmor notifications since given timestamp."""
@ -292,6 +295,22 @@ def reopen_logfile_if_needed(logfile, logdata, log_inode, log_size):
return (logdata, log_inode, log_size)
def get_apparmor_events_return(logfile, since=0):
"""Read audit events from log source and return all relevant events."""
out = []
# Get logdata from file
# @TODO Implement more log sources in addition to just the logfile
try:
with open_file_read(logfile) as logdata:
for event in parse_logdata(logdata):
if event.epoch > since:
out.append(event)
return out
except PermissionError:
sys.exit(_("ERROR: Cannot read {}. Please check permissions.".format(logfile)))
def get_apparmor_events(logfile, since=0):
"""Read audit events from log source and yield all relevant events."""
@ -378,6 +397,10 @@ def drop_privileges():
os.setegid(int(next_gid))
os.seteuid(int(next_uid))
# sudo does not preserve DBUS address, so we need to guess it based on UID
if 'DBUS_SESSION_BUS_ADDRESS' not in os.environ:
os.environ['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=/run/user/{}/bus'.format(os.geteuid())
def raise_privileges():
"""Raise privileges of process.
@ -477,20 +500,25 @@ def ask_for_user_ns_denied(path, name, interactive=True):
debug_logger.debug('No action from the user for {}'.format(path))
def prompt_userns(ev, special_profiles):
"""If the user namespace creation denial was generated by an unconfined binary, displays a graphical notification.
Creates a new profile to allow userns if the user wants it. Returns whether a notification was displayed to the user
"""
if not is_special_profile_userns(ev, special_profiles):
return False
def can_leverage_userns_event(ev):
if ev['execpath'] is None:
UsernsGUI.show_error_cannot_find_execpath(ev['comm'], os.path.dirname(os.path.abspath(__file__)) + '/default_unconfined.template')
return True
return 'error_cannot_find_path'
aa.update_profiles()
if aa.get_profile_filename_from_profile_name(ev['comm']):
return 'error_userns_profile_exists'
return 'ok'
def prompt_userns(ev):
"""If the user namespace creation denial was generated by an unconfined binary, displays a graphical notification.
Creates a new profile to allow userns if the user wants it. Returns whether a notification was displayed to the user
"""
match can_leverage_userns_event(ev):
case 'error_cannot_find_path':
UsernsGUI.show_error_cannot_find_execpath(ev['comm'], os.path.dirname(os.path.abspath(__file__)) + '/default_unconfined.template')
case 'error_userns_profile_exists':
# There is already a profile with this name: we show an error to the user.
# We could use the full path as profile name like for the old profiles if we want to handle this case
# but if execpath is not supported by the kernel it could also mean that we inferred a bad path
@ -500,54 +528,66 @@ def prompt_userns(ev, special_profiles):
'This is likely because there is several binaries named {profile} thus the path inferred by AppArmor ({inferred_path}) is not correct.\n'
'You should review your profiles (in {profile_dir}).').format(profile=ev['comm'], inferred_path=ev['execpath'], profile_dir=aa.profile_dir),
False).show()
return True
case 'ok':
ask_for_user_ns_denied(ev['execpath'], ev['comm'])
return True
def get_more_info_about_event(rl, ev, special_profiles, header='', get_clean_rule=False):
out = header
clean_rule = None
for key, value in ev.items():
if value:
out += '\t{} = {}\n'.format(_(key), value)
out += _('\nThe software that declined this operation is {}\n').format(ev['profile'])
rule = rl.create_rule_from_ev(ev)
if rule:
if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
rule.exec_perms = 'Pix'
aa.update_profiles()
if customized_message['userns']['cond'](ev, special_profiles):
profile_path = None
out += _('You may allow it through a dedicated unconfined profile for {}.').format(ev['comm'])
match can_leverage_userns_event(ev):
case 'error_cannot_find_path':
clean_rule = _('# You may allow it through a dedicated unconfined profile for {0}. However, apparmor cannot find {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm'])
case 'error_userns_profile_exists':
clean_rule = _('# You may allow it through a dedicated unconfined profile for {} ({}). However, a profile already exists with this name. If you want to allow it, please create a profile for it manually.').format(ev['comm'], ev['execpath'])
case 'ok':
clean_rule = _('# You may allow it through a dedicated unconfined profile for {} ({})').format(ev['comm'], ev['execpath'])
else:
profile_path = aa.get_profile_filename_from_profile_name(ev['profile'])
clean_rule = rule.get_clean()
if profile_path:
out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path)
out += clean_rule
else:
out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=ev['profile'], profile_dir=aa.profile_dir)
else: # Should not happen
out += _('ERROR: Could not create rule from event.')
profile_path = None
if get_clean_rule:
return out, profile_path, clean_rule
else:
return out, profile_path
# TODO reuse more code from aa-logprof in callbacks
def cb_more_info(notification, action, _args):
(raw_ev, rl, special_profiles) = _args
(ev, rl, special_profiles) = _args
notification.close()
parsed_event = rl.parse_record(raw_ev)
out = _('Operation denied by AppArmor\n\n')
out, profile_path, clean_rule = get_more_info_about_event(rl, ev, special_profiles, _('Operation denied by AppArmor\n\n'), get_clean_rule=True)
for key, value in parsed_event.items():
if value:
out += '\t{} = {}\n'.format(_(key), value)
out += _('\nThe software that declined this operation is {}\n').format(parsed_event['profile'])
rule = rl.create_rule_from_ev(parsed_event)
# Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules
if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
rule.exec_perms = 'Pix'
if rule:
aa.update_profiles()
if customized_message['userns']['cond'](parsed_event, special_profiles):
profile_path = None
out += _('You may allow it through a dedicated unconfined profile for {}.').format(parsed_event['comm'])
else:
profile_path = aa.get_profile_filename_from_profile_name(parsed_event['profile'])
if profile_path:
out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path)
out += rule.get_clean()
else:
out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=parsed_event['profile'], profile_dir=aa.profile_dir)
else: # Should not happen
out += _('ERROR: Could not create rule from event.')
return
ans = ShowMoreGUI(profile_path, out, rule.get_clean(), parsed_event['profile'], profile_path is not None).show()
ans = ShowMoreGUI(profile_path, out, clean_rule, ev['profile'], profile_path is not None).show()
if ans == 'add_rule':
add_to_profile(rule.get_clean(), parsed_event['profile'])
add_to_profile(clean_rule, ev['profile'])
elif ans in {'allow', 'deny'}:
create_userns_profile(parsed_event['comm'], parsed_event['execpath'], ans)
create_userns_profile(ev['comm'], ev['execpath'], ans)
def add_to_profile(rule, profile_name):
@ -573,12 +613,67 @@ def add_to_profile(rule, profile_name):
ErrorGUI(_('Failed to add rule {rule} to {profile}\nError code = {retcode}').format(rule=rule, profile=profile_name, retcode=e.returncode), False).show()
def cb_add_to_profile(notification, action, _args):
(raw_ev, rl, special_profiles) = _args
notification.close()
parsed_event = rl.parse_record(raw_ev)
def create_from_file(file_path):
update_profile_path = update_profile.__file__
command = ['pkexec', '--keep-cwd', update_profile_path, 'from_file', file_path]
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
if e.returncode != 126: # return code 126 means the user cancelled the request
ErrorGUI(_('Failed to add some rules'), False).show()
rule = rl.create_rule_from_ev(parsed_event)
def allow_all(clean_rules):
local_template_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default_unconfined.template')
if os.path.exists(local_template_path): # We are using local aa-notify -> we use local template
template_path = local_template_path
else:
template_path = aa.CONFDIR + '/default_unconfined.template'
tmp = tempfile.NamedTemporaryFile()
with open(tmp.name, mode='w') as f:
profile = None
for line in clean_rules.splitlines():
if line == '':
continue
elif line[0] == '#':
profile = None
pass
elif line[0] != '\t':
profile = line[8:-1] # 8:-1 is to remove 'profile ' and ':'
else:
if line[1] == '#': # Add to userns
if line[-1] == '.': # '.' <==> There is an error: we cannot add the profile automatically
continue
profile_name = line.split()[-2] # line always finishes by <profile_name>
bin_path = line.split()[-1][1:-1] # 1:-1 to remove the parenthesis
profile_path = aa.get_profile_filename_from_profile_name(profile_name, True)
if not profile_path:
ErrorGUI(_('Cannot get profile path for {}.').format(profile_name), False).show()
continue
f.write('create_userns\t{}\t{}\t{}\t{}\t{}\n'.format(template_path, profile_name, bin_path, profile_path, 'allow'))
else:
if profile is not None:
f.write('add_rule\t{}\t{}\n'.format(line[1:], profile))
else:
print(_("Rule {} cannot be added automatically").format(line[1:]), file=sys.stdout)
create_from_file(tmp.name)
# TODO reuse more code from aa-logprof in callbacks
def cb_more_info_aggregated(notification, action, _args):
(to_display, aggregated, clean_rules) = _args
res = ShowMoreGUIAggregated(to_display, aggregated, clean_rules).show()
if res == 'allow_all':
allow_all(clean_rules)
def cb_add_to_profile(notification, action, _args):
(ev, rl, special_profiles) = _args
notification.close()
rule = rl.create_rule_from_ev(ev)
# Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules
if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
@ -590,10 +685,10 @@ def cb_add_to_profile(notification, action, _args):
aa.update_profiles()
if customized_message['userns']['cond'](parsed_event, special_profiles):
ask_for_user_ns_denied(parsed_event['execpath'], parsed_event['comm'], False)
if customized_message['userns']['cond'](ev, special_profiles):
ask_for_user_ns_denied(ev['execpath'], ev['comm'], False)
else:
add_to_profile(rule.get_clean(), parsed_event['profile'])
add_to_profile(rule.get_clean(), ev['profile'])
customized_message = {
@ -616,6 +711,83 @@ def start_glib_loop():
loop.run()
def aggregate_event(agg, ev, keys_to_aggregate):
profile = ev['profile']
agg[profile]['count'] += 1
agg[profile]['events'].append(ev)
for key in keys_to_aggregate:
if key in ev:
value = ev[key]
agg[profile]['values'][key][value] += 1
return agg
def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles):
notification = ''
summary = ''
more_info = ''
clean_rules = ''
summary = _('Notifications were raised for profiles: {}\n').format(', '.join(list(agg.keys())))
sorted_profiles = sorted(agg.items(), key=lambda item: item[1]['count'], reverse=True)
for profile, data in sorted_profiles:
profile_notif = _('profile: {} — {} events\n').format(profile, data['count'])
notification += profile_notif
summary += profile_notif
if len(agg) <= max_nb_profiles:
for key in keys_to_aggregate:
if key in data['values']:
total_key_events = sum(data['values'][key].values())
sorted_values = sorted(data['values'][key].items(), key=lambda item: item[1], reverse=True)
for value, count in sorted_values:
percent = (count / total_key_events) * 100
if percent >= 20: # We exclude rare cases for clarity. 20% is arbitrary
summary += _('\t{} was {} {:.1f}% of the time\n').format(key, value, percent)
summary += '\n'
more_info += _('profile {}, {} events\n').format(profile, data['count'])
rules_for_profiles = set()
found_profile = True
for i, ev in enumerate(data['events']):
more_info_rule, profile_path, clean_rule = get_more_info_about_event(rl, ev, special_profiles, _(' - Event {} -\n').format(i + 1), get_clean_rule=True)
if i != 0:
more_info += '\n\n'
if not profile_path:
found_profile = False
more_info += more_info_rule
if clean_rule:
rules_for_profiles.add(clean_rule)
if rules_for_profiles != set():
if profile not in special_profiles:
if found_profile:
clean_rules += _('profile {}:').format(profile)
else:
clean_rules += _('# Unknown profile {}').format(profile)
else:
clean_rules += _('# unprivileged userns denials ({}):').format(profile)
clean_rules += '\n\t' + '\n\t'.join(rules_for_profiles) + '\n'
return notification, summary, more_info, clean_rules
def display_notification(ev, rl, message, userns_special_profiles):
message = customize_notification_message(ev, message, userns_special_profiles)
n = notify2.Notification(_('AppArmor security notice'), message, 'gtk-dialog-warning')
if can_allow_rule(ev, userns_special_profiles):
n.add_action('clicked', 'Allow', cb_add_to_profile, (ev, rl, userns_special_profiles))
n.add_action('more_clicked', 'Show More', cb_more_info, (ev, rl, userns_special_profiles))
n.show()
def display_aggregated_notification(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, special_profiles):
notification, summary, more_info, clean_rules = get_aggregated(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, special_profiles)
n = notify2.Notification(_('AppArmor security notice'), notification, 'gtk-dialog-warning')
n.add_action('more_aggregated_clicked', 'Show More Info', cb_more_info_aggregated, (summary, more_info, clean_rules))
n.show()
def main():
"""Run aa-notify.
@ -652,6 +824,7 @@ def main():
parser.add_argument('-v', '--verbose', action='store_true', help=_('show messages with stats'))
parser.add_argument('-u', '--user', type=str, help=_('user to drop privileges to when not using sudo'))
parser.add_argument('-w', '--wait', type=int, metavar=('NUM'), help=_('wait NUM seconds before displaying notifications (with -p)'))
parser.add_argument('-m', '--merge-notifications', action='store_true', help=_('Merge notification for improved readability (with -p)'))
parser.add_argument('--prompt-filter', type=str, metavar=('PF'), help=_('kind of operations which display a popup prompt'))
parser.add_argument('--debug', action='store_true', help=_('debug mode'))
parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS)
@ -736,6 +909,8 @@ def main():
- ignore_denied_capability
- interface_theme
- prompt_filter
- maximum_number_notification_profiles
- keys_to_aggregate
- filter.profile,
- filter.operation,
- filter.name,
@ -756,6 +931,8 @@ def main():
'show_notifications',
'message_body',
'message_footer',
'maximum_number_notification_profiles',
'keys_to_aggregate',
'filter.profile',
'filter.operation',
'filter.name',
@ -852,6 +1029,16 @@ def main():
if unsupported:
sys.exit(_('ERROR: using an unsupported prompt filter: {}\nSupported values: {}').format(', '.join(unsupported), ', '.join(supported_prompt_filter)))
if 'maximum_number_notification_profiles' in config['']:
maximum_number_notification_profiles = int(config['']['maximum_number_notification_profiles'].strip())
else:
maximum_number_notification_profiles = 2
if 'keys_to_aggregate' in config['']:
keys_to_aggregate = config['']['keys_to_aggregate'].strip().split(',')
else:
keys_to_aggregate = {'operation', 'class', 'name', 'denied', 'target'}
if args.file:
logfile = args.file
elif os.path.isfile('/var/run/auditd.pid') and os.path.isfile('/var/log/audit/audit.log'):
@ -888,6 +1075,38 @@ def main():
# Initialize the list of profiles for can_allow_rule
aa.read_profiles()
drop_privileges()
daemonize()
raise_privileges()
if args.merge_notifications:
if not args.wait or args.wait == 0:
args.wait = 5
old_time = int(time.time())
while True:
raw_evs = get_apparmor_events_return(logfile, old_time)
drop_privileges()
if len(raw_evs) == 1: # Single event: we handle it without aggregation
raw_ev = raw_evs[0]
ev = rl.parse_record(raw_ev)
display_notification(ev, rl, format_event(raw_ev, logfile), userns_special_profiles)
elif len(raw_evs) > 1:
aggregated = defaultdict(lambda: {'count': 0, 'values': defaultdict(lambda: defaultdict(int)), 'events': []})
for raw_ev in raw_evs:
ev = rl.parse_record(raw_ev)
aggregate_event(aggregated, ev, keys_to_aggregate)
display_aggregated_notification(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, userns_special_profiles)
old_time = int(time.time())
# When notification is sent, raise privileged back to root if the
# original effective user id was zero (to be able to read AppArmor logs)
raise_privileges()
time.sleep(args.wait)
else:
# At this point this script needs to be able to read 'logfile' but once
# the for loop starts, privileges can be dropped since the file descriptor
# has been opened and access granted. Further reads of the file will not
@ -903,30 +1122,14 @@ def main():
# Special behaivor for userns:
if args.prompt_filter and 'userns' in args.prompt_filter and customized_message['userns']['cond'](ev, userns_special_profiles):
if prompt_userns(ev, userns_special_profiles):
prompt_userns(ev)
continue # Notification already displayed for this event, we go to the next one.
# Notifications should not be run as root, since root probably is
# the wrong desktop user and not the one getting the notifications.
drop_privileges()
# sudo does not preserve DBUS address, so we need to guess it based on UID
if 'DBUS_SESSION_BUS_ADDRESS' not in os.environ:
os.environ['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=/run/user/{}/bus'.format(os.geteuid())
message = customize_notification_message(ev, message, userns_special_profiles)
n = notify2.Notification(
_('AppArmor security notice'),
message,
'gtk-dialog-warning'
)
if can_allow_rule(ev, userns_special_profiles):
n.add_action('clicked', 'Allow', cb_add_to_profile, (event, rl, userns_special_profiles))
n.add_action('more_clicked', 'Show More', cb_more_info, (event, rl, userns_special_profiles))
n.show()
display_notification(ev, rl, message, userns_special_profiles)
# When notification is sent, raise privileged back to root if the
# original effective user id was zero (to be able to read AppArmor logs)

View file

@ -2,7 +2,12 @@ import os
import tkinter as tk
import tkinter.ttk as ttk
import subprocess
try: # We use tk without themes as a fallback which makes the GUI uglier but functional.
import ttkthemes
except ImportError:
ttkthemes = None
import apparmor.aa as aa
@ -26,17 +31,17 @@ class GUI:
os._exit(1)
self.result = None
if ttkthemes:
style = ttkthemes.ThemedStyle(self.master)
style.theme_use(interface_theme)
self.bg_color = style.lookup('TLabel', 'background')
self.fg_color = style.lookup('TLabel', 'foreground')
self.master.configure(background=self.bg_color)
self.label_frame = ttk.Frame(self.master, padding=(20, 10))
self.label_frame.pack()
self.button_frame = ttk.Frame(self.master, padding=(0, 10))
self.button_frame.pack()
self.button_frame = ttk.Frame(self.master, padding=(10, 10))
self.button_frame.pack(fill='x', expand=True)
def show(self):
self.master.mainloop()
@ -59,8 +64,9 @@ class ShowMoreGUI(GUI):
self.master.title(_('AppArmor - More info'))
self.label = tk.Label(self.label_frame, background=self.bg_color, foreground=self.fg_color,
text=self.msg, anchor='w', justify=tk.LEFT, wraplength=460)
self.label = tk.Label(self.label_frame, text=self.msg, anchor='w', justify=tk.LEFT, wraplength=460)
if ttkthemes:
self.label.configure(background=self.bg_color, foreground=self.fg_color)
self.label.pack(pady=(0, 10) if not self.profile_found else (0, 0))
if self.profile_found:
@ -80,6 +86,75 @@ class ShowMoreGUI(GUI):
self.do_nothing_button.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5)
class ShowMoreGUIAggregated(GUI):
def __init__(self, summary, detailed_text, clean_rules):
self.summary = summary
self.detailed_text = detailed_text
self.clean_rules = clean_rules
self.states = {
'summary': {
'msg': self.summary,
'btn_left': _('Show more details'),
'btn_right': _('Show rules only')
},
'detailed': {
'msg': self.detailed_text,
'btn_left': _('Show summary'),
'btn_right': _('Show rules only')
},
'rules_only': {
'msg': self.clean_rules,
'btn_left': _('Show more details'),
'btn_right': _('Show summary')
}
}
self.state = 'rules_only'
super().__init__()
self.master.title(_('AppArmor - More info'))
self.text_display = tk.Text(self.label_frame, height=40, width=100, wrap='word')
if ttkthemes:
self.text_display.configure(background=self.bg_color, foreground=self.fg_color)
self.text_display.insert('1.0', self.states[self.state]['msg'])
self.text_display['state'] = 'disabled'
self.scrollbar = ttk.Scrollbar(self.label_frame, command=self.text_display.yview)
self.text_display['yscrollcommand'] = self.scrollbar.set
self.scrollbar.pack(side='right', fill='y')
self.text_display.pack(side='left', fill='both', expand=True)
self.btn_left = ttk.Button(self.button_frame, text=self.states[self.state]['btn_left'], width=1, command=lambda: self.change_view('btn_left'))
self.btn_left.grid(row=0, column=0, padx=5, pady=5, sticky="ew")
self.btn_right = ttk.Button(self.button_frame, text=self.states[self.state]['btn_right'], width=1, command=lambda: self.change_view('btn_right'))
self.btn_right.grid(row=0, column=1, padx=5, pady=5, sticky="ew")
self.btn_allow_all = ttk.Button(self.button_frame, text="Allow All", width=1, command=lambda: self.set_result('allow_all'))
self.btn_allow_all.grid(row=0, column=2, padx=5, pady=5, sticky="ew")
for i in range(3):
self.button_frame.grid_columnconfigure(i, weight=1)
def change_view(self, action):
if action == 'btn_left':
self.state = 'detailed' if self.state != 'detailed' else 'summary'
elif action == 'btn_right':
self.state = 'rules_only' if self.state != 'rules_only' else 'summary'
self.btn_left['text'] = self.states[self.state]['btn_left']
self.btn_right['text'] = self.states[self.state]['btn_right']
self.text_display['state'] = 'normal'
self.text_display.delete('1.0', 'end')
self.text_display.insert('1.0', self.states[self.state]['msg'])
self.text_display['state'] = 'disabled'
class UsernsGUI(GUI):
def __init__(self, name, path):
self.name = name
@ -143,11 +218,11 @@ class ErrorGUI(GUI):
self.master.title('AppArmor Error')
# Create label to display the error message
self.label = ttk.Label(self.label_frame, background=self.bg_color, text=self.msg, wraplength=460)
self.label = ttk.Label(self.label_frame, text=self.msg, wraplength=460)
if ttkthemes:
self.label.configure(background=self.bg_color)
self.label.pack()
# Create a button to close the dialog
self.button = ttk.Button(self.button_frame, text='OK', command=self.destroy)
self.button.pack()

View file

@ -29,7 +29,7 @@ def create_userns(template_path, name, bin_path, profile_path, decision):
def add_to_profile(rule, profile_name):
aa.init_aa()
aa.read_profiles()
aa.update_profiles()
rule_type, rule_class = ReadLog('', '', '').get_rule_type(rule)
@ -48,32 +48,51 @@ def usage(is_help):
print('This tool is a low level tool - do not use it directly')
print('{} create_userns <template_path> <name> <bin_path> <profile_path> <decision>'.format(sys.argv[0]))
print('{} add_rule <rule> <profile_name>'.format(sys.argv[0]))
print('{} from_file <file>'.format(sys.argv[0]))
if is_help:
exit(0)
else:
exit(1)
def create_from_file(file_path):
with open(file_path) as file:
for line in file:
args = line[:-1].split('\t')
if len(args) > 1:
command = args[0]
else:
command = None # Handle the case where no command is provided
do_command(command, args)
def do_command(command, args):
match command:
case 'from_file':
if not len(args) == 2:
usage(False)
create_from_file(args[1])
case 'create_userns':
if not len(args) == 6:
usage(False)
create_userns(args[1], args[2], args[3], args[4], args[5])
case 'add_rule':
if not len(args) == 3:
usage(False)
add_to_profile(args[1], args[2])
case 'help':
usage(True)
case _:
usage(False)
def main():
if len(sys.argv) > 1:
command = sys.argv[1]
else:
command = None # Handle the case where no command is provided
match command:
case 'create_userns':
if not len(sys.argv) == 7:
usage(False)
create_userns(sys.argv[2], sys.argv[3], sys.argv[4], sys.argv[5], sys.argv[6])
case 'add_rule':
if not len(sys.argv) == 4:
usage(False)
add_to_profile(sys.argv[2], sys.argv[3])
case 'help':
usage(True)
case _:
usage(False)
do_command(command, sys.argv[1:])
if __name__ == '__main__':

View file

@ -26,5 +26,16 @@
<annotate key="org.freedesktop.policykit.exec.path">{LIB_PATH}apparmor/update_profile.py</annotate>
<annotate key="org.freedesktop.policykit.exec.argv1">create_userns</annotate>
</action>
<action id="com.ubuntu.pkexec.aa-notify.from_file">
<description>AppArmor: Modifying profile from file</description>
<message>To modify an AppArmor security profile from file, you need to authenticate.</message>
<defaults>
<allow_any>auth_admin</allow_any>
<allow_inactive>auth_admin</allow_inactive>
<allow_active>auth_admin</allow_active>
</defaults>
<annotate key="org.freedesktop.policykit.exec.path">{LIB_PATH}apparmor/update_profile.py</annotate>
<annotate key="org.freedesktop.policykit.exec.argv1">from_file</annotate>
</action>
</policyconfig>

View file

@ -23,6 +23,12 @@ ignore_denied_capability="sudo,su"
# OPTIONAL - kind of operations which display a popup prompt.
# prompt_filter="userns"
# OPTIONAL - Maximum number of profile that can send notification before they are merged
# maximum_number_notification_profiles=2
# OPTIONAL - Keys to aggregate when merging events
# keys_to_aggregate="operation,class,name,denied,target"
# OPTIONAL - restrict using aa-notify to users in the given group
# (if not set, everybody who has permissions to read the logfile can use it)
# use_group="admin"

View file

@ -184,7 +184,7 @@ class AANotifyTest(AANotifyBase):
expected_return_code = 0
expected_output_1 = \
'''usage: aa-notify [-h] [-p] [--display DISPLAY] [-f FILE] [-l] [-s NUM] [-v]
[-u USER] [-w NUM] [--prompt-filter PF] [--debug]
[-u USER] [-w NUM] [-m] [--prompt-filter PF] [--debug]
[--filter.profile PROFILE] [--filter.operation OPERATION]
[--filter.name NAME] [--filter.denied DENIED]
[--filter.family FAMILY] [--filter.socket SOCKET]
@ -207,6 +207,8 @@ Display AppArmor notifications or messages for DENIED entries.
-u USER, --user USER user to drop privileges to when not using sudo
-w NUM, --wait NUM wait NUM seconds before displaying notifications (with
-p)
-m, --merge-notifications
Merge notification for improved readability (with -p)
--prompt-filter PF kind of operations which display a popup prompt
--debug debug mode