loggers improvements

improvements to the loggers modules:

 - allow to specify a connection timeout (there was only a write
   timeout).
 - performance improvements when building the messages to be
   written/sent.
 - allow to restart the connection with remote servers if we fill up the
   messages queue.
   This can occur for example if we connect to a remote server, start
   sending messages, but we haven't allowed other connections yet.
   In this case the connections never recovered from this state, and we
   weren't prompted to allow the needed connections.
   (more work nd testing needed)
This commit is contained in:
Gustavo Iñiguez Goia 2024-05-11 18:39:04 +02:00
parent 0b67c1a429
commit 64a698f221
Failed to generate hash of commit
6 changed files with 174 additions and 84 deletions

View file

@ -2,8 +2,8 @@ package formats
import (
"encoding/json"
"fmt"
"github.com/evilsocket/opensnitch/daemon/core"
"github.com/evilsocket/opensnitch/daemon/ui/protocol"
)
@ -64,6 +64,6 @@ func (j *JSONEventFormat) Transform(args ...interface{}) (out string) {
if err != nil {
return
}
out = fmt.Sprint(string(rawCfg), "\n\n")
out = core.ConcatStrings(string(rawCfg), "\n\n")
return
}

View file

@ -4,8 +4,11 @@ import (
"fmt"
"log/syslog"
"os"
"strconv"
"strings"
"time"
"github.com/evilsocket/opensnitch/daemon/core"
"github.com/evilsocket/opensnitch/daemon/ui/protocol"
)
@ -38,18 +41,18 @@ func (r *Rfc3164) Transform(args ...interface{}) (out string) {
switch val.(type) {
case *protocol.Connection:
con := val.(*protocol.Connection)
out = fmt.Sprint(out,
out = core.ConcatStrings(out,
" SRC=\"", con.SrcIp, "\"",
" SPT=\"", con.SrcPort, "\"",
" SPT=\"", strconv.FormatUint(uint64(con.SrcPort), 10), "\"",
" DST=\"", con.DstIp, "\"",
" DSTHOST=\"", con.DstHost, "\"",
" DPT=\"", con.DstPort, "\"",
" DPT=\"", strconv.FormatUint(uint64(con.DstPort), 10), "\"",
" PROTO=\"", con.Protocol, "\"",
" PID=\"", con.ProcessId, "\"",
" UID=\"", con.UserId, "\"",
" PID=\"", strconv.FormatUint(uint64(con.ProcessId), 10), "\"",
" UID=\"", strconv.FormatUint(uint64(con.UserId), 10), "\"",
//" COMM=", con.ProcessComm, "\"",
" PATH=\"", con.ProcessPath, "\"",
" CMDLINE=\"", con.ProcessArgs, "\"",
" CMDLINE=\"", strings.Join(con.ProcessArgs, " "), "\"",
" CWD=\"", con.ProcessCwd, "\"",
)
default:

View file

@ -4,8 +4,11 @@ import (
"fmt"
"log/syslog"
"os"
"strconv"
"strings"
"time"
"github.com/evilsocket/opensnitch/daemon/core"
"github.com/evilsocket/opensnitch/daemon/ui/protocol"
)
@ -39,18 +42,18 @@ func (r *Rfc5424) Transform(args ...interface{}) (out string) {
switch val.(type) {
case *protocol.Connection:
con := val.(*protocol.Connection)
out = fmt.Sprint(out,
out = core.ConcatStrings(out,
" SRC=\"", con.SrcIp, "\"",
" SPT=\"", con.SrcPort, "\"",
" SPT=\"", strconv.FormatUint(uint64(con.SrcPort), 10), "\"",
" DST=\"", con.DstIp, "\"",
" DSTHOST=\"", con.DstHost, "\"",
" DPT=\"", con.DstPort, "\"",
" DPT=\"", strconv.FormatUint(uint64(con.DstPort), 10), "\"",
" PROTO=\"", con.Protocol, "\"",
" PID=\"", con.ProcessId, "\"",
" UID=\"", con.UserId, "\"",
" PID=\"", strconv.FormatUint(uint64(con.ProcessId), 10), "\"",
" UID=\"", strconv.FormatUint(uint64(con.UserId), 10), "\"",
//" COMM=", con.ProcessComm, "\"",
" PATH=\"", con.ProcessPath, "\"",
" CMDLINE=\"", con.ProcessArgs, "\"",
" CMDLINE=\"", strings.Join(con.ProcessArgs, " "), "\"",
" CWD=\"", con.ProcessCwd, "\"",
)
default:

View file

@ -3,6 +3,8 @@ package loggers
import (
"context"
"fmt"
"sync"
"time"
"github.com/evilsocket/opensnitch/daemon/log"
)
@ -14,22 +16,32 @@ const logTag = "opensnitch"
type Logger interface {
Transform(...interface{}) string
Write(string)
Close() error
}
// LoggerConfig holds the configuration of a logger
type LoggerConfig struct {
// Name of the logger: syslog, elastic, ...
Name string
// Format: rfc5424, csv, json, ...
Format string
// Protocol: udp, tcp
Protocol string
// Server: 127.0.0.1:514
Server string
// WriteTimeout:
// WriteTimeout ...
WriteTimeout string
// ConnectTimeout ...
ConnectTimeout string
// Tag: opensnitchd, mytag, ...
Tag string
// Workers: number of workers
Workers int
}
@ -38,15 +50,20 @@ type LoggerConfig struct {
type LoggerManager struct {
ctx context.Context
cancel context.CancelFunc
configs []LoggerConfig
loggers map[string]Logger
msgs chan []interface{}
count int
workers int
queueFullHits int
mu *sync.RWMutex
}
// NewLoggerManager instantiates all the configured loggers.
func NewLoggerManager() *LoggerManager {
ctx, cancel := context.WithCancel(context.Background())
lm := &LoggerManager{
mu: &sync.RWMutex{},
ctx: ctx,
cancel: cancel,
loggers: make(map[string]Logger),
@ -56,60 +73,88 @@ func NewLoggerManager() *LoggerManager {
}
// Load loggers configuration and initialize them.
func (l *LoggerManager) Load(configs []LoggerConfig, workers int) {
func (l *LoggerManager) Load(configs []LoggerConfig) {
l.ctx, l.cancel = context.WithCancel(context.Background())
l.mu.Lock()
defer l.mu.Unlock()
l.configs = configs
for _, cfg := range configs {
switch cfg.Name {
case LOGGER_REMOTE, LOGGER_REMOTE_SYSLOG, LOGGER_SYSLOG:
l.workers += cfg.Workers
l.count++
}
}
if l.count == 0 {
return
}
if l.workers == 0 {
l.workers = 4
}
// TODO: allow to configure messages queue size
l.msgs = make(chan []interface{}, l.workers)
for i := 0; i < l.workers; i++ {
go newWorker(i, l.ctx.Done(), l.msgs, l.write)
}
for _, cfg := range configs {
switch cfg.Name {
case LOGGER_REMOTE:
if lgr, err := NewRemote(&cfg); err == nil {
l.count++
l.loggers[fmt.Sprint(lgr.Name, lgr.cfg.Server, lgr.cfg.Protocol)] = lgr
workers += cfg.Workers
}
case LOGGER_REMOTE_SYSLOG:
if lgr, err := NewRemoteSyslog(&cfg); err == nil {
l.count++
l.loggers[fmt.Sprint(lgr.Name, lgr.cfg.Server, lgr.cfg.Protocol)] = lgr
workers += cfg.Workers
}
case LOGGER_SYSLOG:
if lgr, err := NewSyslog(&cfg); err == nil {
l.count++
l.loggers[lgr.Name] = lgr
workers += cfg.Workers
}
}
}
if workers == 0 {
workers = 4
}
l.msgs = make(chan []interface{}, workers)
for i := 0; i < workers; i++ {
go newWorker(i, l)
}
}
// Reload stops and loads the configured loggers again
func (l *LoggerManager) Reload() {
l.Stop()
l.Load(l.configs)
}
// Stop closes the opened loggers, and closes the workers
func (l *LoggerManager) Stop() {
l.cancel()
l.mu.Lock()
defer l.mu.Unlock()
l.count = 0
l.workers = 0
l.queueFullHits = 0
l.cancel()
for _, lg := range l.loggers {
lg.Close()
}
l.loggers = make(map[string]Logger)
}
func (l *LoggerManager) write(args ...interface{}) {
//l.mu.RLock()
//defer l.mu.RUnlock()
for _, logger := range l.loggers {
logger.Write(logger.Transform(args...))
}
}
func newWorker(id int, l *LoggerManager) {
func newWorker(id int, done <-chan struct{}, msgs chan []interface{}, write func(args ...interface{})) {
for {
select {
case <-l.ctx.Done():
case <-done:
goto Exit
case msg := <-l.msgs:
l.write(msg)
case msg := <-msgs:
write(msg)
}
}
Exit:
@ -118,10 +163,30 @@ Exit:
// Log sends data to the loggers.
func (l *LoggerManager) Log(args ...interface{}) {
if l.count > 0 {
go func(args ...interface{}) {
argv := args
l.msgs <- argv
}(args...)
if l.count == 0 {
return
}
// Sending messages to the queue (channel) should be instantaneous, but there're
// several scenarios where we can end up filling up the queue (channel):
// - If we're not connected to the server (GUI), and we need to allow some
// connections.
// - If there's a high load, all workers busy, and writing the logs to the
// logger take too much time.
// In these and other scenarios, if we try to send more than <queueFullHits> times
// while the queue (channel) is full, we'll reload the loggers.
select {
case <-time.After(time.Millisecond * 1):
l.mu.Lock()
log.Debug("loggerMgr.Log() TIMEOUT dispatching log, queued: %d, queue full hits: %d", len(l.msgs), l.queueFullHits)
l.queueFullHits++
// TODO: make queueFullHits configurable
needsReload := len(l.msgs) == l.workers && l.queueFullHits > 30
l.mu.Unlock()
if needsReload {
// FIXME: races occurs on l.write() and l.Load()
l.Reload()
}
case l.msgs <- args:
}
}

View file

@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"
"github.com/evilsocket/opensnitch/daemon/core"
"github.com/evilsocket/opensnitch/daemon/log"
"github.com/evilsocket/opensnitch/daemon/log/formats"
)
@ -31,6 +32,7 @@ type Remote struct {
Tag string
Hostname string
Timeout time.Duration
ConnectTimeout time.Duration
errors uint32
maxErrors uint32
status uint32
@ -66,13 +68,18 @@ func NewRemote(cfg *LoggerConfig) (*Remote, error) {
if err != nil {
sys.Hostname = "localhost"
}
if cfg.WriteTimeout == "" {
cfg.WriteTimeout = writeTimeout
sys.Timeout, err = time.ParseDuration(cfg.WriteTimeout)
if err != nil || cfg.WriteTimeout == "" {
sys.Timeout = writeTimeout
}
sys.ConnectTimeout, err = time.ParseDuration(cfg.ConnectTimeout)
if err != nil || cfg.ConnectTimeout == "" {
sys.ConnectTimeout = connTimeout
}
sys.Timeout = (time.Second * 15)
if err = sys.Open(); err != nil {
log.Error("Error loading logger: %s", err)
log.Error("Error loading logger [%s]: %s", sys.Name, err)
return nil, err
}
log.Info("[%s] initialized: %v", sys.Name, cfg)
@ -87,7 +94,7 @@ func (s *Remote) Open() (err error) {
return fmt.Errorf("[%s] Server address must not be empty", s.Name)
}
s.mu.Lock()
s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.Timeout*5)
s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.ConnectTimeout)
s.mu.Unlock()
if err == nil {
@ -113,12 +120,10 @@ func (s *Remote) Dial(proto, addr string, connTimeout time.Duration) (netConn ne
// Close closes the writer object
func (s *Remote) Close() (err error) {
s.mu.RLock()
if s.netConn != nil {
err = s.netConn.Close()
//s.netConn.conn = nil
s.netConn = nil
}
s.mu.RUnlock()
atomic.StoreUint32(&s.status, DISCONNECTED)
return
}
@ -158,9 +163,13 @@ func (s *Remote) Write(msg string) {
// and have a continuous stream of events. Otherwise it'd stop working.
// I haven't figured out yet why these write errors ocurr.
s.mu.Lock()
defer s.mu.Unlock()
s.netConn.SetWriteDeadline(deadline)
if s.netConn == nil {
s.ReOpen()
return
}
_, err := s.netConn.Write([]byte(msg))
s.mu.Unlock()
if err == nil {
return
}
@ -178,5 +187,5 @@ func (s *Remote) formatLine(msg string) string {
if !strings.HasSuffix(msg, "\n") {
nl = "\n"
}
return fmt.Sprintf("%s%s", msg, nl)
return core.ConcatStrings(msg, nl)
}

View file

@ -14,11 +14,16 @@ import (
const (
LOGGER_REMOTE_SYSLOG = "remote_syslog"
writeTimeout = "1s"
// restart syslog connection after these amount of errors
maxAllowedErrors = 10
)
var (
// default write / connect timeouts
writeTimeout, _ = time.ParseDuration("1s")
connTimeout, _ = time.ParseDuration("5s")
)
// connection status
const (
DISCONNECTED = iota
@ -34,6 +39,7 @@ type RemoteSyslog struct {
netConn net.Conn
Hostname string
Timeout time.Duration
ConnectTimeout time.Duration
errors uint32
status uint32
}
@ -66,13 +72,18 @@ func NewRemoteSyslog(cfg *LoggerConfig) (*RemoteSyslog, error) {
if err != nil {
sys.Hostname = "localhost"
}
if cfg.WriteTimeout == "" {
cfg.WriteTimeout = writeTimeout
sys.Timeout, err = time.ParseDuration(cfg.WriteTimeout)
if err != nil || cfg.WriteTimeout == "" {
sys.Timeout = writeTimeout
}
sys.ConnectTimeout, err = time.ParseDuration(cfg.ConnectTimeout)
if err != nil || cfg.ConnectTimeout == "" {
sys.ConnectTimeout = connTimeout
}
sys.Timeout, _ = time.ParseDuration(cfg.WriteTimeout)
if err = sys.Open(); err != nil {
log.Error("Error loading logger: %s", err)
log.Error("Error loading logger [%s]: %s", sys.Name, err)
return nil, err
}
log.Info("[%s] initialized: %v", sys.Name, cfg)
@ -87,7 +98,7 @@ func (s *RemoteSyslog) Open() (err error) {
return fmt.Errorf("[%s] Server address must not be empty", s.Name)
}
s.mu.Lock()
s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.Timeout*5)
s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.ConnectTimeout)
s.mu.Unlock()
if err == nil {
@ -113,13 +124,12 @@ func (s *RemoteSyslog) Dial(proto, addr string, connTimeout time.Duration) (netC
// Close closes the writer object
func (s *RemoteSyslog) Close() (err error) {
s.mu.RLock()
defer s.mu.RUnlock()
//s.mu.RLock()
if s.netConn != nil {
err = s.netConn.Close()
//s.netConn.conn = nil
s.netConn = nil
}
//s.mu.RUnlock()
atomic.StoreUint32(&s.status, DISCONNECTED)
return
}