ui, popups: refactoring

will help to write unit tests more easily.
This commit is contained in:
Gustavo Iñiguez Goia 2023-12-01 23:05:40 +01:00
parent 86d3f54247
commit c297b0b539
Failed to generate hash of commit
2 changed files with 194 additions and 150 deletions

View file

@ -22,13 +22,16 @@ from opensnitch.rules import Rules, Rule
from opensnitch.nodes import Nodes
from opensnitch import ui_pb2
from opensnitch.dialogs.prompt import utils
DIALOG_UI_PATH = "%s/../res/prompt.ui" % os.path.dirname(sys.modules[__name__].__file__)
DIALOG_UI_PATH = "%s/../../res/prompt.ui" % os.path.dirname(sys.modules[__name__].__file__)
class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
_prompt_trigger = QtCore.pyqtSignal()
_tick_trigger = QtCore.pyqtSignal()
_timeout_trigger = QtCore.pyqtSignal()
TYPE = "popups"
PAGE_MAIN = 2
PAGE_DETAILS = 0
PAGE_CHECKSUMS = 1
@ -82,6 +85,9 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.setWindowTitle("OpenSnitch v%s" % version)
self._actions = Actions.instance()
obj, self._action = self._actions.load("/home/ga/.config/opensnitch/actions/popups.json")
self._lock = threading.Lock()
self._con = None
self._rule = None
@ -237,6 +243,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
# get rule from the db
records = self._rules.get_by_name(self._peer, curRule)
if records == None or records.first() == False:
print("popup.update_rule: None, rule:", curRule)
self.labelChecksumStatus.setStyleSheet('color: red')
self.labelChecksumStatus.setText("" + QC.translate("popups", "Rule not updated, not found by name"))
return
@ -273,12 +280,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.stackedWidget.setCurrentIndex(self.PAGE_MAIN)
self._stop_countdown()
def _set_elide_text(self, widget, text, max_size=132):
if len(text) > max_size:
text = text[:max_size] + "..."
widget.setText(text)
def promptUser(self, connection, is_local, peer):
# one at a time
with self._lock:
@ -338,7 +339,9 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
if self._tick > 0:
self.show()
# render details after displaying the pop-up.
self._render_details(self._peer, self.connDetails, self._con)
self._display_checksums_warning(self._peer, self._con)
utils.render_details(self._peer, self.connDetails, self._con)
@QtCore.pyqtSlot()
def on_tick_triggered(self):
@ -349,6 +352,22 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._timeout_triggered = True
self._send_rule()
def _pre_popup_plugins(self, con):
pass
def _post_popup_plugins(self, con):
"""Actions performed on the pop-up once the connection details have
been displayed on the screen.
"""
for conf in self._action['actions']:
print(conf)
action = self._action['actions'][conf]
if type(action) == dict:
continue
if self.TYPE not in action.TYPE:
continue
action.run(self, (con,))
def _hide_widget(self, widget, hide):
widget.setVisible(not hide)
@ -370,46 +389,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.actionButton.setText("{0} ({1})".format(self._action_text[action_idx], self._tick))
self.actionButton.setIcon(self._action_icon[action_idx])
def _set_app_description(self, description):
if description != None and description != "":
self.appDescriptionLabel.setVisible(True)
self.appDescriptionLabel.setFixedHeight(50)
self.appDescriptionLabel.setToolTip(description)
self._set_elide_text(self.appDescriptionLabel, "%s" % description)
else:
self.appDescriptionLabel.setVisible(False)
self.appDescriptionLabel.setFixedHeight(0)
self.appDescriptionLabel.setText("")
def _set_app_path(self, app_name, app_args, con):
# show the binary path if it's not part of the cmdline args:
# cmdline: telnet 1.1.1.1 (path: /usr/bin/telnet.netkit)
# cmdline: /usr/bin/telnet.netkit 1.1.1.1 (the binary path is part of the cmdline args, no need to display it)
if con.process_path != "" and len(con.process_args) >= 1 and con.process_path not in con.process_args:
self.appPathLabel.setToolTip("Process path: %s" % con.process_path)
if app_name.lower() == app_args:
self._set_elide_text(self.appPathLabel, "%s" % con.process_path)
else:
self._set_elide_text(self.appPathLabel, "(%s)" % con.process_path)
self.appPathLabel.setVisible(True)
elif con.process_path != "" and len(con.process_args) == 0:
self._set_elide_text(self.appPathLabel, "%s" % con.process_path)
self.appPathLabel.setVisible(True)
else:
self.appPathLabel.setVisible(False)
self.appPathLabel.setText("")
def _set_app_args(self, app_name, app_args):
# if the app name and the args are the same, there's no need to display
# the args label (amule for example)
if app_name.lower() != app_args:
self.argsLabel.setVisible(True)
self._set_elide_text(self.argsLabel, app_args)
self.argsLabel.setToolTip(app_args)
else:
self.argsLabel.setVisible(False)
self.argsLabel.setText("")
def _verify_checksums(self, con, rule):
"""return true if the checksum of a rule matches the one of the process
opening a connection.
@ -460,66 +439,22 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5],
expected
)
else:
print("no checksums results")
return ""
def _render_details(self, peer, detailsWidget, con):
tree = ""
space = " "
spaces = " "
indicator = ""
self._display_checksums_warning(peer, con)
try:
# reverse() doesn't exist on old protobuf libs.
con.process_tree.reverse()
except:
pass
for path in con.process_tree:
tree = "{0}<p>│{1}\t{2}{3}{4}</p>".format(tree, path.value, spaces, indicator, path.key)
spaces += "&nbsp;" * 4
indicator = "\\_ "
# XXX: table element doesn't work?
details = """<b>{0}</b> {1}:{2} -> {3}:{4}
<br><br>
<b>Path:</b>{5}{6}<br>
<b>Cmdline:</b>&nbsp;{7}<br>
<b>CWD:</b>{8}{9}<br>
<b>MD5:</b>{10}{11}<br>
<b>UID:</b>{12}{13}<br>
<b>PID:</b>{14}{15}<br>
<br>
<b>Process tree:</b><br>
{16}
<br>
<p><b>Environment variables:<b></p>
{17}
""".format(
con.protocol.upper(),
con.src_port, con.src_ip, con.dst_ip, con.dst_port,
space * 6, con.process_path,
" ".join(con.process_args),
space * 6, con.process_cwd,
space * 7, con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5],
space * 9, con.user_id,
space * 9, con.process_id,
tree,
"".join('<p>{}={}</p>'.format(key, value) for key, value in con.process_env.items())
)
detailsWidget.document().clear()
detailsWidget.document().setHtml(details)
detailsWidget.moveCursor(QtGui.QTextCursor.Start)
def _render_connection(self, con):
app_name, app_icon, description, _ = self._apps_parser.get_info_by_path(con.process_path, "terminal")
app_args = " ".join(con.process_args)
self._set_app_description(description)
self._set_app_path(app_name, app_args, con)
self._set_app_args(app_name, app_args)
utils.set_app_description(self.appDescriptionLabel, description)
utils.set_app_path(self.appPathLabel, app_name, app_args, con)
utils.set_app_args(self.argsLabel, app_name, app_args)
print("bin path:", con.process_path, "args:", app_args, "args raw:", self._con.process_args)
print("bin hash:", con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5]) #, con.HasField("process_checksums"))
#print("bin tree:", " > ".join(con.process_tree))
print("bin tree:", con.process_tree)
self.checksumLabel.setText(con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5])
self.checkSum.setChecked(False)
@ -528,18 +463,21 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.argsLabel.setVisible(False)
self.argsLabel.setText("")
app_name = QC.translate("popups", "Unknown process %s" % con.process_path)
#with self._lock:
self.appNameLabel.setText(QC.translate("popups", "Outgoing connection"))
else:
self._set_elide_text(self.appNameLabel, "%s" % app_name, max_size=42)
utils.set_elide_text(self.appNameLabel, "%s" % app_name, max_size=42)
self.appNameLabel.setToolTip(app_name)
#if len(self._con.process_args) == 0 or self._con.process_args[0] == "":
self.cwdLabel.setToolTip("%s %s" % (QC.translate("popups", "Process launched from:"), con.process_cwd))
self._set_elide_text(self.cwdLabel, con.process_cwd, max_size=32)
utils.set_elide_text(self.cwdLabel, con.process_cwd, max_size=32)
pixmap = Icons.get_by_appname(app_icon)
self.iconLabel.setPixmap(pixmap)
message = self._get_popup_message(app_name, con)
message = utils.get_popup_message(self._local, self._peer, app_name, con)
self.messageLabel.setText(message)
self.messageLabel.setToolTip(message)
@ -573,7 +511,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._add_fixed_options_to_combo(self.whatCombo, con, uid)
if con.process_path.startswith(self.APPIMAGE_PREFIX):
self._add_appimage_pattern_to_combo(self.whatCombo, con)
self._add_dst_networks_to_combo(self.whatCombo, con.dst_ip)
if con.dst_host != "" and con.dst_host != con.dst_ip:
@ -583,7 +520,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._add_dst_networks_to_combo(self.whatIPCombo, con.dst_ip)
self._default_action = self._cfg.getInt(self._cfg.DEFAULT_ACTION_KEY)
self._configure_default_duration()
if int(con.process_id) > 0:
@ -591,7 +527,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
else:
self.whatCombo.setCurrentIndex(2)
self.checkDstIP.setChecked(self._cfg.getBool(self._cfg.DEFAULT_POPUP_ADVANCED_DSTIP))
self.checkDstPort.setChecked(self._cfg.getBool(self._cfg.DEFAULT_POPUP_ADVANCED_DSTPORT))
self.checkUserID.setChecked(self._cfg.getBool(self._cfg.DEFAULT_POPUP_ADVANCED_UID))
@ -604,6 +539,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.setFixedSize(self.size())
self._post_popup_plugins(con)
# https://gis.stackexchange.com/questions/86398/how-to-disable-the-escape-key-for-a-dialog
def keyPressEvent(self, event):
@ -648,7 +584,9 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
"""
appimage_bin = os.path.basename(con.process_path)
appimage_path = os.path.dirname(con.process_path)
print("APP1:", appimage_path)
appimage_path = appimage_path[0:len(self.APPIMAGE_PREFIX)+6]
print("APP2:", appimage_path)
combo.addItem(
QC.translate("popups", "from {0}*/{1}").format(appimage_path, appimage_bin),
self.FIELD_APPIMAGE
@ -673,38 +611,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.whatCombo.addItem(QC.translate("popups", "to *.{0}").format('.'.join(parts[i:])), self.FIELD_REGEX_HOST)
self.whatIPCombo.addItem(QC.translate("popups", "to *.{0}").format('.'.join(parts[i:])), self.FIELD_REGEX_HOST)
def _get_popup_message(self, app_name, con):
"""
_get_popup_message helps constructing the message that is displayed on
the pop-up dialog. Example:
curl is connecting to www.opensnitch.io on TCP port 443
"""
message = "<b>%s</b>" % app_name
if not self._local:
message = QC.translate("popups", "<b>Remote</b> process %s running on <b>%s</b>") % ( \
message,
self._peer.split(':')[1])
msg_action = QC.translate("popups", "is connecting to <b>%s</b> on %s port %d") % ( \
con.dst_host or con.dst_ip,
con.protocol.upper(),
con.dst_port )
# icmp port is 0 (i.e.: no port)
if con.dst_port == 0:
msg_action = QC.translate("popups", "is connecting to <b>%s</b>, %s") % ( \
con.dst_host or con.dst_ip,
con.protocol.upper() )
if con.dst_port == 53 and con.dst_ip != con.dst_host and con.dst_host != "":
msg_action = QC.translate("popups", "is attempting to resolve <b>%s</b> via %s, %s port %d") % ( \
con.dst_host,
con.dst_ip,
con.protocol.upper(),
con.dst_port)
return "%s %s" % (message, msg_action)
def _get_duration(self, duration_idx):
if duration_idx == 0:
return Config.DURATION_ONCE
@ -771,7 +677,9 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
elif combo.itemData(what_idx) == self.FIELD_APPIMAGE:
appimage_bin = os.path.basename(con.process_path)
appimage_path = os.path.dirname(con.process_path).replace(".", "\.")
print("APP3:", appimage_path)
appimage_path = appimage_path[0:len(self.APPIMAGE_PREFIX)+7]
print("APP4:", appimage_path)
return Config.RULE_TYPE_REGEXP, Config.OPERAND_PROCESS_PATH, r'^{0}[0-9A-Za-z]{{6}}\/.*{1}$'.format(appimage_path, appimage_bin)
def _on_action_clicked(self, action):
@ -790,16 +698,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.checkDstIP.isChecked() or \
self.checkSum.isChecked()
def _get_rule_name(self, rule):
rule_temp_name = slugify("%s %s" % (rule.action, rule.duration))
if self._is_list_rule():
rule_temp_name = "%s-list" % rule_temp_name
else:
rule_temp_name = "%s-simple" % rule_temp_name
rule_temp_name = slugify("%s %s" % (rule_temp_name, rule.operator.data))
return rule_temp_name[:128]
def _send_rule(self):
try:
self._cfg.setSettings("promptDialog/geometry", self.saveGeometry())
@ -821,7 +719,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._rule = None
return
rule_temp_name = self._get_rule_name(self._rule)
rule_temp_name = utils.get_rule_name(self._rule, self._is_list_rule())
self._rule.name = rule_temp_name
# TODO: move to a method
@ -843,6 +741,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
_type, _operand, _data = Config.RULE_TYPE_SIMPLE, Config.OPERAND_PROCESS_HASH_MD5, self.checksumLabel.text()
data.append({"type": _type, "operand": _operand, "data": _data})
rule_temp_name = slugify("%s %s" % (rule_temp_name, _operand))
print(data)
is_list_rule = self._is_list_rule()
@ -855,6 +754,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
if os.path.isabs(proc_args[0]) == False:
is_list_rule = True
data.append({"type": Config.RULE_TYPE_SIMPLE, "operand": Config.OPERAND_PROCESS_PATH, "data": str(self._con.process_path)})
print(self._con.process_path, "-", self._con.process_args, "-", proc_args[0])
if is_list_rule:
data.append({
@ -886,6 +786,8 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.checkAdvanced.toggle()
self._ischeckAdvanceded = False
print(self._rule)
except Exception as e:
print("[pop-up] exception creating a rule:", e)
finally:

View file

@ -0,0 +1,142 @@
from slugify import slugify
from PyQt5 import QtGui
from PyQt5.QtCore import QCoreApplication as QC
from opensnitch.config import Config
def set_elide_text(widget, text, max_size=132):
if len(text) > max_size:
text = text[:max_size] + "..."
widget.setText(text)
def get_rule_name(rule, is_list):
rule_temp_name = slugify("%s %s" % (rule.action, rule.duration))
if is_list:
rule_temp_name = "%s-list" % rule_temp_name
else:
rule_temp_name = "%s-simple" % rule_temp_name
rule_temp_name = slugify("%s %s" % (rule_temp_name, rule.operator.data))
return rule_temp_name[:128]
def get_popup_message(is_local, node, app_name, con):
"""
_get_popup_message helps constructing the message that is displayed on
the pop-up dialog. Example:
curl is connecting to www.opensnitch.io on TCP port 443
"""
message = "<b>%s</b>" % app_name
if not is_local:
message = QC.translate("popups", "<b>Remote</b> process %s running on <b>%s</b>") % ( \
message,
node.split(':')[1])
msg_action = QC.translate("popups", "is connecting to <b>%s</b> on %s port %d") % ( \
con.dst_host or con.dst_ip,
con.protocol.upper(),
con.dst_port )
# icmp port is 0 (i.e.: no port)
if con.dst_port == 0:
msg_action = QC.translate("popups", "is connecting to <b>%s</b>, %s") % ( \
con.dst_host or con.dst_ip,
con.protocol.upper() )
if con.dst_port == 53 and con.dst_ip != con.dst_host and con.dst_host != "":
msg_action = QC.translate("popups", "is attempting to resolve <b>%s</b> via %s, %s port %d") % ( \
con.dst_host,
con.dst_ip,
con.protocol.upper(),
con.dst_port)
return "%s %s" % (message, msg_action)
def set_app_path(appPathLabel, app_name, app_args, con):
# show the binary path if it's not part of the cmdline args:
# cmdline: telnet 1.1.1.1 (path: /usr/bin/telnet.netkit)
# cmdline: /usr/bin/telnet.netkit 1.1.1.1 (the binary path is part of the cmdline args, no need to display it)
if con.process_path != "" and len(con.process_args) >= 1 and con.process_path not in con.process_args:
appPathLabel.setToolTip("Process path: %s" % con.process_path)
if app_name.lower() == app_args:
set_elide_text(appPathLabel, "%s" % con.process_path)
else:
set_elide_text(appPathLabel, "(%s)" % con.process_path)
appPathLabel.setVisible(True)
elif con.process_path != "" and len(con.process_args) == 0:
set_elide_text(appPathLabel, "%s" % con.process_path)
appPathLabel.setVisible(True)
else:
appPathLabel.setVisible(False)
appPathLabel.setText("")
def set_app_args(argsLabel, app_name, app_args):
# if the app name and the args are the same, there's no need to display
# the args label (amule for example)
if app_name.lower() != app_args:
argsLabel.setVisible(True)
set_elide_text(argsLabel, app_args)
argsLabel.setToolTip(app_args)
else:
argsLabel.setVisible(False)
argsLabel.setText("")
def set_app_description(appDescriptionLabel, description):
if description != None and description != "":
appDescriptionLabel.setVisible(True)
appDescriptionLabel.setFixedHeight(50)
appDescriptionLabel.setToolTip(description)
set_elide_text(appDescriptionLabel, "%s" % description)
else:
appDescriptionLabel.setVisible(False)
appDescriptionLabel.setFixedHeight(0)
appDescriptionLabel.setText("")
def render_details(node, detailsWidget, con):
tree = ""
space = "&nbsp;"
spaces = "&nbsp;"
indicator = ""
try:
# reverse() doesn't exist on old protobuf libs.
con.process_tree.reverse()
except:
pass
for path in con.process_tree:
tree = "{0}<p>│{1}\t{2}{3}{4}</p>".format(tree, path.value, spaces, indicator, path.key)
spaces += "&nbsp;" * 4
indicator = "\\_ "
# XXX: table element doesn't work?
details = """<b>{0}</b> {1}:{2} -> {3}:{4}
<br><br>
<b>Path:</b>{5}{6}<br>
<b>Cmdline:</b>&nbsp;{7}<br>
<b>CWD:</b>{8}{9}<br>
<b>MD5:</b>{10}{11}<br>
<b>UID:</b>{12}{13}<br>
<b>PID:</b>{14}{15}<br>
<br>
<b>Process tree:</b><br>
{16}
<br>
<p><b>Environment variables:<b></p>
{17}
""".format(
con.protocol.upper(),
con.src_port, con.src_ip, con.dst_ip, con.dst_port,
space * 6, con.process_path,
" ".join(con.process_args),
space * 6, con.process_cwd,
space * 7, con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5],
space * 9, con.user_id,
space * 9, con.process_id,
tree,
"".join('<p>{}={}</p>'.format(key, value) for key, value in con.process_env.items())
)
detailsWidget.document().clear()
detailsWidget.document().setHtml(details)
detailsWidget.moveCursor(QtGui.QTextCursor.Start)