changed addrs polling by async events

For the eBPF monitoring method, we listed and stored local addresses
every second, so that we could later check if the source IP of an
outbound connection was local or not, because sometimes we received
outbound connections like:
 443:1.1.1.1 -> 192.168.1.123:12345

This could have been alread solved on this change e090833, so maybe
we no longer need this code.

 - Now we subscribe to local addresses events, to receive add/remove
   events asynchronously, without having to list local addrs
   every second, alliviating CPU usage.

 - Fixed creating context object to cancel subroutines. It was not
   working properly when switching between proc monitor methods.
This commit is contained in:
Gustavo Iñiguez Goia 2023-07-23 21:14:47 +02:00
parent e090833d29
commit f652174f75
Failed to generate hash of commit
4 changed files with 76 additions and 25 deletions

34
daemon/netlink/ifaces.go Normal file
View file

@ -0,0 +1,34 @@
package netlink
import (
"github.com/evilsocket/opensnitch/daemon/log"
"github.com/vishvananda/netlink"
)
// GetLocalAddrs returns the list of local IPs
func GetLocalAddrs() map[string]netlink.Addr {
localAddresses := make(map[string]netlink.Addr)
addr, err := netlink.AddrList(nil, netlink.FAMILY_ALL)
if err != nil {
log.Error("eBPF error looking up this machine's addresses via netlink: %v", err)
return nil
}
for _, a := range addr {
log.Debug("local addr: %+v\n", a)
localAddresses[a.IP.String()] = a
}
return localAddresses
}
// AddrUpdateToAddr translate AddrUpdate struct to Addr
func AddrUpdateToAddr(addr *netlink.AddrUpdate) netlink.Addr {
return netlink.Addr{
IPNet: &addr.LinkAddress,
LinkIndex: addr.LinkIndex,
Flags: addr.Flags,
Scope: addr.Scope,
PreferedLft: addr.PreferedLft,
ValidLft: addr.ValidLft,
}
}

View file

@ -4,7 +4,6 @@ import (
"context"
"encoding/binary"
"fmt"
"net"
"sync"
"syscall"
"unsafe"
@ -14,6 +13,7 @@ import (
daemonNetlink "github.com/evilsocket/opensnitch/daemon/netlink"
"github.com/evilsocket/opensnitch/daemon/procmon"
elf "github.com/iovisor/gobpf/elf"
"github.com/vishvananda/netlink"
)
//contains pointers to ebpf maps for a given protocol (tcp/udp/v6)
@ -48,14 +48,15 @@ var (
TCP: make(map[*daemonNetlink.Socket]int),
TCPv6: make(map[*daemonNetlink.Socket]int),
}
ctxTasks, cancelTasks = context.WithCancel(context.Background())
running = false
ctxTasks context.Context
cancelTasks context.CancelFunc
running = false
maxKernelEvents = 32768
kernelEvents = make(chan interface{}, maxKernelEvents)
// list of local addresses of this machine
localAddresses []net.IP
localAddresses = make(map[string]netlink.Addr)
hostByteOrder binary.ByteOrder
)
@ -108,6 +109,7 @@ func Start() error {
}
}
ctxTasks, cancelTasks = context.WithCancel(context.Background())
ebpfCache = NewEbpfCache()
initEventsStreamer()

View file

@ -220,10 +220,6 @@ func findInAlreadyEstablishedTCP(proto string, srcPort uint, srcIP net.IP, dstIP
func findAddressInLocalAddresses(addr net.IP) bool {
lock.Lock()
defer lock.Unlock()
for _, a := range localAddresses {
if addr.String() == a.String() {
return true
}
}
return false
_, found := localAddresses[addr.String()]
return found
}

View file

@ -45,29 +45,47 @@ func monitorCache() {
Exit:
}
// maintains a list of this machine's local addresses
// TODO: use netlink.AddrSubscribeWithOptions()
// maintain a list of this machine's local addresses
func monitorLocalAddresses() {
newAddrChan := make(chan netlink.AddrUpdate)
done := make(chan struct{})
defer close(done)
lock.Lock()
localAddresses = daemonNetlink.GetLocalAddrs()
lock.Unlock()
netlink.AddrSubscribeWithOptions(newAddrChan, done,
netlink.AddrSubscribeOptions{
ErrorCallback: func(err error) {
log.Error("AddrSubscribeWithOptions error: %s", err)
},
ListExisting: true,
})
for {
select {
case <-ctxTasks.Done():
done <- struct{}{}
goto Exit
default:
addr, err := netlink.AddrList(nil, netlink.FAMILY_ALL)
if err != nil {
log.Error("eBPF error looking up this machine's addresses via netlink: %v", err)
continue
case addr := <-newAddrChan:
if addr.NewAddr && !findAddressInLocalAddresses(addr.LinkAddress.IP) {
log.Debug("local addr added: %+v\n", addr)
lock.Lock()
localAddresses[addr.LinkAddress.IP.String()] = daemonNetlink.AddrUpdateToAddr(&addr)
lock.Unlock()
} else if !addr.NewAddr {
log.Debug("local addr removed: %+v\n", addr)
lock.Lock()
delete(localAddresses, addr.LinkAddress.IP.String())
lock.Unlock()
}
lock.Lock()
localAddresses = nil
for _, a := range addr {
localAddresses = append(localAddresses, a.IP)
}
lock.Unlock()
time.Sleep(time.Second * 1)
}
}
Exit:
log.Debug("monitorLocalAddresses exited")
}
// monitorAlreadyEstablished makes sure that when an already-established connection is closed
@ -79,7 +97,7 @@ func monitorAlreadyEstablished() {
case <-ctxTasks.Done():
goto Exit
default:
time.Sleep(time.Second * 1)
time.Sleep(time.Second * 2)
socketListTCP, err := daemonNetlink.SocketsDump(uint8(syscall.AF_INET), uint8(syscall.IPPROTO_TCP))
if err != nil {
log.Debug("eBPF error in dumping TCP sockets via netlink")
@ -124,6 +142,7 @@ func monitorAlreadyEstablished() {
}
}
Exit:
log.Debug("monitorAlreadyEstablished exited")
}
func socketsAreEqual(aSocket, bSocket *daemonNetlink.Socket) bool {