mirror of
https://gitlab.com/apparmor/apparmor.git
synced 2025-03-04 08:24:42 +01:00
550 lines
21 KiB
Python
Executable file
550 lines
21 KiB
Python
Executable file
#! /usr/bin/python3
|
||
# ----------------------------------------------------------------------
|
||
# Copyright (C) 2018–2019 Otto Kekäläinen <otto@kekalainen.net>
|
||
#
|
||
# This program is free software; you can redistribute it and/or
|
||
# modify it under the terms of version 2 of the GNU General Public
|
||
# License as published by the Free Software Foundation.
|
||
#
|
||
# This program is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU General Public License for more details.
|
||
#
|
||
# ----------------------------------------------------------------------
|
||
#
|
||
# /etc/apparmor/notify.conf:
|
||
# # set to 'yes' to enable AppArmor DENIED notifications
|
||
# show_notifications="yes"
|
||
#
|
||
# # only people in use_group can run this script
|
||
# use_group="admin"
|
||
#
|
||
# $HOME/.apparmor/notify.conf can have:
|
||
# # set to 'yes' to enable AppArmor DENIED notifications
|
||
# show_notifications="yes"
|
||
#
|
||
# In a typical desktop environment one would run as a service the
|
||
# command:
|
||
# /usr/bin/aa-notify -p -w 10
|
||
|
||
import argparse
|
||
import atexit
|
||
import grp
|
||
import os
|
||
import pwd
|
||
import re
|
||
import sys
|
||
import time
|
||
|
||
import notify2
|
||
import psutil
|
||
|
||
import apparmor.aa as aa
|
||
import apparmor.ui as aaui
|
||
import apparmor.config as aaconfig
|
||
import LibAppArmor # C-library to parse one log line
|
||
from apparmor.common import DebugLogger, open_file_read
|
||
from apparmor.fail import enable_aa_exception_handler
|
||
from apparmor.notify import get_last_login_timestamp
|
||
from apparmor.translations import init_translation
|
||
|
||
|
||
def get_user_login():
|
||
"""Portable function to get username. Should not trigger any
|
||
"OSError: [Errno 25] Inappropriate ioctl for device" errors in Giltab-CI"""
|
||
if os.name == "posix":
|
||
username = pwd.getpwuid(os.geteuid()).pw_name
|
||
else:
|
||
username = os.environ.get('USER')
|
||
if not username and hasattr(os, 'getlogin'):
|
||
username = os.getlogin()
|
||
return username
|
||
|
||
|
||
def format_event(event, logsource):
|
||
output = []
|
||
|
||
if 'message_body' in config['']:
|
||
output += [config['']['message_body']]
|
||
|
||
if event.profile:
|
||
output += ['Profile: {}'.format(event.profile)]
|
||
if event.operation:
|
||
output += ['Operation: {}'.format(event.operation)]
|
||
if event.name:
|
||
output += ['Name: {}'.format(event.name)]
|
||
if event.denied_mask:
|
||
output += ['Denied: {}'.format(event.denied_mask)]
|
||
if event.net_family and event.net_sock_type:
|
||
output += ['Family: {}'.format(event.net_family)]
|
||
output += ['Socket: {}'.format(event.net_sock_type)]
|
||
|
||
output += ['Logfile: {}'.format(logsource)]
|
||
|
||
return "\n".join(output)
|
||
|
||
|
||
def notify_about_new_entries(logfile, wait=0):
|
||
# Kill other instances of aa-notify if already running
|
||
for process in psutil.process_iter():
|
||
# Find the process that has the same name as this script, e.g. aa-notify.py
|
||
if process.name() == os.path.basename(__file__) and process.pid != os.getpid():
|
||
print(_('Killing old daemon (PID {})...').format(process.pid))
|
||
os.kill(process.pid, 15)
|
||
|
||
# Spawn/fork into the background and stay running
|
||
newpid = os.fork()
|
||
if newpid == 0:
|
||
|
||
# Follow the logfile and stream notifications
|
||
# Rate limit to not show too many notifications
|
||
try:
|
||
for event in follow_apparmor_events(logfile, wait):
|
||
debug_logger.info(format_event(event, logfile))
|
||
yield(format_event(event, logfile))
|
||
except PermissionError:
|
||
sys.exit(_("ERROR: Cannot read {}. Please check permissions.".format(logfile)))
|
||
|
||
else:
|
||
print(_('Notification emitter started in the background'))
|
||
# pids = (os.getpid(), newpid)
|
||
# print("parent: %d, child: %d\n" % pids)
|
||
os._exit(0) # Exit child without calling exit handlers etc
|
||
|
||
|
||
def show_entries_since_epoch(logfile, epoch_since):
|
||
count = 0
|
||
for event in get_apparmor_events(logfile, epoch_since):
|
||
count += 1
|
||
if args.verbose:
|
||
print(format_event(event, logfile))
|
||
print() # Print a newline after each entry for better readability
|
||
|
||
aaui.UI_Info(_('AppArmor denials: {count} (since {date})').format(
|
||
**{
|
||
'count': count,
|
||
'date': time.strftime(timeformat, time.localtime(epoch_since))
|
||
}
|
||
)
|
||
)
|
||
|
||
if args.verbose:
|
||
if 'message_footer' in config['']:
|
||
print(config['']['message_footer'])
|
||
else:
|
||
print(_('For more information, please see: {}').format(debug_docs_url))
|
||
|
||
|
||
def show_entries_since_last_login(logfile, username=get_user_login()):
|
||
# If running as sudo, use username of sudo user instead of root
|
||
if 'SUDO_USER' in os.environ.keys():
|
||
username = os.environ['SUDO_USER']
|
||
|
||
if args.verbose:
|
||
print(_('Showing entries since {} logged in').format(username))
|
||
print() # Newline
|
||
epoch_since = get_last_login_timestamp(username)
|
||
if epoch_since == 0:
|
||
print(_('ERROR: Could not find last login'), file=sys.stderr)
|
||
sys.exit(1)
|
||
show_entries_since_epoch(logfile, epoch_since)
|
||
|
||
|
||
def show_entries_since_days(logfile, since_days):
|
||
day_in_seconds = 60*60*24
|
||
epoch_now = int(time.time())
|
||
epoch_since = epoch_now - day_in_seconds * since_days
|
||
show_entries_since_epoch(logfile, epoch_since)
|
||
|
||
|
||
def follow_apparmor_events(logfile, wait=0):
|
||
"""Follow AppArmor events and yield relevant entries until process stops"""
|
||
|
||
# If wait was given as argument but was type None (from ArgumentParser)
|
||
# ensure it's type int and zero
|
||
if not wait:
|
||
wait = 0
|
||
|
||
# Record start time here so wait can be calculated later
|
||
start_time = int(time.time())
|
||
|
||
# Record initial file size to detect if log rotates
|
||
log_size = os.stat(logfile).st_size
|
||
# Record initial file inode number to detect if log gets renamed
|
||
log_inode = os.stat(logfile).st_ino
|
||
|
||
# @TODO Implement more log sources in addition to just the logfile
|
||
with open_file_read(logfile) as logdata:
|
||
|
||
# Loop all pre-existing events in the log source once so later runs
|
||
# will only see new events
|
||
for discarded_event in logdata:
|
||
pass
|
||
|
||
# @TODO: while+sleep will cause CPU interruptions once per second,
|
||
# so switch to epoll/inotify/etc for less resource consumption.
|
||
while True:
|
||
debug_logger.debug(
|
||
'Poll AppArmor event source {} seconds since start'.
|
||
format(int(time.time()) - start_time)
|
||
)
|
||
|
||
(logdata, log_inode, log_size) = reopen_logfile_if_needed(logfile, logdata, log_inode, log_size)
|
||
|
||
for event in parse_logdata(logdata):
|
||
# @TODO Alternatively use os.times()
|
||
if int(time.time()) - start_time < wait:
|
||
debug_logger.debug('Omitted an event seen during wait time')
|
||
continue
|
||
yield event
|
||
|
||
if debug_logger.debugging and debug_logger.debug_level <= 10 and int(time.time()) - start_time > 100:
|
||
debug_logger.debug('Debug mode detected: aborting notification emitter after 100 seconds.')
|
||
sys.exit(0)
|
||
|
||
time.sleep(1)
|
||
|
||
|
||
def reopen_logfile_if_needed(logfile, logdata, log_inode, log_size):
|
||
retry = True
|
||
|
||
while retry:
|
||
try:
|
||
# Reopen file if inode has changed, e.g. rename by logrotate
|
||
if os.stat(logfile).st_ino != log_inode:
|
||
debug_logger.debug('Logfile was renamed, reload to read the new file.')
|
||
logdata = open(logfile, 'r')
|
||
# Store new inode number for next comparisons
|
||
log_inode = os.stat(logfile).st_ino
|
||
|
||
# Start reading from the beginning if file shrank
|
||
if os.stat(logfile).st_size < log_size:
|
||
debug_logger.debug('Logfile shrank in size, reload from beginning.')
|
||
logdata.seek(0)
|
||
log_size = os.stat(logfile).st_size # Reset file size value
|
||
|
||
# Record new file size if grown
|
||
if os.stat(logfile).st_size > log_size:
|
||
log_size = os.stat(logfile).st_size
|
||
|
||
retry = False
|
||
except FileNotFoundError:
|
||
# @TODO: switch to epoll/inotify/
|
||
debug_logger.debug('Logfile not found, retrying.')
|
||
time.sleep(1)
|
||
# @TODO: send notification if reopening the log fails too many times
|
||
|
||
return (logdata, log_inode, log_size)
|
||
|
||
|
||
def get_apparmor_events(logfile, since=0):
|
||
"""Read audit events from log source and yield all relevant events"""
|
||
|
||
# Get logdata from file
|
||
# @TODO Implement more log sources in addition to just the logfile
|
||
try:
|
||
with open_file_read(logfile) as logdata:
|
||
for event in parse_logdata(logdata):
|
||
if event.epoch > since:
|
||
yield event
|
||
except PermissionError:
|
||
sys.exit(_("ERROR: Cannot read {}. Please check permissions.".format(logfile)))
|
||
|
||
|
||
def parse_logdata(logsource):
|
||
"""Traverse any iterable log source and extract relevant AppArmor events"""
|
||
|
||
re_audit_time_id = r'(msg=)?audit\([\d\.\:]+\):\s+' # 'audit(1282626827.320:411): '
|
||
re_kernel_time = r'\[[\d\.\s]+\]' # '[ 1612.746129]'
|
||
re_type_num = '1[45][0-9][0-9]' # 1400..1599
|
||
re_aa_or_op = '(apparmor=|operation=)'
|
||
|
||
re_log_parts = [
|
||
r'kernel:\s+(' + re_kernel_time + r'\s+)?(audit:\s+)?type=' + re_type_num + r'\s+' + re_audit_time_id + re_aa_or_op, # v2_6 syslog
|
||
r'kernel:\s+(' + re_kernel_time + r'\s+)?' + re_audit_time_id + 'type=' + re_type_num + r'\s+' + re_aa_or_op,
|
||
'type=(AVC|APPARMOR[_A-Z]*|' + re_type_num + r')\s+' + re_audit_time_id + '(type=' + re_type_num + r'\s+)?' + re_aa_or_op, # v2_6 audit and dmesg
|
||
r'type=USER_AVC\s+' + re_audit_time_id + '.*apparmor=', # dbus
|
||
r'type=UNKNOWN\[' + re_type_num + r'\]\s+' + re_audit_time_id + re_aa_or_op,
|
||
r'dbus\[[0-9]+\]:\s+apparmor=', # dbus
|
||
]
|
||
|
||
# Pre-filter log lines so that we hand over only relevant lines to LibAppArmor parsing
|
||
re_log_all = re.compile('(' + '|'.join(re_log_parts) + ')')
|
||
|
||
for entry in logsource:
|
||
|
||
# Check the start of the log line and only process lines from AppArmor
|
||
apparmor_entry = re_log_all.search(entry)
|
||
if apparmor_entry:
|
||
# Parse the line using LibAppArmor (C library)
|
||
# See aalogparse.h for data structure
|
||
event = LibAppArmor.parse_record(entry)
|
||
# Only show actual events of contained programs and ignore among
|
||
# others AppArmor profile reloads
|
||
if event.operation and event.operation[0:8] != 'profile_':
|
||
yield event
|
||
|
||
|
||
def drop_privileges():
|
||
"""If running as root, drop privileges to USER if known, or fall-back to nobody_user/group"""
|
||
|
||
if os.geteuid() == 0:
|
||
|
||
if 'SUDO_USER' in os.environ.keys():
|
||
next_username = os.environ['SUDO_USER']
|
||
next_uid = os.environ['SUDO_UID']
|
||
next_gid = os.environ['SUDO_GID']
|
||
else:
|
||
nobody_user_info = pwd.getpwnam(nobody_user)
|
||
next_username = nobody_user_info[0]
|
||
next_uid = nobody_user_info[2]
|
||
next_gid = nobody_user_info[3]
|
||
|
||
debug_logger.debug('Dropping to user "{}" privileges'.format(next_username))
|
||
|
||
# @TODO?
|
||
# Remove group privileges, including potential 'adm' group that might
|
||
# have had log read access but also other accesses.
|
||
# os.setgroups([])
|
||
|
||
# Try setting the new uid/gid
|
||
# Set gid first, otherwise the latter step would fail on missing permissions
|
||
os.setegid(int(next_gid))
|
||
os.seteuid(int(next_uid))
|
||
|
||
|
||
def raise_privileges():
|
||
"""If was running as user with saved user ID 0, raise back to root privileges"""
|
||
|
||
if os.geteuid() != 0 and original_effective_user == 0:
|
||
|
||
debug_logger.debug('Rasing privileges from UID {} back to UID 0 (root)'.format(os.geteuid()))
|
||
|
||
# os.setgid(int(next_gid))
|
||
os.seteuid(original_effective_user)
|
||
|
||
|
||
def read_notify_conf(path, shell_config):
|
||
try:
|
||
shell_config.CONF_DIR = path
|
||
conf_dict = shell_config.read_config('notify.conf')
|
||
debug_logger.debug('Found configuration file in {}/notify.conf'.format(shell_config.CONF_DIR))
|
||
return conf_dict
|
||
except FileNotFoundError:
|
||
return {}
|
||
|
||
|
||
def main():
|
||
"""
|
||
Main function of aa-notify that parses command line
|
||
arguments and starts the requested operations.
|
||
"""
|
||
|
||
global _, debug_logger, config, args
|
||
global debug_docs_url, nobody_user, original_effective_user, timeformat
|
||
|
||
debug_docs_url = "https://wiki.ubuntu.com/DebuggingApparmor"
|
||
nobody_user = "nobody"
|
||
timeformat = "%c" # Automatically using locale format
|
||
original_effective_user = os.geteuid()
|
||
|
||
# setup exception handling
|
||
enable_aa_exception_handler()
|
||
|
||
# setup module translations
|
||
_ = init_translation()
|
||
|
||
# Register the on_exit method with atexit
|
||
# Takes care of closing the debug log etc
|
||
atexit.register(aa.on_exit)
|
||
|
||
# Set up UI logger for separate messages from UI module
|
||
debug_logger = DebugLogger('Notify')
|
||
debug_logger.debug("Starting aa-notify")
|
||
|
||
parser = argparse.ArgumentParser(description=_('Display AppArmor notifications or messages for DENIED entries.'))
|
||
parser.add_argument('-p', '--poll', action='store_true', help=_('poll AppArmor logs and display notifications'))
|
||
parser.add_argument('--display', type=str, help=_('set the DISPLAY environment variable (might be needed if sudo resets $DISPLAY)'))
|
||
parser.add_argument('-f', '--file', type=str, help=_('search FILE for AppArmor messages'))
|
||
parser.add_argument('-l', '--since-last', action='store_true', help=_('display stats since last login'))
|
||
parser.add_argument('-s', '--since-days', type=int, metavar=('NUM'), help=_('show stats for last NUM days (can be used alone or with -p)'))
|
||
parser.add_argument('-v', '--verbose', action='store_true', help=_('show messages with stats'))
|
||
parser.add_argument('-u', '--user', type=str, help=_('user to drop privileges to when not using sudo'))
|
||
parser.add_argument('-w', '--wait', type=int, metavar=('NUM'), help=_('wait NUM seconds before displaying notifications (with -p)'))
|
||
parser.add_argument('--debug', action='store_true', help=_('debug mode'))
|
||
parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS)
|
||
|
||
# If a TTY then assume running in test mode and fix output width
|
||
if not sys.stdout.isatty():
|
||
parser.formatter_class = lambda prog: argparse.HelpFormatter(prog, width=80)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Debug mode can be invoked directly with --debug or env LOGPROF_DEBUG=3
|
||
if args.debug:
|
||
debug_logger.activateStderr()
|
||
debug_logger.debug('Logging level: {}'.format(debug_logger.debug_level))
|
||
debug_logger.debug('Running as uid: {0[0]}, euid: {0[1]}, suid: {0[2]}'.format(os.getresuid()))
|
||
if args.poll:
|
||
debug_logger.debug('Running with --debug and --poll. Will exit in 100s')
|
||
# Sanity checks
|
||
user_ids = os.getresuid()
|
||
groups_ids = os.getresgid()
|
||
if user_ids[1] != user_ids[2]:
|
||
sys.exit("ERROR: Cannot be started with suid set!")
|
||
if groups_ids[1] != groups_ids[2]:
|
||
sys.exit("ERROR: Cannot be started with sgid set!")
|
||
|
||
# Define global variables that will be populated by init_aa()
|
||
# conf = None
|
||
logfile = None
|
||
|
||
if args.configdir: # prefer --configdir if given
|
||
confdir = args.configdir
|
||
else: # fallback to env variable (or None if not set)
|
||
confdir = os.getenv('__AA_CONFDIR')
|
||
|
||
aa.init_aa(confdir=confdir)
|
||
|
||
# Initialize aa.logfile
|
||
aa.set_logfile(args.file)
|
||
|
||
# Load global config reader
|
||
shell_config = aaconfig.Config('shell')
|
||
|
||
# Load system's notify.conf
|
||
# By default aa.CONFDIR is /etc/apparmor on most production systems
|
||
system_config = read_notify_conf(aa.CONFDIR, shell_config)
|
||
# Set default is no system notify.conf was found
|
||
if not system_config:
|
||
system_config = {'': {'show_notifications': 'yes'}}
|
||
|
||
# Load user's notify.conf
|
||
if os.path.isfile(os.environ['HOME'] + '/.apparmor/notify.conf'):
|
||
# Use legacy path if the conf file is there
|
||
user_config = read_notify_conf(os.environ['HOME'] + '/.apparmor', shell_config)
|
||
elif 'XDG_CONFIG_HOME' in os.environ and os.path.isfile(os.environ['XDG_CONFIG_HOME'] + '/apparmor/notify.conf'):
|
||
# Use XDG_CONFIG_HOME if it is defined
|
||
user_config = read_notify_conf(os.environ['XDG_CONFIG_HOME'] + '/apparmor', shell_config)
|
||
else:
|
||
# Fallback to the default value of XDG_CONFIG_HOME
|
||
user_config = read_notify_conf(os.environ['HOME'] + '/.config/apparmor', shell_config)
|
||
|
||
# Merge the two config dicts in an accurate and idiomatic way (requires Python 3.5)
|
||
config = {**system_config, **user_config}
|
||
|
||
"""
|
||
Possible configuration options:
|
||
- show_notifications
|
||
- message_body
|
||
- message_footer
|
||
- use_group
|
||
"""
|
||
|
||
# # Config checks
|
||
|
||
# Warn about unknown keys in the config
|
||
allowed_config_keys = [
|
||
'use_group',
|
||
'show_notifications',
|
||
'message_body',
|
||
'message_footer'
|
||
]
|
||
found_config_keys = config[''].keys()
|
||
unknown_keys = [item for item in found_config_keys if item not in allowed_config_keys]
|
||
for item in unknown_keys:
|
||
print(_('Warning! Configuration item "{}" is unknown!').format(item))
|
||
|
||
# Warn if use_group is defined and current group does not match defined
|
||
if 'use_group' in config['']:
|
||
user = pwd.getpwuid(os.geteuid())[0]
|
||
user_groups = [g.gr_name for g in grp.getgrall() if user in g.gr_mem]
|
||
gid = pwd.getpwnam(user).pw_gid
|
||
user_groups.append(grp.getgrgid(gid).gr_name)
|
||
|
||
if config['']['use_group'] not in user_groups:
|
||
print(
|
||
_('ERROR! User {user} not member of {group} group!').format(
|
||
user=user,
|
||
group=config['']['use_group']
|
||
),
|
||
file=sys.stderr
|
||
)
|
||
sys.exit(1)
|
||
# @TODO: Extend UI lib to have warning and error functions that
|
||
# can be used in an uniform way with both text and JSON output.
|
||
|
||
if args.file:
|
||
logfile = args.file
|
||
elif os.path.isfile('/var/run/auditd.pid') and os.path.isfile('/var/log/audit/audit.log'):
|
||
# If auditd is running, look at /var/log/audit/audit.log
|
||
logfile = '/var/log/audit/audit.log'
|
||
elif os.path.isfile('/var/log/kern.log'):
|
||
# For aa-notify, the fallback is kern.log, not syslog from aa.logfile
|
||
logfile = '/var/log/kern.log'
|
||
else:
|
||
# If all above failed, use aa cfg
|
||
logfile = aa.logfile
|
||
|
||
if args.verbose:
|
||
print(_('Using log file'), logfile)
|
||
|
||
if args.display:
|
||
os.environ['DISPLAY'] = args.display
|
||
|
||
if args.poll:
|
||
# Exit immediately if show_notifications is no or any of the options below
|
||
if config['']['show_notifications'] in [False, 'no', 'false', '0']:
|
||
print(_('Showing notifications forbidden in notify.conf, aborting..'))
|
||
sys.exit(0)
|
||
|
||
# Don't allow usage of aa-notify by root, must be some user. Desktop
|
||
# logins as root are not recommended and certainly not a use case for
|
||
# aa-notify notifications.
|
||
if not args.user and os.getuid() == 0 and 'SUDO_USER' not in os.environ.keys():
|
||
sys.exit("ERROR: Cannot be started a real root user. Use --user to define what user to use.")
|
||
|
||
# At this point this script needs to be able to read 'logfile' but once
|
||
# the for loop starts, privileges can be dropped since the file descriptor
|
||
# has been opened and access granted. Further reads of the file will not
|
||
# trigger any new permission checks.
|
||
# @TODO Plan to catch PermissionError here or..?
|
||
for message in notify_about_new_entries(logfile, args.wait):
|
||
|
||
# Notifications should not be run as root, since root probably is
|
||
# the wrong desktop user and not the one getting the notifications.
|
||
drop_privileges()
|
||
|
||
# sudo does not preserve DBUS address, so we need to guess it based on UID
|
||
if 'DBUS_SESSION_BUS_ADDRESS' not in os.environ:
|
||
os.environ['DBUS_SESSION_BUS_ADDRESS'] = 'unix:path=/run/user/{}/bus'.format(os.geteuid())
|
||
|
||
# Before use, notify2 must be initialized and the DBUS channel
|
||
# should be opened using the non-root user. This step needs to
|
||
# be executed after the drop_privileges().
|
||
notify2.init('AppArmor')
|
||
|
||
n = notify2.Notification(
|
||
_('AppArmor notification'),
|
||
message,
|
||
'gtk-dialog-warning'
|
||
)
|
||
n.show()
|
||
|
||
# When notification is sent, raise privileged back to root if the
|
||
# original effective user id was zero (to be able to read AppArmor logs)
|
||
raise_privileges()
|
||
|
||
elif args.since_last:
|
||
show_entries_since_last_login(logfile)
|
||
elif args.since_days:
|
||
show_entries_since_days(logfile, args.since_days)
|
||
else:
|
||
parser.print_help()
|
||
|
||
sys.exit(0)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|