#! /usr/bin/python3
# ----------------------------------------------------------------------
#    Copyright (C) 2013 Kshitij Gupta
#    Copyright (C) 2016 Canonical, Ltd.
#
#    This program is free software; you can redistribute it and/or
#    modify it under the terms of version 2 of the GNU General Public
#    License as published by the Free Software Foundation.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
# ----------------------------------------------------------------------
"""List processes and report whether each one is confined by an AppArmor profile.

By default only processes found by ss(8) or netstat(8) are inspected;
with --paranoid (or --show=all) every numeric entry below /proc is inspected.
"""

import argparse
import os
import re
import subprocess
import sys

import apparmor.aa as aa
import apparmor.ui as ui
from apparmor.common import AppArmorException, open_file_read
from apparmor.fail import enable_aa_exception_handler
from apparmor.translations import init_translation

enable_aa_exception_handler()  # setup exception handling
_ = init_translation()  # setup module translations

parser = argparse.ArgumentParser(description=_("Lists unconfined processes having tcp or udp ports"))
parser.add_argument("--paranoid", action="store_true", help=_("scan all processes"))
parser.add_argument("--show", default=None, type=str, help=_("all | network | server | client"))
parser.add_argument("--short", action="store_true", help=_("only display processes that are unconfined"))
parser.add_argument('--configdir', type=str, help=argparse.SUPPRESS)

bin_group = parser.add_mutually_exclusive_group()
bin_group.add_argument("--with-ss", action='store_true', help=_("use ss(8) to find listening processes (default)"))
bin_group.add_argument("--with-netstat", action='store_true', help=_("use netstat(8) to find listening processes"))
args = parser.parse_args()

# set default set of processes to show
show = 'server'
if args.paranoid:
    if args.show is not None and args.show != 'all':
        raise AppArmorException(_("Arguments --paranoid and --show=%s conflict") % args.show)
    show = 'all'

if args.show is not None:
    if not args.show or args.show not in ['all', 'network', 'server', 'client']:
        raise AppArmorException(_("Argument --show invalid value '%s'") % args.show)
    show = args.show

aa.init_aa(confdir=args.configdir)

aa_mountpoint = aa.check_for_apparmor()
if not aa_mountpoint:
    raise AppArmorException(_("It seems AppArmor was not started. Please enable AppArmor and try again."))

# Programs for which the full command line is displayed instead of argv[0].
# Compiled once here rather than once per inspected pid.
regex_interpreter = re.compile(r"^(/usr)?/bin/(python|perl|bash|dash|sh)$")


def map_show_to_flags(show):
    """Return the ss(8)/netstat(8) option string for the given --show mode.

    'client' maps to '-np', 'network' to '-nap'; any other value
    (including the default 'server') maps to '-nlp'.
    """
    flags = '-nlp'
    if show == 'client':
        flags = '-np'
    elif show == 'network':
        flags = '-nap'

    return flags


def get_all_pids():
    """Return a set of all pids via walking /proc"""
    return set(filter(lambda x: re.search(r"^\d+$", x), aa.get_subdirectories("/proc")))


def _command_output_lines(cmd):
    """Run cmd with LANG=C and a fixed PATH, and return its stdout split into lines.

    A non-zero exit status propagates as subprocess.CalledProcessError
    (raised by subprocess.check_output).
    """
    my_env = os.environ.copy()
    my_env['LANG'] = 'C'
    my_env['PATH'] = '/bin:/usr/bin:/sbin:/usr/sbin'

    raw = subprocess.check_output(cmd, shell=False, env=my_env)
    if sys.version_info < (3, 0):
        return raw.split("\n")
    # Python3 needs to translate a stream of bytes to string with specified encoding
    return str(raw, encoding='utf8').split("\n")


def get_pids_ss(flags, ss='ss'):
    """Get a set of pids listening on network sockets via ss(8)

    flags is the option string passed to ss (see map_show_to_flags());
    ss is the name of the binary to run. The pids are returned as strings.
    """
    # The named group 'users' captures the trailing users:((...)) column.
    regex_lines = re.compile(r"^(tcp|udp|raw|p_dgr)\s.+\s+users:(?P<users>\(\(.*\)\))$")
    # Group 3 is the numeric pid of one ("name",pid=N,...) entry.
    regex_users_pids = re.compile(r'(\("[^"]+",(pid=)?(\d+),[^)]+\))')

    pids = set()
    for family in ['inet', 'inet6', 'link']:
        cmd = [ss, flags, '--family', family]
        for line in _command_output_lines(cmd):
            match = regex_lines.search(line.strip())
            if not match:
                continue
            # finditer + group(3) avoids rebinding "_" (the gettext alias) as a local.
            for user in regex_users_pids.finditer(match.group('users')):
                pids.add(user.group(3))

    return pids


def get_pids_netstat(flags, netstat='netstat'):
    """Get a set of pids listening on network sockets via netstat(8)

    flags is the option string passed to netstat (see map_show_to_flags());
    netstat is the name of the binary to run. The pids are returned as strings.
    """
    # The named group 'pid' captures the number in front of "/program".
    regex_tcp_udp = re.compile(r"^(tcp|udp|raw)6?\s+\d+\s+\d+\s+\S+:(\d+)\s+\S+:(\*|\d+)\s+(LISTEN|\d+|\s+)\s+(?P<pid>\d+)/(\S+)")

    cmd = [netstat, flags, '--protocol', 'inet,inet6']

    pids = set()
    for line in _command_output_lines(cmd):
        match = regex_tcp_udp.search(line)
        if match:
            pids.add(match.group('pid'))

    return pids


def read_proc_current(filename):
    """Return the confinement line read from filename, or None.

    A line qualifies when it ends in one of the known mode suffixes and has
    at least one character in front of it. If several lines qualify, the
    last one wins. None is returned when nothing qualifies or when the file
    cannot be read.
    """
    # intentionally not checking for '(unconfined)', because
    # $binary confined by $profile (unconfined) would look very confusing
    mode_suffixes = (' (complain)', ' (enforce)', ' (kill)', ' (user)', ' (mixed)')

    attr = None
    try:
        # don't bother with if os.path.exists(filename): there is always a race
        with open_file_read(filename) as current:
            for line in current:
                line = line.strip()
                # start=1 enforces at least one char as profile name
                if line.endswith(mode_suffixes, 1):
                    attr = line
    except OSError:
        # just ignore errors atm
        # print("Error trying to open {filename}")
        return None

    return attr


def escape_special_chars(data):
    """escape special characters in program names so that they can't mess up the terminal"""
    data = repr(data)
    if len(data) > 1 and data.startswith("'") and data.endswith("'"):
        return data[1:-1]
    else:
        return data


def _printable_cmdline(cmdline):
    """Turn a raw NUL-separated command line into an escaped, space-separated string."""
    return escape_special_chars(cmdline.replace("\0", " ").strip())


pids = set()
if show == 'all':
    pids = get_all_pids()
elif args.with_ss or (not args.with_netstat and (aa.which("ss") is not None)):
    pids = get_pids_ss(map_show_to_flags(show))
else:
    pids = get_pids_netstat(map_show_to_flags(show))

for pid in sorted(map(int, pids)):
    try:
        prog = os.readlink("/proc/%s/exe" % pid)
        prog = escape_special_chars(prog)
    except OSError:
        continue

    if os.path.exists("/proc/%s/attr/apparmor/current" % pid):
        attr = read_proc_current("/proc/%s/attr/apparmor/current" % pid)
    else:
        # fallback to shared attr/current if attr/apparmor/current doesn't exist
        attr = read_proc_current("/proc/%s/attr/current" % pid)

    try:
        with open_file_read("/proc/%s/cmdline" % pid) as cmd:
            cmdline_lines = cmd.readlines()
    except OSError:
        # the process went away since the readlink above; skip it the same way
        continue
    # an empty cmdline file yields no lines at all; treat it as an empty command line
    cmdline = cmdline_lines[0] if cmdline_lines else ""

    pname = cmdline.split("\0")[0]
    if '/' in pname and pname != prog:
        pname = "(%s)" % pname
        pname = escape_special_chars(pname)
    else:
        pname = ""

    is_interpreter = bool(regex_interpreter.search(prog))
    if not is_interpreter and pname and pname[-1] == ')':
        pname = ' ' + pname

    if not attr:
        if is_interpreter:
            ui.UI_Info(_("%(pid)s %(program)s (%(commandline)s) not confined")
                       % {'pid': pid, 'program': prog, 'commandline': _printable_cmdline(cmdline)})
        else:
            ui.UI_Info(_("%(pid)s %(program)s%(pname)s not confined")
                       % {'pid': pid, 'program': prog, 'pname': pname})
    elif not args.short:
        if is_interpreter:
            ui.UI_Info(_("%(pid)s %(program)s (%(commandline)s) confined by '%(attribute)s'")
                       % {'pid': pid, 'program': prog, 'commandline': _printable_cmdline(cmdline), 'attribute': attr})
        else:
            ui.UI_Info(_("%(pid)s %(program)s%(pname)s confined by '%(attribute)s'")
                       % {'pid': pid, 'program': prog, 'pname': pname, 'attribute': attr})