mirror of
https://gitlab.com/apparmor/apparmor.git
synced 2025-03-04 08:24:42 +01:00

... and not only `/something` Fixes: https://gitlab.com/apparmor/apparmor/-/issues/413#note_2008929033
259 lines
16 KiB
Python
259 lines
16 KiB
Python
#!/usr/bin/python3
|
|
# ----------------------------------------------------------------------
|
|
# Copyright (C) 2022 Canonical, Ltd.
|
|
#
|
|
# This program is free software; you can redistribute it and/or
|
|
# modify it under the terms of version 2 of the GNU General Public
|
|
# License as published by the Free Software Foundation.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# ----------------------------------------------------------------------
|
|
|
|
import unittest
|
|
from collections import namedtuple
|
|
from common_test import AATest, setup_all_loops
|
|
|
|
from apparmor.rule.mqueue import MessageQueueRule, MessageQueueRuleset
|
|
from apparmor.common import AppArmorException, AppArmorBug
|
|
from apparmor.translations import init_translation
|
|
_ = init_translation()
|
|
|
|
|
|
class MessageQueueTestParse(AATest):
|
|
tests = (
|
|
# access type label mqueue_name audit deny allow comment
|
|
('mqueue,', MessageQueueRule(MessageQueueRule.ALL, MessageQueueRule.ALL, MessageQueueRule.ALL, MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue create,', MessageQueueRule(('create'), MessageQueueRule.ALL, MessageQueueRule.ALL, MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue (create,open,delete),', MessageQueueRule(('create', 'open', 'delete'), MessageQueueRule.ALL, MessageQueueRule.ALL, MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue (getattr,setattr),', MessageQueueRule(('getattr', 'setattr'), MessageQueueRule.ALL, MessageQueueRule.ALL, MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue (write,read),', MessageQueueRule(('write', 'read'), MessageQueueRule.ALL, MessageQueueRule.ALL, MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue (open,delete),', MessageQueueRule(('open', 'delete'), MessageQueueRule.ALL, MessageQueueRule.ALL, MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue write label=foo,', MessageQueueRule(('write'), MessageQueueRule.ALL, 'foo', MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue read label=foo /queue,', MessageQueueRule(('read'), MessageQueueRule.ALL, 'foo', '/queue', False, False, False, '')),
|
|
('audit mqueue read label=foo /queue,', MessageQueueRule(('read'), MessageQueueRule.ALL, 'foo', '/queue', True, False, False, '')),
|
|
('deny mqueue rw label=foo /queue,', MessageQueueRule(('rw'), MessageQueueRule.ALL, 'foo', '/queue', False, True, False, '')),
|
|
('audit allow mqueue r label=foo /queue,', MessageQueueRule(('r'), MessageQueueRule.ALL, 'foo', '/queue', True, False, True, '')),
|
|
('mqueue w label=foo 1234, # cmt', MessageQueueRule(('w'), MessageQueueRule.ALL, 'foo', '1234', False, False, False, ' # cmt')),
|
|
('mqueue wr 1234,', MessageQueueRule(('wr'), MessageQueueRule.ALL, MessageQueueRule.ALL, '1234', False, False, False, '')),
|
|
('mqueue 1234,', MessageQueueRule(MessageQueueRule.ALL, MessageQueueRule.ALL, MessageQueueRule.ALL, '1234', False, False, False, '')),
|
|
('mqueue type=sysv,', MessageQueueRule(MessageQueueRule.ALL, 'sysv', MessageQueueRule.ALL, MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue type=posix,', MessageQueueRule(MessageQueueRule.ALL, 'posix', MessageQueueRule.ALL, MessageQueueRule.ALL, False, False, False, '')),
|
|
('mqueue type=sysv 1234,', MessageQueueRule(MessageQueueRule.ALL, 'sysv', MessageQueueRule.ALL, '1234', False, False, False, '')),
|
|
('mqueue type=posix /queue,', MessageQueueRule(MessageQueueRule.ALL, 'posix', MessageQueueRule.ALL, '/queue', False, False, False, '')),
|
|
('mqueue open type=sysv label=foo 1234,', MessageQueueRule(('open'), 'sysv', 'foo', '1234', False, False, False, '')),
|
|
('mqueue r type=posix /,', MessageQueueRule(('r'), 'posix', MessageQueueRule.ALL, '/', False, False, False, '')),
|
|
)
|
|
|
|
def _run_test(self, rawrule, expected):
|
|
self.assertTrue(MessageQueueRule.match(rawrule))
|
|
obj = MessageQueueRule.create_instance(rawrule)
|
|
expected.raw_rule = rawrule.strip()
|
|
self.assertTrue(obj.is_equal(expected, True))
|
|
|
|
|
|
class MessageQueueTestParseInvalid(AATest):
|
|
tests = (
|
|
('mqueue label=,', AppArmorException),
|
|
('mqueue invalidaccess /queuename,', AppArmorException),
|
|
('mqueue invalidqueuename,', AppArmorException),
|
|
('mqueue invalidqueuename1234,', AppArmorException),
|
|
('mqueue foo label foo bar,', AppArmorException),
|
|
('mqueue type=,', AppArmorException),
|
|
('mqueue type=sysv /foo,', AppArmorException),
|
|
('mqueue type=posix 1234,', AppArmorException),
|
|
)
|
|
|
|
def _run_test(self, rawrule, expected):
|
|
self.assertTrue(MessageQueueRule.match(rawrule)) # the above invalid rules still match the main regex!
|
|
with self.assertRaises(expected):
|
|
MessageQueueRule.create_instance(rawrule)
|
|
|
|
def test_parse_fail(self):
|
|
with self.assertRaises(AppArmorException):
|
|
MessageQueueRule.create_instance('foo,')
|
|
|
|
def test_diff_non_mqueuerule(self):
|
|
exp = namedtuple('exp', ('audit', 'deny'))
|
|
obj = MessageQueueRule(('open'), 'posix', 'bar', '/foo')
|
|
with self.assertRaises(AppArmorBug):
|
|
obj.is_equal(exp(False, False), False)
|
|
|
|
def test_diff_access(self):
|
|
obj1 = MessageQueueRule(('open'), 'posix', 'bar', '/foo')
|
|
obj2 = MessageQueueRule(('create'), 'posix', 'bar', '/foo')
|
|
self.assertFalse(obj1.is_equal(obj2, False))
|
|
|
|
def test_diff_type(self):
|
|
obj1 = MessageQueueRule(('open'), 'sysv', 'bar', MessageQueueRule.ALL)
|
|
obj2 = MessageQueueRule(('open'), 'posix', 'inv', MessageQueueRule.ALL)
|
|
self.assertFalse(obj1.is_equal(obj2, False))
|
|
|
|
def test_diff_label(self):
|
|
obj1 = MessageQueueRule(('open'), 'posix', 'bar', '/foo')
|
|
obj2 = MessageQueueRule(('open'), 'posix', 'inv', '/foo')
|
|
self.assertFalse(obj1.is_equal(obj2, False))
|
|
|
|
def test_diff_mqueue_name(self):
|
|
obj1 = MessageQueueRule(('open'), MessageQueueRule.ALL, 'bar', '/foo')
|
|
obj2 = MessageQueueRule(('open'), MessageQueueRule.ALL, 'bar', '123')
|
|
self.assertFalse(obj1.is_equal(obj2, False))
|
|
|
|
|
|
class InvalidMessageQueueInit(AATest):
|
|
tests = (
|
|
# init params expected exception
|
|
(('write', 'sysv', '', '/foo'), AppArmorBug), # empty label
|
|
(('write', '', 'bar', '/foo'), AppArmorBug), # empty type
|
|
(('', 'sysv', 'bar', '/foo'), AppArmorBug), # empty access
|
|
(('write', 'sysv', 'bar', ''), AppArmorBug), # empty mqueue_name
|
|
((' ', 'sysv', 'bar', '/foo'), AppArmorBug), # whitespace access
|
|
(('write', ' ', 'bar', '/foo'), AppArmorBug), # whitespace type
|
|
(('write', 'sysv', ' ', '/foo'), AppArmorBug), # whitespace label
|
|
(('write', 'sysv', 'bar', ' '), AppArmorBug), # whitespace mqueue_name
|
|
(('xyxy', 'sysv', 'bar', '/foo'), AppArmorException), # invalid access
|
|
((dict(), '', 'bar', '/foo'), AppArmorBug), # wrong type for access
|
|
((None, '', 'bar', '/foo'), AppArmorBug), # wrong type for access
|
|
(('write', dict(), 'bar', '/foo'), AppArmorBug), # wrong type for type
|
|
(('write', None, 'bar', '/foo'), AppArmorBug), # wrong type for type
|
|
(('write', '', dict(), '/foo'), AppArmorBug), # wrong type for label
|
|
(('write', '', None, '/foo'), AppArmorBug), # wrong type for label
|
|
(('write', '', 'bar', dict()), AppArmorBug), # wrong type for mqueue_name
|
|
(('write', '', 'bar', None), AppArmorBug), # wrong type for mqueue_name
|
|
)
|
|
|
|
def _run_test(self, params, expected):
|
|
with self.assertRaises(expected):
|
|
MessageQueueRule(*params)
|
|
|
|
def test_missing_params_1(self):
|
|
with self.assertRaises(TypeError):
|
|
MessageQueueRule()
|
|
|
|
def test_missing_params_2(self):
|
|
with self.assertRaises(TypeError):
|
|
MessageQueueRule('r')
|
|
|
|
def test_missing_params_3(self):
|
|
with self.assertRaises(TypeError):
|
|
MessageQueueRule('r', 'sysv')
|
|
|
|
def test_missing_params_4(self):
|
|
with self.assertRaises(TypeError):
|
|
MessageQueueRule('r', 'sysv', 'foo')
|
|
|
|
|
|
class WriteMessageQueueTestAATest(AATest):
|
|
tests = (
|
|
# raw rule clean rule
|
|
(' mqueue , # foo ', 'mqueue, # foo'),
|
|
(' audit mqueue create,', 'audit mqueue create,'),
|
|
(' audit mqueue (open ),', 'audit mqueue open,'),
|
|
(' audit mqueue (delete , read ),', 'audit mqueue (delete read),'),
|
|
(' deny mqueue write label=bar,# foo bar', 'deny mqueue write label=bar, # foo bar'),
|
|
(' deny mqueue open ,# foo bar', 'deny mqueue open, # foo bar'),
|
|
(' allow mqueue label=tst ,# foo bar', 'allow mqueue label=tst, # foo bar'),
|
|
('mqueue,', 'mqueue,'),
|
|
('mqueue (read),', 'mqueue read,'),
|
|
('mqueue (create),', 'mqueue create,'),
|
|
('mqueue (write read),', 'mqueue (read write),'),
|
|
('mqueue (open,create,open,delete,write,read),', 'mqueue (create delete open read write),'),
|
|
('mqueue r,', 'mqueue r,'),
|
|
('mqueue w,', 'mqueue w,'),
|
|
('mqueue rw,', 'mqueue rw,'),
|
|
('mqueue delete label="tst",', 'mqueue delete label="tst",'),
|
|
('mqueue (getattr) label=bar,', 'mqueue getattr label=bar,'),
|
|
('mqueue getattr /foo,', 'mqueue getattr /foo,'),
|
|
('mqueue (setattr getattr) 1234,', 'mqueue (getattr setattr) 1234,'),
|
|
('mqueue wr label=tst 1234,', 'mqueue wr label=tst 1234,'),
|
|
('mqueue wr type=sysv label=tst 1234,', 'mqueue wr type=sysv label=tst 1234,'),
|
|
('mqueue wr type=posix label=tst /foo,', 'mqueue wr type=posix label=tst /foo,'),
|
|
)
|
|
|
|
def _run_test(self, rawrule, expected):
|
|
self.assertTrue(MessageQueueRule.match(rawrule))
|
|
obj = MessageQueueRule.create_instance(rawrule)
|
|
clean = obj.get_clean()
|
|
raw = obj.get_raw()
|
|
|
|
self.assertEqual(expected.strip(), clean, 'unexpected clean rule')
|
|
self.assertEqual(rawrule.strip(), raw, 'unexpected raw rule')
|
|
|
|
def test_write_manually(self):
|
|
obj = MessageQueueRule('setattr', 'posix', 'bar', '/foo', allow_keyword=True)
|
|
|
|
expected = ' allow mqueue setattr type=posix label=bar /foo,'
|
|
|
|
self.assertEqual(expected, obj.get_clean(2), 'unexpected clean rule')
|
|
self.assertEqual(expected, obj.get_raw(2), 'unexpected raw rule')
|
|
|
|
def test_write_invalid_access(self):
|
|
obj = MessageQueueRule('setattr', 'posix', 'bar', '/foo')
|
|
obj.access = ''
|
|
with self.assertRaises(AppArmorBug):
|
|
obj.get_clean()
|
|
|
|
def test_write_invalid_type(self):
|
|
obj = MessageQueueRule('setattr', 'posix', 'bar', '/foo')
|
|
obj.mqueue_type = ''
|
|
with self.assertRaises(AppArmorBug):
|
|
obj.get_clean()
|
|
|
|
def test_write_invalid_label(self):
|
|
obj = MessageQueueRule('setattr', 'posix', 'bar', '/foo')
|
|
obj.label = ''
|
|
with self.assertRaises(AppArmorBug):
|
|
obj.get_clean()
|
|
|
|
def test_write_invalid_mqueue_name(self):
|
|
obj = MessageQueueRule('setattr', 'posix', 'bar', '/foo')
|
|
obj.mqueue_name = ''
|
|
with self.assertRaises(AppArmorBug):
|
|
obj.get_clean()
|
|
|
|
|
|
class MessageQueueIsCoveredTest(AATest):
|
|
def test_is_covered(self):
|
|
obj = MessageQueueRule(('create'), MessageQueueRule.ALL, 'f*', MessageQueueRule.ALL)
|
|
self.assertTrue(obj.is_covered(MessageQueueRule(('create'), 'sysv', 'f*', '1234')))
|
|
self.assertTrue(obj.is_covered(MessageQueueRule(('create'), 'posix', 'f*', MessageQueueRule.ALL)))
|
|
self.assertTrue(obj.is_covered(MessageQueueRule(('create'), 'sysv', 'foo', MessageQueueRule.ALL)))
|
|
self.assertTrue(obj.is_covered(MessageQueueRule(('create'), 'sysv', 'foo', '1234')))
|
|
|
|
def test_is_not_covered(self):
|
|
obj = MessageQueueRule(('getattr'), 'sysv', 'f*', '1234')
|
|
self.assertFalse(obj.is_covered(MessageQueueRule(('create'), 'sysv', 'foo', MessageQueueRule.ALL)))
|
|
self.assertFalse(obj.is_covered(MessageQueueRule(('getattr'), 'posix', 'foo', MessageQueueRule.ALL)))
|
|
self.assertFalse(obj.is_covered(MessageQueueRule(('getattr'), 'sysv', 'bar', MessageQueueRule.ALL)))
|
|
self.assertFalse(obj.is_covered(MessageQueueRule(('getattr'), 'sysv', 'foo', '123')))
|
|
|
|
|
|
class MessageQueueLogprofHeaderTest(AATest):
|
|
tests = (
|
|
('mqueue,', [ _('Access mode'), _('ALL'), _('Type'), _('ALL'), _('Label'), _('ALL'), _('Message queue name'), _('ALL'), ]), # noqa: E201
|
|
('mqueue (create,getattr) 12,', [ _('Access mode'), 'create getattr', _('Type'), _('ALL'), _('Label'), _('ALL'), _('Message queue name'), '12', ]), # noqa: E201
|
|
('mqueue write label=bar,', [ _('Access mode'), 'write', _('Type'), _('ALL'), _('Label'), 'bar', _('Message queue name'), _('ALL'), ]), # noqa: E201
|
|
('mqueue write type=sysv,', [ _('Access mode'), 'write', _('Type'), 'sysv', _('Label'), _('ALL'), _('Message queue name'), _('ALL'), ]), # noqa: E201
|
|
('mqueue read type=posix,', [ _('Access mode'), 'read', _('Type'), 'posix', _('Label'), _('ALL'), _('Message queue name'), _('ALL'), ]), # noqa: E201
|
|
('deny mqueue read /foo,', [_('Qualifier'), 'deny', _('Access mode'), 'read', _('Type'), _('ALL'), _('Label'), _('ALL'), _('Message queue name'), '/foo', ]),
|
|
('allow mqueue setattr,', [_('Qualifier'), 'allow', _('Access mode'), 'setattr', _('Type'), _('ALL'), _('Label'), _('ALL'), _('Message queue name'), _('ALL'), ]),
|
|
('audit mqueue r label=ba 12,', [_('Qualifier'), 'audit', _('Access mode'), 'r', _('Type'), _('ALL'), _('Label'), 'ba', _('Message queue name'), '12', ]),
|
|
('audit deny mqueue rw,', [_('Qualifier'), 'audit deny', _('Access mode'), 'rw', _('Type'), _('ALL'), _('Label'), _('ALL'), _('Message queue name'), _('ALL'), ]),
|
|
)
|
|
|
|
def _run_test(self, params, expected):
|
|
obj = MessageQueueRule.create_instance(params)
|
|
self.assertEqual(obj.logprof_header(), expected)
|
|
|
|
|
|
class MessageQueueGlobTestAATest(AATest):
|
|
def test_glob(self):
|
|
self.assertEqual(MessageQueueRuleset().get_glob('mqueue create,'), 'mqueue,')
|
|
|
|
|
|
setup_all_loops(__name__)
|
|
if __name__ == '__main__':
|
|
unittest.main(verbosity=1)
|