getparents code reorganization

Deoptimizing GetParents() until we figure out how to do it without
leaking mem.
This commit is contained in:
Gustavo Iñiguez Goia 2023-10-04 00:58:17 +02:00
parent 7f493e84a7
commit 19d376abf0
Failed to generate hash of commit
8 changed files with 89 additions and 85 deletions

View file

@ -320,6 +320,8 @@ func (c *Connection) String() string {
// Serialize returns a connection serialized.
func (c *Connection) Serialize() *protocol.Connection {
c.Process.RLock()
defer c.Process.RUnlock()
return &protocol.Connection{
Protocol: c.Protocol,
SrcIp: c.SrcIP.String(),

View file

@ -36,16 +36,12 @@ func MonitorProcEvents(stop <-chan struct{}) {
if _, needsUpdate, found := EventsCache.IsInStore(int(ev.PID), proc); found {
if needsUpdate {
EventsCache.ComputeChecksums(proc)
EventsCache.UpdateItemDetails(proc)
EventsCache.UpdateItem(proc)
}
log.Debug("[procmon exec event inCache] %d, pid:%d tgid:%d\n", ev.TimeStamp, ev.PID, ev.TGID)
continue
}
// adding item to cache in 2 steps:
// 1. with basic information, to have it readily available
// 2. getting the rest of the process details
EventsCache.Add(proc)
EventsCache.UpdateItemDetails(proc)
} else if ev.IsExit() {
p, _, found := EventsCache.IsInStore(int(ev.PID), nil)
if found && p.Proc.IsAlive() == false {

View file

@ -95,11 +95,11 @@ func (e *EventsStore) UpdateItem(proc *Process) {
if proc.Path == "" {
return
}
e.mu.Lock()
ev := &ExecEventItem{
Proc: proc,
LastSeen: time.Now().UnixNano(),
}
e.mu.Lock()
e.eventByPID[proc.ID] = ev
e.eventByPath[proc.Path] = ev
e.mu.Unlock()
@ -185,15 +185,6 @@ func (e *EventsStore) DeleteOldItems() {
}
}
// UpdateItemDetails updates the details of a process
func (e *EventsStore) UpdateItemDetails(proc *Process) {
proc.GetParent()
proc.GetTree()
proc.ReadCwd()
proc.ReadEnv()
e.UpdateItem(proc)
}
// -------------------------------------------------------------------------
// TODO: Move to its own package.
// A hashing service than runs in background, and accepts paths to hash
@ -247,17 +238,15 @@ func (e *EventsStore) ComputeChecksums(proc *Process) {
// pid found in cache
// we should check other parameters to see if the pid is really the same process
// proc/<pid>/maps
item.Proc.mu.RLock()
item.Proc.RLock()
checksumsNum := len(item.Proc.Checksums)
item.Proc.mu.RUnlock()
item.Proc.RUnlock()
if checksumsNum > 0 && (item.Proc.IsAlive() && item.Proc.Path == proc.Path) {
log.Debug("[cache] reuseChecksums() cached PID alive, already hashed: %v, %s new: %s", item.Proc.Checksums, item.Proc.Path, proc.Path)
proc.Checksums = item.Proc.Checksums
return
}
item.Proc.mu.RLock()
log.Debug("[cache] reuseChecksums() PID found inCache, computing hashes: %s new: %s - hashes: |%v<>%v|", item.Proc.Path, proc.Path, item.Proc.Checksums, proc.Checksums)
item.Proc.mu.RUnlock()
proc.ComputeChecksums(e.checksums)
}

View file

@ -27,9 +27,7 @@ var socketsRegex, _ = regexp.Compile(`socket:\[([0-9]+)\]`)
// GetParent obtains the information of this process' parent.
func (p *Process) GetParent() {
p.mu.RLock()
hasParent := p.Parent != nil
p.mu.RUnlock()
if hasParent {
return
@ -50,44 +48,29 @@ func (p *Process) GetParent() {
return
}
if item, found := EventsCache.IsInStoreByPID(ppid); found {
p.mu.Lock()
p.Parent = item.Proc
p.mu.Unlock()
EventsCache.UpdateItem(p)
} else {
p.mu.Lock()
p.Parent = NewProcessEmpty(ppid, "")
p.Parent.ReadPath()
p.mu.Unlock()
EventsCache.Add(p.Parent)
}
// TODO: see how we can reuse this object and the ppid, to save some iterations.
// right now it opens the can of leaks.
p.Parent = NewProcessEmpty(ppid, "")
p.Parent.ReadPath()
// get process tree
p.Parent.GetParent()
}
// GetTree returns all the parents of this process.
func (p *Process) GetTree() {
// BuildTree returns all the parents of this process.
func (p *Process) BuildTree() {
if len(p.Tree) > 0 {
fmt.Println("GetTree not empty:", p.Tree)
return
}
tree := make([]*protocol.StringInt, 0)
for pp := p.Parent; pp != nil; pp = pp.Parent {
// add the parents in reverse order, so when we iterate over them with the rules
// the first item is the most direct parent of the process.
pp.mu.RLock()
tree = append(tree,
p.Tree = append(p.Tree,
&protocol.StringInt{
Key: pp.Path, Value: uint32(pp.ID),
},
)
pp.mu.RUnlock()
}
p.mu.Lock()
p.Tree = tree
p.mu.Unlock()
}
// GetDetails collects information of a process.
@ -153,9 +136,7 @@ func (p *Process) ReadCwd() error {
if err != nil {
return err
}
p.mu.Lock()
p.CWD = link
p.mu.Unlock()
return nil
}
@ -458,9 +439,9 @@ func (p *Process) ComputeChecksum(algo string) {
log.Debug("[hashing] Unable to dump process memory: %s", err)
continue
}
p.mu.Lock()
p.Lock()
p.Checksums[algo] = hex.EncodeToString(h.Sum(code))
p.mu.Unlock()
p.Unlock()
log.Debug("[hashing] memory region hashed, elapsed: %v ,Hash: %s, %s\n", time.Since(start), p.Checksums[algo], paths[i])
code = nil
break
@ -471,9 +452,9 @@ func (p *Process) ComputeChecksum(algo string) {
log.Debug("[hashing %s] Error copying data: %s", algo, err)
continue
}
p.mu.Lock()
p.Lock()
p.Checksums[algo] = hex.EncodeToString(h.Sum(nil))
p.mu.Unlock()
p.Unlock()
log.Debug("[hashing] elapsed: %v ,Hash: %s, %s\n", time.Since(start), p.Checksums[algo], paths[i])
break

View file

@ -165,40 +165,10 @@ func streamEventsWorker(id int, chn chan []byte, lost chan uint64, kernelEvents
} else {
switch event.Type {
case EV_TYPE_EXEC, EV_TYPE_EXECVEAT:
proc := event2process(&event)
if proc == nil {
continue
}
// TODO: store multiple executions with the same pid but different paths: forks, execves...
if item, needsUpdate, found := procmon.EventsCache.IsInStore(int(event.PID), proc); found {
if needsUpdate {
// when a process is replaced in memory, it'll be found in cache by PID,
// but the new process' details will be empty
proc.Parent = item.Proc
procmon.EventsCache.ComputeChecksums(proc)
procmon.EventsCache.UpdateItemDetails(proc)
}
log.Debug("[eBPF event inCache] -> %d, %v", event.PID, item.Proc.Checksums)
continue
}
// adding item to cache in 2 steps:
// 1. with basic information, to have it readily available
// 2. getting the rest of the process details that takes more time
procmon.EventsCache.Add(proc)
procmon.EventsCache.UpdateItemDetails(proc)
processExecEvent(&event)
case EV_TYPE_SCHED_EXIT:
log.Debug("[eBPF exit event] total: %d, pid: %d, ppid: %d", 0 /*execEvents.Len()*/, event.PID, event.PPID)
ev, _, found := procmon.EventsCache.IsInStore(int(event.PID), nil)
if !found {
continue
}
log.Debug("[eBPF exit event inCache] pid: %d, tgid: %d", event.PID, event.PPID)
if ev.Proc.IsAlive() == false {
procmon.EventsCache.Delete(int(event.PID))
log.Debug("[ebpf exit event] deleting DEAD pid: %d", event.PID)
}
processExitEvent(&event)
}
}
}
@ -212,6 +182,9 @@ func event2process(event *execEvent) (proc *procmon.Process) {
proc = procmon.NewProcessEmpty(int(event.PID), byteArrayToString(event.Comm[:]))
proc.UID = int(event.UID)
// trust process path received from kernel
// NOTE: this is the absolute path executed, but no the real path to the binary.
// if it's executed from a chroot, the absolute path willa be /chroot/path/usr/bin/blabla
// if it's from a container, the absolute path will be /proc/<pid>/root/usr/bin/blabla
path := byteArrayToString(event.Filename[:])
if path != "" {
proc.SetPath(path)
@ -228,7 +201,45 @@ func event2process(event *execEvent) (proc *procmon.Process) {
} else {
proc.ReadCmdline()
}
proc.GetParent()
proc.BuildTree()
proc.ReadCwd()
proc.ReadEnv()
log.Debug("[eBPF exec event] ppid: %d, pid: %d, %s -> %s", event.PPID, event.PID, proc.Path, proc.Args)
return
}
func processExecEvent(event *execEvent) {
proc := event2process(event)
if proc == nil {
return
}
// TODO: store multiple executions with the same pid but different paths:
// forks, execves... execs from chroots, containers, etc.
if item, needsUpdate, found := procmon.EventsCache.IsInStore(int(event.PID), proc); found {
if needsUpdate {
// when a process is replaced in memory, it'll be found in cache by PID,
// but the new process's details will be empty
proc.Parent = item.Proc
procmon.EventsCache.ComputeChecksums(proc)
procmon.EventsCache.UpdateItem(proc)
}
log.Debug("[eBPF event inCache] -> %d, %v", event.PID, item.Proc.Checksums)
return
}
procmon.EventsCache.Add(proc)
}
func processExitEvent(event *execEvent) {
log.Debug("[eBPF exit event] pid: %d, ppid: %d", event.PID, event.PPID)
ev, _, found := procmon.EventsCache.IsInStore(int(event.PID), nil)
if !found {
return
}
log.Debug("[eBPF exit event inCache] pid: %d, tgid: %d", event.PID, event.PPID)
if ev.Proc.IsAlive() == false {
procmon.EventsCache.Delete(int(event.PID))
log.Debug("[ebpf exit event] deleting DEAD pid: %d", event.PID)
}
}

View file

@ -158,7 +158,7 @@ func NewProcess(pid int, comm string) *Process {
}
p.GetDetails()
p.GetParent()
p.GetTree()
p.BuildTree()
return p
}
@ -176,6 +176,26 @@ func NewProcessWithParent(pid, ppid int, comm string) *Process {
return p
}
// Lock locks this process for w+r
func (p *Process) Lock() {
p.mu.Lock()
}
// Unlock unlocks reading from this process
func (p *Process) Unlock() {
p.mu.Unlock()
}
// RLock locks this process for r
func (p *Process) RLock() {
p.mu.RLock()
}
// RUnlock unlocks reading from this process
func (p *Process) RUnlock() {
p.mu.RUnlock()
}
//Serialize transforms a Process object to gRPC protocol object
func (p *Process) Serialize() *protocol.Process {
ioStats := p.IOStats

View file

@ -319,9 +319,14 @@ func (o *Operator) Match(con *conman.Connection, hasChecksums bool) bool {
if !hasChecksums {
return ret
}
con.Process.RLock()
for algo := range con.Process.Checksums {
return o.cb(con.Process.Checksums[algo])
ret = o.cb(con.Process.Checksums[algo])
if ret {
break
}
}
con.Process.RUnlock()
return ret
} else if o.Operand == OpProto {
return o.cb(con.Protocol)

View file

@ -67,7 +67,7 @@ func (c *Client) monitorProcessDetails(pid int, stream protocol.UI_Notifications
p = &newProc
if len(p.Tree) == 0 {
p.GetParent()
p.GetTree()
p.BuildTree()
}
} else {
p = procmon.NewProcess(pid, "")