From 4608d32628eb3302d230eed4d327870a2c1db9ca Mon Sep 17 00:00:00 2001 From: Georgia Garcia Date: Fri, 23 Feb 2024 17:15:14 -0300 Subject: [PATCH] aa-notify: add notification filtering Allow notification filtering of the fields profile, operation, name, denied_mask, net_family and net_socket using regex. Both command line and config options in notify.conf are available. Signed-off-by: Georgia Garcia --- utils/aa-notify | 100 ++++++++++-- utils/notify.conf | 8 + utils/test/test-aa-notify.py | 309 +++++++++++++++++++++++++++++++++-- 3 files changed, 394 insertions(+), 23 deletions(-) diff --git a/utils/aa-notify b/utils/aa-notify index d2394ccef..d38510ebe 100755 --- a/utils/aa-notify +++ b/utils/aa-notify @@ -92,7 +92,24 @@ def format_event(event, logsource): return "\n".join(output) -def notify_about_new_entries(logfile, wait=0): +def is_event_in_filter(event, filters): + """Checks if event is in filter""" + if filters['profile'] and event.profile and not re.match(filters['profile'], event.profile): + return False + if filters['operation'] and event.operation and not re.match(filters['operation'], event.operation): + return False + if filters['name'] and event.name and not re.match(filters['name'], event.name): + return False + if filters['denied_mask'] and event.denied_mask and not re.match(filters['denied_mask'], event.denied_mask): + return False + if filters['net_family'] and event.net_family and not re.match(filters['net_family'], event.net_family): + return False + if filters['net_sock_type'] and event.net_sock_type and not re.match(filters['net_sock_type'], event.net_sock_type): + return False + return True + + +def notify_about_new_entries(logfile, filters, wait=0): """Run the notification daemon in the background.""" # Kill other instances of aa-notify if already running for process in psutil.process_iter(): @@ -109,6 +126,8 @@ def notify_about_new_entries(logfile, wait=0): # Rate limit to not show too many notifications try: for event in follow_apparmor_events(logfile, wait): + if not is_event_in_filter(event, filters): + continue debug_logger.info(format_event(event, logfile)) yield (format_event(event, logfile)) except PermissionError: @@ -121,10 +140,12 @@ def notify_about_new_entries(logfile, wait=0): os._exit(0) # Exit child without calling exit handlers etc -def show_entries_since_epoch(logfile, epoch_since): +def show_entries_since_epoch(logfile, epoch_since, filters): """Show AppArmor notifications since given timestamp.""" count = 0 for event in get_apparmor_events(logfile, epoch_since): + if not is_event_in_filter(event, filters): + continue count += 1 if args.verbose: print(format_event(event, logfile)) @@ -145,7 +166,7 @@ def show_entries_since_epoch(logfile, epoch_since): print(_('For more information, please see: {}').format(debug_docs_url)) -def show_entries_since_last_login(logfile, username=get_user_login()): +def show_entries_since_last_login(logfile, filters, username=get_user_login()): """Show AppArmor notifications since last login of user.""" # If running as sudo, use username of sudo user instead of root if 'SUDO_USER' in os.environ.keys(): @@ -158,15 +179,15 @@ def show_entries_since_last_login(logfile, username=get_user_login()): if epoch_since == 0: print(_('ERROR: Could not find last login'), file=sys.stderr) sys.exit(1) - show_entries_since_epoch(logfile, epoch_since) + show_entries_since_epoch(logfile, epoch_since, filters) -def show_entries_since_days(logfile, since_days): +def show_entries_since_days(logfile, since_days, filters): """Show AppArmor notifications since the given amount of days.""" day_in_seconds = 60 * 60 * 24 epoch_now = int(time.time()) epoch_since = epoch_now - day_in_seconds * since_days - show_entries_since_epoch(logfile, epoch_since) + show_entries_since_epoch(logfile, epoch_since, filters) def follow_apparmor_events(logfile, wait=0): @@ -398,6 +419,17 @@ def main(): parser.add_argument('--debug', action='store_true', help=_('debug mode')) parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS) + filter_group = parser.add_argument_group('Filtering options', + description=('Filters are used to reduce the output of information to only ' + 'those entries that will match the filter. Filters use Python\'s regular ' + 'expression syntax.')) + filter_group.add_argument('--filter.profile', metavar='PROFILE', help=_('regular expression to match the profile')) + filter_group.add_argument('--filter.operation', metavar='OPERATION', help=_('regular expression to match the operation')) + filter_group.add_argument('--filter.name', metavar='NAME', help=_('regular expression to match the name')) + filter_group.add_argument('--filter.denied', metavar='DENIED', help=_('regular expression to match the denied mask')) + filter_group.add_argument('--filter.family', metavar='FAMILY', help=_('regular expression to match the network family')) + filter_group.add_argument('--filter.socket', metavar='SOCKET', help=_('regular expression to match the network socket type')) + # If a TTY then assume running in test mode and fix output width if not sys.stdout.isatty(): parser.formatter_class = lambda prog: argparse.HelpFormatter(prog, width=80) @@ -463,6 +495,12 @@ def main(): - message_body - message_footer - use_group + - filter.profile, + - filter.operation, + - filter.name, + - filter.denied, + - filter.family, + - filter.socket, """ # # Config checks @@ -472,7 +510,13 @@ def main(): 'use_group', 'show_notifications', 'message_body', - 'message_footer' + 'message_footer', + 'filter.profile', + 'filter.operation', + 'filter.name', + 'filter.denied', + 'filter.family', + 'filter.socket', ] found_config_keys = config[''].keys() unknown_keys = [ @@ -481,6 +525,42 @@ def main(): for item in unknown_keys: print(_('Warning! Configuration item "{}" is unknown!').format(item)) + filters = { + 'profile': '', + 'operation': '', + 'name': '', + 'denied_mask': '', + 'net_family': '', + 'net_sock_type': '', + } + + if 'filter.profile' in config['']: + filters['profile'] = config['']['filter.profile'] + if 'filter.operation' in config['']: + filters['operation'] = config['']['filter.operation'] + if 'filter.name' in config['']: + filters['name'] = config['']['filter.name'] + if 'filter.denied' in config['']: + filters['denied_mask'] = config['']['filter.denied'] + if 'filter.family' in config['']: + filters['net_family'] = config['']['filter.family'] + if 'filter.socket' in config['']: + filters['net_sock_type'] = config['']['filter.socket'] + + # command line filters override notify.conf + if getattr(args, 'filter.profile'): + filters['profile'] = getattr(args, 'filter.profile') + if getattr(args, 'filter.operation'): + filters['operation'] = getattr(args, 'filter.operation') + if getattr(args, 'filter.name'): + filters['name'] = getattr(args, 'filter.name') + if getattr(args, 'filter.denied'): + filters['denied_mask'] = getattr(args, 'filter.denied') + if getattr(args, 'filter.family'): + filters['net_family'] = getattr(args, 'filter.family') + if getattr(args, 'filter.socket'): + filters['net_sock_type'] = getattr(args, 'filter.socket') + # Warn if use_group is defined and current group does not match defined if 'use_group' in config['']: user = pwd.getpwuid(os.geteuid())[0] @@ -535,7 +615,7 @@ def main(): # has been opened and access granted. Further reads of the file will not # trigger any new permission checks. # @TODO Plan to catch PermissionError here or..? - for message in notify_about_new_entries(logfile, args.wait): + for message in notify_about_new_entries(logfile, filters, args.wait): # Notifications should not be run as root, since root probably is # the wrong desktop user and not the one getting the notifications. @@ -562,9 +642,9 @@ def main(): raise_privileges() elif args.since_last: - show_entries_since_last_login(logfile) + show_entries_since_last_login(logfile, filters) elif args.since_days: - show_entries_since_days(logfile, args.since_days) + show_entries_since_days(logfile, args.since_days, filters) else: parser.print_help() diff --git a/utils/notify.conf b/utils/notify.conf index 03854838e..e44d9eec1 100644 --- a/utils/notify.conf +++ b/utils/notify.conf @@ -20,3 +20,11 @@ show_notifications="yes" # OPTIONAL - custom notification message footer # message_footer="For more information visit https://foo.com" + +# OPTIONAL - custom notification filtering +# filter.profile="" +# filter.operation="" +# filter.name="" +# filter.denied="" +# filter.family="" +# filter.socket="" diff --git a/utils/test/test-aa-notify.py b/utils/test/test-aa-notify.py index 4af38f57a..6635f154e 100644 --- a/utils/test/test-aa-notify.py +++ b/utils/test/test-aa-notify.py @@ -65,9 +65,9 @@ def cmd(command): return sp.returncode, out.decode('utf-8') -class AANotifyTest(AATest): +class AANotifyBase(AATest): - def create_logfile_contents(self, _time): + def create_logfile_contents(_time): """Create temporary log file with 30 entries of different age""" test_logfile_contents_999_days_old = \ @@ -120,13 +120,14 @@ Feb 4 13:40:38 XPS-13-9370 kernel: [128552.880347] audit: type=1400 audit({epoc + test_logfile_contents_unrelevant_entries \ + test_logfile_contents_0_seconds_old - def AASetup(self): + @classmethod + def setUpClass(cls): file_current = NamedTemporaryFile("w+", prefix='test-aa-notify-', delete=False) file_last_login = NamedTemporaryFile("w+", prefix='test-aa-notify-', delete=False) - self.test_logfile_current = file_current.name - self.test_logfile_last_login = file_last_login.name + cls.test_logfile_current = file_current.name + cls.test_logfile_last_login = file_last_login.name - current_time_contents = self.create_logfile_contents(time.time()) + current_time_contents = cls.create_logfile_contents(time.time()) file_current.write(current_time_contents) if os.path.isfile('/var/log/wtmp'): @@ -146,16 +147,20 @@ Feb 4 13:40:38 XPS-13-9370 kernel: [128552.880347] audit: type=1400 audit({epoc last_login = output.split()[3] last_login_epoch = datetime.fromisoformat(last_login).timestamp() # add 60 seconds to the epoch so that the time in the logs are AFTER login time - last_login_contents = self.create_logfile_contents(last_login_epoch + 60) + last_login_contents = cls.create_logfile_contents(last_login_epoch + 60) file_last_login.write(last_login_contents) - def AATeardown(self): + @classmethod + def tearDownClass(cls): """Remove temporary log file after tests ended""" - if self.test_logfile_current and os.path.exists(self.test_logfile_current): - os.remove(self.test_logfile_current) - if self.test_logfile_last_login and os.path.exists(self.test_logfile_last_login): - os.remove(self.test_logfile_last_login) + if cls.test_logfile_current and os.path.exists(cls.test_logfile_current): + os.remove(cls.test_logfile_current) + if cls.test_logfile_last_login and os.path.exists(cls.test_logfile_last_login): + os.remove(cls.test_logfile_last_login) + + +class AANotifyTest(AANotifyBase): # The Perl aa-notify script was written so, that it will checked for kern.log # before printing help when invoked without arguments (sic!). @@ -178,13 +183,17 @@ Feb 4 13:40:38 XPS-13-9370 kernel: [128552.880347] audit: type=1400 audit({epoc expected_return_code = 0 expected_output_1 = \ '''usage: aa-notify [-h] [-p] [--display DISPLAY] [-f FILE] [-l] [-s NUM] [-v] - [-u USER] [-w NUM] [--debug] + [-u USER] [-w NUM] [--debug] [--filter.profile PROFILE] + [--filter.operation OPERATION] [--filter.name NAME] + [--filter.denied DENIED] [--filter.family FAMILY] + [--filter.socket SOCKET] Display AppArmor notifications or messages for DENIED entries. ''' expected_output_2 = \ ''' +options: -h, --help show this help message and exit -p, --poll poll AppArmor logs and display notifications --display DISPLAY set the DISPLAY environment variable (might be needed if @@ -199,6 +208,22 @@ Display AppArmor notifications or messages for DENIED entries. -w NUM, --wait NUM wait NUM seconds before displaying notifications (with -p) --debug debug mode + +Filtering options: + Filters are used to reduce the output of information to only those entries + that will match the filter. Filters use Python's regular expression syntax. + + --filter.profile PROFILE + regular expression to match the profile + --filter.operation OPERATION + regular expression to match the operation + --filter.name NAME regular expression to match the name + --filter.denied DENIED + regular expression to match the denied mask + --filter.family FAMILY + regular expression to match the network family + --filter.socket SOCKET + regular expression to match the network socket type ''' return_code, output = cmd(aanotify_bin + ['--help']) @@ -312,6 +337,264 @@ AppArmor denials: 10 (since'''.format(logfile=self.test_logfile_last_login) self.assertIn(expected_output_has, output, result + output) +class AANotifyProfileFilterTest(AANotifyBase): + + def test_profile_regex_since_100_days(self): + profile_tests = ( + (['--filter.profile', 'libreoffice'], (0, 'AppArmor denials: 20 (since')), + (['--filter.profile', 'libreoffice-soffice'], (0, 'AppArmor denials: 20 (since')), + (['--filter.profile', 'libreoffice-soffice$'], (0, 'AppArmor denials: 4 (since')), + (['--filter.profile', '^libreoffice-soffice$'], (0, 'AppArmor denials: 4 (since')), + (['--filter.profile', 'libreoffice-soffice//null-/bin/uname'], (0, 'AppArmor denials: 14 (since')), + (['--filter.profile', 'uname'], (0, 'AppArmor denials: 0 (since')), + (['--filter.profile', '.*uname'], (0, 'AppArmor denials: 14 (since')), + (['--filter.profile', 'libreoffice-soffice//null-/.*'], (0, 'AppArmor denials: 16 (since')), + (['--filter.profile', 'libreoffice-soffice//null-/foo'], (0, 'AppArmor denials: 0 (since')), + (['--filter.profile', 'libreoffice-soffice/foo'], (0, 'AppArmor denials: 0 (since')), + (['--filter.profile', 'bar'], (0, 'AppArmor denials: 0 (since')), + ) + days_params = ['-f', self.test_logfile_current, '-s', '100'] + + for test in profile_tests: + params = test[0] + expected = test[1] + + with self.subTest(params=params, expected=expected): + expected_return_code = expected[0] + expected_output_has = expected[1] + + return_code, output = cmd(aanotify_bin + days_params + params) + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) + self.assertIn(expected_output_has, output, result + output) + + @unittest.skipUnless(os.path.isfile('/var/log/wtmp'), 'Requires wtmp on system') + def test_profile_regex_since_login(self): + profile_tests = ( + (['--filter.profile', 'libreoffice'], (0, 'AppArmor denials: 10 (since')), + (['--filter.profile', 'libreoffice-soffice'], (0, 'AppArmor denials: 10 (since')), + (['--filter.profile', 'libreoffice-soffice$'], (0, 'AppArmor denials: 2 (since')), + (['--filter.profile', '^libreoffice-soffice$'], (0, 'AppArmor denials: 2 (since')), + (['--filter.profile', 'libreoffice-soffice//null-/bin/uname'], (0, 'AppArmor denials: 7 (since')), + (['--filter.profile', 'uname'], (0, 'AppArmor denials: 0 (since')), + (['--filter.profile', '.*uname'], (0, 'AppArmor denials: 7 (since')), + (['--filter.profile', 'libreoffice-soffice//null-/.*'], (0, 'AppArmor denials: 8 (since')), + (['--filter.profile', 'libreoffice-soffice//null-/foo'], (0, 'AppArmor denials: 0 (since')), + (['--filter.profile', 'libreoffice-soffice/foo'], (0, 'AppArmor denials: 0 (since')), + (['--filter.profile', 'bar'], (0, 'AppArmor denials: 0 (since')), + ) + login_params = ['-f', self.test_logfile_last_login, '-l'] + + for test in profile_tests: + params = test[0] + expected = test[1] + + with self.subTest(params=params, expected=expected): + expected_return_code = expected[0] + expected_output_has = expected[1] + + return_code, output = cmd(aanotify_bin + login_params + params) + if 'ERROR: Could not find last login' in output: + self.skipTest('Could not find last login') + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) + self.assertIn(expected_output_has, output, result + output) + + +class AANotifyOperationFilterTest(AANotifyBase): + + def test_operation_regex_since_100_days(self): + operation_tests = ( + (['--filter.operation', 'exec'], (0, 'AppArmor denials: 4 (since')), + (['--filter.operation', 'file_inherit'], (0, 'AppArmor denials: 2 (since')), + (['--filter.operation', 'file_mmap'], (0, 'AppArmor denials: 8 (since')), + (['--filter.operation', 'open'], (0, 'AppArmor denials: 6 (since')), + (['--filter.operation', 'file.*'], (0, 'AppArmor denials: 10 (since')), + (['--filter.operation', 'profile_load'], (0, 'AppArmor denials: 0 (since')), + (['--filter.operation', 'profile_replace'], (0, 'AppArmor denials: 0 (since')), + (['--filter.operation', 'bar'], (0, 'AppArmor denials: 0 (since')), + (['--filter.operation', 'userns_create'], (0, 'AppArmor denials: 0 (since')), + ) + days_params = ['-f', self.test_logfile_current, '-s', '100'] + + for test in operation_tests: + params = test[0] + expected = test[1] + + with self.subTest(params=params, expected=expected): + expected_return_code = expected[0] + expected_output_has = expected[1] + + return_code, output = cmd(aanotify_bin + days_params + params) + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) + self.assertIn(expected_output_has, output, result + output) + + @unittest.skipUnless(os.path.isfile('/var/log/wtmp'), 'Requires wtmp on system') + def test_operation_regex_since_login(self): + operation_tests = ( + (['--filter.operation', 'exec'], (0, 'AppArmor denials: 2 (since')), + (['--filter.operation', 'file_inherit'], (0, 'AppArmor denials: 1 (since')), + (['--filter.operation', 'file_mmap'], (0, 'AppArmor denials: 4 (since')), + (['--filter.operation', 'open'], (0, 'AppArmor denials: 3 (since')), + (['--filter.operation', 'file.*'], (0, 'AppArmor denials: 5 (since')), + (['--filter.operation', 'profile_load'], (0, 'AppArmor denials: 0 (since')), + (['--filter.operation', 'profile_replace'], (0, 'AppArmor denials: 0 (since')), + (['--filter.operation', 'bar'], (0, 'AppArmor denials: 0 (since')), + (['--filter.operation', 'userns_create'], (0, 'AppArmor denials: 0 (since')), + ) + login_params = ['-f', self.test_logfile_last_login, '-l'] + + for test in operation_tests: + params = test[0] + expected = test[1] + + with self.subTest(params=params, expected=expected): + expected_return_code = expected[0] + expected_output_has = expected[1] + + return_code, output = cmd(aanotify_bin + login_params + params) + if 'ERROR: Could not find last login' in output: + self.skipTest('Could not find last login') + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) + self.assertIn(expected_output_has, output, result + output) + + +class AANotifyNameFilterTest(AANotifyBase): + + def test_name_regex_since_100_days(self): + name_tests = ( + (['--filter.name', '/bin/uname'], (0, 'AppArmor denials: 4 (since')), + (['--filter.name', '/dev/null'], (0, 'AppArmor denials: 2 (since')), + (['--filter.name', '/lib/x86_64-linux-gnu/ld-2.27.so'], (0, 'AppArmor denials: 2 (since')), + (['--filter.name', '/lib/x86_64-linux-gnu/libc-2.27.so'], (0, 'AppArmor denials: 4 (since')), + (['--filter.name', '/etc/ld.so.cache'], (0, 'AppArmor denials: 2 (since')), + (['--filter.name', '/usr/lib/locale/locale-archive'], (0, 'AppArmor denials: 2 (since')), + (['--filter.name', '/usr/bin/file'], (0, 'AppArmor denials: 4 (since')), + (['--filter.name', '/'], (0, 'AppArmor denials: 20 (since')), + (['--filter.name', '/.*'], (0, 'AppArmor denials: 20 (since')), + (['--filter.name', '.*bin.*'], (0, 'AppArmor denials: 8 (since')), + (['--filter.name', '/(usr/)?bin.*'], (0, 'AppArmor denials: 8 (since')), + (['--filter.name', '/foo'], (0, 'AppArmor denials: 0 (since')), + ) + days_params = ['-f', self.test_logfile_current, '-s', '100'] + + for test in name_tests: + params = test[0] + expected = test[1] + + with self.subTest(params=params, expected=expected): + expected_return_code = expected[0] + expected_output_has = expected[1] + + return_code, output = cmd(aanotify_bin + days_params + params) + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) + self.assertIn(expected_output_has, output, result + output) + + @unittest.skipUnless(os.path.isfile('/var/log/wtmp'), 'Requires wtmp on system') + def test_name_regex_since_login(self): + name_tests = ( + (['--filter.name', '/bin/uname'], (0, 'AppArmor denials: 2 (since')), + (['--filter.name', '/dev/null'], (0, 'AppArmor denials: 1 (since')), + (['--filter.name', '/lib/x86_64-linux-gnu/ld-2.27.so'], (0, 'AppArmor denials: 1 (since')), + (['--filter.name', '/lib/x86_64-linux-gnu/libc-2.27.so'], (0, 'AppArmor denials: 2 (since')), + (['--filter.name', '/etc/ld.so.cache'], (0, 'AppArmor denials: 1 (since')), + (['--filter.name', '/usr/lib/locale/locale-archive'], (0, 'AppArmor denials: 1 (since')), + (['--filter.name', '/usr/bin/file'], (0, 'AppArmor denials: 2 (since')), + (['--filter.name', '/'], (0, 'AppArmor denials: 10 (since')), + (['--filter.name', '/.*'], (0, 'AppArmor denials: 10 (since')), + (['--filter.name', '.*bin.*'], (0, 'AppArmor denials: 4 (since')), + (['--filter.name', '/(usr/)?bin.*'], (0, 'AppArmor denials: 4 (since')), + (['--filter.name', '/foo'], (0, 'AppArmor denials: 0 (since')), + ) + login_params = ['-f', self.test_logfile_last_login, '-l'] + + for test in name_tests: + params = test[0] + expected = test[1] + + with self.subTest(params=params, expected=expected): + expected_return_code = expected[0] + expected_output_has = expected[1] + + return_code, output = cmd(aanotify_bin + login_params + params) + if 'ERROR: Could not find last login' in output: + self.skipTest('Could not find last login') + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) + self.assertIn(expected_output_has, output, result + output) + + +class AANotifyDeniedFilterTest(AANotifyBase): + + def test_denied_regex_since_100_days(self): + denied_tests = ( + (['--filter.denied', 'x'], (0, 'AppArmor denials: 4 (since')), + (['--filter.denied', 'w'], (0, 'AppArmor denials: 2 (since')), + (['--filter.denied', 'rm'], (0, 'AppArmor denials: 8 (since')), + (['--filter.denied', 'r'], (0, 'AppArmor denials: 14 (since')), + (['--filter.denied', '^r$'], (0, 'AppArmor denials: 6 (since')), + (['--filter.denied', 'x|w'], (0, 'AppArmor denials: 6 (since')), + (['--filter.denied', '^(?!rm).*'], (0, 'AppArmor denials: 12 (since')), + (['--filter.denied', '.(?!m).*'], (0, 'AppArmor denials: 12 (since')), + (['--filter.denied', 'r.?'], (0, 'AppArmor denials: 14 (since')), + ) + days_params = ['-f', self.test_logfile_current, '-s', '100'] + + for test in denied_tests: + params = test[0] + expected = test[1] + + with self.subTest(params=params, expected=expected): + expected_return_code = expected[0] + expected_output_has = expected[1] + + return_code, output = cmd(aanotify_bin + days_params + params) + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) + self.assertIn(expected_output_has, output, result + output) + + @unittest.skipUnless(os.path.isfile('/var/log/wtmp'), 'Requires wtmp on system') + def test_denied_regex_since_login(self): + denied_tests = ( + (['--filter.denied', 'x'], (0, 'AppArmor denials: 2 (since')), + (['--filter.denied', 'w'], (0, 'AppArmor denials: 1 (since')), + (['--filter.denied', 'rm'], (0, 'AppArmor denials: 4 (since')), + (['--filter.denied', 'r'], (0, 'AppArmor denials: 7 (since')), + (['--filter.denied', '^r$'], (0, 'AppArmor denials: 3 (since')), + (['--filter.denied', 'x|w'], (0, 'AppArmor denials: 3 (since')), + (['--filter.denied', '^(?!rm).*'], (0, 'AppArmor denials: 6 (since')), + (['--filter.denied', '.(?!m).*'], (0, 'AppArmor denials: 6 (since')), + (['--filter.denied', 'r.?'], (0, 'AppArmor denials: 7 (since')), + ) + login_params = ['-f', self.test_logfile_last_login, '-l'] + + for test in denied_tests: + params = test[0] + expected = test[1] + + with self.subTest(params=params, expected=expected): + expected_return_code = expected[0] + expected_output_has = expected[1] + + return_code, output = cmd(aanotify_bin + login_params + params) + if 'ERROR: Could not find last login' in output: + self.skipTest('Could not find last login') + result = 'Got return code {}, expected {}\n'.format(return_code, expected_return_code) + self.assertEqual(expected_return_code, return_code, result + output) + result = 'Got output "{}", expected "{}"\n'.format(output, expected_output_has) + self.assertIn(expected_output_has, output, result + output) + + setup_aa(aa) # Wrapper for aa.init_aa() setup_all_loops(__name__) if __name__ == '__main__':