ui,fw: export rules to clipboard, rule dialog improvements

- Added option to export fw rule to clipboard in json format.
- Improvements to receive notifications on the fw rules dialog.
This commit is contained in:
Gustavo Iñiguez Goia 2023-07-15 17:13:58 +02:00
parent 1b3003e007
commit 77c49d533c
Failed to generate hash of commit
4 changed files with 161 additions and 91 deletions

View file

@ -391,28 +391,43 @@ The value must be in the format: VALUE/UNITS/TIME, for example:
def _cb_notification_callback(self, reply):
self._enable_buttons()
if reply.id not in self._notifications_sent:
print(self.LOG_TAG, "notification not in the list:", reply.id, "list:", self._notifications_sent)
return
rep = self._notifications_sent[reply.id]
if reply.code == ui_pb2.OK:
if 'operation' in rep and rep['operation'] == self.OP_DELETE:
self.tabWidget.setDisabled(True)
self._set_status_successful(QC.translate("firewall", "Rule deleted"))
self._disable_controls()
del self._notifications_sent[reply.id]
try:
if reply.id not in self._notifications_sent:
return
self._set_status_successful(QC.translate("firewall", "Rule added"))
rep = self._notifications_sent[reply.id]
if reply.code == ui_pb2.OK:
if 'operation' in rep and rep['operation'] == self.OP_DELETE:
self.tabWidget.setDisabled(True)
self._set_status_successful(QC.translate("firewall", "Rule deleted"))
self._disable_controls()
del self._notifications_sent[reply.id]
return
else:
errmsg = QC.translate("firewall", "Error adding rules:\n{0}".format(reply.data))
if 'operation' in rep and rep['operation'] == self.OP_SAVE:
errmsg = QC.translate("firewall", "Error saving rules:\n{0}".format(reply.data))
self._set_status_error(errmsg)
if 'operation' in rep and rep['operation'] == self.OP_SAVE:
self._set_status_successful(QC.translate("firewall", "Rule saved"))
else:
self._set_status_successful(QC.translate("firewall", "Rule added"))
del self._notifications_sent[reply.id]
else:
# XXX: The errors returned by the nftables lib are not really descriptive.
# "invalid argument", "no such file or directory", without context
# 1st one: invalid combination of table/chain/priorities?
# 2nd one: does the table/chain exist?
errormsg = QC.translate("firewall", "Error adding rules:\n{0}".format(reply.data))
if 'operation' in rep and rep['operation'] == self.OP_SAVE:
if 'uuid' in rep and rep['uuid'] in reply.data:
errormsg = QC.translate("firewall", "Error saving rule")
else:
self._set_status_message(QC.translate("firewall", "Rule saved, but there're other rules with errors (REVIEW):\n{0}".format(reply.data)))
return
self._set_status_error(errormsg)
except Exception as e:
print("[fw rule dialog exception] notif error:", e)
finally:
if reply.id in self._notifications_sent:
del self._notifications_sent[reply.id]
@QtCore.pyqtSlot(int)
def _cb_nodes_updated(self, total):
@ -455,10 +470,10 @@ The value must be in the format: VALUE/UNITS/TIME, for example:
self._set_status_error(QC.translate("firewall", "Error updating rule"))
return
if self.comboNodes.currentIndex() > 0:
self.send_notification(node_addr, node['firewall'], self.OP_DELETE)
else:
if self.comboNodes.currentIndex() == 0:
self.send_notifications(node['firewall'], self.OP_DELETE)
else:
self.send_notification(node_addr, node['firewall'], self.OP_DELETE)
def _cb_save_clicked(self):
if len(self.statements) == 0:
@ -472,12 +487,12 @@ The value must be in the format: VALUE/UNITS/TIME, for example:
self._set_status_message(QC.translate("firewall", "Adding rule, wait"))
ok, err = self._fw.update_rule(node_addr, self.uuid, chain)
if not ok:
self._set_status_error(QC.translate("firewall", "Error updating rule: {0}".format(err)))
self._set_status_error(QC.translate("firewall", "Error updating rule ({0}): {1}".format(node_addr, err)))
return
self._enable_buttons(False)
if self.comboNodes.currentIndex() > 0:
self.send_notification(node_addr, node['firewall'], self.OP_SAVE)
if self.comboNodes.currentIndex() == 0:
self.send_notification(node_addr, node['firewall'], self.OP_SAVE, self.uuid)
else:
self.send_notifications(node['firewall'], self.OP_SAVE)
@ -500,8 +515,9 @@ The value must be in the format: VALUE/UNITS/TIME, for example:
return
self._set_status_message(QC.translate("firewall", "Adding rule, wait"))
self._enable_buttons(False)
if self.comboNodes.currentIndex() > 0:
self.send_notification(node_addr, node['firewall'], self.OP_NEW)
if self.comboNodes.currentIndex() == 0:
self.send_notification(node_addr, node['firewall'], self.OP_NEW, chain.Rules[0].UUID)
else:
self.send_notifications(node['firewall'], self.OP_NEW)
@ -1146,8 +1162,9 @@ The value must be in the format: VALUE/UNITS/TIME, for example:
elif exp.Statement.Name == Fw.Statements.COUNTER.value:
self.statements[idx]['what'].setCurrentIndex(self.STATM_COUNTER+1)
if exp.Statement.Values[0].Key == Fw.ExprCounter.NAME.value:
self.statements[idx]['value'].setCurrentText(exp.Statement.Values[0].Value)
for v in exp.Statement.Values:
if v.Key == Fw.ExprCounter.NAME.value:
self.statements[idx]['value'].setCurrentText(v.Value)
else:
isNotSupported = True
@ -1470,9 +1487,9 @@ The value must be in the format: VALUE/UNITS/TIME, for example:
return True
def send_notification(self, node_addr, fw_config, op):
def send_notification(self, node_addr, fw_config, op, uuid):
nid, notif = self._nodes.reload_fw(node_addr, fw_config, self._notification_callback)
self._notifications_sent[nid] = {'addr': node_addr, 'operation': op, 'notif': notif}
self._notifications_sent[nid] = {'addr': node_addr, 'operation': op, 'notif': notif, 'uuid': uuid}
def send_notifications(self, fw_config, op):
for addr in self._nodes.get_nodes():

View file

@ -859,6 +859,7 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
table = self._get_active_table()
model = table.model()
menu = QtWidgets.QMenu()
exportMenu = QtWidgets.QMenu(QC.translate("stats", "Export"))
selection = table.selectionModel().selectedRows()
if not selection:
@ -886,6 +887,11 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
_menu_delete = menu.addAction(QC.translate("stats", "Delete"))
_menu_edit = menu.addAction(QC.translate("stats", "Edit"))
menu.addSeparator()
_toClipboard = exportMenu.addAction(QC.translate("stats", "To clipboard"))
#_toDisk = exportMenu.addAction(QC.translate("stats", "To disk"))
menu.addMenu(exportMenu)
# move away menu a few pixels to the right, to avoid clicking on it by mistake
point = QtCore.QPoint(pos.x()+10, pos.y()+5)
action = menu.exec_(table.mapToGlobal(point))
@ -906,6 +912,11 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
action == _action_reject or \
action == _action_return:
self._table_menu_change_rule_field(cur_idx, model, selection, FwRules.FIELD_TARGET, action.text())
elif action == _toClipboard:
self._table_menu_export_clipboard(cur_idx, model, selection)
#elif action == _toDisk:
# self._table_menu_export_disk(cur_idx, model, selection)
self._fw.rules.blockSignals(False)
except Exception as e:
@ -1023,15 +1034,23 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
def _table_menu_export_clipboard(self, cur_idx, model, selection):
rules_list = []
for idx in selection:
rule_name = model.index(idx.row(), self.COL_R_NAME).data()
node_addr = model.index(idx.row(), self.COL_R_NODE).data()
if cur_idx == self.TAB_RULES and not self.fwTable.isVisible():
for idx in selection:
rule_name = model.index(idx.row(), self.COL_R_NAME).data()
node_addr = model.index(idx.row(), self.COL_R_NODE).data()
json_rule = self._nodes.rule_to_json(node_addr, rule_name)
if json_rule != None:
rules_list.append(json_rule)
else:
print("export to clipboard: ERROR converting \"{0}\" to json".format(rule_name))
json_rule = self._nodes.rule_to_json(node_addr, rule_name)
if json_rule != None:
rules_list.append(json_rule)
else:
print("export to clipboard: ERROR converting \"{0}\" to json".format(rule_name))
elif cur_idx == self.TAB_RULES and self.fwTable.isVisible():
for idx in selection:
uuid = model.index(idx.row(), FirewallTableModel.COL_UUID).data()
node = model.index(idx.row(), FirewallTableModel.COL_ADDR).data()
r = self._fw.get_protorule_by_uuid(node, uuid)
if r:
rules_list.append(self._fw.rule_to_json(r))
cliptext=""
for r in rules_list:
@ -1302,6 +1321,8 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
"{0}".format(reply.data),
QtWidgets.QMessageBox.Warning)
del self._notifications_sent[reply.id]
else:
Message.ok(
QC.translate("stats", "Warning:"),

View file

@ -59,19 +59,6 @@ class Firewall(QObject):
chain.Rules[0].Enabled = enable
return self.update_rule(addr, uuid, chain)
def get_rule_by_uuid(self, uuid):
if uuid == "":
return None, None
for addr in self._nodes.get_nodes():
node = self._nodes.get_node(addr)
if not 'fwrules' in node:
continue
r = node['fwrules'].get(uuid)
if r != None:
return addr, r
return None, None
def filter_rules(self, nail):
"""
"""
@ -110,6 +97,69 @@ class Firewall(QObject):
return chains
def filter_by_table(self, addr, table, family):
"""get rules by table"""
chains = []
node = self._nodes.get_node(addr)
if not 'firewall' in node:
return chains
for n in node['firewall'].SystemRules:
for c in n.Chains:
for r in c.Rules:
if c.Table == table and c.Family == family:
chains.append(Rules.to_array(addr, c, r))
return chains
def filter_by_chain(self, addr, table, family, chain, hook):
"""get rules by chain"""
chains = []
node = self._nodes.get_node(addr)
if not 'firewall' in node:
return chains
for n in node['firewall'].SystemRules:
for c in n.Chains:
for r in c.Rules:
if c.Table == table and c.Family == family and c.Name == chain and c.Hook == hook:
chains.append(Rules.to_array(addr, c, r))
return chains
def swap_rules(self, view, addr, uuid, old_pos, new_pos):
return self.rules.swap(view, addr, uuid, old_pos, new_pos)
def get_rule_by_uuid(self, uuid):
"""get rule by uuid, in string format
"""
if uuid == "":
return None, None
for addr in self._nodes.get_nodes():
node = self._nodes.get_node(addr)
if not 'fwrules' in node:
continue
r = node['fwrules'].get(uuid)
if r != None:
return addr, r
return None, None
def get_protorule_by_uuid(self, addr, uuid):
"""get protobuffer rule by uuid.
"""
return self.rules.get_by_uuid(addr, uuid)
def get_node_rules(self, addr):
return self.rules.get_by_node(addr)
def get_chains(self):
return self.chains.get()
def get_rules(self):
return self.rules.get()
def rule_to_json(self, rule):
return Rules.to_json(rule)
def apply_profile(self, node_addr, json_profile):
"""
Apply a profile to the firewall configuration.
@ -180,43 +230,3 @@ class Firewall(QObject):
except Exception as e:
return False, "{0}".format(e)
return False, QC.translate("firewall", "profile not deleted")
def swap_rules(self, view, addr, uuid, old_pos, new_pos):
return self.rules.swap(view, addr, uuid, old_pos, new_pos)
def filter_by_table(self, addr, table, family):
"""get rules by table"""
chains = []
node = self._nodes.get_node(addr)
if not 'firewall' in node:
return chains
for n in node['firewall'].SystemRules:
for c in n.Chains:
for r in c.Rules:
if c.Table == table and c.Family == family:
chains.append(Rules.to_array(addr, c, r))
return chains
def filter_by_chain(self, addr, table, family, chain, hook):
"""get rules by chain"""
chains = []
node = self._nodes.get_node(addr)
if not 'firewall' in node:
return chains
for n in node['firewall'].SystemRules:
for c in n.Chains:
for r in c.Rules:
if c.Table == table and c.Family == family and c.Name == chain and c.Hook == hook:
chains.append(Rules.to_array(addr, c, r))
return chains
def get_node_rules(self, addr):
return self.rules.get_by_node(addr)
def get_chains(self):
return self.chains.get()
def get_rules(self):
return self.rules.get()

View file

@ -1,6 +1,8 @@
from PyQt5.QtCore import QObject, pyqtSignal
from PyQt5.QtCore import QCoreApplication as QC
from google.protobuf.json_format import MessageToJson
import uuid
from opensnitch import ui_pb2
from .enums import Operator
from .exprs import ExprLog
@ -139,6 +141,19 @@ class Rules(QObject):
rules.append(Rules.to_array(addr, c, r))
return rules
def get_by_uuid(self, addr, uuid):
rules = []
node = self._nodes.get_node(addr)
if node == None:
return rules
if not 'firewall' in node:
return rules
for u in node['firewall'].SystemRules:
for c in u.Chains:
for r in c.Rules:
if r.UUID == uuid:
return r
return None
def swap(self, view, addr, uuid, old_pos, new_pos):
"""
@ -263,6 +278,13 @@ class Rules(QObject):
return rules
@staticmethod
def to_json(rule):
try:
return MessageToJson(rule)
except:
return None
@staticmethod
def to_array(addr, chain, rule):
cols = []