mirror of
synced 2025-03-09 02:41:03 +01:00
726 lines
33 KiB
Executable file
726 lines
33 KiB
Executable file
#! /usr/bin/env python
# ----------------------------------------------------------------------
# Copyright (C) 2013 Kshitij Gupta <kgupta8592@gmail.com>
# Copyright (C) 2014-2016 Christian Boltz <apparmor@cboltz.de>
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License as published by the Free Software Foundation.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# ----------------------------------------------------------------------
import argparse
import re
import os
import apparmor.aa
import apparmor.aamode
import apparmor.severity
import apparmor.cleanprofile as cleanprofile
import apparmor.ui as aaui
from apparmor.aa import (available_buttons, combine_name, delete_duplicates,
get_profile_filename, is_known_rule, match_includes)
from apparmor.common import AppArmorException
from apparmor.regex import re_match_include
# setup exception handling
from apparmor.fail import enable_aa_exception_handler
# setup module translations
from apparmor.translations import init_translation
_ = init_translation()
parser = argparse.ArgumentParser(description=_('Merge the given profiles into /etc/apparmor.d/ (or the directory specified with -d)'))
parser.add_argument('files', nargs='+', type=str, help=_('Profile(s) to merge'))
#parser.add_argument('other', nargs='?', type=str, help=_('other profile'))
parser.add_argument('-d', '--dir', type=str, help=_('path to profiles'))
#parser.add_argument('-a', '--auto', action='store_true', help=_('Automatically merge profiles, exits incase of *x conflicts'))
args = parser.parse_args()
args.other = None
# 2-way merge or 3-way merge based on number of params
merge_mode = 2 #if args.other == None else 3
profiles = [args.files, [args.other]]
profiledir = args.dir
if profiledir:
apparmor.aa.profile_dir = apparmor.aa.get_full_path(profiledir)
if not os.path.isdir(apparmor.aa.profile_dir):
raise AppArmorException(_("%s is not a directory.") %profiledir)
def reset_aa():
apparmor.aa.aa = apparmor.aa.hasher()
apparmor.aa.filelist = apparmor.aa.hasher()
apparmor.aa.include = dict()
apparmor.aa.existing_profiles = apparmor.aa.hasher()
apparmor.aa.original_aa = apparmor.aa.hasher()
def find_profiles_from_files(files):
profile_to_filename = dict()
for file_name in files:
apparmor.aa.read_profile(file_name, True)
for profile_name in apparmor.aa.filelist[file_name]['profiles'].keys():
profile_to_filename[profile_name] = file_name
return profile_to_filename
def find_files_from_profiles(profiles):
profile_to_filename = dict()
for profile_name in profiles:
profile_to_filename[profile_name] = apparmor.aa.get_profile_filename(profile_name)
return profile_to_filename
def main():
profiles_to_merge = set()
base_files, other_files = profiles
base_profile_to_file = find_profiles_from_files(base_files)
profiles_to_merge = profiles_to_merge.union(set(base_profile_to_file.keys()))
other_profile_to_file = dict()
if merge_mode == 3:
other_profile_to_file = find_profiles_from_files(other_files)
user_profile_to_file = find_files_from_profiles(profiles_to_merge)
# print(base_files,"\n",other_files)
# print(base_profile_to_file,"\n",other_profile_to_file,"\n",user_profile_to_file)
# print(profiles_to_merge)
for profile_name in profiles_to_merge:
aaui.UI_Info("\n\n" + _("Merging profile for %s" % profile_name))
user_file = user_profile_to_file[profile_name]
base_file = base_profile_to_file.get(profile_name, None)
other_file = None
if merge_mode == 3:
other_file = other_profile_to_file.get(profile_name, None)
if base_file == None:
if other_file == None:
act([user_file, other_file, None], 2, profile_name)
if other_file == None:
act([user_file, base_file, None], 2, profile_name)
act([user_file, base_file, other_file], 3, profile_name)
def act(files, merge_mode, merging_profile):
mergeprofiles = Merge(files)
#Get rid of common/superfluous stuff
# mergeprofiles.clear_common()
# mergeprofiles.clear_common() temporarily disabled because it crashes,
# see https://bugs.launchpad.net/apparmor/+bug/1382236
# if not args.auto:
if 1 == 1: # workaround to avoid lots of whitespace changes
if merge_mode == 3:
mergeprofiles.ask_the_questions('other', merging_profile)
mergeprofiles.ask_the_questions('base', merging_profile)
q = aaui.PromptQuestion()
q.title = _('Changed Local Profiles')
q.explanation = _('The following local profiles were changed. Would you like to save them?')
q.default = 'CMD_VIEW_CHANGES'
q.options = [merging_profile]
q.selected = 0
ans = ''
arg = None
programs = list(mergeprofiles.user.aa.keys())
program = programs[0]
while ans != 'CMD_SAVE_CHANGES':
ans, arg = q.promptUser()
if ans == 'CMD_SAVE_CHANGES':
elif ans == 'CMD_VIEW_CHANGES':
for program in programs:
apparmor.aa.original_aa[program] = apparmor.aa.deepcopy(apparmor.aa.aa[program])
#oldprofile = apparmor.serialize_profile(apparmor.original_aa[program], program, '')
newprofile = apparmor.aa.serialize_profile(mergeprofiles.user.aa[program], program, '')
apparmor.aa.display_changes_with_comments(mergeprofiles.user.filename, newprofile)
elif ans == 'CMD_IGNORE_ENTRY':
class Merge(object):
def __init__(self, profiles):
user, base, other = profiles
#Read and parse base profile and save profile data, include data from it and reset them
apparmor.aa.read_profile(base, True)
self.base = cleanprofile.Prof(base)
#Read and parse other profile and save profile data, include data from it and reset them
if merge_mode == 3:
apparmor.aa.read_profile(other, True)
self.other = cleanprofile.Prof(other)
#Read and parse user profile
apparmor.aa.read_profile(user, True)
self.user = cleanprofile.Prof(user)
def clear_common(self):
deleted = 0
if merge_mode == 3:
#Remove off the parts in other profile which are common/superfluous from user profile
user_other = cleanprofile.CleanProf(False, self.user, self.other)
deleted += user_other.compare_profiles()
#Remove off the parts in base profile which are common/superfluous from user profile
user_base = cleanprofile.CleanProf(False, self.user, self.base)
deleted += user_base.compare_profiles()
if merge_mode == 3:
#Remove off the parts in other profile which are common/superfluous from base profile
base_other = cleanprofile.CleanProf(False, self.base, self.other)
deleted += base_other.compare_profiles()
def conflict_mode(self, profile, hat, allow, path, mode, new_mode, old_mode):
m = new_mode
o = old_mode
new_mode = apparmor.aa.flatten_mode(new_mode)
old_mode = apparmor.aa.flatten_mode(old_mode)
conflict_modes = set('uUpPcCiIxX')
conflict_x= (old_mode | new_mode) & conflict_modes
if conflict_x:
#We may have conflicting x modes
if conflict_x & set('x'):
if conflict_x & set('X'):
if len(conflict_x) > 1:
q = aaui.PromptQuestion()
q.headers = [_('Path'), path]
q.headers += [_('Select the appropriate mode'), '']
options = []
options.append('%s: %s' %(mode, apparmor.aa.mode_to_str_user(new_mode)))# - (old_mode & conflict_x))))
options.append('%s: %s' %(mode, apparmor.aa.mode_to_str_user(old_mode)))#(old_mode | new_mode) - (new_mode & conflict_x))))
q.options = options
q.functions = ['CMD_ALLOW', 'CMD_ABORT']
done = False
while not done:
ans, selected = q.promptUser()
if ans == 'CMD_ALLOW':
if selected == 0:
self.user.aa[profile][hat][allow]['path'][path][mode] = m#apparmor.aa.owner_flatten_mode(new_mode)#(old_mode | new_mode) - (old_mode & conflict_x)
return m
elif selected == 1:
return o
pass#self.user.aa[profile][hat][allow][path][mode] = (old_mode | new_mode) - (new_mode & conflict_x)
raise AppArmorException(_('Unknown selection'))
done = True
def ask_the_questions(self, other, profile):
aa = self.user.aa # keep references so that the code in this function can use the short name
changed = apparmor.aa.changed # (and be more in sync with aa.py ask_the_questions())
if other == 'other':
other = self.other
other = self.base
#Add the file-wide includes from the other profile to the user profile
done = False
options = []
for inc in other.filelist[other.filename]['include'].keys():
if not inc in self.user.filelist[self.user.filename]['include'].keys():
options.append('#include <%s>' %inc)
default_option = 1
q = aaui.PromptQuestion()
q.options = options
q.selected = default_option - 1
q.headers = [_('File includes'), _('Select the ones you wish to add')]
q.default = 'CMD_ALLOW'
while not done and options:
ans, selected = q.promptUser()
if ans == 'CMD_IGNORE_ENTRY':
done = True
elif ans == 'CMD_ALLOW':
selection = options[selected]
inc = re_match_include(selection)
self.user.filelist[self.user.filename]['include'][inc] = True
aaui.UI_Info(_('Adding %s to the file.') % selection)
elif ans == 'CMD_FINISHED':
sev_db = apparmor.aa.sev_db
if not sev_db:
sev_db = apparmor.severity.Severity(apparmor.aa.CONFDIR + '/severity.db', _('unknown'))
for hat in sorted(other.aa[profile].keys()):
#Add the includes from the other profile to the user profile
done = False
options = []
for inc in other.aa[profile][hat]['include'].keys():
if not inc in aa[profile][hat]['include'].keys():
options.append('#include <%s>' %inc)
default_option = 1
q = aaui.PromptQuestion()
q.options = options
q.selected = default_option - 1
q.headers = [_('File includes'), _('Select the ones you wish to add')]
q.default = 'CMD_ALLOW'
while not done and options:
ans, selected = q.promptUser()
if ans == 'CMD_IGNORE_ENTRY':
done = True
elif ans == 'CMD_ALLOW':
selection = options[selected]
inc = re_match_include(selection)
deleted = apparmor.aa.delete_duplicates(aa[profile][hat], inc)
aa[profile][hat]['include'][inc] = True
aaui.UI_Info(_('Adding %s to the file.') % selection)
if deleted:
aaui.UI_Info(_('Deleted %s previous matching profile entries.') % deleted)
elif ans == 'CMD_FINISHED':
# Process all the path entries.
for allow in ['allow', 'deny']:
for path in sorted(other.aa[profile][hat][allow]['path'].keys()):
#print(path, other.aa[profile][hat][allow]['path'][path])
mode = other.aa[profile][hat][allow]['path'][path]['mode']
if aa[profile][hat][allow]['path'].get(path, False):
mode = self.conflict_mode(profile, hat, allow, path, 'mode', other.aa[profile][hat][allow]['path'][path]['mode'], aa[profile][hat][allow]['path'][path]['mode'])
self.conflict_mode(profile, hat, allow, path, 'audit', other.aa[profile][hat][allow]['path'][path]['audit'], aa[profile][hat][allow]['path'][path]['audit'])
changed[profile] = True
# Lookup modes from profile
allow_mode = set()
allow_audit = set()
deny_mode = set()
deny_audit = set()
fmode, famode, fm = apparmor.aa.rematchfrag(aa[profile][hat], 'allow', path)
if fmode:
allow_mode |= fmode
if famode:
allow_audit |= famode
cm, cam, m = apparmor.aa.rematchfrag(aa[profile][hat], 'deny', path)
if cm:
deny_mode |= cm
if cam:
deny_audit |= cam
imode, iamode, im = apparmor.aa.match_prof_incs_to_path(aa[profile][hat], 'allow', path)
if imode:
allow_mode |= imode
if iamode:
allow_audit |= iamode
cm, cam, m = apparmor.aa.match_prof_incs_to_path(aa[profile][hat], 'deny', path)
if cm:
deny_mode |= cm
if cam:
deny_audit |= cam
if deny_mode & apparmor.aamode.AA_MAY_EXEC:
deny_mode |= apparmor.aamode.ALL_AA_EXEC_TYPE
# Mask off the denied modes
mode = mode - deny_mode
# If we get an exec request from some kindof event that generates 'PERMITTING X'
# check if its already in allow_mode
# if not add ix permission
if mode & apparmor.aamode.AA_MAY_EXEC:
# Remove all type access permission
mode = mode - apparmor.aamode.ALL_AA_EXEC_TYPE
if not allow_mode & apparmor.aamode.AA_MAY_EXEC:
mode |= apparmor.aa.str_to_mode('ix')
if not mode:
matches = []
if fmode:
matches += fm
if imode:
matches += im
if not apparmor.aa.mode_contains(allow_mode, mode):
default_option = 1
options = []
newincludes = []
include_valid = False
for incname in apparmor.aa.include.keys():
include_valid = False
# If already present skip
if aa[profile][hat][incname]:
if incname.startswith(apparmor.aa.profile_dir):
incname = incname.replace(apparmor.aa.profile_dir+'/', '', 1)
include_valid = apparmor.aa.valid_include('', incname)
if not include_valid:
cm, am, m = apparmor.aa.match_include_to_path(incname, 'allow', path)
if cm and apparmor.aa.mode_contains(cm, mode):
dm = apparmor.aa.match_include_to_path(incname, 'deny', path)[0]
# If the mode is denied
if not mode & dm:
if not list(filter(lambda s: '/**' == s, m)):
# Add new includes to the options
if newincludes:
options += list(map(lambda s: '#include <%s>' % s, sorted(set(newincludes))))
# We should have literal the path in options list too
# Add any the globs matching path from logprof
globs = apparmor.aa.glob_common(path)
if globs:
matches += globs
# Add any user entered matching globs
for user_glob in apparmor.aa.user_globs:
if apparmor.aa.matchliteral(user_glob, path):
matches = list(set(matches))
if path in matches:
options += apparmor.aa.order_globs(matches, path)
default_option = len(options)
severity = sev_db.rank(path, apparmor.aa.mode_to_str(mode))
audit_toggle = 0
owner_toggle = 0
if apparmor.aa.cfg['settings']['default_owner_prompt']:
owner_toggle = apparmor.aa.cfg['settings']['default_owner_prompt']
done = False
while not done:
q = aaui.PromptQuestion()
q.headers = [_('Profile'), apparmor.aa.combine_name(profile, hat),
_('Path'), path]
if allow_mode:
mode |= allow_mode
tail = ''
s = ''
prompt_mode = None
if owner_toggle == 0:
prompt_mode = apparmor.aa.flatten_mode(mode)
tail = ' ' + _('(owner permissions off)')
elif owner_toggle == 1:
prompt_mode = mode
elif owner_toggle == 2:
prompt_mode = allow_mode | apparmor.aa.owner_flatten_mode(mode - allow_mode)
tail = ' ' + _('(force new perms to owner)')
prompt_mode = apparmor.aa.owner_flatten_mode(mode)
tail = ' ' + _('(force all rule perms to owner)')
if audit_toggle == 1:
s = apparmor.aa.mode_to_str_user(allow_mode)
if allow_mode:
s += ', '
s += 'audit ' + apparmor.aa.mode_to_str_user(prompt_mode - allow_mode) + tail
elif audit_toggle == 2:
s = 'audit ' + apparmor.aa.mode_to_str_user(prompt_mode) + tail
s = apparmor.aa.mode_to_str_user(prompt_mode) + tail
q.headers += [_('Old Mode'), apparmor.aa.mode_to_str_user(allow_mode),
_('New Mode'), s]
s = ''
tail = ''
prompt_mode = None
if audit_toggle:
s = 'audit'
if owner_toggle == 0:
prompt_mode = apparmor.aa.flatten_mode(mode)
tail = ' ' + _('(owner permissions off)')
elif owner_toggle == 1:
prompt_mode = mode
prompt_mode = apparmor.aa.owner_flatten_mode(mode)
tail = ' ' + _('(force perms to owner)')
s = apparmor.aa.mode_to_str_user(prompt_mode)
q.headers += [_('Mode'), s]
q.headers += [_('Severity'), severity]
q.options = options
q.selected = default_option - 1
q.default = 'CMD_ALLOW'
ans, selected = q.promptUser()
if ans == 'CMD_IGNORE_ENTRY':
done = True
elif ans == 'CMD_FINISHED':
if ans == 'CMD_OTHER':
aaui.UI_Important("Sorry, not implemented yet!")
# audit_toggle, owner_toggle = aaui.UI_ask_mode_toggles(audit_toggle, owner_toggle, allow_mode)
# crashes with
# audit_toggle, owner_toggle = aaui.UI_ask_mode_toggles(audit_toggle, owner_toggle, allow_mode)
# AttributeError: 'module' object has no attribute 'UI_ask_mode_toggles'
elif ans == 'CMD_USER_TOGGLE':
owner_toggle += 1
if not allow_mode and owner_toggle == 2:
owner_toggle += 1
if owner_toggle > 3:
owner_toggle = 0
elif ans == 'CMD_ALLOW':
path = options[selected]
done = True
match = re_match_include(path)
if match:
inc = match
deleted = apparmor.aa.delete_duplicates(aa[profile][hat], inc)
aa[profile][hat]['include'][inc] = True
changed[profile] = True
aaui.UI_Info(_('Adding %s to profile.') % path)
if deleted:
aaui.UI_Info(_('Deleted %s previous matching profile entries.') % deleted)
if aa[profile][hat]['allow']['path'][path].get('mode', False):
mode |= aa[profile][hat]['allow']['path'][path]['mode']
deleted = []
for entry in aa[profile][hat]['allow']['path'].keys():
if path == entry:
if apparmor.aa.matchregexp(path, entry):
if apparmor.aa.mode_contains(mode, aa[profile][hat]['allow']['path'][entry]['mode']):
for entry in deleted:
deleted = len(deleted)
if owner_toggle == 0:
mode = apparmor.aa.flatten_mode(mode)
#elif owner_toggle == 1:
# mode = mode
elif owner_toggle == 2:
mode = allow_mode | apparmor.aa.owner_flatten_mode(mode - allow_mode)
elif owner_toggle == 3:
mode = apparmor.aa.owner_flatten_mode(mode)
if not aa[profile][hat]['allow'].get(path, False):
aa[profile][hat]['allow']['path'][path]['mode'] = aa[profile][hat]['allow']['path'][path].get('mode', set()) | mode
tmpmode = set()
if audit_toggle == 1:
tmpmode = mode - allow_mode
elif audit_toggle == 2:
tmpmode = mode
aa[profile][hat]['allow']['path'][path]['audit'] = aa[profile][hat]['allow']['path'][path].get('audit', set()) | tmpmode
changed[profile] = True
aaui.UI_Info(_('Adding %(path)s %(mode)s to profile') % { 'path': path, 'mode': apparmor.aa.mode_to_str_user(mode) })
if deleted:
aaui.UI_Info(_('Deleted %s previous matching profile entries.') % deleted)
elif ans == 'CMD_DENY':
path = options[selected].strip()
# Add new entry?
aa[profile][hat]['deny']['path'][path]['mode'] = aa[profile][hat]['deny']['path'][path].get('mode', set()) | (mode - allow_mode)
aa[profile][hat]['deny']['path'][path]['audit'] = aa[profile][hat]['deny']['path'][path].get('audit', set())
changed[profile] = True
done = True
elif ans == 'CMD_NEW':
arg = options[selected]
if not re_match_include(arg):
ans = aaui.UI_GetString(_('Enter new path: '), arg)
# if ans:
# if not matchliteral(ans, path):
# ynprompt = _('The specified path does not match this log entry:\n\n Log Entry: %s\n Entered Path: %s\nDo you really want to use this path?') % (path,ans)
# key = aaui.UI_YesNo(ynprompt, 'n')
# if key == 'n':
# continue
default_option = len(options)
elif ans == 'CMD_GLOB':
newpath = options[selected].strip()
if not re_match_include(newpath):
newpath = apparmor.aa.glob_path(newpath)
if newpath not in options:
default_option = len(options)
default_option = options.index(newpath) + 1
elif ans == 'CMD_GLOBEXT':
newpath = options[selected].strip()
if not re_match_include(newpath):
newpath = apparmor.aa.glob_path_withext(newpath)
if newpath not in options:
default_option = len(options)
default_option = options.index(newpath) + 1
elif re.search('\d', ans):
default_option = ans
for ruletype in apparmor.aa.ruletypes:
if other.aa[profile][hat].get(ruletype, False): # needed until we have proper profile initialization
for rule_obj in other.aa[profile][hat][ruletype].rules:
if is_known_rule(aa[profile][hat], ruletype, rule_obj):
default_option = 1
options = []
newincludes = match_includes(aa[profile][hat], ruletype, rule_obj)
q = aaui.PromptQuestion()
if newincludes:
options += list(map(lambda inc: '#include <%s>' % inc, sorted(set(newincludes))))
q.options = options
q.selected = default_option - 1
done = False
while not done:
q.headers = [_('Profile'), combine_name(profile, hat)]
q.headers += rule_obj.logprof_header()
# Load variables into sev_db? Not needed/used for capabilities and network rules.
severity = rule_obj.severity(sev_db)
if severity != sev_db.NOT_IMPLEMENTED:
q.headers += [_('Severity'), severity]
q.functions = available_buttons(rule_obj)
q.default = q.functions[0]
ans, selected = q.promptUser()
if ans == 'CMD_IGNORE_ENTRY':
done = True
elif ans == 'CMD_FINISHED':
elif ans.startswith('CMD_AUDIT'):
if ans == 'CMD_AUDIT_NEW':
rule_obj.audit = True
rule_obj.raw_rule = None
rule_obj.audit = False
rule_obj.raw_rule = None
options[len(options) - 1] = rule_obj.get_clean()
q.options = options
elif ans == 'CMD_ALLOW':
done = True
changed[profile] = True
selection = options[selected]
inc = re_match_include(selection)
if inc:
deleted = delete_duplicates(aa[profile][hat], inc)
aa[profile][hat]['include'][inc] = True
aaui.UI_Info(_('Adding %s to profile.') % selection)
if deleted:
aaui.UI_Info(_('Deleted %s previous matching profile entries.') % deleted)
aaui.UI_Info(_('Adding %s to profile.') % rule_obj.get_clean())
elif ans == 'CMD_DENY':
done = True
changed[profile] = True
rule_obj.deny = True
rule_obj.raw_rule = None # reset raw rule after manually modifying rule_obj
aaui.UI_Info(_('Adding %s to profile.') % rule_obj.get_clean())
done = False
if __name__ == '__main__':