ROX-31047: Refactor file activity pipeline to use pub/sub#19603

Open
Stringy wants to merge 1 commit into giles/ROX-31047-refactor-process-enrichment from giles/ROX-31047-refactor-file-activity-pipeline

@Stringy Stringy commented Mar 25, 2026

Description

The file activity pipeline now subscribes to enriched and unenriched process indicator events via the internal pub/sub system, rather than performing its own process enrichment. This allows file activities to be correlated with process indicators that have already been enriched by the process pipeline.
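
As a rough illustration of the flow described above, the sketch below shows one way a consumer could cache process indicators delivered over pub/sub and attach them to file activities that arrive either before or after the matching indicator. All names here (`indicator`, `fileActivity`, `correlator`, `onIndicator`, `onFileActivity`, `key`) are hypothetical and are not taken from the Sensor code, and a plain map stands in for whatever bounded cache the real pipeline uses.

```go
package main

import (
	"fmt"
	"sync"
)

// indicator and fileActivity are hypothetical stand-ins for the real
// process indicator and file activity messages.
type indicator struct {
	ContainerID string
	SignalID    string
	Deployment  string // example of data added by enrichment
}

type fileActivity struct {
	ContainerID string
	SignalID    string
	Path        string
}

// correlated pairs a file activity with the indicator it was matched to.
type correlated struct {
	Activity  fileActivity
	Indicator indicator
}

// correlator caches indicators by container and signal ID and buffers
// file activities whose indicator has not been seen yet.
type correlator struct {
	mu         sync.Mutex
	indicators map[string]indicator
	pending    map[string][]fileActivity
	out        []correlated
}

func newCorrelator() *correlator {
	return &correlator{
		indicators: make(map[string]indicator),
		pending:    make(map[string][]fileActivity),
	}
}

func key(containerID, signalID string) string {
	return containerID + ":" + signalID
}

// onIndicator is the callback a pub/sub subscription would invoke for
// each process indicator event. It flushes activities waiting on the key.
func (c *correlator) onIndicator(ind indicator) {
	c.mu.Lock()
	defer c.mu.Unlock()
	k := key(ind.ContainerID, ind.SignalID)
	c.indicators[k] = ind
	for _, fa := range c.pending[k] {
		c.out = append(c.out, correlated{Activity: fa, Indicator: ind})
	}
	delete(c.pending, k)
}

// onFileActivity emits immediately when the indicator is already cached
// and buffers the activity otherwise.
func (c *correlator) onFileActivity(fa fileActivity) {
	c.mu.Lock()
	defer c.mu.Unlock()
	k := key(fa.ContainerID, fa.SignalID)
	if ind, ok := c.indicators[k]; ok {
		c.out = append(c.out, correlated{Activity: fa, Indicator: ind})
		return
	}
	c.pending[k] = append(c.pending[k], fa)
}

func main() {
	c := newCorrelator()
	c.onFileActivity(fileActivity{"c1", "s1", "/etc/passwd"}) // buffered
	c.onIndicator(indicator{"c1", "s1", "web"})               // flushes the buffer
	c.onFileActivity(fileActivity{"c1", "s1", "/etc/shadow"}) // direct cache hit
	for _, r := range c.out {
		fmt.Println(r.Activity.Path, r.Indicator.Deployment)
	}
}
```

A real implementation would also need to bound both maps and count dropped activities, which is presumably what the buffer metrics discussed in the review are for.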

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

Validated together with the process pipeline, as outlined in #18546.

@Stringy Stringy requested a review from vikin91 March 25, 2026 13:34
@Stringy Stringy requested a review from a team as a code owner March 25, 2026 13:34
@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 1 issue, and left some high level feedback:

  • The new Prometheus metrics file_activity_buffer_drops and file_activity_buffer_size are defined but I don’t see them being registered with the global registry in this file; please ensure they are added to the existing MustRegister/registration block so they are actually exposed.
  • Several tests rely on fixed time.Sleep calls to wait for the pipeline to process events, which can be flaky under load or on slow CI; consider synchronizing via channels or hooks (e.g., waiting on detector calls or dispatcher events) instead of sleeps.


Comment on lines +200 to +201
func cacheKey(containerID, processSignalID string) string {
	return fmt.Sprintf("%s:%s", containerID, processSignalID)

suggestion (performance): cacheKey could avoid fmt.Sprintf to reduce allocations in a hot path

Since this is likely called frequently, consider return containerID + ":" + processSignalID to avoid per-call allocations and formatting overhead while keeping the key the same.

Suggested implementation:

+}
+
+func cacheKey(containerID, processSignalID string) string {
+	return containerID + ":" + processSignalID
+}
+
 	"context"
 	"time"

rhacs-bot commented Mar 25, 2026

Images are ready for the commit at 437d8c8.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.11.x-467-g437d8c806f.

codecov bot commented Mar 25, 2026

Codecov Report

❌ Patch coverage is 70.98765% with 47 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (giles/ROX-31047-refactor-process-enrichment@b2a3dae). Learn more about missing BASE report.

Files with missing lines Patch % Lines
sensor/common/filesystem/pipeline/pipeline.go 70.98% 35 Missing and 12 partials ⚠️
Additional details and impacted files
@@                              Coverage Diff                               @@
##             giles/ROX-31047-refactor-process-enrichment   #19603   +/-   ##
==============================================================================
  Coverage                                               ?   49.41%           
==============================================================================
  Files                                                  ?     2744           
  Lines                                                  ?   207362           
  Branches                                               ?        0           
==============================================================================
  Hits                                                   ?   102466           
  Misses                                                 ?    97291           
  Partials                                               ?     7605           
Flag Coverage Δ
go-unit-tests 49.41% <70.98%> (?)


@Stringy Stringy force-pushed the giles/ROX-31047-refactor-process-enrichment branch from e1bea35 to b2a3dae Compare March 27, 2026 09:51
… enrichment

The file activity pipeline now subscribes to enriched and unenriched
process indicator events via the internal pub/sub system, rather than
performing its own process enrichment. This allows file activities to
be correlated with process indicators that have already been enriched
by the process pipeline.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Stringy Stringy force-pushed the giles/ROX-31047-refactor-file-activity-pipeline branch from 4922b1a to 437d8c8 Compare March 27, 2026 09:53

@vikin91 vikin91 left a comment

Posting a partial review now so that you can address the issues while I am away.
I still need to give it a careful read.

log.Errorf("Failed to register consumer for enriched process indicators, falling back to legacy mode: %v", err)
p.pubSubDispatcher = nil
} else {
log.Info("File system pipeline using pub/sub mode for process enrichment")

A leftover? This can be noisy.

Help: "Current number of container entries waiting in the process-enrichment LRU cache",
})

fileActivityBufferDrops = prometheus.NewCounter(prometheus.CounterOpts{

@vikin91 vikin91 Mar 27, 2026

You forgot to register those new metrics.
For this package, registration happens in sensor/common/metrics/init.go, which is a bit ugly and makes it easy to miss.

activityChan <- newTestFileActivity(testContainerID, testSignalID, path)
}

time.Sleep(100 * time.Millisecond)

I call those flake-magnets :)
Maybe synctest would help here as well? It is not an obvious place to use it, but maybe...
Let's try to avoid sleeps if possible.

Comment on lines +200 to +202
func cacheKey(containerID, processSignalID string) string {
	return fmt.Sprintf("%s:%s", containerID, processSignalID)
}

Please benchmark this, as Sprintf is expensive and this will be called very many times.
AI suggests that containerID + ":" + processSignalID would be cheaper, but I don't trust that.
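
A minimal way to get numbers for this, assuming a standalone program is acceptable for a quick check, is to run both variants through `testing.Benchmark` and compare time and allocations per call. The function names `cacheKeySprintf`, `cacheKeyConcat`, and `bench` and the sample IDs are made up for the comparison; results will vary with Go version and hardware, so none are quoted here.

```go
package main

import (
	"fmt"
	"testing"
)

// cacheKeySprintf mirrors the implementation under review.
func cacheKeySprintf(containerID, processSignalID string) string {
	return fmt.Sprintf("%s:%s", containerID, processSignalID)
}

// cacheKeyConcat is the proposed alternative.
func cacheKeyConcat(containerID, processSignalID string) string {
	return containerID + ":" + processSignalID
}

// sink keeps the compiler from optimizing the calls away.
var sink string

// bench runs f under the standard benchmark harness and reports
// allocation statistics alongside timing.
func bench(f func(string, string) string) testing.BenchmarkResult {
	return testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			sink = f("0123456789abcdef", "b7f1c2d4-signal")
		}
	})
}

func main() {
	variants := []struct {
		name string
		f    func(string, string) string
	}{
		{"sprintf", cacheKeySprintf},
		{"concat", cacheKeyConcat},
	}
	for _, v := range variants {
		r := bench(v.f)
		fmt.Printf("%-8s %6d ns/op %4d allocs/op %5d B/op\n",
			v.name, r.NsPerOp(), r.AllocsPerOp(), r.AllocedBytesPerOp())
	}
}
```

The same two functions can be moved into `BenchmarkXxx` functions in the package's test file and run with `go test -bench`, so the comparison stays in the repository.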

Comment on lines +216 to +221
select {
case <-fileAccessReceived:
// Success — host process was handled directly.
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for host process file access")
}

Let's port to synctest as well.
