mirror of
https://gitlab.com/apparmor/apparmor.git
synced 2025-03-04 08:24:42 +01:00
NetworkRule: Add support for fine-grained mediation rules
This commit is contained in:
parent
1457eada8b
commit
5b08e06186
17 changed files with 364 additions and 229 deletions
|
@ -1,4 +1,4 @@
|
|||
/usr/lib/NetworkManager/nm-dhcp-client.action {
|
||||
network inet6 dgram,
|
||||
network inet6 dgram port=10580,
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/usr/sbin/apache2 {
|
||||
network inet6 stream,
|
||||
network inet6 stream ip=::ffff:192.168.236.159 port=80 peer=(ip=::ffff:192.168.103.80 port=61985),
|
||||
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
/usr/sbin/apache2 {
|
||||
|
||||
^www.xxxxxxxxxx.co.uk {
|
||||
network inet6 stream,
|
||||
network (send) inet6 stream ip=::ffff:192.168.1.100 port=80 peer=(ip=::ffff:192.168.1.100 port=45658),
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
/usr/local/apache-tomcat-8.0.33/bin/catalina.sh {
|
||||
|
||||
^/usr/local/jdk1.8.0_92/bin/java {
|
||||
network inet6 stream,
|
||||
network (receive) inet6 stream ip=::ffff:127.0.0.1 port=8080 peer=(ip=::ffff:127.0.0.1 port=52308),
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/usr/bin/evince-thumbnailer {
|
||||
network inet stream,
|
||||
network inet stream ip=192.168.66.150 port=765 peer=(ip=192.168.66.200 port=2049),
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/usr/bin/evince-thumbnailer {
|
||||
network inet stream,
|
||||
network inet stream port=765 peer=(port=2049),
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/usr/lib/dovecot/imap-login {
|
||||
network inet6 stream,
|
||||
network inet6 stream port=143,
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/home/ubuntu/tmp/nc {
|
||||
network inet6 stream,
|
||||
network inet6 stream ip=::1 port=2048 peer=(ip=::1 port=33986),
|
||||
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/home/ubuntu/tmp/nc {
|
||||
network inet6 stream,
|
||||
network inet6 stream ip=::ffff:127.0.0.1 port=2048 peer=(ip=::ffff:127.0.0.1 port=59180),
|
||||
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
/usr/bin/nginx-amplify-agent.py {
|
||||
|
||||
^null-/bin/dash {
|
||||
network inet stream,
|
||||
network (receive, send) inet stream ip=192.168.10.3 port=50758 peer=(ip=54.153.70.241 port=443),
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1657,11 +1657,15 @@ def collapse_log(hashlog, ignore_null_profiles=True):
|
|||
log_dict[aamode][final_name]['dbus'].add(dbus_event)
|
||||
|
||||
nd = hashlog[aamode][full_profile]['network']
|
||||
for family in nd.keys():
|
||||
for sock_type in nd[family].keys():
|
||||
net_event = NetworkRule(family, sock_type, log_event=True)
|
||||
if not hat_exists or not is_known_rule(aa[profile][hat], 'network', net_event):
|
||||
log_dict[aamode][final_name]['network'].add(net_event)
|
||||
for access in nd.keys():
|
||||
for family in nd[access].keys():
|
||||
for sock_type in nd[access][family].keys():
|
||||
for protocol in nd[access][family][sock_type].keys():
|
||||
for local_event in nd[access][family][sock_type][protocol].keys():
|
||||
for peer_event in nd[access][family][sock_type][protocol][local_event].keys():
|
||||
net_event = NetworkRule(access, family, sock_type, local_event, peer_event, log_event=True)
|
||||
if not hat_exists or not is_known_rule(aa[profile][hat], 'network', net_event):
|
||||
log_dict[aamode][final_name]['network'].add(net_event)
|
||||
|
||||
ptrace = hashlog[aamode][full_profile]['ptrace']
|
||||
for peer in ptrace.keys():
|
||||
|
|
|
@ -125,12 +125,19 @@ class ReadLog:
|
|||
elif ev['operation'] and (ev['operation'] == 'umount'):
|
||||
ev['flags'] = event.flags
|
||||
ev['fs_type'] = event.fs_type
|
||||
elif ev['class'] and ev['class'] == 'net' and ev['family'] and ev['family'] == 'unix':
|
||||
ev['peer'] = event.peer
|
||||
ev['peer_profile'] = event.peer_profile
|
||||
elif ev['class'] and ev['class'] == 'net' or self.op_type(ev) == 'net':
|
||||
ev['accesses'] = event.requested_mask
|
||||
ev['addr'] = event.net_addr
|
||||
ev['peer_addr'] = event.peer_addr
|
||||
ev['port'] = event.net_local_port or None
|
||||
ev['remote_port'] = event.net_foreign_port or None
|
||||
if ev['family'] and ev['family'] == 'unix':
|
||||
ev['addr'] = event.net_addr
|
||||
ev['peer_addr'] = event.peer_addr
|
||||
ev['peer'] = event.peer
|
||||
ev['peer_profile'] = event.peer_profile
|
||||
else:
|
||||
ev['addr'] = event.net_local_addr
|
||||
ev['peer_addr'] = event.net_foreign_addr
|
||||
|
||||
elif ev['operation'] and ev['operation'].startswith('dbus_'):
|
||||
ev['peer_profile'] = event.peer_profile
|
||||
ev['bus'] = event.dbus_bus
|
||||
|
@ -277,7 +284,9 @@ class ReadLog:
|
|||
return
|
||||
|
||||
elif self.op_type(e) == 'net':
|
||||
self.hashlog[aamode][full_profile]['network'][e['family']][e['sock_type']][e['protocol']] = True
|
||||
local = (e['addr'], e['port'])
|
||||
peer = (e['peer_addr'], e['remote_port'])
|
||||
self.hashlog[aamode][full_profile]['network'][e['accesses']][e['family']][e['sock_type']][e['protocol']][local][peer] = True
|
||||
return
|
||||
|
||||
elif e['operation'] == 'change_hat':
|
||||
|
@ -397,7 +406,7 @@ class ReadLog:
|
|||
if event['family'] and event['protocol'] and event['sock_type']:
|
||||
# 'unix' events also use keywords like 'connect', but protocol is 0 and should therefore be filtered out
|
||||
return 'net'
|
||||
elif event['denied_mask']:
|
||||
elif event['denied_mask'] or event['operation'] == 'file_lock':
|
||||
return 'file'
|
||||
else:
|
||||
raise AppArmorException('unknown file or network event type')
|
||||
|
|
|
@ -561,3 +561,39 @@ def quote_if_needed(data):
|
|||
if ' ' in data:
|
||||
data = '"' + data + '"'
|
||||
return data
|
||||
|
||||
|
||||
def check_dict_keys(d, possible_keys, type_all):
|
||||
"""Check that all keys in dictionary are among possible keys"""
|
||||
if d == type_all or d == {}:
|
||||
return type_all
|
||||
if not possible_keys >= d.keys():
|
||||
raise AppArmorException(f'Incorrect key in dict {d}. Possible keys are {possible_keys},')
|
||||
return d
|
||||
|
||||
|
||||
def initialize_cond_dict(d, keys, suffix, type_all):
|
||||
out = {
|
||||
key: d[f'{key}{suffix}']
|
||||
for key in keys
|
||||
if f'{key}{suffix}' in d and d[f'{key}{suffix}'] is not None
|
||||
}
|
||||
return out if out != {} else type_all
|
||||
|
||||
|
||||
def tuple_to_dict(t, keys):
|
||||
d = {}
|
||||
for idx, k in enumerate(keys):
|
||||
if t[idx] is not None:
|
||||
d[k] = t[idx]
|
||||
return d
|
||||
|
||||
|
||||
def print_dict_values(d, type_all, prefix=None):
|
||||
if d == type_all:
|
||||
return ''
|
||||
to_print = ' '.join(f'{k}={v}' for k, v in d.items())
|
||||
if prefix:
|
||||
return f' {prefix}=({to_print})'
|
||||
else:
|
||||
return f' {to_print}'
|
||||
|
|
|
@ -16,9 +16,13 @@
|
|||
import re
|
||||
|
||||
from apparmor.common import AppArmorBug, AppArmorException
|
||||
from apparmor.regex import RE_PROFILE_NETWORK
|
||||
from apparmor.rule import BaseRule, BaseRuleset, logprof_value_or_all, parse_modifiers
|
||||
from apparmor.regex import RE_PROFILE_NETWORK, strip_parenthesis
|
||||
from apparmor.rule import BaseRule, BaseRuleset, logprof_value_or_all, parse_modifiers, check_dict_keys, \
|
||||
check_and_split_list, initialize_cond_dict, print_dict_values, tuple_to_dict
|
||||
from apparmor.translations import init_translation
|
||||
from apparmor.rule.unix import unix_accesses as network_accesses
|
||||
from apparmor.rule.unix import access_flags
|
||||
import ipaddress
|
||||
|
||||
_ = init_translation()
|
||||
|
||||
|
@ -32,15 +36,46 @@ network_type_keywords = ['stream', 'dgram', 'seqpacket', 'rdm', 'raw', 'packet']
|
|||
network_protocol_keywords = ['tcp', 'udp', 'icmp']
|
||||
|
||||
|
||||
|
||||
byte = r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
|
||||
network_ipv4 = f"{byte}\.{byte}\.{byte}\.{byte}"
|
||||
|
||||
network_ipv6 = (
|
||||
r'('
|
||||
r'([0-9a-f]{1,4}:){7}[0-9a-f]{1,4}|'
|
||||
r'([0-9a-f]{1,4}:){1,7}:|'
|
||||
r'([0-9a-f]{1,4}:){1,6}:[0-9a-f]{1,4}|'
|
||||
r'([0-9a-f]{1,4}:){1,5}(:[0-9a-f]{1,4}){1,2}|'
|
||||
r'([0-9a-f]{1,4}:){1,4}(:[0-9a-f]{1,4}){1,3}|'
|
||||
r'([0-9a-f]{1,4}:){1,3}(:[0-9a-f]{1,4}){1,4}|'
|
||||
r'([0-9a-f]{1,4}:){1,2}(:[0-9a-f]{1,4}){1,5}|'
|
||||
r'[0-9a-f]{1,4}:((:[0-9a-f]{1,4}){1,6})|'
|
||||
r':((:[0-9a-f]{1,4}){1,7}|:)|'
|
||||
r'fe80:(:[0-9a-f]{0,4}){0,4}%%[0-9a-zA-Z]{1,}|'
|
||||
r'::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|'
|
||||
r'([0-9a-f]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])'
|
||||
r')(%%[0-9a-zA-Z]{1,})?'
|
||||
)
|
||||
|
||||
network_port = r'(port\s*=\s*(?P<%s>\d+))\s*'
|
||||
ip_cond = fr'\s*ip\s*=\s*(?P<%s>(({network_ipv4})|({network_ipv6})|none))\s*'
|
||||
|
||||
RE_LOCAL_EXPR = f'((({ip_cond % "ip" })|({network_port % "port"}))*)'
|
||||
RE_PEER_EXPR = fr'(peer\s*=\s*\(\s*(({ip_cond % "ip_peer"})|({network_port % "port_peer"}))+\s*\))'
|
||||
|
||||
|
||||
RE_NETWORK_DOMAIN = '(' + '|'.join(network_domain_keywords) + ')'
|
||||
RE_NETWORK_TYPE = '(' + '|'.join(network_type_keywords) + ')'
|
||||
RE_NETWORK_PROTOCOL = '(' + '|'.join(network_protocol_keywords) + ')'
|
||||
|
||||
RE_NETWORK_DETAILS = re.compile(
|
||||
r'^\s*'
|
||||
+ '(?P<domain>' + RE_NETWORK_DOMAIN + ')?' # optional domain
|
||||
+ r'(\s+(?P<type_or_protocol>' + RE_NETWORK_TYPE + '|' + RE_NETWORK_PROTOCOL + '))?' # optional type or protocol
|
||||
+ r'\s*$')
|
||||
+ r'(\s*' + network_accesses + r')?\s*'
|
||||
+ '(?P<domain>' + RE_NETWORK_DOMAIN + r')?\s*' # optional domain
|
||||
+ r'(\s+(?P<type_or_protocol>' + RE_NETWORK_TYPE + '|' + RE_NETWORK_PROTOCOL + '))?\s*' # optional type or protocol
|
||||
+ '(' + RE_LOCAL_EXPR + r')?\s*'
|
||||
+ '(' + RE_PEER_EXPR + ')?\s*'
|
||||
+ r'$')
|
||||
|
||||
|
||||
class NetworkRule(BaseRule):
|
||||
|
@ -56,12 +91,42 @@ class NetworkRule(BaseRule):
|
|||
rule_name = 'network'
|
||||
_match_re = RE_PROFILE_NETWORK
|
||||
|
||||
def __init__(self, domain, type_or_protocol, audit=False, deny=False, allow_keyword=False,
|
||||
comment='', log_event=None):
|
||||
def __init__(self, accesses, domain, type_or_protocol, local_expr, peer_expr, audit=False, deny=False,
|
||||
allow_keyword=False, comment='', log_event=None):
|
||||
|
||||
|
||||
super().__init__(audit=audit, deny=deny, allow_keyword=allow_keyword,
|
||||
comment=comment, log_event=log_event)
|
||||
|
||||
if type(local_expr) is tuple:
|
||||
if accesses is None:
|
||||
accesses = self.ALL
|
||||
else:
|
||||
accesses = accesses.split()
|
||||
local_expr = tuple_to_dict(local_expr, ['ip', 'port'])
|
||||
peer_expr = tuple_to_dict(peer_expr, ['ip', 'port'])
|
||||
|
||||
self.accesses, self.all_accesses, unknown_items = check_and_split_list(accesses, access_flags, self.ALL, type(self).__name__, 'accesses')
|
||||
|
||||
if unknown_items:
|
||||
raise AppArmorException(f'Invalid access in Network rule: {unknown_items}')
|
||||
|
||||
self.local_expr = check_dict_keys(local_expr, {'ip', 'port'}, self.ALL)
|
||||
self.peer_expr = check_dict_keys(peer_expr, {'ip', 'port'}, self.ALL)
|
||||
|
||||
if self.local_expr != self.ALL and 'port' in self.local_expr and int(self.local_expr['port']) > 65535:
|
||||
raise AppArmorException(f"Invalid port: {self.local_expr['port']}")
|
||||
if self.peer_expr != self.ALL and 'port' in self.peer_expr and int(self.peer_expr['port']) > 65535:
|
||||
raise AppArmorException(f"Invalid remote port: {self.peer_expr['port']}")
|
||||
|
||||
if self.local_expr != self.ALL and 'ip' in self.local_expr and not is_valid_ip(self.local_expr['ip']):
|
||||
raise AppArmorException(f"Invalid ip: {self.local_expr['ip']}")
|
||||
if self.peer_expr != self.ALL and 'ip' in self.peer_expr and not is_valid_ip(self.peer_expr['ip']):
|
||||
raise AppArmorException(f"Invalid ip: {self.peer_expr['ip']}")
|
||||
|
||||
if not self.all_accesses and self.peer_expr != self.ALL and self.accesses & {'create', 'bind', 'listen', 'shutdown', 'getattr', 'setattr', 'getopt', 'setopt'}:
|
||||
raise AppArmorException('Cannot use a peer_expr and an access in {create, bind, listen, shutdown, getattr, setattr, getopt, setopt} simultaneously')
|
||||
|
||||
self.domain = None
|
||||
self.all_domains = False
|
||||
if domain == self.ALL:
|
||||
|
@ -69,6 +134,9 @@ class NetworkRule(BaseRule):
|
|||
elif isinstance(domain, str):
|
||||
if domain in network_domain_keywords:
|
||||
self.domain = domain
|
||||
|
||||
if not self.domain.startswith('inet') and (self.local_expr != self.ALL or self.peer_expr != self.ALL):
|
||||
raise AppArmorException('Cannot use a local expression or a peer expression for non-inet domains')
|
||||
else:
|
||||
raise AppArmorBug('Passed unknown domain to %s: %s' % (type(self).__name__, domain))
|
||||
else:
|
||||
|
@ -88,6 +156,7 @@ class NetworkRule(BaseRule):
|
|||
else:
|
||||
raise AppArmorBug('Passed unknown object to %s: %s' % (type(self).__name__, str(type_or_protocol)))
|
||||
|
||||
|
||||
@classmethod
|
||||
def _create_instance(cls, raw_rule, matches):
|
||||
"""parse raw_rule and return instance of this class"""
|
||||
|
@ -103,20 +172,27 @@ class NetworkRule(BaseRule):
|
|||
if not details:
|
||||
raise AppArmorException(_("Invalid or unknown keywords in 'network %s" % rule_details))
|
||||
|
||||
if details.group('domain'):
|
||||
domain = details.group('domain')
|
||||
else:
|
||||
domain = cls.ALL
|
||||
r = details.groupdict()
|
||||
|
||||
if details.group('type_or_protocol'):
|
||||
type_or_protocol = details.group('type_or_protocol')
|
||||
domain = r['domain'] or cls.ALL
|
||||
type_or_protocol = r['type_or_protocol'] or cls.ALL
|
||||
|
||||
if r['accesses']:
|
||||
accesses = strip_parenthesis(r['accesses']).replace(',', ' ').split()
|
||||
else:
|
||||
type_or_protocol = cls.ALL
|
||||
accesses = cls.ALL
|
||||
|
||||
local_expr = initialize_cond_dict(r, ['ip', 'port'], '', cls.ALL)
|
||||
peer_expr = initialize_cond_dict(r, ['ip', 'port'], '_peer', cls.ALL)
|
||||
|
||||
else:
|
||||
accesses = cls.ALL
|
||||
domain = cls.ALL
|
||||
type_or_protocol = cls.ALL
|
||||
local_expr = cls.ALL
|
||||
peer_expr = cls.ALL
|
||||
|
||||
return cls(domain, type_or_protocol,
|
||||
return cls(accesses, domain, type_or_protocol, local_expr, peer_expr,
|
||||
audit=audit, deny=deny, allow_keyword=allow_keyword, comment=comment)
|
||||
|
||||
def get_clean(self, depth=0):
|
||||
|
@ -124,6 +200,8 @@ class NetworkRule(BaseRule):
|
|||
|
||||
space = ' ' * depth
|
||||
|
||||
accesses = ' (%s)' % (', '.join(sorted(self.accesses))) if not self.all_accesses else ''
|
||||
|
||||
if self.all_domains:
|
||||
domain = ''
|
||||
elif self.domain:
|
||||
|
@ -138,23 +216,54 @@ class NetworkRule(BaseRule):
|
|||
else:
|
||||
raise AppArmorBug('Empty type or protocol in network rule')
|
||||
|
||||
return ('%s%snetwork%s%s,%s' % (space, self.modifiers_str(), domain, type_or_protocol, self.comment))
|
||||
local_expr = print_dict_values(self.local_expr, self.ALL)
|
||||
peer_expr = print_dict_values(self.peer_expr, self.ALL, 'peer')
|
||||
|
||||
return ('%s%snetwork%s%s%s%s%s,%s' % (space, self.modifiers_str(), accesses, domain, type_or_protocol, local_expr, peer_expr, self.comment))
|
||||
|
||||
def _is_covered_localvars(self, other_rule):
|
||||
"""check if other_rule is covered by this rule object"""
|
||||
|
||||
if not self._is_covered_list(self.accesses, self.all_accesses, other_rule.accesses, other_rule.all_accesses, 'accesses'):
|
||||
return False
|
||||
|
||||
if not self._is_covered_plain(self.domain, self.all_domains, other_rule.domain, other_rule.all_domains, 'domain'):
|
||||
return False
|
||||
|
||||
if not self._is_covered_plain(self.type_or_protocol, self.all_type_or_protocols, other_rule.type_or_protocol, other_rule.all_type_or_protocols, 'type or protocol'):
|
||||
return False
|
||||
|
||||
if not self._is_covered_dict(self.local_expr, other_rule.local_expr):
|
||||
return False
|
||||
if not self._is_covered_dict(self.peer_expr, other_rule.peer_expr):
|
||||
return False
|
||||
|
||||
# still here? -> then it is covered
|
||||
return True
|
||||
|
||||
def _is_covered_dict(self, d, other):
|
||||
|
||||
if d is self.ALL:
|
||||
return True
|
||||
elif other is self.ALL:
|
||||
return False
|
||||
|
||||
for it in other:
|
||||
if it not in d:
|
||||
continue # No constraints on this item.
|
||||
else:
|
||||
if not self._is_covered_plain(d[it], False, other[it], False, it):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _is_equal_localvars(self, rule_obj, strict):
|
||||
"""compare if rule-specific variables are equal"""
|
||||
|
||||
if self.accesses != rule_obj.accesses:
|
||||
return False
|
||||
|
||||
if (self.domain != rule_obj.domain
|
||||
or self.all_domains != rule_obj.all_domains):
|
||||
return False
|
||||
|
@ -163,15 +272,28 @@ class NetworkRule(BaseRule):
|
|||
or self.all_type_or_protocols != rule_obj.all_type_or_protocols):
|
||||
return False
|
||||
|
||||
if self.local_expr != rule_obj.local_expr:
|
||||
return False
|
||||
|
||||
if self.peer_expr != rule_obj.peer_expr:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _logprof_header_localvars(self):
|
||||
accesses = logprof_value_or_all(self.accesses, self.all_accesses)
|
||||
family = logprof_value_or_all(self.domain, self.all_domains) # noqa: E221
|
||||
sock_type = logprof_value_or_all(self.type_or_protocol, self.all_type_or_protocols)
|
||||
|
||||
local_expr = logprof_value_or_all(self.local_expr, self.local_expr == self.ALL)
|
||||
peer_expr = logprof_value_or_all(self.peer_expr, self.peer_expr == self.ALL)
|
||||
|
||||
return (
|
||||
_('Accesses'), accesses,
|
||||
_('Network Family'), family,
|
||||
_('Socket Type'), sock_type,
|
||||
_('Local'), local_expr,
|
||||
_('Peer'), peer_expr,
|
||||
)
|
||||
|
||||
|
||||
|
@ -182,3 +304,13 @@ class NetworkRuleset(BaseRuleset):
|
|||
"""Return the next possible glob. For network rules, that's "network DOMAIN," or "network," (all network)"""
|
||||
# XXX return 'network DOMAIN,' if 'network DOMAIN TYPE_OR_PROTOCOL' was given
|
||||
return 'network,'
|
||||
|
||||
|
||||
def is_valid_ip(ip):
|
||||
if ip == 'none':
|
||||
return True
|
||||
try:
|
||||
ipaddress.ip_address(ip)
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
|
|
@ -16,8 +16,8 @@ import re
|
|||
from apparmor.common import AppArmorException
|
||||
|
||||
from apparmor.regex import RE_PROFILE_UNIX, strip_parenthesis
|
||||
from apparmor.rule import AARE
|
||||
from apparmor.rule import BaseRule, BaseRuleset, parse_modifiers, logprof_value_or_all, check_and_split_list
|
||||
from apparmor.rule import (BaseRule, BaseRuleset, parse_modifiers, logprof_value_or_all, check_and_split_list,
|
||||
check_dict_keys, tuple_to_dict, print_dict_values, initialize_cond_dict, AARE)
|
||||
|
||||
from apparmor.translations import init_translation
|
||||
|
||||
|
@ -74,18 +74,18 @@ class UnixRule(BaseRule):
|
|||
|
||||
if type(rule_conds) is tuple: # This comes from the logparser, we convert it to dicts
|
||||
accesses = strip_parenthesis(accesses).replace(',', ' ').split()
|
||||
rule_conds = _tuple_to_dict(rule_conds, ['type', 'protocol'])
|
||||
local_expr = _tuple_to_dict(local_expr, ['addr', 'label', 'attr', 'opt'])
|
||||
peer_expr = _tuple_to_dict(peer_expr, ['addr', 'label'])
|
||||
rule_conds = tuple_to_dict(rule_conds, ['type', 'protocol'])
|
||||
local_expr = tuple_to_dict(local_expr, ['addr', 'label', 'attr', 'opt'])
|
||||
peer_expr = tuple_to_dict(peer_expr, ['addr', 'label'])
|
||||
|
||||
self.accesses, self.all_accesses, unknown_items = check_and_split_list(accesses, access_flags, self.ALL, type(self).__name__, 'accesses')
|
||||
|
||||
if unknown_items:
|
||||
raise AppArmorException(f'Invalid access in Unix rule: {unknown_items}')
|
||||
|
||||
self.rule_conds = _check_dict_keys(rule_conds, {'type', 'protocol'})
|
||||
self.local_expr = _check_dict_keys(local_expr, {'addr', 'label', 'attr', 'opt'})
|
||||
self.peer_expr = _check_dict_keys(peer_expr, {'addr', 'label'})
|
||||
self.rule_conds = check_dict_keys(rule_conds, {'type', 'protocol'}, self.ALL)
|
||||
self.local_expr = check_dict_keys(local_expr, {'addr', 'label', 'attr', 'opt'}, self.ALL)
|
||||
self.peer_expr = check_dict_keys(peer_expr, {'addr', 'label'}, self.ALL)
|
||||
|
||||
if not self.all_accesses and self.peer_expr != self.ALL and self.accesses & {'create', 'bind', 'listen', 'shutdown', 'getattr', 'setattr', 'getopt', 'setopt'}:
|
||||
raise AppArmorException('Cannot use a peer_expr and an access in {create, bind, listen, shutdown, getattr, setattr, getopt, setopt} simultaneously')
|
||||
|
@ -115,9 +115,9 @@ class UnixRule(BaseRule):
|
|||
accesses = cls.ALL
|
||||
|
||||
|
||||
rule_conds = _initialize_cond_dict(r, ['type', 'protocol'], '_cond_set')
|
||||
local_expr = _initialize_cond_dict(r, ['addr', 'label', 'attr', 'opt'], '_cond')
|
||||
peer_expr = _initialize_cond_dict(r, ['addr', 'label'], '_peer_cond')
|
||||
rule_conds = initialize_cond_dict(r, ['type', 'protocol'], '_cond_set', cls.ALL)
|
||||
local_expr = initialize_cond_dict(r, ['addr', 'label', 'attr', 'opt'], '_cond', cls.ALL)
|
||||
peer_expr = initialize_cond_dict(r, ['addr', 'label'], '_peer_cond', cls.ALL)
|
||||
|
||||
else:
|
||||
accesses = cls.ALL
|
||||
|
@ -131,9 +131,9 @@ class UnixRule(BaseRule):
|
|||
space = ' ' * depth
|
||||
|
||||
accesses = ' (%s)' % (', '.join(sorted(self.accesses))) if not self.all_accesses else ''
|
||||
rule_conds = _print_dict_values(self.rule_conds)
|
||||
local_expr = _print_dict_values(self.local_expr)
|
||||
peer_expr = _print_dict_values(self.peer_expr, 'peer')
|
||||
rule_conds = print_dict_values(self.rule_conds, self.ALL)
|
||||
local_expr = print_dict_values(self.local_expr, self.ALL)
|
||||
peer_expr = print_dict_values(self.peer_expr, self.ALL, 'peer')
|
||||
return f'{space}unix{self.modifiers_str()}{accesses}{rule_conds}{local_expr}{peer_expr},{self.comment}'
|
||||
|
||||
def _is_covered_localvars(self, other_rule):
|
||||
|
@ -202,39 +202,5 @@ class UnixRule(BaseRule):
|
|||
|
||||
return True
|
||||
|
||||
|
||||
def _print_dict_values(d, prefix=None):
|
||||
if d == UnixRule.ALL:
|
||||
return ''
|
||||
to_print = ' '.join(f'{k}={v}' for k, v in d.items())
|
||||
if prefix:
|
||||
return f' {prefix}=({to_print})'
|
||||
else:
|
||||
return f' {to_print}'
|
||||
|
||||
|
||||
def _initialize_cond_dict(d, keys, suffix):
|
||||
out = {
|
||||
key: d[f'{key}{suffix}']
|
||||
for key in keys
|
||||
if f'{key}{suffix}' in d and d[f'{key}{suffix}'] is not None
|
||||
}
|
||||
return out if out != {} else UnixRule.ALL
|
||||
|
||||
|
||||
def _check_dict_keys(d, possible_keys):
|
||||
if d == UnixRule.ALL or d == {}:
|
||||
return UnixRule.ALL
|
||||
if not possible_keys >= d.keys():
|
||||
raise AppArmorException(f'Incorrect key in dict {d}. Possible keys are {possible_keys},')
|
||||
return d
|
||||
|
||||
def _tuple_to_dict(t, keys):
|
||||
d = {}
|
||||
for idx, k in enumerate(keys):
|
||||
if t[idx] is not None:
|
||||
d[k] = t[idx]
|
||||
return d
|
||||
|
||||
class UnixRuleset(BaseRuleset):
|
||||
'''Class to handle and store a collection of Unix rules'''
|
||||
|
|
|
@ -18,15 +18,16 @@ from collections import namedtuple
|
|||
|
||||
from apparmor.common import AppArmorBug, AppArmorException, cmd
|
||||
from apparmor.logparser import ReadLog
|
||||
from apparmor.rule.network import NetworkRule, NetworkRuleset, network_domain_keywords
|
||||
from apparmor.rule.network import NetworkRule, NetworkRuleset, network_domain_keywords, network_ipv6
|
||||
from apparmor.translations import init_translation
|
||||
from common_test import AATest, setup_all_loops
|
||||
import re
|
||||
|
||||
_ = init_translation()
|
||||
|
||||
exp = namedtuple(
|
||||
'exp', ('audit', 'allow_keyword', 'deny', 'comment',
|
||||
'domain', 'all_domains', 'type_or_protocol', 'all_type_or_protocols'))
|
||||
'accesses' ,'domain', 'all_domains', 'type_or_protocol', 'all_type_or_protocols', 'local_expr', 'peer_expr'))
|
||||
|
||||
# --- check if the keyword list is up to date --- #
|
||||
|
||||
|
@ -56,6 +57,25 @@ class NetworkKeywordsTest(AATest):
|
|||
'on an newer kernel and will require updating the list of network domain keywords in '
|
||||
'utils/apparmor/rule/network.py')
|
||||
|
||||
class NetworkPV6Test(AATest):
|
||||
def test_ipv6(self):
|
||||
tests = [
|
||||
("2001:0db8:85a3:0000:0000:8a2e:0370:7334", True), # Standard IPv6
|
||||
("2001:db8::8a2e:370:7334", True), # Zero Compression
|
||||
("::1", True), # IPv6 Loopback
|
||||
("::", True), # IPv6 Unspecified
|
||||
("::ffff:192.168.236.159", True), # IPv6-mapped IPv4
|
||||
("fe80::1ff:fe23:4567:890a%eth0", True), # IPv6 with Zone Identifier
|
||||
("1234:5678::abcd:ef12:3456", True), # Mixed groups and zero compression
|
||||
("12345::6789", False), # Erroneous IP (invalid hex group length)
|
||||
("192.168.1.1", False), # IPv4 only
|
||||
]
|
||||
|
||||
for test in tests:
|
||||
self.assertEqual(bool(re.match(network_ipv6, test[0])), test[1])
|
||||
|
||||
|
||||
|
||||
# --- tests for single NetworkRule --- #
|
||||
|
||||
|
||||
|
@ -63,23 +83,30 @@ class NetworkTest(AATest):
|
|||
def _compare_obj(self, obj, expected):
|
||||
self.assertEqual(expected.allow_keyword, obj.allow_keyword)
|
||||
self.assertEqual(expected.audit, obj.audit)
|
||||
self.assertEqual(expected.accesses, obj.accesses)
|
||||
self.assertEqual(expected.domain, obj.domain)
|
||||
self.assertEqual(expected.type_or_protocol, obj.type_or_protocol)
|
||||
self.assertEqual(expected.all_domains, obj.all_domains)
|
||||
self.assertEqual(expected.all_type_or_protocols, obj.all_type_or_protocols)
|
||||
self.assertEqual(expected.deny, obj.deny)
|
||||
self.assertEqual(expected.comment, obj.comment)
|
||||
|
||||
self.assertEqual(expected.local_expr, obj.local_expr)
|
||||
self.assertEqual(expected.peer_expr, obj.peer_expr)
|
||||
|
||||
class NetworkTestParse(NetworkTest):
|
||||
tests = (
|
||||
# rawrule audit allow deny comment domain all? type/proto all?
|
||||
('network,', exp(False, False, False, '', None, True, None, True)),
|
||||
('network inet,', exp(False, False, False, '', 'inet', False, None, True)),
|
||||
('network inet stream,', exp(False, False, False, '', 'inet', False, 'stream', False)),
|
||||
('deny network inet stream, # comment', exp(False, False, True, ' # comment', 'inet', False, 'stream', False)),
|
||||
('audit allow network tcp,', exp(True, True, False, '', None, True, 'tcp', False)),
|
||||
('network stream,', exp(False, False, False, '', None, True, 'stream', False)),
|
||||
# rawrule audit allow deny comment access domain all? type/proto all? local_expr peer_expr
|
||||
('network,', exp(False, False, False, '', None, None, True, None, True, NetworkRule.ALL, NetworkRule.ALL )),
|
||||
('network inet,', exp(False, False, False, '', None, 'inet', False, None, True, NetworkRule.ALL, NetworkRule.ALL )),
|
||||
('network inet stream,', exp(False, False, False, '', None, 'inet', False, 'stream', False, NetworkRule.ALL, NetworkRule.ALL )),
|
||||
('deny network inet stream, # comment', exp(False, False, True, ' # comment', None, 'inet', False, 'stream', False, NetworkRule.ALL, NetworkRule.ALL )),
|
||||
('audit allow network tcp,', exp(True, True, False, '', None, None, True, 'tcp', False, NetworkRule.ALL, NetworkRule.ALL )),
|
||||
('network stream,', exp(False, False, False, '', None, None, True, 'stream', False, NetworkRule.ALL, NetworkRule.ALL )),
|
||||
('network stream peer=(ip=::1 port=22),', exp(False, False, False, '', None, None, True, 'stream', False, NetworkRule.ALL, {"ip": "::1", 'port':'22'}, )),
|
||||
('network stream ip=::1 port=22,', exp(False, False, False, '', None, None, True, 'stream', False, {"ip": "::1", 'port': '22'}, NetworkRule.ALL )),
|
||||
('network (bind,listen) stream,', exp(False, False, False, '', {'listen', 'bind'}, None, True, 'stream', False, NetworkRule.ALL, NetworkRule.ALL )),
|
||||
('network (connect, rw) stream ip=192.168.122.2 port=22 peer=(ip=192.168.122.3 port=22),',
|
||||
exp(False, False, False, '', {'connect', 'rw'}, None, True, 'stream', False, {'ip': '192.168.122.2', 'port': '22'},{"ip": "192.168.122.3", 'port': '22'} )),
|
||||
)
|
||||
|
||||
def _run_test(self, rawrule, expected):
|
||||
|
@ -91,10 +118,18 @@ class NetworkTestParse(NetworkTest):
|
|||
|
||||
class NetworkTestParseInvalid(NetworkTest):
|
||||
tests = (
|
||||
('network foo,', AppArmorException),
|
||||
('network foo bar,', AppArmorException),
|
||||
('network foo tcp,', AppArmorException),
|
||||
('network inet bar,', AppArmorException),
|
||||
('network foo,', AppArmorException),
|
||||
('network foo bar,', AppArmorException),
|
||||
('network foo tcp,', AppArmorException),
|
||||
('network inet bar,', AppArmorException),
|
||||
('network ip=999.999.999.999,', AppArmorException),
|
||||
('network port=99999,', AppArmorException),
|
||||
('network inet ip=in:va::li:d0,', AppArmorException),
|
||||
('network inet ip=in:va::li:d0,', AppArmorException),
|
||||
('network inet ip=1:2:3:4:5:6:7:8:9:0:0:0,', AppArmorException), # too many segments
|
||||
('network inet peer=(ip=1:2:3:4:5:6:7:8:9:0:0:0),', AppArmorException), # too many segments
|
||||
('network packet ip=1::,', AppArmorException), # Only inet[6] domains can be used in conjunction with a local expression
|
||||
('network packet peer=(ip=1::),', AppArmorException), # Only inet[6] domains can be used in conjunction with a peer expression
|
||||
)
|
||||
|
||||
def _run_test(self, rawrule, expected):
|
||||
|
@ -106,7 +141,7 @@ class NetworkTestParseInvalid(NetworkTest):
|
|||
class NetworkTestParseFromLog(NetworkTest):
|
||||
def test_net_from_log(self):
|
||||
parser = ReadLog('', '', '')
|
||||
event = 'type=AVC msg=audit(1428699242.551:386): apparmor="DENIED" operation="create" profile="/bin/ping" pid=10589 comm="ping" family="inet" sock_type="raw" protocol=1'
|
||||
event = 'type=AVC msg=audit(1428699242.551:386): apparmor="DENIED" operation="create" profile="/bin/ping" pid=10589 comm="ping" family="inet" sock_type="raw" protocol=1 lport=1234'
|
||||
|
||||
parsed_event = parser.parse_event(event)
|
||||
|
||||
|
@ -124,6 +159,11 @@ class NetworkTestParseFromLog(NetworkTest):
|
|||
'resource': None,
|
||||
'info': None,
|
||||
'aamode': 'REJECTING',
|
||||
'accesses': None,
|
||||
'addr': None,
|
||||
'peer_addr': None,
|
||||
'port' : 1234,
|
||||
'remote_port': None,
|
||||
'time': 1428699242,
|
||||
'active_hat': None,
|
||||
'pid': 10589,
|
||||
|
@ -134,10 +174,10 @@ class NetworkTestParseFromLog(NetworkTest):
|
|||
'class': None,
|
||||
})
|
||||
|
||||
obj = NetworkRule(parsed_event['family'], parsed_event['sock_type'], log_event=parsed_event)
|
||||
obj = NetworkRule(NetworkRule.ALL, parsed_event['family'], parsed_event['sock_type'], NetworkRule.ALL, NetworkRule.ALL, log_event=parsed_event)
|
||||
|
||||
# audit allow deny comment domain all? type/proto all?
|
||||
expected = exp(False, False, False, '', 'inet', False, 'raw', False)
|
||||
expected = exp(False, False, False, '', None, 'inet', False, 'raw', False, NetworkRule.ALL, NetworkRule.ALL)
|
||||
|
||||
self._compare_obj(obj, expected)
|
||||
|
||||
|
@ -146,13 +186,17 @@ class NetworkTestParseFromLog(NetworkTest):
|
|||
|
||||
class NetworkFromInit(NetworkTest):
|
||||
tests = (
|
||||
# NetworkRule object audit allow deny comment domain all? type/proto all?
|
||||
(NetworkRule('inet', 'raw', deny=True), exp(False, False, True, '', 'inet', False, 'raw', False)),
|
||||
(NetworkRule('inet', 'raw'), exp(False, False, False, '', 'inet', False, 'raw', False)),
|
||||
(NetworkRule('inet', NetworkRule.ALL), exp(False, False, False, '', 'inet', False, None, True)),
|
||||
(NetworkRule(NetworkRule.ALL, NetworkRule.ALL), exp(False, False, False, '', None, True, None, True)),
|
||||
(NetworkRule(NetworkRule.ALL, 'tcp'), exp(False, False, False, '', None, True, 'tcp', False)),
|
||||
(NetworkRule(NetworkRule.ALL, 'stream'), exp(False, False, False, '', None, True, 'stream', False)),
|
||||
# NetworkRule object audit allow deny comment access domain all? type/proto all? Local expr Peer expr
|
||||
(NetworkRule(NetworkRule.ALL, 'inet', 'raw', NetworkRule.ALL, NetworkRule.ALL, deny=True), exp(False, False, True, '', None, 'inet', False, 'raw', False, NetworkRule.ALL, NetworkRule.ALL)),
|
||||
(NetworkRule(NetworkRule.ALL, 'inet', 'raw', NetworkRule.ALL, NetworkRule.ALL), exp(False, False, False, '', None, 'inet', False, 'raw', False, NetworkRule.ALL, NetworkRule.ALL)),
|
||||
(NetworkRule(NetworkRule.ALL, 'inet', NetworkRule.ALL, NetworkRule.ALL, NetworkRule.ALL), exp(False, False, False, '', None, 'inet', False, None, True, NetworkRule.ALL, NetworkRule.ALL)),
|
||||
(NetworkRule(NetworkRule.ALL, NetworkRule.ALL, NetworkRule.ALL, NetworkRule.ALL, NetworkRule.ALL), exp(False, False, False, '', None, None, True, None, True, NetworkRule.ALL, NetworkRule.ALL)),
|
||||
(NetworkRule(NetworkRule.ALL, NetworkRule.ALL, 'tcp', NetworkRule.ALL, NetworkRule.ALL), exp(False, False, False, '', None, None, True, 'tcp', False, NetworkRule.ALL, NetworkRule.ALL)),
|
||||
(NetworkRule(NetworkRule.ALL, NetworkRule.ALL, 'stream', NetworkRule.ALL, NetworkRule.ALL), exp(False, False, False, '', None, None, True, 'stream', False, NetworkRule.ALL, NetworkRule.ALL)),
|
||||
(NetworkRule('bind', NetworkRule.ALL, 'stream', NetworkRule.ALL, NetworkRule.ALL), exp(False, False, False, '', {'bind'}, None, True, 'stream', False, NetworkRule.ALL, NetworkRule.ALL)),
|
||||
(NetworkRule({'bind', 'listen'}, NetworkRule.ALL, 'stream', {'port': '22'}, NetworkRule.ALL), exp(False, False, False, '', {'bind', 'listen'},None, True, 'stream', False, {'port' : '22'}, NetworkRule.ALL)),
|
||||
(NetworkRule(NetworkRule.ALL, NetworkRule.ALL, 'stream', NetworkRule.ALL, {'port': '22'}), exp(False, False, False, '', None, None, True, 'stream', False, NetworkRule.ALL, {'port':'22'})),
|
||||
(NetworkRule(NetworkRule.ALL, NetworkRule.ALL, 'stream', NetworkRule.ALL, {'ip': '::1', 'port':'22'}), exp(False, False, False, '', None, None, True, 'stream', False, NetworkRule.ALL, {'ip': '::1', 'port':'22'})),
|
||||
)
|
||||
|
||||
def _run_test(self, obj, expected):
|
||||
|
@ -161,17 +205,23 @@ class NetworkFromInit(NetworkTest):
|
|||
|
||||
class InvalidNetworkInit(AATest):
|
||||
tests = (
|
||||
# init params expected exception
|
||||
(('inet', ''), AppArmorBug), # empty type_or_protocol
|
||||
(('', 'tcp'), AppArmorBug), # empty domain
|
||||
((' ', 'tcp'), AppArmorBug), # whitespace domain
|
||||
(('inet', ' '), AppArmorBug), # whitespace type_or_protocol
|
||||
(('xyxy', 'tcp'), AppArmorBug), # invalid domain
|
||||
(('inet', 'xyxy'), AppArmorBug), # invalid type_or_protocol
|
||||
((dict(), 'tcp'), AppArmorBug), # wrong type for domain
|
||||
((None, 'tcp'), AppArmorBug), # wrong type for domain
|
||||
(('inet', dict()), AppArmorBug), # wrong type for type_or_protocol
|
||||
(('inet', None), AppArmorBug), # wrong type for type_or_protocol
|
||||
# init params expected exception
|
||||
((NetworkRule.ALL, 'inet', '', NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # empty type_or_protocol
|
||||
((NetworkRule.ALL, '', 'tcp', NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # empty domain
|
||||
((NetworkRule.ALL, ' ', 'tcp', NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # whitespace domain
|
||||
((NetworkRule.ALL, 'inet', ' ', NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # whitespace type_or_protocol
|
||||
((NetworkRule.ALL, 'xyxy', 'tcp', NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # invalid domain
|
||||
((NetworkRule.ALL, 'inet', 'xyxy', NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # invalid type_or_protocol
|
||||
((NetworkRule.ALL, dict(), 'tcp', NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # wrong type for domain
|
||||
((NetworkRule.ALL, None, 'tcp', NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # wrong type for domain
|
||||
((NetworkRule.ALL, 'inet', dict(), NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # wrong type for type_or_protocol
|
||||
((NetworkRule.ALL, 'inet', None, NetworkRule.ALL, NetworkRule.ALL), AppArmorBug), # wrong type for type_or_protocol
|
||||
(('invalid_access', 'inet', None, NetworkRule.ALL, NetworkRule.ALL), AppArmorException), # Invalid Access
|
||||
(({'bind', 'invld'},'inet', None, NetworkRule.ALL, NetworkRule.ALL), AppArmorException), # Invalid Access
|
||||
((NetworkRule.ALL, 'inet', None, {'ip': ':::::'}, NetworkRule.ALL), AppArmorException), # Invalid ip in local expression
|
||||
((NetworkRule.ALL, 'inet', None, NetworkRule.ALL, {'ip': ':::::'}), AppArmorException), # Invalid ip in peer expression
|
||||
((NetworkRule.ALL, 'inet', None, {'invld': '0'}, NetworkRule.ALL), AppArmorException), # Invalid keyword in local expression
|
||||
((NetworkRule.ALL, 'inet', None, NetworkRule.ALL, {'invld': '0'}), AppArmorException), # Invalid keyword in peer expression
|
||||
)
|
||||
|
||||
def _run_test(self, params, expected):
|
||||
|
@ -202,15 +252,16 @@ class InvalidNetworkTest(AATest):
|
|||
def test_invalid_net_non_NetworkRule(self):
|
||||
self._check_invalid_rawrule('dbus,') # not a network rule
|
||||
|
||||
|
||||
def test_empty_net_data_1(self):
|
||||
obj = NetworkRule('inet', 'stream')
|
||||
obj = NetworkRule(NetworkRule.ALL, 'inet', 'stream', NetworkRule.ALL, NetworkRule.ALL)
|
||||
obj.domain = ''
|
||||
# no domain set, and ALL not set
|
||||
with self.assertRaises(AppArmorBug):
|
||||
obj.get_clean(1)
|
||||
|
||||
def test_empty_net_data_2(self):
|
||||
obj = NetworkRule('inet', 'stream')
|
||||
obj = NetworkRule(NetworkRule.ALL, 'inet', 'stream',NetworkRule.ALL, NetworkRule.ALL)
|
||||
obj.type_or_protocol = ''
|
||||
# no type_or_protocol set, and ALL not set
|
||||
with self.assertRaises(AppArmorBug):
|
||||
|
@ -228,16 +279,20 @@ class WriteNetworkTestAATest(AATest):
|
|||
self.assertEqual(rawrule.strip(), raw, 'unexpected raw rule')
|
||||
|
||||
tests = (
|
||||
# raw rule clean rule
|
||||
(' network , # foo ', 'network, # foo'),
|
||||
(' audit network inet,', 'audit network inet,'),
|
||||
(' deny network inet stream,# foo bar', 'deny network inet stream, # foo bar'),
|
||||
(' deny network inet ,# foo bar', 'deny network inet, # foo bar'),
|
||||
(' allow network tcp ,# foo bar', 'allow network tcp, # foo bar'),
|
||||
# raw rule clean rule
|
||||
(' network , # foo ', 'network, # foo'),
|
||||
(' audit network inet,', 'audit network inet,'),
|
||||
(' deny network inet stream,# foo bar', 'deny network inet stream, # foo bar'),
|
||||
(' deny network inet ,# foo bar', 'deny network inet, # foo bar'),
|
||||
(' allow network tcp ,# foo bar', 'allow network tcp, # foo bar'),
|
||||
(' network stream peer = ( ip=::1 port=22 ) ,', 'network stream peer=(ip=::1 port=22),'),
|
||||
(' network ( bind , listen ) stream ip = ::1 port = 22 ,','network (bind, listen) stream ip=::1 port=22,'),
|
||||
(' allow network tcp ,# foo bar', 'allow network tcp, # foo bar'),
|
||||
|
||||
)
|
||||
|
||||
def test_write_manually(self):
|
||||
obj = NetworkRule('inet', 'stream', allow_keyword=True)
|
||||
obj = NetworkRule(NetworkRule.ALL, 'inet', 'stream', NetworkRule.ALL, NetworkRule.ALL, allow_keyword=True)
|
||||
|
||||
expected = ' allow network inet stream,'
|
||||
|
||||
|
@ -348,12 +403,29 @@ class NetworkCoveredTest_05(NetworkCoveredTest):
|
|||
( 'deny network,', (False, False, False, False)),
|
||||
)
|
||||
|
||||
class NetworkCoveredTest_06(NetworkCoveredTest):
|
||||
rule = 'network (rw, connect) port=127 peer=(ip=192.168.122.3),'
|
||||
|
||||
tests = (
|
||||
# rule equal strict equal covered covered exact
|
||||
('network (rw, connect) port=127 peer=(ip=192.168.122.3),', (True, True, True, True)),
|
||||
('network (rw, connect) port=127 ip=192.168.122.2 peer=(ip=192.168.122.3),', (False, False, True, True)),
|
||||
('network (rw, connect) inet port=127 ip=192.168.122.2 peer=(ip=192.168.122.3),', (False, False, True, True)),
|
||||
('network (rw, connect) port=127 ip=192.168.122.2 peer=(ip=192.168.122.3 port=12345),', (False, False, True, True)),
|
||||
('network (rw, connect) inet port=127 ip=192.168.122.2 peer=(ip=192.168.122.3 port=12345),',(False, False, True, True)),
|
||||
('network connect port=12345 ip=192.168.122.2 peer=(ip=192.168.122.3),', (False, False, False, False)),
|
||||
('network (r, connect) port=12345 ip=192.168.122.2 peer=(ip=192.168.122.3),', (False, False, False, False)),
|
||||
('network (r, connect) port=128 peer=(ip=192.168.122.3),', (False, False, False, False)),
|
||||
('network (rw, connect) port=127 peer=(ip=127.0.0.1),', (False, False, False, False)),
|
||||
('network (rw, connect) port=127,', (False, False, False, False)),
|
||||
)
|
||||
|
||||
|
||||
class NetworkCoveredTest_Invalid(AATest):
|
||||
def test_borked_obj_is_covered_1(self):
|
||||
obj = NetworkRule.create_instance('network inet,')
|
||||
|
||||
testobj = NetworkRule('inet', 'stream')
|
||||
testobj = NetworkRule(NetworkRule.ALL, 'inet', 'stream', NetworkRule.ALL, NetworkRule.ALL)
|
||||
testobj.domain = ''
|
||||
|
||||
with self.assertRaises(AppArmorBug):
|
||||
|
@ -362,7 +434,7 @@ class NetworkCoveredTest_Invalid(AATest):
|
|||
def test_borked_obj_is_covered_2(self):
|
||||
obj = NetworkRule.create_instance('network inet,')
|
||||
|
||||
testobj = NetworkRule('inet', 'stream')
|
||||
testobj = NetworkRule( NetworkRule.ALL,'inet', 'stream', NetworkRule.ALL, NetworkRule.ALL)
|
||||
testobj.type_or_protocol = ''
|
||||
|
||||
with self.assertRaises(AppArmorBug):
|
||||
|
@ -391,13 +463,15 @@ class NetworkCoveredTest_Invalid(AATest):
|
|||
|
||||
class NetworkLogprofHeaderTest(AATest):
|
||||
tests = (
|
||||
('network,', [ _('Network Family'), _('ALL'), _('Socket Type'), _('ALL')]),
|
||||
('network inet,', [ _('Network Family'), 'inet', _('Socket Type'), _('ALL')]),
|
||||
('network inet stream,', [ _('Network Family'), 'inet', _('Socket Type'), 'stream']),
|
||||
('deny network,', [_('Qualifier'), 'deny', _('Network Family'), _('ALL'), _('Socket Type'), _('ALL')]),
|
||||
('allow network inet,', [_('Qualifier'), 'allow', _('Network Family'), 'inet', _('Socket Type'), _('ALL')]),
|
||||
('audit network inet stream,', [_('Qualifier'), 'audit', _('Network Family'), 'inet', _('Socket Type'), 'stream']),
|
||||
('audit deny network inet,', [_('Qualifier'), 'audit deny', _('Network Family'), 'inet', _('Socket Type'), _('ALL')]),
|
||||
('network,', [ _('Accesses'), _('ALL'), _('Network Family'), _('ALL'), _('Socket Type'), _('ALL'), _('Local'), _('ALL'), _('Peer'), _('ALL')]),
|
||||
('network inet,', [ _('Accesses'), _('ALL'), _('Network Family'), 'inet', _('Socket Type'), _('ALL'), _('Local'), _('ALL'), _('Peer'), _('ALL')]),
|
||||
('network inet stream,', [ _('Accesses'), _('ALL'), _('Network Family'), 'inet', _('Socket Type'), 'stream', _('Local'), _('ALL'), _('Peer'), _('ALL')]),
|
||||
('deny network,', [_('Qualifier'), 'deny', _('Accesses'), _('ALL'), _('Network Family'), _('ALL'), _('Socket Type'), _('ALL'), _('Local'), _('ALL'), _('Peer'), _('ALL')]),
|
||||
('allow network inet,', [_('Qualifier'), 'allow', _('Accesses'), _('ALL'), _('Network Family'), 'inet', _('Socket Type'), _('ALL'), _('Local'), _('ALL'), _('Peer'), _('ALL')]),
|
||||
('audit network inet stream,', [_('Qualifier'), 'audit', _('Accesses'), _('ALL'), _('Network Family'), 'inet', _('Socket Type'), 'stream', _('Local'), _('ALL'), _('Peer'), _('ALL')]),
|
||||
('audit deny network inet,', [_('Qualifier'), 'audit deny', _('Accesses'), _('ALL'), _('Network Family'), 'inet', _('Socket Type'), _('ALL'), _('Local'), _('ALL'), _('Peer'), _('ALL')]),
|
||||
('network (bind, listen) stream ip=::1 port=22,', [ _('Accesses'), 'bind listen',_('Network Family'), _('ALL'), _('Socket Type'), 'stream', _('Local'), {'ip': '::1', 'port': '22'}, _('Peer'), _('ALL')]),
|
||||
('audit deny network inet peer=(ip=::1),', [_('Qualifier'), 'audit deny', _('Accesses'), _('ALL'), _('Network Family'), 'inet', _('Socket Type'), _('ALL'), _('Local'), _('ALL'), _('Peer'), {'ip': '::1'}]),
|
||||
)
|
||||
|
||||
def _run_test(self, params, expected):
|
||||
|
@ -407,7 +481,7 @@ class NetworkLogprofHeaderTest(AATest):
|
|||
|
||||
class NetworkRuleReprTest(AATest):
|
||||
tests = (
|
||||
(NetworkRule('inet', 'stream'), '<NetworkRule> network inet stream,'),
|
||||
(NetworkRule(NetworkRule.ALL, 'inet', 'stream', NetworkRule.ALL, NetworkRule.ALL), '<NetworkRule> network inet stream,'),
|
||||
(NetworkRule.create_instance(' allow network inet stream, # foo'), '<NetworkRule> allow network inet stream, # foo'),
|
||||
)
|
||||
|
||||
|
@ -506,7 +580,7 @@ class NetworkDeleteTestAATest(AATest):
|
|||
class NetworkRulesetReprTest(AATest):
|
||||
def test_network_ruleset_repr(self):
|
||||
obj = NetworkRuleset()
|
||||
obj.add(NetworkRule('inet', 'stream'))
|
||||
obj.add(NetworkRule(NetworkRule.ALL, 'inet', 'stream', NetworkRule.ALL, NetworkRule.ALL))
|
||||
obj.add(NetworkRule.create_instance(' allow network inet stream, # foo'))
|
||||
|
||||
expected = '<NetworkRuleset>\n network inet stream,\n allow network inet stream, # foo\n</NetworkRuleset>'
|
||||
|
|
|
@ -229,6 +229,8 @@ exception_not_raised = (
|
|||
'xtrans/simple_bad_conflicting_x_6.sd',
|
||||
'xtrans/simple_bad_conflicting_x_8.sd',
|
||||
'xtrans/x-conflict.sd',
|
||||
|
||||
'network/perms/bad_modifier_2.sd',
|
||||
)
|
||||
|
||||
# testcases with lines that don't match any regex and end up as "unknown line"
|
||||
|
@ -445,94 +447,6 @@ syntax_failure = (
|
|||
'vars/vars_simple_assignment_12.sd', # Redefining existing variable @{BAR} ('\' not handled)
|
||||
'bare_include_tests/ok_2.sd', # two #include<...> in one line
|
||||
|
||||
# fine grained net
|
||||
'network/network_ok_8.sd',
|
||||
'network/network_ok_9.sd',
|
||||
'network/network_ok_10.sd',
|
||||
'network/network_ok_11.sd',
|
||||
'network/network_ok_12.sd',
|
||||
'network/network_ok_13.sd',
|
||||
'network/network_ok_14.sd',
|
||||
'network/network_ok_15.sd',
|
||||
'network/network_ok_16.sd',
|
||||
'network/network_ok_17.sd',
|
||||
'network/network_ok_18.sd',
|
||||
'network/network_ok_19.sd',
|
||||
'network/network_ok_20.sd',
|
||||
'network/network_ok_21.sd',
|
||||
'network/network_ok_22.sd',
|
||||
'network/network_ok_23.sd',
|
||||
'network/network_ok_24.sd',
|
||||
'network/network_ok_25.sd',
|
||||
'network/network_ok_26.sd',
|
||||
'network/network_ok_27.sd',
|
||||
'network/network_ok_28.sd',
|
||||
'network/network_ok_29.sd',
|
||||
'network/network_ok_30.sd',
|
||||
'network/network_ok_31.sd',
|
||||
'network/network_ok_32.sd',
|
||||
'network/network_ok_33.sd',
|
||||
'network/network_ok_34.sd',
|
||||
'network/network_ok_35.sd',
|
||||
'network/network_ok_36.sd',
|
||||
'network/network_ok_37.sd',
|
||||
'network/network_ok_38.sd',
|
||||
'network/network_ok_39.sd',
|
||||
'network/network_ok_40.sd',
|
||||
'network/network_ok_41.sd',
|
||||
'network/network_ok_42.sd',
|
||||
'network/network_ok_43.sd',
|
||||
'network/network_ok_44.sd',
|
||||
'network/perms/ok_accept_1.sd',
|
||||
'network/perms/ok_accept_2.sd',
|
||||
'network/perms/ok_attr_1.sd',
|
||||
'network/perms/ok_attr_2.sd',
|
||||
'network/perms/ok_attr_3.sd',
|
||||
'network/perms/ok_attr_4.sd',
|
||||
'network/perms/ok_attr_5.sd',
|
||||
'network/perms/ok_attr_6.sd',
|
||||
'network/perms/ok_attr_7.sd',
|
||||
'network/perms/ok_attr_8.sd',
|
||||
'network/perms/ok_bind_1.sd',
|
||||
'network/perms/ok_bind_2.sd',
|
||||
'network/perms/ok_connect_1.sd',
|
||||
'network/perms/ok_connect_2.sd',
|
||||
'network/perms/ok_create_1.sd',
|
||||
'network/perms/ok_create_2.sd',
|
||||
'network/perms/ok_create_3.sd',
|
||||
'network/perms/ok_create_4.sd',
|
||||
'network/perms/ok_listen_1.sd',
|
||||
'network/perms/ok_listen_2.sd',
|
||||
'network/perms/ok_listen_3.sd',
|
||||
'network/perms/ok_msg_1.sd',
|
||||
'network/perms/ok_msg_2.sd',
|
||||
'network/perms/ok_msg_3.sd',
|
||||
'network/perms/ok_msg_4.sd',
|
||||
'network/perms/ok_msg_5.sd',
|
||||
'network/perms/ok_msg_6.sd',
|
||||
'network/perms/ok_msg_7.sd',
|
||||
'network/perms/ok_msg_8.sd',
|
||||
'network/perms/ok_msg_9.sd',
|
||||
'network/perms/ok_msg_10.sd',
|
||||
'network/perms/ok_msg_12.sd',
|
||||
'network/perms/ok_msg_13.sd',
|
||||
'network/perms/ok_msg_14.sd',
|
||||
'network/perms/ok_msg_15.sd',
|
||||
'network/perms/ok_msg_16.sd',
|
||||
'network/perms/ok_msg_17.sd',
|
||||
'network/perms/ok_msg_18.sd',
|
||||
'network/perms/ok_msg_19.sd',
|
||||
'network/perms/ok_msg_20.sd',
|
||||
'network/perms/ok_opt_1.sd',
|
||||
'network/perms/ok_opt_2.sd',
|
||||
'network/perms/ok_opt_3.sd',
|
||||
'network/perms/ok_opt_4.sd',
|
||||
'network/perms/ok_opt_5.sd',
|
||||
'network/perms/ok_opt_6.sd',
|
||||
'network/perms/ok_opt_7.sd',
|
||||
'network/perms/ok_shutdown_1.sd',
|
||||
'network/perms/ok_shutdown_2.sd',
|
||||
'network/perms/ok_shutdown_3.sd',
|
||||
)
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue