mirror of
https://github.com/evilsocket/opensnitch.git
synced 2025-03-04 00:24:40 +01:00
Merge pull request #1237 from nolancarougepro/lan_access_control
Lan access control
This commit is contained in:
commit
ed84394dfc
7 changed files with 225 additions and 29 deletions
|
@ -64,6 +64,7 @@ var (
|
|||
logMicro = false
|
||||
rulesPath = ""
|
||||
configFile = "/etc/opensnitchd/default-config.json"
|
||||
aliasFile = "network_aliases.json"
|
||||
fwConfigFile = ""
|
||||
ebpfModPath = "" // /usr/lib/opensnitchd/ebpf
|
||||
noLiveReload = false
|
||||
|
@ -576,6 +577,12 @@ func main() {
|
|||
|
||||
log.Important("Starting %s v%s", core.Name, core.Version)
|
||||
|
||||
err := rule.LoadAliases(aliasFile)
|
||||
if err != nil {
|
||||
log.Fatal("Error loading network aliases: %v", err)
|
||||
}
|
||||
log.Info("Loading network aliases from %s ...", aliasFile)
|
||||
|
||||
cfg, err := loadDiskConfiguration()
|
||||
if err != nil {
|
||||
log.Fatal("%s", err)
|
||||
|
|
14
daemon/network_aliases.json
Normal file
14
daemon/network_aliases.json
Normal file
|
@ -0,0 +1,14 @@
|
|||
{
|
||||
"LAN": [
|
||||
"10.0.0.0/8",
|
||||
"172.16.0.0/12",
|
||||
"192.168.0.0/16",
|
||||
"127.0.0.0/8",
|
||||
"::1",
|
||||
"fc00::/7"
|
||||
],
|
||||
"MULTICAST": [
|
||||
"224.0.0.0/4",
|
||||
"ff00::/8"
|
||||
]
|
||||
}
|
|
@ -1,8 +1,10 @@
|
|||
package rule
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
@ -67,6 +69,60 @@ const (
|
|||
//OpQuotaRxOver = Operand("quota.recv.over") // 1000b, 1kb, 1mb, 1gb, ...
|
||||
)
|
||||
|
||||
var NetworkAliases = make(map[string][]string)
|
||||
var AliasIPCache = make(map[string][]*net.IPNet)
|
||||
|
||||
func LoadAliases(filename string) error {
|
||||
data, err := os.ReadFile(filename)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var aliases map[string][]string
|
||||
if err := json.Unmarshal(data, &aliases); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for alias, networks := range aliases {
|
||||
var ipNets []*net.IPNet
|
||||
for _, network := range networks {
|
||||
_, ipNet, err := net.ParseCIDR(network)
|
||||
if err != nil {
|
||||
// fmt.Printf("Error parsing CIDR for %s: %v\n", network, err)
|
||||
continue
|
||||
}
|
||||
ipNets = append(ipNets, ipNet)
|
||||
}
|
||||
AliasIPCache[alias] = ipNets
|
||||
// fmt.Printf("Alias '%s' loaded with the following networks: %v\n", alias, networks)
|
||||
}
|
||||
|
||||
// fmt.Println("Network aliases successfully loaded into the cache.")
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetAliasByIP(ip string) string {
|
||||
ipAddr := net.ParseIP(ip)
|
||||
for alias, ipNets := range AliasIPCache {
|
||||
for _, ipNet := range ipNets {
|
||||
if ipNet.Contains(ipAddr) {
|
||||
// fmt.Printf("Alias '%s' found for IP address: %s in network %s\n", alias, ip, ipNet.String())
|
||||
return alias
|
||||
}
|
||||
}
|
||||
}
|
||||
// fmt.Printf("No alias found for IP: %s\n", ip)
|
||||
return ""
|
||||
}
|
||||
|
||||
func (o *Operator) SerializeData() string {
|
||||
alias := GetAliasByIP(o.Data)
|
||||
if alias != "" {
|
||||
return alias
|
||||
}
|
||||
return o.Data
|
||||
}
|
||||
|
||||
type opCallback func(value interface{}) bool
|
||||
|
||||
// Operator represents what we want to filter of a connection, and how.
|
||||
|
@ -120,14 +176,39 @@ func (o *Operator) Compile() error {
|
|||
} else if o.Type == List {
|
||||
o.Operand = OpList
|
||||
} else if o.Type == Network {
|
||||
var err error
|
||||
_, o.netMask, err = net.ParseCIDR(o.Data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
o.cb = o.cmpNetwork
|
||||
}
|
||||
// Check if the operator's data is an alias present in the cache
|
||||
if ipNets, found := AliasIPCache[o.Data]; found {
|
||||
o.cb = func(value interface{}) bool {
|
||||
ip := value.(net.IP)
|
||||
matchFound := false
|
||||
|
||||
// fmt.Printf("\nStarting IP check %s for alias '%s'\n", ip, o.Data)
|
||||
|
||||
for _, ipNet := range ipNets {
|
||||
if ipNet.Contains(ip) {
|
||||
// fmt.Printf(" -> Match found: IP %s in network %s for alias '%s'\n", ip, ipNet, o.Data)
|
||||
matchFound = true
|
||||
break
|
||||
}
|
||||
}
|
||||
/*
|
||||
if !matchFound {
|
||||
fmt.Printf(" -> No match found: IP %s for alias '%s'\n", ip, o.Data)
|
||||
}
|
||||
*/
|
||||
return matchFound
|
||||
}
|
||||
// fmt.Printf("Network alias '%s' successfully compiled for the operator.\n", o.Data)
|
||||
} else {
|
||||
// Parse the data as a CIDR if it's not an alias
|
||||
_, netMask, err := net.ParseCIDR(o.Data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("CIDR parsing error: %s", err)
|
||||
}
|
||||
o.netMask = netMask
|
||||
o.cb = o.cmpNetwork
|
||||
}
|
||||
}
|
||||
if o.Operand == OpDomainsLists {
|
||||
if o.Data == "" {
|
||||
return fmt.Errorf("Operand lists is empty, nothing to load: %s", o)
|
||||
|
|
|
@ -25,6 +25,8 @@ from opensnitch.nodes import Nodes
|
|||
from opensnitch import ui_pb2
|
||||
from opensnitch.dialogs.prompt import _utils, _constants, _checksums, _details
|
||||
|
||||
from network_aliases import NetworkAliases
|
||||
|
||||
DIALOG_UI_PATH = "%s/../../res/prompt.ui" % os.path.dirname(sys.modules[__name__].__file__)
|
||||
class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
||||
_prompt_trigger = QtCore.pyqtSignal()
|
||||
|
@ -532,6 +534,9 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
|||
)
|
||||
|
||||
def _add_dst_networks_to_combo(self, combo, dst_ip):
|
||||
alias = NetworkAliases.get_alias(dst_ip)
|
||||
if alias:
|
||||
combo.addItem(QC.translate("popups", f"to {alias}"), _constants.FIELD_DST_NETWORK)
|
||||
if type(ipaddress.ip_address(dst_ip)) == ipaddress.IPv4Address:
|
||||
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/24", strict=False)), _constants.FIELD_DST_NETWORK)
|
||||
combo.addItem(QC.translate("popups", "to {0}").format(ipaddress.ip_network(dst_ip + "/16", strict=False)), _constants.FIELD_DST_NETWORK)
|
||||
|
@ -584,7 +589,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
|||
self._rule.operator.type, self._rule.operator.operand, self._rule.operator.data = _utils.get_combo_operator(
|
||||
self.whatCombo.itemData(what_idx),
|
||||
self.whatCombo.currentText(),
|
||||
self._con)
|
||||
self._con)
|
||||
if self._rule.operator.data == "":
|
||||
print("popups: Invalid rule, discarding: ", self._rule)
|
||||
self._rule = None
|
||||
|
@ -595,6 +600,17 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
|||
|
||||
# TODO: move to a method
|
||||
data=[]
|
||||
|
||||
alias_selected = False
|
||||
|
||||
if self.whatCombo.itemData(what_idx) == _constants.FIELD_DST_NETWORK:
|
||||
alias = NetworkAliases.get_alias(self._con.dst_ip)
|
||||
if alias:
|
||||
_type, _operand, _data = Config.RULE_TYPE_SIMPLE, Config.OPERAND_PROCESS_PATH, self._con.process_path
|
||||
data.append({"type": _type, "operand": _operand, "data": _data})
|
||||
rule_temp_name = slugify(f"{rule_temp_name} {os.path.basename(self._con.process_path)}")
|
||||
alias_selected = True
|
||||
|
||||
if self.checkDstIP.isChecked() and self.whatCombo.itemData(what_idx) != _constants.FIELD_DST_IP:
|
||||
_type, _operand, _data = _utils.get_combo_operator(
|
||||
self.whatIPCombo.itemData(self.whatIPCombo.currentIndex()),
|
||||
|
@ -629,7 +645,7 @@ class PromptDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
|||
is_list_rule = True
|
||||
data.append({"type": Config.RULE_TYPE_SIMPLE, "operand": Config.OPERAND_PROCESS_PATH, "data": str(self._con.process_path)})
|
||||
|
||||
if is_list_rule:
|
||||
if is_list_rule or alias_selected:
|
||||
data.append({
|
||||
"type": self._rule.operator.type,
|
||||
"operand": self._rule.operator.operand,
|
||||
|
|
|
@ -25,6 +25,8 @@ from opensnitch.utils import (
|
|||
)
|
||||
from opensnitch.rules import Rule, Rules
|
||||
|
||||
from network_aliases import NetworkAliases
|
||||
|
||||
DIALOG_UI_PATH = "%s/../res/ruleseditor.ui" % os.path.dirname(sys.modules[__name__].__file__)
|
||||
class RulesEditorDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
||||
|
||||
|
@ -61,6 +63,7 @@ class RulesEditorDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
|||
self._old_rule_name = None
|
||||
|
||||
self.setupUi(self)
|
||||
self.load_aliases_into_menu()
|
||||
self.setWindowIcon(appicon)
|
||||
|
||||
self.ruleNameValidator = qvalidator.RestrictChars(RulesEditorDialog.INVALID_RULE_NAME_CHARS)
|
||||
|
@ -120,6 +123,13 @@ class RulesEditorDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
|||
if _rule != None:
|
||||
self._load_rule(rule=_rule)
|
||||
|
||||
def load_aliases_into_menu(self):
|
||||
aliases = NetworkAliases.get_alias_all()
|
||||
|
||||
for alias in reversed(aliases):
|
||||
if self.dstIPCombo.findText(alias) == -1:
|
||||
self.dstIPCombo.insertItem(0, alias)
|
||||
|
||||
def showEvent(self, event):
|
||||
super(RulesEditorDialog, self).showEvent(event)
|
||||
|
||||
|
@ -854,29 +864,34 @@ class RulesEditorDialog(QtWidgets.QDialog, uic.loadUiType(DIALOG_UI_PATH)[0]):
|
|||
|
||||
dstIPtext = self.dstIPCombo.currentText()
|
||||
|
||||
if dstIPtext == self.LAN_LABEL:
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_IP
|
||||
self.rule.operator.type = Config.RULE_TYPE_REGEXP
|
||||
dstIPtext = self.LAN_RANGES
|
||||
elif dstIPtext == self.MULTICAST_LABEL:
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_IP
|
||||
self.rule.operator.type = Config.RULE_TYPE_REGEXP
|
||||
dstIPtext = self.MULTICAST_RANGE
|
||||
if dstIPtext in NetworkAliases.get_alias_all():
|
||||
self.rule.operator.type = Config.RULE_TYPE_NETWORK
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_NETWORK
|
||||
self.rule.operator.data = dstIPtext
|
||||
else:
|
||||
try:
|
||||
if type(ipaddress.ip_address(self.dstIPCombo.currentText())) == ipaddress.IPv4Address \
|
||||
or type(ipaddress.ip_address(self.dstIPCombo.currentText())) == ipaddress.IPv6Address:
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_IP
|
||||
self.rule.operator.type = Config.RULE_TYPE_SIMPLE
|
||||
except Exception:
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_NETWORK
|
||||
self.rule.operator.type = Config.RULE_TYPE_NETWORK
|
||||
|
||||
if self._is_regex(dstIPtext):
|
||||
if dstIPtext == self.LAN_LABEL:
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_IP
|
||||
self.rule.operator.type = Config.RULE_TYPE_REGEXP
|
||||
if self._is_valid_regex(self.dstIPCombo.currentText()) == False:
|
||||
return False, QC.translate("rules", "Dst IP regexp error")
|
||||
dstIPtext = self.LAN_RANGES
|
||||
elif dstIPtext == self.MULTICAST_LABEL:
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_IP
|
||||
self.rule.operator.type = Config.RULE_TYPE_REGEXP
|
||||
dstIPtext = self.MULTICAST_RANGE
|
||||
else:
|
||||
try:
|
||||
if type(ipaddress.ip_address(self.dstIPCombo.currentText())) == ipaddress.IPv4Address \
|
||||
or type(ipaddress.ip_address(self.dstIPCombo.currentText())) == ipaddress.IPv6Address:
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_IP
|
||||
self.rule.operator.type = Config.RULE_TYPE_SIMPLE
|
||||
except Exception:
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_NETWORK
|
||||
self.rule.operator.type = Config.RULE_TYPE_NETWORK
|
||||
|
||||
if self._is_regex(dstIPtext):
|
||||
self.rule.operator.operand = Config.OPERAND_DEST_IP
|
||||
self.rule.operator.type = Config.RULE_TYPE_REGEXP
|
||||
if self._is_valid_regex(self.dstIPCombo.currentText()) == False:
|
||||
return False, QC.translate("rules", "Dst IP regexp error")
|
||||
|
||||
rule_data.append(
|
||||
{
|
||||
|
|
14
ui/opensnitch/network_aliases.json
Normal file
14
ui/opensnitch/network_aliases.json
Normal file
|
@ -0,0 +1,14 @@
|
|||
{
|
||||
"LAN": [
|
||||
"10.0.0.0/8",
|
||||
"172.16.0.0/12",
|
||||
"192.168.0.0/16",
|
||||
"127.0.0.0/8",
|
||||
"::1",
|
||||
"fc00::/7"
|
||||
],
|
||||
"MULTICAST": [
|
||||
"224.0.0.0/4",
|
||||
"ff00::/8"
|
||||
]
|
||||
}
|
49
ui/opensnitch/network_aliases.py
Normal file
49
ui/opensnitch/network_aliases.py
Normal file
|
@ -0,0 +1,49 @@
|
|||
import json
|
||||
import ipaddress
|
||||
import os
|
||||
|
||||
class NetworkAliases:
|
||||
ALIASES = {}
|
||||
|
||||
@staticmethod
|
||||
def load_aliases():
|
||||
# Define the path to the network_aliases.json file
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
filename = os.path.join(script_dir, 'network_aliases.json')
|
||||
|
||||
# Check if the file exists before attempting to load it
|
||||
if not os.path.exists(filename):
|
||||
raise FileNotFoundError(f"The file '{filename}' does not exist.")
|
||||
|
||||
# Load the JSON file
|
||||
with open(filename, 'r') as f:
|
||||
NetworkAliases.ALIASES = json.load(f)
|
||||
print(f"Loaded network aliases from {filename}") # Confirmation message
|
||||
|
||||
@staticmethod
|
||||
def get_alias(ip):
|
||||
try:
|
||||
ip_obj = ipaddress.ip_address(ip)
|
||||
for alias, networks in NetworkAliases.ALIASES.items():
|
||||
for network in networks:
|
||||
net_obj = ipaddress.ip_network(network)
|
||||
if ip_obj in net_obj:
|
||||
return alias
|
||||
except ValueError:
|
||||
pass
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def get_networks_for_alias(alias):
|
||||
return NetworkAliases.ALIASES.get(alias, [])
|
||||
|
||||
@staticmethod
|
||||
def get_alias_all():
|
||||
# Return a list of all alias names
|
||||
return list(NetworkAliases.ALIASES.keys())
|
||||
|
||||
# Load aliases at startup
|
||||
try:
|
||||
NetworkAliases.load_aliases()
|
||||
except FileNotFoundError as e:
|
||||
print(e)
|
Loading…
Add table
Reference in a new issue