remote loggers improvements

- Remote logger: fixed a couple of leaks.
- Allow to use multiple remote loggers.
- Allow to use rfc3164 format.
This commit is contained in:
Gustavo Iñiguez Goia 2023-07-23 22:27:28 +02:00
parent cdf6497ea7
commit 6e340a7e39
Failed to generate hash of commit
3 changed files with 31 additions and 22 deletions

View file

@ -25,11 +25,15 @@ func NewRfc5424() *Rfc5424 {
// Transform takes input arguments and formats them to RFC5424 format.
func (r *Rfc5424) Transform(args ...interface{}) (out string) {
hostname := ""
tag := ""
arg1 := args[0]
arg2 := args[1]
arg3 := args[2]
hostname := arg2.(string)
tag := arg3.(string)
if len(args) > 1 {
arg2 := args[1]
arg3 := args[2]
hostname = arg2.(string)
tag = arg3.(string)
}
values := arg1.([]interface{})
for n, val := range values {
switch val.(type) {

View file

@ -49,6 +49,12 @@ func NewLoggerManager() *LoggerManager {
func (l *LoggerManager) Load(configs []LoggerConfig, workers int) {
for _, cfg := range configs {
switch cfg.Name {
case LOGGER_REMOTE:
if lgr, err := NewRemote(&cfg); err == nil {
l.count++
l.loggers[fmt.Sprint(lgr.Name, lgr.cfg.Server, lgr.cfg.Protocol)] = lgr
workers += cfg.Workers
}
case LOGGER_REMOTE_SYSLOG:
if lgr, err := NewRemoteSyslog(&cfg); err == nil {
l.count++
@ -70,7 +76,7 @@ func (l *LoggerManager) Load(configs []LoggerConfig, workers int) {
l.msgs = make(chan []interface{}, workers)
for i := 0; i < workers; i++ {
go l.newWorker(i)
go newWorker(i, l)
}
}
@ -81,7 +87,7 @@ func (l *LoggerManager) write(args ...interface{}) {
}
}
func (l *LoggerManager) newWorker(id int) {
func newWorker(id int, l *LoggerManager) {
for {
for msg := range l.msgs {
l.write(msg)

View file

@ -54,7 +54,9 @@ func NewRemoteSyslog(cfg *LoggerConfig) (*RemoteSyslog, error) {
// list of allowed formats for this logger
sys.logFormat = formats.NewRfc5424()
if cfg.Format == formats.CSV {
if cfg.Format == formats.RFC3164 {
sys.logFormat = formats.NewRfc3164()
} else if cfg.Format == formats.CSV {
sys.logFormat = formats.NewCSV()
}
@ -86,7 +88,9 @@ func (s *RemoteSyslog) Open() (err error) {
if s.cfg.Server == "" {
return fmt.Errorf("[%s] Server address must not be empty", s.Name)
}
s.mu.Lock()
s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.Timeout*5)
s.mu.Unlock()
if err == nil {
atomic.StoreUint32(&s.status, CONNECTED)
@ -98,9 +102,7 @@ func (s *RemoteSyslog) Open() (err error) {
func (s *RemoteSyslog) Dial(proto, addr string, connTimeout time.Duration) (netConn net.Conn, err error) {
switch proto {
case "udp", "tcp":
s.mu.Lock()
netConn, err = net.DialTimeout(proto, addr, connTimeout)
s.mu.Unlock()
if err != nil {
return nil, err
}
@ -157,20 +159,17 @@ func (s *RemoteSyslog) Write(msg string) {
// Reopening the connection with the server helps to resume sending events to syslog,
// and have a continuous stream of events. Otherwise it'd stop working.
// I haven't figured out yet why these write errors ocurr.
switch s.netConn.(type) {
case *net.TCPConn, *net.UDPConn:
s.mu.RLock()
s.netConn.SetWriteDeadline(deadline)
_, err := s.netConn.Write([]byte(s.formatLine(msg)))
s.mu.RUnlock()
s.mu.RLock()
s.netConn.SetWriteDeadline(deadline)
_, err := s.netConn.Write([]byte(msg))
s.mu.RUnlock()
if err != nil {
log.Debug("[%s] %s write error: %v", s.Name, s.cfg.Protocol, err.(net.Error))
atomic.AddUint32(&s.errors, 1)
if atomic.LoadUint32(&s.errors) > maxAllowedErrors {
s.ReOpen()
return
}
if err != nil {
log.Debug("[%s] %s write error: %v", s.Name, s.cfg.Protocol, err.(net.Error))
atomic.AddUint32(&s.errors, 1)
if atomic.LoadUint32(&s.errors) > maxAllowedErrors {
s.ReOpen()
return
}
}
}