Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Auditbeat][add_session_metadata processor] Fix more potential enrichment failures #39243

Merged
merged 15 commits into from May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions CHANGELOG.next.asciidoc
Expand Up @@ -94,8 +94,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Auditbeat*
- Set field types to correctly match ECS in sessionmd processor {issue}38955[38955] {pull}38994[38994]
- Keep process info on exited processes, to avoid failing to enrich events in sessionmd processor {pull}39173[39173]

- Fix failing to enrich process events in sessionmd processor {issue}38955[38955] {pull}39173[39173] {pull}39243[39243]
- Prevent scenario of losing children-related file events in a directory for recursive fsnotify backend of auditbeat file integrity module {pull}39133[39133]


Expand Down
19 changes: 16 additions & 3 deletions x-pack/auditbeat/processors/sessionmd/add_session_metadata.go
Expand Up @@ -96,13 +96,24 @@ func New(cfg *cfg.C) (beat.Processor, error) {
}

func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
_, err := ev.GetValue(p.config.PIDField)
pi, err := ev.GetValue(p.config.PIDField)
if err != nil {
// Do not attempt to enrich events without PID; it's not a supported event
return ev, nil //nolint:nilerr // Running on events without PID is expected
}

err = p.provider.UpdateDB(ev)
// Do not enrich failed syscalls, as there was no actual process change related to it
v, err := ev.GetValue("auditd.result")
if err == nil && v == "fail" {
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
return ev, nil
}

pid, err := pidToUInt32(pi)
if err != nil {
return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error
}

err = p.provider.UpdateDB(ev, pid)
if err != nil {
return ev, err
}
Expand Down Expand Up @@ -136,7 +147,9 @@ func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) {

fullProcess, err := p.db.GetProcess(pid)
if err != nil {
return nil, fmt.Errorf("pid %v not found in db: %w", pid, err)
e := fmt.Errorf("pid %v not found in db: %w", pid, err)
p.logger.Errorf("%v", e)
return nil, e
}

processMap := fullProcess.ToMap()
Expand Down
36 changes: 5 additions & 31 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Expand Up @@ -238,7 +238,6 @@ func (db *DB) InsertFork(fork types.ProcessForkEvent) {

pid := fork.ChildPIDs.Tgid
ppid := fork.ParentPIDs.Tgid
db.scrapeAncestors(db.processes[pid])

if entry, ok := db.processes[ppid]; ok {
entry.PIDs = pidInfoFromProto(fork.ChildPIDs)
Expand Down Expand Up @@ -282,7 +281,6 @@ func (db *DB) InsertExec(exec types.ProcessExecEvent) {
}

db.processes[exec.PIDs.Tgid] = proc
db.scrapeAncestors(proc)
entryLeaderPID := db.evaluateEntryLeader(proc)
if entryLeaderPID != nil {
db.entryLeaderRelationships[exec.PIDs.Tgid] = *entryLeaderPID
Expand Down Expand Up @@ -568,6 +566,11 @@ func setSameAsProcess(process *types.Process) {
}
}

func (db *DB) HasProcess(pid uint32) bool {
_, ok := db.processes[pid]
mjwolf marked this conversation as resolved.
Show resolved Hide resolved
return ok
}

func (db *DB) GetProcess(pid uint32) (types.Process, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()
Expand All @@ -585,8 +588,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillParent(&ret, parent)
break
}
db.logger.Debugf("failed to find %d in DB (parent of %d), attempting to scrape", process.PIDs.Ppid, pid)
db.scrapeAncestors(process)
}
}

Expand All @@ -596,8 +597,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillGroupLeader(&ret, groupLeader)
break
}
db.logger.Debugf("failed to find %d in DB (group leader of %d), attempting to scrape", process.PIDs.Pgid, pid)
db.scrapeAncestors(process)
}
}

Expand All @@ -607,8 +606,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
fillSessionLeader(&ret, sessionLeader)
break
}
db.logger.Debugf("failed to find %d in DB (session leader of %d), attempting to scrape", process.PIDs.Sid, pid)
db.scrapeAncestors(process)
}
}

Expand Down Expand Up @@ -712,29 +709,6 @@ func getTTYType(major uint16, minor uint16) TTYType {
return TTYUnknown
}

func (db *DB) scrapeAncestors(proc Process) {
pkoutsovasilis marked this conversation as resolved.
Show resolved Hide resolved
for _, pid := range []uint32{proc.PIDs.Pgid, proc.PIDs.Ppid, proc.PIDs.Sid} {
if _, exists := db.processes[pid]; pid == 0 || exists {
continue
}
procInfo, err := db.procfs.GetProcess(pid)
if err != nil {
db.logger.Debugf("couldn't get %v from procfs: %w", pid, err)
continue
}
p := Process{
PIDs: pidInfoFromProto(procInfo.PIDs),
Creds: credInfoFromProto(procInfo.Creds),
CTTY: ttyDevFromProto(procInfo.CTTY),
Argv: procInfo.Argv,
Cwd: procInfo.Cwd,
Env: procInfo.Env,
Filename: procInfo.Filename,
}
db.insertProcess(p)
}
}

// Close closes the DB's stop channel (db.stopChan).
//
// Closing an already-closed channel panics, so Close must not be called more
// than once on the same DB.
// NOTE(review): presumably background work in the DB watches stopChan to know
// when to exit — confirm against the code that receives from it.
func (db *DB) Close() {
close(db.stopChan)
}
Expand Up @@ -9,6 +9,7 @@ package ebpf_provider
import (
"context"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/ebpf"
Expand Down Expand Up @@ -151,7 +152,80 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr
return &p, nil
}

func (s prvdr) UpdateDB(ev *beat.Event) error {
// no-op for ebpf, DB is updated from pushed ebpf events
return nil
// Limits that bound how long UpdateDB may block waiting for a missing process.
const (
maxWaitLimit = 200 * time.Millisecond // Maximum time a single UpdateDB call will wait for a process
combinedWaitLimit = 2 * time.Second // Multiple UpdateDB calls will wait up to this amount in total within resetDuration
backoffDuration = 10 * time.Second // Once in backoff, UpdateDB will stop waiting for processes for this long
resetDuration = 5 * time.Second // After this amount of time with no backoffs, combinedWait will be reset
)

// Wait/backoff bookkeeping used by UpdateDB. These are package-level variables,
// so they are shared by every call to UpdateDB.
// NOTE(review): no synchronization is visible around them — confirm UpdateDB is
// only ever called from a single goroutine.
var (
combinedWait = 0 * time.Millisecond // Total time spent waiting in UpdateDB during the current window
inBackoff = false // True while UpdateDB is skipping waits
backoffStart = time.Now() // Time at which the current backoff started
since = time.Now() // Start of the current window over which combinedWait is accumulated
backoffSkipped = 0 // Counter reported in the log when a backoff ends; reset to 0 when a backoff starts
)

// With ebpf, process events are pushed to the DB by the above goroutine, so this doesn't actually update the DB.
// It does to try sync the processor and ebpf events, so that the process is in the process db before continuing.
//
// It's possible that the event to enrich arrives before the process is inserted into the DB. In that case, this
// will block continuing the enrichment until the process is seen (or the timeout is reached).
//
// If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during
// which it will continue without waiting for missing events to arrive, so the processor doesn't become overly backed-up
// waiting for these processes, at the cost of possibly not enriching some processes.
func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
if s.db.HasProcess(pid) {
return nil
}

now := time.Now()
if inBackoff {
if now.Sub(backoffStart) > backoffDuration {
s.logger.Warnf("ended backoff, skipped %d processes", backoffSkipped)
inBackoff = false
combinedWait = 0 * time.Millisecond
} else {
backoffSkipped += backoffSkipped
mjwolf marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
} else {
if combinedWait > combinedWaitLimit {
s.logger.Warn("starting backoff")
inBackoff = true
backoffStart = now
backoffSkipped = 0
return nil
}
// maintain a moving window of time for the delays we track
if now.Sub(since) > resetDuration {
since = now
combinedWait = 0 * time.Millisecond
}
}

start := now
nextWait := 5 * time.Millisecond
for {
waited := time.Since(start)
if s.db.HasProcess(pid) {
s.logger.Debugf("got process that was missing after %v", waited)
combinedWait = combinedWait + waited
return nil
}
if waited >= maxWaitLimit {
e := fmt.Errorf("process %v was not seen after %v", pid, waited)
s.logger.Warnf("%w", e)
combinedWait = combinedWait + waited
return e
}
time.Sleep(nextWait)
if nextWait*2+waited > maxWaitLimit {
nextWait = maxWaitLimit - waited
} else {
nextWait = nextWait * 2
}
}
}
Expand Up @@ -41,16 +41,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, rea
}

// UpdateDB will update the process DB with process info from procfs or the event itself
func (s prvdr) UpdateDB(ev *beat.Event) error {
pi, err := ev.Fields.GetValue(s.pidField)
if err != nil {
return fmt.Errorf("event not supported, no pid")
}
pid, ok := pi.(int)
if !ok {
return fmt.Errorf("pid field not int")
}

func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
syscall, err := ev.GetValue(syscallField)
if err != nil {
return fmt.Errorf("event not supported, no syscall data")
Expand All @@ -59,7 +50,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error {
switch syscall {
case "execveat", "execve":
pe := types.ProcessExecEvent{}
proc_info, err := s.reader.GetProcess(uint32(pid))
proc_info, err := s.reader.GetProcess(pid)
if err == nil {
pe.PIDs = proc_info.PIDs
pe.Creds = proc_info.Creds
Expand All @@ -72,7 +63,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error {
s.logger.Warnf("couldn't get process info from proc for pid %v: %w", pid, err)
// If process info couldn't be taken from procfs, populate with as much info as
// possible from the event
pe.PIDs.Tgid = uint32(pid)
pe.PIDs.Tgid = pid
var intr interface{}
var i int
var ok bool
Expand Down Expand Up @@ -106,7 +97,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error {
case "exit_group":
pe := types.ProcessExitEvent{
PIDs: types.PIDInfo{
Tgid: uint32(pid),
Tgid: pid,
},
}
s.db.InsertExit(pe)
Expand All @@ -122,8 +113,8 @@ func (s prvdr) UpdateDB(ev *beat.Event) error {
if result == "success" {
setsid_ev := types.ProcessSetsidEvent{
PIDs: types.PIDInfo{
Tgid: uint32(pid),
Sid: uint32(pid),
Tgid: pid,
Sid: pid,
},
}
s.db.InsertSetsid(setsid_ev)
Expand Down
Expand Up @@ -124,7 +124,7 @@ func TestExecveEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestExecveatEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestSetSidEvent(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -399,7 +399,7 @@ func TestSetSidEventFailed(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down Expand Up @@ -470,7 +470,7 @@ func TestSetSidSessionLeaderNotScraped(t *testing.T) {
provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid")
require.Nil(t, err, "error creating provider")

err = provider.UpdateDB(&event)
err = provider.UpdateDB(&event, expected.PIDs.Tgid)
require.Nil(t, err)

actual, err := db.GetProcess(pid)
Expand Down
Expand Up @@ -11,5 +11,5 @@ import (
)

type Provider interface {
UpdateDB(*beat.Event) error
UpdateDB(*beat.Event, uint32) error
}