allow to configure multiple queues

Added option to configure multiple nfqueues.

Post with detailed information about the performance:
https://github.com/evilsocket/opensnitch/discussions/1104

After using -queues 1:6 , you need to configure the rules manually:
(for TCP)
nft insert rule inet mangle output tcp flags syn / fin,syn,rst,ack queue to numgen inc mod 6

TODO:
 - Configure queues in the fw automatically based on the queues defined.
 - Investigate if we need to use runtime.LockOSThread() in NewQueue().
 - Allow to use multiple instances of the daemon:
    * One daemon acts as the main daemon, connected to the server (UI) and
    managing the rules and notifications.
    * The other daemons only intercept and apply verdicts on packets, with
    the rules loaded from a central directory (/etc/opensnitchd/rules)

FIXME:
 - There's a deadlock repeating the packets when a connection is waiting
   for approval.
 - Investigate the high mem consumption under heavy load.
This commit is contained in:
Gustavo Iñiguez Goia 2024-04-05 18:09:23 +02:00
parent 2ec37ed593
commit f032575af0
Failed to generate hash of commit

View file

@ -34,6 +34,8 @@ import (
"runtime" "runtime"
"runtime/pprof" "runtime/pprof"
"runtime/trace" "runtime/trace"
"strconv"
"strings"
"syscall" "syscall"
"time" "time"
@ -68,6 +70,7 @@ var (
ebpfModPath = "" // /usr/lib/opensnitchd/ebpf ebpfModPath = "" // /usr/lib/opensnitchd/ebpf
noLiveReload = false noLiveReload = false
queueNum = 0 queueNum = 0
queues = ""
repeatQueueNum int //will be set later to queueNum + 1 repeatQueueNum int //will be set later to queueNum + 1
workers = 16 workers = 16
debug = false debug = false
@ -91,7 +94,7 @@ var (
queue = (*netfilter.Queue)(nil) queue = (*netfilter.Queue)(nil)
repeatQueue = (*netfilter.Queue)(nil) repeatQueue = (*netfilter.Queue)(nil)
repeatPktChan = (<-chan netfilter.Packet)(nil) repeatPktChan = (<-chan netfilter.Packet)(nil)
pktChan = (<-chan netfilter.Packet)(nil) pktChan = [](<-chan netfilter.Packet)(nil)
wrkChan = (chan netfilter.Packet)(nil) wrkChan = (chan netfilter.Packet)(nil)
sigChan = (chan os.Signal)(nil) sigChan = (chan os.Signal)(nil)
exitChan = (chan bool)(nil) exitChan = (chan bool)(nil)
@ -106,6 +109,7 @@ func init() {
flag.StringVar(&procmonMethod, "process-monitor-method", procmonMethod, "How to search for processes path. Options: ftrace, audit (experimental), ebpf (experimental), proc (default)") flag.StringVar(&procmonMethod, "process-monitor-method", procmonMethod, "How to search for processes path. Options: ftrace, audit (experimental), ebpf (experimental), proc (default)")
flag.StringVar(&uiSocket, "ui-socket", uiSocket, "Path the UI gRPC service listener (https://github.com/grpc/grpc/blob/master/doc/naming.md).") flag.StringVar(&uiSocket, "ui-socket", uiSocket, "Path the UI gRPC service listener (https://github.com/grpc/grpc/blob/master/doc/naming.md).")
flag.IntVar(&queueNum, "queue-num", queueNum, "Netfilter queue number.") flag.IntVar(&queueNum, "queue-num", queueNum, "Netfilter queue number.")
flag.StringVar(&queues, "queues", queues, "Netfilter total queues. Format: -queues 1:10 (starts 10 queues)")
flag.IntVar(&workers, "workers", workers, "Number of concurrent workers.") flag.IntVar(&workers, "workers", workers, "Number of concurrent workers.")
flag.BoolVar(&noLiveReload, "no-live-reload", debug, "Disable rules live reloading.") flag.BoolVar(&noLiveReload, "no-live-reload", debug, "Disable rules live reloading.")
@ -154,16 +158,10 @@ func overwriteLogging() bool {
func setupQueues() { func setupQueues() {
// prepare the queue // prepare the queue
var err error var err error
queue, err = netfilter.NewQueue(uint16(queueNum))
if err != nil {
msg := fmt.Sprintf("Error creating queue #%d: %s", queueNum, err)
uiClient.SendWarningAlert(msg)
log.Warning("Is opensnitchd already running?")
log.Fatal(msg)
}
pktChan = queue.Packets()
repeatQueueNum = queueNum + 1 // use upper range numbers for the repeating queue, not to interfere with
// the queue ranges.
repeatQueueNum = 32000 - queueNum
repeatQueue, err = netfilter.NewQueue(uint16(repeatQueueNum)) repeatQueue, err = netfilter.NewQueue(uint16(repeatQueueNum))
if err != nil { if err != nil {
@ -173,6 +171,25 @@ func setupQueues() {
log.Warning(msg) log.Warning(msg)
} }
repeatPktChan = repeatQueue.Packets() repeatPktChan = repeatQueue.Packets()
// the format to specify multiple queues is 1:10
qs := strings.SplitN(queues, ":", 2)
lowb := uint64(0)
upb := uint64(1)
if len(qs) > 1 {
lowb, err = strconv.ParseUint(qs[0], 10, 16)
if err != nil {
lowb = 0
}
upb, err = strconv.ParseUint(qs[1], 10, 16)
if err != nil {
upb = lowb + 1
}
}
for i := lowb; i < upb; i++ {
q, _ := netfilter.NewQueue(uint16(i))
pktChan = append(pktChan, q.Packets())
}
} }
func setupLogging() { func setupLogging() {
@ -258,12 +275,9 @@ func worker(id int) {
case <-ctx.Done(): case <-ctx.Done():
goto Exit goto Exit
default: default:
pkt, ok := <-wrkChan pkt := <-wrkChan
if !ok {
log.Debug("worker channel closed %d", id)
goto Exit
}
onPacket(pkt) onPacket(pkt)
} }
} }
Exit: Exit:
@ -273,7 +287,7 @@ Exit:
func setupWorkers() { func setupWorkers() {
log.Debug("Starting %d workers ...", workers) log.Debug("Starting %d workers ...", workers)
// setup the workers // setup the workers
wrkChan = make(chan netfilter.Packet) wrkChan = make(chan netfilter.Packet, workers)
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go worker(i) go worker(i)
} }
@ -638,18 +652,26 @@ func main() {
initSystemdResolvedMonitor() initSystemdResolvedMonitor()
log.Info("Running on netfilter queue #%d ...", queueNum) log.Info("Running on netfilter queue #%d ...", queueNum)
for { for _, p := range pktChan {
select { go func(c <-chan netfilter.Packet) {
case <-ctx.Done(): for {
goto Exit select {
case pkt, ok := <-pktChan: case <-ctx.Done():
if !ok { return
goto Exit case pkt, ok := <-c:
if !ok {
return
}
wrkChan <- pkt
}
} }
wrkChan <- pkt }(p)
}
} }
Exit: select {
case <-sigChan:
case <-ctx.Done():
}
close(wrkChan) close(wrkChan)
doCleanup(queue, repeatQueue) doCleanup(queue, repeatQueue)
os.Exit(0) os.Exit(0)