ui: fixed exporting/importing rules

When exporting rules, use rfc3339 format for the Created field.
We were exporting as timestamp, which caused issues when importing them.

Related:
 58aa979cae
 issue #1140
This commit is contained in:
Gustavo Iñiguez Goia 2024-06-19 00:00:36 +02:00
parent 68de35141b
commit 552aed5bc5
Failed to generate hash of commit

View file

@ -7,11 +7,15 @@ from opensnitch.config import Config
import os
import json
from slugify import slugify
from datetime import datetime
from google.protobuf.json_format import MessageToJson, Parse
DefaultRulesPath = "/etc/opensnitchd/rules"
# date format displayed on the GUI (created column)
DBDateFieldFormat = "%Y-%m-%d %H:%M:%S"
class Rule():
def __init__(self):
pass
@ -43,7 +47,7 @@ class Rule():
created = int(datetime.now().timestamp())
if records.value(RuleFields.Created) != "":
created = int(datetime.strptime(
records.value(RuleFields.Created), "%Y-%m-%d %H:%M:%S"
records.value(RuleFields.Created), DBDateFieldFormat
).timestamp())
rule.created = created
@ -100,17 +104,20 @@ class Rules(QObject):
for _,r in enumerate(rules):
# Operator list is always saved as json string to the db.
rjson = json.loads(MessageToJson(r))
if r.operator.type == Config.RULE_TYPE_LIST and rjson.get('operator') != None and rjson.get('operator').get('list') != None:
if r.operator.type == Config.RULE_TYPE_LIST and \
rjson.get('operator') != None and \
rjson.get('operator').get('list') != None:
r.operator.data = json.dumps(rjson.get('operator').get('list'))
self.add(datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
self.add(datetime.now().strftime(DBDateFieldFormat),
addr,
r.name, r.description, str(r.enabled),
str(r.precedence), str(r.nolog), r.action, r.duration,
r.operator.type,
str(r.operator.sensitive),
r.operator.operand, r.operator.data,
str(datetime.fromtimestamp(r.created).strftime("%Y-%m-%d %H:%M:%S")))
str(datetime.fromtimestamp(r.created).strftime(DBDateFieldFormat)))
return True
except Exception as e:
@ -167,6 +174,12 @@ class Rules(QObject):
action_on_conflict="OR REPLACE"
)
def _timestamp_to_rfc3339(self, time):
"""converts timestamp to rfc3339 format"""
return "{0}Z".format(
datetime.fromtimestamp(time).isoformat(timespec='microseconds')
)
def rule_to_json(self, node, rule_name):
try:
records = self._db.get_rule(rule_name, node)
@ -178,35 +191,23 @@ class Rules(QObject):
# exclude this field when exporting to json
tempRule = MessageToJson(rule)
jRule = json.loads(tempRule)
jRule['created'] = "{0}Z".format(
datetime.fromtimestamp(rule.created).isoformat(timespec='microseconds')
)
jRule['created'] = self._timestamp_to_rfc3339(rule.created)
return json.dumps(jRule, indent=" ")
except Exception as e:
print("rule_to_json() exception:", e)
return None
def export_rule(self, node, rule_name, outdir):
"""Gets the the rule from the DB and writes it out to a directory.
A new directory per node will be created.
"""
def _export_rule_common(self, node, records, outdir):
try:
records = self._db.get_rule(rule_name, node)
if records.next() == False:
print("export_rule() get_error 2:", records)
return False
rule = Rule.new_from_records(records)
rulesdir = outdir + "/" + node
try:
os.makedirs(rulesdir, 0o700)
except Exception as e:
print("exception creating dirs:", e)
rulename = rule.name
if ".json" not in rulename:
rulename = rulename + ".json"
with open(rulesdir + "/" + rulename, 'w') as jsfile:
with open(outdir + "/" + rulename, 'w') as jsfile:
actual_json_text = MessageToJson(rule)
jRule = json.loads(actual_json_text)
jRule['created'] = self._timestamp_to_rfc3339(rule.created)
actual_json_text = json.dumps(jRule, indent=" ")
jsfile.write( actual_json_text )
return True
@ -215,32 +216,48 @@ class Rules(QObject):
return False
def export_rule(self, node, rule_name, outdir):
"""Gets the rule from the DB and writes it out to a directory.
A new directory per node will be created.
"""
try:
records = self._db.get_rule(rule_name, node)
if records.next() == False:
print("export_rule() get_error 2:", records)
return False
rulesdir = outdir + "/" + slugify(node)
try:
os.makedirs(rulesdir, 0o700)
except Exception as e:
print("exception creating dirs:", e)
return self._export_rule_common(node, records, rulesdir)
except Exception as e:
print(self.LOG_TAG, "export_rules(", node, rulesdir, ") exception:", e)
return False
def export_rules(self, node, outdir):
"""Gets the the rules from the DB and writes them out to a directory.
"""Gets the rules from the DB and writes them out to a directory.
A new directory per node will be created.
"""
records = self._db.get_rules(node)
if records == None:
return False
rulesdir = outdir + "/" + slugify(node)
try:
os.makedirs(rulesdir, 0o700)
except Exception as e:
print("exception creating dirs:", e)
try:
while records.next() != False:
rule = Rule.new_from_records(records)
rulesdir = outdir + "/" + node
try:
os.makedirs(rulesdir, 0o700)
except:
pass
rulename = rule.name
if ".json" not in rulename:
rulename = rulename + ".json"
with open(rulesdir + "/" + rulename, 'w') as jsfile:
actual_json_text = MessageToJson(rule)
jsfile.write( actual_json_text )
self._export_rule_common(node, records, rulesdir)
except Exception as e:
print(self.LOG_TAG, "export_rules(", node, outdir, ") exception:", e)
print(self.LOG_TAG, "export_rules(", node, rulesdir, ") exception:", e)
return False
return True
@ -254,7 +271,20 @@ class Rules(QObject):
for rulename in os.listdir(rulesdir):
with open(rulesdir + "/" + rulename, 'r') as f:
jsrule = f.read()
pb_rule = Parse(text=jsrule, message=ui_pb2.Rule(), ignore_unknown_fields=True)
# up until v1.6.5/v1.7.0, 'created' field was exported as timestamp.
# since > v1.6.5 it's exported in rfc3339 format, so if we fail to
# parse the rule, we'll try to convert the 'created' value from
# timestamp to rfc3339.
try:
pb_rule = Parse(text=jsrule, message=ui_pb2.Rule(), ignore_unknown_fields=True)
except:
jRule = json.loads(jsrule)
created = int(datetime.strptime(
jRule['created'], "%Y-%m-%dT%H:%M:%S.%fZ"
).timestamp())
jRule['created'] = created
jsrule = json.dumps(jRule)
pb_rule = Parse(text=jsrule, message=ui_pb2.Rule(), ignore_unknown_fields=True)
rules.append(pb_rule)
return rules