Move get_last_login_timestamp() into apparmor.notify

This is a preparation to make adding tests easier.
This commit is contained in:
Christian Boltz 2021-10-24 12:54:20 +02:00
parent b6551618d8
commit 6dc9884c8e
Failed to generate hash of commit
2 changed files with 65 additions and 43 deletions

View file

@ -34,7 +34,6 @@ import os
import re
import sys
import time
import struct
import notify2
import psutil
import pwd
@ -45,6 +44,7 @@ import apparmor.ui as aaui
import apparmor.config as aaconfig
from apparmor.common import DebugLogger, open_file_read
from apparmor.fail import enable_aa_exception_handler
from apparmor.notify import get_last_login_timestamp
from apparmor.translations import init_translation
import LibAppArmor # C-library to parse one log line
@ -61,48 +61,6 @@ def get_user_login():
return username
def get_last_login_timestamp(username):
'''Directly read wtmp and get last login for user as epoch timestamp'''
timestamp = 0
filename = '/var/log/wtmp'
last_login = 0
debug_logger.debug('Username: {}'.format(username))
with open(filename, "rb") as wtmp_file:
offset = 0
wtmp_filesize = os.path.getsize(filename)
debug_logger.debug('WTMP filesize: {}'.format(wtmp_filesize))
while offset < wtmp_filesize:
wtmp_file.seek(offset)
offset += 384 # Increment for next entry
type = struct.unpack("<L", wtmp_file.read(4))[0]
debug_logger.debug('WTMP entry type: {}'.format(type))
# Only parse USER lines
if type == 7:
# Read each item and move pointer forward
pid = struct.unpack("<L", wtmp_file.read(4))[0]
line = wtmp_file.read(32).decode("utf-8", "replace").split('\0', 1)[0]
id = wtmp_file.read(4).decode("utf-8", "replace").split('\0', 1)[0]
user = wtmp_file.read(32).decode("utf-8", "replace").split('\0', 1)[0]
host = wtmp_file.read(256).decode("utf-8", "replace").split('\0', 1)[0]
term = struct.unpack("<H", wtmp_file.read(2))[0]
exit = struct.unpack("<H", wtmp_file.read(2))[0]
session = struct.unpack("<L", wtmp_file.read(4))[0]
timestamp = struct.unpack("<L", wtmp_file.read(4))[0]
usec = struct.unpack("<L", wtmp_file.read(4))[0]
entry = (pid, line, id, user, host, term, exit, session, timestamp, usec)
debug_logger.debug('WTMP entry: {}'.format(entry))
# Store login timestamp for requested user
if user == username:
last_login = timestamp
# When loop is done, last value should be the latest login timestamp
return last_login
def format_event(event, logsource):
output = []

64
utils/apparmor/notify.py Normal file
View file

@ -0,0 +1,64 @@
#! /usr/bin/python3
# ----------------------------------------------------------------------
# Copyright (C) 20182019 Otto Kekäläinen <otto@kekalainen.net>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of version 2 of the GNU General Public
# License as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# ----------------------------------------------------------------------
import os
import struct
from apparmor.common import AppArmorBug, DebugLogger
debug_logger = DebugLogger('apparmor.notify')
def get_last_login_timestamp(username):
'''Directly read wtmp and get last login for user as epoch timestamp'''
timestamp = 0
filename = '/var/log/wtmp'
last_login = 0
debug_logger.debug('Username: {}'.format(username))
with open(filename, "rb") as wtmp_file:
offset = 0
wtmp_filesize = os.path.getsize(filename)
debug_logger.debug('WTMP filesize: {}'.format(wtmp_filesize))
while offset < wtmp_filesize:
wtmp_file.seek(offset)
offset += 384 # Increment for next entry
type = struct.unpack("<L", wtmp_file.read(4))[0]
debug_logger.debug('WTMP entry type: {}'.format(type))
# Only parse USER lines
if type == 7:
# Read each item and move pointer forward
pid = struct.unpack("<L", wtmp_file.read(4))[0]
line = wtmp_file.read(32).decode("utf-8", "replace").split('\0', 1)[0]
id = wtmp_file.read(4).decode("utf-8", "replace").split('\0', 1)[0]
user = wtmp_file.read(32).decode("utf-8", "replace").split('\0', 1)[0]
host = wtmp_file.read(256).decode("utf-8", "replace").split('\0', 1)[0]
term = struct.unpack("<H", wtmp_file.read(2))[0]
exit = struct.unpack("<H", wtmp_file.read(2))[0]
session = struct.unpack("<L", wtmp_file.read(4))[0]
timestamp = struct.unpack("<L", wtmp_file.read(4))[0]
usec = struct.unpack("<L", wtmp_file.read(4))[0]
entry = (pid, line, id, user, host, term, exit, session, timestamp, usec)
debug_logger.debug('WTMP entry: {}'.format(entry))
# Store login timestamp for requested user
if user == username:
last_login = timestamp
# When loop is done, last value should be the latest login timestamp
return last_login