Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(instrumentation,telemetry): Incorporate thread pool telemetry #419

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions configs/fibratus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ kstream:
# Determines whether DNS client events are collected
#enable-dns: true

# Determines whether thread pool events are collected
#enable-threadpool: true

# Indicates if stack enrichment is enabled for eligible events
#stack-enrichment: true

Expand Down
5 changes: 5 additions & 0 deletions internal/etw/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (e *EventSource) Open(config *config.Config) error {
config.Kstream.EnableMemKevents = config.Kstream.EnableMemKevents && (e.r.HasMemEvents || (config.Yara.Enabled && !config.Yara.SkipAllocs))
config.Kstream.EnableDNSEvents = config.Kstream.EnableDNSEvents && e.r.HasDNSEvents
config.Kstream.EnableAuditAPIEvents = config.Kstream.EnableAuditAPIEvents && e.r.HasAuditAPIEvents
config.Kstream.EnableThreadpoolEvents = config.Kstream.EnableThreadpoolEvents && e.r.HasThreadpoolEvents
for _, ktype := range ktypes.All() {
if ktype == ktypes.CreateProcess || ktype == ktypes.TerminateProcess ||
ktype == ktypes.LoadImage || ktype == ktypes.UnloadImage {
Expand Down Expand Up @@ -189,6 +190,9 @@ func (e *EventSource) Open(config *config.Config) error {
if config.Kstream.EnableAuditAPIEvents {
e.addTrace(etw.KernelAuditAPICallsSession, etw.KernelAuditAPICallsGUID)
}
if config.Kstream.EnableThreadpoolEvents {
e.addTrace(etw.ThreadpoolSession, etw.ThreadpoolGUID)
}

for _, trace := range e.traces {
err := trace.Start()
Expand Down Expand Up @@ -226,6 +230,7 @@ func (e *EventSource) Open(config *config.Config) error {
// Init consumer and open the trace for processing
consumer := NewConsumer(e.psnap, e.hsnap, config, e.sequencer, e.evts)
consumer.SetFilter(e.filter)

// Attach event listeners
for _, lis := range e.listeners {
consumer.q.RegisterListener(lis)
Expand Down
9 changes: 9 additions & 0 deletions internal/etw/stackext.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,12 @@ func (s *StackExtensions) EnableMemoryCallstack() {
s.AddStackTracing(ktypes.VirtualAlloc)
}
}

// EnableThreadpoolCallstack enables stack tracing for thread pool events.
func (s *StackExtensions) EnableThreadpoolCallstack() {
if s.config.EnableThreadpoolEvents {
s.AddStackTracing(ktypes.SubmitThreadpoolWork)
s.AddStackTracing(ktypes.SubmitThreadpoolCallback)
s.AddStackTracing(ktypes.SetThreadpoolTimer)
}
}
13 changes: 12 additions & 1 deletion internal/etw/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ func (t *Trace) enableCallstacks() {
if t.IsSystemRegistryTrace() {
t.stackExtensions.EnableRegistryCallstack()
}

if t.IsThreadpoolTrace() {
t.stackExtensions.EnableThreadpoolCallstack()
}
}

// Start registers and starts an event tracing session.
Expand Down Expand Up @@ -202,7 +206,9 @@ func (t *Trace) Start() error {
log.Warnf("unable to set empty system flags: %v", err)
return nil
}

sysTraceFlags[0] = flags

// enable object manager tracking
if cfg.EnableHandleKevents {
sysTraceFlags[4] = etw.Handle
Expand All @@ -225,13 +231,14 @@ func (t *Trace) Start() error {
// enrichment is enabled, it is necessary to instruct the provider
// to emit stack addresses in the extended data item section when
// writing events to the session buffers
if cfg.StackEnrichment && !t.IsSystemProvider() {
if cfg.StackEnrichment && !t.IsSystemProvider() && !t.IsThreadpoolTrace() {
return etw.EnableTraceWithOpts(t.GUID, t.startHandle, t.Keywords, etw.EnableTraceOpts{WithStacktrace: true})
} else if cfg.StackEnrichment && len(t.stackExtensions.EventIds()) > 0 {
if err := etw.EnableStackTracing(t.startHandle, t.stackExtensions.EventIds()); err != nil {
return fmt.Errorf("fail to enable system events callstack tracing: %v", err)
}
}

if t.IsSystemRegistryTrace() {
if err := etw.EnableTrace(t.GUID, t.startHandle, t.Keywords); err != nil {
return err
Expand All @@ -249,6 +256,7 @@ func (t *Trace) Start() error {
sysTraceFlags[0] = etw.Registry
return etw.SetTraceSystemFlags(handle, sysTraceFlags)
}

return etw.EnableTrace(t.GUID, t.startHandle, t.Keywords)
}

Expand Down Expand Up @@ -343,6 +351,9 @@ func (t *Trace) IsKernelTrace() bool { return t.GUID == etw.KernelTraceControlGU
// IsSystemRegistryTrace determines if this is the system registry logger trace.
func (t *Trace) IsSystemRegistryTrace() bool { return t.GUID == etw.SystemRegistryProviderID }

// IsThreadpoolTrace determines if this is the thread pool logger trace.
func (t *Trace) IsThreadpoolTrace() bool { return t.GUID == etw.ThreadpoolGUID }

// IsSystemProvider determines if this is one of the granular system provider traces.
func (t *Trace) IsSystemProvider() bool {
return t.GUID == etw.SystemIOProviderID || t.GUID == etw.SystemRegistryProviderID || t.GUID == etw.SystemProcessProviderID || t.GUID == etw.SystemMemoryProviderID
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func (c *Config) addFlags() {
c.flags.Bool(enableMemKevents, true, "Determines whether memory manager kernel events are collected by Kernel Logger provider")
c.flags.Bool(enableAuditAPIEvents, true, "Determines whether kernel audit API calls events are published")
c.flags.Bool(enableDNSEvents, true, "Determines whether DNS client events are enabled")
c.flags.Bool(enableThreadpoolEvents, true, "Determines whether thread pool events are published")
c.flags.Bool(stackEnrichment, true, "Indicates if stack enrichment is enabled for eligible events")
c.flags.Int(bufferSize, int(maxBufferSize), "Represents the amount of memory allocated for each event tracing session buffer, in kilobytes. The buffer size affects the rate at which buffers fill and must be flushed (small buffer size requires less memory but it increases the rate at which buffers must be flushed)")
c.flags.Int(minBuffers, int(defaultMinBuffers), "Determines the minimum number of buffers allocated for the event tracing session's buffer pool")
Expand Down
29 changes: 16 additions & 13 deletions pkg/config/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,19 +170,20 @@ func (ctx *ActionContext) UniquePids() []uint32 {
// enabling/disabling event providers/types
// dynamically.
type RulesCompileResult struct {
HasProcEvents bool
HasThreadEvents bool
HasImageEvents bool
HasFileEvents bool
HasNetworkEvents bool
HasRegistryEvents bool
HasHandleEvents bool
HasMemEvents bool
HasVAMapEvents bool
HasDNSEvents bool
HasAuditAPIEvents bool
UsedEvents []ktypes.Ktype
NumberRules int
HasProcEvents bool
HasThreadEvents bool
HasImageEvents bool
HasFileEvents bool
HasNetworkEvents bool
HasRegistryEvents bool
HasHandleEvents bool
HasMemEvents bool
HasVAMapEvents bool
HasDNSEvents bool
HasAuditAPIEvents bool
HasThreadpoolEvents bool
UsedEvents []ktypes.Ktype
NumberRules int
}

func (r RulesCompileResult) ContainsEvent(ktype ktypes.Ktype) bool {
Expand Down Expand Up @@ -217,6 +218,7 @@ func (r RulesCompileResult) String() string {
HasVAMapEvents: %t
HasAuditAPIEvents: %t
HasDNSEvents: %t
HasThreadpoolEvents: %t
Events: %s`,
r.HasProcEvents,
r.HasThreadEvents,
Expand All @@ -229,6 +231,7 @@ func (r RulesCompileResult) String() string {
r.HasVAMapEvents,
r.HasAuditAPIEvents,
r.HasDNSEvents,
r.HasThreadpoolEvents,
strings.Join(events, ", "),
)
}
Expand Down
34 changes: 19 additions & 15 deletions pkg/config/kstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,22 @@ import (
)

const (
enableThreadKevents = "kstream.enable-thread"
enableRegistryKevents = "kstream.enable-registry"
enableNetKevents = "kstream.enable-net"
enableFileIOKevents = "kstream.enable-fileio"
enableVAMapKevents = "kstream.enable-vamap"
enableImageKevents = "kstream.enable-image"
enableHandleKevents = "kstream.enable-handle"
enableMemKevents = "kstream.enable-mem"
enableAuditAPIEvents = "kstream.enable-audit-api"
enableDNSEvents = "kstream.enable-dns"
stackEnrichment = "kstream.stack-enrichment"
bufferSize = "kstream.buffer-size"
minBuffers = "kstream.min-buffers"
maxBuffers = "kstream.max-buffers"
flushInterval = "kstream.flush-interval"
enableThreadKevents = "kstream.enable-thread"
enableRegistryKevents = "kstream.enable-registry"
enableNetKevents = "kstream.enable-net"
enableFileIOKevents = "kstream.enable-fileio"
enableVAMapKevents = "kstream.enable-vamap"
enableImageKevents = "kstream.enable-image"
enableHandleKevents = "kstream.enable-handle"
enableMemKevents = "kstream.enable-mem"
enableAuditAPIEvents = "kstream.enable-audit-api"
enableDNSEvents = "kstream.enable-dns"
enableThreadpoolEvents = "kstream.enable-threadpool"
stackEnrichment = "kstream.stack-enrichment"
bufferSize = "kstream.buffer-size"
minBuffers = "kstream.min-buffers"
maxBuffers = "kstream.max-buffers"
flushInterval = "kstream.flush-interval"

excludedEvents = "kstream.blacklist.events"
excludedImages = "kstream.blacklist.images"
Expand Down Expand Up @@ -82,6 +83,8 @@ type KstreamConfig struct {
EnableAuditAPIEvents bool `json:"enable-audit-api" yaml:"enable-audit-api"`
// EnableDNSEvents indicates if DNS client events are enabled
EnableDNSEvents bool `json:"enable-dns" yaml:"enable-dns"`
// EnableThreadpoolEvents indicates if thread pool events are enabled
EnableThreadpoolEvents bool `json:"enable-threadpool" yaml:"enable-threadpool"`
// StackEnrichment indicates if stack enrichment is enabled for eligible events.
StackEnrichment bool `json:"stack-enrichment" yaml:"stack-enrichment"`
// BufferSize represents the amount of memory allocated for each event tracing session buffer, in kilobytes.
Expand Down Expand Up @@ -115,6 +118,7 @@ func (c *KstreamConfig) initFromViper(v *viper.Viper) {
c.EnableMemKevents = v.GetBool(enableMemKevents)
c.EnableAuditAPIEvents = v.GetBool(enableAuditAPIEvents)
c.EnableDNSEvents = v.GetBool(enableDNSEvents)
c.EnableThreadpoolEvents = v.GetBool(enableThreadpoolEvents)
c.StackEnrichment = v.GetBool(stackEnrichment)
c.BufferSize = uint32(v.GetInt(bufferSize))
c.MinBuffers = uint32(v.GetInt(minBuffers))
Expand Down
31 changes: 16 additions & 15 deletions pkg/config/schema_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,25 +179,26 @@ var schema = `
"kstream": {
"type": "object",
"properties": {
"enable-thread": {"type": "boolean"},
"enable-image": {"type": "boolean"},
"enable-registry": {"type": "boolean"},
"enable-fileio": {"type": "boolean"},
"enable-vamap": {"type": "boolean"},
"enable-handle": {"type": "boolean"},
"enable-net": {"type": "boolean"},
"enable-mem": {"type": "boolean"},
"enable-audit-api": {"type": "boolean"},
"enable-dns": {"type": "boolean"},
"stack-enrichment": {"type": "boolean"},
"min-buffers": {"type": "integer", "minimum": 1, "maximum": {{ .MinBuffers }}},
"max-buffers": {"type": "integer", "minimum": 2, "maximum": {{ .MaxBuffers }}},
"buffer-size": {"type": "integer", "maximum": {{ .MaxBufferSize }}},
"enable-thread": {"type": "boolean"},
"enable-image": {"type": "boolean"},
"enable-registry": {"type": "boolean"},
"enable-fileio": {"type": "boolean"},
"enable-vamap": {"type": "boolean"},
"enable-handle": {"type": "boolean"},
"enable-net": {"type": "boolean"},
"enable-mem": {"type": "boolean"},
"enable-audit-api": {"type": "boolean"},
"enable-dns": {"type": "boolean"},
"enable-threadpool": {"type": "boolean"},
"stack-enrichment": {"type": "boolean"},
"min-buffers": {"type": "integer", "minimum": 1, "maximum": {{ .MinBuffers }}},
"max-buffers": {"type": "integer", "minimum": 2, "maximum": {{ .MaxBuffers }}},
"buffer-size": {"type": "integer", "maximum": {{ .MaxBufferSize }}},
"flush-interval": {"type": "string", "minLength": 2, "pattern": "[0-9]+s"},
"blacklist": {
"type": "object",
"properties": {
"events": {"type": "array", "items": {"type": "string", "enum": ["CreateThread", "TerminateThread", "OpenProcess", "OpenThread", "SetThreadContext", "LoadImage", "UnloadImage", "CreateFile", "CloseFile", "ReadFile", "WriteFile", "DeleteFile", "RenameFile", "SetFileInformation", "EnumDirectory", "MapViewFile", "UnmapViewFile", "RegCreateKey", "RegOpenKey", "RegSetValue", "RegQueryValue", "RegQueryKey", "RegDeleteKey", "RegDeleteValue", "RegCloseKey", "Accept", "Send", "Recv", "Connect", "Disconnect", "Reconnect", "Retransmit", "CreateHandle", "CloseHandle", "DuplicateHandle", "QueryDns", "ReplyDns", "VirtualAlloc", "VirtualFree", "CreateSymbolicLinkObject"]}},
"events": {"type": "array", "items": {"type": "string", "enum": ["CreateThread", "TerminateThread", "OpenProcess", "OpenThread", "SetThreadContext", "LoadImage", "UnloadImage", "CreateFile", "CloseFile", "ReadFile", "WriteFile", "DeleteFile", "RenameFile", "SetFileInformation", "EnumDirectory", "MapViewFile", "UnmapViewFile", "RegCreateKey", "RegOpenKey", "RegSetValue", "RegQueryValue", "RegQueryKey", "RegDeleteKey", "RegDeleteValue", "RegCloseKey", "Accept", "Send", "Recv", "Connect", "Disconnect", "Reconnect", "Retransmit", "CreateHandle", "CloseHandle", "DuplicateHandle", "QueryDns", "ReplyDns", "VirtualAlloc", "VirtualFree", "CreateSymbolicLinkObject", "SubmitThreadpoolWork", "SubmitThreadpoolCallback", "SetThreadpoolTimer"]}},
"images": {"type": "array", "items": {"type": "string", "minLength": 1}}
},
"additionalProperties": false
Expand Down
2 changes: 2 additions & 0 deletions pkg/filter/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,8 @@ func (r *Rules) buildCompileResult() *config.RulesCompileResult {
rs.HasMemEvents = true
case ktypes.Handle:
rs.HasHandleEvents = true
case ktypes.Threadpool:
rs.HasThreadpoolEvents = true
}
if typ == ktypes.MapViewFile || typ == ktypes.UnmapViewFile {
rs.HasVAMapEvents = true
Expand Down
10 changes: 10 additions & 0 deletions pkg/kevent/kevent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,16 @@ func (e *Kevent) Summary() string {
case ktypes.ReplyDNS:
dnsName := e.GetParamAsString(kparams.DNSName)
return printSummary(e, fmt.Sprintf("received DNS response for <code>%s</code> query", dnsName))
case ktypes.CreateSymbolicLinkObject:
src := e.GetParamAsString(kparams.LinkSource)
target := e.GetParamAsString(kparams.LinkTarget)
return printSummary(e, fmt.Sprintf("created symbolic link from %s to %s", src, target))
case ktypes.SubmitThreadpoolWork:
return printSummary(e, "enqueued the work item to the thread pool")
case ktypes.SubmitThreadpoolCallback:
return printSummary(e, "Submitted the thread pool callback for execution within the work item")
case ktypes.SetThreadpoolTimer:
return printSummary(e, "set thread pool timer object")
}
return ""
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/kevent/kparam_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,30 @@ func (e *Kevent) produceParams(evt *etw.EventRecord) {
if evt.HasStackTrace() {
e.AppendParam(kparams.Callstack, kparams.Slice, evt.Callstack())
}
case ktypes.SubmitThreadpoolWork, ktypes.SubmitThreadpoolCallback:
poolID := evt.ReadUint64(0)
taskID := evt.ReadUint64(8)
callback := evt.ReadUint64(16)
ctx := evt.ReadUint64(24)
tag := evt.ReadUint64(32)
e.AppendParam(kparams.ThreadpoolPoolID, kparams.Address, poolID)
e.AppendParam(kparams.ThreadpoolTaskID, kparams.Address, taskID)
e.AppendParam(kparams.ThreadpoolCallback, kparams.Address, callback)
e.AppendParam(kparams.ThreadpoolContext, kparams.Address, ctx)
e.AppendParam(kparams.ThreadpoolSubprocessTag, kparams.Address, tag)
case ktypes.SetThreadpoolTimer:
duetime := evt.ReadUint64(0)
subqueue := evt.ReadUint64(8)
timer := evt.ReadUint64(16)
period := evt.ReadUint32(24)
window := evt.ReadUint32(28)
absolute := evt.ReadUint32(32)
e.AppendParam(kparams.ThreadpoolTimerDuetime, kparams.Uint64, duetime)
e.AppendParam(kparams.ThreadpoolTimerSubqueue, kparams.Address, subqueue)
e.AppendParam(kparams.ThreadpoolTimer, kparams.Address, timer)
e.AppendParam(kparams.ThreadpoolTimerPeriod, kparams.Uint32, period)
e.AppendParam(kparams.ThreadpoolTimerWindow, kparams.Uint32, window)
e.AppendParam(kparams.ThreadpoolTimerAbsolute, kparams.Bool, absolute > 0)
}
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/kevent/kparams/fields_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,37 @@ const (
LinkSource = "source"
// LinkTarget identifies the parameter that represents the target symbolic link object or other kernel object
LinkTarget = "target"

// ThreadpoolPoolID represents the thread pool identifier.
ThreadpoolPoolID = "pool_id"
// ThreadpoolTaskID represents the thread pool task identifier.
ThreadpoolTaskID = "task_id"
// ThreadpoolCallback represents the address of the callback function.
ThreadpoolCallback = "callback"
// ThreadpoolCallbackSymbol represents the callback symbol.
ThreadpoolCallbackSymbol = "callback_symbol"
// ThreadpoolCallbackModule represents the module containing the callback symbol.
ThreadpoolCallbackModule = "callback_module"
// ThreadpoolContext represents the address of the callback context.
ThreadpoolContext = "context"
// ThreadpoolContextRip represents the value of instruction pointer contained in the callback context.
ThreadpoolContextRip = "context_rip"
// ThreadpoolContextRipSymbol represents the symbol name associated with the instruction pointer in callback context.
ThreadpoolContextRipSymbol = "context_rip_symbol"
// ThreadpoolContextRipModule represents the module name associated with the instruction pointer in callback context.
ThreadpoolContextRipModule = "context_rip_module"
// ThreadpoolSubprocessTag represents the service identifier associated with the thread pool.
ThreadpoolSubprocessTag = "subprocess_tag"
// ThreadpoolTimerDuetime represents the timer due time.
ThreadpoolTimerDuetime = "duetime"
// ThreadpoolTimerSubqueue represents the memory address of the timer subqueue.
ThreadpoolTimerSubqueue = "subqueue"
// ThreadpoolTimer represents the memory address of the timer object.
ThreadpoolTimer = "timer"
// ThreadpoolTimerPeriod represents the period of the timer
ThreadpoolTimerPeriod = "period"
// ThreadpoolTimerWindow represents the timer tolerate period.
ThreadpoolTimerWindow = "window"
// ThreadpoolTimerAbsolute indicates if the timer is absolute or relative.
ThreadpoolTimerAbsolute = "absolute"
)
3 changes: 3 additions & 0 deletions pkg/kevent/ktypes/category.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
Mem Category = "mem"
// Object the category for object manager events
Object Category = "object"
// Threadpool is the category for thread pool events
Threadpool Category = "threadpool"
// Other is the category for uncategorized events
Other Category = "other"
// Unknown is the category for events that couldn't match any of the previous categories
Expand Down Expand Up @@ -82,5 +84,6 @@ func Categories() []string {
string(Other),
string(Unknown),
string(Object),
string(Threadpool),
}
}
2 changes: 2 additions & 0 deletions pkg/kevent/ktypes/eventset.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (e *EventsetMasks) bitsetIndex(guid windows.GUID) int {
return 9
case DNSEventGUID:
return 10
case ThreadpoolGUID:
return 11
default:
return -1
}
Expand Down
Loading
Loading