ui,popups: mark temporary rules as inactive

Once the temporary rules expire, mark them as inactive in the DB.
This commit is contained in:
Gustavo Iñiguez Goia 2025-01-31 00:28:11 +01:00
parent e403b080bc
commit 85173c3553
Failed to generate hash of commit
6 changed files with 119 additions and 2 deletions

View file

@ -134,6 +134,9 @@ class Nodes(QObject):
return nid, notif, rules_list
def disable_rule(self, addr, rule_name):
self._rules.disable(addr, rule_name)
def update_rule_time(self, time, rule_name, addr):
self._rules.update_time(time, rule_name, addr)

View file

@ -165,6 +165,16 @@ class Rules(QObject):
return rule_name
def disable(self, addr, name):
"""Mark a rule as not enabled in the DB"""
self._db.update(
"rules",
"enabled='False'",
(name, addr),
"name=? AND node=?",
action_on_conflict="OR REPLACE"
)
def update_time(self, time, name, addr):
"""Updates the time of a rule, whenever a new connection matched a
rule.

View file

@ -26,8 +26,15 @@ from opensnitch.nodes import Nodes
from opensnitch.config import Config
from opensnitch.version import version
from opensnitch.database import Database
from opensnitch.utils import Utils, CleanerTask, Themes
from opensnitch.utils import Message, languages
from opensnitch.utils import (
Utils,
CleanerTask,
Themes,
OneshotTimer,
languages,
Message
)
from opensnitch.utils.duration import duration
from opensnitch.utils.xdg import Autostart
class UIService(ui_pb2_grpc.UIServicer, QtWidgets.QGraphicsObject):
@ -738,6 +745,16 @@ class UIService(ui_pb2_grpc.UIServicer, QtWidgets.QGraphicsObject):
# daemon.
rule.operator.data = ""
# disable temporal rules in the db
def _disable_temp_rule(args):
self._nodes.disable_rule(args[0], args[1].name)
timeout = duration.to_seconds(rule.duration)
if rule.duration == Config.DURATION_ONCE:
timeout = 1
if timeout > 0:
ost = OneshotTimer(timeout, _disable_temp_rule, (kwargs['peer'], rule,))
ost.start()
elif kwargs['action'] == self.DELETE_RULE:
self._db.delete_rule(kwargs['name'], kwargs['addr'])

View file

@ -227,7 +227,13 @@ class OneshotTimer(GenericTimer):
def run(self):
self.stop_flag.wait(self.interval)
if self.stop_flag.is_set():
return
self.callback(self.args)
self.stop()
def stop(self):
self.stop_flag.set()
class CleanerTask(Thread):
interval = 1

View file

@ -0,0 +1,20 @@
# Copyright (C) 2018 Simone Margaritelli
# 2018 MiWCryptAnalytics
# 2023 munix9
# 2023 Wojtek Widomski
# 2019-2025 Gustavo Iñiguez Goia
#
# This file is part of OpenSnitch.
#
# OpenSnitch is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenSnitch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenSnitch. If not, see <http://www.gnu.org/licenses/>.

View file

@ -0,0 +1,61 @@
#!/usr/bin/env python3
# Copyright (C) 2018 Simone Margaritelli
# 2018 MiWCryptAnalytics
# 2023 munix9
# 2023 Wojtek Widomski
# 2019-2025 Gustavo Iñiguez Goia
#
# This file is part of OpenSnitch.
#
# OpenSnitch is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OpenSnitch is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OpenSnitch. If not, see <http://www.gnu.org/licenses/>.
import re
r = re.compile(r'(\d+)([smhdw]+)')
_second = 1
_minute = 60
_hour = 60 * _minute
_day = 60 * _hour
_week = 60 * _day
_units = {
's': _second,
'm': _minute,
'h': _hour,
'd': _day,
'w': _week
}
def to_seconds(dur_str):
"""converts a Golang duration string to seconds:
"20s" -> 20 seconds
"2m" -> 120 seconds
...
"""
secs = 0
try:
finds = r.findall(dur_str)
for d in finds:
try:
unit = _units[d[1]]
secs += (unit * int(d[0]))
except:
print("duration.to_seconds(): invalid unit:", d)
return secs
except:
return secs