mirror of
synced 2025-03-04 16:35:02 +01:00
Intermediate codebase update and the test cases are still broken
This commit is contained in:
3 changed files with 772 additions and 76 deletions
@ -1,6 +1,7 @@
# No old version logs, only 2.6 + supported
#global variable names corruption
from __future__ import with_statement
@ -9,6 +10,7 @@ import logging
import os
import re
import shutil
import stat
import subprocess
import sys
import time
@ -148,8 +150,6 @@ OPERATION_TYPES = {
'sock_shutdown': 'net'
ARROWS = {'A': 'UP', 'B': 'DOWN', 'C': 'RIGHT', 'D': 'LEFT'}
def on_exit():
"""Shutdowns the logger and records exit if debugging enabled"""
@ -164,16 +164,6 @@ def op_type(operation):
operation_type = OPERATION_TYPES.get(operation, 'unknown')
return operation_type
def getkey():
key = readkey()
if key == '\x1B':
key = readkey()
if key == '[':
key = readkey()
if(ARROWS.get(key, False)):
key = ARROWS[key]
return key
def check_for_LD_XXX(file):
"""Returns True if specified program contains references to LD_PRELOAD or
LD_LIBRARY_PATH to give the Px/Ux code better suggestions"""
@ -484,7 +474,16 @@ def delete_profile(local_prof):
if aa.get(local_prof, False):
def confirm_and_abort():
ans = UI_YesNo(gettext('Are you sure you want to abandon this set of profile changes and exit?'), 'n')
if ans == 'y':
UI_Info(gettext('Abandoning all changes.'))
for prof in created:
def get_profile(prof_name):
profile_data = None
@ -583,7 +582,8 @@ def autodep(bin_name, pname=''):
if not bin_name and pname.startswith('/'):
bin_name = pname
if not repo_cfg and not cfg['repository'].get('url', False):
repo_cfg = read_config('repository.conf')
repo_conf = apparmor.config.Config('shell')
repo_cfg = repo_conf.read_config('repository.conf')
if not repo_cfg.get('repository', False) or repo_cfg['repository']['enabled'] == 'later':
if bin_name:
@ -596,7 +596,7 @@ def autodep(bin_name, pname=''):
if not bin_full:
return None
pname = bin_full
profile_data = get_profile(pname)
# Create a new profile if no existing profile
if not profile_data:
@ -718,7 +718,19 @@ def sync_profile():
if new_profiles:
def fetch_profile_by_id(url, id):
return None, None
def fetch_profiles_by_name(url, distro, user):
return None, None
def fetch_profiles_by_user(url, distro, user):
return None, None
def submit_created_profiles(new_profiles):
#url = cfg['repository']['url']
if new_profiles:
@ -838,23 +850,11 @@ def console_select_and_upload_profiles(title, message, profiles_up):
'information is required to upload profiles to the repository.\n' +
'These changes could not be sent.\n')
def set_profile_local_only(profs):
def set_profiles_local_only(profs):
for p in profs:
aa[profs][profs]['repo']['neversubmit'] = True
def confirm_and_abort():
ans = UI_YesNo('Are you sure you want to abandon this set of profile changes and exit?', 'n')
if ans == 'y':
UI_Info('Abandoning all changes.')
for prof in created:
def confirm_and_finish():
def build_x_functions(default, options, exec_toggle):
ret_list = []
@ -1066,7 +1066,7 @@ def handle_children(profile, hat, root):
combinedaudit = False
## Check return Value Consistency
# Check if path matches any existing regexps in profile
cm, am , m = rematch_frag(aa[profile][hat], 'allow', exec_target)
cm, am , m = rematchfrag(aa[profile][hat], 'allow', exec_target)
if cm:
combinedmode |= cm
if am:
@ -1245,7 +1245,7 @@ def handle_children(profile, hat, root):
exec_mode = str_to_mode('ix')
elif regex_optmode.search(ans):
match = regex_optmode.search(ans).groups()[0]
exec_mode = str_to_match(match)
exec_mode = str_to_mode(match)
px_default = 'n'
px_msg = gettext('Should AppArmor sanitise the environment when\n' +
'switching profiles?\n\n' +
@ -1312,10 +1312,10 @@ def handle_children(profile, hat, root):
changed[profile] = True
if exec_mode & str_to_mode('i'):
if 'perl' in exec_target:
aa[profile][hat]['include']['abstractions/perl'] = True
elif '/bin/bash' in exec_path or '/bin/sh' in exec_path:
aa[profile][hat]['include']['abstractions/bash'] = True
#if 'perl' in exec_target:
# aa[profile][hat]['include']['abstractions/perl'] = True
#elif '/bin/bash' in exec_target or '/bin/sh' in exec_target:
# aa[profile][hat]['include']['abstractions/bash'] = True
hashbang = head(exec_target)
if hashbang.startswith('#!'):
interpreter = hashbang[2:].strip()
@ -1355,9 +1355,9 @@ def handle_children(profile, hat, root):
if ynans == 'y':
helpers[exec_target] = 'enforce'
if to_name:
autodep_base('', exec_target)
autodep('', exec_target)
autodep_base(exec_target, '')
autodep(exec_target, '')
elif ans.startswith('CMD_cx') or ans.startswith('CMD_cix'):
if to_name:
@ -1754,6 +1754,12 @@ def is_repo_profile(profile_data):
def get_repo_user_pass():
# To-Do
def get_preferred_user(repo_url):
# To-Do
def repo_is_enabled():
# To-Do
return False
def update_repo_profile(profile):
# To-Do
@ -1888,13 +1894,13 @@ def ask_the_questions():
deny_mode = 0
deny_audit = 0
fmode, famode, fm = rematch_frag(aa[profile][hat], 'allow', path)
fmode, famode, fm = rematchfrag(aa[profile][hat], 'allow', path)
if fmode:
allow_mode |= fmode
if famode:
allow_audit |= famode
cm, cam, m = rematch_frag(aa[profile][hat], 'deny', path)
cm, cam, m = rematchfrag(aa[profile][hat], 'deny', path)
if cm:
deny_mode |= cm
if cam:
@ -1958,13 +1964,7 @@ def ask_the_questions():
if aa[profile][hat][incname]:
if cfg['settings']['custom_includes']:
for incn in cfg['settings']['custom_includes'].split():
if incn == incname:
include_valid = True
if valid_abstraction(incname):
include_valid = True
include_valid = valid_include(profile, incname)
if not include_valid:
@ -2527,7 +2527,7 @@ def save_profiles():
selected_profiles_ref = yarg['PROFILES']
for profile_name in selected_profiles_ref:
@ -2552,7 +2552,7 @@ def save_profiles():
display_changes(oldprofile, newprofile)
for profile_name in changed_list:
def get_pager():
@ -2646,7 +2646,7 @@ def collapse_log():
combinedmode |= aa[profile][hat]['allow']['path'][path]
# Match path to regexps in profile
combinedmode |= rematch_frag(aa[profile][hat], 'allow', path)
combinedmode |= rematchfrag(aa[profile][hat], 'allow', path)
# Match path from includes
combinedmode |= match_prof_incs_to_path(aa[profile][hat], 'allow', path)
@ -3408,7 +3408,7 @@ def escape(escape):
return '"%s"' % escape
return escape
def write_header(profile_data, depth, name, embedded_hat, write_flags):
def write_header(prof_data, depth, name, embedded_hat, write_flags):
pre = ' ' * depth
data = []
name = quote_if_needed(name)
@ -3416,46 +3416,49 @@ def write_header(profile_data, depth, name, embedded_hat, write_flags):
if (not embedded_hat and re.search('^[^\/]|^"[^\/]', name)) or (embedded_hat and re.search('^[^^]' ,name)):
name = 'profile %s' % name
if write_flags and profile_data['flags']:
data.append('%s%s flags(%s) {' % (pre, name, profile_data['flags']))
if write_flags and prof_data['flags']:
data.append('%s%s flags(%s) {' % (pre, name, prof_data['flags']))
data.append('%s%s {' % (pre, name))
return data
def write_single(profile_data, depth, allow, name, prefix, tail):
def write_single(prof_data, depth, allow, name, prefix, tail):
pre = ' ' * depth
data = []
ref, allow = set_ref_allow(profile_data, allow)
ref, allow = set_ref_allow(prof_data, allow)
if ref.get(name, False):
for key in sorted(re[name].keys()):
qkey = quote_if_needed(key)
data.append(pre + allow + prefix + qkey + tail)
data.append('%s%s%s%s%s' %(pre, allow, prefix, qkey, tail))
if ref[name].keys():
return data
def set_ref_allow(profile_data, allow):
if allow:
if allow == 'deny':
return profile_data[allow], 'deny '
return profile_data[allow], ''
def set_allow_str(allow):
if allow == 'deny':
return 'deny '
return profile_data, ''
return ''
def set_ref_allow(prof_data, allow):
if allow:
return prof_data[allow], set_allow_str(allow)
return prof_data, ''
def write_pair(profile_data, depth, allow, name, prefix, sep, tail, fn):
def write_pair(prof_data, depth, allow, name, prefix, sep, tail, fn):
pre = ' ' * depth
data = []
ref, allow = set_ref_allow(profile_data, allow)
ref, allow = set_ref_allow(prof_data, allow)
if ref.get(name, False):
for key in sorted(re[name].keys()):
value = fn(ref[name][key])#eval('%s(%s)' % (fn, ref[name][key]))
data.append(pre + allow + prefix + key + sep + value)
data.append('%s%s%s%s%s%s' %(pre, allow, prefix, key, sep, value))
if ref[name].keys():
@ -3485,9 +3488,7 @@ def write_list_vars(prof_data, depth):
def write_cap_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
allowstr = ''
if allow == 'deny':
allowstr = 'deny'
allowstr = set_allow_str(allow)
if prof_data[allow].get('capability', False):
for cap in sorted(prof_data[allow]['capability'].keys()):
@ -3495,7 +3496,7 @@ def write_cap_rules(prof_data, depth, allow):
if prof_data[allow]['capability'][cap].get('audit', False):
audit = 'audit'
if prof_data[allow]['capability'][cap].get('set', False):
data.append(pre + audit + allowstr + 'capability,')
data.append('%s%s%scapability %s,' %(pre, audit, allowstr))
return data
@ -3506,3 +3507,546 @@ def write_capabilities(prof_data, depth):
data += write_cap_rules(prof_data, depth, 'allow')
return data
def write_net_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
allowstr = set_allow_str(allow)
if prof_data[allow].get('netdomain', False):
if prof_data[allow]['netdomain'].get('rule', False) == 'all':
if prof_data[allow]['netdomain']['audit'].get('all', False):
audit = 'audit '
data.append('%s%snetwork,' %(pre, audit))
for fam in sorted(prof_data[allow]['netdomain']['rule'].keys()):
if prof_data[allow]['netdomain']['rule'][fam] == True:
if prof_data[allow]['netdomain']['audit'][fam]:
audit = 'audit'
data.append('%s%s%snetwork %s' % (pre, audit, allowstr, fam))
for typ in sorted(prof_data[allow]['netdomain']['rule'][fam].keys()):
if prof_data[allow]['netdomain']['audit'][fam].get(typ, False):
audit = 'audit'
data.append('%s%s%snetwork %s %s,' % (pre, audit, allowstr,fam, typ))
if prof_data[allow].get('netdomain', False):
return data
def write_netdomain(prof_data, depth):
data = write_net_rules(prof_data, depth, 'deny')
data += write_net_rules(prof_data, depth, 'allow')
return data
def write_link_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
allowstr = set_allow_str(allow)
if prof_data[allow].get('link', False):
for path in sorted(prof_data[allow]['link'].keys()):
to_name = prof_data[allow]['link'][path]['to']
subset = ''
if prof_data[allow]['link'][path]['mode'] & AA_LINK_SUBSET:
subset = 'subset'
audit = ''
if prof_data[allow]['link'][path].get('audit', False):
audit = 'audit '
path = quote_if_needed(path)
to_name = quote_if_needed(to_name)
data.append('%s%s%slink %s%s -> %s,' %(pre, audit, allowstr, subset, path, to_name))
return data
def write_links(prof_data, depth):
data = write_link_rules(prof_data, depth, 'deny')
data += write_link_rules(prof_data, depth, 'allow')
return data
def write_path_rules(prof_data, depth, allow):
pre = ' ' * depth
data = []
allowstr = set_allow_str(allow)
if prof_data[allow].get('path', False):
for path in sorted(prof_data[allow]['path'].keys()):
mode = prof_data[allow]['path'][path]['mode']
audit = prof_data[allow]['path'][path]['audit']
tail = ''
if prof_data[allow]['path'][path].get('to', False):
tail = ' -> %s' % prof_data[allow]['path'][path]['to']
user, other = split_mode(mode)
user_audit, other_audit = split_mode(audit)
while user or other:
ownerstr = ''
tmpmode = 0
tmpaudit = False
if user & ~other:
# if no other mode set
ownerstr = 'owner'
tmpmode = user & ~other
tmpaudit = user_audit
user = user & ~tmpmode
if user_audit & ~other_audit & user:
ownerstr = 'owner '
tmpaudit = user_audit & ~other_audit & user
tmpmode = user & tmpaudit
user = user & ~tmpmode
ownerstr = ''
tmpmode = user | other
tmpaudit = user_audit | other_audit
user = user & ~tmpmode
other = other & ~tmpmode
if tmpmode & tmpaudit:
modestr = mode_to_str(tmpmode & tmpaudit)
path = quote_if_needed(path)
data.append('%saudit %s%s%s %s%s,' %(pre, allowstr, ownerstr, path, modestr, tail))
tmpmode = tmpmode & ~tmpaudit
if tmpmode:
modestr = mode_to_str(tmpmode)
path = quote_if_needed(path)
data.append('%s%s%s%s %s%s,' %(pre, allowstr, ownerstr, path, modestr, tail))
return data
def write_paths(prof_data, depth):
data = write_path_rules(prof_data, depth, 'deny')
data += write_path_rules(prof_data, depth, 'allow')
return data
def write_rules(prof_data, depth):
data = write_alias(prof_data, depth)
data += write_list_vars(prof_data, depth)
data += write_includes(prof_data, depth)
data += write_rlimits(prof_data, depth)
data += write_capabilities(prof_data, depth)
data += write_netdomain(prof_data, depth)
data += write_links(prof_data, depth)
data += write_paths(prof_data, depth)
data += write_change_profile(prof_data, depth)
return data
def write_piece(profile_data, depth, name, nhat, write_flags):
pre = ' ' * depth
data = []
wname = None
inhat = False
if name == nhat:
wname = name
wname = name + '//' + nhat
name = nhat
inhat = True
data += write_header(profile_data[name], depth, wname, False, write_flags)
data += write_rules(profile_data[name], depth+1)
pre2 = ' ' * (depth+1)
# External hat declarations
for hat in filter(lambda x: x != name, sorted(profile_data.keys())):
if profile_data[hat].get('declared', False):
data.append('%s^%s,' %(pre2, hat))
if not inhat:
# Embedded hats
for hat in filter(lambda x: x != name, sorted(profile_data.keys())):
if not profile_data[hat]['external'] and not profile_data[hat]['declared']:
if profile_data[hat]['profile']:
data += map(str, write_header(profile_data[hat], depth+1, hat, True, write_flags))
data += map(str, write_header(profile_data[hat], depth+1, '^'+hat, True, write_flags))
data += map(str, write_rules(profile_data[hat], depth+2))
data.append('%s}' %pre2)
data.append('%s}' %pre)
# External hats
for hat in filter(lambda x: x != name, sorted(profile_data.keys())):
if name == nhat and profile_data[hat].get('external', False):
data += map(lambda x: ' %s' %x, write_piece(profile_data, depth-1, name, nhat, write_flags))
data.append(' }')
return data
def serialize_profile(profile_data, name, options):
string = ''
include_metadata = False
include_flags = True
data= []
if options and type(options) == dict:
if options.get('METADATA', False):
include_metadata = True
if options.get('NO_FLAGS', False):
include_flags = False
if include_metadata:
string = '# Last Modified: %s\n' %time.time()
if (profile_data[name].get('repo', False) and profile_data[name]['repo']['url']
and profile_data[name]['repo']['user'] and profile_data[name]['repo']['id']):
repo = profile_data[name]['repo']
string += '# REPOSITORY: %s %s %s\n' %(repo['url'], repo['user'], repo['id'])
elif profile_data[name]['repo']['neversubmit']:
if profile_data[name].get('initial_comment', False):
comment = profile_data[name]['initial_comment']
comment.replace('\\n', '\n')
string += comment + '\n'
filename = get_profile_filename(name)
if filelist.get(filename, False):
data += write_alias(filelist[filename], 0)
data += write_list_vars(filelist[filename], 0)
data += write_includes(filelist[filename], 0)
data += write_piece(profile_data, 0, name, name, include_flags)
string += '\n'.join(data)
return string+'\n'
def write_profile_ui_feedback(profile):
UI_Info(gettext('Writing updated profile for %s.') %profile)
def write_profile(profile):
filename = None
if aa[profile][profile].get('filename', False):
filename = aa[profile][profile]['filename']
filename = get_profile_filename(profile)
newprof = tempfile.NamedTemporaryFile('rw', suffix='~' ,delete=False)
if os.path.exists(filename):
shutil.copymode(filename, newprof.name)
#permission_600 = stat.S_IRUSR | stat.S_IWUSR # Owner read and write
#os.chmod(newprof.name, permission_600)
serialize_options = {}
serialize_options['METADATA'] = True
profile_string = serialize_profile(aa[profile], profile, serialize_options)
os.rename(newprof.name, filename)
original_aa[profile] = deepcopy(aa[profile])
def matchliteral(aa_regexp, literal):
p_regexp = '^'+convert_regexp(aa_regexp)+'$'
match = False
match = re.search(p_regexp, literal)
return None
return match
def profile_known_exec(profile, typ, exec_target):
if typ == 'exec':
cm = None
am = None
m = []
cm, am, m = rematchfrag(profile, 'deny', exec_target)
if cm & AA_MAY_EXEC:
return -1
cm, am, m = match_prof_incs_to_path(profile, 'deny', exec_target)
if cm & AA_MAY_EXEC:
return -1
cm, am, m = rematchfrag(profile, 'allow', exec_target)
if cm & AA_MAY_EXEC:
return 1
cm, am, m = match_prof_incs_to_path(profile, 'allow', exec_target)
if cm & AA_MAY_EXEC:
return 1
return 0
def profile_known_capability(profile, capname):
if profile['deny']['capability'][capname].get('set', False):
return -1
if profile['allow']['capability'][capname].get('set', False):
return 1
for incname in profile['include'].keys():
if include[incname][incname]['deny']['capability'][capname].get('set', False):
return -1
if include[incname][incname]['allow']['capability'][capname].get('set', False):
return 1
return 0
def profile_known_network(profile, family, sock_type):
if netrules_access_check(profile['deny']['netdomain'], family, sock_type):
return -1
if netrules_access_check(profile['allow']['netdomain'], family, sock_type):
return 1
for incname in profile['include'].keys():
if netrules_access_check(include[incname][incname]['deny']['netdomain'], family, sock_type):
return -1
if netrules_access_check(include[incname][incname]['allow']['netdomain'], family, sock_type):
return 1
return 0
def netrules_access_check(netrules, family, sock_type):
if not netrules:
return 0
all_net = False
all_net_family = False
net_family_sock = False
if netrules['rule'].get('all', False):
all_net = True
if netrules['rule'].get(family, False) == True:
all_net_family = True
if (netrules['rule'].get(family, False) and
type(netrules['rule'][family]) == dict and
net_family_sock = True
if all_net or all_net_family or net_family_sock:
return True
return False
def reload_base(bin_path):
if not check_for_apparmor():
return None
filename = get_profile_filename(bin_path)
subprocess.call("cat '%s' | %s -I%s -r >/dev/null 2>&1" %(filename, parser ,profile_dir), shell=True)
def reload(bin_path):
bin_path = find_executable(bin_path)
if not bin:
return None
return reload_base(bin_path)
def get_include_data(filename):
data = []
if os.path.exists(profile_dir + '/' + filename):
with open_file_read(profile_dir + '/' + filename) as f_in:
data = f_in.readlines()
raise AppArmorException('File Not Found: %s' %filename)
return data
def load_include(incname):
load_includeslist = [incname]
if include.get(incname, {}).get(incname, False):
return 0
while load_includeslist:
incfile = load_includeslist.pop(0)
data = get_include_data(incfile)
incdata = parse_profile_data(data, incfile, True)
if incdata:
attach_profile_data(include, incdata)
return 0
def rematchfrag(frag, allow, path):
combinedmode = 0
combinedaudit = 0
matches = []
for entry in frag[allow]['path'].keys():
match = matchliteral(entry, path)
if match:
combinedmode |= frag[allow]['path'][entry]['mode']
combinedaudit |= frag[allow]['path'][entry]['audit']
return combinedmode, combinedaudit, matches
def match_include_to_path(incname, allow, path):
combinedmode = 0
combinedaudit = 0
matches = []
includelist = [incname]
while includelist:
incfile = includelist.pop(0)
ret = load_include(incfile)
cm, am , m = rematchfrag(include[incfile][incfile], allow, path)
if cm:
combinedmode |= cm
combinedaudit |= am
matches += m
if include[incfile][incfile][allow]['path'][path]:
combinedmode |= include[incfile][incfile][allow]['path'][path]['mode']
combinedaudit |= include[incfile][incfile][allow]['path'][path]['audit']
if include[incfile][incfile]['include'].keys():
includelist + include[incfile][incfile]['include'].keys()
return combinedmode, combinedaudit, matches
def match_prof_incs_to_path(frag, allow, path):
combinedmode = 0
combinedaudit = 0
matches = []
includelist = list(frag['include'].keys())
while includelist:
incname = includelist.pop(0)
cm, am, m = match_include_to_path(incname, allow, path)
if cm:
combinedmode |= cm
combinedaudit |= am
matches += m
return combinedmode, combinedaudit, matches
def suggest_incs_for_path(incname, path, allow):
combinedmode = 0
combinedaudit = 0
matches = []
includelist = [incname]
while includelist:
inc = includelist.pop(0)
cm, am , m = rematchfrag(include[inc][inc], 'allow', path)
if cm:
combinedmode |= cm
combinedaudit |= am
matches += m
if include[inc][inc]['allow']['path'].get(path, False):
combinedmode |= include[inc][inc]['allow']['path'][path]['mode']
combinedaudit |= include[inc][inc]['allow']['path'][path]['audit']
if include[inc][inc]['include'].keys():
includelist += include[inc][inc]['include'].keys()
return combinedmode, combinedaudit, matches
def check_qualifiers(program):
if cfg['qualifiers'].get(program, False):
if cfg['qualifiers'][program] != 'p':
fatal_error(gettext('%s is currently marked as a program that should not have its own\n' +
'profile. Usually, programs are marked this way if creating a profile for \n' +
'them is likely to break the rest of the system. If you know what you\'re\n' +
'doing and are certain you want to create a profile for this program, edit\n' +
'the corresponding entry in the [qualifiers] section in /etc/apparmor/logprof.conf.') %program)
def get_subdirectories(current_dir):
"""Returns a list of all directories directly inside given directory"""
if sys.version_info < (3,0):
return os.walk(current_dir).next()[1]
return os.walk(current_dir).__next__()[1]
def loadincludes():
incdirs = get_subdirectories(profile_dir)
for idir in incdirs:
if is_skippable_dir(idir):
for dirpath, dirname, files in os.walk(profile_dir + '/' + idir):
if is_skippable_dir(dirpath):
for fi in files:
if is_skippable_file(fi):
load_include(dirpath + '/' + fi)
def glob_common(path):
globs = []
if re.search('[\d\.]+\.so$', path) or re.search('\.so\.[\d\.]+$', path):
libpath = path
libpath = re.sub('[\d\.]+\.so$', '*.so', libpath)
libpath = re.sub('\.so\.[\d\.]+$', '.so.*', libpath)
if libpath != path:
for glob in cfg['globs']:
if re.search(glob, path):
globbedpath = path
globbedpath = re.sub(glob, cfg['globs'][glob])
if globbedpath != path:
return sorted(set(globs))
def combine_name(name1, name2):
if name1 == name2:
return name1
return '%s^%s' %(name1, name2)
def split_name(name):
names = name.split('^')
if len(names) == 1:
return name, name
return names[0], names[1]
def matchregexp(new, old):
if re.search('\{.*(\,.*)*\}', old):
return None
if re.search('\[.+\]', old) or re.search('\*', old) or re.search('\?', old):
if re.search('\{.*\,.*\}', new):
conf = apparmor.config.Config('ini')
cfg = conf.read_config('logprof.conf')
if cfg['settings'].get('default_owner_prompt', False):
cfg['settings']['default_owner_prompt'] = False
profile_dir = conf.find_first_dir(cfg['settings']['profiledir']) or '/etc/apparmor.d'
if not os.path.isdir(profile_dir):
raise AppArmorException('Can\'t find AppArmor profiles' )
extra_profile_dir = conf.find_first_dir(cfg['settings']['inactive_profiledir']) or '/etc/apparmor/profiles/extras/'
parser = conf.find_first_file(cfg['settings']['parser']) or '/sbin/apparmor_parser'
if not os.path.isfile(parser) or not os.access(parser, os.EX_OK):
raise AppArmorException('Can\'t find apparmor_parser')
filename = conf.find_first_file(cfg['settings']['logfiles']) or '/var/log/syslog'
if not os.path.isfile(filename):
raise AppArmorException('Can\'t find system log.')
ldd = conf.find_first_file(cfg['settings']['ldd']) or '/usr/bin/ldd'
if not os.path.isfile(ldd) or not os.access(ldd, os.EX_OK):
raise AppArmorException('Can\'t find ldd')
logger = conf.find_first_file(cfg['settings']['logger']) or '/bin/logger'
if not os.path.isfile(logger) or not os.access(logger, os.EX_OK):
raise AppArmorException('Can\'t find logger')
@ -4,9 +4,10 @@ import gettext
import locale
import logging
import os
import re
from apparmor.yasti import yastLog, SendDataToYast, GetDataFromYast
from apparmor.common import readkey
from apparmor.common import readkey, AppArmorException
debug_logger = None
@ -30,6 +31,18 @@ def init_localisation():
trans = gettext.NullTranslations()
ARROWS = {'A': 'UP', 'B': 'DOWN', 'C': 'RIGHT', 'D': 'LEFT'}
def getkey():
key = readkey()
if key == '\x1B':
key = readkey()
if key == '[':
key = readkey()
if(ARROWS.get(key, False)):
key = ARROWS[key]
return key
def UI_Info(text):
@ -71,7 +84,7 @@ def UI_YesNo(text, default):
ans = default
'type': 'dialog-yesno',
'question': text
@ -130,7 +143,7 @@ def UI_GetString(text, default):
'label': text,
'default': default
ypath, yarg = GetDatFromYast()
ypath, yarg = GetDataFromYast()
string = yarg['string']
return string
@ -257,10 +270,149 @@ def UI_ShortMessage(title, message):
ypath, yarg = GetDataFromYast()
def UI_longMessage(title, message):
def UI_LongMessage(title, message):
'type': 'long-dialog-message',
'headline': title,
'message': message
ypath, yarg = GetDataFromYast()
def confirm_and_finish():
def Text_PromptUser(question):
title = question['title']
explanation = question['explanation']
headers = question['headers']
functions = question['functions']
default = question['default']
options = question['options']
selected = question.get('selected', False) or 0
helptext = question['helptext']
if helptext:
menu_items = []
keys = dict()
for cmd in functions:
if not CMDS.get(cmd, False):
raise AppArmorException('PromptUser: %s %s' %(gettext('Unknown command'), cmd))
menutext = gettext(CMDS[cmd])
menuhotkey = re.search('\((\S)\)', menutext)
if not menuhotkey:
raise AppArmorException('PromptUser: %s \'%s\'' %(gettext('Invalid hotkey in'), menutext))
key = menuhotkey.groups()[0].lower()
# Duplicate hotkey
if keys.get(key, False):
raise AppArmorException('PromptUser: %s %s: %s' %(gettext('Duplicate hotkey for'), cmd, menutext))
keys[key] = cmd
if default and default == cmd:
menutext = '[%s]' %menutext
default_key = 0
if default and CMDS[default]:
defaulttext = gettext(CMDS[default])
defaulthotkey = re.search('\((\S)\)', defaulttext)
if not menuhotkey:
raise AppArmorException('PromptUser: %s \'%s\'' %(gettext('Invalid hotkey in default item'), defaulttext))
default_key = defaulthotkey.groups()[0].lower()
if keys.get(default_key, False):
raise AppArmorException('PromptUser: %s %s' %(gettext('Invalid default'), default))
widest = 0
header_copy = headers[:]
while header_copy:
header = header_copy.pop(0)
if len(header) > widest:
widest = len(header)
widest += 1
formatstr = '%-' + widest + 's %s\n'
function_regexp = '^('
function_regexp += '|'.join(keys.keys())
if options:
function_regexp += '|\d'
function_regexp += ')$'
while not re.search(function_regexp, ans, flags=re.IGNORECASE):
prompt = '\n'
if title:
prompt += '= %s =\n\n' %title
if headers:
header_copy = headers[:]
while header_copy:
header = header_copy.pop(0)
value = header_copy.pop(0)
prompt += formatstr %(header+':', value)
prompt += '\n'
if explanation:
prompt += explanation + '\n\n'
if options:
for index, option in enumerate(options):
if selected == index:
format_option = ' [%s - %s]'
format_option = ' %s - %s '
prompt += format_option %(index+1, option)
prompt += '\n'
prompt += ' / '.join(menu_items)
ans = readkey().lower()
if ans:
if ans == 'up':
if options and selected > 0:
selected -= 1
elif ans == 'down':
if options and selected < len(options)-2:
selected += 1
elif keys.get(ans, False) == 'CMD_HELP':
sys.stdout.write('\n%s\n' %helptext)
elif int(ans) == 10:
# If they hit return choose default option
ans = default_key
elif options and re.search('^\d$', ans):
ans = int(ans)
if ans > 0 and ans < len(options):
selected = ans - 1
if keys.get(ans, False) == 'CMD_HELP':
sys.stdout.write('\n%s\n' %helptext)
ans = 'again'
if keys.get(ans, False):
ans = keys[ans]
return ans, selected
@ -1,5 +1,5 @@
import re
#import ycp
import ycp
import os
import sys
import logging
Add table
Reference in a new issue