ROX-31047: Refactor file activity pipeline to use pub/sub#19603
ROX-31047: Refactor file activity pipeline to use pub/sub#19603Stringy wants to merge 1 commit intogiles/ROX-31047-refactor-process-enrichmentfrom
Conversation
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- The new Prometheus metrics
file_activity_buffer_dropsandfile_activity_buffer_sizeare defined but I don’t see them being registered with the global registry in this file; please ensure they are added to the existingMustRegister/registration block so they are actually exposed. - Several tests rely on fixed
time.Sleepcalls to wait for the pipeline to process events, which can be flaky under load or on slow CI; consider synchronizing via channels or hooks (e.g., waiting on detector calls or dispatcher events) instead of sleeps.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new Prometheus metrics `file_activity_buffer_drops` and `file_activity_buffer_size` are defined but I don’t see them being registered with the global registry in this file; please ensure they are added to the existing `MustRegister`/registration block so they are actually exposed.
- Several tests rely on fixed `time.Sleep` calls to wait for the pipeline to process events, which can be flaky under load or on slow CI; consider synchronizing via channels or hooks (e.g., waiting on detector calls or dispatcher events) instead of sleeps.
## Individual Comments
### Comment 1
<location path="sensor/common/filesystem/pipeline/pipeline.go" line_range="200-201" />
<code_context>
}
+}
+
+func cacheKey(containerID, processSignalID string) string {
+ return fmt.Sprintf("%s:%s", containerID, processSignalID)
+}
+
</code_context>
<issue_to_address>
**suggestion (performance):** cacheKey could avoid fmt.Sprintf to reduce allocations in a hot path
Since this is likely called frequently, consider `return containerID + ":" + processSignalID` to avoid per-call allocations and formatting overhead while keeping the key the same.
Suggested implementation:
```golang
+}
+
+func cacheKey(containerID, processSignalID string) string {
+ return containerID + ":" + processSignalID
+}
+
```
```golang
"context"
"time"
```
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| func cacheKey(containerID, processSignalID string) string { | ||
| return fmt.Sprintf("%s:%s", containerID, processSignalID) |
There was a problem hiding this comment.
suggestion (performance): cacheKey could avoid fmt.Sprintf to reduce allocations in a hot path
Since this is likely called frequently, consider return containerID + ":" + processSignalID to avoid per-call allocations and formatting overhead while keeping the key the same.
Suggested implementation:
+}
+
+func cacheKey(containerID, processSignalID string) string {
+ return containerID + ":" + processSignalID
+}
+ "context"
"time"|
Images are ready for the commit at 437d8c8. To use with deploy scripts, first |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## giles/ROX-31047-refactor-process-enrichment #19603 +/- ##
==============================================================================
Coverage ? 49.41%
==============================================================================
Files ? 2744
Lines ? 207362
Branches ? 0
==============================================================================
Hits ? 102466
Misses ? 97291
Partials ? 7605
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
e1bea35 to
b2a3dae
Compare
… enrichment The file activity pipeline now subscribes to enriched and unenriched process indicator events via the internal pub/sub system, rather than performing its own process enrichment. This allows file activities to be correlated with process indicators that have already been enriched by the process pipeline. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
4922b1a to
437d8c8
Compare
vikin91
left a comment
There was a problem hiding this comment.
Posting a partial review now so that you can address the issues while I am away.
I still need to give it a careful read.
| log.Errorf("Failed to register consumer for enriched process indicators, falling back to legacy mode: %v", err) | ||
| p.pubSubDispatcher = nil | ||
| } else { | ||
| log.Info("File system pipeline using pub/sub mode for process enrichment") |
There was a problem hiding this comment.
A leftover? This can be noisy.
| Help: "Current number of container entries waiting in the process-enrichment LRU cache", | ||
| }) | ||
|
|
||
| fileActivityBufferDrops = prometheus.NewCounter(prometheus.CounterOpts{ |
There was a problem hiding this comment.
You forgot to register those new metrics.
It is a bit ugly that in this pkg we do that in sensor/common/metrics/init.go, so it was easy to miss.
| activityChan <- newTestFileActivity(testContainerID, testSignalID, path) | ||
| } | ||
|
|
||
| time.Sleep(100 * time.Millisecond) |
There was a problem hiding this comment.
I call those flake-magnets :)
Maybe synctest would help here as well? It is not obvious place to use that, but maybe...
Let's try to avoid sleeps if possible.
| func cacheKey(containerID, processSignalID string) string { | ||
| return fmt.Sprintf("%s:%s", containerID, processSignalID) | ||
| } |
There was a problem hiding this comment.
Please benchmark this as Sprintf is expensive and this will be called really many times.
AI suggest me that containerID + ":" + processSignalID would be cheaper, but I don't trust that.
| select { | ||
| case <-fileAccessReceived: | ||
| // Success — host process was handled directly. | ||
| case <-time.After(2 * time.Second): | ||
| t.Fatal("Timeout waiting for host process file access") | ||
| } |
There was a problem hiding this comment.
Let's port to synctest as well.
Description
The file activity pipeline now subscribes to enriched and unenriched process indicator events via the internal pub/sub system, rather than performing its own process enrichment. This allows file activities to be correlated with process indicators that have already been enriched by the process pipeline.
User-facing documentation
Testing and quality
Automated testing
How I validated my change
Validated together with the process pipeline, outlined in #18546