mirror of
https://gitlab.com/apparmor/apparmor.git
synced 2025-03-04 08:24:42 +01:00
1149 lines
47 KiB
Python
Executable file
1149 lines
47 KiB
Python
Executable file
#! /usr/bin/python3
|
||
# ----------------------------------------------------------------------
|
||
# Copyright (C) 2018–2022 Otto Kekäläinen <otto@kekalainen.net>
|
||
#
|
||
# This program is free software; you can redistribute it and/or
|
||
# modify it under the terms of version 2 of the GNU General Public
|
||
# License as published by the Free Software Foundation.
|
||
#
|
||
# This program is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU General Public License for more details.
|
||
#
|
||
# ----------------------------------------------------------------------
|
||
#
|
||
# /etc/apparmor/notify.conf:
|
||
# # set to 'yes' to enable AppArmor DENIED notifications
|
||
# show_notifications="yes"
|
||
#
|
||
# # only people in use_group can run this script
|
||
# use_group="admin"
|
||
#
|
||
# $HOME/.apparmor/notify.conf can have:
|
||
# # set to 'yes' to enable AppArmor DENIED notifications
|
||
# show_notifications="yes"
|
||
#
|
||
# In a typical desktop environment one would run as a service the
|
||
# command:
|
||
# /usr/bin/aa-notify -p -w 10
|
||
#
|
||
"""Show AppArmor events on command line or as desktop notifications."""
|
||
|
||
import argparse
|
||
import atexit
|
||
import grp
|
||
import os
|
||
import pwd
|
||
import re
|
||
import sys
|
||
import tempfile
|
||
import time
|
||
|
||
import subprocess
|
||
from collections import defaultdict
|
||
|
||
import notify2
|
||
import psutil
|
||
|
||
import apparmor.aa as aa
|
||
import apparmor.ui as aaui
|
||
import apparmor.config as aaconfig
|
||
import apparmor.update_profile as update_profile
|
||
import LibAppArmor # C-library to parse one log line
|
||
from apparmor.common import DebugLogger, open_file_read
|
||
from apparmor.fail import enable_aa_exception_handler
|
||
from apparmor.notify import get_last_login_timestamp
|
||
from apparmor.translations import init_translation
|
||
from apparmor.logparser import ReadLog
|
||
from apparmor.gui import UsernsGUI, ErrorGUI, ShowMoreGUI, ShowMoreGUIAggregated, set_interface_theme
|
||
from apparmor.rule.file import FileRule
|
||
|
||
from dbus import DBusException
|
||
import gi
|
||
from gi.repository import GLib
|
||
import threading
|
||
|
||
gi.require_version('GLib', '2.0')
|
||
|
||
|
||
def get_user_login():
|
||
"""Portable function to get username.
|
||
|
||
Should not trigger any
|
||
"OSError: [Errno 25] Inappropriate ioctl for device" errors in Giltab-CI.
|
||
"""
|
||
if os.name == "posix":
|
||
username = pwd.getpwuid(os.geteuid()).pw_name
|
||
else:
|
||
username = os.environ.get('USER')
|
||
if not username and hasattr(os, 'getlogin'):
|
||
username = os.getlogin()
|
||
return username
|
||
|
||
|
||
def format_event(event, logsource):
|
||
"""Generate the notification text contents."""
|
||
output = []
|
||
|
||
if 'message_body' in config['']:
|
||
output += [config['']['message_body']]
|
||
|
||
if event.profile:
|
||
output += ['Profile: {}'.format(event.profile)]
|
||
if event.operation:
|
||
output += ['Operation: {}'.format(event.operation)]
|
||
if event.name:
|
||
output += ['Name: {}'.format(event.name)]
|
||
if event.denied_mask:
|
||
output += ['Denied: {}'.format(event.denied_mask)]
|
||
if event.net_family and event.net_sock_type:
|
||
output += ['Family: {}'.format(event.net_family)]
|
||
output += ['Socket: {}'.format(event.net_sock_type)]
|
||
|
||
output += ['Logfile: {}'.format(logsource)]
|
||
|
||
return "\n".join(output)
|
||
|
||
|
||
def is_event_in_filter(event, filters):
|
||
"""Checks if event is in filter"""
|
||
if filters['profile'] and event.profile and not filters['profile_re'].match(event.profile):
|
||
return False
|
||
if filters['operation'] and event.operation and not filters['operation_re'].match(event.operation):
|
||
return False
|
||
if filters['name'] and event.name and not filters['name_re'].match(event.name):
|
||
return False
|
||
if filters['denied_mask'] and event.denied_mask and not filters['denied_mask_re'].match(event.denied_mask):
|
||
return False
|
||
if filters['net_family'] and event.net_family and not filters['net_family_re'].match(event.net_family):
|
||
return False
|
||
if filters['net_sock_type'] and event.net_sock_type and not filters['net_sock_type_re'].match(event.net_sock_type):
|
||
return False
|
||
return True
|
||
|
||
|
||
def daemonize():
|
||
"""Run the notification daemon in the background."""
|
||
# Kill other instances of aa-notify if already running
|
||
for process in psutil.process_iter():
|
||
# Find the process that has the same name as this script, e.g. aa-notify.py
|
||
if process.name() == os.path.basename(__file__) and process.pid != os.getpid():
|
||
print(_('Killing old daemon (PID {})...').format(process.pid))
|
||
os.kill(process.pid, 15)
|
||
|
||
# Spawn/fork into the background and stay running
|
||
newpid = os.fork()
|
||
if newpid == 0:
|
||
|
||
# Follow the logfile and stream notifications
|
||
# Rate limit to not show too many notifications
|
||
try:
|
||
# Before use, notify2 must be initialized and the DBUS channel
|
||
# should be opened using the non-root user. This step needs to
|
||
# be executed after the drop_privileges().
|
||
notify2.init('aa-notify', mainloop='glib')
|
||
except DBusException:
|
||
sys.exit(_('Cannot initialize notify2. Please check that your terminal can use a graphical interface'))
|
||
|
||
thread = threading.Thread(target=start_glib_loop)
|
||
thread.daemon = True
|
||
thread.start()
|
||
else:
|
||
print(_('Notification emitter started in the background'))
|
||
# pids = (os.getpid(), newpid)
|
||
# print("parent: %d, child: %d\n" % pids)
|
||
os._exit(0) # Exit child without calling exit handlers etc
|
||
|
||
|
||
def notify_about_new_entries(logfile, filters, wait=0):
|
||
try:
|
||
for event in follow_apparmor_events(logfile, wait):
|
||
if not is_event_in_filter(event, filters):
|
||
continue
|
||
debug_logger.info(format_event(event, logfile))
|
||
yield event, format_event(event, logfile)
|
||
except PermissionError:
|
||
sys.exit(_("ERROR: Cannot read {}. Please check permissions.").format(logfile))
|
||
|
||
|
||
def show_entries_since_epoch(logfile, epoch_since, filters):
|
||
"""Show AppArmor notifications since given timestamp."""
|
||
count = 0
|
||
for event in get_apparmor_events(logfile, epoch_since):
|
||
if not is_event_in_filter(event, filters):
|
||
continue
|
||
count += 1
|
||
if args.verbose:
|
||
print(format_event(event, logfile))
|
||
print() # Print a newline after each entry for better readability
|
||
|
||
aaui.UI_Info(_('AppArmor denials: {count} (since {date})').format(
|
||
**{
|
||
'count': count,
|
||
'date': time.strftime(timeformat, time.localtime(epoch_since))
|
||
}
|
||
))
|
||
|
||
if args.verbose:
|
||
if 'message_footer' in config['']:
|
||
print(config['']['message_footer'])
|
||
else:
|
||
print(_('For more information, please see: {}').format(debug_docs_url))
|
||
|
||
|
||
def show_entries_since_last_login(logfile, filters, username=get_user_login()):
|
||
"""Show AppArmor notifications since last login of user."""
|
||
# If running as sudo, use username of sudo user instead of root
|
||
if 'SUDO_USER' in os.environ.keys():
|
||
username = os.environ['SUDO_USER']
|
||
|
||
if args.verbose:
|
||
print(_('Showing entries since {} logged in').format(username))
|
||
print() # Newline
|
||
epoch_since = get_last_login_timestamp(username)
|
||
if epoch_since == 0:
|
||
print(_('ERROR: Could not find last login'), file=sys.stderr)
|
||
sys.exit(1)
|
||
show_entries_since_epoch(logfile, epoch_since, filters)
|
||
|
||
|
||
def show_entries_since_days(logfile, since_days, filters):
|
||
"""Show AppArmor notifications since the given amount of days."""
|
||
day_in_seconds = 60 * 60 * 24
|
||
epoch_now = int(time.time())
|
||
epoch_since = epoch_now - day_in_seconds * since_days
|
||
show_entries_since_epoch(logfile, epoch_since, filters)
|
||
|
||
|
||
def follow_apparmor_events(logfile, wait=0):
|
||
"""Follow AppArmor events and yield relevant entries until process stops."""
|
||
# If wait was given as argument but was type None (from ArgumentParser)
|
||
# ensure it's type int and zero
|
||
if not wait:
|
||
wait = 0
|
||
|
||
# Record start time here so wait can be calculated later
|
||
start_time = int(time.time())
|
||
|
||
# Record initial file size to detect if log rotates
|
||
log_size = os.stat(logfile).st_size
|
||
# Record initial file inode number to detect if log gets renamed
|
||
log_inode = os.stat(logfile).st_ino
|
||
|
||
# @TODO Implement more log sources in addition to just the logfile
|
||
with open_file_read(logfile) as logdata:
|
||
|
||
# Loop all pre-existing events in the log source once so later runs
|
||
# will only see new events
|
||
for discarded_event in logdata:
|
||
pass
|
||
|
||
# @TODO: while+sleep will cause CPU interruptions once per second,
|
||
# so switch to epoll/inotify/etc for less resource consumption.
|
||
while True:
|
||
debug_logger.debug(
|
||
'Poll AppArmor event source {} seconds since start'.
|
||
format(int(time.time()) - start_time)
|
||
)
|
||
|
||
(logdata, log_inode, log_size) = reopen_logfile_if_needed(logfile, logdata, log_inode, log_size)
|
||
|
||
for event in parse_logdata(logdata):
|
||
# @TODO Alternatively use os.times()
|
||
if int(time.time()) - start_time < wait:
|
||
debug_logger.debug('Omitted an event seen during wait time')
|
||
continue
|
||
yield event
|
||
|
||
if debug_logger.debugging and debug_logger.debug_level <= 10 and int(time.time()) - start_time > 100:
|
||
debug_logger.debug('Debug mode detected: aborting notification emitter after 100 seconds.')
|
||
sys.exit(0)
|
||
|
||
time.sleep(1)
|
||
|
||
|
||
def reopen_logfile_if_needed(logfile, logdata, log_inode, log_size):
|
||
retry = True
|
||
|
||
while retry:
|
||
try:
|
||
# Reopen file if inode has changed, e.g. rename by logrotate
|
||
if os.stat(logfile).st_ino != log_inode:
|
||
debug_logger.debug('Logfile was renamed, reload to read the new file.')
|
||
logdata = open(logfile, 'r')
|
||
# Store new inode number for next comparisons
|
||
log_inode = os.stat(logfile).st_ino
|
||
|
||
# Start reading from the beginning if file shrank
|
||
if os.stat(logfile).st_size < log_size:
|
||
debug_logger.debug('Logfile shrank in size, reload from beginning.')
|
||
logdata.seek(0)
|
||
log_size = os.stat(logfile).st_size # Reset file size value
|
||
|
||
# Record new file size if grown
|
||
if os.stat(logfile).st_size > log_size:
|
||
log_size = os.stat(logfile).st_size
|
||
|
||
retry = False
|
||
except FileNotFoundError:
|
||
# @TODO: switch to epoll/inotify/
|
||
debug_logger.debug('Logfile not found, retrying.')
|
||
time.sleep(1)
|
||
# @TODO: send notification if reopening the log fails too many times
|
||
|
||
return (logdata, log_inode, log_size)
|
||
|
||
|
||
def get_apparmor_events_return(logfile, since=0):
|
||
"""Read audit events from log source and return all relevant events."""
|
||
out = []
|
||
# Get logdata from file
|
||
# @TODO Implement more log sources in addition to just the logfile
|
||
try:
|
||
with open_file_read(logfile) as logdata:
|
||
for event in parse_logdata(logdata):
|
||
if event.epoch > since:
|
||
out.append(event)
|
||
|
||
return out
|
||
except PermissionError:
|
||
sys.exit(_("ERROR: Cannot read {}. Please check permissions.".format(logfile)))
|
||
|
||
|
||
def get_apparmor_events(logfile, since=0):
|
||
"""Read audit events from log source and yield all relevant events."""
|
||
|
||
# Get logdata from file
|
||
# @TODO Implement more log sources in addition to just the logfile
|
||
try:
|
||
with open_file_read(logfile) as logdata:
|
||
for event in parse_logdata(logdata):
|
||
if event.epoch > since:
|
||
yield event
|
||
except PermissionError:
|
||
sys.exit(_("ERROR: Cannot read {}. Please check permissions.".format(logfile)))
|
||
|
||
|
||
def parse_logdata(logsource):
|
||
"""Traverse any iterable log source and extract relevant AppArmor events.
|
||
|
||
Expects log lines like:
|
||
Feb 16 20:22:28 XPS-13-9370 kernel: [520374.926882] audit: type=1400
|
||
audit(1581877348.868:657): apparmor="ALLOWED" operation="open"
|
||
profile="libreoffice-soffice"
|
||
name="/usr/share/drirc.d/00-mesa-defaults.conf" pid=22690
|
||
comm="soffice.bin" requested_mask="r" denied_mask="r" fsuid=1001 ouid=0
|
||
"""
|
||
|
||
re_audit_time_id = r'(msg=)?audit\([\d\.\:]+\):\s+' # 'audit(1282626827.320:411): '
|
||
re_kernel_time = r'\[[\d\.\s]+\]' # '[ 1612.746129]'
|
||
re_type_num = '1[45][0-9][0-9]' # 1400..1599
|
||
re_aa_or_op = '(apparmor=|operation=)'
|
||
|
||
re_log_parts = [
|
||
r'kernel:\s+(' + re_kernel_time + r'\s+)?(audit:\s+)?type=' + re_type_num + r'\s+' + re_audit_time_id + re_aa_or_op, # v2_6 syslog
|
||
r'kernel:\s+(' + re_kernel_time + r'\s+)?' + re_audit_time_id + 'type=' + re_type_num + r'\s+' + re_aa_or_op,
|
||
'type=(AVC|APPARMOR[_A-Z]*|' + re_type_num + r')\s+' + re_audit_time_id + '(type=' + re_type_num + r'\s+)?' + re_aa_or_op, # v2_6 audit and dmesg
|
||
r'type=USER_AVC\s+' + re_audit_time_id + '.*apparmor=', # dbus
|
||
r'type=UNKNOWN\[' + re_type_num + r'\]\s+' + re_audit_time_id + re_aa_or_op,
|
||
r'dbus\[[0-9]+\]:\s+apparmor=', # dbus
|
||
]
|
||
|
||
# Pre-filter log lines so that we hand over only relevant lines to LibAppArmor parsing
|
||
re_log_all = re.compile('(' + '|'.join(re_log_parts) + ')')
|
||
|
||
for entry in logsource:
|
||
|
||
# Check the start of the log line and only process lines from AppArmor
|
||
apparmor_entry = re_log_all.search(entry)
|
||
if apparmor_entry:
|
||
# Parse the line using LibAppArmor (C library)
|
||
# See aalogparse.h for data structure
|
||
event = LibAppArmor.parse_record(entry)
|
||
# Only show actual events of contained programs and ignore among
|
||
# others AppArmor profile reloads
|
||
if event.operation and event.operation[0:8] != 'profile_':
|
||
yield event
|
||
|
||
|
||
def drop_privileges():
|
||
"""Drop privileges of process.
|
||
|
||
If running as root, drop privileges to USER if known, or fall-back to
|
||
nobody_user/group.
|
||
"""
|
||
if os.geteuid() == 0:
|
||
|
||
if 'SUDO_USER' in os.environ.keys():
|
||
next_username = os.environ['SUDO_USER']
|
||
next_uid = os.environ['SUDO_UID']
|
||
next_gid = os.environ['SUDO_GID']
|
||
else:
|
||
nobody_user_info = pwd.getpwnam(nobody_user)
|
||
next_username = nobody_user_info[0]
|
||
next_uid = nobody_user_info[2]
|
||
next_gid = nobody_user_info[3]
|
||
|
||
debug_logger.debug('Dropping to user "{}" privileges'.format(next_username))
|
||
|
||
# @TODO?
|
||
# Remove group privileges, including potential 'adm' group that might
|
||
# have had log read access but also other accesses.
|
||
# os.setgroups([])
|
||
|
||
# Try setting the new uid/gid
|
||
# Set gid first, otherwise the latter step would fail on missing permissions
|
||
os.setegid(int(next_gid))
|
||
os.seteuid(int(next_uid))
|
||
|
||
# sudo does not preserve DBUS address, so we need to guess it based on UID
|
||
if 'DBUS_SESSION_BUS_ADDRESS' not in os.environ:
|
||
os.environ['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=/run/user/{}/bus'.format(os.geteuid())
|
||
|
||
|
||
def raise_privileges():
|
||
"""Raise privileges of process.
|
||
|
||
If was running as user with saved user ID 0, raise back to root privileges.
|
||
"""
|
||
if os.geteuid() != 0 and original_effective_user == 0:
|
||
|
||
debug_logger.debug('Rasing privileges from UID {} back to UID 0 (root)'.format(os.geteuid()))
|
||
|
||
# os.setgid(int(next_gid))
|
||
os.seteuid(original_effective_user)
|
||
|
||
|
||
def read_notify_conf(path, shell_config):
|
||
"""Read notify.conf."""
|
||
try:
|
||
shell_config.CONF_DIR = path
|
||
conf_dict = shell_config.read_config('notify.conf')
|
||
debug_logger.debug('Found configuration file in {}/notify.conf'.format(shell_config.CONF_DIR))
|
||
return conf_dict
|
||
except FileNotFoundError:
|
||
return {}
|
||
|
||
|
||
def compile_filter_regex(filters):
|
||
"""Compile each filter regex and add it to filters"""
|
||
if filters['profile']:
|
||
filters['profile_re'] = re.compile(filters['profile'])
|
||
if filters['operation']:
|
||
filters['operation_re'] = re.compile(filters['operation'])
|
||
if filters['name']:
|
||
filters['name_re'] = re.compile(filters['name'])
|
||
if filters['denied_mask']:
|
||
filters['denied_mask_re'] = re.compile(filters['denied_mask'])
|
||
if filters['net_family']:
|
||
filters['net_family_re'] = re.compile(filters['net_family'])
|
||
if filters['net_sock_type']:
|
||
filters['net_sock_type_re'] = re.compile(filters['net_sock_type'])
|
||
|
||
return filters
|
||
|
||
|
||
def can_allow_rule(ev, special_profiles):
|
||
if customized_message['userns']['cond'](ev, special_profiles):
|
||
return not aa.get_profile_filename_from_profile_name(ev['comm'])
|
||
else:
|
||
return aa.get_profile_filename_from_profile_name(ev['profile']) is not None
|
||
|
||
|
||
def is_special_profile_userns(ev, special_profiles):
|
||
if not special_profiles or ev['profile'] not in special_profiles:
|
||
return False # We don't use special profiles or there is already a profile defined: we don't ask to add userns
|
||
|
||
if 'execpath' not in ev or not ev['execpath']:
|
||
ev['execpath'] = aa.find_executable(ev['comm'])
|
||
|
||
return True
|
||
|
||
|
||
def create_userns_profile(name, path, ans):
|
||
update_profile_path = update_profile.__file__
|
||
|
||
local_template_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default_unconfined.template')
|
||
if os.path.exists(local_template_path): # We are using local aa-notify -> we use local template
|
||
template_path = local_template_path
|
||
else:
|
||
template_path = aa.CONFDIR + '/default_unconfined.template'
|
||
|
||
if path is None:
|
||
UsernsGUI.show_error_cannot_find_execpath(name, template_path)
|
||
return
|
||
|
||
profile_path = aa.get_profile_filename_from_profile_name(name, True)
|
||
if not profile_path:
|
||
ErrorGUI(_('Cannot get profile path for {}.').format(name), False).show()
|
||
return
|
||
|
||
command = ['pkexec', '--keep-cwd', update_profile_path, 'create_userns', template_path, name, path, profile_path, ans]
|
||
|
||
try:
|
||
subprocess.run(command, check=True)
|
||
except subprocess.CalledProcessError as e:
|
||
if e.returncode != 126: # return code 126 means the user cancelled the request
|
||
UsernsGUI.show_error_cannot_reload_profile(profile_path, e.returncode)
|
||
|
||
|
||
def ask_for_user_ns_denied(path, name, interactive=True):
|
||
if interactive:
|
||
gui = UsernsGUI(name, path)
|
||
ans = gui.show()
|
||
else:
|
||
ans = 'allow'
|
||
if ans in {'allow', 'deny'}:
|
||
create_userns_profile(name, path, ans)
|
||
else:
|
||
debug_logger.debug('No action from the user for {}'.format(path))
|
||
|
||
|
||
def can_leverage_userns_event(ev):
|
||
if ev['execpath'] is None:
|
||
return 'error_cannot_find_path'
|
||
|
||
aa.update_profiles()
|
||
|
||
if aa.get_profile_filename_from_profile_name(ev['comm']):
|
||
return 'error_userns_profile_exists'
|
||
return 'ok'
|
||
|
||
|
||
def prompt_userns(ev):
|
||
"""If the user namespace creation denial was generated by an unconfined binary, displays a graphical notification.
|
||
Creates a new profile to allow userns if the user wants it. Returns whether a notification was displayed to the user
|
||
"""
|
||
userns_event_usable = can_leverage_userns_event(ev)
|
||
if userns_event_usable == 'error_cannot_find_path':
|
||
UsernsGUI.show_error_cannot_find_execpath(ev['comm'], os.path.dirname(os.path.abspath(__file__)) + '/default_unconfined.template')
|
||
elif userns_event_usable == 'error_userns_profile_exists':
|
||
# There is already a profile with this name: we show an error to the user.
|
||
# We could use the full path as profile name like for the old profiles if we want to handle this case
|
||
# but if execpath is not supported by the kernel it could also mean that we inferred a bad path
|
||
# So we do nothing beyond showing this error.
|
||
ErrorGUI(
|
||
_('Application {profile} tried to create an user namespace, but a profile already exists with this name.\n'
|
||
'This is likely because there is several binaries named {profile} thus the path inferred by AppArmor ({inferred_path}) is not correct.\n'
|
||
'You should review your profiles (in {profile_dir}).').format(profile=ev['comm'], inferred_path=ev['execpath'], profile_dir=aa.profile_dir),
|
||
False).show()
|
||
elif userns_event_usable == 'ok':
|
||
ask_for_user_ns_denied(ev['execpath'], ev['comm'])
|
||
|
||
|
||
def get_more_info_about_event(rl, ev, special_profiles, header='', get_clean_rule=False):
|
||
out = header
|
||
clean_rule = None
|
||
|
||
for key, value in ev.items():
|
||
if value:
|
||
out += '\t{} = {}\n'.format(_(key), value)
|
||
|
||
out += _('\nThe software that declined this operation is {}\n').format(ev['profile'])
|
||
|
||
rule = rl.create_rule_from_ev(ev)
|
||
|
||
if rule:
|
||
if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
|
||
rule.exec_perms = 'Pix'
|
||
aa.update_profiles()
|
||
if customized_message['userns']['cond'](ev, special_profiles):
|
||
profile_path = None
|
||
out += _('You may allow it through a dedicated unconfined profile for {}.').format(ev['comm'])
|
||
userns_event_usable = can_leverage_userns_event(ev)
|
||
if userns_event_usable == 'error_cannot_find_path':
|
||
clean_rule = _('# You may allow it through a dedicated unconfined profile for {0}. However, apparmor cannot find {0}. If you want to allow it, please create a profile for it manually.').format(ev['comm'])
|
||
elif userns_event_usable == 'error_userns_profile_exists':
|
||
clean_rule = _('# You may allow it through a dedicated unconfined profile for {} ({}). However, a profile already exists with this name. If you want to allow it, please create a profile for it manually.').format(ev['comm'], ev['execpath'])
|
||
elif userns_event_usable == 'ok':
|
||
clean_rule = _('# You may allow it through a dedicated unconfined profile for {} ({})').format(ev['comm'], ev['execpath'])
|
||
else:
|
||
profile_path = aa.get_profile_filename_from_profile_name(ev['profile'])
|
||
clean_rule = rule.get_clean()
|
||
if profile_path:
|
||
out += _('If you want to allow this operation you can add the line below in profile {}\n').format(profile_path)
|
||
out += clean_rule
|
||
else:
|
||
out += _('However {profile} is not in {profile_dir}\nIt is likely that the profile was not stored in {profile_dir} or was removed.\n').format(profile=ev['profile'], profile_dir=aa.profile_dir)
|
||
else: # Should not happen
|
||
out += _('ERROR: Could not create rule from event.')
|
||
profile_path = None
|
||
|
||
if get_clean_rule:
|
||
return out, profile_path, clean_rule
|
||
else:
|
||
return out, profile_path
|
||
|
||
|
||
# TODO reuse more code from aa-logprof in callbacks
|
||
def cb_more_info(notification, action, _args):
|
||
(ev, rl, special_profiles) = _args
|
||
notification.close()
|
||
|
||
out, profile_path, clean_rule = get_more_info_about_event(rl, ev, special_profiles, _('Operation denied by AppArmor\n\n'), get_clean_rule=True)
|
||
|
||
ans = ShowMoreGUI(profile_path, out, clean_rule, ev['profile'], profile_path is not None).show()
|
||
if ans == 'add_rule':
|
||
add_to_profile(clean_rule, ev['profile'])
|
||
elif ans in {'allow', 'deny'}:
|
||
create_userns_profile(ev['comm'], ev['execpath'], ans)
|
||
|
||
|
||
def add_to_profile(rule, profile_name):
|
||
# We get update_profile.py through this import so that it works in all cases
|
||
profile_path = aa.get_profile_filename_from_profile_name(profile_name)
|
||
|
||
if not profile_path:
|
||
ErrorGUI(
|
||
_(
|
||
'Cannot find profile for {}\n\n'
|
||
'It is likely that the profile was not stored in {} or was removed.'
|
||
).format(profile_name, aa.profile_dir),
|
||
False
|
||
).show()
|
||
return
|
||
|
||
update_profile_path = update_profile.__file__
|
||
command = ['pkexec', '--keep-cwd', update_profile_path, 'add_rule', rule, profile_name]
|
||
try:
|
||
subprocess.run(command, check=True)
|
||
except subprocess.CalledProcessError as e:
|
||
if e.returncode != 126: # return code 126 means the user cancelled the request
|
||
ErrorGUI(_('Failed to add rule {rule} to {profile}\nError code = {retcode}').format(rule=rule, profile=profile_name, retcode=e.returncode), False).show()
|
||
|
||
|
||
def create_from_file(file_path):
|
||
update_profile_path = update_profile.__file__
|
||
command = ['pkexec', '--keep-cwd', update_profile_path, 'from_file', file_path]
|
||
try:
|
||
subprocess.run(command, check=True)
|
||
except subprocess.CalledProcessError as e:
|
||
if e.returncode != 126: # return code 126 means the user cancelled the request
|
||
ErrorGUI(_('Failed to add some rules'), False).show()
|
||
|
||
|
||
def allow_all(clean_rules):
|
||
local_template_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default_unconfined.template')
|
||
if os.path.exists(local_template_path): # We are using local aa-notify -> we use local template
|
||
template_path = local_template_path
|
||
else:
|
||
template_path = aa.CONFDIR + '/default_unconfined.template'
|
||
|
||
tmp = tempfile.NamedTemporaryFile()
|
||
with open(tmp.name, mode='w') as f:
|
||
profile = None
|
||
for line in clean_rules.splitlines():
|
||
if line == '':
|
||
continue
|
||
elif line[0] == '#':
|
||
profile = None
|
||
pass
|
||
elif line[0] != '\t':
|
||
profile = line[8:-1] # 8:-1 is to remove 'profile ' and ':'
|
||
else:
|
||
if line[1] == '#': # Add to userns
|
||
if line[-1] == '.': # '.' <==> There is an error: we cannot add the profile automatically
|
||
continue
|
||
profile_name = line.split()[-2] # line always finishes by <profile_name>
|
||
bin_path = line.split()[-1][1:-1] # 1:-1 to remove the parenthesis
|
||
profile_path = aa.get_profile_filename_from_profile_name(profile_name, True)
|
||
if not profile_path:
|
||
ErrorGUI(_('Cannot get profile path for {}.').format(profile_name), False).show()
|
||
continue
|
||
f.write('create_userns\t{}\t{}\t{}\t{}\t{}\n'.format(template_path, profile_name, bin_path, profile_path, 'allow'))
|
||
else:
|
||
if profile is not None:
|
||
f.write('add_rule\t{}\t{}\n'.format(line[1:], profile))
|
||
else:
|
||
print(_("Rule {} cannot be added automatically").format(line[1:]), file=sys.stdout)
|
||
|
||
create_from_file(tmp.name)
|
||
|
||
|
||
# TODO reuse more code from aa-logprof in callbacks
|
||
def cb_more_info_aggregated(notification, action, _args):
|
||
(to_display, aggregated, clean_rules) = _args
|
||
res = ShowMoreGUIAggregated(to_display, aggregated, clean_rules).show()
|
||
if res == 'allow_all':
|
||
allow_all(clean_rules)
|
||
|
||
|
||
def cb_add_to_profile(notification, action, _args):
|
||
(ev, rl, special_profiles) = _args
|
||
notification.close()
|
||
|
||
rule = rl.create_rule_from_ev(ev)
|
||
|
||
# Exec events are created with the default FileRule.ANY_EXEC. We use Pix for actual rules
|
||
if type(rule) is FileRule and rule.exec_perms == FileRule.ANY_EXEC:
|
||
rule.exec_perms = 'Pix'
|
||
|
||
if not rule:
|
||
ErrorGUI(_('ERROR: Could not create rule from event.'), False).show()
|
||
return
|
||
|
||
aa.update_profiles()
|
||
|
||
if customized_message['userns']['cond'](ev, special_profiles):
|
||
ask_for_user_ns_denied(ev['execpath'], ev['comm'], False)
|
||
else:
|
||
add_to_profile(rule.get_clean(), ev['profile'])
|
||
|
||
|
||
customized_message = {
|
||
'userns': {
|
||
'cond': lambda ev, special_profiles: (ev['operation'] == 'userns_create' or ev['operation'] == 'capable') and is_special_profile_userns(ev, special_profiles),
|
||
'msg': 'Application {0} wants to create an user namespace which could be used to compromise your system\nDo you want to allow it next time {0} is run?'
|
||
}
|
||
}
|
||
|
||
|
||
def customize_notification_message(ev, msg, special_profiles):
|
||
if customized_message['userns']['cond'](ev, special_profiles):
|
||
msg = _(customized_message['userns']['msg']).format(ev['comm'])
|
||
|
||
return msg
|
||
|
||
|
||
def start_glib_loop():
|
||
loop = GLib.MainLoop()
|
||
loop.run()
|
||
|
||
|
||
def aggregate_event(agg, ev, keys_to_aggregate):
|
||
profile = ev['profile']
|
||
agg[profile]['count'] += 1
|
||
agg[profile]['events'].append(ev)
|
||
|
||
for key in keys_to_aggregate:
|
||
if key in ev:
|
||
value = ev[key]
|
||
agg[profile]['values'][key][value] += 1
|
||
|
||
return agg
|
||
|
||
|
||
def get_aggregated(rl, agg, max_nb_profiles, keys_to_aggregate, special_profiles):
|
||
notification = ''
|
||
summary = ''
|
||
more_info = ''
|
||
clean_rules = ''
|
||
summary = _('Notifications were raised for profiles: {}\n').format(', '.join(list(agg.keys())))
|
||
|
||
sorted_profiles = sorted(agg.items(), key=lambda item: item[1]['count'], reverse=True)
|
||
for profile, data in sorted_profiles:
|
||
profile_notif = _('profile: {} — {} events\n').format(profile, data['count'])
|
||
notification += profile_notif
|
||
summary += profile_notif
|
||
if len(agg) <= max_nb_profiles:
|
||
for key in keys_to_aggregate:
|
||
if key in data['values']:
|
||
total_key_events = sum(data['values'][key].values())
|
||
sorted_values = sorted(data['values'][key].items(), key=lambda item: item[1], reverse=True)
|
||
for value, count in sorted_values:
|
||
percent = (count / total_key_events) * 100
|
||
if percent >= 20: # We exclude rare cases for clarity. 20% is arbitrary
|
||
summary += _('\t{} was {} {:.1f}% of the time\n').format(key, value, percent)
|
||
summary += '\n'
|
||
|
||
more_info += _('profile {}, {} events\n').format(profile, data['count'])
|
||
rules_for_profiles = set()
|
||
found_profile = True
|
||
for i, ev in enumerate(data['events']):
|
||
more_info_rule, profile_path, clean_rule = get_more_info_about_event(rl, ev, special_profiles, _(' - Event {} -\n').format(i + 1), get_clean_rule=True)
|
||
if i != 0:
|
||
more_info += '\n\n'
|
||
if not profile_path:
|
||
found_profile = False
|
||
more_info += more_info_rule
|
||
if clean_rule:
|
||
rules_for_profiles.add(clean_rule)
|
||
if rules_for_profiles != set():
|
||
if profile not in special_profiles:
|
||
if found_profile:
|
||
clean_rules += _('profile {}:').format(profile)
|
||
else:
|
||
clean_rules += _('# Unknown profile {}').format(profile)
|
||
else:
|
||
clean_rules += _('# unprivileged userns denials ({}):').format(profile)
|
||
clean_rules += '\n\t' + '\n\t'.join(rules_for_profiles) + '\n'
|
||
return notification, summary, more_info, clean_rules
|
||
|
||
|
||
def display_notification(ev, rl, message, userns_special_profiles):
|
||
message = customize_notification_message(ev, message, userns_special_profiles)
|
||
n = notify2.Notification(_('AppArmor security notice'), message, 'gtk-dialog-warning')
|
||
if can_allow_rule(ev, userns_special_profiles):
|
||
n.add_action('clicked', 'Allow', cb_add_to_profile, (ev, rl, userns_special_profiles))
|
||
n.add_action('more_clicked', 'Show More', cb_more_info, (ev, rl, userns_special_profiles))
|
||
|
||
n.show()
|
||
|
||
|
||
def display_aggregated_notification(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, special_profiles):
|
||
notification, summary, more_info, clean_rules = get_aggregated(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, special_profiles)
|
||
n = notify2.Notification(_('AppArmor security notice'), notification, 'gtk-dialog-warning')
|
||
n.add_action('more_aggregated_clicked', 'Show More Info', cb_more_info_aggregated, (summary, more_info, clean_rules))
|
||
n.show()
|
||
|
||
|
||
def main():
|
||
"""Run aa-notify.
|
||
|
||
Parse command line arguments and starts the requested operations.
|
||
"""
|
||
global _, debug_logger, config, args
|
||
global debug_docs_url, nobody_user, original_effective_user, timeformat
|
||
|
||
debug_docs_url = "https://wiki.ubuntu.com/DebuggingApparmor"
|
||
nobody_user = "nobody"
|
||
timeformat = "%c" # Automatically using locale format
|
||
original_effective_user = os.geteuid()
|
||
|
||
# setup exception handling
|
||
enable_aa_exception_handler()
|
||
|
||
# setup module translations
|
||
_ = init_translation()
|
||
|
||
# Register the on_exit method with atexit
|
||
# Takes care of closing the debug log etc
|
||
atexit.register(aa.on_exit)
|
||
|
||
# Set up UI logger for separate messages from UI module
|
||
debug_logger = DebugLogger('Notify')
|
||
debug_logger.debug("Starting aa-notify")
|
||
|
||
parser = argparse.ArgumentParser(description=_('Display AppArmor notifications or messages for DENIED entries.'))
|
||
parser.add_argument('-p', '--poll', action='store_true', help=_('poll AppArmor logs and display notifications'))
|
||
parser.add_argument('--display', type=str, help=_('set the DISPLAY environment variable (might be needed if sudo resets $DISPLAY)'))
|
||
parser.add_argument('-f', '--file', type=str, help=_('search FILE for AppArmor messages'))
|
||
parser.add_argument('-l', '--since-last', action='store_true', help=_('display stats since last login'))
|
||
parser.add_argument('-s', '--since-days', type=int, metavar=('NUM'), help=_('show stats for last NUM days (can be used alone or with -p)'))
|
||
parser.add_argument('-v', '--verbose', action='store_true', help=_('show messages with stats'))
|
||
parser.add_argument('-u', '--user', type=str, help=_('user to drop privileges to when not using sudo'))
|
||
parser.add_argument('-w', '--wait', type=int, metavar=('NUM'), help=_('wait NUM seconds before displaying notifications (with -p)'))
|
||
parser.add_argument('-m', '--merge-notifications', action='store_true', help=_('Merge notification for improved readability (with -p)'))
|
||
parser.add_argument('--prompt-filter', type=str, metavar=('PF'), help=_('kind of operations which display a popup prompt'))
|
||
parser.add_argument('--debug', action='store_true', help=_('debug mode'))
|
||
parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS)
|
||
|
||
filter_group = parser.add_argument_group('Filtering options',
|
||
description=('Filters are used to reduce the output of information to only '
|
||
'those entries that will match the filter. Filters use Python\'s regular '
|
||
'expression syntax.'))
|
||
filter_group.add_argument('--filter.profile', metavar='PROFILE', help=_('regular expression to match the profile'))
|
||
filter_group.add_argument('--filter.operation', metavar='OPERATION', help=_('regular expression to match the operation'))
|
||
filter_group.add_argument('--filter.name', metavar='NAME', help=_('regular expression to match the name'))
|
||
filter_group.add_argument('--filter.denied', metavar='DENIED', help=_('regular expression to match the denied mask'))
|
||
filter_group.add_argument('--filter.family', metavar='FAMILY', help=_('regular expression to match the network family'))
|
||
filter_group.add_argument('--filter.socket', metavar='SOCKET', help=_('regular expression to match the network socket type'))
|
||
|
||
# If a TTY then assume running in test mode and fix output width
|
||
if not sys.stdout.isatty():
|
||
parser.formatter_class = lambda prog: argparse.HelpFormatter(prog, width=80)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Debug mode can be invoked directly with --debug or env LOGPROF_DEBUG=3
|
||
if args.debug:
|
||
debug_logger.activateStderr()
|
||
debug_logger.debug('Logging level: {}'.format(debug_logger.debug_level))
|
||
debug_logger.debug('Running as uid: {0[0]}, euid: {0[1]}, suid: {0[2]}'.format(os.getresuid()))
|
||
if args.poll:
|
||
debug_logger.debug('Running with --debug and --poll. Will exit in 100s')
|
||
# Sanity checks
|
||
user_ids = os.getresuid()
|
||
groups_ids = os.getresgid()
|
||
if user_ids[1] != user_ids[2]:
|
||
sys.exit("ERROR: Cannot be started with suid set!")
|
||
if groups_ids[1] != groups_ids[2]:
|
||
sys.exit("ERROR: Cannot be started with sgid set!")
|
||
|
||
# Define global variables that will be populated by init_aa()
|
||
# conf = None
|
||
logfile = None
|
||
|
||
if args.configdir: # prefer --configdir if given
|
||
confdir = args.configdir
|
||
else: # fallback to env variable (or None if not set)
|
||
confdir = os.getenv('__AA_CONFDIR')
|
||
|
||
aa.init_aa(confdir=confdir)
|
||
|
||
# Initialize aa.logfile
|
||
aa.set_logfile(args.file)
|
||
|
||
# Load global config reader
|
||
shell_config = aaconfig.Config('shell')
|
||
|
||
# Load system's notify.conf
|
||
# By default aa.CONFDIR is /etc/apparmor on most production systems
|
||
system_config = read_notify_conf(aa.CONFDIR, shell_config)
|
||
# Set default is no system notify.conf was found
|
||
if not system_config:
|
||
system_config = {'': {'show_notifications': 'yes'}}
|
||
|
||
# Load user's notify.conf
|
||
if os.path.isfile(os.environ['HOME'] + '/.apparmor/notify.conf'):
|
||
# Use legacy path if the conf file is there
|
||
user_config = read_notify_conf(os.environ['HOME'] + '/.apparmor', shell_config)
|
||
elif 'XDG_CONFIG_HOME' in os.environ and os.path.isfile(os.environ['XDG_CONFIG_HOME'] + '/apparmor/notify.conf'):
|
||
# Use XDG_CONFIG_HOME if it is defined
|
||
user_config = read_notify_conf(os.environ['XDG_CONFIG_HOME'] + '/apparmor', shell_config)
|
||
else:
|
||
# Fallback to the default value of XDG_CONFIG_HOME
|
||
user_config = read_notify_conf(os.environ['HOME'] + '/.config/apparmor', shell_config)
|
||
|
||
# Merge the two config dicts in an accurate and idiomatic way (requires Python 3.5)
|
||
config = {**system_config, **user_config}
|
||
|
||
"""
|
||
Possible configuration options:
|
||
- show_notifications
|
||
- message_body
|
||
- message_footer
|
||
- use_group
|
||
- userns_special_profiles
|
||
- ignore_denied_capability
|
||
- interface_theme
|
||
- prompt_filter
|
||
- maximum_number_notification_profiles
|
||
- keys_to_aggregate
|
||
- filter.profile,
|
||
- filter.operation,
|
||
- filter.name,
|
||
- filter.denied,
|
||
- filter.family,
|
||
- filter.socket,
|
||
"""
|
||
|
||
# # Config checks
|
||
|
||
# Warn about unknown keys in the config
|
||
allowed_config_keys = [
|
||
'use_group',
|
||
'userns_special_profiles',
|
||
'ignore_denied_capability',
|
||
'interface_theme',
|
||
'prompt_filter',
|
||
'show_notifications',
|
||
'message_body',
|
||
'message_footer',
|
||
'maximum_number_notification_profiles',
|
||
'keys_to_aggregate',
|
||
'filter.profile',
|
||
'filter.operation',
|
||
'filter.name',
|
||
'filter.denied',
|
||
'filter.family',
|
||
'filter.socket',
|
||
]
|
||
found_config_keys = config[''].keys()
|
||
unknown_keys = [
|
||
item for item in found_config_keys if item not in allowed_config_keys
|
||
]
|
||
for item in unknown_keys:
|
||
print(_('Warning! Configuration item "{}" is unknown!').format(item))
|
||
|
||
filters = {
|
||
'profile': '',
|
||
'operation': '',
|
||
'name': '',
|
||
'denied_mask': '',
|
||
'net_family': '',
|
||
'net_sock_type': '',
|
||
}
|
||
|
||
if 'filter.profile' in config['']:
|
||
filters['profile'] = config['']['filter.profile']
|
||
if 'filter.operation' in config['']:
|
||
filters['operation'] = config['']['filter.operation']
|
||
if 'filter.name' in config['']:
|
||
filters['name'] = config['']['filter.name']
|
||
if 'filter.denied' in config['']:
|
||
filters['denied_mask'] = config['']['filter.denied']
|
||
if 'filter.family' in config['']:
|
||
filters['net_family'] = config['']['filter.family']
|
||
if 'filter.socket' in config['']:
|
||
filters['net_sock_type'] = config['']['filter.socket']
|
||
|
||
# command line filters override notify.conf
|
||
if getattr(args, 'filter.profile'):
|
||
filters['profile'] = getattr(args, 'filter.profile')
|
||
if getattr(args, 'filter.operation'):
|
||
filters['operation'] = getattr(args, 'filter.operation')
|
||
if getattr(args, 'filter.name'):
|
||
filters['name'] = getattr(args, 'filter.name')
|
||
if getattr(args, 'filter.denied'):
|
||
filters['denied_mask'] = getattr(args, 'filter.denied')
|
||
if getattr(args, 'filter.family'):
|
||
filters['net_family'] = getattr(args, 'filter.family')
|
||
if getattr(args, 'filter.socket'):
|
||
filters['net_sock_type'] = getattr(args, 'filter.socket')
|
||
|
||
filters = compile_filter_regex(filters)
|
||
|
||
# Warn if use_group is defined and current group does not match defined
|
||
if 'use_group' in config['']:
|
||
user = pwd.getpwuid(os.geteuid())[0]
|
||
user_groups = [g.gr_name for g in grp.getgrall() if user in g.gr_mem]
|
||
gid = pwd.getpwnam(user).pw_gid
|
||
user_groups.append(grp.getgrgid(gid).gr_name)
|
||
|
||
if config['']['use_group'] not in user_groups:
|
||
print(
|
||
_('ERROR! User {user} not member of {group} group!').format(
|
||
user=user,
|
||
group=config['']['use_group']
|
||
),
|
||
file=sys.stderr
|
||
)
|
||
sys.exit(1)
|
||
# @TODO: Extend UI lib to have warning and error functions that
|
||
# can be used in an uniform way with both text and JSON output.
|
||
|
||
if 'userns_special_profiles' in config['']:
|
||
userns_special_profiles = config['']['userns_special_profiles'].strip().split(',')
|
||
else:
|
||
userns_special_profiles = ['unconfined'] # By default, unconfined is the only special profile
|
||
|
||
if 'ignore_denied_capability' in config['']:
|
||
ignore_denied_capability = config['']['ignore_denied_capability'].strip().split(',')
|
||
else:
|
||
ignore_denied_capability = ['sudo', 'su']
|
||
|
||
if 'interface_theme' in config['']:
|
||
set_interface_theme(config['']['interface_theme'].strip())
|
||
else:
|
||
set_interface_theme('ubuntu')
|
||
|
||
# Todo: add more kinds of notifications
|
||
supported_prompt_filter = {'userns'}
|
||
if not args.prompt_filter and 'prompt_filter' in config['']:
|
||
args.prompt_filter = config['']['prompt_filter']
|
||
if args.prompt_filter:
|
||
args.prompt_filter = set(args.prompt_filter.strip().split(','))
|
||
unsupported = args.prompt_filter - supported_prompt_filter
|
||
if unsupported:
|
||
sys.exit(_('ERROR: using an unsupported prompt filter: {}\nSupported values: {}').format(', '.join(unsupported), ', '.join(supported_prompt_filter)))
|
||
|
||
if 'maximum_number_notification_profiles' in config['']:
|
||
maximum_number_notification_profiles = int(config['']['maximum_number_notification_profiles'].strip())
|
||
else:
|
||
maximum_number_notification_profiles = 2
|
||
|
||
if 'keys_to_aggregate' in config['']:
|
||
keys_to_aggregate = config['']['keys_to_aggregate'].strip().split(',')
|
||
else:
|
||
keys_to_aggregate = {'operation', 'class', 'name', 'denied', 'target'}
|
||
|
||
if args.file:
|
||
logfile = args.file
|
||
elif os.path.isfile('/var/run/auditd.pid') and os.path.isfile('/var/log/audit/audit.log'):
|
||
# If auditd is running, look at /var/log/audit/audit.log
|
||
logfile = '/var/log/audit/audit.log'
|
||
elif os.path.isfile('/var/log/kern.log'):
|
||
# For aa-notify, the fallback is kern.log, not syslog from aa.logfile
|
||
logfile = '/var/log/kern.log'
|
||
else:
|
||
# If all above failed, use aa cfg
|
||
logfile = aa.logfile
|
||
|
||
if args.verbose:
|
||
print(_('Using log file'), logfile)
|
||
|
||
if args.display:
|
||
os.environ['DISPLAY'] = args.display
|
||
|
||
if args.poll:
|
||
# Exit immediately if show_notifications is no or any of the options below
|
||
if config['']['show_notifications'] in [False, 'no', 'false', '0']:
|
||
print(_('Showing notifications forbidden in notify.conf, aborting..'))
|
||
sys.exit(0)
|
||
|
||
# Don't allow usage of aa-notify by root, must be some user. Desktop
|
||
# logins as root are not recommended and certainly not a use case for
|
||
# aa-notify notifications.
|
||
if not args.user and os.getuid() == 0 and 'SUDO_USER' not in os.environ.keys():
|
||
sys.exit("ERROR: Cannot be started a real root user. Use --user to define what user to use.")
|
||
|
||
# Required to parse_record.
|
||
rl = ReadLog('', '', '')
|
||
|
||
# Initialize the list of profiles for can_allow_rule
|
||
aa.read_profiles()
|
||
|
||
drop_privileges()
|
||
daemonize()
|
||
raise_privileges()
|
||
|
||
if args.merge_notifications:
|
||
if not args.wait or args.wait == 0:
|
||
args.wait = 5
|
||
|
||
old_time = int(time.time())
|
||
while True:
|
||
|
||
raw_evs = get_apparmor_events_return(logfile, old_time)
|
||
drop_privileges()
|
||
|
||
if len(raw_evs) == 1: # Single event: we handle it without aggregation
|
||
raw_ev = raw_evs[0]
|
||
ev = rl.parse_record(raw_ev)
|
||
display_notification(ev, rl, format_event(raw_ev, logfile), userns_special_profiles)
|
||
elif len(raw_evs) > 1:
|
||
aggregated = defaultdict(lambda: {'count': 0, 'values': defaultdict(lambda: defaultdict(int)), 'events': []})
|
||
for raw_ev in raw_evs:
|
||
ev = rl.parse_record(raw_ev)
|
||
aggregate_event(aggregated, ev, keys_to_aggregate)
|
||
display_aggregated_notification(rl, aggregated, maximum_number_notification_profiles, keys_to_aggregate, userns_special_profiles)
|
||
|
||
old_time = int(time.time())
|
||
|
||
# When notification is sent, raise privileged back to root if the
|
||
# original effective user id was zero (to be able to read AppArmor logs)
|
||
raise_privileges()
|
||
time.sleep(args.wait)
|
||
else:
|
||
# At this point this script needs to be able to read 'logfile' but once
|
||
# the for loop starts, privileges can be dropped since the file descriptor
|
||
# has been opened and access granted. Further reads of the file will not
|
||
# trigger any new permission checks.
|
||
# @TODO Plan to catch PermissionError here or..?
|
||
for (event, message) in notify_about_new_entries(logfile, filters, args.wait):
|
||
ev = rl.parse_record(event)
|
||
|
||
# @TODO redo special behaviours with a more regular function
|
||
# We ignore capability denials for binaries in ignore_denied_capability
|
||
if ev['operation'] == 'capable' and ev['comm'] in ignore_denied_capability:
|
||
continue
|
||
|
||
# Special behaivor for userns:
|
||
if args.prompt_filter and 'userns' in args.prompt_filter and customized_message['userns']['cond'](ev, userns_special_profiles):
|
||
prompt_userns(ev)
|
||
continue # Notification already displayed for this event, we go to the next one.
|
||
|
||
# Notifications should not be run as root, since root probably is
|
||
# the wrong desktop user and not the one getting the notifications.
|
||
drop_privileges()
|
||
|
||
display_notification(ev, rl, message, userns_special_profiles)
|
||
|
||
# When notification is sent, raise privileged back to root if the
|
||
# original effective user id was zero (to be able to read AppArmor logs)
|
||
raise_privileges()
|
||
|
||
elif args.since_last:
|
||
show_entries_since_last_login(logfile, filters)
|
||
elif args.since_days:
|
||
show_entries_since_days(logfile, args.since_days, filters)
|
||
else:
|
||
parser.print_help()
|
||
|
||
sys.exit(0)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|