mirror of
https://gitlab.com/apparmor/apparmor.git
synced 2025-03-04 08:24:42 +01:00
Added read from custom logfile feature and some other older changes I sadly dont remember
This commit is contained in:
parent
eb61520753
commit
42ea5f4f67
12 changed files with 83 additions and 90 deletions
|
@ -1,4 +1,3 @@
|
|||
Known Bugs:
|
||||
Will allow multiple letters in the () due to translation/unicode issues with regexing the key.
|
||||
User input will probably bug out in a different locale.
|
||||
Moving the arrow keys to select or de-select option in yes and no doesn't work.
|
||||
|
|
|
@ -146,4 +146,4 @@ class Test(unittest.TestCase):
|
|||
|
||||
if __name__ == "__main__":
|
||||
#import sys;sys.argv = ['', 'Test.testName']
|
||||
unittest.main()
|
||||
unittest.main()
|
||||
|
|
|
@ -23,7 +23,7 @@ class Test(unittest.TestCase):
|
|||
|
||||
|
||||
def test_RegexParser(self):
|
||||
tests=apparmor.config.Config('ini')
|
||||
tests = apparmor.config.Config('ini')
|
||||
tests.CONF_DIR = '.'
|
||||
regex_tests = tests.read_config('regex_tests.ini')
|
||||
for regex in regex_tests.sections():
|
||||
|
@ -38,4 +38,4 @@ class Test(unittest.TestCase):
|
|||
|
||||
if __name__ == "__main__":
|
||||
#import sys;sys.argv = ['', 'Test.test_RegexParser']
|
||||
unittest.main()
|
||||
unittest.main()
|
||||
|
|
|
@ -49,4 +49,4 @@ class Test(unittest.TestCase):
|
|||
|
||||
if __name__ == "__main__":
|
||||
#import sys;sys.argv = ['', 'Test.testConfig']
|
||||
unittest.main()
|
||||
unittest.main()
|
||||
|
|
|
@ -27,7 +27,7 @@ test_path = '/usr/sbin/ntpd'
|
|||
local_profilename = './profiles/usr.sbin.ntpd'
|
||||
|
||||
python_interpreter = 'python'
|
||||
if sys.version_info >= (3,0):
|
||||
if sys.version_info >= (3, 0):
|
||||
python_interpreter = 'python3'
|
||||
|
||||
class Test(unittest.TestCase):
|
||||
|
@ -105,17 +105,16 @@ class Test(unittest.TestCase):
|
|||
|
||||
def test_autodep(self):
|
||||
pass
|
||||
|
||||
|
||||
def test_unconfined(self):
|
||||
output = subprocess.check_output('%s ./../Tools/aa-unconfined'%python_interpreter, shell=True)
|
||||
|
||||
|
||||
output_force = subprocess.check_output('%s ./../Tools/aa-unconfined --paranoid'%python_interpreter, shell=True)
|
||||
|
||||
|
||||
self.assertIsNot(output, '', 'Failed to run aa-unconfined')
|
||||
|
||||
|
||||
self.assertIsNot(output_force, '', 'Failed to run aa-unconfined in paranoid mode')
|
||||
|
||||
|
||||
|
||||
|
||||
def test_cleanprof(self):
|
||||
input_file = 'cleanprof_test.in'
|
||||
|
@ -147,7 +146,7 @@ if __name__ == "__main__":
|
|||
#Should be the set of cleanprofile
|
||||
shutil.copytree('/etc/apparmor.d', './profiles', symlinks=True)
|
||||
|
||||
apparmor.profile_dir='./profiles'
|
||||
apparmor.profile_dir = './profiles'
|
||||
|
||||
atexit.register(clean_profile_dir)
|
||||
|
||||
|
|
|
@ -34,57 +34,57 @@ class Test(unittest.TestCase):
|
|||
shutil.rmtree('./profiles')
|
||||
|
||||
def testRank_Test(self):
|
||||
s = severity.Severity('severity.db')
|
||||
rank = s.rank('/usr/bin/whatis', 'x')
|
||||
sev_db = severity.Severity('severity.db')
|
||||
rank = sev_db.rank('/usr/bin/whatis', 'x')
|
||||
self.assertEqual(rank, 5, 'Wrong rank')
|
||||
rank = s.rank('/etc', 'x')
|
||||
rank = sev_db.rank('/etc', 'x')
|
||||
self.assertEqual(rank, 10, 'Wrong rank')
|
||||
rank = s.rank('/dev/doublehit', 'x')
|
||||
rank = sev_db.rank('/dev/doublehit', 'x')
|
||||
self.assertEqual(rank, 0, 'Wrong rank')
|
||||
rank = s.rank('/dev/doublehit', 'rx')
|
||||
rank = sev_db.rank('/dev/doublehit', 'rx')
|
||||
self.assertEqual(rank, 4, 'Wrong rank')
|
||||
rank = s.rank('/dev/doublehit', 'rwx')
|
||||
rank = sev_db.rank('/dev/doublehit', 'rwx')
|
||||
self.assertEqual(rank, 8, 'Wrong rank')
|
||||
rank = s.rank('/dev/tty10', 'rwx')
|
||||
rank = sev_db.rank('/dev/tty10', 'rwx')
|
||||
self.assertEqual(rank, 9, 'Wrong rank')
|
||||
rank = s.rank('/var/adm/foo/**', 'rx')
|
||||
rank = sev_db.rank('/var/adm/foo/**', 'rx')
|
||||
self.assertEqual(rank, 3, 'Wrong rank')
|
||||
rank = s.rank('CAP_KILL')
|
||||
rank = sev_db.rank('CAP_KILL')
|
||||
self.assertEqual(rank, 8, 'Wrong rank')
|
||||
rank = s.rank('CAP_SETPCAP')
|
||||
rank = sev_db.rank('CAP_SETPCAP')
|
||||
self.assertEqual(rank, 9, 'Wrong rank')
|
||||
self.assertEqual(s.rank('/etc/apparmor/**', 'r') , 6, 'Invalid Rank')
|
||||
self.assertEqual(s.rank('/etc/**', 'r') , 10, 'Invalid Rank')
|
||||
self.assertEqual(sev_db.rank('/etc/apparmor/**', 'r') , 6, 'Invalid Rank')
|
||||
self.assertEqual(sev_db.rank('/etc/**', 'r') , 10, 'Invalid Rank')
|
||||
|
||||
# Load all variables for /sbin/klogd and test them
|
||||
s.load_variables('profiles/sbin.klogd')
|
||||
self.assertEqual(s.rank('@{PROC}/sys/vm/overcommit_memory', 'r'), 6, 'Invalid Rank')
|
||||
self.assertEqual(s.rank('@{HOME}/sys/@{PROC}/overcommit_memory', 'r'), 10, 'Invalid Rank')
|
||||
self.assertEqual(s.rank('/overco@{multiarch}mmit_memory', 'r'), 10, 'Invalid Rank')
|
||||
sev_db.load_variables('profiles/sbin.klogd')
|
||||
self.assertEqual(sev_db.rank('@{PROC}/sys/vm/overcommit_memory', 'r'), 6, 'Invalid Rank')
|
||||
self.assertEqual(sev_db.rank('@{HOME}/sys/@{PROC}/overcommit_memory', 'r'), 10, 'Invalid Rank')
|
||||
self.assertEqual(sev_db.rank('/overco@{multiarch}mmit_memory', 'r'), 10, 'Invalid Rank')
|
||||
|
||||
s.unload_variables()
|
||||
sev_db.unload_variables()
|
||||
|
||||
s.load_variables('profiles/usr.sbin.dnsmasq')
|
||||
self.assertEqual(s.rank('@{PROC}/sys/@{TFTP_DIR}/overcommit_memory', 'r'), 6, 'Invalid Rank')
|
||||
self.assertEqual(s.rank('@{PROC}/sys/vm/overcommit_memory', 'r'), 6, 'Invalid Rank')
|
||||
self.assertEqual(s.rank('@{HOME}/sys/@{PROC}/overcommit_memory', 'r'), 10, 'Invalid Rank')
|
||||
self.assertEqual(s.rank('/overco@{multiarch}mmit_memory', 'r'), 10, 'Invalid Rank')
|
||||
sev_db.load_variables('profiles/usr.sbin.dnsmasq')
|
||||
self.assertEqual(sev_db.rank('@{PROC}/sys/@{TFTP_DIR}/overcommit_memory', 'r'), 6, 'Invalid Rank')
|
||||
self.assertEqual(sev_db.rank('@{PROC}/sys/vm/overcommit_memory', 'r'), 6, 'Invalid Rank')
|
||||
self.assertEqual(sev_db.rank('@{HOME}/sys/@{PROC}/overcommit_memory', 'r'), 10, 'Invalid Rank')
|
||||
self.assertEqual(sev_db.rank('/overco@{multiarch}mmit_memory', 'r'), 10, 'Invalid Rank')
|
||||
|
||||
#self.assertEqual(s.rank('/proc/@{PID}/maps', 'rw'), 9, 'Invalid Rank')
|
||||
#self.assertEqual(sev_db.rank('/proc/@{PID}/maps', 'rw'), 9, 'Invalid Rank')
|
||||
|
||||
def testInvalid(self):
|
||||
s = severity.Severity('severity.db')
|
||||
rank = s.rank('/dev/doublehit', 'i')
|
||||
sev_db = severity.Severity('severity.db')
|
||||
rank = sev_db.rank('/dev/doublehit', 'i')
|
||||
self.assertEqual(rank, 10, 'Wrong')
|
||||
try:
|
||||
broken = severity.Severity('severity_broken.db')
|
||||
severity.Severity('severity_broken.db')
|
||||
except AppArmorException:
|
||||
pass
|
||||
rank = s.rank('CAP_UNKOWN')
|
||||
rank = s.rank('CAP_K*')
|
||||
rank = sev_db.rank('CAP_UNKOWN')
|
||||
rank = sev_db.rank('CAP_K*')
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
#import sys;sys.argv = ['', 'Test.testName']
|
||||
unittest.main()
|
||||
unittest.main()
|
||||
|
|
|
@ -58,6 +58,12 @@ profiling = args.program
|
|||
profiledir = args.dir
|
||||
filename = args.file
|
||||
|
||||
|
||||
if not os.path.isfile(filename):
|
||||
raise apparmor.AppArmorException(_('The logfile %s does not exist. Please check the path') % filename)
|
||||
|
||||
apparmor.filename = filename
|
||||
|
||||
aa_mountpoint = apparmor.check_for_apparmor()
|
||||
if not aa_mountpoint:
|
||||
raise apparmor.AppArmorException(_('It seems AppArmor was not started. Please enable AppArmor and try again.'))
|
||||
|
|
|
@ -31,6 +31,11 @@ filename = args.file
|
|||
logmark = args.mark or ''
|
||||
|
||||
|
||||
if not os.path.isfile(filename):
|
||||
raise apparmor.AppArmorException(_('The logfile %s does not exist. Please check the path') % filename)
|
||||
|
||||
apparmor.filename = filename
|
||||
|
||||
aa_mountpoint = apparmor.check_for_apparmor()
|
||||
if not aa_mountpoint:
|
||||
raise apparmor.AppArmorException(_('It seems AppArmor was not started. Please enable AppArmor and try again.'))
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
#
|
||||
# ----------------------------------------------------------------------
|
||||
import argparse
|
||||
import gettext
|
||||
import locale
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
@ -24,82 +22,70 @@ init_translations()
|
|||
|
||||
import apparmor.aa as apparmor
|
||||
|
||||
#gettext.bindtextdomain('apparmor-utils', '/usr/share/locale/')#/%s/LC_MESSAGES/apparmor-utils.mo' % locale.getlocale()[0])
|
||||
#gettext.textdomain('apparmor-utils')
|
||||
#_ = gettext.gettext
|
||||
#gettext.translation('apparmor-utils','./Trans/')
|
||||
#gettext.install('apparmor-utils')
|
||||
#print(os.path.join('/usr/share/locale', locale.getlocale()[0], 'LC_MESSAGES', '%s.mo' %
|
||||
# 'apparmor-utils'))
|
||||
#_ = gettext.translation('apparmor-utils', '/usr/share/locale', [locale.getlocale()[0]]).gettext
|
||||
|
||||
#gettext.find
|
||||
|
||||
|
||||
parser = argparse.ArgumentParser(description=_('Lists unconfined processes having tcp or udp ports'))
|
||||
parser.add_argument('--paranoid', action='store_true', help=_('scan all processes from /proc'))
|
||||
parser = argparse.ArgumentParser(description=_("Lists unconfined processes having tcp or udp ports"))
|
||||
parser.add_argument("--paranoid", action="store_true", help=_("scan all processes from /proc"))
|
||||
args = parser.parse_args()
|
||||
|
||||
paranoid = args.paranoid
|
||||
|
||||
aa_mountpoint = apparmor.check_for_apparmor()
|
||||
if not aa_mountpoint:
|
||||
raise apparmor.AppArmorException(_('It seems AppArmor was not started. Please enable AppArmor and try again.'))
|
||||
raise apparmor.AppArmorException(_("It seems AppArmor was not started. Please enable AppArmor and try again."))
|
||||
|
||||
pids = []
|
||||
if paranoid:
|
||||
pids = list(filter(lambda x: re.search('^\d+$', x), apparmor.get_subdirectories('/proc')))
|
||||
pids = list(filter(lambda x: re.search(r"^\d+$", x), apparmor.get_subdirectories("/proc")))
|
||||
else:
|
||||
regex_tcp_udp = re.compile('^(tcp|udp)\s+\d+\s+\d+\s+\S+\:(\d+)\s+\S+\:(\*|\d+)\s+(LISTEN|\s+)\s+(\d+)\/(\S+)')
|
||||
regex_tcp_udp = re.compile(r"^(tcp|udp)\s+\d+\s+\d+\s+\S+\:(\d+)\s+\S+\:(\*|\d+)\s+(LISTEN|\s+)\s+(\d+)\/(\S+)")
|
||||
import subprocess
|
||||
if sys.version_info < (3,0):
|
||||
output = subprocess.check_output('LANG=C netstat -nlp', shell=True).split('\n')
|
||||
if sys.version_info < (3, 0):
|
||||
output = subprocess.check_output("LANG=C netstat -nlp", shell=True).split("\n")
|
||||
else:
|
||||
#Python3 needs to translate a stream of bytes to string with specified encoding
|
||||
output = str(subprocess.check_output('LANG=C netstat -nlp', shell=True), encoding='utf8').split('\n')
|
||||
output = str(subprocess.check_output("LANG=C netstat -nlp", shell=True), encoding='utf8').split("\n")
|
||||
|
||||
for line in output:
|
||||
match = regex_tcp_udp.search(line)
|
||||
if match:
|
||||
pids.append(match.groups()[4])
|
||||
# We can safely remove duplicate pid's?
|
||||
pids = list(map(lambda x: int(x), set(pids)))
|
||||
pids = list(map(int, set(pids)))
|
||||
|
||||
for pid in sorted(pids):
|
||||
try:
|
||||
prog = os.readlink('/proc/%s/exe'%pid)
|
||||
except:
|
||||
prog = os.readlink("/proc/%s/exe"%pid)
|
||||
except IOError:
|
||||
continue
|
||||
attr = None
|
||||
if os.path.exists('/proc/%s/attr/current'%pid):
|
||||
with apparmor.open_file_read('/proc/%s/attr/current'%pid) as current:
|
||||
if os.path.exists("/proc/%s/attr/current"%pid):
|
||||
with apparmor.open_file_read("/proc/%s/attr/current"%pid) as current:
|
||||
for line in current:
|
||||
if line.startswith('/') or line.startswith('null'):
|
||||
if line.startswith("/") or line.startswith("null"):
|
||||
attr = line.strip()
|
||||
|
||||
cmdline = apparmor.cmd(['cat', '/proc/%s/cmdline'%pid])[1]
|
||||
pname = cmdline.split('\0')[0]
|
||||
cmdline = apparmor.cmd(["cat", "/proc/%s/cmdline"%pid])[1]
|
||||
pname = cmdline.split("\0")[0]
|
||||
if '/' in pname and pname != prog:
|
||||
pname = '(%s)'%pname
|
||||
pname = "(%s)"% pname
|
||||
else:
|
||||
pname = ''
|
||||
regex_interpreter = re.compile('^(/usr)?/bin/(python|perl|bash|dash|sh)$')
|
||||
pname = ""
|
||||
regex_interpreter = re.compile(r"^(/usr)?/bin/(python|perl|bash|dash|sh)$")
|
||||
if not attr:
|
||||
if regex_interpreter.search(prog):
|
||||
cmdline = re.sub('\x00', ' ', cmdline)
|
||||
cmdline = re.sub('\s+$', '', cmdline).strip()
|
||||
cmdline = re.sub(r"\x00", " ", cmdline)
|
||||
cmdline = re.sub(r"\s+$", "", cmdline).strip()
|
||||
|
||||
apparmor.UI_Info(_('%s %s (%s) not confined\n')%(pid, prog, cmdline))
|
||||
apparmor.UI_Info(_("%s %s (%s) not confined\n")%(pid, prog, cmdline))
|
||||
else:
|
||||
if pname and pname[-1] == ')':
|
||||
pname += ' '
|
||||
apparmor.UI_Info(_('%s %s %snot confined\n')%(pid, prog, pname))
|
||||
apparmor.UI_Info(_("%s %s %snot confined\n")%(pid, prog, pname))
|
||||
else:
|
||||
if regex_interpreter.search(prog):
|
||||
cmdline = re.sub('\0', ' ', cmdline)
|
||||
cmdline = re.sub('\s+$', '', cmdline).strip()
|
||||
cmdline = re.sub(r"\0", " ", cmdline)
|
||||
cmdline = re.sub(r"\s+$", "", cmdline).strip()
|
||||
apparmor.UI_Info(_("%s %s (%s) confined by '%s'\n")%(pid, prog, cmdline, attr))
|
||||
else:
|
||||
if pname and pname[-1] == ')':
|
||||
pname += ' '
|
||||
apparmor.UI_Info(_("%s %s %sconfined by '%s'\n")%(pid, prog, pname, attr))
|
||||
apparmor.UI_Info(_("%s %s %sconfined by '%s'\n")%(pid, prog, pname, attr))
|
||||
|
|
|
@ -10,16 +10,15 @@
|
|||
# ------------------------------------------------------------------
|
||||
import gettext
|
||||
import locale
|
||||
|
||||
|
||||
def init_localisation():
|
||||
locale.setlocale(locale.LC_ALL, '')
|
||||
#If a correct locale has been provided set filename else let an IOError be raised
|
||||
filename = '/usr/share/locale/%s/LC_MESSAGES/apparmor-utils.mo' % locale.getlocale()[0]
|
||||
try:
|
||||
trans = gettext.GNUTranslations(open(filename, 'rb'))
|
||||
print("Locale installed for %s"%locale.getlocale()[0])
|
||||
except IOError:
|
||||
trans = gettext.NullTranslations()
|
||||
trans.install()
|
||||
|
||||
init_localisation()
|
||||
|
||||
init_localisation()
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# ----------------------------------------------------------------------
|
||||
# No old version logs, only 2.6 + supported
|
||||
from __future__ import with_statement
|
||||
import codecs
|
||||
import inspect
|
||||
import os
|
||||
import re
|
||||
|
@ -113,7 +112,7 @@ def check_for_LD_XXX(file):
|
|||
# Limit to checking files under 100k for the sake of speed
|
||||
if size >100000:
|
||||
return False
|
||||
with codecs.open(file, 'r', encoding='ascii') as f_in:
|
||||
with open_file_read(file, encoding='ascii') as f_in:
|
||||
for line in f_in:
|
||||
if 'LD_PRELOAD' in line or 'LD_LIBRARY_PATH' in line:
|
||||
found = True
|
||||
|
|
|
@ -132,10 +132,10 @@ def get_directory_contents(path):
|
|||
files.sort()
|
||||
return files
|
||||
|
||||
def open_file_read(path):
|
||||
def open_file_read(path, encoding='UTF-8'):
|
||||
'''Open specified file read-only'''
|
||||
try:
|
||||
orig = codecs.open(path, 'r', 'UTF-8')
|
||||
orig = codecs.open(path, 'r', encoding)
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue