ui, popups: refactoring 2

This commit is contained in:
Gustavo Iñiguez Goia 2023-12-01 23:58:00 +01:00
parent c297b0b539
commit d3ba9d65ce
Failed to generate hash of commit
4 changed files with 160 additions and 184 deletions

View file

@ -8,7 +8,7 @@ import json
import ipaddress
from datetime import datetime
from PyQt5 import QtCore, QtGui, uic, QtWidgets
from PyQt5 import QtCore, uic, QtWidgets
from PyQt5.QtCore import QCoreApplication as QC, QEvent
from slugify import slugify
@ -17,12 +17,11 @@ from opensnitch.utils import Icons
from opensnitch.desktop_parser import LinuxDesktopParser
from opensnitch.config import Config
from opensnitch.version import version
from opensnitch.actions import Actions
from opensnitch.rules import Rules, Rule
from opensnitch.nodes import Nodes
from opensnitch import ui_pb2
from opensnitch.dialogs.prompt import utils
from opensnitch.dialogs.prompt import utils, constants, checksums
DIALOG_UI_PATH = "%s/../../res/prompt.ui" % os.path.dirname(sys.modules[__name__].__file__)
class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
@ -30,41 +29,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
_tick_trigger = QtCore.pyqtSignal()
_timeout_trigger = QtCore.pyqtSignal()
TYPE = "popups"
PAGE_MAIN = 2
PAGE_DETAILS = 0
PAGE_CHECKSUMS = 1
DEFAULT_TIMEOUT = 15
# don't translate
FIELD_REGEX_HOST = "regex_host"
FIELD_REGEX_IP = "regex_ip"
FIELD_PROC_PATH = "process_path"
FIELD_PROC_ARGS = "process_args"
FIELD_PROC_ID = "process_id"
FIELD_USER_ID = "user_id"
FIELD_DST_IP = "dst_ip"
FIELD_DST_PORT = "dst_port"
FIELD_DST_NETWORK = "dst_network"
FIELD_DST_HOST = "simple_host"
FIELD_APPIMAGE = "appimage_path"
DURATION_30s = "30s"
DURATION_5m = "5m"
DURATION_15m = "15m"
DURATION_30m = "30m"
DURATION_1h = "1h"
# don't translate
APPIMAGE_PREFIX = "/tmp/.mount_"
# label displayed in the pop-up combo
DURATION_session = QC.translate("popups", "until reboot")
# label displayed in the pop-up combo
DURATION_forever = QC.translate("popups", "forever")
def __init__(self, parent=None, appicon=None):
QtWidgets.QDialog.__init__(self, parent, QtCore.Qt.WindowStaysOnTopHint)
# Other interesting flags: QtCore.Qt.Tool | QtCore.Qt.BypassWindowManagerHint
@ -85,9 +49,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.setWindowTitle("OpenSnitch v%s" % version)
self._actions = Actions.instance()
obj, self._action = self._actions.load("/home/ga/.config/opensnitch/actions/popups.json")
self._lock = threading.Lock()
self._con = None
self._rule = None
@ -96,7 +57,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._prompt_trigger.connect(self.on_connection_prompt_triggered)
self._timeout_trigger.connect(self.on_timeout_triggered)
self._tick_trigger.connect(self.on_tick_triggered)
self._tick = int(self._cfg.getSettings(self._cfg.DEFAULT_TIMEOUT_KEY)) if self._cfg.hasKey(self._cfg.DEFAULT_TIMEOUT_KEY) else self.DEFAULT_TIMEOUT
self._tick = int(self._cfg.getSettings(self._cfg.DEFAULT_TIMEOUT_KEY)) if self._cfg.hasKey(self._cfg.DEFAULT_TIMEOUT_KEY) else constants.DEFAULT_TIMEOUT
self._tick_thread = None
self._done = threading.Event()
self._timeout_text = ""
@ -217,7 +178,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.checkSum.setVisible(self._con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5] != "" and state)
self.checksumLabel_2.setVisible(self._con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5] != "" and state)
self.checksumLabel.setVisible(self._con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5] != "" and state)
self.stackedWidget.setCurrentIndex(self.PAGE_MAIN)
self.stackedWidget.setCurrentIndex(constants.PAGE_MAIN)
self._ischeckAdvanceded = state
self.adjust_size()
@ -228,10 +189,10 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
def _cb_warninglbl_clicked(self):
self._stop_countdown()
self.stackedWidget.setCurrentIndex(self.PAGE_CHECKSUMS)
self.stackedWidget.setCurrentIndex(constants.PAGE_CHECKSUMS)
def _cb_cmdinfo_clicked(self):
self.stackedWidget.setCurrentIndex(self.PAGE_DETAILS)
self.stackedWidget.setCurrentIndex(constants.PAGE_DETAILS)
self._stop_countdown()
def _cb_update_rule_clicked(self):
@ -240,28 +201,10 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
if curRule == "":
return
# get rule from the db
records = self._rules.get_by_name(self._peer, curRule)
if records == None or records.first() == False:
print("popup.update_rule: None, rule:", curRule)
rule, error = checksums.update_rule(self._peer, self._rules, curRule, self._con)
if rule == None:
self.labelChecksumStatus.setStyleSheet('color: red')
self.labelChecksumStatus.setText("" + QC.translate("popups", "Rule not updated, not found by name"))
return
# transform it to proto rule
rule_obj = Rule.new_from_records(records)
if rule_obj.operator.type != Config.RULE_TYPE_LIST:
if rule_obj.operator.operand == Config.OPERAND_PROCESS_HASH_MD5:
rule_obj.operator.data = self._con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5]
else:
for op in rule_obj.operator.list:
if op.operand == Config.OPERAND_PROCESS_HASH_MD5:
op.data = self._con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5]
break
# add it back again to the db
added = self._rules.add_rules(self._peer, [rule_obj])
if not added:
self.labelChecksumStatus.setStyleSheet('color: red')
self.labelChecksumStatus.setText("" + QC.translate("popups", "Rule not updated."))
self.labelChecksumStatus.setText("" + error)
return
self._nodes.send_notification(
@ -270,14 +213,14 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
id=int(str(time.time()).replace(".", "")),
type=ui_pb2.CHANGE_RULE,
data="",
rules=[rule_obj]
rules=[rule]
)
)
self.labelChecksumStatus.setStyleSheet('color: green')
self.labelChecksumStatus.setText("" + QC.translate("popups", "Rule updated."))
def _cb_cmdback_clicked(self):
self.stackedWidget.setCurrentIndex(self.PAGE_MAIN)
self.stackedWidget.setCurrentIndex(constants.PAGE_MAIN)
self._stop_countdown()
def promptUser(self, connection, is_local, peer):
@ -287,7 +230,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
if self._tick_thread != None and self._tick_thread.is_alive():
self._tick_thread.join()
self._cfg.reload()
self._tick = int(self._cfg.getSettings(self._cfg.DEFAULT_TIMEOUT_KEY)) if self._cfg.hasKey(self._cfg.DEFAULT_TIMEOUT_KEY) else self.DEFAULT_TIMEOUT
self._tick = int(self._cfg.getSettings(self._cfg.DEFAULT_TIMEOUT_KEY)) if self._cfg.hasKey(self._cfg.DEFAULT_TIMEOUT_KEY) else constants.DEFAULT_TIMEOUT
self._tick_thread = threading.Thread(target=self._timeout_worker)
self._tick_thread.stop = self._ischeckAdvanceded
self._timeout_triggered = False
@ -334,7 +277,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
@QtCore.pyqtSlot()
def on_connection_prompt_triggered(self):
self.stackedWidget.setCurrentIndex(self.PAGE_MAIN)
self.stackedWidget.setCurrentIndex(constants.PAGE_MAIN)
self._render_connection(self._con)
if self._tick > 0:
self.show()
@ -352,32 +295,9 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._timeout_triggered = True
self._send_rule()
def _pre_popup_plugins(self, con):
pass
def _post_popup_plugins(self, con):
"""Actions performed on the pop-up once the connection details have
been displayed on the screen.
"""
for conf in self._action['actions']:
print(conf)
action = self._action['actions'][conf]
if type(action) == dict:
continue
if self.TYPE not in action.TYPE:
continue
action.run(self, (con,))
def _hide_widget(self, widget, hide):
widget.setVisible(not hide)
def _configure_default_duration(self):
if self._cfg.hasKey(self._cfg.DEFAULT_DURATION_KEY):
cur_idx = self._cfg.getInt(self._cfg.DEFAULT_DURATION_KEY)
self.durationCombo.setCurrentIndex(cur_idx)
else:
self.durationCombo.setCurrentIndex(self._cfg.DEFAULT_DURATION_IDX)
def _set_cmd_action_text(self):
action_idx = self._cfg.getInt(self._cfg.DEFAULT_ACTION_KEY)
if action_idx == Config.ACTION_ALLOW_IDX:
@ -389,23 +309,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.actionButton.setText("{0} ({1})".format(self._action_text[action_idx], self._tick))
self.actionButton.setIcon(self._action_icon[action_idx])
def _verify_checksums(self, con, rule):
"""return true if the checksum of a rule matches the one of the process
opening a connection.
"""
if rule.operator.type != Config.RULE_TYPE_LIST:
return True, ""
if con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5] == "":
return True, ""
for ro in rule.operator.list:
if ro.type == Config.RULE_TYPE_SIMPLE and ro.operand == Config.OPERAND_PROCESS_HASH_MD5:
if ro.data != con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5]:
return False, ro.data
return True, ""
def _display_checksums_warning(self, peer, con):
self.messageLabel.setStyleSheet('')
self.labelChecksumStatus.setText('')
@ -414,7 +317,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
if records != None and records.first():
rule = Rule.new_from_records(records)
validates, expected = self._verify_checksums(con, rule)
validates, expected = checksums.verify(con, rule)
if not validates:
self.messageLabel.setStyleSheet('color: red')
self.messageLabel.setText(
@ -439,8 +342,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5],
expected
)
else:
print("no checksums results")
return ""
@ -451,10 +352,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
utils.set_app_path(self.appPathLabel, app_name, app_args, con)
utils.set_app_args(self.argsLabel, app_name, app_args)
print("bin path:", con.process_path, "args:", app_args, "args raw:", self._con.process_args)
print("bin hash:", con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5]) #, con.HasField("process_checksums"))
#print("bin tree:", " > ".join(con.process_tree))
print("bin tree:", con.process_tree)
self.checksumLabel.setText(con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5])
self.checkSum.setChecked(False)
@ -509,7 +406,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.whatIPCombo.clear()
self._add_fixed_options_to_combo(self.whatCombo, con, uid)
if con.process_path.startswith(self.APPIMAGE_PREFIX):
if con.process_path.startswith(constants.APPIMAGE_PREFIX):
self._add_appimage_pattern_to_combo(self.whatCombo, con)
self._add_dst_networks_to_combo(self.whatCombo, con.dst_ip)
@ -520,7 +417,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._add_dst_networks_to_combo(self.whatIPCombo, con.dst_ip)
self._default_action = self._cfg.getInt(self._cfg.DEFAULT_ACTION_KEY)
self._configure_default_duration()
utils.set_default_duration(self._cfg, self.durationCombo)
if int(con.process_id) > 0:
self.whatCombo.setCurrentIndex(int(self._cfg.getSettings(self._cfg.DEFAULT_TARGET_KEY)))
@ -539,8 +436,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.setFixedSize(self.size())
self._post_popup_plugins(con)
# https://gis.stackexchange.com/questions/86398/how-to-disable-the-escape-key-for-a-dialog
def keyPressEvent(self, event):
if not event.key() == QtCore.Qt.Key_Escape:
@ -555,113 +450,93 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
def _add_fixed_options_to_combo(self, combo, con, uid):
# the order of these combobox entries must match those in the preferences dialog
# prefs -> UI -> Default target
combo.addItem(QC.translate("popups", "from this executable"), self.FIELD_PROC_PATH)
combo.addItem(QC.translate("popups", "from this executable"), constants.FIELD_PROC_PATH)
if int(con.process_id) < 0:
combo.model().item(0).setEnabled(False)
combo.addItem(QC.translate("popups", "from this command line"), self.FIELD_PROC_ARGS)
combo.addItem(QC.translate("popups", "from this command line"), constants.FIELD_PROC_ARGS)
combo.addItem(QC.translate("popups", "to port {0}").format(con.dst_port), self.FIELD_DST_PORT)
combo.addItem(QC.translate("popups", "to {0}").format(con.dst_ip), self.FIELD_DST_IP)
combo.addItem(QC.translate("popups", "to port {0}").format(con.dst_port), constants.FIELD_DST_PORT)
combo.addItem(QC.translate("popups", "to {0}").format(con.dst_ip), constants.FIELD_DST_IP)
combo.addItem(QC.translate("popups", "from user {0}").format(uid), self.FIELD_USER_ID)
combo.addItem(QC.translate("popups", "from user {0}").format(uid), constants.FIELD_USER_ID)
if int(con.user_id) < 0:
combo.model().item(4).setEnabled(False)
combo.addItem(QC.translate("popups", "from this PID"), self.FIELD_PROC_ID)
combo.addItem(QC.translate("popups", "from this PID"), constants.FIELD_PROC_ID)
def _add_ip_regexp_to_combo(self, combo, IPcombo, con):
IPcombo.addItem(QC.translate("popups", "to {0}").format(con.dst_ip), self.FIELD_DST_IP)
IPcombo.addItem(QC.translate("popups", "to {0}").format(con.dst_ip), constants.FIELD_DST_IP)
parts = con.dst_ip.split('.')
nparts = len(parts)
for i in range(1, nparts):
combo.addItem(QC.translate("popups", "to {0}.*").format('.'.join(parts[:i])), self.FIELD_REGEX_IP)
IPcombo.addItem(QC.translate("popups", "to {0}.*").format( '.'.join(parts[:i])), self.FIELD_REGEX_IP)
combo.addItem(QC.translate("popups", "to {0}.*").format('.'.join(parts[:i])), constants.FIELD_REGEX_IP)
IPcombo.addItem(QC.translate("popups", "to {0}.*").format( '.'.join(parts[:i])), constants.FIELD_REGEX_IP)
def _add_appimage_pattern_to_combo(self, combo, con):
"""appimages' absolute path usually starts with /tmp/.mount_<
"""
appimage_bin = os.path.basename(con.process_path)
appimage_path = os.path.dirname(con.process_path)
print("APP1:", appimage_path)
appimage_path = appimage_path[0:len(self.APPIMAGE_PREFIX)+6]
print("APP2:", appimage_path)
appimage_path = appimage_path[0:len(constants.APPIMAGE_PREFIX)+6]
combo.addItem(
QC.translate("popups", "from {0}*/{1}").format(appimage_path, appimage_bin),
self.FIELD_APPIMAGE
constants.FIELD_APPIMAGE
)
def _add_dst_networks_to_combo(self, combo, dst_ip):
if type(ipaddress.ip_address(dst_ip)) == ipaddress.IPv4Address:
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/24", strict=False)), self.FIELD_DST_NETWORK)
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/16", strict=False)), self.FIELD_DST_NETWORK)
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/8", strict=False)), self.FIELD_DST_NETWORK)
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/24", strict=False)), constants.FIELD_DST_NETWORK)
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/16", strict=False)), constants.FIELD_DST_NETWORK)
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/8", strict=False)), constants.FIELD_DST_NETWORK)
else:
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/64", strict=False)), self.FIELD_DST_NETWORK)
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/128", strict=False)), self.FIELD_DST_NETWORK)
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/64", strict=False)), constants.FIELD_DST_NETWORK)
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/128", strict=False)), constants.FIELD_DST_NETWORK)
def _add_dsthost_to_combo(self, dst_host):
self.whatCombo.addItem("%s" % dst_host, self.FIELD_DST_HOST)
self.whatIPCombo.addItem("%s" % dst_host, self.FIELD_DST_HOST)
self.whatCombo.addItem("%s" % dst_host, constants.FIELD_DST_HOST)
self.whatIPCombo.addItem("%s" % dst_host, constants.FIELD_DST_HOST)
parts = dst_host.split('.')[1:]
nparts = len(parts)
for i in range(0, nparts - 1):
self.whatCombo.addItem(QC.translate("popups", "to *.{0}").format('.'.join(parts[i:])), self.FIELD_REGEX_HOST)
self.whatIPCombo.addItem(QC.translate("popups", "to *.{0}").format('.'.join(parts[i:])), self.FIELD_REGEX_HOST)
def _get_duration(self, duration_idx):
if duration_idx == 0:
return Config.DURATION_ONCE
elif duration_idx == 1:
return self.DURATION_30s
elif duration_idx == 2:
return self.DURATION_5m
elif duration_idx == 3:
return self.DURATION_15m
elif duration_idx == 4:
return self.DURATION_30m
elif duration_idx == 5:
return self.DURATION_1h
elif duration_idx == 6:
return Config.DURATION_UNTIL_RESTART
else:
return Config.DURATION_ALWAYS
self.whatCombo.addItem(QC.translate("popups", "to *.{0}").format('.'.join(parts[i:])), constants.FIELD_REGEX_HOST)
self.whatIPCombo.addItem(QC.translate("popups", "to *.{0}").format('.'.join(parts[i:])), constants.FIELD_REGEX_HOST)
def _get_combo_operator(self, combo, what_idx, con):
if combo.itemData(what_idx) == self.FIELD_PROC_PATH:
if combo.itemData(what_idx) == constants.FIELD_PROC_PATH:
return Config.RULE_TYPE_SIMPLE, Config.OPERAND_PROCESS_PATH, con.process_path
elif combo.itemData(what_idx) == self.FIELD_PROC_ARGS:
elif combo.itemData(what_idx) == constants.FIELD_PROC_ARGS:
# this should not happen
if len(con.process_args) == 0 or con.process_args[0] == "":
return Config.RULE_TYPE_SIMPLE, Config.OPERAND_PROCESS_PATH, con.process_path
return Config.RULE_TYPE_SIMPLE, Config.OPERAND_PROCESS_COMMAND, ' '.join(con.process_args)
elif combo.itemData(what_idx) == self.FIELD_PROC_ID:
elif combo.itemData(what_idx) == constants.FIELD_PROC_ID:
return Config.RULE_TYPE_SIMPLE, Config.OPERAND_PROCESS_ID, "{0}".format(con.process_id)
elif combo.itemData(what_idx) == self.FIELD_USER_ID:
elif combo.itemData(what_idx) == constants.FIELD_USER_ID:
return Config.RULE_TYPE_SIMPLE, Config.OPERAND_USER_ID, "%s" % con.user_id
elif combo.itemData(what_idx) == self.FIELD_DST_PORT:
elif combo.itemData(what_idx) == constants.FIELD_DST_PORT:
return Config.RULE_TYPE_SIMPLE, Config.OPERAND_DEST_PORT, "%s" % con.dst_port
elif combo.itemData(what_idx) == self.FIELD_DST_IP:
elif combo.itemData(what_idx) == constants.FIELD_DST_IP:
return Config.RULE_TYPE_SIMPLE, Config.OPERAND_DEST_IP, con.dst_ip
elif combo.itemData(what_idx) == self.FIELD_DST_HOST:
elif combo.itemData(what_idx) == constants.FIELD_DST_HOST:
return Config.RULE_TYPE_SIMPLE, Config.OPERAND_DEST_HOST, combo.currentText()
elif combo.itemData(what_idx) == self.FIELD_DST_NETWORK:
elif combo.itemData(what_idx) == constants.FIELD_DST_NETWORK:
# strip "to ": "to x.x.x/20" -> "x.x.x/20"
# we assume that to is one word in all languages
parts = combo.currentText().split(' ')
text = parts[len(parts)-1]
return Config.RULE_TYPE_NETWORK, Config.OPERAND_DEST_NETWORK, text
elif combo.itemData(what_idx) == self.FIELD_REGEX_HOST:
elif combo.itemData(what_idx) == constants.FIELD_REGEX_HOST:
parts = combo.currentText().split(' ')
text = parts[len(parts)-1]
# ^(|.*\.)yahoo\.com
@ -669,17 +544,15 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
dsthost = r'^(|.*\.)%s' % dsthost[2:]
return Config.RULE_TYPE_REGEXP, Config.OPERAND_DEST_HOST, dsthost
elif combo.itemData(what_idx) == self.FIELD_REGEX_IP:
elif combo.itemData(what_idx) == constants.FIELD_REGEX_IP:
parts = combo.currentText().split(' ')
text = parts[len(parts)-1]
return Config.RULE_TYPE_REGEXP, Config.OPERAND_DEST_IP, "%s" % r'\.'.join(text.split('.')).replace("*", ".*")
elif combo.itemData(what_idx) == self.FIELD_APPIMAGE:
elif combo.itemData(what_idx) == constants.FIELD_APPIMAGE:
appimage_bin = os.path.basename(con.process_path)
appimage_path = os.path.dirname(con.process_path).replace(".", "\.")
print("APP3:", appimage_path)
appimage_path = appimage_path[0:len(self.APPIMAGE_PREFIX)+7]
print("APP4:", appimage_path)
appimage_path = appimage_path[0:len(constants.APPIMAGE_PREFIX)+7]
return Config.RULE_TYPE_REGEXP, Config.OPERAND_PROCESS_PATH, r'^{0}[0-9A-Za-z]{{6}}\/.*{1}$'.format(appimage_path, appimage_bin)
def _on_action_clicked(self, action):
@ -704,7 +577,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._rule = ui_pb2.Rule(name="user.choice")
self._rule.created = int(datetime.now().timestamp())
self._rule.enabled = True
self._rule.duration = self._get_duration(self.durationCombo.currentIndex())
self._rule.duration = utils.get_duration(self.durationCombo.currentIndex())
self._rule.action = Config.ACTION_ALLOW
if self._default_action == Config.ACTION_DENY_IDX:
@ -715,7 +588,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
what_idx = self.whatCombo.currentIndex()
self._rule.operator.type, self._rule.operator.operand, self._rule.operator.data = self._get_combo_operator(self.whatCombo, what_idx, self._con)
if self._rule.operator.data == "":
print("Invalid rule, discarding: ", self._rule)
print("popups: Invalid rule, discarding: ", self._rule)
self._rule = None
return
@ -724,16 +597,16 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
# TODO: move to a method
data=[]
if self.checkDstIP.isChecked() and self.whatCombo.itemData(what_idx) != self.FIELD_DST_IP:
if self.checkDstIP.isChecked() and self.whatCombo.itemData(what_idx) != constants.FIELD_DST_IP:
_type, _operand, _data = self._get_combo_operator(self.whatIPCombo, self.whatIPCombo.currentIndex(), self._con)
data.append({"type": _type, "operand": _operand, "data": _data})
rule_temp_name = slugify("%s %s" % (rule_temp_name, _data))
if self.checkDstPort.isChecked() and self.whatCombo.itemData(what_idx) != self.FIELD_DST_PORT:
if self.checkDstPort.isChecked() and self.whatCombo.itemData(what_idx) != constants.FIELD_DST_PORT:
data.append({"type": Config.RULE_TYPE_SIMPLE, "operand": Config.OPERAND_DEST_PORT, "data": str(self._con.dst_port)})
rule_temp_name = slugify("%s %s" % (rule_temp_name, str(self._con.dst_port)))
if self.checkUserID.isChecked() and self.whatCombo.itemData(what_idx) != self.FIELD_USER_ID:
if self.checkUserID.isChecked() and self.whatCombo.itemData(what_idx) != constants.FIELD_USER_ID:
data.append({"type": Config.RULE_TYPE_SIMPLE, "operand": Config.OPERAND_USER_ID, "data": str(self._con.user_id)})
rule_temp_name = slugify("%s %s" % (rule_temp_name, str(self._con.user_id)))
@ -741,7 +614,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
_type, _operand, _data = Config.RULE_TYPE_SIMPLE, Config.OPERAND_PROCESS_HASH_MD5, self.checksumLabel.text()
data.append({"type": _type, "operand": _operand, "data": _data})
rule_temp_name = slugify("%s %s" % (rule_temp_name, _operand))
print(data)
is_list_rule = self._is_list_rule()
@ -754,7 +626,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
if os.path.isabs(proc_args[0]) == False:
is_list_rule = True
data.append({"type": Config.RULE_TYPE_SIMPLE, "operand": Config.OPERAND_PROCESS_PATH, "data": str(self._con.process_path)})
print(self._con.process_path, "-", self._con.process_args, "-", proc_args[0])
if is_list_rule:
data.append({
@ -786,8 +657,6 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.checkAdvanced.toggle()
self._ischeckAdvanceded = False
print(self._rule)
except Exception as e:
print("[pop-up] exception creating a rule:", e)
finally:

View file

@ -0,0 +1,47 @@
from PyQt5.QtCore import QCoreApplication as QC
from opensnitch.config import Config
from opensnitch.rules import Rule
def verify(con, rule):
"""return true if the checksum of a rule matches the one of the process
opening a connection.
"""
if rule.operator.type != Config.RULE_TYPE_LIST:
return True, ""
if con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5] == "":
return True, ""
for ro in rule.operator.list:
if ro.type == Config.RULE_TYPE_SIMPLE and ro.operand == Config.OPERAND_PROCESS_HASH_MD5:
if ro.data != con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5]:
return False, ro.data
return True, ""
def update_rule(node, rules, rule_name, con):
"""try to obtain the rule from the DB by name.
return the rule on success, or None + error message on error.
"""
# get rule from the db
records = rules.get_by_name(node, rule_name)
if records == None or records.first() == False:
return None, QC.translate("popups", "Rule not updated, not found by name")
# transform it to proto rule
rule_obj = Rule.new_from_records(records)
if rule_obj.operator.type != Config.RULE_TYPE_LIST:
if rule_obj.operator.operand == Config.OPERAND_PROCESS_HASH_MD5:
rule_obj.operator.data = con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5]
else:
for op in rule_obj.operator.list:
if op.operand == Config.OPERAND_PROCESS_HASH_MD5:
op.data = con.process_checksums[Config.OPERAND_PROCESS_HASH_MD5]
break
# add it back again to the db
added = rules.add_rules(node, [rule_obj])
if not added:
return None, QC.translate("popups", "Rule not updated.")
return rule_obj, ""

View file

@ -0,0 +1,34 @@
from PyQt5.QtCore import QCoreApplication as QC
PAGE_MAIN = 2
PAGE_DETAILS = 0
PAGE_CHECKSUMS = 1
DEFAULT_TIMEOUT = 15
# don't translate
FIELD_REGEX_HOST = "regex_host"
FIELD_REGEX_IP = "regex_ip"
FIELD_PROC_PATH = "process_path"
FIELD_PROC_ARGS = "process_args"
FIELD_PROC_ID = "process_id"
FIELD_USER_ID = "user_id"
FIELD_DST_IP = "dst_ip"
FIELD_DST_PORT = "dst_port"
FIELD_DST_NETWORK = "dst_network"
FIELD_DST_HOST = "simple_host"
FIELD_APPIMAGE = "appimage_path"
DURATION_30s = "30s"
DURATION_5m = "5m"
DURATION_15m = "15m"
DURATION_30m = "30m"
DURATION_1h = "1h"
# don't translate
APPIMAGE_PREFIX = "/tmp/.mount_"
# label displayed in the pop-up combo
DURATION_session = QC.translate("popups", "until reboot")
# label displayed in the pop-up combo
DURATION_forever = QC.translate("popups", "forever")

View file

@ -4,6 +4,7 @@ from PyQt5 import QtGui
from PyQt5.QtCore import QCoreApplication as QC
from opensnitch.config import Config
from opensnitch.dialogs.prompt import constants
def set_elide_text(widget, text, max_size=132):
if len(text) > max_size:
@ -93,6 +94,31 @@ def set_app_description(appDescriptionLabel, description):
appDescriptionLabel.setFixedHeight(0)
appDescriptionLabel.setText("")
def get_duration(duration_idx):
if duration_idx == 0:
return Config.DURATION_ONCE
elif duration_idx == 1:
return constants.DURATION_30s
elif duration_idx == 2:
return constants.DURATION_5m
elif duration_idx == 3:
return constants.DURATION_15m
elif duration_idx == 4:
return constants.DURATION_30m
elif duration_idx == 5:
return constants.DURATION_1h
elif duration_idx == 6:
return Config.DURATION_UNTIL_RESTART
else:
return Config.DURATION_ALWAYS
def set_default_duration(cfg, durationCombo):
if cfg.hasKey(Config.DEFAULT_DURATION_KEY):
cur_idx = cfg.getInt(Config.DEFAULT_DURATION_KEY)
durationCombo.setCurrentIndex(cur_idx)
else:
durationCombo.setCurrentIndex(Config.DEFAULT_DURATION_IDX)
def render_details(node, detailsWidget, con):
tree = ""
space = "&nbsp;"