added new task nodemonitor

Added new task to monitor the resources of remote nodes, like
ram, swap, number of processes or load average of the system.

The task is initiated when the user selects a node, and the data
received from the node is added to the right panel of the Nodes tab.

The task is stopped when changing to another tab, or when deselecting a
node.

Particularly useful for monitoring remote nodes.
This commit is contained in:
Gustavo Iñiguez Goia 2024-09-28 01:16:05 +02:00
parent aea751793f
commit 58613543e2
6 changed files with 1032 additions and 507 deletions

View file

@ -0,0 +1,122 @@
package nodemonitor
import (
"context"
"encoding/json"
"fmt"
"syscall"
"time"
"unsafe"
"github.com/evilsocket/opensnitch/daemon/log"
"github.com/evilsocket/opensnitch/daemon/tasks"
)
// Name of this task
var Name = "node-monitor"
// Config of this task
type Config struct {
Interval string
Name string
}
// NodeMonitor monitors the resources of a node (ram, swap, load avg, etc).
type NodeMonitor struct {
tasks.TaskBase
Ticker *time.Ticker
Interval string
Node string
}
// New returns a new NodeMonitor
func New(node, interval string, stopOnDisconnect bool) (string, *NodeMonitor) {
return fmt.Sprint(Name, "-", node), &NodeMonitor{
TaskBase: tasks.TaskBase{
Results: make(chan interface{}),
Errors: make(chan error),
StopChan: make(chan struct{}),
},
Node: node,
Interval: interval,
}
}
// Start ...
func (pm *NodeMonitor) Start(ctx context.Context, cancel context.CancelFunc) error {
pm.Ctx = ctx
pm.Cancel = cancel
if pm.Interval == "" {
pm.Interval = "5s"
}
interval, err := time.ParseDuration(pm.Interval)
if err != nil {
return err
}
pm.Ticker = time.NewTicker(interval)
go func(ctx context.Context) {
var info syscall.Sysinfo_t
for {
select {
case <-pm.StopChan:
goto Exit
case <-ctx.Done():
goto Exit
case <-pm.Ticker.C:
// TODO:
// - filesystem stats
// - daemon status (mem && cpu usage, internal/debug pkg, etc)
err := syscall.Sysinfo(&info)
if err != nil {
pm.TaskBase.Errors <- err
continue
}
infoJSON, err := json.Marshal(info)
if err != nil {
pm.TaskBase.Errors <- err
continue
}
pm.TaskBase.Results <- unsafe.String(unsafe.SliceData(infoJSON), len(infoJSON))
}
}
Exit:
log.Debug("[tasks.NodeMonitor] stopped (%s)", pm.Node)
}(ctx)
return err
}
// Pause stops temporarily the task. For example it might be paused when the
// connection with the GUI (server) is closed.
func (pm *NodeMonitor) Pause() error {
// TODO
return nil
}
// Resume stopped tasks.
func (pm *NodeMonitor) Resume() error {
// TODO
return nil
}
// Stop ...
func (pm *NodeMonitor) Stop() error {
if pm.StopOnDisconnect {
return nil
}
pm.Cancel()
close(pm.TaskBase.Results)
close(pm.TaskBase.Errors)
return nil
}
// Results ...
func (pm *NodeMonitor) Results() <-chan interface{} {
return pm.TaskBase.Results
}
// Errors ...
func (pm *NodeMonitor) Errors() <-chan error {
return pm.TaskBase.Errors
}

View file

@ -15,6 +15,7 @@ import (
"github.com/evilsocket/opensnitch/daemon/procmon/monitor"
"github.com/evilsocket/opensnitch/daemon/rule"
"github.com/evilsocket/opensnitch/daemon/tasks"
"github.com/evilsocket/opensnitch/daemon/tasks/nodemonitor"
"github.com/evilsocket/opensnitch/daemon/tasks/pidmonitor"
"github.com/evilsocket/opensnitch/daemon/ui/config"
"github.com/evilsocket/opensnitch/daemon/ui/protocol"
@ -58,6 +59,33 @@ func (c *Client) getClientConfig() *protocol.ClientConfig {
}
}
func (c *Client) monitorNode(node, interval string, stream protocol.UI_NotificationsClient, notification *protocol.Notification) {
taskName, nodeMonTask := nodemonitor.New(node, interval, true)
ctxNode, err := TaskMgr.AddTask(taskName, nodeMonTask)
if err != nil {
c.sendNotificationReply(stream, notification.Id, "", err)
return
}
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
goto Exit
case err := <-nodeMonTask.Errors():
c.sendNotificationReply(stream, notification.Id, "", err)
case temp := <-nodeMonTask.Results():
data, ok := temp.(string)
if !ok {
goto Exit
}
c.sendNotificationReply(stream, notification.Id, data, nil)
}
}
Exit:
TaskMgr.RemoveTask(taskName)
}(ctxNode)
}
func (c *Client) monitorProcessDetails(pid int, interval string, stream protocol.UI_NotificationsClient, notification *protocol.Notification) {
if !core.Exists(fmt.Sprint("/proc/", pid)) {
c.sendNotificationReply(stream, notification.Id, "", fmt.Errorf("The process is no longer running"))
@ -185,6 +213,8 @@ func (c *Client) handleActionTaskStart(stream protocol.UI_NotificationsClient, n
return
}
c.monitorProcessDetails(pid, taskConf.Data["interval"], stream, notification)
case nodemonitor.Name:
c.monitorNode(taskConf.Data["node"], taskConf.Data["interval"], stream, notification)
default:
log.Debug("TaskStart, unknown task: %v", taskConf)
//c.sendNotificationReply(stream, notification.Id, "", err)
@ -208,6 +238,8 @@ func (c *Client) handleActionTaskStop(stream protocol.UI_NotificationsClient, no
return
}
TaskMgr.RemoveTask(fmt.Sprint(taskConf.Name, "-", pid))
case nodemonitor.Name:
TaskMgr.RemoveTask(fmt.Sprint(nodemonitor.Name, "-", taskConf.Data["node"]))
default:
log.Debug("TaskStop, unknown task: %v", taskConf)
//c.sendNotificationReply(stream, notification.Id, "", err)

View file

@ -140,6 +140,7 @@ class Config:
STATS_RULES_TREE_EXPANDED_0 = "statsDialog/rules_tree_0_expanded"
STATS_RULES_TREE_EXPANDED_1 = "statsDialog/rules_tree_1_expanded"
STATS_RULES_SPLITTER_POS = "statsDialog/rules_splitter_pos"
STATS_NODES_SPLITTER_POS = "statsDialog/nodes_splitter_pos"
STATS_VIEW_COL_STATE = "statsDialog/view_columns_state"
STATS_VIEW_DETAILS_COL_STATE = "statsDialog/view_details_columns_state"

View file

@ -4,6 +4,7 @@ import sys
import os
import csv
import io
import json
from PyQt5 import QtCore, QtGui, uic, QtWidgets
from PyQt5.QtCore import QCoreApplication as QC
@ -56,6 +57,16 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
COL_RULES = 7
GENERAL_COL_NUM = 8
# nodes
COL_N_STATUS = 2
COL_N_HOSTNAME = 3
COL_N_VERSION = 4
COL_N_UPTIME = 5
COL_N_RULES = 6
COL_N_CONNECTIONS = 7
COL_N_DROPPED = 8
COL_N_KERNEL = 9
# stats
COL_WHAT = 0
@ -130,10 +141,6 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
TAB_FIREWALL: False,
TAB_ALERTS: False
}
# restore scrollbar position when going back from a detail view
LAST_SCROLL_VALUE = None
# try to restore last selection
LAST_SELECTED_ITEM = ""
TABLES = {
TAB_MAIN: {
@ -357,6 +364,12 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.FIREWALL_DISABLED = QC.translate("stats", "Disabled")
self.FIREWALL_RUNNING = QC.translate("stats", "Running")
# restore scrollbar position when going back from a detail view
self.LAST_SCROLL_VALUE = None
# try to restore last selection
self.LAST_SELECTED_ITEM = ""
self.LAST_TAB = 0
self._db = db
self._db_sqlite = self._db.get_db()
self._db_name = dbname
@ -406,6 +419,8 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.nodeLabel.setStyleSheet('color: green;font-size:12pt; font-weight:600;')
self.rulesSplitter.setStretchFactor(0,0)
self.rulesSplitter.setStretchFactor(1,2)
self.nodesSplitter.setStretchFactor(0,0)
self.nodesSplitter.setStretchFactor(0,3)
self.rulesTreePanel.resizeColumnToContents(0)
self.rulesTreePanel.resizeColumnToContents(1)
self.rulesTreePanel.itemExpanded.connect(self._cb_rules_tree_item_expanded)
@ -424,7 +439,8 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.limitCombo.currentIndexChanged.connect(self._cb_limit_combo_changed)
self.tabWidget.currentChanged.connect(self._cb_tab_changed)
self.delRuleButton.clicked.connect(self._cb_del_rule_clicked)
self.rulesSplitter.splitterMoved.connect(self._cb_rules_splitter_moved)
self.rulesSplitter.splitterMoved.connect(lambda pos, index: self._cb_splitter_moved(self.TAB_RULES, pos, index))
self.nodesSplitter.splitterMoved.connect(lambda pos, index: self._cb_splitter_moved(self.TAB_NODES, pos, index))
self.rulesTreePanel.itemClicked.connect(self._cb_rules_tree_item_clicked)
self.rulesTreePanel.itemDoubleClicked.connect(self._cb_rules_tree_item_double_clicked)
self.enableRuleCheck.clicked.connect(self._cb_enable_rule_toggled)
@ -644,6 +660,7 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.TABLES[idx]['label'].setStyleSheet('color: blue; font-size:9pt; font-weight:600;')
self.TABLES[idx]['label'].setVisible(False)
self.TABLES[idx]['view'].doubleClicked.connect(self._cb_table_double_clicked)
self.TABLES[idx]['view'].selectionModel().selectionChanged.connect(self._cb_table_selection_changed)
self.TABLES[idx]['view'].installEventFilter(self)
self.TABLES[self.TAB_FIREWALL]['view'].rowsReordered.connect(self._cb_fw_table_rows_reordered)
@ -809,8 +826,20 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
elif len(rulesSizes) > 0:
self.comboRulesFilter.setVisible(rulesSizes[0] == 0)
else:
# default position when the user hasn't moved it.
w = self.rulesSplitter.width()
self.rulesSplitter.setSizes([int(w/3), int(w/2)])
self.rulesSplitter.setSizes([int(w/2), int(w/4)])
nodes_splitter_pos = self._cfg.getSettings(Config.STATS_NODES_SPLITTER_POS)
if type(nodes_splitter_pos) == QtCore.QByteArray:
self.nodesSplitter.restoreState(nodes_splitter_pos)
nodesSizes = self.nodesSplitter.sizes()
self.nodesSplitter.setVisible(not self.IN_DETAIL_VIEW[self.TAB_NODES] and nodesSizes[0] > 0)
#elif len(rulesSizes) > 0:
# self.comboRulesFilter.setVisible(rulesSizes[0] == 0)
else:
w = self.nodesSplitter.width()
self.nodesSplitter.setSizes([int(w/2), int(w/3)])
self._restore_details_view_columns(self.eventsTable.horizontalHeader(), Config.STATS_GENERAL_COL_STATE)
self._restore_details_view_columns(self.nodesTable.horizontalHeader(), Config.STATS_NODES_COL_STATE)
@ -1471,6 +1500,11 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
@QtCore.pyqtSlot(ui_pb2.NotificationReply)
def _cb_notification_callback(self, reply):
if reply.id in self._notifications_sent:
noti = self._notifications_sent[reply.id]
if noti.type == ui_pb2.TASK_START and reply.code != ui_pb2.ERROR:
self._update_node_info(reply.data)
return
if reply.code == ui_pb2.ERROR:
Message.ok(
QC.translate("stats", "Error:"),
@ -1488,6 +1522,9 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
def _cb_tab_changed(self, index):
self.comboAction.setVisible(index == self.TAB_MAIN)
if self.LAST_TAB == self.TAB_NODES and self.LAST_SELECTED_ITEM != "":
self._unmonitor_deselected_node(self.LAST_SELECTED_ITEM)
self.TABLES[index]['cmdCleanStats'].setVisible(True)
if index == self.TAB_MAIN:
self._set_events_query()
@ -1504,6 +1541,7 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
elif index == self.TAB_NODES:
self.TABLES[index]['cmdCleanStats'].setVisible( self.IN_DETAIL_VIEW[index] )
self.LAST_TAB = index
self._refresh_active_table()
def _cb_table_context_menu(self, pos):
@ -1686,6 +1724,27 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
"{0}{1}".format(Config.STATS_VIEW_DETAILS_COL_STATE, cur_idx)
)
def _cb_table_selection_changed(self, selected, deselected):
cur_idx = self.tabWidget.currentIndex()
if cur_idx == self.TAB_NODES:
if not deselected.isEmpty():
self.LAST_SELECTED_ITEM = ""
last_addr = deselected.indexes()[self.COL_NODE].data()
self._unmonitor_deselected_node(last_addr)
if not selected.isEmpty():
node_addr = selected.indexes()[self.COL_NODE].data()
if node_addr == self.LAST_SELECTED_ITEM:
return
self.LAST_SELECTED_ITEM = node_addr
self._monitor_selected_node(
node_addr,
selected.indexes()[self.COL_N_UPTIME].data(),
selected.indexes()[self.COL_N_HOSTNAME].data(),
selected.indexes()[self.COL_N_VERSION].data(),
selected.indexes()[self.COL_N_KERNEL].data()
)
def _cb_table_double_clicked(self, row):
cur_idx = self.tabWidget.currentIndex()
if self.IN_DETAIL_VIEW[cur_idx]:
@ -1831,9 +1890,15 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self._set_rules_filter(parent_row, item_row, item.text(0), node_addr, fw_table)
def _cb_rules_splitter_moved(self, pos, index):
def _cb_splitter_moved(self, tab, pos, index):
if tab == self.TAB_RULES:
self.comboRulesFilter.setVisible(pos == 0)
self._cfg.setSettings(Config.STATS_RULES_SPLITTER_POS, self.rulesSplitter.saveState())
elif tab == self.TAB_NODES:
#w = self.nodesSplitter.width()
#if pos >= w-2:
# self._unmonitor_deselected_node()
self._cfg.setSettings(Config.STATS_NODES_SPLITTER_POS, self.nodesSplitter.saveState())
def _cb_start_clicked(self):
if self.daemon_connected == False:
@ -2200,7 +2265,7 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
col = self.TAB_RULES
#self.TABLES[cur_idx]['view'].selectItem(self.LAST_SELECTED_ITEM, col)
self.LAST_SELECTED_ITEM = ""
#self.LAST_SELECTED_ITEM = ""
def _restore_scroll_value(self):
if self.LAST_SCROLL_VALUE != None:
@ -2303,6 +2368,8 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
self.setQuery(model, qstr)
def _set_nodes_query(self, data):
if data != self.LAST_SELECTED_ITEM:
self._monitor_selected_node(data, "", "", "", "")
model = self._get_active_table().model()
self.setQuery(model, "SELECT " \
@ -2359,6 +2426,150 @@ class StatsDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
return qstr
def _reset_node_info(self, status=""):
# value 0 is continuous progress
self.nodeRAMProgress.setMaximum(1)
self.nodeRAMProgress.setValue(0)
self.labelNodeProcs.setText("")
self.labelNodeLoadAvg.setText("")
self.labelNodeUptime.setText("")
self.labelNodeSwap.setText("")
self.labelNodeRAM.setText(status)
def _monitor_selected_node(self, node_addr, col_uptime, col_hostname, col_version, col_kernel):
# TODO:
# - create a tasks package, to centralize/normalize tasks' names and
# config
if not self._nodes.is_connected(node_addr):
self._reset_node_info(QC.translate("stats", "node not connected"))
else:
noti = ui_pb2.Notification(
clientName="",
serverName="",
type=ui_pb2.TASK_START,
data='{"name": "node-monitor", "data": {"node": "%s", "interval": "5s"}}' % node_addr,
rules=[])
nid = self._nodes.send_notification(
node_addr, noti, self._notification_callback
)
if nid != None:
self._notifications_sent[nid] = noti
self.nodeRAMProgress.setMaximum(0)
self.nodeSwapProgress.setMaximum(0)
self.labelNodeName.setText(QC.translate("stats", "loading node information..."))
self.labelNodeName.setText("<h3>{0}</h3>".format(col_hostname))
self.labelNodeDetails.setText(
QC.translate(
"stats",
"<p><strong>daemon uptime:</strong> {0}</p>".format(col_uptime) + \
"<p><strong>Version:</strong> {0}</p>".format(col_version) + \
"<p><strong>Kernel:</strong> {0}</p>".format(col_kernel)
)
)
def _unmonitor_deselected_node(self, last_addr):
if not self._nodes.is_connected(last_addr):
self._reset_node_info(QC.translate("stats", "node not connected"))
else:
noti = ui_pb2.Notification(
clientName="",
serverName="",
type=ui_pb2.TASK_STOP,
data='{"name": "node-monitor", "data": {"node": "%s", "interval": "5s"}}' % last_addr,
rules=[])
nid = self._nodes.send_notification(
last_addr, noti, self._notification_callback
)
if nid != None:
self._notifications_sent[nid] = noti
self.labelNodeDetails.setText("")
def _update_node_info(self, data):
try:
# TODO: move to .utils
def formatUptime(uptime):
hours = uptime / 3600
minutes = uptime % 3600 / 60
#seconds = uptime % 60
days = (uptime / 1440) / 60
months = 0
years = 0
if days > 0:
hours = hours % 24
minutes = (uptime % 3600) / 60
if days > 0:
uptime = "{0:.0f} days {1:.0f}h {2:.0f}m".format(days, hours, minutes)
else:
uptime = "{0:.0f}h {1:.0f}m".format(hours, minutes)
return QC.translate(
"stats",
"<strong>System uptime:</strong> %s" % uptime
)
# TODO: move to .utils
def bytes2units(value):
units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
idx = 0
while value / 1024 > 0:
value = value / 1024
idx+=1
if value < 1024:
break
return "{0:.0f} {1}".format(value, units [idx])
node_data = json.loads(data)
load1 = node_data['Loads'][0] / 100000
totalRam = node_data['Totalram']
totalSwap = node_data['Totalswap']
freeRam = totalRam - node_data['Freeram']
freeSwap = totalSwap - node_data['Freeswap']
self.nodeRAMProgress.setMaximum(int(totalRam/1000))
self.nodeRAMProgress.setValue(int(freeRam/1000))
self.nodeRAMProgress.setFormat("%p%")
self.nodeSwapProgress.setMaximum(int(totalSwap/1000))
self.nodeSwapProgress.setFormat("%p%")
self.nodeSwapProgress.setValue(int(freeSwap/1000))
# if any of these values is 0, set max progressbar value to 1, to
# avoid the "busy" effect:
# https://doc.qt.io/qtforpython-5/PySide2/QtWidgets/QProgressBar.html#detailed-description
if self.nodeRAMProgress.value() == 0:
self.nodeRAMProgress.setMaximum(1)
if self.nodeSwapProgress.value() == 0:
self.nodeSwapProgress.setMaximum(1)
ram = bytes2units(totalRam)
free = bytes2units(node_data['Freeram'])
swap = bytes2units(totalSwap)
freeSwap = bytes2units(node_data['Freeswap'])
self.labelNodeRAM.setText("<strong>RAM:</strong> {0} <strong>Free:</strong> {1}".format(ram, free))
self.labelNodeSwap.setText("<strong>Swap:</strong> {0} <strong>Free:</strong> {1}".format(swap, freeSwap))
self.labelNodeProcs.setText(
QC.translate("stats", "<strong>Processes:</strong> {0}".format(node_data['Procs']))
)
self.nodeRAMProgress.setFormat("%p%")
self.nodeSwapProgress.setFormat("%p%")
self.labelNodeLoadAvg.setText(
QC.translate(
"stats",
"<strong>Load avg:</strong> {0:.2f}, {1:.2f}, {2:.2f}".format(
node_data['Loads'][0] / 100000,
node_data['Loads'][1] / 100000,
node_data['Loads'][2] / 100000
)
)
)
self.labelNodeUptime.setText(formatUptime(node_data['Uptime']))
except Exception as e:
print("exception parsing taskStart data:", e, data)
# TODO: update nodes tab
def _update_nodes_interception_status(self, show=True, disable=False):
addr = self.TABLES[self.TAB_NODES]['label'].text()
node_cfg = self._nodes.get_node(addr)

View file

@ -160,7 +160,7 @@ class Nodes(QObject):
def get_node(self, addr):
try:
return self._nodes[addr]
except Exception as e:
except:
return None
def get_nodes(self):
@ -195,6 +195,13 @@ class Nodes(QObject):
print(self.LOG_TAG, "get_addr() error getting addr:", peer)
return peer
def is_connected(self, addr):
try:
nd = self.get_node(addr)
return nd != None or (nd in self._nodes and self._nodes[nd]['online'] == True)
except:
return None
def is_local(self, addr):
if addr.startswith("unix"):
return True

File diff suppressed because it is too large Load diff