mirror of
https://github.com/evilsocket/opensnitch.git
synced 2025-03-04 08:34:40 +01:00
ui, tests: fixed, added new ones.
This commit is contained in:
parent
fd231e7645
commit
d8a36f8d41
7 changed files with 226 additions and 5 deletions
|
@ -2,7 +2,7 @@
|
|||
|
||||
from PyQt5.QtCore import QCoreApplication as QC
|
||||
import os
|
||||
from utils import Utils
|
||||
from opensnitch.utils import Utils
|
||||
from opensnitch.config import Config
|
||||
|
||||
class DesktopNotifications():
|
||||
|
|
23
ui/tests/README.md
Normal file
23
ui/tests/README.md
Normal file
|
@ -0,0 +1,23 @@
|
|||
GUI unit tests.
|
||||
|
||||
We use pytest [0] to pytest-qt [1] to test GUI code.
|
||||
|
||||
To run the tests: `cd tests; pytest -v`
|
||||
|
||||
TODO:
|
||||
- test service class (Service.py)
|
||||
- test events window (stats.py):
|
||||
- The size of the window must be saved on close, and restored when opening it again.
|
||||
- Columns width of every view must be saved and restored properly.
|
||||
- On the Events tab, clicking on the Node, Process or Rule column must jump to the detailed view of the selected item.
|
||||
- When entering into a detail view:
|
||||
- the results limit configured must be respected (that little button on the bottom right of every tab).
|
||||
- must apply the proper SQL query for every detailed view.
|
||||
- When going back from a detail view:
|
||||
- The SQL query must be restored.
|
||||
- Test rules context menu actions.
|
||||
- Test select rows and copy them to the clipboard (ctrl+c).
|
||||
|
||||
|
||||
0. https://docs.pytest.org/en/6.2.x/
|
||||
1. https://pytest-qt.readthedocs.io/en/latest/intro.html
|
0
ui/tests/__init__.py
Normal file
0
ui/tests/__init__.py
Normal file
|
@ -1,5 +1,52 @@
|
|||
|
||||
from opensnitch.database import Database
|
||||
from opensnitch.config import Config
|
||||
from opensnitch.nodes import Nodes
|
||||
|
||||
# grpc object
|
||||
class ClientConfig:
|
||||
version = "1.2.3"
|
||||
name = "bla"
|
||||
logLevel = 0
|
||||
isFirewallRunning = False
|
||||
rules = []
|
||||
config = '''{
|
||||
"Server":{
|
||||
"Address": "unix:///tmp/osui.sock",
|
||||
"LogFile": "/var/log/opensnitchd.log"
|
||||
},
|
||||
"DefaultAction": "deny",
|
||||
"DefaultDuration": "once",
|
||||
"InterceptUnknown": false,
|
||||
"ProcMonitorMethod": "ebpf",
|
||||
"LogLevel": 0,
|
||||
"Firewall": "iptables",
|
||||
"Stats": {
|
||||
"MaxEvents": 150,
|
||||
"MaxStats": 50
|
||||
}
|
||||
}
|
||||
'''
|
||||
|
||||
class Connection:
|
||||
protocol = "tcp"
|
||||
src_ip = "127.0.0.1"
|
||||
src_port = "12345"
|
||||
dst_ip = "127.0.0.1"
|
||||
dst_host = "localhost"
|
||||
dst_port = "54321"
|
||||
user_id = 1000
|
||||
process_id = 9876
|
||||
process_path = "/bin/cmd"
|
||||
process_cwd = "/tmp"
|
||||
process_args = "/bin/cmd --parm1 test"
|
||||
process_env = []
|
||||
|
||||
db = Database.instance()
|
||||
db.initialize()
|
||||
Config.init()
|
||||
|
||||
nodes = Nodes.instance()
|
||||
nodes._nodes["unix:/tmp/osui.sock"] = {
|
||||
'data': ClientConfig
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
import os
|
||||
import time
|
||||
import json
|
||||
from PyQt5 import QtCore, QtWidgets
|
||||
from PyQt5 import QtCore, QtWidgets, QtGui
|
||||
|
||||
from opensnitch.config import Config
|
||||
from opensnitch.dialogs.preferences import PreferencesDialog
|
||||
|
@ -20,8 +20,9 @@ class TestPreferences():
|
|||
|
||||
@classmethod
|
||||
def setup_method(self):
|
||||
white_icon = QtGui.QIcon("../res/icon-white.svg")
|
||||
self.reset_settings()
|
||||
self.prefs = PreferencesDialog()
|
||||
self.prefs = PreferencesDialog(appicon=white_icon)
|
||||
self.prefs.show()
|
||||
|
||||
def run(self, qtbot):
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
#
|
||||
|
||||
import json
|
||||
from PyQt5 import QtCore, QtWidgets
|
||||
from PyQt5 import QtCore, QtWidgets, QtGui
|
||||
|
||||
from opensnitch.config import Config
|
||||
from opensnitch.dialogs.ruleseditor import RulesEditorDialog
|
||||
|
@ -12,7 +12,8 @@ class TestRulesEditor():
|
|||
|
||||
@classmethod
|
||||
def setup_method(self):
|
||||
self.rd = RulesEditorDialog()
|
||||
white_icon = QtGui.QIcon("../res/icon-white.svg")
|
||||
self.rd = RulesEditorDialog(appicon=white_icon)
|
||||
self.rd.show()
|
||||
self.rd.ruleNameEdit.setText("xxx")
|
||||
self.rd.nodesCombo.addItem("unix:/tmp/osui.sock")
|
||||
|
|
149
ui/tests/test_nodes.py
Normal file
149
ui/tests/test_nodes.py
Normal file
|
@ -0,0 +1,149 @@
|
|||
#
|
||||
# pytest -v tests/nodes.py
|
||||
#
|
||||
|
||||
import json
|
||||
from PyQt5 import QtCore
|
||||
from opensnitch import ui_pb2
|
||||
from opensnitch.config import Config
|
||||
from opensnitch.nodes import Nodes
|
||||
from tests.dialogs import ClientConfig
|
||||
|
||||
class NotifTest(QtCore.QObject):
|
||||
"""We need to subclass from QObject in order to be able to user signals and slots.
|
||||
"""
|
||||
signal = QtCore.pyqtSignal(ui_pb2.NotificationReply)
|
||||
|
||||
@QtCore.pyqtSlot(ui_pb2.NotificationReply)
|
||||
def callback(self, reply):
|
||||
assert reply != None
|
||||
assert reply.code == ui_pb2.OK and reply.type == ui_pb2.LOAD_FIREWALL and reply.data == "test"
|
||||
|
||||
|
||||
class TestNodes():
|
||||
|
||||
@classmethod
|
||||
def setup_method(self):
|
||||
self.nid = None
|
||||
self.daemon_config = ClientConfig
|
||||
self.nodes = Nodes.instance()
|
||||
self.nodes._db.insert("nodes",
|
||||
"(addr, status, hostname, daemon_version, daemon_uptime, " \
|
||||
"daemon_rules, cons, cons_dropped, version, last_connection)",
|
||||
(
|
||||
"1.2.3.4", Nodes.ONLINE, "xxx", "v1.2.3", str(0),
|
||||
"", "0", "0", "",
|
||||
"2022-01-03 11:22:48.101624"
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_add(self, qtbot):
|
||||
node = self.nodes.add("peer:1.2.3.4", self.daemon_config)
|
||||
|
||||
assert node != None
|
||||
|
||||
def test_get_node(self, qtbot):
|
||||
node = self.nodes.get_node("peer:1.2.3.4")
|
||||
|
||||
assert node != None
|
||||
|
||||
def test_get_addr(self, qtbot):
|
||||
proto, addr = self.nodes.get_addr("peer:1.2.3.4")
|
||||
|
||||
assert proto == "peer" and addr == "1.2.3.4"
|
||||
|
||||
def test_get_nodes(self, qtbot):
|
||||
nodes = self.nodes.get_nodes()
|
||||
print(nodes)
|
||||
|
||||
assert nodes.get("peer:1.2.3.4") != None
|
||||
|
||||
def test_add_rule(self, qtbot):
|
||||
self.nodes.add_rule(
|
||||
"2022-01-03 11:22:48.101624",
|
||||
"peer:1.2.3.4",
|
||||
"test",
|
||||
True,
|
||||
False,
|
||||
Config.ACTION_ALLOW, Config.DURATION_30s,
|
||||
Config.RULE_TYPE_SIMPLE, False, "dest.host", ""
|
||||
)
|
||||
|
||||
query = self.nodes._db.get_rule("test", "peer:1.2.3.4")
|
||||
|
||||
assert query.first() == True
|
||||
assert query.record().value(0) == "2022-01-03 11:22:48.101624"
|
||||
assert query.record().value(1) == "peer:1.2.3.4"
|
||||
assert query.record().value(2) == "test"
|
||||
assert query.record().value(3) == "1"
|
||||
assert query.record().value(4) == "0"
|
||||
assert query.record().value(5) == Config.ACTION_ALLOW
|
||||
assert query.record().value(6) == Config.DURATION_30s
|
||||
assert query.record().value(7) == Config.RULE_TYPE_SIMPLE
|
||||
assert query.record().value(8) == "0"
|
||||
assert query.record().value(9) == "dest.host"
|
||||
|
||||
def test_update_rule_time(self, qtbot):
|
||||
query = self.nodes._db.get_rule("test", "peer:1.2.3.4")
|
||||
assert query.first() == True
|
||||
assert query.record().value(0) == "2022-01-03 11:22:48.101624"
|
||||
|
||||
self.nodes.update_rule_time("2022-01-03 21:22:48.101624", "test", "peer:1.2.3.4")
|
||||
query = self.nodes._db.get_rule("test", "peer:1.2.3.4")
|
||||
assert query.first() == True
|
||||
assert query.record().value(0) == "2022-01-03 21:22:48.101624"
|
||||
|
||||
def test_delete_rule(self, qtbot):
|
||||
query = self.nodes._db.get_rule("test", "peer:1.2.3.4")
|
||||
assert query.first() == True
|
||||
|
||||
self.nodes.delete_rule("test", "peer:1.2.3.4", None)
|
||||
query = self.nodes._db.get_rule("test", "peer:1.2.3.4")
|
||||
assert query.first() == False
|
||||
|
||||
def test_update_node_status(self, qtbot):
|
||||
query = self.nodes._db.select("SELECT status FROM nodes WHERE addr = '{0}'".format("1.2.3.4"))
|
||||
assert query != None and query.exec_() == True and query.first() == True
|
||||
assert query.record().value(0) == Nodes.ONLINE
|
||||
|
||||
self.nodes.update("peer", "1.2.3.4", Nodes.OFFLINE)
|
||||
query = self.nodes._db.select("SELECT status FROM nodes WHERE addr = '{0}'".format("1.2.3.4"))
|
||||
assert query != None and query.exec_() == True and query.first() == True
|
||||
assert query.record().value(0) == Nodes.OFFLINE
|
||||
|
||||
def test_send_notification(self, qtbot):
|
||||
notifs = NotifTest()
|
||||
notifs.signal.connect(notifs.callback)
|
||||
|
||||
test_notif = ui_pb2.Notification(
|
||||
clientName="",
|
||||
serverName="",
|
||||
type=ui_pb2.LOAD_FIREWALL,
|
||||
data="test",
|
||||
rules=[])
|
||||
|
||||
self.nid = self.nodes.send_notification("peer:1.2.3.4", test_notif, notifs.signal)
|
||||
assert self.nodes._notifications_sent[self.nid] != None
|
||||
assert self.nodes._notifications_sent[self.nid]['type'] == ui_pb2.LOAD_FIREWALL
|
||||
|
||||
def test_reply_notification(self, qtbot):
|
||||
reply_notif = ui_pb2.Notification(
|
||||
id = self.nid,
|
||||
clientName="",
|
||||
serverName="",
|
||||
type=ui_pb2.LOAD_FIREWALL,
|
||||
data="test",
|
||||
rules=[])
|
||||
# just after process the reply, the notification is deleted (except if
|
||||
# is of type MONITOR_PROCESS
|
||||
self.nodes.reply_notification("peer:1.2.3.4", reply_notif)
|
||||
assert self.nid not in self.nodes._notifications_sent
|
||||
|
||||
def test_delete(self, qtbot):
|
||||
self.nodes.delete("peer:1.2.3.4")
|
||||
node = self.nodes.get_node("peer:1.2.3.4")
|
||||
nodes = self.nodes.get_nodes()
|
||||
|
||||
assert node == None
|
||||
assert nodes.get("peer:1.2.3.4") == None
|
Loading…
Add table
Reference in a new issue