diff --git a/tests/profile_check.py b/tests/profile_check.py new file mode 100644 index 00000000..5cc39d6b --- /dev/null +++ b/tests/profile_check.py @@ -0,0 +1,463 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: GPL-2.0-only + +# KNOWN ISSUES: +# No guards for file type - expects AppArmor +# Diffirent suggestions for single line are mutually exclusive +# Suggestion could point to changed profile name, based on other suggestion + +import sys +import argparse +import pathlib +import shlex +import json +from copy import deepcopy + +def sanitizeProfileName(name): + + if name.startswith('/') or name.startswith('@{'): + name = pathlib.Path(name).stem + + if ' ' in name: + name = re.sub(r'\s+', '-', name) + + return name + +def makeLocalIdentity(nestingStacker_): + + newStacker = [] + for i in nestingStacker_: + i = sanitizeProfileName(i) + newStacker.append(i) + + identity = '_'.join(newStacker) # separate each (sub)profile identity with underscores + + return identity + +def getCurrentProfile(stacker): + + if stacker: + profile = stacker[-1] + else: + profile = None + + return profile + +def handleFileMessages(l, file, profile, lineNum): + + wholeFileAccessProfiles = ( +# '', + ) + suggestOwner = ( # TODO: switch to AARE + r'^@{HOME}', + r'^/home/\w+/', + r'^/run/user/\d+/', + r'^/tmp/', + r'^/var/tmp/', + r'^/dev/shm/', + ) + + lG = l.groupdict() + reason_ = None + if lG.get('path'): + if lG.get('path').startswith('/**') and profile not in wholeFileAccessProfiles: + severity_ = 'ERROR' + reason_ = 'Whole filesystem access is too broad' + suggestion_ = None + + for r in suggestOwner: + if re.match(r, lG.get('path')) and not lG.get('owner'): + indentRe = re.match(r'^\s+', l.group()) + if indentRe: + indent = indentRe.group() + else: + indent = '' + + severity_ = 'NOTICE' + reason_ = "'owner' is likely required" + suggestion_ = indent + 'owner ' + l.group().lstrip() + break + + elif lG.get('bare_file') and profile not in wholeFileAccessProfiles: + severity_ = 'ERROR' + reason_ = 'Whole filesystem access is too broad' + suggestion_ = None + + if reason_: # something matched + msg = ({'filename': file, + 'profile': profile, + 'severity': severity_, + 'line': lineNum, + 'reason': reason_, + 'suggestion': suggestion_}) + else: + msg = None + + return msg + +def readApparmorFile(fullpath): + '''AA file could contain multiple AA profiles''' + headers = ( + '# AppArmor.d - Full set of apparmor profiles', + '# Copyright (C) ', + '# SPDX-License-Identifier: GPL-2.0-only', + ) + + file_data = {} + fileVars = {} + nestingStacker = [] + duplicateProfilesCounter = [] + localExists = {} + localExists_eol = {} + messages = [] + exceptionMsg = None + line = None + gotAbi = False + gotHeaders = {} + gotAttach = False + isAfterProfileStart = False + try: + with open(fullpath, 'r') as f: + for n,line in enumerate(f, start=1): + if isAfterProfileStart: + isAfterProfileStart = False + expectedIndent = len(nestingStacker) * ' ' + indentRe = re.match(r'^\s+', line) + if indentRe: + indent = indentRe.group() + else: + indent = '' + + if indent != expectedIndent: + spacesCount = len(nestingStacker) * 2 + nesingCount = len(nestingStacker) + messages.append({'filename': fullpath, + 'profile': getCurrentProfile(nestingStacker), + 'severity': 'WARNING', + 'line': n, + 'reason': f"Expected {spacesCount} spaces for {nesingCount} nesting", + 'suggestion': f"{expectedIndent}{line}"}) + + if line.endswith(' \n'): + messages.append({'filename': fullpath, + 'profile': getCurrentProfile(nestingStacker), + 'severity': 'WARNING', + 'line': n, + 'reason': "Redundant trailing whitespace", + 'suggestion': line.rstrip()}) + + if '\t' in line: + messages.append({'filename': fullpath, + 'profile': getCurrentProfile(nestingStacker), + 'severity': 'WARNING', + 'line': n, + 'reason': "Tabs are not allowed", + 'suggestion': line.replace('\t', '')}) + + if len(gotHeaders) < 3 and not nestingStacker: + for nH,i in enumerate(headers): + if line.startswith(i): + gotHeaders[nH] = True + + if RE_ABI.search(line): + gotAbi = line + + elif RE_PROFILE_START.search(line) or RE_PROFILE_HAT_DEF.search(line): + isAfterProfileStart = True + m = parse_profile_start_line(line, fullpath) + if m.get('profile'): + nestingStacker.append(m.get('profile')) # set early + + if m.get('attachment') != '@{exec_path}' and not gotAttach: # can be only singular + gotAttach = True + messages.append({'filename': fullpath, + 'profile': getCurrentProfile(nestingStacker), + 'severity': 'WARNING', + 'line': n, + 'reason': "'@{exec_path}' must be defined as main path attachment", + 'suggestion': None}) + + profileMsg = {'filename': fullpath, + 'profile': getCurrentProfile(nestingStacker), + 'severity': 'WARNING', + 'line': n, + 'reason': "A short named profile must be defined", + 'suggestion': None} + if m.get('plainprofile'): + messages.append(profileMsg) + elif m.get('namedprofile'): + if m.get('namedprofile').startswith('/'): + messages.append(profileMsg) + + if m.get('flags'): + m['flags'] = set(shlex.split(m.pop('flags').replace(',', ''))) + if 'complain' in m['flags']: + messages.append({'filename': fullpath, + 'profile': getCurrentProfile(nestingStacker), + 'severity': 'WARNING', + 'line': n, + 'reason': "'complain' flag must be defined in 'dists/flags'", + 'suggestion': None}) + else: + m['flags'] = set() + + if m.get('profile'): + duplicateProfilesCounter.append(m.get('profile')) + profileIdentity = '//'.join(nestingStacker) + file_data[profileIdentity] = m + + elif RE_PROFILE_VARIABLE.search(line): + lineV = RE_PROFILE_VARIABLE.search(line).groups() + + name = strip_quotes(lineV[0]) + operation = lineV[1] + val = separate_vars(lineV[2]) + if fileVars.get(name): + fileVars[name].update(set(val)) + if operation == '=': + messages.append({'filename': fullpath, + 'profile': getCurrentProfile(nestingStacker), + 'severity': 'DEGRADED', + 'line': n, + 'reason': "Tunable must be appended with '+='", + 'suggestion': None}) + else: + fileVars[name] = set(val) + if operation == '+=': + messages.append({'filename': fullpath, + 'profile': getCurrentProfile(nestingStacker), + 'severity': 'DEGRADED', + 'line': n, + 'reason': "Tunable must be defined with '='", + 'suggestion': None}) + + elif RE_INCLUDE.search(line): + if nestingStacker: + profileIdentity = '//'.join(nestingStacker) + localIdentity = makeLocalIdentity(nestingStacker) + localValue = f'include if exists ' # commented out will also match + if localValue in line: + localExists[profileIdentity] = localValue + + # Handle file entries + elif RE_PROFILE_FILE_ENTRY.search(line): + lineF = RE_PROFILE_FILE_ENTRY.search(line) + fileMsg = handleFileMessages(lineF, fullpath, getCurrentProfile(nestingStacker), n) + if fileMsg: + messages.append(fileMsg) + + elif RE_PROFILE_END.search(line): + if getCurrentProfile(nestingStacker): + if not nestingStacker: + messages.append({'filename': fullpath, + 'profile': None, + 'severity': 'DEGRADED', + 'line': n, + 'reason': "Unbalanced parenthesis?", # not fully covered + 'suggestion': None}) + else: + profileIdentity = '//'.join(nestingStacker) + localExists_eol[profileIdentity] = n + del nestingStacker[-1] # remove last + + except PermissionError: + exceptionMsg = 'Unable to read the file (PermissionError)' + + except UnicodeDecodeError: + exceptionMsg = 'Unable to read the file (UnicodeDecodeError)' + + except FileNotFoundError: + exceptionMsg = 'No such file or directory (FileNotFoundError)' + + if exceptionMsg: + messages.append({'filename': fullpath, + 'profile': None, + 'severity': 'NOTICE', + 'line': None, + 'reason': exceptionMsg, + 'suggestion': None}) + + # Ensure proper header is present + if len(gotHeaders) < 3: + combinedHeader = '\n'.join(headers) + messages.append({'filename': fullpath, + 'profile': None, + 'severity': 'WARNING', + 'line': 1, + 'reason': 'No proper header', + 'suggestion': combinedHeader}) + + # Ensure ABI is present + changeAbi = False + abi = 'abi ,' + if gotAbi: + if gotAbi.strip() != abi: + changeAbi = True + else: + changeAbi = True + + if changeAbi: + messages.append({'filename': fullpath, + 'profile': None, + 'severity': 'WARNING', + 'line': None, + 'reason': 'ABI is required', + 'suggestion': abi}) + + # Ensure trailing vim syntax + if line: + trailingSyntax = '# vim:syntax=apparmor' + if line != trailingSyntax: + messages.append({'filename': fullpath, + 'profile': None, + 'severity': 'WARNING', + 'line': None, + 'reason': 'No trailing syntax hint', + 'suggestion': trailingSyntax}) + + # Assign variables to profile attachments as paths and assign filenames + for p,d in deepcopy(file_data).items(): + file_data[p]['filename'] = fullpath + attachment = d.get('attachment') + if attachment: + if attachment.startswith('@{'): + if fileVars.get(attachment): + file_data[p]['attach_paths'] = fileVars[attachment] # incoming set + else: + messages.append({'filename': fullpath, + 'profile': p, + 'severity': 'ERROR', + 'line': None, + 'reason': f"Unknown global variable as profile attachment: {attachment}", + 'suggestion': None}) + + else: + if isinstance(file_data[p].get('attachment'), set): + raise ValueError("Expecting 'str' or 'None', not 'set'") + file_data[p]['attach_paths'] = {file_data[p]['attachment']} + + # Check if profile block does not have corresponding 'local' include + for p,d in file_data.items(): + if not localExists.get(p): # not found previously + if '//' in p: + identity = p.split('//') + else: + identity = [p] + + localIdentity = makeLocalIdentity(identity) + filename = file_data[p]['filename'] + messages.append({'filename': filename, + 'profile': p, + 'severity': 'WARNING', + 'line': localExists_eol.get(p), # None? Unbalanced parenthesis? + 'reason': "The (sub)profile block does not have expected 'local' include", + 'suggestion': f'include if exists '}) + + # Track multiple definitions inside single file + for profile in duplicateProfilesCounter: + counter = duplicateProfilesCounter.count(profile) + if counter >= 2: + messages.append({'filename': fullpath, + 'profile': profile, + 'severity': 'DEGRADED', + 'line': None, + 'reason': "Profile has been defined {counter} times in the same file", + 'suggestion': None}) + + return (messages, file_data) + +def findAllProfileFilenames(profile_dir): + + profiles = set() + for path in pathlib.Path(profile_dir).iterdir(): + if path.is_file() and not is_skippable_file(path): + profiles.add(path.resolve()) + + # Not default, dig deeper + if not profiles: + nestedDirs = ( + 'groups', + 'profiles-a-f', + 'profiles-g-l', + 'profiles-m-r', + 'profiles-s-z', + ) + for d in nestedDirs: + dirpath = pathlib.Path(pathlib.Path(profile_dir).resolve(), pathlib.Path(d)) + for p in dirpath.rglob("*"): + if p.is_file(): + profiles.add(p) + + return profiles + +def handleArgs(): + """DEGRADED are purposed for fatal errors - when the profile set will fail to load entirely""" + + allSeverities = ['DEBUG', 'NOTICE', 'WARNING', 'ERROR', 'CRITICAL', 'DEGRADED'] + aaRoot = '/etc/apparmor.d' + + parser = argparse.ArgumentParser() + parser.add_argument('-d', '--aa-root-dir', action='store', + default=aaRoot, + help='Target different AppArmor root directory rather than default') + parser.add_argument('-p', '--profile', action='append', + help='Handle only specified profile') +# parser.add_argument('-s', '--severity', action='append', +# choices=allSeverities, +# help='Handle only specified severity event') + + args = parser.parse_args() + +# if not args.severity: +# args.severity = allSeverities + + return args + +def main(argv): + + args = handleArgs() + + messages = [] + + profile_dir = args.aa_root_dir + if not args.profile: + profiles = findAllProfileFilenames(profile_dir) + else: + profiles = set() + for p in args.profile: + absolutePath = pathlib.Path(p).resolve() + profiles.add(absolutePath) + + profile_data = {} + for path in sorted(profiles): + readApparmorFile_Out = readApparmorFile(path) + profilesInFile = readApparmorFile_Out[1] + messages.extend(readApparmorFile_Out[0]) + profile_data.update(profilesInFile) + + for m in messages: + m['filename'] = str(m.get('filename')) + print(json.dumps(m, indent=2)) + + if messages: + sys.exit(1) + + return None + +if __name__ == '__main__': + '''Safeguard errors does NOT cover loosening existing profiles after loading!''' + try: + from apparmor.regex import * + from apparmor.aa import is_skippable_file + from apparmor.rule.file import FileRule, FileRuleset + from apparmor.common import convert_regexp + try: + from apparmor.rule.variable import separate_vars + except ModuleNotFoundError: + from apparmor.aa import separate_vars + + except ModuleNotFoundError: + raise ModuleNotFoundError(f"""Can't find 'python3-apparmor' package! Install with: +$ sudo apt install python3-apparmor""") + + main(sys.argv)