Split ui into separate subpackage

Move LinuxDesktopParser into ui subpackage, it does not belong in the
Opensnitch core
This commit is contained in:
adisbladis 2017-05-21 15:24:34 +08:00
parent 2e4b91ceea
commit 302992c30d
Failed to generate hash of commit
10 changed files with 184 additions and 112 deletions

View file

@ -16,91 +16,13 @@
# program. If not, go to http://www.gnu.org/licenses/gpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from threading import Lock
import configparser
import pyinotify
import threading
import logging
import glob
import os
DESKTOP_PATHS = [
os.path.join(d, 'applications')
for d in os.getenv('XDG_DATA_DIRS', '/usr/share/').split(':')
]
class LinuxDesktopParser(threading.Thread):
def __init__(self):
super().__init__()
self.lock = Lock()
self.daemon = True
self.running = False
self.apps = {}
for desktop_path in DESKTOP_PATHS:
if not os.path.exists(desktop_path):
continue
for desktop_file in glob.glob(os.path.join(desktop_path,
'*.desktop')):
self.populate_app(desktop_file)
self.start()
def populate_app(self, desktop_path):
parser = configparser.ConfigParser(
strict=False) # Allow duplicate config entries
parser.read(desktop_path, 'utf8')
cmd = parser.get('Desktop Entry', 'exec', raw=True,
fallback=' ').split(' ')[0] or None
if cmd is None:
return
icon = parser.get('Desktop Entry', 'icon',
raw=True, fallback=None)
name = parser.get('Desktop Entry', 'name',
raw=True, fallback=None)
with self.lock:
self.apps[cmd] = (name, icon, desktop_path)
def get_info_by_path(self, path):
path = os.path.basename(path)
with self.lock:
return self.apps.get(path, (path, None))[:2]
def run(self):
self.running = True
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)
def inotify_callback(event):
if event.mask == pyinotify.IN_CLOSE_WRITE:
self.populate_app(event.pathname)
elif event.mask == pyinotify.IN_DELETE:
with self.lock:
for cmd, data in self.apps.items():
if data[2] == event.pathname:
del self.apps[cmd]
break
for p in DESKTOP_PATHS:
if os.path.exists(p):
wm.add_watch(p,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE,
inotify_callback)
notifier.loop()
class Application:
def __init__(self, procmon, desktop_parser, pid, path):
def __init__(self, procmon, pid, path):
self.pid = pid
self.path = path
self.name, self.icon = desktop_parser.get_info_by_path(path)
try:

View file

@ -23,7 +23,7 @@ from socket import inet_ntoa, getservbyport
class Connection:
def __init__(self, packet_id, procmon, desktop_parser, payload):
def __init__(self, packet_id, procmon, payload):
self.id = packet_id
self.data = payload
self.pkt = ip.IP( self.data )
@ -66,8 +66,7 @@ class Connection:
self.dst_addr,
self.dst_port,
self.proto)
self.app = Application(procmon, desktop_parser,
self.pid, self.app_path)
self.app = Application(procmon, self.pid, self.app_path)
self.app_path = self.app.path
def get_app_name(self):

View file

@ -30,7 +30,6 @@ from opensnitch.connection import Connection
from opensnitch.dns import DNSCollector
from opensnitch.rule import Rule, Rules
from opensnitch.procmon import ProcMon
from opensnitch.app import LinuxDesktopParser
MARK_PACKET_DROP = 101285
@ -113,7 +112,6 @@ class Snitch:
# TODO: Support IPv6!
def __init__(self, database):
self.desktop_parser = LinuxDesktopParser()
self.lock = Lock()
self.rules = Rules(database)
self.dns = DNSCollector()
@ -132,8 +130,7 @@ class Snitch:
return
self.latest_packet_id += 1
conn = Connection(self.latest_packet_id, self.procmon,
self.desktop_parser, data)
conn = Connection(self.latest_packet_id, self.procmon, data)
if conn.proto is None:
logging.debug("Could not detect protocol for packet.")
return

21
opensnitch/ui/__init__.py Normal file
View file

@ -0,0 +1,21 @@
# This file is part of OpenSnitch.
#
# Copyright(c) 2017 Simone Margaritelli
# evilsocket@gmail.com
# http://www.evilsocket.net
#
# This file may be licensed under the terms of of the
# GNU General Public License Version 2 (the ``GPL'').
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the GPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the GPL along with this
# program. If not, go to http://www.gnu.org/licenses/gpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__all__ = ('QtApp',)
from .app import QtApp

48
opensnitch/ui/app.py Normal file
View file

@ -0,0 +1,48 @@
# This file is part of OpenSnitch.
#
# Copyright(c) 2017 Simone Margaritelli
# evilsocket@gmail.com
# http://www.evilsocket.net
#
# This file may be licensed under the terms of of the
# GNU General Public License Version 2 (the ``GPL'').
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the GPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the GPL along with this
# program. If not, go to http://www.gnu.org/licenses/gpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from PyQt5 import QtWidgets
import queue
import sys
import os
from .desktop_parser import LinuxDesktopParser
from .dialog import Dialog
# TODO: Implement tray icon and menu.
# TODO: Implement rules editor.
RESOURCES_PATH = "%s/resources/" % os.path.dirname(
sys.modules[__name__].__file__)
DIALOG_UI_PATH = "%s/dialog.ui" % RESOURCES_PATH
class QtApp:
def __init__(self, connection_futures, rules):
self.desktop_parser = LinuxDesktopParser()
self.app = QtWidgets.QApplication([])
self.connection_queue = queue.Queue()
self.rules = rules
self.dialog = Dialog(self, connection_futures, self.desktop_parser)
def run(self):
self.app.exec()
def prompt_user(self, connection):
self.connection_queue.put(connection)
self.dialog.add_connection_signal.emit()

View file

@ -0,0 +1,94 @@
# This file is part of OpenSnitch.
#
# Copyright(c) 2017 Simone Margaritelli
# evilsocket@gmail.com
# http://www.evilsocket.net
#
# This file may be licensed under the terms of of the
# GNU General Public License Version 2 (the ``GPL'').
#
# Software distributed under the License is distributed
# on an ``AS IS'' basis, WITHOUT WARRANTY OF ANY KIND, either
# express or implied. See the GPL for the specific language
# governing rights and limitations.
#
# You should have received a copy of the GPL along with this
# program. If not, go to http://www.gnu.org/licenses/gpl.html
# or write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from threading import Lock
import configparser
import pyinotify
import threading
import glob
import os
DESKTOP_PATHS = tuple([
os.path.join(d, 'applications')
for d in os.getenv('XDG_DATA_DIRS', '/usr/share/').split(':')
])
class LinuxDesktopParser(threading.Thread):
def __init__(self):
super().__init__()
self.lock = Lock()
self.daemon = True
self.running = False
self.apps = {}
for desktop_path in DESKTOP_PATHS:
if not os.path.exists(desktop_path):
continue
for desktop_file in glob.glob(os.path.join(desktop_path,
'*.desktop')):
self.populate_app(desktop_file)
self.start()
def populate_app(self, desktop_path):
parser = configparser.ConfigParser(
strict=False) # Allow duplicate config entries
parser.read(desktop_path, 'utf8')
cmd = parser.get('Desktop Entry', 'exec', raw=True,
fallback=' ').split(' ')[0] or None
if cmd is None:
return
icon = parser.get('Desktop Entry', 'icon',
raw=True, fallback=None)
name = parser.get('Desktop Entry', 'name',
raw=True, fallback=None)
with self.lock:
self.apps[cmd] = (name, icon, desktop_path)
def get_info_by_path(self, path):
path = os.path.basename(path)
return self.apps.get(path, (path, None))[:2]
def run(self):
self.running = True
wm = pyinotify.WatchManager()
notifier = pyinotify.Notifier(wm)
def inotify_callback(event):
if event.mask == pyinotify.IN_CLOSE_WRITE:
self.populate_app(event.pathname)
elif event.mask == pyinotify.IN_DELETE:
with self.lock:
for cmd, data in self.apps.items():
if data[2] == event.pathname:
del self.apps[cmd]
break
for p in DESKTOP_PATHS:
if os.path.exists(p):
wm.add_watch(p,
pyinotify.IN_CLOSE_WRITE | pyinotify.IN_DELETE,
inotify_callback)
notifier.loop()

View file

@ -31,21 +31,6 @@ RESOURCES_PATH = "%s/resources/" % os.path.dirname(
DIALOG_UI_PATH = "%s/dialog.ui" % RESOURCES_PATH
class QtApp:
def __init__(self, connection_futures, rules):
self.app = QtWidgets.QApplication([])
self.connection_queue = queue.Queue()
self.rules = rules
self.dialog = Dialog(self, connection_futures)
def run(self):
self.app.exec()
def prompt_user(self, connection):
self.connection_queue.put(connection)
self.dialog.add_connection_signal.emit()
class Dialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
DEFAULT_RESULT = (Rule.ONCE, Rule.ACCEPT, False)
@ -53,7 +38,7 @@ class Dialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
add_connection_signal = QtCore.pyqtSignal()
def __init__(self, app, connection_futures, parent=None):
def __init__(self, app, connection_futures, desktop_parser, parent=None):
self.connection = None
QtWidgets.QDialog.__init__(self, parent,
QtCore.Qt.WindowStaysOnTopHint)
@ -66,6 +51,8 @@ class Dialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.rules = app.rules
self.add_connection_signal.connect(self.handle_connection)
self.desktop_parser = desktop_parser
self.rule_lock = threading.Lock()
@QtCore.pyqtSlot()
@ -91,8 +78,11 @@ class Dialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
return self.add_connection_signal.emit()
self.connection = connection
self.setup_labels()
self.setup_icon()
app_name, app_icon = self.desktop_parser.get_info_by_path(
connection.app.path)
self.setup_labels(app_name)
self.setup_icon(app_icon)
self.setup_extra()
self.result = Dialog.DEFAULT_RESULT
self.action_combo_box.setCurrentIndex(0)
@ -101,9 +91,8 @@ class Dialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
def trigger_handle_connection(self):
return self.add_connection_signal.emit()
def setup_labels(self):
self.app_name_label.setText(
getattr(self.connection.app, 'name', 'Unknown'))
def setup_labels(self, app_name):
self.app_name_label.setText(app_name or 'Unknown')
message = self.MESSAGE_TEMPLATE % (
self.connection.get_app_name_and_cmdline(),
@ -139,9 +128,11 @@ class Dialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.action_combo_box.currentIndexChanged[str].connect(
self._action_changed)
def setup_icon(self):
if getattr(self.connection.app, 'icon', None) is not None:
icon = QtGui.QIcon().fromTheme(self.connection.app.icon)
def setup_icon(self, app_icon):
if app_icon is None:
return
icon = QtGui.QIcon().fromTheme(app_icon)
pixmap = icon.pixmap(icon.actualSize(QtCore.QSize(48, 48)))
self.icon_label.setPixmap(pixmap)