2019-01-09 23:59:40 +01:00
#! /usr/bin/python3
# ----------------------------------------------------------------------
2023-03-26 11:26:59 -07:00
# Copyright (C) 2018– 2022 Otto Kekäläinen <otto@kekalainen.net>
2010-01-28 10:25:09 -06:00
#
2010-01-28 10:58:38 -06:00
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
2019-01-09 23:59:40 +01:00
# License as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
2010-01-28 10:58:38 -06:00
#
2019-01-09 23:59:40 +01:00
# ----------------------------------------------------------------------
2010-01-28 10:25:09 -06:00
#
# /etc/apparmor/notify.conf:
# # set to 'yes' to enable AppArmor DENIED notifications
# show_notifications="yes"
#
# # only people in use_group can run this script
# use_group="admin"
#
2010-02-01 17:30:04 -08:00
# $HOME/.apparmor/notify.conf can have:
2010-01-28 10:25:09 -06:00
# # set to 'yes' to enable AppArmor DENIED notifications
# show_notifications="yes"
#
2019-01-09 23:59:40 +01:00
# In a typical desktop environment one would run as a service the
# command:
2019-04-23 08:36:46 +03:00
# /usr/bin/aa-notify -p -w 10
2023-03-26 11:26:59 -07:00
#
"""Show AppArmor events on command line or as desktop notifications."""
2019-01-09 23:59:40 +01:00
import argparse
import atexit
2022-08-07 20:32:07 -04:00
import grp
2019-01-09 23:59:40 +01:00
import os
2022-08-07 20:32:07 -04:00
import pwd
2019-01-09 23:59:40 +01:00
import re
import sys
import time
2022-08-07 20:32:07 -04:00
2024-08-13 16:58:25 +00:00
import subprocess
2019-01-09 23:59:40 +01:00
import notify2
import psutil
import apparmor.aa as aa
import apparmor.ui as aaui
import apparmor.config as aaconfig
2024-08-13 16:58:25 +00:00
import apparmor.update_profile as update_profile
2022-08-07 20:32:07 -04:00
import LibAppArmor # C-library to parse one log line
2024-09-17 09:17:23 +00:00
from apparmor.common import DebugLogger, open_file_read
2019-01-09 23:59:40 +01:00
from apparmor.fail import enable_aa_exception_handler
2021-10-24 12:54:20 +02:00
from apparmor.notify import get_last_login_timestamp
2019-01-09 23:59:40 +01:00
from apparmor.translations import init_translation
2024-08-13 16:58:25 +00:00
from apparmor.logparser import ReadLog
2024-09-17 09:17:23 +00:00
from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, set_interface_theme
from apparmor.rule.file import FileRule
2024-08-13 16:58:25 +00:00
from dbus import DBusException
import gi
from gi.repository import GLib
import threading
gi.require_version('GLib', '2.0')
2019-01-09 23:59:40 +01:00
2022-08-07 12:26:24 -04:00
2019-01-09 23:59:40 +01:00
def get_user_login():
    """Return the name of the user running this process.

    On POSIX systems the name is looked up in the password database using
    the effective UID, which avoids the
    "OSError: [Errno 25] Inappropriate ioctl for device" error that
    os.getlogin() can raise when no controlling terminal is attached
    (e.g. in GitLab CI). On other systems the USER environment variable is
    tried first and os.getlogin() is only used as a last resort.
    """
    if os.name == "posix":
        return pwd.getpwuid(os.geteuid()).pw_name

    login = os.environ.get('USER')
    if login or not hasattr(os, 'getlogin'):
        return login
    return os.getlogin()
def format_event(event, logsource):
    """Build the multi-line text shown for one AppArmor event.

    The optional 'message_body' entry of the global configuration comes
    first, followed by one line per non-empty event field and finally the
    log source the event was read from.
    """
    lines = []

    main_section = config['']
    if 'message_body' in main_section:
        lines.append(main_section['message_body'])

    # Simple fields: emitted only when the event carries a value for them
    simple_fields = (
        ('Profile: {}', event.profile),
        ('Operation: {}', event.operation),
        ('Name: {}', event.name),
        ('Denied: {}', event.denied_mask),
    )
    for template, value in simple_fields:
        if value:
            lines.append(template.format(value))

    # Network details are only shown when both parts are known
    if event.net_family and event.net_sock_type:
        lines.append('Family: {}'.format(event.net_family))
        lines.append('Socket: {}'.format(event.net_sock_type))

    lines.append('Logfile: {}'.format(logsource))
    return "\n".join(lines)
2024-02-23 17:15:14 -03:00
def is_event_in_filter(event, filters):
    """Return True if the event passes every active filter.

    A filter is active when filters[<field>] is truthy; its compiled
    pattern is then expected under filters[<field> + '_re']. Event fields
    that are empty are never rejected, and patterns are applied with
    re.match(), i.e. anchored at the start of the value.
    """
    filtered_fields = (
        'profile',
        'operation',
        'name',
        'denied_mask',
        'net_family',
        'net_sock_type',
    )
    for field in filtered_fields:
        if not filters[field]:
            continue
        value = getattr(event, field)
        if value and not filters[field + '_re'].match(value):
            return False
    return True
def notify_about_new_entries(logfile, filters, wait=0):
    """Run the notification daemon in the background.

    Terminates other running processes that share this script's name, then
    forks. The parent prints a short message and exits; the forked child
    initializes notify2, starts the GLib main loop in a daemon thread and
    then acts as a generator.

    Args:
        logfile: Path of the log file to follow.
        filters: Filter dictionary as expected by is_event_in_filter().
        wait: Passed on to follow_apparmor_events().

    Yields:
        (event, text) tuples, where text is the output of format_event()
        for each followed event that passes the filters.
    """
    # Kill other instances of aa-notify if already running
    for process in psutil.process_iter():
        # Find the process that has the same name as this script, e.g. aa-notify.py
        if process.name() == os.path.basename(__file__) and process.pid != os.getpid():
            print(_('Killing old daemon (PID {})...').format(process.pid))
            os.kill(process.pid, 15)  # 15 is SIGTERM on Linux

    # Spawn/fork into the background and stay running
    newpid = os.fork()
    if newpid == 0:
        # Child process (fork() returned 0)
        # Follow the logfile and stream notifications
        # Rate limit to not show too many notifications
        try:
            # Before use, notify2 must be initialized and the DBUS channel
            # should be opened using the non-root user. This step needs to
            # be executed after the drop_privileges().
            notify2.init('aa-notify', mainloop='glib')
        except DBusException:
            sys.exit(_('Cannot initialize notify2. Please check that your terminal can use a graphical interface'))

        try:
            # Run the GLib main loop in a daemon thread so it does not keep
            # the process alive once the generator below ends
            thread = threading.Thread(target=start_glib_loop)
            thread.daemon = True
            thread.start()

            for event in follow_apparmor_events(logfile, wait):
                if not is_event_in_filter(event, filters):
                    continue
                debug_logger.info(format_event(event, logfile))
                yield event, format_event(event, logfile)
        except PermissionError:
            sys.exit(_("ERROR: Cannot read {}. Please check permissions.").format(logfile))

    else:
        # Parent process (fork() returned the PID of the child)
        print(_('Notification emitter started in the background'))
        # pids = (os.getpid(), newpid)
        # print("parent: %d, child: %d\n" % pids)
        os._exit(0)  # Exit the parent without calling exit handlers etc
2024-02-23 17:15:14 -03:00
def show_entries_since_epoch(logfile, epoch_since, filters):
    """Print a summary of the AppArmor events newer than epoch_since.

    Only events passing the filters are counted. In verbose mode every
    counted event is printed as well, and the summary is followed by either
    the configured 'message_footer' or a link to the debugging docs.
    """
    matching = 0
    for event in get_apparmor_events(logfile, epoch_since):
        if is_event_in_filter(event, filters):
            matching += 1
            if args.verbose:
                print(format_event(event, logfile))
                print()  # Blank line between entries for readability

    since_text = time.strftime(timeformat, time.localtime(epoch_since))
    summary = _('AppArmor denials: {count} (since {date})')
    aaui.UI_Info(summary.format(count=matching, date=since_text))

    if not args.verbose:
        return

    main_section = config['']
    if 'message_footer' in main_section:
        print(main_section['message_footer'])
    else:
        print(_('For more information, please see: {}').format(debug_docs_url))
2024-02-23 17:15:14 -03:00
def show_entries_since_last_login(logfile, filters, username=get_user_login()):
    """Show the AppArmor events logged since the user's last login.

    When SUDO_USER is set in the environment it takes precedence over the
    username argument, so that the invoking user is looked up instead of
    root. Exits with status 1 if no last login could be determined.
    """
    username = os.environ.get('SUDO_USER', username)

    if args.verbose:
        print(_('Showing entries since {} logged in').format(username))
        print()  # Newline

    last_login = get_last_login_timestamp(username)
    if last_login == 0:
        print(_('ERROR: Could not find last login'), file=sys.stderr)
        sys.exit(1)

    show_entries_since_epoch(logfile, last_login, filters)
2019-01-09 23:59:40 +01:00
2024-02-23 17:15:14 -03:00
def show_entries_since_days(logfile, since_days, filters):
    """Show the AppArmor events logged during the last since_days days."""
    seconds_per_day = 24 * 60 * 60
    window = since_days * seconds_per_day
    show_entries_since_epoch(logfile, int(time.time()) - window, filters)
2019-01-09 23:59:40 +01:00
def follow_apparmor_events(logfile, wait=0):
    """Tail the log file and yield every new AppArmor event.

    Entries already present when the function starts are skipped. The file
    is then polled once per second; events seen during the first `wait`
    seconds are dropped. The generator runs until the process stops, except
    in debug mode where it exits the process after 100 seconds.
    """
    # ArgumentParser hands over None when the option was not given:
    # normalize any falsy value to the integer zero
    wait = wait or 0

    # Reference point for both the wait period and the debug timeout
    started_at = int(time.time())

    def seconds_running():
        return int(time.time()) - started_at

    # Initial size and inode, used later to detect log rotation/rename
    log_size = os.stat(logfile).st_size
    log_inode = os.stat(logfile).st_ino

    # @TODO Implement more log sources in addition to just the logfile
    with open_file_read(logfile) as logdata:
        # Consume what is already in the log so that only entries written
        # from now on are reported
        for stale_line in logdata:
            pass

        # @TODO: while+sleep will cause CPU interruptions once per second,
        # so switch to epoll/inotify/etc for less resource consumption.
        while True:
            debug_logger.debug(
                'Poll AppArmor event source {} seconds since start'.format(seconds_running())
            )

            logdata, log_inode, log_size = reopen_logfile_if_needed(
                logfile, logdata, log_inode, log_size)

            for event in parse_logdata(logdata):
                # @TODO Alternatively use os.times()
                if seconds_running() >= wait:
                    yield event
                else:
                    debug_logger.debug('Omitted an event seen during wait time')

            debug_timeout = (
                debug_logger.debugging
                and debug_logger.debug_level <= 10
                and seconds_running() > 100
            )
            if debug_timeout:
                debug_logger.debug('Debug mode detected: aborting notification emitter after 100 seconds.')
                sys.exit(0)

            time.sleep(1)
def reopen_logfile_if_needed(logfile, logdata, log_inode, log_size):
    """Keep the open log handle in sync with the file found at `logfile`.

    Handles the two situations caused by log rotation:
    - the path now refers to a different inode (file was renamed/replaced):
      the new file is opened and the previous handle is closed;
    - the file shrank (truncated in place): reading restarts at offset 0.

    If the path temporarily does not exist (rotation in progress), the
    function sleeps one second and retries until it reappears.

    Args:
        logfile: Path of the log file being followed.
        logdata: Currently open file object for that log.
        log_inode: Inode number recorded for `logdata`.
        log_size: File size recorded at the previous check.

    Returns:
        Tuple (logdata, log_inode, log_size) with the values to use from
        now on; `logdata` may be a new file object.
    """
    while True:
        try:
            # A single stat() per step gives a consistent inode/size pair
            log_stat = os.stat(logfile)

            # Reopen file if inode has changed, e.g. rename by logrotate
            if log_stat.st_ino != log_inode:
                debug_logger.debug('Logfile was renamed, reload to read the new file.')
                # Use the same helper as the initial open so the reopened
                # file is read the same way as the original one
                new_logdata = open_file_read(logfile)
                # Close the old handle only after the new one is open: if
                # the open fails the caller's handle stays usable for the
                # retry, and on success the old descriptor is not leaked
                logdata.close()
                logdata = new_logdata
                # Store new inode number for next comparisons
                log_stat = os.stat(logfile)
                log_inode = log_stat.st_ino

            # Start reading from the beginning if file shrank
            if log_stat.st_size < log_size:
                debug_logger.debug('Logfile shrank in size, reload from beginning.')
                logdata.seek(0)

            # Remember the current size, whether it shrank, grew or stayed
            log_size = log_stat.st_size
        except FileNotFoundError:
            # @TODO: switch to epoll/inotify/
            debug_logger.debug('Logfile not found, retrying.')
            time.sleep(1)
            # @TODO: send notification if reopening the log fails too many times
        else:
            return (logdata, log_inode, log_size)
2019-01-09 23:59:40 +01:00
def get_apparmor_events(logfile, since=0):
    """Read audit events from log source and yield all relevant events.

    Args:
        logfile: Path of the log file to read.
        since: Epoch timestamp; only events whose `epoch` attribute is
            strictly greater than this value are yielded.

    Yields:
        The events produced by parse_logdata() that are newer than `since`.

    Exits the program with an error message if reading the log file raises
    PermissionError.
    """
    # Get logdata from file
    # @TODO Implement more log sources in addition to just the logfile
    try:
        with open_file_read(logfile) as logdata:
            for event in parse_logdata(logdata):
                if event.epoch > since:
                    yield event
    except PermissionError:
        # Look up the translation of the message template first and insert
        # the file name afterwards; formatting before the lookup would pass
        # an already-filled string that never matches a catalog entry.
        sys.exit(_("ERROR: Cannot read {}. Please check permissions.").format(logfile))
def parse_logdata(logsource):
    """Yield the AppArmor events found in an iterable of log lines.

    Lines are first screened with a combined regular expression so that
    only AppArmor-looking entries are handed to LibAppArmor for parsing.
    Parsed records without an operation, and records whose operation starts
    with 'profile_' (e.g. profile reloads), are skipped.

    Expects log lines like:

      Feb 16 20:22:28 XPS-13-9370 kernel: [520374.926882] audit: type=1400
      audit(1581877348.868:657): apparmor="ALLOWED" operation="open"
      profile="libreoffice-soffice"
      name="/usr/share/drirc.d/00-mesa-defaults.conf" pid=22690
      comm="soffice.bin" requested_mask="r" denied_mask="r" fsuid=1001 ouid=0
    """
    re_audit_time_id = r'(msg=)?audit\([\d\.\:]+\):\s+'  # 'audit(1282626827.320:411): '
    re_kernel_time = r'\[[\d\.\s]+\]'  # '[ 1612.746129]'
    re_type_num = '1[45][0-9][0-9]'  # 1400..1599
    re_aa_or_op = '(apparmor=|operation=)'
    re_log_parts = [
        r'kernel:\s+(' + re_kernel_time + r'\s+)?(audit:\s+)?type=' + re_type_num + r'\s+' + re_audit_time_id + re_aa_or_op,  # v2_6 syslog
        r'kernel:\s+(' + re_kernel_time + r'\s+)?' + re_audit_time_id + 'type=' + re_type_num + r'\s+' + re_aa_or_op,
        'type=(AVC|APPARMOR[_A-Z]*|' + re_type_num + r')\s+' + re_audit_time_id + '(type=' + re_type_num + r'\s+)?' + re_aa_or_op,  # v2_6 audit and dmesg
        r'type=USER_AVC\s+' + re_audit_time_id + '.*apparmor=',  # dbus
        r'type=UNKNOWN\[' + re_type_num + r'\]\s+' + re_audit_time_id + re_aa_or_op,
        r'dbus\[[0-9]+\]:\s+apparmor=',  # dbus
    ]

    # One alternation of all known line formats, compiled once per call
    re_log_all = re.compile('(' + '|'.join(re_log_parts) + ')')

    for entry in logsource:
        # Cheap pre-filter: anything that does not look like an AppArmor
        # entry never reaches the C parser
        if not re_log_all.search(entry):
            continue

        # Parse the line using LibAppArmor (C library)
        # See aalogparse.h for data structure
        event = LibAppArmor.parse_record(entry)

        # Report only real events of confined programs; among others this
        # ignores AppArmor profile reloads
        operation = event.operation
        if operation and operation[:8] != 'profile_':
            yield event
def drop_privileges():
    """Switch the effective UID/GID away from root.

    Does nothing unless the process currently has effective UID 0. The
    target is the sudo-invoking user when SUDO_USER is set in the
    environment, otherwise the account named by the global `nobody_user`.
    """
    if os.geteuid() != 0:
        return

    if 'SUDO_USER' in os.environ:
        target_name = os.environ['SUDO_USER']
        target_uid = os.environ['SUDO_UID']
        target_gid = os.environ['SUDO_GID']
    else:
        fallback = pwd.getpwnam(nobody_user)
        target_name = fallback.pw_name
        target_uid = fallback.pw_uid
        target_gid = fallback.pw_gid

    debug_logger.debug('Dropping to user "{}" privileges'.format(target_name))

    # @TODO?
    # Remove group privileges, including potential 'adm' group that might
    # have had log read access but also other accesses.
    # os.setgroups([])

    # The group must be changed while still root: once the UID is dropped
    # the process would lack the permission to change its GID
    os.setegid(int(target_gid))
    os.seteuid(int(target_uid))
2022-08-07 12:26:24 -04:00
2019-01-09 23:59:40 +01:00
def raise_privileges():
    """Restore root as the effective user.

    Only acts when the process was started with effective UID 0 (recorded
    in the global `original_effective_user`) and is currently running with
    a different effective UID.
    """
    current_euid = os.geteuid()
    if current_euid == 0 or original_effective_user != 0:
        return

    debug_logger.debug('Rasing privileges from UID {} back to UID 0 (root)'.format(current_euid))
    # os.setgid(int(next_gid))
    os.seteuid(original_effective_user)
2022-08-07 12:26:24 -04:00
2019-01-09 23:59:40 +01:00
def read_notify_conf(path, shell_config):
    """Load notify.conf from the directory `path`.

    `shell_config` has its CONF_DIR attribute set to `path` and is then
    asked to read 'notify.conf'. Returns the resulting dictionary, or an
    empty dictionary when the file does not exist.
    """
    shell_config.CONF_DIR = path
    try:
        settings = shell_config.read_config('notify.conf')
    except FileNotFoundError:
        return {}
    else:
        debug_logger.debug('Found configuration file in {}/notify.conf'.format(shell_config.CONF_DIR))
        return settings
2022-08-07 12:26:24 -04:00
2024-02-26 10:24:46 -03:00
def compile_filter_regex(filters):
    """Compile the pattern of every active filter.

    For each supported field whose entry in `filters` is truthy, the
    compiled pattern is stored under '<field>_re'. The dictionary is
    modified in place and also returned.
    """
    filter_fields = (
        'profile',
        'operation',
        'name',
        'denied_mask',
        'net_family',
        'net_sock_type',
    )
    for field in filter_fields:
        pattern = filters[field]
        if pattern:
            filters[field + '_re'] = re.compile(pattern)
    return filters
2024-09-17 09:17:23 +00:00
def can_allow_rule(ev, special_profiles):
    """Tell whether an "allow" action can be offered for this event.

    For events matching the userns condition this is the case when no
    profile file is found for ev['comm']; for all other events when a
    profile file is found for ev['profile'].
    """
    is_userns = customized_message['userns']['cond'](ev, special_profiles)
    if is_userns:
        return not aa.get_profile_filename_from_profile_name(ev['comm'])
    return aa.get_profile_filename_from_profile_name(ev['profile']) is not None
2024-08-13 16:58:25 +00:00
def is_special_profile_userns(ev, special_profiles):
    """Return True if the event's profile is one of the special profiles.

    As a side effect, when the result is True and the event has no usable
    'execpath', ev['execpath'] is filled in with the result of
    aa.find_executable() for ev['comm'].
    """
    uses_special_profile = bool(special_profiles) and ev['profile'] in special_profiles
    if not uses_special_profile:
        # We don't use special profiles or there is already a profile
        # defined: we don't ask to add userns
        return False

    has_execpath = 'execpath' in ev and ev['execpath']
    if not has_execpath:
        ev['execpath'] = aa.find_executable(ev['comm'])

    return True
2024-09-17 09:17:23 +00:00
def create_userns_profile(name, path, ans):
    """Create a userns profile for `name` by running update_profile via pkexec.

    Args:
        name: Profile name, also used to derive the profile file name.
        path: Path of the executable; an error dialog is shown and nothing
            else happens when it is None.
        ans: Answer forwarded as last argument to the 'create_userns'
            command (callers in this file pass 'allow' or 'deny').
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    bundled_template = os.path.join(script_dir, 'default_unconfined.template')
    if os.path.exists(bundled_template):
        # We are using local aa-notify -> we use local template
        template_path = bundled_template
    else:
        template_path = aa.CONFDIR + '/default_unconfined.template'

    if path is None:
        UsernsGUI.show_error_cannot_find_execpath(name, template_path)
        return

    profile_path = aa.get_profile_filename_from_profile_name(name, True)
    if not profile_path:
        ErrorGUI(_('Cannot get profile path for {}.').format(name), False).show()
        return

    pkexec_command = [
        'pkexec', '--keep-cwd', update_profile.__file__,
        'create_userns', template_path, name, path, profile_path, ans,
    ]
    try:
        subprocess.run(pkexec_command, check=True)
    except subprocess.CalledProcessError as err:
        if err.returncode == 126:
            # return code 126 means the user cancelled the request
            return
        UsernsGUI.show_error_cannot_reload_profile(profile_path, err.returncode)
def ask_for_user_ns_denied(path, name, interactive=True):
    """Decide how to handle a userns denial and create the profile.

    In interactive mode the answer comes from the UsernsGUI dialog;
    otherwise 'allow' is assumed. A profile is only created for the answers
    'allow' and 'deny'; any other answer is logged and ignored.
    """
    ans = UsernsGUI(name, path).show() if interactive else 'allow'

    if ans not in {'allow', 'deny'}:
        debug_logger.debug('No action from the user for {}'.format(path))
        return

    create_userns_profile(name, path, ans)
def prompt_userns(ev, special_profiles):
    """Handle a user namespace denial of a binary running under a special profile.

    Shows a graphical prompt (or an error dialog) and, if the user wants
    it, creates a new profile allowing userns.

    Returns:
        False when the event does not concern a special profile and nothing
        was displayed, True in every other case.
    """
    if not is_special_profile_userns(ev, special_profiles):
        return False

    if ev['execpath'] is None:
        template = os.path.dirname(os.path.abspath(__file__)) + '/default_unconfined.template'
        UsernsGUI.show_error_cannot_find_execpath(ev['comm'], template)
        return True

    aa.update_profiles()

    if not aa.get_profile_filename_from_profile_name(ev['comm']):
        ask_for_user_ns_denied(ev['execpath'], ev['comm'])
        return True

    # There is already a profile with this name: we show an error to the user.
    # We could use the full path as profile name like for the old profiles if we want to handle this case
    # but if execpath is not supported by the kernel it could also mean that we inferred a bad path
    # So we do nothing beyond showing this error.
    message = _(
        'Application {profile} tried to create an user namespace, but a profile already exists with this name.\n'
        'This is likely because there is several binaries named {profile} thus the path inferred by AppArmor ({inferred_path}) is not correct.\n'
        'You should review your profiles (in {profile_dir}).'
    ).format(profile=ev['comm'], inferred_path=ev['execpath'], profile_dir=aa.profile_dir)
    ErrorGUI(message, False).show()
    return True
# TODO reuse more code from aa-logprof in callbacks
def cb_more_info(notification, action, _args):
    """Notification callback: show the details of a denied operation.

    Closes the notification, builds a description of the event, and opens
    a ShowMoreGUI dialog. Depending on the user's answer, the suggested
    rule is added to the profile or a userns profile is created.

    Args:
        notification: The notification that triggered the callback.
        action: Action identifier passed by the notification library
            (not used here).
        _args: Tuple (raw_ev, rl, special_profiles): the raw event, a log
            reader providing parse_record() and create_rule_from_ev(), and
            the special profiles forwarded to the userns condition.
    """
    (raw_ev, rl, special_profiles) = _args
    notification.close()

    parsed_event = rl.parse_record(raw_ev)
    out = _('Operation denied by AppArmor\n\n')

    # List every non-empty field of the event
    for key, value in parsed_event.items():
        if value:
            out += '\t{} = {}\n'.format(_(key), value)

    out += _('\nThe software that declined this operation is {}\n').format(parsed_event['profile'])

    rule = rl.create_rule_from_ev(parsed_event)

    # Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules
    if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
        rule.exec_perms = 'Pix'

    if not rule:  # Should not happen
        # Actually display the error (as cb_add_to_profile does) instead of
        # only appending it to text that is thrown away by the return
        ErrorGUI(_('ERROR: Could not create rule from event.'), False).show()
        return

    aa.update_profiles()
    if customized_message['userns']['cond'](parsed_event, special_profiles):
        # No existing profile to edit: a dedicated profile would be created
        profile_path = None
        out += _('You may allow it through a dedicated unconfined profile for {}.').format(parsed_event['comm'])
    else:
        profile_path = aa.get_profile_filename_from_profile_name(parsed_event['profile'])
        if profile_path:
            out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path)
            out += rule.get_clean()
        else:
            out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=parsed_event['profile'], profile_dir=aa.profile_dir)

    ans = ShowMoreGUI(profile_path, out, rule.get_clean(), parsed_event['profile'], profile_path is not None).show()
    if ans == 'add_rule':
        add_to_profile(rule.get_clean(), parsed_event['profile'])
    elif ans in {'allow', 'deny'}:
        create_userns_profile(parsed_event['comm'], parsed_event['execpath'], ans)
def add_to_profile(rule, profile_name):
    """Add `rule` to the profile `profile_name` by running update_profile via pkexec.

    Shows an error dialog when no profile file is found for the name, or
    when the command fails for a reason other than the user cancelling the
    authorization request.
    """
    if not aa.get_profile_filename_from_profile_name(profile_name):
        not_found = _(
            'Cannot find profile for {}\n\n'
            'It is likely that the profile was not stored in {} or was removed.'
        ).format(profile_name, aa.profile_dir)
        ErrorGUI(not_found, False).show()
        return

    # We get update_profile.py through this import so that it works in all cases
    pkexec_command = ['pkexec', '--keep-cwd', update_profile.__file__, 'add_rule', rule, profile_name]
    try:
        subprocess.run(pkexec_command, check=True)
    except subprocess.CalledProcessError as err:
        if err.returncode == 126:
            # return code 126 means the user cancelled the request
            return
        failure = _('Failed to add rule {rule} to {profile}\nError code = {retcode}').format(
            rule=rule, profile=profile_name, retcode=err.returncode)
        ErrorGUI(failure, False).show()
2024-08-13 16:58:25 +00:00
def cb_add_to_profile(notification, action, _args):
    """Notification callback: allow the denied operation right away.

    Closes the notification and derives a rule from the event. Events
    matching the userns condition get a userns profile without further
    prompting; all others have the rule added to their profile.

    `_args` is the tuple (raw_ev, rl, special_profiles); `action` is unused.
    """
    raw_ev, rl, special_profiles = _args
    notification.close()

    parsed_event = rl.parse_record(raw_ev)
    rule = rl.create_rule_from_ev(parsed_event)

    # Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules
    if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
        rule.exec_perms = 'Pix'

    if not rule:
        ErrorGUI(_('ERROR: Could not create rule from event.'), False).show()
        return

    aa.update_profiles()

    is_userns = customized_message['userns']['cond'](parsed_event, special_profiles)
    if not is_userns:
        add_to_profile(rule.get_clean(), parsed_event['profile'])
        return

    # Non-interactive: the user already chose to allow via the notification
    ask_for_user_ns_denied(parsed_event['execpath'], parsed_event['comm'], False)
2024-08-13 16:58:25 +00:00
# Special-case notification texts, keyed by event category.
# 'cond' decides whether an event belongs to the category, 'msg' is the
# (untranslated) message template; {0} is filled with the event's comm.
customized_message = {
    'userns': {
        'cond': lambda ev, special_profiles: (
            ev['operation'] in ('userns_create', 'capable')
            and is_special_profile_userns(ev, special_profiles)
        ),
        'msg': (
            'Application {0} wants to create an user namespace which could be used to compromise your system\n'
            'Do you want to allow it next time {0} is run?'
        ),
    },
}
def customize_notification_message(ev, msg, special_profiles):
    """Return the notification text to display for an event.

    Events matching the userns condition get the translated userns message
    filled with ev['comm']; for all other events `msg` is returned as is.
    """
    userns = customized_message['userns']
    if not userns['cond'](ev, special_profiles):
        return msg
    return _(userns['msg']).format(ev['comm'])
def start_glib_loop():
    """Create a GLib main loop and run it (blocks the calling thread)."""
    GLib.MainLoop().run()
2019-01-09 23:59:40 +01:00
def _resolve_filters(conf, cli_args):
    """Collect the raw filter expressions from notify.conf and the command line.

    conf is the mapping of notify.conf options and cli_args the parsed
    argparse namespace. A non-empty command line value overrides the value
    from notify.conf; a filter that is set in neither place is returned
    as ''.
    """
    # (key in the returned dict, option name shared by notify.conf and argparse)
    filter_options = (
        ('profile', 'filter.profile'),
        ('operation', 'filter.operation'),
        ('name', 'filter.name'),
        ('denied_mask', 'filter.denied'),
        ('net_family', 'filter.family'),
        ('net_sock_type', 'filter.socket'),
    )
    filters = {}
    for key, option in filter_options:
        filters[key] = conf[option] if option in conf else ''
        # command line filters override notify.conf
        cli_value = getattr(cli_args, option)
        if cli_value:
            filters[key] = cli_value
    return filters


def main():
    """Run aa-notify.

    Parse the command line arguments, load the system and the user
    notify.conf and start the requested operation: poll the log and show
    desktop notifications (-p), show the entries since the last login (-l)
    or show the entries of the last NUM days (-s). If none of these is
    requested, the help text is printed.

    Several module-level globals (translation function, debug logger,
    merged configuration, parsed arguments, ...) are set here for the rest
    of the script.

    The function never returns normally: it always leaves through
    sys.exit(), either with an error message or with status 0 at the end.
    """
    global _, debug_logger, config, args
    global debug_docs_url, nobody_user, original_effective_user, timeformat

    debug_docs_url = "https://wiki.ubuntu.com/DebuggingApparmor"
    nobody_user = "nobody"
    timeformat = "%c"  # Automatically using locale format
    original_effective_user = os.geteuid()

    # setup exception handling
    enable_aa_exception_handler()

    # setup module translations
    _ = init_translation()

    # Register the on_exit method with atexit
    # Takes care of closing the debug log etc
    atexit.register(aa.on_exit)

    # Set up UI logger for separate messages from UI module
    debug_logger = DebugLogger('Notify')
    debug_logger.debug("Starting aa-notify")

    parser = argparse.ArgumentParser(description=_('Display AppArmor notifications or messages for DENIED entries.'))
    parser.add_argument('-p', '--poll', action='store_true', help=_('poll AppArmor logs and display notifications'))
    parser.add_argument('--display', type=str, help=_('set the DISPLAY environment variable (might be needed if sudo resets $DISPLAY)'))
    parser.add_argument('-f', '--file', type=str, help=_('search FILE for AppArmor messages'))
    parser.add_argument('-l', '--since-last', action='store_true', help=_('display stats since last login'))
    parser.add_argument('-s', '--since-days', type=int, metavar='NUM', help=_('show stats for last NUM days (can be used alone or with -p)'))
    parser.add_argument('-v', '--verbose', action='store_true', help=_('show messages with stats'))
    parser.add_argument('-u', '--user', type=str, help=_('user to drop privileges to when not using sudo'))
    parser.add_argument('-w', '--wait', type=int, metavar='NUM', help=_('wait NUM seconds before displaying notifications (with -p)'))
    parser.add_argument('--prompt-filter', type=str, metavar='PF', help=_('kind of operations which display a popup prompt'))
    parser.add_argument('--debug', action='store_true', help=_('debug mode'))
    parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS)

    filter_group = parser.add_argument_group(
        'Filtering options',
        description=('Filters are used to reduce the output of information to only '
                     'those entries that will match the filter. Filters use Python\'s regular '
                     'expression syntax.'))
    filter_group.add_argument('--filter.profile', metavar='PROFILE', help=_('regular expression to match the profile'))
    filter_group.add_argument('--filter.operation', metavar='OPERATION', help=_('regular expression to match the operation'))
    filter_group.add_argument('--filter.name', metavar='NAME', help=_('regular expression to match the name'))
    filter_group.add_argument('--filter.denied', metavar='DENIED', help=_('regular expression to match the denied mask'))
    filter_group.add_argument('--filter.family', metavar='FAMILY', help=_('regular expression to match the network family'))
    filter_group.add_argument('--filter.socket', metavar='SOCKET', help=_('regular expression to match the network socket type'))

    # If stdout is not a TTY then assume running in test mode and fix output width
    if not sys.stdout.isatty():
        parser.formatter_class = lambda prog: argparse.HelpFormatter(prog, width=80)

    args = parser.parse_args()

    # Debug mode can be invoked directly with --debug or env LOGPROF_DEBUG=3
    if args.debug:
        debug_logger.activateStderr()
        debug_logger.debug('Logging level: {}'.format(debug_logger.debug_level))
        debug_logger.debug('Running as uid: {0[0]}, euid: {0[1]}, suid: {0[2]}'.format(os.getresuid()))
        if args.poll:
            debug_logger.debug('Running with --debug and --poll. Will exit in 100s')

    # Sanity checks
    user_ids = os.getresuid()
    groups_ids = os.getresgid()
    if user_ids[1] != user_ids[2]:
        sys.exit("ERROR: Cannot be started with suid set!")
    if groups_ids[1] != groups_ids[2]:
        sys.exit("ERROR: Cannot be started with sgid set!")

    if args.configdir:  # prefer --configdir if given
        confdir = args.configdir
    else:  # fallback to env variable (or None if not set)
        confdir = os.getenv('__AA_CONFDIR')

    aa.init_aa(confdir=confdir)

    # Initialize aa.logfile
    aa.set_logfile(args.file)

    # Load global config reader
    shell_config = aaconfig.Config('shell')

    # Load system's notify.conf
    # By default aa.CONFDIR is /etc/apparmor on most production systems
    system_config = read_notify_conf(aa.CONFDIR, shell_config)
    # Set default if no system notify.conf was found
    if not system_config:
        system_config = {'': {'show_notifications': 'yes'}}

    # Load user's notify.conf
    # $HOME is not guaranteed to be set; fall back to the home directory
    # from the password database instead of failing with a KeyError.
    home = os.environ.get('HOME')
    if home is None:
        home = os.path.expanduser('~')
    xdg_config_home = os.environ.get('XDG_CONFIG_HOME')

    if os.path.isfile(home + '/.apparmor/notify.conf'):
        # Use legacy path if the conf file is there
        user_config = read_notify_conf(home + '/.apparmor', shell_config)
    elif xdg_config_home is not None and os.path.isfile(xdg_config_home + '/apparmor/notify.conf'):
        # Use XDG_CONFIG_HOME if it is defined
        user_config = read_notify_conf(xdg_config_home + '/apparmor', shell_config)
    else:
        # Fallback to the default value of XDG_CONFIG_HOME
        user_config = read_notify_conf(home + '/.config/apparmor', shell_config)

    # Merge the two config dicts in an accurate and idiomatic way (requires Python 3.5)
    # NOTE(review): this is a shallow merge - a section that exists in the
    # user config replaces the system section of the same name as a whole.
    config = {**system_config, **user_config}

    # Possible configuration options:
    # - show_notifications
    # - message_body
    # - message_footer
    # - use_group
    # - userns_special_profiles
    # - ignore_denied_capability
    # - interface_theme
    # - prompt_filter
    # - filter.profile,
    # - filter.operation,
    # - filter.name,
    # - filter.denied,
    # - filter.family,
    # - filter.socket,

    # # Config checks

    # Warn about unknown keys in the config
    allowed_config_keys = [
        'use_group',
        'userns_special_profiles',
        'ignore_denied_capability',
        'interface_theme',
        'prompt_filter',
        'show_notifications',
        'message_body',
        'message_footer',
        'filter.profile',
        'filter.operation',
        'filter.name',
        'filter.denied',
        'filter.family',
        'filter.socket',
    ]
    found_config_keys = config[''].keys()
    unknown_keys = [
        item for item in found_config_keys if item not in allowed_config_keys
    ]
    for item in unknown_keys:
        print(_('Warning! Configuration item "{}" is unknown!').format(item))

    # notify.conf filters, overridden by the command line filters
    filters = compile_filter_regex(_resolve_filters(config[''], args))

    # Abort if use_group is defined and the current user is not a member of it
    if 'use_group' in config['']:
        user = pwd.getpwuid(os.geteuid())[0]
        user_groups = [g.gr_name for g in grp.getgrall() if user in g.gr_mem]
        # The primary group is not listed in gr_mem, so add it explicitly
        gid = pwd.getpwnam(user).pw_gid
        user_groups.append(grp.getgrgid(gid).gr_name)

        if config['']['use_group'] not in user_groups:
            print(
                _('ERROR! User {user} not member of {group} group!').format(
                    user=user,
                    group=config['']['use_group']
                ),
                file=sys.stderr
            )
            sys.exit(1)
            # @TODO: Extend UI lib to have warning and error functions that
            # can be used in an uniform way with both text and JSON output.

    if 'userns_special_profiles' in config['']:
        userns_special_profiles = config['']['userns_special_profiles'].strip().split(',')
    else:
        userns_special_profiles = ['unconfined']  # By default, unconfined is the only special profile

    if 'ignore_denied_capability' in config['']:
        ignore_denied_capability = config['']['ignore_denied_capability'].strip().split(',')
    else:
        ignore_denied_capability = ['sudo', 'su']

    if 'interface_theme' in config['']:
        set_interface_theme(config['']['interface_theme'].strip())
    else:
        set_interface_theme('ubuntu')

    # Todo: add more kinds of notifications
    supported_prompt_filter = {'userns'}
    # The command line value takes precedence over notify.conf
    if not args.prompt_filter and 'prompt_filter' in config['']:
        args.prompt_filter = config['']['prompt_filter']

    if args.prompt_filter:
        args.prompt_filter = set(args.prompt_filter.strip().split(','))
        unsupported = args.prompt_filter - supported_prompt_filter
        if unsupported:
            sys.exit(_('ERROR: using an unsupported prompt filter: {}\nSupported values: {}').format(', '.join(unsupported), ', '.join(supported_prompt_filter)))

    if args.file:
        logfile = args.file
    elif os.path.isfile('/var/run/auditd.pid') and os.path.isfile('/var/log/audit/audit.log'):
        # If auditd is running, look at /var/log/audit/audit.log
        logfile = '/var/log/audit/audit.log'
    elif os.path.isfile('/var/log/kern.log'):
        # For aa-notify, the fallback is kern.log, not syslog from aa.logfile
        logfile = '/var/log/kern.log'
    else:
        # If all above failed, use aa cfg
        logfile = aa.logfile

    if args.verbose:
        print(_('Using log file'), logfile)

    if args.display:
        os.environ['DISPLAY'] = args.display

    if args.poll:
        # Exit immediately if show_notifications is no or any of the options below
        if config['']['show_notifications'] in [False, 'no', 'false', '0']:
            print(_('Showing notifications forbidden in notify.conf, aborting..'))
            sys.exit(0)

        # Don't allow usage of aa-notify by root, must be some user. Desktop
        # logins as root are not recommended and certainly not a use case for
        # aa-notify notifications.
        if not args.user and os.getuid() == 0 and 'SUDO_USER' not in os.environ:
            sys.exit("ERROR: Cannot be started a real root user. Use --user to define what user to use.")

        # Required to parse_record.
        rl = ReadLog('', '', '')

        # Initialize the list of profiles for can_allow_rule
        aa.read_profiles()

        # At this point this script needs to be able to read 'logfile' but once
        # the for loop starts, privileges can be dropped since the file descriptor
        # has been opened and access granted. Further reads of the file will not
        # trigger any new permission checks.
        # @TODO Plan to catch PermissionError here or..?
        for (event, message) in notify_about_new_entries(logfile, filters, args.wait):
            ev = rl.parse_record(event)

            # @TODO redo special behaviours with a more regular function
            # We ignore capability denials for binaries in ignore_denied_capability
            if ev['operation'] == 'capable' and ev['comm'] in ignore_denied_capability:
                continue

            # Special behaviour for userns:
            if args.prompt_filter and 'userns' in args.prompt_filter and customized_message['userns']['cond'](ev, userns_special_profiles):
                if prompt_userns(ev, userns_special_profiles):
                    continue  # Notification already displayed for this event, we go to the next one.

            # Notifications should not be run as root, since root probably is
            # the wrong desktop user and not the one getting the notifications.
            drop_privileges()

            # sudo does not preserve DBUS address, so we need to guess it based on UID
            if 'DBUS_SESSION_BUS_ADDRESS' not in os.environ:
                os.environ['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=/run/user/{}/bus'.format(os.geteuid())

            message = customize_notification_message(ev, message, userns_special_profiles)

            n = notify2.Notification(
                _('AppArmor security notice'),
                message,
                'gtk-dialog-warning'
            )

            if can_allow_rule(ev, userns_special_profiles):
                n.add_action('clicked', 'Allow', cb_add_to_profile, (event, rl, userns_special_profiles))
            n.add_action('more_clicked', 'Show More', cb_more_info, (event, rl, userns_special_profiles))

            n.show()

            # When notification is sent, raise privileged back to root if the
            # original effective user id was zero (to be able to read AppArmor logs)
            raise_privileges()

    elif args.since_last:
        show_entries_since_last_login(logfile, filters)
    elif args.since_days:
        show_entries_since_days(logfile, args.since_days, filters)
    else:
        parser.print_help()

    sys.exit(0)
# Only run main() when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()