mirror of
https://gitlab.com/apparmor/apparmor.git
synced 2025-03-04 08:24:42 +01:00
Merge aa-notify: add notification filtering
Allow notification filtering of the fields profile, operation, name, denied_mask, net_family and net_socket using regex. Both command line and config options in notify.conf are available. Signed-off-by: Georgia Garcia <georgia.garcia@canonical.com> MR: https://gitlab.com/apparmor/apparmor/-/merge_requests/1154 Approved-by: John Johansen <john@jjmx.net> Merged-by: John Johansen <john@jjmx.net>
This commit is contained in:
commit
4d2172e82e
3 changed files with 394 additions and 23 deletions
100
utils/aa-notify
100
utils/aa-notify
|
@ -92,7 +92,24 @@ def format_event(event, logsource):
|
|||
return "\n".join(output)
|
||||
|
||||
|
||||
def notify_about_new_entries(logfile, wait=0):
|
||||
def is_event_in_filter(event, filters):
|
||||
"""Checks if event is in filter"""
|
||||
if filters['profile'] and event.profile and not re.match(filters['profile'], event.profile):
|
||||
return False
|
||||
if filters['operation'] and event.operation and not re.match(filters['operation'], event.operation):
|
||||
return False
|
||||
if filters['name'] and event.name and not re.match(filters['name'], event.name):
|
||||
return False
|
||||
if filters['denied_mask'] and event.denied_mask and not re.match(filters['denied_mask'], event.denied_mask):
|
||||
return False
|
||||
if filters['net_family'] and event.net_family and not re.match(filters['net_family'], event.net_family):
|
||||
return False
|
||||
if filters['net_sock_type'] and event.net_sock_type and not re.match(filters['net_sock_type'], event.net_sock_type):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def notify_about_new_entries(logfile, filters, wait=0):
|
||||
"""Run the notification daemon in the background."""
|
||||
# Kill other instances of aa-notify if already running
|
||||
for process in psutil.process_iter():
|
||||
|
@ -109,6 +126,8 @@ def notify_about_new_entries(logfile, wait=0):
|
|||
# Rate limit to not show too many notifications
|
||||
try:
|
||||
for event in follow_apparmor_events(logfile, wait):
|
||||
if not is_event_in_filter(event, filters):
|
||||
continue
|
||||
debug_logger.info(format_event(event, logfile))
|
||||
yield (format_event(event, logfile))
|
||||
except PermissionError:
|
||||
|
@ -121,10 +140,12 @@ def notify_about_new_entries(logfile, wait=0):
|
|||
os._exit(0) # Exit child without calling exit handlers etc
|
||||
|
||||
|
||||
def show_entries_since_epoch(logfile, epoch_since):
|
||||
def show_entries_since_epoch(logfile, epoch_since, filters):
|
||||
"""Show AppArmor notifications since given timestamp."""
|
||||
count = 0
|
||||
for event in get_apparmor_events(logfile, epoch_since):
|
||||
if not is_event_in_filter(event, filters):
|
||||
continue
|
||||
count += 1
|
||||
if args.verbose:
|
||||
print(format_event(event, logfile))
|
||||
|
@ -145,7 +166,7 @@ def show_entries_since_epoch(logfile, epoch_since):
|
|||
print(_('For more information, please see: {}').format(debug_docs_url))
|
||||
|
||||
|
||||
def show_entries_since_last_login(logfile, username=get_user_login()):
|
||||
def show_entries_since_last_login(logfile, filters, username=get_user_login()):
|
||||
"""Show AppArmor notifications since last login of user."""
|
||||
# If running as sudo, use username of sudo user instead of root
|
||||
if 'SUDO_USER' in os.environ.keys():
|
||||
|
@ -158,15 +179,15 @@ def show_entries_since_last_login(logfile, username=get_user_login()):
|
|||
if epoch_since == 0:
|
||||
print(_('ERROR: Could not find last login'), file=sys.stderr)
|
||||
sys.exit(1)
|
||||
show_entries_since_epoch(logfile, epoch_since)
|
||||
show_entries_since_epoch(logfile, epoch_since, filters)
|
||||
|
||||
|
||||
def show_entries_since_days(logfile, since_days):
|
||||
def show_entries_since_days(logfile, since_days, filters):
|
||||
"""Show AppArmor notifications since the given amount of days."""
|
||||
day_in_seconds = 60 * 60 * 24
|
||||
epoch_now = int(time.time())
|
||||
epoch_since = epoch_now - day_in_seconds * since_days
|
||||
show_entries_since_epoch(logfile, epoch_since)
|
||||
show_entries_since_epoch(logfile, epoch_since, filters)
|
||||
|
||||
|
||||
def follow_apparmor_events(logfile, wait=0):
|
||||
|
@ -398,6 +419,17 @@ def main():
|
|||
parser.add_argument('--debug', action='store_true', help=_('debug mode'))
|
||||
parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS)
|
||||
|
||||
filter_group = parser.add_argument_group('Filtering options',
|
||||
description=('Filters are used to reduce the output of information to only '
|
||||
'those entries that will match the filter. Filters use Python\'s regular '
|
||||
'expression syntax.'))
|
||||
filter_group.add_argument('--filter.profile', metavar='PROFILE', help=_('regular expression to match the profile'))
|
||||
filter_group.add_argument('--filter.operation', metavar='OPERATION', help=_('regular expression to match the operation'))
|
||||
filter_group.add_argument('--filter.name', metavar='NAME', help=_('regular expression to match the name'))
|
||||
filter_group.add_argument('--filter.denied', metavar='DENIED', help=_('regular expression to match the denied mask'))
|
||||
filter_group.add_argument('--filter.family', metavar='FAMILY', help=_('regular expression to match the network family'))
|
||||
filter_group.add_argument('--filter.socket', metavar='SOCKET', help=_('regular expression to match the network socket type'))
|
||||
|
||||
# If a TTY then assume running in test mode and fix output width
|
||||
if not sys.stdout.isatty():
|
||||
parser.formatter_class = lambda prog: argparse.HelpFormatter(prog, width=80)
|
||||
|
@ -463,6 +495,12 @@ def main():
|
|||
- message_body
|
||||
- message_footer
|
||||
- use_group
|
||||
- filter.profile,
|
||||
- filter.operation,
|
||||
- filter.name,
|
||||
- filter.denied,
|
||||
- filter.family,
|
||||
- filter.socket,
|
||||
"""
|
||||
|
||||
# # Config checks
|
||||
|
@ -472,7 +510,13 @@ def main():
|
|||
'use_group',
|
||||
'show_notifications',
|
||||
'message_body',
|
||||
'message_footer'
|
||||
'message_footer',
|
||||
'filter.profile',
|
||||
'filter.operation',
|
||||
'filter.name',
|
||||
'filter.denied',
|
||||
'filter.family',
|
||||
'filter.socket',
|
||||
]
|
||||
found_config_keys = config[''].keys()
|
||||
unknown_keys = [
|
||||
|
@ -481,6 +525,42 @@ def main():
|
|||
for item in unknown_keys:
|
||||
print(_('Warning! Configuration item "{}" is unknown!').format(item))
|
||||
|
||||
filters = {
|
||||
'profile': '',
|
||||
'operation': '',
|
||||
'name': '',
|
||||
'denied_mask': '',
|
||||
'net_family': '',
|
||||
'net_sock_type': '',
|
||||
}
|
||||
|
||||
if 'filter.profile' in config['']:
|
||||
filters['profile'] = config['']['filter.profile']
|
||||
if 'filter.operation' in config['']:
|
||||
filters['operation'] = config['']['filter.operation']
|
||||
if 'filter.name' in config['']:
|
||||
filters['name'] = config['']['filter.name']
|
||||
if 'filter.denied' in config['']:
|
||||
filters['denied_mask'] = config['']['filter.denied']
|
||||
if 'filter.family' in config['']:
|
||||
filters['net_family'] = config['']['filter.family']
|
||||
if 'filter.socket' in config['']:
|
||||
filters['net_sock_type'] = config['']['filter.socket']
|
||||
|
||||
# command line filters override notify.conf
|
||||
if getattr(args, 'filter.profile'):
|
||||
filters['profile'] = getattr(args, 'filter.profile')
|
||||
if getattr(args, 'filter.operation'):
|
||||
filters['operation'] = getattr(args, 'filter.operation')
|
||||
if getattr(args, 'filter.name'):
|
||||
filters['name'] = getattr(args, 'filter.name')
|
||||
if getattr(args, 'filter.denied'):
|
||||
filters['denied_mask'] = getattr(args, 'filter.denied')
|
||||
if getattr(args, 'filter.family'):
|
||||
filters['net_family'] = getattr(args, 'filter.family')
|
||||
if getattr(args, 'filter.socket'):
|
||||
filters['net_sock_type'] = getattr(args, 'filter.socket')
|
||||
|
||||
# Warn if use_group is defined and current group does not match defined
|
||||
if 'use_group' in config['']:
|
||||
user = pwd.getpwuid(os.geteuid())[0]
|
||||
|
@ -535,7 +615,7 @@ def main():
|
|||
# has been opened and access granted. Further reads of the file will not
|
||||
# trigger any new permission checks.
|
||||
# @TODO Plan to catch PermissionError here or..?
|
||||
for message in notify_about_new_entries(logfile, args.wait):
|
||||
for message in notify_about_new_entries(logfile, filters, args.wait):
|
||||
|
||||
# Notifications should not be run as root, since root probably is
|
||||
# the wrong desktop user and not the one getting the notifications.
|
||||
|
@ -562,9 +642,9 @@ def main():
|
|||
raise_privileges()
|
||||
|
||||
elif args.since_last:
|
||||
show_entries_since_last_login(logfile)
|
||||
show_entries_since_last_login(logfile, filters)
|
||||
elif args.since_days:
|
||||
show_entries_since_days(logfile, args.since_days)
|
||||
show_entries_since_days(logfile, args.since_days, filters)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
|
|
@ -20,3 +20,11 @@ show_notifications="yes"
|
|||
|
||||
# OPTIONAL - custom notification message footer
|
||||
# message_footer="For more information visit https://foo.com"
|
||||
|
||||
# OPTIONAL - custom notification filtering
|
||||
# filter.profile=""
|
||||
# filter.operation=""
|
||||
# filter.name=""
|
||||
# filter.denied=""
|
||||
# filter.family=""
|
||||
# filter.socket=""
|
||||
|
|
|
@ -65,9 +65,9 @@ def cmd(command):
|
|||
return sp.returncode, out.decode('utf-8')
|
||||
|
||||
|
||||
class AANotifyTest(AATest):
|
||||
class AANotifyBase(AATest):
|
||||
|
||||
def create_logfile_contents(self, _time):
|
||||
def create_logfile_contents(_time):
|
||||
"""Create temporary log file with 30 entries of different age"""
|
||||
|
||||
test_logfile_contents_999_days_old = \
|
||||
|
@ -120,13 +120,14 @@ Feb 4 13:40:38 XPS-13-9370 kernel: [128552.880347] audit: type=1400 audit({epoc
|
|||
+ test_logfile_contents_unrelevant_entries \
|
||||
+ test_logfile_contents_0_seconds_old
|
||||
|
||||
def AASetup(self):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
file_current = NamedTemporaryFile("w+", prefix='test-aa-notify-', delete=False)
|
||||
file_last_login = NamedTemporaryFile("w+", prefix='test-aa-notify-', delete=False)
|
||||
self.test_logfile_current = file_current.name
|
||||
self.test_logfile_last_login = file_last_login.name
|
||||
cls.test_logfile_current = file_current.name
|
||||
cls.test_logfile_last_login = file_last_login.name
|
||||
|
||||
current_time_contents = self.create_logfile_contents(time.time())
|
||||
current_time_contents = cls.create_logfile_contents(time.time())
|
||||
file_current.write(current_time_contents)
|
||||
|
||||
if os.path.isfile('/var/log/wtmp'):
|
||||
|
@ -146,16 +147,20 @@ Feb 4 13:40:38 XPS-13-9370 kernel: [128552.880347] audit: type=1400 audit({epoc
|
|||
last_login = output.split()[3]
|
||||
last_login_epoch = datetime.fromisoformat(last_login).timestamp()
|
||||
# add 60 seconds to the epoch so that the time in the logs are AFTER login time
|
||||
last_login_contents = self.create_logfile_contents(last_login_epoch + 60)
|
||||
last_login_contents = cls.create_logfile_contents(last_login_epoch + 60)
|
||||
file_last_login.write(last_login_contents)
|
||||
|
||||
def AATeardown(self):
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
"""Remove temporary log file after tests ended"""
|
||||
|
||||
if self.test_logfile_current and os.path.exists(self.test_logfile_current):
|
||||
os.remove(self.test_logfile_current)
|
||||
if self.test_logfile_last_login and os.path.exists(self.test_logfile_last_login):
|
||||
os.remove(self.test_logfile_last_login)
|
||||
if cls.test_logfile_current and os.path.exists(cls.test_logfile_current):
|
||||
os.remove(cls.test_logfile_current)
|
||||
if cls.test_logfile_last_login and os.path.exists(cls.test_logfile_last_login):
|
||||
os.remove(cls.test_logfile_last_login)
|
||||
|
||||
|
||||
class AANotifyTest(AANotifyBase):
|
||||
|
||||
# The Perl aa-notify script was written so, that it will checked for kern.log
|
||||
# before printing help when invoked without arguments (sic!).
|
||||
|
@ -178,13 +183,17 @@ Feb 4 13:40:38 XPS-13-9370 kernel: [128552.880347] audit: type=1400 audit({epoc
|
|||
expected_return_code = 0
|
||||
expected_output_1 = \
|
||||
'''usage: aa-notify [-h] [-p] [--display DISPLAY] [-f FILE] [-l] [-s NUM] [-v]
|
||||
[-u USER] [-w NUM] [--debug]
|
||||
[-u USER] [-w NUM] [--debug] [--filter.profile PROFILE]
|
||||
[--filter.operation OPERATION] [--filter.name NAME]
|
||||
[--filter.denied DENIED] [--filter.family FAMILY]
|
||||
[--filter.socket SOCKET]
|
||||
|
||||
Display AppArmor notifications or messages for DENIED entries.
|
||||
'''
|
||||
|
||||
expected_output_2 = \
|
||||
'''
|
||||
options:
|
||||
-h, --help show this help message and exit
|
||||
-p, --poll poll AppArmor logs and display notifications
|
||||
--display DISPLAY set the DISPLAY environment variable (might be needed if
|
||||
|
@ -199,6 +208,22 @@ Display AppArmor notifications or messages for DENIED entries.
|
|||
-w NUM, --wait NUM wait NUM seconds before displaying notifications (with
|
||||
-p)
|
||||
--debug debug mode
|
||||
|
||||
Filtering options:
|
||||
Filters are used to reduce the output of information to only those entries
|
||||
that will match the filter. Filters use Python's regular expression syntax.
|
||||
|
||||
--filter.profile PROFILE
|
||||
regular expression to match the profile
|
||||
--filter.operation OPERATION
|
||||
regular expression to match the operation
|
||||
--filter.name NAME regular expression to match the name
|
||||
--filter.denied DENIED
|
||||
regular expression to match the denied mask
|
||||
--filter.family FAMILY
|
||||
regular expression to match the network family
|
||||
--filter.socket SOCKET
|
||||
regular expression to match the network socket type
|
||||
'''
|
||||
|
||||
return_code, output = cmd(aanotify_bin + ['--help'])
|
||||
|
@ -312,6 +337,264 @@ AppArmor denials: 10 (since'''.format(logfile=self.test_logfile_last_login)
|
|||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
|
||||
class AANotifyProfileFilterTest(AANotifyBase):
|
||||
|
||||
def test_profile_regex_since_100_days(self):
|
||||
profile_tests = (
|
||||
(['--filter.profile', 'libreoffice'], (0, 'AppArmor denials: 20 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice'], (0, 'AppArmor denials: 20 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice$'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.profile', '^libreoffice-soffice$'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice//null-/bin/uname'], (0, 'AppArmor denials: 14 (since')),
|
||||
(['--filter.profile', 'uname'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.profile', '.*uname'], (0, 'AppArmor denials: 14 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice//null-/.*'], (0, 'AppArmor denials: 16 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice//null-/foo'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice/foo'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.profile', 'bar'], (0, 'AppArmor denials: 0 (since')),
|
||||
)
|
||||
days_params = ['-f', self.test_logfile_current, '-s', '100']
|
||||
|
||||
for test in profile_tests:
|
||||
params = test[0]
|
||||
expected = test[1]
|
||||
|
||||
with self.subTest(params=params, expected=expected):
|
||||
expected_return_code = expected[0]
|
||||
expected_output_has = expected[1]
|
||||
|
||||
return_code, output = cmd(aanotify_bin + days_params + params)
|
||||
result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code)
|
||||
self.assertEqual(expected_return_code, return_code, result + output)
|
||||
result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has)
|
||||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
@unittest.skipUnless(os.path.isfile('/var/log/wtmp'), 'Requires wtmp on system')
|
||||
def test_profile_regex_since_login(self):
|
||||
profile_tests = (
|
||||
(['--filter.profile', 'libreoffice'], (0, 'AppArmor denials: 10 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice'], (0, 'AppArmor denials: 10 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice$'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.profile', '^libreoffice-soffice$'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice//null-/bin/uname'], (0, 'AppArmor denials: 7 (since')),
|
||||
(['--filter.profile', 'uname'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.profile', '.*uname'], (0, 'AppArmor denials: 7 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice//null-/.*'], (0, 'AppArmor denials: 8 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice//null-/foo'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.profile', 'libreoffice-soffice/foo'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.profile', 'bar'], (0, 'AppArmor denials: 0 (since')),
|
||||
)
|
||||
login_params = ['-f', self.test_logfile_last_login, '-l']
|
||||
|
||||
for test in profile_tests:
|
||||
params = test[0]
|
||||
expected = test[1]
|
||||
|
||||
with self.subTest(params=params, expected=expected):
|
||||
expected_return_code = expected[0]
|
||||
expected_output_has = expected[1]
|
||||
|
||||
return_code, output = cmd(aanotify_bin + login_params + params)
|
||||
if 'ERROR: Could not find last login' in output:
|
||||
self.skipTest('Could not find last login')
|
||||
result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code)
|
||||
self.assertEqual(expected_return_code, return_code, result + output)
|
||||
result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has)
|
||||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
|
||||
class AANotifyOperationFilterTest(AANotifyBase):
|
||||
|
||||
def test_operation_regex_since_100_days(self):
|
||||
operation_tests = (
|
||||
(['--filter.operation', 'exec'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.operation', 'file_inherit'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.operation', 'file_mmap'], (0, 'AppArmor denials: 8 (since')),
|
||||
(['--filter.operation', 'open'], (0, 'AppArmor denials: 6 (since')),
|
||||
(['--filter.operation', 'file.*'], (0, 'AppArmor denials: 10 (since')),
|
||||
(['--filter.operation', 'profile_load'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.operation', 'profile_replace'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.operation', 'bar'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.operation', 'userns_create'], (0, 'AppArmor denials: 0 (since')),
|
||||
)
|
||||
days_params = ['-f', self.test_logfile_current, '-s', '100']
|
||||
|
||||
for test in operation_tests:
|
||||
params = test[0]
|
||||
expected = test[1]
|
||||
|
||||
with self.subTest(params=params, expected=expected):
|
||||
expected_return_code = expected[0]
|
||||
expected_output_has = expected[1]
|
||||
|
||||
return_code, output = cmd(aanotify_bin + days_params + params)
|
||||
result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code)
|
||||
self.assertEqual(expected_return_code, return_code, result + output)
|
||||
result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has)
|
||||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
@unittest.skipUnless(os.path.isfile('/var/log/wtmp'), 'Requires wtmp on system')
|
||||
def test_operation_regex_since_login(self):
|
||||
operation_tests = (
|
||||
(['--filter.operation', 'exec'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.operation', 'file_inherit'], (0, 'AppArmor denials: 1 (since')),
|
||||
(['--filter.operation', 'file_mmap'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.operation', 'open'], (0, 'AppArmor denials: 3 (since')),
|
||||
(['--filter.operation', 'file.*'], (0, 'AppArmor denials: 5 (since')),
|
||||
(['--filter.operation', 'profile_load'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.operation', 'profile_replace'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.operation', 'bar'], (0, 'AppArmor denials: 0 (since')),
|
||||
(['--filter.operation', 'userns_create'], (0, 'AppArmor denials: 0 (since')),
|
||||
)
|
||||
login_params = ['-f', self.test_logfile_last_login, '-l']
|
||||
|
||||
for test in operation_tests:
|
||||
params = test[0]
|
||||
expected = test[1]
|
||||
|
||||
with self.subTest(params=params, expected=expected):
|
||||
expected_return_code = expected[0]
|
||||
expected_output_has = expected[1]
|
||||
|
||||
return_code, output = cmd(aanotify_bin + login_params + params)
|
||||
if 'ERROR: Could not find last login' in output:
|
||||
self.skipTest('Could not find last login')
|
||||
result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code)
|
||||
self.assertEqual(expected_return_code, return_code, result + output)
|
||||
result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has)
|
||||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
|
||||
class AANotifyNameFilterTest(AANotifyBase):
|
||||
|
||||
def test_name_regex_since_100_days(self):
|
||||
name_tests = (
|
||||
(['--filter.name', '/bin/uname'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.name', '/dev/null'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.name', '/lib/x86_64-linux-gnu/ld-2.27.so'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.name', '/lib/x86_64-linux-gnu/libc-2.27.so'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.name', '/etc/ld.so.cache'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.name', '/usr/lib/locale/locale-archive'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.name', '/usr/bin/file'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.name', '/'], (0, 'AppArmor denials: 20 (since')),
|
||||
(['--filter.name', '/.*'], (0, 'AppArmor denials: 20 (since')),
|
||||
(['--filter.name', '.*bin.*'], (0, 'AppArmor denials: 8 (since')),
|
||||
(['--filter.name', '/(usr/)?bin.*'], (0, 'AppArmor denials: 8 (since')),
|
||||
(['--filter.name', '/foo'], (0, 'AppArmor denials: 0 (since')),
|
||||
)
|
||||
days_params = ['-f', self.test_logfile_current, '-s', '100']
|
||||
|
||||
for test in name_tests:
|
||||
params = test[0]
|
||||
expected = test[1]
|
||||
|
||||
with self.subTest(params=params, expected=expected):
|
||||
expected_return_code = expected[0]
|
||||
expected_output_has = expected[1]
|
||||
|
||||
return_code, output = cmd(aanotify_bin + days_params + params)
|
||||
result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code)
|
||||
self.assertEqual(expected_return_code, return_code, result + output)
|
||||
result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has)
|
||||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
@unittest.skipUnless(os.path.isfile('/var/log/wtmp'), 'Requires wtmp on system')
|
||||
def test_name_regex_since_login(self):
|
||||
name_tests = (
|
||||
(['--filter.name', '/bin/uname'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.name', '/dev/null'], (0, 'AppArmor denials: 1 (since')),
|
||||
(['--filter.name', '/lib/x86_64-linux-gnu/ld-2.27.so'], (0, 'AppArmor denials: 1 (since')),
|
||||
(['--filter.name', '/lib/x86_64-linux-gnu/libc-2.27.so'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.name', '/etc/ld.so.cache'], (0, 'AppArmor denials: 1 (since')),
|
||||
(['--filter.name', '/usr/lib/locale/locale-archive'], (0, 'AppArmor denials: 1 (since')),
|
||||
(['--filter.name', '/usr/bin/file'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.name', '/'], (0, 'AppArmor denials: 10 (since')),
|
||||
(['--filter.name', '/.*'], (0, 'AppArmor denials: 10 (since')),
|
||||
(['--filter.name', '.*bin.*'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.name', '/(usr/)?bin.*'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.name', '/foo'], (0, 'AppArmor denials: 0 (since')),
|
||||
)
|
||||
login_params = ['-f', self.test_logfile_last_login, '-l']
|
||||
|
||||
for test in name_tests:
|
||||
params = test[0]
|
||||
expected = test[1]
|
||||
|
||||
with self.subTest(params=params, expected=expected):
|
||||
expected_return_code = expected[0]
|
||||
expected_output_has = expected[1]
|
||||
|
||||
return_code, output = cmd(aanotify_bin + login_params + params)
|
||||
if 'ERROR: Could not find last login' in output:
|
||||
self.skipTest('Could not find last login')
|
||||
result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code)
|
||||
self.assertEqual(expected_return_code, return_code, result + output)
|
||||
result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has)
|
||||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
|
||||
class AANotifyDeniedFilterTest(AANotifyBase):
|
||||
|
||||
def test_denied_regex_since_100_days(self):
|
||||
denied_tests = (
|
||||
(['--filter.denied', 'x'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.denied', 'w'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.denied', 'rm'], (0, 'AppArmor denials: 8 (since')),
|
||||
(['--filter.denied', 'r'], (0, 'AppArmor denials: 14 (since')),
|
||||
(['--filter.denied', '^r$'], (0, 'AppArmor denials: 6 (since')),
|
||||
(['--filter.denied', 'x|w'], (0, 'AppArmor denials: 6 (since')),
|
||||
(['--filter.denied', '^(?!rm).*'], (0, 'AppArmor denials: 12 (since')),
|
||||
(['--filter.denied', '.(?!m).*'], (0, 'AppArmor denials: 12 (since')),
|
||||
(['--filter.denied', 'r.?'], (0, 'AppArmor denials: 14 (since')),
|
||||
)
|
||||
days_params = ['-f', self.test_logfile_current, '-s', '100']
|
||||
|
||||
for test in denied_tests:
|
||||
params = test[0]
|
||||
expected = test[1]
|
||||
|
||||
with self.subTest(params=params, expected=expected):
|
||||
expected_return_code = expected[0]
|
||||
expected_output_has = expected[1]
|
||||
|
||||
return_code, output = cmd(aanotify_bin + days_params + params)
|
||||
result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code)
|
||||
self.assertEqual(expected_return_code, return_code, result + output)
|
||||
result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has)
|
||||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
@unittest.skipUnless(os.path.isfile('/var/log/wtmp'), 'Requires wtmp on system')
|
||||
def test_denied_regex_since_login(self):
|
||||
denied_tests = (
|
||||
(['--filter.denied', 'x'], (0, 'AppArmor denials: 2 (since')),
|
||||
(['--filter.denied', 'w'], (0, 'AppArmor denials: 1 (since')),
|
||||
(['--filter.denied', 'rm'], (0, 'AppArmor denials: 4 (since')),
|
||||
(['--filter.denied', 'r'], (0, 'AppArmor denials: 7 (since')),
|
||||
(['--filter.denied', '^r$'], (0, 'AppArmor denials: 3 (since')),
|
||||
(['--filter.denied', 'x|w'], (0, 'AppArmor denials: 3 (since')),
|
||||
(['--filter.denied', '^(?!rm).*'], (0, 'AppArmor denials: 6 (since')),
|
||||
(['--filter.denied', '.(?!m).*'], (0, 'AppArmor denials: 6 (since')),
|
||||
(['--filter.denied', 'r.?'], (0, 'AppArmor denials: 7 (since')),
|
||||
)
|
||||
login_params = ['-f', self.test_logfile_last_login, '-l']
|
||||
|
||||
for test in denied_tests:
|
||||
params = test[0]
|
||||
expected = test[1]
|
||||
|
||||
with self.subTest(params=params, expected=expected):
|
||||
expected_return_code = expected[0]
|
||||
expected_output_has = expected[1]
|
||||
|
||||
return_code, output = cmd(aanotify_bin + login_params + params)
|
||||
if 'ERROR: Could not find last login' in output:
|
||||
self.skipTest('Could not find last login')
|
||||
result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code)
|
||||
self.assertEqual(expected_return_code, return_code, result + output)
|
||||
result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has)
|
||||
self.assertIn(expected_output_has, output, result + output)
|
||||
|
||||
|
||||
setup_aa(aa) # Wrapper for aa.init_aa()
|
||||
setup_all_loops(__name__)
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Add table
Reference in a new issue