utils: use capability rule class in aa.py and cleanprof.py

This patch integrated the new capability rule class into aa.py and
cleanprof.py.

Patch changes:
  v6:
      - fix logic around same_file in cleanprofile.py that was causing
        capabilities to be deleted when they weren't covered by an
        abstraction.
  v5:
      - merge my changes into Christian's original patches
      - use CapabilityRule.parse() for parsing raw capability rules and
	getting a CapabilityRule instance back
      - cope with move of parse_modifiers back into rule/__init__.py.

Originally-by: Christian Boltz <apparmor@cboltz.de>
Signed-off-by: Steve Beattie <steve@nxnw.org>
Acked-by: Christian Boltz <apparmor@cboltz.de>
This commit is contained in:
Christian Boltz 2014-12-16 14:13:25 -08:00 committed by Steve Beattie
parent c55a466dc9
commit e259b2d652
2 changed files with 68 additions and 135 deletions

View file

@ -52,6 +52,9 @@ from apparmor.regex import (RE_PROFILE_START, RE_PROFILE_END, RE_PROFILE_CAP, RE
import apparmor.rules as aarules
from apparmor.rule.capability import CapabilityRuleset, CapabilityRule
from apparmor.rule import parse_modifiers
from apparmor.yasti import SendDataToYast, GetDataFromYast, shutdown_yast
# setup module translations
@ -89,13 +92,19 @@ seen_events = 0 # was our
# To store the globs entered by users so they can be provided again
user_globs = []
# The key for representing bare rules such as "capability," or "file,"
# The key for representing bare "file," rules
ALL = '\0ALL'
## Variables used under logprof
### Were our
t = hasher() # dict()
transitions = hasher()
# keys used in aa[profile][hat]:
# a) rules (as dict): alias, change_profile, include, lvar, rlimit
# b) rules (as hasher): allow, deny
# c) one for each rule class
# d) other: declared, external, flags, name, profile
aa = hasher() # Profiles originally in sd, replace by aa
original_aa = hasher()
extras = hasher() # Inactive profiles from extras
@ -1546,13 +1555,14 @@ def ask_the_questions():
for hat in hats:
for capability in sorted(log_dict[aamode][profile][hat]['capability'].keys()):
# skip if capability already in profile
if profile_known_capability(aa[profile][hat], capability):
capability_obj = CapabilityRule(capability)
if is_known_rule(aa[profile][hat], 'capability', capability_obj):
continue
# Load variables? Don't think so.
severity = sev_db.rank('CAP_%s' % capability)
default_option = 1
options = []
newincludes = match_cap_includes(aa[profile][hat], capability)
newincludes = match_includes(aa[profile][hat], 'capability', capability_obj)
q = aaui.PromptQuestion()
if newincludes:
@ -1625,16 +1635,18 @@ def ask_the_questions():
if deleted:
aaui.UI_Info(_('Deleted %s previous matching profile entries.') % deleted)
aa[profile][hat]['allow']['capability'][capability]['set'] = True
aa[profile][hat]['allow']['capability'][capability]['audit'] = audit_toggle
else:
capability_obj = CapabilityRule(capability, audit=audit)
aa[profile][hat]['capability'].add(capability_obj)
aaui.UI_Info(_('Adding capability %s to profile.') % capability)
changed[profile] = True
aaui.UI_Info(_('Adding capability %s to profile.') % capability)
done = True
elif ans == 'CMD_DENY':
aa[profile][hat]['deny']['capability'][capability]['set'] = True
capability_obj = CapabilityRule(capability, audit=audit, deny=True)
aa[profile][hat]['capability'].add(capability_obj)
changed[profile] = True
aaui.UI_Info(_('Denying capability %s to profile.') % capability)
@ -2119,20 +2131,6 @@ def delete_net_duplicates(netrules, incnetrules):
deleted += 1
return deleted
def delete_cap_duplicates(profilecaps, inccaps):
deleted = []
if profilecaps and inccaps:
for capname in profilecaps.keys():
# XXX The presence of a bare capability rule ("capability,") should
# cause more specific capability rules
# ("capability audit_control,") to be deleted
if inccaps[capname].get('set', False) == 1:
deleted.append(capname)
for capname in deleted:
profilecaps.pop(capname)
return len(deleted)
def delete_path_duplicates(profile, incname, allow):
deleted = []
for entry in profile[allow]['path'].keys():
@ -2159,9 +2157,7 @@ def delete_duplicates(profile, incname):
deleted += delete_net_duplicates(profile['deny']['netdomain'], include[incname][incname]['deny']['netdomain'])
deleted += delete_cap_duplicates(profile['allow']['capability'], include[incname][incname]['allow']['capability'])
deleted += delete_cap_duplicates(profile['deny']['capability'], include[incname][incname]['deny']['capability'])
deleted += profile['capability'].delete_duplicates(include[incname][incname]['capability'])
deleted += delete_path_duplicates(profile, incname, 'allow')
deleted += delete_path_duplicates(profile, incname, 'deny')
@ -2171,9 +2167,7 @@ def delete_duplicates(profile, incname):
deleted += delete_net_duplicates(profile['deny']['netdomain'], filelist[incname][incname]['deny']['netdomain'])
deleted += delete_cap_duplicates(profile['allow']['capability'], filelist[incname][incname]['allow']['capability'])
deleted += delete_cap_duplicates(profile['deny']['capability'], filelist[incname][incname]['deny']['capability'])
deleted += profile['capability'].delete_duplicates(filelist[incname][incname]['capability'])
deleted += delete_path_duplicates(profile, incname, 'allow')
deleted += delete_path_duplicates(profile, incname, 'deny')
@ -2201,10 +2195,16 @@ def match_net_include(incname, family, type):
return False
def match_cap_includes(profile, cap):
def match_cap_includes(profile, capability):
# still used by aa-mergeprof
capability_obj = CapabilityRule(capability)
return match_includes(profile, 'capability', capability_obj)
def match_includes(profile, rule_type, rule_obj):
newincludes = []
for incname in include.keys():
if valid_include(profile, incname) and include[incname][incname]['allow']['capability'][cap].get('set', False) == 1:
# XXX type check should go away once we init all profiles correctly
if valid_include(profile, incname) and include[incname][incname].get(rule_type, False) and include[incname][incname][rule_type].is_covered(rule_obj):
newincludes.append(incname)
return newincludes
@ -2512,10 +2512,11 @@ def collapse_log():
log_dict[aamode][profile][hat]['path'][path] = mode
for capability in prelog[aamode][profile][hat]['capability'].keys():
for cap in prelog[aamode][profile][hat]['capability'].keys():
# If capability not already in profile
if not aa[profile][hat]['allow']['capability'][capability].get('set', False):
log_dict[aamode][profile][hat]['capability'][capability] = True
# XXX remove first check when we have proper profile initialisation
if aa[profile][hat].get('capability', False) and not aa[profile][hat]['capability'].is_covered(CapabilityRule(cap)):
log_dict[aamode][profile][hat]['capability'][cap] = True
nd = prelog[aamode][profile][hat]['netdomain']
for family in nd.keys():
@ -2702,6 +2703,10 @@ def parse_profile_data(data, file, do_include):
profile_data[profile][profile]['repo']['url'] = repo_data['url']
profile_data[profile][profile]['repo']['user'] = repo_data['user']
# init rule classes (if not done yet)
if not profile_data[profile][hat].get('capability', False):
profile_data[profile][hat]['capability'] = CapabilityRuleset()
elif RE_PROFILE_END.search(line):
# If profile ends and we're not in one
if not profile:
@ -2717,21 +2722,14 @@ def parse_profile_data(data, file, do_include):
initial_comment = ''
elif RE_PROFILE_CAP.search(line):
matches = RE_PROFILE_CAP.search(line)
if not profile:
raise AppArmorException(_('Syntax Error: Unexpected capability entry found in file: %(file)s line: %(line)s') % { 'file': file, 'line': lineno + 1 })
audit, allow, allow_keyword, comment = parse_audit_allow(matches)
# TODO: honor allow_keyword and comment
# init rule class (if not done yet)
if not profile_data[profile][hat].get('capability', False):
profile_data[profile][hat]['capability'] = CapabilityRuleset()
capability = ALL
if matches.group('capability'):
capability = matches.group('capability').strip()
# TODO: can contain more than one capability- split it?
profile_data[profile][hat][allow]['capability'][capability]['set'] = True
profile_data[profile][hat][allow]['capability'][capability]['audit'] = audit
profile_data[profile][hat]['capability'].add(CapabilityRule.parse(line))
elif RE_PROFILE_LINK.search(line):
matches = RE_PROFILE_LINK.search(line).groups()
@ -2840,7 +2838,7 @@ def parse_profile_data(data, file, do_include):
if not profile:
raise AppArmorException(_('Syntax Error: Unexpected bare file rule found in file: %(file)s line: %(line)s') % { 'file': file, 'line': lineno + 1 })
audit, allow, allow_keyword, comment = parse_audit_allow(matches)
audit, allow, allow_keyword, comment = parse_modifiers(matches)
# TODO: honor allow_keyword and comment
mode = apparmor.aamode.AA_BARE_FILE_MODE
@ -3179,26 +3177,6 @@ def parse_profile_data(data, file, do_include):
return profile_data
def parse_audit_allow(matches):
audit = False
if matches.group('audit'):
audit = True
allow = 'allow'
allow_keyword = False
if matches.group('allow'):
allow = matches.group('allow').strip()
allow_keyword = True
if allow != 'allow' and allow != 'deny': # should never happen
raise AppArmorException(_("Invalid allow/deny keyword %s" % allow))
comment = ''
if matches.group('comment'):
# include a space so that we don't need to add it everywhere when writing the rule
comment = ' %s' % matches.group('comment')
return (audit, allow, allow_keyword, comment)
# RE_DBUS_ENTRY = re.compile('^dbus\s*()?,\s*$')
# use stuff like '(?P<action>(send|write|w|receive|read|r|rw))'
@ -3371,29 +3349,10 @@ def var_transform(ref):
def write_list_vars(prof_data, depth):
return write_pair(prof_data, depth, '', 'lvar', '', ' = ', '', var_transform)
def write_cap_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
allowstr = set_allow_str(allow)
if prof_data[allow].get('capability', False):
for cap in sorted(prof_data[allow]['capability'].keys()):
audit = ''
if prof_data[allow]['capability'][cap].get('audit', False):
audit = 'audit '
if prof_data[allow]['capability'][cap].get('set', False):
if cap == ALL:
data.append('%s%s%scapability,' % (pre, audit, allowstr))
else:
data.append('%s%s%scapability %s,' % (pre, audit, allowstr, cap))
data.append('')
return data
def write_capabilities(prof_data, depth):
#data = write_single(prof_data, depth, '', 'set_capability', 'set capability ', ',')
data = write_cap_rules(prof_data, depth, 'deny')
data += write_cap_rules(prof_data, depth, 'allow')
data = []
if prof_data.get('capability', False):
data = prof_data['capability'].get_clean(depth)
return data
def write_net_rules(prof_data, depth, allow):
@ -3823,10 +3782,14 @@ def serialize_profile_from_old_profile(profile_data, name, options):
depth = len(line) - len(line.lstrip())
data += write_methods[segs](prof_data, int(depth / 2))
segments[segs] = False
# delete rules from prof_data to avoid duplication (they are in data now)
if prof_data['allow'].get(segs, False):
prof_data['allow'].pop(segs)
if prof_data['deny'].get(segs, False):
prof_data['deny'].pop(segs)
if prof_data.get(segs, False):
t = type(prof_data[segs])
prof_data[segs] = t()
return data
#data.append('reading prof')
@ -3887,16 +3850,20 @@ def serialize_profile_from_old_profile(profile_data, name, options):
if profile:
depth = int(len(line) - len(line.lstrip()) / 2) + 1
# first write sections that were modified (and remove them from write_prof_data)
# first write sections that were modified
#for segs in write_methods.keys():
for segs in default_write_order:
if segments[segs]:
data += write_methods[segs](write_prof_data[name], depth)
segments[segs] = False
# delete rules from write_prof_data to avoid duplication (they are in data now)
if write_prof_data[name]['allow'].get(segs, False):
write_prof_data[name]['allow'].pop(segs)
if write_prof_data[name]['deny'].get(segs, False):
write_prof_data[name]['deny'].pop(segs)
if write_prof_data[name].get(segs, False):
t = type(write_prof_data[name][segs])
write_prof_data[name][segs] = t()
# then write everything else
for segs in default_write_order:
@ -3938,34 +3905,13 @@ def serialize_profile_from_old_profile(profile_data, name, options):
profile = None
elif RE_PROFILE_CAP.search(line):
matches = RE_PROFILE_CAP.search(line).groups()
audit = False
if matches[0]:
audit = matches[0]
allow = 'allow'
if matches[1] and matches[1].strip() == 'deny':
allow = 'deny'
capability = ALL
if matches[2]:
capability = matches[2].strip()
if not write_prof_data[hat][allow]['capability'][capability].get('set', False):
correct = False
if not write_prof_data[hat][allow]['capability'][capability].get(audit, False) == audit:
correct = False
if correct:
cap = CapabilityRule.parse(line)
if write_prof_data[hat]['capability'].is_covered(cap, True, True):
if not segments['capability'] and True in segments.values():
data += write_prior_segments(write_prof_data[name], segments, line)
segments['capability'] = True
write_prof_data[hat][allow]['capability'].pop(capability)
write_prof_data[hat]['capability'].delete(cap)
data.append(line)
#write_prof_data[hat][allow]['capability'][capability].pop(audit)
#Remove this line
else:
# To-Do
pass
@ -4341,20 +4287,18 @@ def profile_known_exec(profile, typ, exec_target):
return 0
def profile_known_capability(profile, capname):
if profile['deny']['capability'][capname].get('set', False):
return -1
if profile['allow']['capability'][capname].get('set', False):
return 1
def is_known_rule(profile, rule_type, rule_obj):
# XXX get rid of get() checks after we have a proper function to initialize a profile
if profile.get(rule_type, False):
if profile[rule_type].is_covered(rule_obj, False):
return True
for incname in profile['include'].keys():
if include[incname][incname]['deny']['capability'][capname].get('set', False):
return -1
if include[incname][incname]['allow']['capability'][capname].get('set', False):
return 1
if include[incname][incname].get(rule_type, False):
if include[incname][incname][rule_type].is_covered(rule_obj, False):
return True
return 0
return False
def profile_known_network(profile, family, sock_type):
if netrules_access_check(profile['deny']['netdomain'], family, sock_type):

View file

@ -65,8 +65,8 @@ class CleanProf(object):
deleted += apparmor.aa.delete_duplicates(self.other.aa[program][hat], inc)
#Clean the duplicates of caps in other profile
deleted += delete_cap_duplicates(self.profile.aa[program][hat]['allow']['capability'], self.other.aa[program][hat]['allow']['capability'], self.same_file)
deleted += delete_cap_duplicates(self.profile.aa[program][hat]['deny']['capability'], self.other.aa[program][hat]['deny']['capability'], self.same_file)
if not self.same_file:
deleted += self.other.aa[program][hat]['capability'].delete_duplicates(self.profile.aa[program][hat]['capability'])
#Clean the duplicates of path in other profile
deleted += delete_path_duplicates(self.profile.aa[program][hat], self.other.aa[program][hat], 'allow', self.same_file)
@ -108,17 +108,6 @@ def delete_path_duplicates(profile, profile_other, allow, same_profile=True):
return len(deleted)
def delete_cap_duplicates(profilecaps, profilecaps_other, same_profile=True):
deleted = []
if profilecaps and profilecaps_other and not same_profile:
for capname in profilecaps.keys():
if profilecaps_other[capname].get('set', False):
deleted.append(capname)
for capname in deleted:
profilecaps_other.pop(capname)
return len(deleted)
def delete_net_duplicates(netrules, netrules_other, same_profile=True):
deleted = 0
hasher_obj = apparmor.aa.hasher()