From 64a698f221218bca931f81fce250df405a30e8e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gustavo=20I=C3=B1iguez=20Goia?= Date: Sat, 11 May 2024 18:39:04 +0200 Subject: [PATCH] loggers improvements improvements to the loggers modules: - allow to specify a connection timeout (there was only a write timeout). - performance improvements when building the messages to be written/sent. - allow to restart the connection with remote servers if we fill up the messages queue. This can occur for example if we connect to a remote server, start sending messages, but we haven't allowed other connections yet. In this case the connections never recovered from this state, and we weren't prompted to allow the needed connections. (more work nd testing needed) --- daemon/log/formats/json.go | 4 +- daemon/log/formats/rfc3164.go | 15 ++-- daemon/log/formats/rfc5424.go | 15 ++-- daemon/log/loggers/logger.go | 129 +++++++++++++++++++++------- daemon/log/loggers/remote.go | 53 +++++++----- daemon/log/loggers/remote_syslog.go | 42 +++++---- 6 files changed, 174 insertions(+), 84 deletions(-) diff --git a/daemon/log/formats/json.go b/daemon/log/formats/json.go index 438c032b..9c074a7a 100644 --- a/daemon/log/formats/json.go +++ b/daemon/log/formats/json.go @@ -2,8 +2,8 @@ package formats import ( "encoding/json" - "fmt" + "github.com/evilsocket/opensnitch/daemon/core" "github.com/evilsocket/opensnitch/daemon/ui/protocol" ) @@ -64,6 +64,6 @@ func (j *JSONEventFormat) Transform(args ...interface{}) (out string) { if err != nil { return } - out = fmt.Sprint(string(rawCfg), "\n\n") + out = core.ConcatStrings(string(rawCfg), "\n\n") return } diff --git a/daemon/log/formats/rfc3164.go b/daemon/log/formats/rfc3164.go index 7138bacf..0f6a933b 100644 --- a/daemon/log/formats/rfc3164.go +++ b/daemon/log/formats/rfc3164.go @@ -4,8 +4,11 @@ import ( "fmt" "log/syslog" "os" + "strconv" + "strings" "time" + "github.com/evilsocket/opensnitch/daemon/core" "github.com/evilsocket/opensnitch/daemon/ui/protocol" ) @@ -38,18 +41,18 @@ func (r *Rfc3164) Transform(args ...interface{}) (out string) { switch val.(type) { case *protocol.Connection: con := val.(*protocol.Connection) - out = fmt.Sprint(out, + out = core.ConcatStrings(out, " SRC=\"", con.SrcIp, "\"", - " SPT=\"", con.SrcPort, "\"", + " SPT=\"", strconv.FormatUint(uint64(con.SrcPort), 10), "\"", " DST=\"", con.DstIp, "\"", " DSTHOST=\"", con.DstHost, "\"", - " DPT=\"", con.DstPort, "\"", + " DPT=\"", strconv.FormatUint(uint64(con.DstPort), 10), "\"", " PROTO=\"", con.Protocol, "\"", - " PID=\"", con.ProcessId, "\"", - " UID=\"", con.UserId, "\"", + " PID=\"", strconv.FormatUint(uint64(con.ProcessId), 10), "\"", + " UID=\"", strconv.FormatUint(uint64(con.UserId), 10), "\"", //" COMM=", con.ProcessComm, "\"", " PATH=\"", con.ProcessPath, "\"", - " CMDLINE=\"", con.ProcessArgs, "\"", + " CMDLINE=\"", strings.Join(con.ProcessArgs, " "), "\"", " CWD=\"", con.ProcessCwd, "\"", ) default: diff --git a/daemon/log/formats/rfc5424.go b/daemon/log/formats/rfc5424.go index 40e8c910..544af07f 100644 --- a/daemon/log/formats/rfc5424.go +++ b/daemon/log/formats/rfc5424.go @@ -4,8 +4,11 @@ import ( "fmt" "log/syslog" "os" + "strconv" + "strings" "time" + "github.com/evilsocket/opensnitch/daemon/core" "github.com/evilsocket/opensnitch/daemon/ui/protocol" ) @@ -39,18 +42,18 @@ func (r *Rfc5424) Transform(args ...interface{}) (out string) { switch val.(type) { case *protocol.Connection: con := val.(*protocol.Connection) - out = fmt.Sprint(out, + out = core.ConcatStrings(out, " SRC=\"", con.SrcIp, "\"", - " SPT=\"", con.SrcPort, "\"", + " SPT=\"", strconv.FormatUint(uint64(con.SrcPort), 10), "\"", " DST=\"", con.DstIp, "\"", " DSTHOST=\"", con.DstHost, "\"", - " DPT=\"", con.DstPort, "\"", + " DPT=\"", strconv.FormatUint(uint64(con.DstPort), 10), "\"", " PROTO=\"", con.Protocol, "\"", - " PID=\"", con.ProcessId, "\"", - " UID=\"", con.UserId, "\"", + " PID=\"", strconv.FormatUint(uint64(con.ProcessId), 10), "\"", + " UID=\"", strconv.FormatUint(uint64(con.UserId), 10), "\"", //" COMM=", con.ProcessComm, "\"", " PATH=\"", con.ProcessPath, "\"", - " CMDLINE=\"", con.ProcessArgs, "\"", + " CMDLINE=\"", strings.Join(con.ProcessArgs, " "), "\"", " CWD=\"", con.ProcessCwd, "\"", ) default: diff --git a/daemon/log/loggers/logger.go b/daemon/log/loggers/logger.go index 05238574..1eccf254 100644 --- a/daemon/log/loggers/logger.go +++ b/daemon/log/loggers/logger.go @@ -3,6 +3,8 @@ package loggers import ( "context" "fmt" + "sync" + "time" "github.com/evilsocket/opensnitch/daemon/log" ) @@ -14,39 +16,54 @@ const logTag = "opensnitch" type Logger interface { Transform(...interface{}) string Write(string) + Close() error } // LoggerConfig holds the configuration of a logger type LoggerConfig struct { // Name of the logger: syslog, elastic, ... Name string + // Format: rfc5424, csv, json, ... Format string + // Protocol: udp, tcp Protocol string + // Server: 127.0.0.1:514 Server string - // WriteTimeout: + + // WriteTimeout ... WriteTimeout string + + // ConnectTimeout ... + ConnectTimeout string + // Tag: opensnitchd, mytag, ... Tag string + // Workers: number of workers Workers int } // LoggerManager represents the LoggerManager. type LoggerManager struct { - ctx context.Context - cancel context.CancelFunc - loggers map[string]Logger - msgs chan []interface{} - count int + ctx context.Context + cancel context.CancelFunc + configs []LoggerConfig + loggers map[string]Logger + msgs chan []interface{} + count int + workers int + queueFullHits int + mu *sync.RWMutex } // NewLoggerManager instantiates all the configured loggers. func NewLoggerManager() *LoggerManager { ctx, cancel := context.WithCancel(context.Background()) lm := &LoggerManager{ + mu: &sync.RWMutex{}, ctx: ctx, cancel: cancel, loggers: make(map[string]Logger), @@ -56,60 +73,88 @@ func NewLoggerManager() *LoggerManager { } // Load loggers configuration and initialize them. -func (l *LoggerManager) Load(configs []LoggerConfig, workers int) { +func (l *LoggerManager) Load(configs []LoggerConfig) { + l.ctx, l.cancel = context.WithCancel(context.Background()) + + l.mu.Lock() + defer l.mu.Unlock() + l.configs = configs + + for _, cfg := range configs { + switch cfg.Name { + case LOGGER_REMOTE, LOGGER_REMOTE_SYSLOG, LOGGER_SYSLOG: + l.workers += cfg.Workers + l.count++ + } + } + if l.count == 0 { + return + } + if l.workers == 0 { + l.workers = 4 + } + + // TODO: allow to configure messages queue size + l.msgs = make(chan []interface{}, l.workers) + for i := 0; i < l.workers; i++ { + go newWorker(i, l.ctx.Done(), l.msgs, l.write) + } + for _, cfg := range configs { switch cfg.Name { case LOGGER_REMOTE: if lgr, err := NewRemote(&cfg); err == nil { - l.count++ l.loggers[fmt.Sprint(lgr.Name, lgr.cfg.Server, lgr.cfg.Protocol)] = lgr - workers += cfg.Workers } case LOGGER_REMOTE_SYSLOG: if lgr, err := NewRemoteSyslog(&cfg); err == nil { - l.count++ l.loggers[fmt.Sprint(lgr.Name, lgr.cfg.Server, lgr.cfg.Protocol)] = lgr - workers += cfg.Workers } case LOGGER_SYSLOG: if lgr, err := NewSyslog(&cfg); err == nil { - l.count++ l.loggers[lgr.Name] = lgr - workers += cfg.Workers } } } - if workers == 0 { - workers = 4 - } - - l.msgs = make(chan []interface{}, workers) - for i := 0; i < workers; i++ { - go newWorker(i, l) - } - } +// Reload stops and loads the configured loggers again +func (l *LoggerManager) Reload() { + l.Stop() + l.Load(l.configs) +} + +// Stop closes the opened loggers, and closes the workers func (l *LoggerManager) Stop() { - l.cancel() + l.mu.Lock() + defer l.mu.Unlock() l.count = 0 + l.workers = 0 + l.queueFullHits = 0 + + l.cancel() + for _, lg := range l.loggers { + lg.Close() + } l.loggers = make(map[string]Logger) } func (l *LoggerManager) write(args ...interface{}) { + //l.mu.RLock() + //defer l.mu.RUnlock() for _, logger := range l.loggers { logger.Write(logger.Transform(args...)) } } -func newWorker(id int, l *LoggerManager) { +func newWorker(id int, done <-chan struct{}, msgs chan []interface{}, write func(args ...interface{})) { for { select { - case <-l.ctx.Done(): + case <-done: goto Exit - case msg := <-l.msgs: - l.write(msg) + case msg := <-msgs: + write(msg) } } Exit: @@ -118,10 +163,30 @@ Exit: // Log sends data to the loggers. func (l *LoggerManager) Log(args ...interface{}) { - if l.count > 0 { - go func(args ...interface{}) { - argv := args - l.msgs <- argv - }(args...) + if l.count == 0 { + return + } + // Sending messages to the queue (channel) should be instantaneous, but there're + // several scenarios where we can end up filling up the queue (channel): + // - If we're not connected to the server (GUI), and we need to allow some + // connections. + // - If there's a high load, all workers busy, and writing the logs to the + // logger take too much time. + // In these and other scenarios, if we try to send more than times + // while the queue (channel) is full, we'll reload the loggers. + select { + case <-time.After(time.Millisecond * 1): + l.mu.Lock() + log.Debug("loggerMgr.Log() TIMEOUT dispatching log, queued: %d, queue full hits: %d", len(l.msgs), l.queueFullHits) + l.queueFullHits++ + // TODO: make queueFullHits configurable + needsReload := len(l.msgs) == l.workers && l.queueFullHits > 30 + l.mu.Unlock() + + if needsReload { + // FIXME: races occurs on l.write() and l.Load() + l.Reload() + } + case l.msgs <- args: } } diff --git a/daemon/log/loggers/remote.go b/daemon/log/loggers/remote.go index 637fbfe3..fab2c7fa 100644 --- a/daemon/log/loggers/remote.go +++ b/daemon/log/loggers/remote.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/evilsocket/opensnitch/daemon/core" "github.com/evilsocket/opensnitch/daemon/log" "github.com/evilsocket/opensnitch/daemon/log/formats" ) @@ -22,18 +23,19 @@ const ( // It can write to the local or a remote daemon, UDP or TCP. // It supports writing events in RFC5424, RFC3164, CSV and JSON formats. type Remote struct { - mu *sync.RWMutex - Writer *syslog.Writer - cfg *LoggerConfig - logFormat formats.LoggerFormat - netConn net.Conn - Name string - Tag string - Hostname string - Timeout time.Duration - errors uint32 - maxErrors uint32 - status uint32 + mu *sync.RWMutex + Writer *syslog.Writer + cfg *LoggerConfig + logFormat formats.LoggerFormat + netConn net.Conn + Name string + Tag string + Hostname string + Timeout time.Duration + ConnectTimeout time.Duration + errors uint32 + maxErrors uint32 + status uint32 } // NewRemote returns a new object that manipulates and prints outbound connections @@ -66,13 +68,18 @@ func NewRemote(cfg *LoggerConfig) (*Remote, error) { if err != nil { sys.Hostname = "localhost" } - if cfg.WriteTimeout == "" { - cfg.WriteTimeout = writeTimeout + sys.Timeout, err = time.ParseDuration(cfg.WriteTimeout) + if err != nil || cfg.WriteTimeout == "" { + sys.Timeout = writeTimeout + } + + sys.ConnectTimeout, err = time.ParseDuration(cfg.ConnectTimeout) + if err != nil || cfg.ConnectTimeout == "" { + sys.ConnectTimeout = connTimeout } - sys.Timeout = (time.Second * 15) if err = sys.Open(); err != nil { - log.Error("Error loading logger: %s", err) + log.Error("Error loading logger [%s]: %s", sys.Name, err) return nil, err } log.Info("[%s] initialized: %v", sys.Name, cfg) @@ -87,7 +94,7 @@ func (s *Remote) Open() (err error) { return fmt.Errorf("[%s] Server address must not be empty", s.Name) } s.mu.Lock() - s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.Timeout*5) + s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.ConnectTimeout) s.mu.Unlock() if err == nil { @@ -113,12 +120,10 @@ func (s *Remote) Dial(proto, addr string, connTimeout time.Duration) (netConn ne // Close closes the writer object func (s *Remote) Close() (err error) { - s.mu.RLock() if s.netConn != nil { err = s.netConn.Close() - //s.netConn.conn = nil + s.netConn = nil } - s.mu.RUnlock() atomic.StoreUint32(&s.status, DISCONNECTED) return } @@ -158,9 +163,13 @@ func (s *Remote) Write(msg string) { // and have a continuous stream of events. Otherwise it'd stop working. // I haven't figured out yet why these write errors ocurr. s.mu.Lock() + defer s.mu.Unlock() s.netConn.SetWriteDeadline(deadline) + if s.netConn == nil { + s.ReOpen() + return + } _, err := s.netConn.Write([]byte(msg)) - s.mu.Unlock() if err == nil { return } @@ -178,5 +187,5 @@ func (s *Remote) formatLine(msg string) string { if !strings.HasSuffix(msg, "\n") { nl = "\n" } - return fmt.Sprintf("%s%s", msg, nl) + return core.ConcatStrings(msg, nl) } diff --git a/daemon/log/loggers/remote_syslog.go b/daemon/log/loggers/remote_syslog.go index a3424222..5c920648 100644 --- a/daemon/log/loggers/remote_syslog.go +++ b/daemon/log/loggers/remote_syslog.go @@ -14,11 +14,16 @@ import ( const ( LOGGER_REMOTE_SYSLOG = "remote_syslog" - writeTimeout = "1s" // restart syslog connection after these amount of errors maxAllowedErrors = 10 ) +var ( + // default write / connect timeouts + writeTimeout, _ = time.ParseDuration("1s") + connTimeout, _ = time.ParseDuration("5s") +) + // connection status const ( DISCONNECTED = iota @@ -30,12 +35,13 @@ const ( // It can write to the local or a remote daemon. type RemoteSyslog struct { Syslog - mu *sync.RWMutex - netConn net.Conn - Hostname string - Timeout time.Duration - errors uint32 - status uint32 + mu *sync.RWMutex + netConn net.Conn + Hostname string + Timeout time.Duration + ConnectTimeout time.Duration + errors uint32 + status uint32 } // NewRemoteSyslog returns a new object that manipulates and prints outbound connections @@ -66,13 +72,18 @@ func NewRemoteSyslog(cfg *LoggerConfig) (*RemoteSyslog, error) { if err != nil { sys.Hostname = "localhost" } - if cfg.WriteTimeout == "" { - cfg.WriteTimeout = writeTimeout + sys.Timeout, err = time.ParseDuration(cfg.WriteTimeout) + if err != nil || cfg.WriteTimeout == "" { + sys.Timeout = writeTimeout + } + + sys.ConnectTimeout, err = time.ParseDuration(cfg.ConnectTimeout) + if err != nil || cfg.ConnectTimeout == "" { + sys.ConnectTimeout = connTimeout } - sys.Timeout, _ = time.ParseDuration(cfg.WriteTimeout) if err = sys.Open(); err != nil { - log.Error("Error loading logger: %s", err) + log.Error("Error loading logger [%s]: %s", sys.Name, err) return nil, err } log.Info("[%s] initialized: %v", sys.Name, cfg) @@ -87,7 +98,7 @@ func (s *RemoteSyslog) Open() (err error) { return fmt.Errorf("[%s] Server address must not be empty", s.Name) } s.mu.Lock() - s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.Timeout*5) + s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.ConnectTimeout) s.mu.Unlock() if err == nil { @@ -113,13 +124,12 @@ func (s *RemoteSyslog) Dial(proto, addr string, connTimeout time.Duration) (netC // Close closes the writer object func (s *RemoteSyslog) Close() (err error) { - s.mu.RLock() - defer s.mu.RUnlock() - + //s.mu.RLock() if s.netConn != nil { err = s.netConn.Close() - //s.netConn.conn = nil + s.netConn = nil } + //s.mu.RUnlock() atomic.StoreUint32(&s.status, DISCONNECTED) return }