ROX-31047: use pubsub for process enrichment#18546
Conversation
Skipping CI for Draft Pull Request.
Hey - I've found 8 issues, and left some high level feedback:
- The feature-flag checks for `SensorInternalPubSub` are duplicated across the process and filesystem pipelines and the enricher; consider centralizing the mode selection (e.g., a helper or config struct) so the behavior for pub/sub vs legacy mode can’t accidentally diverge over time.
- In the pub/sub path (`publishEnrichedIndicator` and `enricher.enrich`), you’re creating events with `context.Background()` instead of propagating the signal’s existing context; if feasible, threading through the original context would preserve cancellation/deadline semantics and aid observability.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The feature-flag checks for `SensorInternalPubSub` are duplicated across the process and filesystem pipelines and the enricher; consider centralizing the mode selection (e.g., a helper or config struct) so the behavior for pub/sub vs legacy mode can’t accidentally diverge over time.
- In the pub/sub path (`publishEnrichedIndicator` and `enricher.enrich`), you’re creating events with `context.Background()` instead of propagating the signal’s existing context; if feasible, threading through the original context would preserve cancellation/deadline semantics and aid observability.
## Individual Comments
### Comment 1
<location> `sensor/common/processsignal/pipeline.go:62-64` </location>
<code_context>
+ msgCtx: msgCtx,
+ }
+
+ if features.SensorInternalPubSub.Enabled() && pubSubDispatcher != nil {
+ log.Info("File system pipeline using pub/sub mode for process enrichment")
+ p.processCache = expiringcache.NewExpiringCache[string, *storage.ProcessIndicator](5 * time.Minute)
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider failing fast if the consumer registration for enriched process indicators fails in pub/sub mode.
Since pub/sub mode depends on `RegisterConsumerToLane` for enriched indicators, a failed registration currently only logs and continues. This leaves the pipeline running in a misconfigured state and silently dropping enriched events. Consider surfacing the error (or disabling pub/sub and falling back to legacy) so misconfigurations or dispatcher issues are detected early instead of causing silent data loss.
</issue_to_address>
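One way to make the failure mode explicit is to fall back to legacy mode when registration fails, rather than continuing half-configured. A minimal sketch under assumed stand-in types (`dispatcher`, `pipeline` are illustrative, not the real Sensor APIs):

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins; the real RegisterConsumerToLane lives on the Sensor dispatcher.
type dispatcher struct{ fail bool }

func (d *dispatcher) RegisterConsumerToLane(topic, lane string, consumer func() error) error {
	if d.fail {
		return errors.New("lane not configured")
	}
	return nil
}

type pipeline struct{ pubSub bool }

// newPipeline surfaces a registration failure instead of logging and
// continuing in a broken pub/sub mode: it either returns the error to the
// caller (fail fast) or, as shown, disables pub/sub and runs legacy mode.
func newPipeline(d *dispatcher) (*pipeline, error) {
	p := &pipeline{}
	if d != nil {
		if err := d.RegisterConsumerToLane("enriched-process", "default", func() error { return nil }); err != nil {
			fmt.Printf("pub/sub registration failed, using legacy mode: %v\n", err)
			return p, nil
		}
		p.pubSub = true
	}
	return p, nil
}

func main() {
	p, _ := newPipeline(&dispatcher{fail: true})
	fmt.Println(p.pubSub) // false: fell back to legacy mode

	p, _ = newPipeline(&dispatcher{fail: false})
	fmt.Println(p.pubSub) // true: pub/sub mode active
}
```

Either variant keeps the pipeline in a well-defined mode instead of silently dropping enriched events.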
### Comment 2
<location> `sensor/common/filesystem/pipeline/pipeline.go:56-60` </location>
<code_context>
+ msgCtx: msgCtx,
+ }
+
+ if features.SensorInternalPubSub.Enabled() && pubSubDispatcher != nil {
+ log.Info("File system pipeline using pub/sub mode for process enrichment")
+ p.processCache = expiringcache.NewExpiringCache[string, *storage.ProcessIndicator](5 * time.Minute)
+
+ if err := pubSubDispatcher.RegisterConsumerToLane(pubsub.EnrichedProcessIndicatorTopic, pubsub.EnrichedProcessIndicatorLane, p.cacheEnrichedIndicator); err != nil {
+ log.Errorf("Failed to register consumer for enriched process indicators in file system pipeline: %v", err)
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** Similar to the process pipeline, filesystem pipeline should handle consumer registration failures more explicitly.
If `RegisterConsumerToLane` fails, the pipeline proceeds with an initialized but never populated `processCache`, so filesystem events silently lose the enrichment optimization. Consider either returning an error from `NewFileSystemPipeline` or explicitly disabling the pub/sub enrichment path here so the failure is surfaced and behavior remains well-defined.
</issue_to_address>
### Comment 3
<location> `sensor/common/processsignal/pipeline.go:228-229` </location>
<code_context>
+ }
+}
+
+// processEnrichedIndicator replaces the sendIndicatorEvent goroutine in legacy mode.
+func (p *Pipeline) processEnrichedIndicator(event pubsub.Event) error {
+ enrichedEvent, ok := event.(*EnrichedProcessIndicatorEvent)
+ if !ok {
</code_context>
<issue_to_address>
**nitpick:** The comment on processEnrichedIndicator is misleading given the current control flow.
This function is only registered and invoked in the pub/sub branch of `NewProcessPipeline`; legacy mode still uses `sendIndicatorEvent`. The current comment implies the reverse. Please rephrase to clarify that `processEnrichedIndicator` is the pub/sub consumer counterpart to the legacy `sendIndicatorEvent` goroutine.
</issue_to_address>
### Comment 4
<location> `sensor/common/processsignal/enricher.go:180-187` </location>
<code_context>
- e.indicators <- indicator
+ normalize.Indicator(indicator)
+
+ if features.SensorInternalPubSub.Enabled() && e.pubSubDispatcher != nil {
+ event := NewEnrichedProcessIndicatorEvent(context.Background(), indicator)
+ if err := e.pubSubDispatcher.Publish(event); err != nil {
+ log.Errorf("Failed to publish enriched process indicator from enricher for deployment %s with id %s: %v",
+ indicator.GetDeploymentId(), indicator.GetId(), err)
+ metrics.IncrementProcessEnrichmentDrops()
+ }
+ } else {
+ e.indicators <- indicator
+ }
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Consider surfacing publish failures from the enricher more prominently or providing a fallback.
In pub/sub mode, a `Publish` failure drops the indicator and only logs/increments a metric. Since this is the sole egress for enriched indicators in this mode, transient dispatcher issues can cause silent data loss. Please consider a more robust failure strategy (e.g., retries, surfacing the error, or falling back to the legacy channel when publishing fails).
Suggested implementation:
```golang
if features.SensorInternalPubSub.Enabled() && e.pubSubDispatcher != nil {
event := NewEnrichedProcessIndicatorEvent(context.Background(), indicator)
// Retry publish a few times before falling back to the legacy channel to avoid silent data loss.
const maxPublishAttempts = 3
var lastErr error
for attempt := 1; attempt <= maxPublishAttempts; attempt++ {
if err := e.pubSubDispatcher.Publish(event); err != nil {
lastErr = err
log.Errorf("Failed to publish enriched process indicator from enricher (attempt %d/%d) for deployment %s with id %s: %v",
attempt, maxPublishAttempts, indicator.GetDeploymentId(), indicator.GetId(), err)
metrics.IncrementProcessEnrichmentDrops()
continue
}
// Successfully published, clear error and stop retrying.
lastErr = nil
break
}
// If we exhausted all attempts, fall back to the legacy indicators channel.
if lastErr != nil {
log.Warnf("Falling back to legacy enriched process indicator channel for deployment %s with id %s after publish failures: %v",
indicator.GetDeploymentId(), indicator.GetId(), lastErr)
if e.indicators != nil {
e.indicators <- indicator
} else {
log.Errorf("Legacy enriched process indicator channel is not available; dropping indicator for deployment %s with id %s after publish failures",
indicator.GetDeploymentId(), indicator.GetId())
}
}
} else {
e.indicators <- indicator
}
```
1. If you want to **surface the publish failure up the call stack** (beyond logging/metrics/fallback), the enclosing function will need to:
- Return an `error` (or include the error in its return type), and
- Propagate `lastErr` instead of swallowing it after the fallback.
2. If there is a centralized retry/backoff helper in your codebase (e.g. an internal `retry` package), you may want to replace the inline loop with that helper to keep retry semantics consistent across the project.
</issue_to_address>
### Comment 5
<location> `sensor/common/processsignal/pipeline_test.go:181` </location>
<code_context>
mockDetector := mocks.NewMockDetector(mockCtrl)
p := NewProcessPipeline(sensorEvents, mockStore, filter.NewFilter(5, 5, []int{10, 10, 10}),
- mockDetector)
+ mockDetector, nil)
</code_context>
<issue_to_address>
**suggestion (testing):** Explicitly control the feature flag in tests to avoid mode-dependent flakiness
The constructor now depends on `features.SensorInternalPubSub`, but these tests rely on whatever the global default happens to be. Please update them to explicitly set/restore `features.SensorInternalPubSub` (e.g., via a helper or the feature gate API) so they consistently exercise the legacy mode they target. Optionally, add complementary tests with the flag enabled and a fake dispatcher to cover the new behavior as well.
Suggested implementation:
```golang
mockDetector := mocks.NewMockDetector(mockCtrl)
prevSensorInternalPubSub := features.SensorInternalPubSub.Enabled()
features.SensorInternalPubSub.SetTest(false)
t.Cleanup(func() {
features.SensorInternalPubSub.SetTest(prevSensorInternalPubSub)
})
pipeline := NewProcessPipeline(sensorEvents, mockStore,
filter.NewFilter(5, 5, []int{3, 3, 3}),
mockDetector, nil)
```
```golang
mockDetector := mocks.NewMockDetector(mockCtrl)
prevSensorInternalPubSub := features.SensorInternalPubSub.Enabled()
features.SensorInternalPubSub.SetTest(false)
t.Cleanup(func() {
features.SensorInternalPubSub.SetTest(prevSensorInternalPubSub)
})
p := NewProcessPipeline(sensorEvents, mockStore, filter.NewFilter(5, 5, []int{10, 10, 10}),
mockDetector, nil)
p.Notify(common.SensorComponentEventCentralReachable)
```
```golang
mockDetector := mocks.NewMockDetector(mockCtrl)
prevSensorInternalPubSub := features.SensorInternalPubSub.Enabled()
features.SensorInternalPubSub.SetTest(false)
t.Cleanup(func() {
features.SensorInternalPubSub.SetTest(prevSensorInternalPubSub)
})
p := NewProcessPipeline(sensorEvents, mockStore, filter.NewFilter(5, 5, []int{10, 10, 10}),
mockDetector, nil)
```
1. Ensure `pipeline_test.go` imports the features package, e.g.:
- `github.com/stackrox/rox/pkg/features`
Add this to the existing import block if it is not already present.
2. If the actual feature gate API differs (e.g. the methods are named `SetForTest`, `Override`, `EnabledSettingForTest`, etc.), adjust:
- `features.SensorInternalPubSub.Enabled()`
- `features.SensorInternalPubSub.SetTest(...)`
to the correct functions used elsewhere in your test suite for feature flag control.
3. To add complementary tests with the flag enabled and a fake dispatcher (as mentioned in the review comment), you can:
- Create parallel tests that call `features.SensorInternalPubSub.SetTest(true)` in the same pattern,
- Provide a fake implementation of the dispatcher and pass it instead of `nil` in `NewProcessPipeline(...)`,
- Assert on the new behavior specific to the internal pub/sub mode.
</issue_to_address>
### Comment 6
<location> `sensor/common/processsignal/pipeline.go:61` </location>
<code_context>
}
- go p.sendIndicatorEvent()
+
+ // Dual-mode initialization based on feature flag
+ if features.SensorInternalPubSub.Enabled() && pubSubDispatcher != nil {
+ log.Info("Process pipeline using pub/sub mode")
</code_context>
<issue_to_address>
**issue (complexity):** Consider encapsulating the legacy vs pub/sub handling behind a small sink strategy interface and shared handler function so Pipeline no longer needs scattered feature-flag branches and duplicated processing logic.
The feature addition does introduce “dual‑mode” complexity into `Pipeline` (constructor, `Process`, `Shutdown`, `processEnrichedIndicator`). You can keep all functionality but localize the complexity by pushing the mode switch into a small strategy interface and sharing the core processing logic.
### 1. Hide legacy vs pub/sub behind a sink interface
Instead of optional fields plus feature flag checks, let `Pipeline` depend on an `enrichedIndicatorSink` that knows how to deliver enriched indicators and how to shut down.
```go
type enrichedIndicatorSink interface {
Deliver(ctx context.Context, ind *storage.ProcessIndicator)
Shutdown()
}
```
In `Pipeline`, keep only the abstraction:
```go
type Pipeline struct {
clusterEntities *clusterentities.Store
indicators chan *message.ExpiringMessage
enricher *enricher
processFilter filter.Filter
detector detector.Detector
sink enrichedIndicatorSink
stopper concurrency.Stopper
cancelEnricherCtx context.CancelCauseFunc
}
```
Then `NewProcessPipeline` decides once which implementation to use, instead of wiring everything inline in “if pubsub / else” blocks scattered across methods:
```go
func NewProcessPipeline(
indicators chan *message.ExpiringMessage,
clusterEntities *clusterentities.Store,
processFilter filter.Filter,
detector detector.Detector,
pubSubDispatcher common.PubSubDispatcher,
) *Pipeline {
enricherCtx, cancelEnricherCtx := context.WithCancelCause(context.Background())
en := newEnricher(enricherCtx, clusterEntities, pubSubDispatcher)
p := &Pipeline{
clusterEntities: clusterEntities,
indicators: indicators,
enricher: en,
processFilter: processFilter,
detector: detector,
cancelEnricherCtx: cancelEnricherCtx,
stopper: concurrency.NewStopper(),
}
if features.SensorInternalPubSub.Enabled() && pubSubDispatcher != nil {
p.sink = newPubSubSink(pubSubDispatcher, p.processEnrichedIndicator)
} else {
p.sink = newChannelSink(en.getEnrichedC(), p.stopper, p.sendIndicatorEvent)
}
return p
}
```
Concrete implementations keep the existing behavior but localize it:
```go
type channelSink struct {
enrichedIndicators chan *storage.ProcessIndicator
cm *channelmultiplexer.ChannelMultiplexer[*storage.ProcessIndicator]
}
func newChannelSink(
enricherCh <-chan *storage.ProcessIndicator,
stopper concurrency.Stopper,
startSend func(),
) *channelSink {
enrichedIndicators := make(chan *storage.ProcessIndicator)
cm := channelmultiplexer.NewMultiplexer[*storage.ProcessIndicator]()
cm.AddChannel(enricherCh)
cm.AddChannel(enrichedIndicators)
cm.Run()
go startSend()
return &channelSink{
enrichedIndicators: enrichedIndicators,
cm: cm,
}
}
func (s *channelSink) Deliver(_ context.Context, ind *storage.ProcessIndicator) {
s.enrichedIndicators <- ind
}
func (s *channelSink) Shutdown() {
close(s.enrichedIndicators)
}
```
```go
type pubSubSink struct {
dispatcher common.PubSubDispatcher
}
func newPubSubSink(
dispatcher common.PubSubDispatcher,
consumer pubsub.ConsumerFunc,
) *pubSubSink {
_ = dispatcher.RegisterConsumerToLane(
pubsub.EnrichedProcessIndicatorTopic,
pubsub.EnrichedProcessIndicatorLane,
consumer,
)
return &pubSubSink{dispatcher: dispatcher}
}
func (s *pubSubSink) Deliver(ctx context.Context, ind *storage.ProcessIndicator) {
_ = s.dispatcher.Publish(NewEnrichedProcessIndicatorEvent(ctx, ind))
}
func (s *pubSubSink) Shutdown() {
// no-op; dispatcher lifecycle owned elsewhere
}
```
This removes the need for `enrichedIndicators`, `cm`, and `pubSubDispatcher` to be optional/nil on `Pipeline`, and gets rid of the per‑call feature flag check in `Process`.
Your `Process` hot path can then be simplified:
```go
func (p *Pipeline) Process(indicator *storage.ProcessIndicator) {
// ... existing enrichment logic ...
p.sink.Deliver(context.Background(), indicator)
}
```
And `Shutdown` no longer needs mode‑specific guards:
```go
func (p *Pipeline) Shutdown() {
p.cancelEnricherCtx(errors.New("pipeline shutdown"))
defer func() {
p.sink.Shutdown()
_ = p.enricher.Stopped().Wait()
_ = p.stopper.Client().Stopped().Wait()
}()
p.stopper.Client().Stop()
}
```
### 2. Share core processing between legacy and pub/sub consumers
`processEnrichedIndicator` currently reimplements the same logic that the legacy `sendIndicatorEvent` path performs. You can factor out the common work into a helper that both legacy and pub/sub paths call.
For instance:
```go
func (p *Pipeline) handleEnrichedIndicator(ctx context.Context, ind *storage.ProcessIndicator) {
if !p.processFilter.Add(ind) {
return
}
p.detector.ProcessIndicator(ctx, ind)
p.sendToCentral(message.NewExpiring(ctx, &central.MsgFromSensor{
Msg: &central.MsgFromSensor_Event{
Event: &central.SensorEvent{
Id: ind.GetId(),
Action: central.ResourceAction_CREATE_RESOURCE,
Resource: &central.SensorEvent_ProcessIndicator{
ProcessIndicator: ind,
},
},
},
}))
metrics.SetProcessSignalBufferSizeGauge(len(p.indicators))
}
```
Then:
```go
// legacy goroutine:
func (p *Pipeline) sendIndicatorEvent() {
for ind := range p.enrichedIndicators {
p.handleEnrichedIndicator(context.Background(), ind)
}
}
```
```go
// pub/sub consumer:
func (p *Pipeline) processEnrichedIndicator(event pubsub.Event) error {
enrichedEvent, ok := event.(*EnrichedProcessIndicatorEvent)
if !ok || enrichedEvent.Indicator == nil {
return errors.New("invalid enriched process indicator event")
}
p.handleEnrichedIndicator(enrichedEvent.Context, enrichedEvent.Indicator)
return nil
}
```
That keeps all existing behavior but removes duplicated logic and centralizes any future changes to the enriched‑indicator handling in one place.
These two changes (sink interface + shared handler) reduce the dual‑mode branching in `Pipeline`, remove nil guards on optional fields, and simplify the hot path while preserving both legacy and pub/sub functionality.
</issue_to_address>
### Comment 7
<location> `sensor/common/filesystem/pipeline/pipeline.go:35` </location>
<code_context>
activityChan chan *sensorAPI.FileActivity
clusterEntities *clusterentities.Store
+ // processCache avoids duplicate enrichment by reusing enriched indicators from the process pipeline.
+ // Key format: "containerID:processSignalID"
+ processCache expiringcache.Cache[string, *storage.ProcessIndicator]
</code_context>
<issue_to_address>
**issue (complexity):** Consider encapsulating the pub/sub and expiring-cache logic for process indicators behind a small helper type so the filesystem pipeline stays focused on its core responsibilities.
You can keep the new pub/sub + cache behavior but hide most of the wiring and mode conditionals behind a small helper type. That way the pipeline doesn’t need to know about cache keys, event types, or feature flags.
### 1. Encapsulate cache + pub/sub wiring
Extract the cache + consumer registration into a dedicated helper:
```go
type processIndicatorCache struct {
cache expiringcache.Cache[string, *storage.ProcessIndicator]
}
func newProcessIndicatorCache(dispatcher common.PubSubDispatcher) (*processIndicatorCache, error) {
if !features.SensorInternalPubSub.Enabled() || dispatcher == nil {
return nil, nil
}
pic := &processIndicatorCache{
cache: expiringcache.NewExpiringCache[string, *storage.ProcessIndicator](5 * time.Minute),
}
if err := dispatcher.RegisterConsumerToLane(
pubsub.EnrichedProcessIndicatorTopic,
pubsub.EnrichedProcessIndicatorLane,
pic.handleEvent,
); err != nil {
return nil, err
}
return pic, nil
}
func (c *processIndicatorCache) handleEvent(event pubsub.Event) error {
enrichedEvent, ok := event.(*processsignal.EnrichedProcessIndicatorEvent)
if !ok {
return fmt.Errorf("unexpected event type: %T", event)
}
indicator := enrichedEvent.Indicator
if indicator == nil || indicator.GetSignal() == nil {
return nil
}
sig := indicator.GetSignal()
key := cacheKey(sig.GetContainerId(), sig.GetId())
c.cache.Add(key, indicator)
return nil
}
func (c *processIndicatorCache) Get(process *sensorAPI.ProcessSignal) (*storage.ProcessIndicator, bool) {
if c == nil || process.GetContainerId() == "" {
return nil, false
}
key := cacheKey(process.GetContainerId(), process.GetId())
return c.cache.Get(key)
}
```
Then the pipeline just holds this helper instead of the raw cache and dispatcher:
```go
type Pipeline struct {
detector detector.Detector
stopper concurrency.Stopper
activityChan chan *sensorAPI.FileActivity
clusterEntities *clusterentities.Store
processCache *processIndicatorCache
msgCtx context.Context
}
```
### 2. Simplify the constructor
Move feature-flag and dispatcher logic into the helper, so the constructor is mostly wiring:
```go
func NewFileSystemPipeline(
detector detector.Detector,
clusterEntities *clusterentities.Store,
activityChan chan *sensorAPI.FileActivity,
pubSubDispatcher common.PubSubDispatcher,
) *Pipeline {
msgCtx := context.Background()
cache, err := newProcessIndicatorCache(pubSubDispatcher)
if err != nil {
log.Errorf("Failed to init pub/sub cache for enriched process indicators: %v", err)
log.Info("File system pipeline falling back to legacy mode (direct enrichment)")
} else if cache != nil {
log.Info("File system pipeline using pub/sub mode for process enrichment")
} else {
log.Info("File system pipeline using legacy mode (direct enrichment)")
}
p := &Pipeline{
detector: detector,
activityChan: activityChan,
clusterEntities: clusterEntities,
processCache: cache,
stopper: concurrency.NewStopper(),
msgCtx: msgCtx,
}
go p.run()
return p
}
```
### 3. Remove mode conditionals from the hot path
`getIndicator` no longer needs to know about cache keys or pub/sub events; it just consults the helper:
```go
func (p *Pipeline) getIndicator(process *sensorAPI.ProcessSignal) *storage.ProcessIndicator {
if cached, ok := p.processCache.Get(process); ok {
log.Debugf("Using cached enriched indicator for process %s in container %s",
process.GetId(), process.GetContainerId())
return cached
}
// existing legacy enrichment logic unchanged below
signal := &storage.ProcessSignal{
Id: process.GetId(),
Uid: process.GetUid(),
Gid: process.GetGid(),
Time: process.GetCreationTime(),
Name: process.GetName(),
Args: process.GetArgs(),
ExecFilePath: process.GetExecFilePath(),
Pid: process.GetPid(),
Scraped: process.GetScraped(),
}
pi := &storage.ProcessIndicator{
Id: uuid.NewV4().String(),
Signal: signal,
}
if process.GetContainerId() == "" {
return pi
}
metadata, ok, _ := p.clusterEntities.LookupByContainerID(process.GetContainerId())
if ok {
pi.DeploymentId = metadata.DeploymentID
pi.ContainerName = metadata.ContainerName
pi.PodId = metadata.PodID
pi.PodUid = metadata.PodUID
} else {
log.Warnf("Container ID: %s not found for file activity", process.GetContainerId())
}
return pi
}
```
This keeps all functionality (pub/sub reuse, expiring cache, legacy fallback) but:
- `NewFileSystemPipeline` no longer mixes feature flags, dispatcher wiring, and cache internals.
- The pipeline no longer knows about pub/sub event types or cache key formats.
- The hot path is a simple “try cache -> legacy enrichment” call, with the mode encapsulated.
</issue_to_address>
### Comment 8
<location> `sensor/common/processsignal/enricher.go:176` </location>
<code_context>
func (e *enricher) enrich(indicator *storage.ProcessIndicator, metadata clusterentities.ContainerMetadata) {
populateIndicatorFromCachedContainer(indicator, metadata)
- e.indicators <- indicator
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing a delivery function field on `enricher` and selecting the mode once in `newEnricher` so `enrich` only calls a single `deliver` function without branching on flags or channels.
You can keep the new functionality but move the mode selection out of the hot path and avoid repeated flag checks by introducing a small delivery abstraction inside `enricher`.
For example, add a delivery function field:
```go
type enricher struct {
lru *lru.Cache[string, *containerWrap]
clusterEntities *clusterentities.Store
indicators chan *storage.ProcessIndicator
metadataCallbackChan <-chan clusterentities.ContainerMetadata
pubSubDispatcher common.PubSubDispatcher
stopper concurrency.Stopper
deliver func(*storage.ProcessIndicator)
}
```
Then set up the delivery strategy once in `newEnricher`:
```go
func newEnricher(ctx context.Context, clusterEntities *clusterentities.Store, pubSubDispatcher common.PubSubDispatcher) *enricher {
// ... lru + callback setup ...
e := &enricher{
lru: lru,
clusterEntities: clusterEntities,
metadataCallbackChan: callbackChan,
pubSubDispatcher: pubSubDispatcher,
stopper: concurrency.NewStopper(),
}
// Decide mode once
if features.SensorInternalPubSub.Enabled() && pubSubDispatcher != nil {
e.deliver = func(indicator *storage.ProcessIndicator) {
event := NewEnrichedProcessIndicatorEvent(context.Background(), indicator)
if err := e.pubSubDispatcher.Publish(event); err != nil {
log.Errorf("Failed to publish enriched process indicator from enricher for deployment %s with id %s: %v",
indicator.GetDeploymentId(), indicator.GetId(), err)
metrics.IncrementProcessEnrichmentDrops()
}
}
} else {
e.indicators = make(chan *storage.ProcessIndicator)
e.deliver = func(indicator *storage.ProcessIndicator) {
e.indicators <- indicator
}
}
go e.processLoop(ctx)
return e
}
```
With that, `enrich` no longer needs to know about flags, channels, or dispatchers:
```go
func (e *enricher) enrich(indicator *storage.ProcessIndicator, metadata clusterentities.ContainerMetadata) {
populateIndicatorFromCachedContainer(indicator, metadata)
normalize.Indicator(indicator)
e.deliver(indicator)
metrics.IncrementProcessEnrichmentHits()
metrics.SetProcessEnrichmentCacheSize(float64(e.lru.Len()))
}
```
This keeps:
- The legacy behavior (channel-based delivery, `getEnrichedC` still works in that mode).
- The new pub/sub behavior (no legacy channel created/used in pub/sub mode).
But the dual-mode logic is now localized in `newEnricher`, and the hot path (`enrich`) has a single, simple invariant and no feature-flag or nil checks.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18546 +/- ##
==========================================
+ Coverage 49.38% 49.39% +0.01%
==========================================
Files 2743 2744 +1
Lines 207037 207179 +142
==========================================
+ Hits 102240 102340 +100
- Misses 97216 97245 +29
- Partials 7581 7594 +13
Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
Images are ready for the commit at 2c26bfa. To use with deploy scripts, first
4834231 to 14e53a2 (compare)
6de96fb to 2835874 (compare)
2835874 to aca46f0 (compare)
af6c174 to 531e526 (compare)
Hey - I've found 5 issues, and left some high level feedback:
- Both the process pipeline and the filesystem pipeline register the same ConsumerID (EnrichedProcessConsumer) on the same topic/lane, which likely causes one registration to overwrite the other in the dispatcher — consider using distinct consumer IDs per component or per lane to ensure both consumers receive enriched process indicator events.
- In NewProcessPipeline, failures from inner.Start() are only logged and the Pipeline is still returned; if startup errors mean the pipeline is not actually functional, consider surfacing that error to the caller or exposing a health indicator so callers can react instead of proceeding with a partially initialized pipeline.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Both the process pipeline and the filesystem pipeline register the same ConsumerID (EnrichedProcessConsumer) on the same topic/lane, which likely causes one registration to overwrite the other in the dispatcher — consider using distinct consumer IDs per component or per lane to ensure both consumers receive enriched process indicator events.
- In NewProcessPipeline, failures from inner.Start() are only logged and the Pipeline is still returned; if startup errors mean the pipeline is not actually functional, consider surfacing that error to the caller or exposing a health indicator so callers can react instead of proceeding with a partially initialized pipeline.
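The overwrite hazard can be demonstrated with a toy registry; this assumes the dispatcher keys consumers by (topic, lane, consumerID), which matches the described symptom but is not the verified internal layout:

```go
package main

import "fmt"

// Toy registry illustrating the hazard: if consumers are keyed by
// (topic, lane, consumerID), two components registering the same ID
// clobber each other. Types and key names are illustrative only.
type key struct{ topic, lane, consumer string }

type registry map[key]func()

func (r registry) register(topic, lane, consumer string, fn func()) {
	r[key{topic, lane, consumer}] = fn // last registration wins
}

func fireAll(r registry) int {
	n := 0
	for _, fn := range r {
		fn()
		n++
	}
	return n
}

func main() {
	// Same ConsumerID from both pipelines: one registration is lost.
	r := registry{}
	r.register("enriched-process", "default", "EnrichedProcessConsumer", func() {})
	r.register("enriched-process", "default", "EnrichedProcessConsumer", func() {})
	fmt.Println(fireAll(r)) // 1: only the second consumer survives

	// Distinct per-component IDs keep both registrations alive.
	r = registry{}
	r.register("enriched-process", "default", "ProcessPipelineConsumer", func() {})
	r.register("enriched-process", "default", "FileSystemPipelineConsumer", func() {})
	fmt.Println(fireAll(r)) // 2: both consumers receive events
}
```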
## Individual Comments
### Comment 1
<location path="sensor/common/filesystem/pipeline/pipeline.go" line_range="202-211" />
<code_context>
+func (p *Pipeline) bufferActivity(fs *sensorAPI.FileActivity) {
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Buffered activity TTL is based only on the first activity per process, not the most recent.
`entry.timestamp` is only set when the buffer entry is created, but `pruneExpiredBuffers` evicts based on `now.Sub(entry.timestamp) > bufferedActivityTTL`. For long‑lived processes this causes newer activities to be dropped as soon as the oldest entry hits the TTL. Consider updating `entry.timestamp` on each new activity, or tracking separate creation and last-updated timestamps to make the eviction behavior explicit.
Suggested implementation:
```golang
func (p *Pipeline) bufferActivity(fs *sensorAPI.FileActivity) {
process := fs.GetProcess()
if process == nil {
return
}
key := cacheKey(process.GetContainerId(), process.GetId())
p.activityMutex.Lock()
defer p.activityMutex.Unlock()
if p.totalBufferedActivity >= maxTotalBufferedActivities {
```
```golang
entry, exists := p.activityBuffer[key]
if !exists {
entry = &bufferedActivityEntry{}
p.activityBuffer[key] = entry
}
// Refresh the buffer's timestamp on every new activity so eviction is based
// on the most recent activity for this process, not just the first one.
entry.timestamp = time.Now()
```
1. Ensure there is no other code path that still sets `timestamp` only at creation time for `bufferedActivityEntry`; if such code exists, it can be simplified to rely on the new `entry.timestamp = time.Now()` behavior in `bufferActivity`.
2. Verify that `bufferedActivityEntry`'s zero-value construction (without `timestamp` in a composite literal) is valid; if the struct is using field tags or ordering-sensitive initialization elsewhere, adjust those initializations similarly (i.e., construct without `timestamp` and rely on `bufferActivity` to populate it).
3. If `pruneExpiredBuffers` or any other code relies on `timestamp` semantics as a "creation time", update comments and any related logic to make it clear that `timestamp` is now the "last activity time". If you need both semantics, introduce a `createdAt` field and keep `timestamp` (or rename it) as `lastUpdatedAt`, updating both accordingly in `bufferActivity`.
</issue_to_address>
### Comment 2
<location path="sensor/common/filesystem/pipeline/pipeline.go" line_range="343-347" />
<code_context>
+ now := time.Now()
+ expiredKeys := make([]string, 0)
+
+ for key, entry := range p.bufferedActivity {
+ if now.Sub(entry.timestamp) > bufferedActivityTTL {
+ expiredKeys = append(expiredKeys, key)
+ p.totalBufferedActivity -= len(entry.activities)
+ metrics.IncrementFileActivityBufferDrops()
+ }
+ }
</code_context>
<issue_to_address>
**issue:** File activity buffer drop counter increments per buffer entry, not per dropped activity, which mismatches its description.
In `pruneExpiredBuffers`, `file_activity_buffer_drops` is incremented once per expired buffer key, regardless of how many activities are dropped from that buffer. This conflicts with its description as counting dropped activities. If you want activity-level counting, consider something like:
```go
for key, entry := range p.bufferedActivity {
if now.Sub(entry.timestamp) > bufferedActivityTTL {
expiredKeys = append(expiredKeys, key)
p.totalBufferedActivity -= len(entry.activities)
metrics.IncrementFileActivityBufferDropsBy(len(entry.activities))
}
}
```
and update the metrics helper. Otherwise, adjust the metric name/help text to indicate it counts buffers rather than activities.
</issue_to_address>
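The per-activity counting variant can be sketched with a plain counter standing in for the Prometheus metric; `IncrementFileActivityBufferDropsBy` is the hypothetical helper named above, not an existing function:

```go
package main

import "fmt"

// Plain counter standing in for the metric behind
// IncrementFileActivityBufferDrops.
var fileActivityBufferDrops int

// Hypothetical ...By helper suggested in the comment above.
func IncrementFileActivityBufferDropsBy(n int) { fileActivityBufferDrops += n }

type entry struct {
	activities []string
	expired    bool
}

func pruneExpired(buffers map[string]*entry) {
	for k, e := range buffers {
		if e.expired {
			// Count every dropped activity, not just the buffer key,
			// so the metric matches its "dropped activities" help text.
			IncrementFileActivityBufferDropsBy(len(e.activities))
			delete(buffers, k) // deleting during range is safe in Go
		}
	}
}

func main() {
	buffers := map[string]*entry{
		"a": {activities: []string{"open", "write", "close"}, expired: true},
		"b": {activities: []string{"open"}, expired: false},
	}
	pruneExpired(buffers)
	fmt.Println(fileActivityBufferDrops) // 3: one increment per dropped activity
	fmt.Println(len(buffers))            // 1: only the live buffer remains
}
```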
### Comment 3
<location path="sensor/common/processsignal/pipeline.go" line_range="29" />
<code_context>
-// Pipeline is the struct that handles a process signal
-type Pipeline struct {
+type processPipeline interface {
+ Start() error
+ Stop() error
</code_context>
<issue_to_address>
**issue (complexity):** Consider collapsing the multiple pipeline types into a single Pipeline that embeds basePipeline and directly handles both channel and pub/sub modes to reduce indirection and clarify ownership.
You can simplify this without losing the pub/sub functionality by tightening the ownership and collapsing unnecessary layers.
### 1. Remove `processPipeline` + wrapper `Pipeline` split
Right now you have:
- `Pipeline` (wrapper)
- `processPipeline` interface
- `basePipeline`
- `channelPipeline` / `pubsubPipeline`
`basePipeline.Start()` is a no-op and `Pipeline` + `basePipeline` both hold/use the same `Stopper`, which creates ownership confusion.
You can keep `basePipeline` as the shared logic and drop the indirection via `processPipeline` and the `inner` wrapper entirely. Make `Pipeline` *be* the concrete type that holds the mode-specific wiring:
```go
type Pipeline struct {
basePipeline
// mode-specific fields
cm *channelmultiplexer.ChannelMultiplexer[*storage.ProcessIndicator]
clusterEntities *clusterentities.Store
enrichedIndicators chan *storage.ProcessIndicator
pubSubDispatcher common.PubSubDispatcher
usePubSub bool
}
```
Then in `NewProcessPipeline` configure the mode and start sources directly:
```go
func NewProcessPipeline(
indicators chan *message.ExpiringMessage,
clusterEntities *clusterentities.Store,
processFilter filter.Filter,
detector detector.Detector,
pubSubDispatcher common.PubSubDispatcher,
) *Pipeline {
enricherCtx, cancelEnricherCtx := context.WithCancelCause(context.Background())
en := newEnricher(enricherCtx, clusterEntities, pubSubDispatcher)
stopper := concurrency.NewStopper()
base := newBasePipeline(indicators, en, processFilter, detector, stopper, cancelEnricherCtx)
p := &Pipeline{
basePipeline: base,
clusterEntities: clusterEntities,
pubSubDispatcher: pubSubDispatcher,
usePubSub: features.SensorInternalPubSub.Enabled() && pubSubDispatcher != nil,
}
if err := p.startSources(); err != nil {
log.Error("Failed to start process pipeline:", err)
}
return p
}
func (p *Pipeline) startSources() error {
if p.usePubSub {
log.Info("Process pipeline using pub/sub mode")
return p.pubSubDispatcher.RegisterConsumerToLane(
pubsub.EnrichedProcessConsumer,
pubsub.EnrichedProcessIndicatorTopic,
pubsub.EnrichedProcessIndicatorLane,
p.processEnrichedIndicator,
)
}
log.Info("Process pipeline using legacy channel mode")
p.enrichedIndicators = make(chan *storage.ProcessIndicator)
p.cm = channelmultiplexer.NewMultiplexer[*storage.ProcessIndicator]()
p.cm.AddChannel(p.enricher.getEnrichedC())
p.cm.AddChannel(p.enrichedIndicators)
p.cm.Run()
go p.sendIndicatorEvent()
return nil
}
```
`Process` then just delegates to the appropriate source path without another interface layer:
```go
func (p *Pipeline) Process(signal *storage.ProcessSignal) {
select {
case <-p.stopper.Flow().StopRequested():
dropSignal(signal, "pipeline shutting down before enrichment")
return
default:
}
indicator := &storage.ProcessIndicator{
Id: uuid.NewV4().String(),
Signal: signal,
}
if p.usePubSub {
event := NewUnenrichedProcessIndicatorEvent(context.Background(), indicator)
if err := p.pubSubDispatcher.Publish(event); err != nil {
dropSignal(indicator.GetSignal(), "Failed to publish to dispatcher")
}
return
}
p.enricher.add(indicator)
}
```
With this:
- `processPipeline`, `channelPipeline`, and `pubsubPipeline` can go away.
- There is a single `Pipeline` struct, with one `Stopper` owned and coordinated in one place.
- `basePipeline` still cleanly holds shared logic (`processEnrichedIndicator`, `handleEnrichedIndicator`, `sendToCentral`, `Stop`).
### 2. Make `Pipeline` the sole owner of `Stopper` and shutdown
Once you collapse the wrapper, the shutdown path becomes clearer:
```go
// Shutdown closes all communication channels and shuts down the enricher.
func (p *Pipeline) Shutdown() {
_ = p.Stop() // delegate to basePipeline.Stop()
}
func (p *Pipeline) Stop() error {
// optional: channel-mode only
if p.enrichedIndicators != nil {
close(p.enrichedIndicators)
}
p.cancelEnricherCtx(errors.New("pipeline shutdown"))
p.stopper.Client().Stop()
_ = p.enricher.Stopped().Wait()
_ = p.stopper.Client().Stopped().Wait()
return nil
}
func (p *Pipeline) WaitForShutdown() error {
if err := p.stopper.Client().Stopped().Wait(); err != nil {
return errors.Wrap(err, "waiting for pipeline stopper")
}
return nil
}
```
This removes the dual ownership feeling (`Pipeline.stopper` vs `basePipeline.stopper` vs `inner.Stop`) and keeps lifecycle coordination in one visible place.
### 3. Simplify `Start`/`Stop` error handling
Both `channelPipeline.Start` and `pubsubPipeline.Start` were building `errorhelpers.NewErrorList`, but they only had one meaningful failure (`RegisterConsumerToLane`). Once you inline `startSources` into `Pipeline`, you can use plain error handling (as shown above), and `Stop` doesn’t really need aggregated errors either.
If you want to keep `basePipeline.Start()` for symmetry, you can keep it trivial without error aggregation:
```go
func (p *basePipeline) Start() error {
// no-op for now, kept for future extension
return nil
}
```
This reduces boilerplate, making lifecycle code easier to follow, while preserving the new pub/sub behavior.
</issue_to_address>
### Comment 4
<location path="sensor/common/filesystem/pipeline/pipeline.go" line_range="43" />
<code_context>
+ timestamp time.Time
+}
+
type Pipeline struct {
detector detector.Detector
stopper concurrency.Stopper
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the buffering logic and enrichment branching from `Pipeline` into dedicated helper types to keep the pipeline focused and easier to reason about.
The added buffering + enrichment logic significantly increases `Pipeline`’s responsibilities. You can reduce complexity without changing behavior by extracting two focused helpers.
### 1. Extract buffering into its own type
All of `bufferedActivity`, `totalBufferedActivity`, `activityMutex`, `cleanupExpiredBuffers`, `pruneExpiredBuffers`, `bufferActivity`, `popBufferedActivity` can be moved into a small `activityBuffer` that owns its goroutine.
**Before (simplified in Pipeline):**
```go
type Pipeline struct {
// ...
bufferedActivity map[string]*bufferedActivityEntry
totalBufferedActivity int
activityMutex sync.Mutex
wg sync.WaitGroup
}
func (p *Pipeline) bufferActivity(fs *sensorAPI.FileActivity) { /* ... */ }
func (p *Pipeline) popBufferedActivity(key string) []*sensorAPI.FileActivity { /* ... */ }
func (p *Pipeline) cleanupExpiredBuffers() { /* ... */ }
func (p *Pipeline) pruneExpiredBuffers() { /* ... */ }
```
**After (Pipeline + helper):**
```go
type activityBuffer struct {
mu sync.Mutex
entries map[string]*bufferedActivityEntry
total int
stop chan struct{}
}
func newActivityBuffer() *activityBuffer {
return &activityBuffer{
entries: make(map[string]*bufferedActivityEntry),
stop: make(chan struct{}),
}
}
func (b *activityBuffer) Add(fs *sensorAPI.FileActivity) {
// move bufferActivity logic here (incl. metrics)
}
func (b *activityBuffer) Pop(key string) []*sensorAPI.FileActivity {
// move popBufferedActivity logic here
}
func (b *activityBuffer) Start() {
go func() {
ticker := time.NewTicker(bufferCleanupInterval)
defer ticker.Stop()
for {
select {
case <-b.stop:
return
case <-ticker.C:
b.pruneExpired()
}
}
}()
}
func (b *activityBuffer) Stop() {
close(b.stop)
}
func (b *activityBuffer) pruneExpired() {
// move pruneExpiredBuffers logic here
}
```
Then `Pipeline` shrinks to:
```go
type Pipeline struct {
detector detector.Detector
stopper concurrency.Stopper
activityChan chan *sensorAPI.FileActivity
clusterEntities *clusterentities.Store
pubSubDispatcher common.PubSubDispatcher
buffer *activityBuffer
wg sync.WaitGroup
msgCtx context.Context
}
func NewFileSystemPipeline(..., pubSubDispatcher common.PubSubDispatcher) *Pipeline {
p := &Pipeline{
// ...
buffer: newActivityBuffer(),
}
if features.SensorInternalPubSub.Enabled() && pubSubDispatcher != nil {
// ...
p.buffer.Start()
}
p.wg.Add(1)
go p.run()
return p
}
func (p *Pipeline) Stop() {
p.stopper.Client().Stop()
if p.buffer != nil {
p.buffer.Stop()
}
p.wg.Wait()
}
```
Usage sites become simpler:
```go
func (p *Pipeline) processFileActivity(fs *sensorAPI.FileActivity) {
// ...
if features.SensorInternalPubSub.Enabled() && p.pubSubDispatcher != nil {
p.buffer.Add(fs)
// publish...
return
}
// legacy path...
}
func (p *Pipeline) processEnrichedIndicator(event pubsub.Event) error {
// ...
buffered := p.buffer.Pop(key)
for _, fs := range buffered {
access := p.translateWithIndicator(fs, indicator)
// ...
}
return nil
}
```
This keeps all behavior, but moves locking, TTL, metrics, and goroutine lifecycle out of `Pipeline`.
### 2. Hide enrichment branching behind a small strategy
`processFileActivity` currently has three concerns: building indicators, deciding host vs container, and choosing legacy vs pub/sub. You can push the “how to enrich” decision into an interface with two implementations.
**Interface + implementations (sketch):**
```go
type enrichmentCoordinator interface {
Handle(ctx context.Context, fs *sensorAPI.FileActivity, indicator *storage.ProcessIndicator)
}
type legacyCoordinator struct {
clusterEntities *clusterentities.Store
detector detector.Detector
}
func (c *legacyCoordinator) Handle(ctx context.Context, fs *sensorAPI.FileActivity, indicator *storage.ProcessIndicator) {
process := fs.GetProcess()
metadata, ok, _ := c.clusterEntities.LookupByContainerID(process.GetContainerId())
if ok {
processsignal.PopulateIndicatorFromContainer(indicator, metadata)
} else {
log.Warnf("Container ID: %s not found for file activity", process.GetContainerId())
}
if access := translateWithIndicator(fs, indicator); access != nil {
c.detector.ProcessFileAccess(ctx, access)
}
}
type pubSubCoordinator struct {
buffer *activityBuffer
dispatcher common.PubSubDispatcher
}
func (c *pubSubCoordinator) Handle(ctx context.Context, fs *sensorAPI.FileActivity, indicator *storage.ProcessIndicator) {
c.buffer.Add(fs)
event := processsignal.NewUnenrichedProcessIndicatorEvent(ctx, indicator)
if err := c.dispatcher.Publish(event); err != nil {
log.Errorf("Failed to publish unenriched process indicator for file activity: %v", err)
}
}
```
`Pipeline` then just wires the correct strategy once:
```go
type Pipeline struct {
// ...
coordinator enrichmentCoordinator
}
func NewFileSystemPipeline(..., pubSubDispatcher common.PubSubDispatcher) *Pipeline {
p := &Pipeline{ /* ... */ }
if features.SensorInternalPubSub.Enabled() && pubSubDispatcher != nil {
p.buffer = newActivityBuffer()
p.coordinator = &pubSubCoordinator{
buffer: p.buffer,
dispatcher: pubSubDispatcher,
}
p.buffer.Start()
} else {
p.coordinator = &legacyCoordinator{
clusterEntities: clusterEntities,
detector: detector,
}
}
// ...
}
```
Now `processFileActivity` becomes closer to the original, linear flow:
```go
func (p *Pipeline) processFileActivity(fs *sensorAPI.FileActivity) {
process := fs.GetProcess()
if process == nil {
return
}
indicator := p.buildIndicator(fs)
if process.GetContainerId() == "" {
if access := p.translateWithIndicator(fs, indicator); access != nil {
p.detector.ProcessFileAccess(p.msgCtx, access)
}
return
}
p.coordinator.Handle(p.msgCtx, fs, indicator)
}
```
This removes feature-flag branching and pub/sub details from the main pipeline logic, while preserving current behavior.
</issue_to_address>
### Comment 5
<location path="sensor/common/processsignal/enricher.go" line_range="210" />
<code_context>
}
}
+func (e *enricher) publishEnrichedIndicator(indicator *storage.ProcessIndicator) {
+ if features.SensorInternalPubSub.Enabled() && e.pubSubDispatcher != nil {
+ event := NewEnrichedProcessIndicatorEvent(context.Background(), indicator)
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing small abstractions for publishing enriched indicators and for separating immediate enrichment from caching to simplify control flow and make the enricher easier to reason about.
You can reduce the new complexity with two small abstractions without changing behavior.
### 1. Extract an output strategy for enriched indicators
Let the constructor decide the output mechanism once, and make `enrich` oblivious to flags / dispatcher:
```go
type enrichedOutput interface {
Publish(*storage.ProcessIndicator)
}
type channelOutput struct {
ch chan *storage.ProcessIndicator
}
func (o channelOutput) Publish(ind *storage.ProcessIndicator) {
o.ch <- ind
}
type pubSubOutput struct {
dispatcher common.PubSubDispatcher
}
func (o pubSubOutput) Publish(ind *storage.ProcessIndicator) {
event := NewEnrichedProcessIndicatorEvent(context.Background(), ind)
if err := o.dispatcher.Publish(event); err != nil {
log.Errorf("Failed to publish enriched process indicator for deployment %s with id %s: %v",
ind.GetDeploymentId(), ind.GetId(), err)
metrics.IncrementProcessEnrichmentDrops()
}
}
```
Wire it in the constructor:
```go
type enricher struct {
lru *lru.Cache[string, *containerWrap]
clusterEntities *clusterentities.Store
indicators chan *storage.ProcessIndicator
metadataCallbackChan <-chan clusterentities.ContainerMetadata
output enrichedOutput
stopper concurrency.Stopper
}
func newEnricher(ctx context.Context, clusterEntities *clusterentities.Store, dispatcher common.PubSubDispatcher) *enricher {
// ... lru + callbackChan setup ...
indicators := make(chan *storage.ProcessIndicator)
var out enrichedOutput = channelOutput{ch: indicators}
if features.SensorInternalPubSub.Enabled() && dispatcher != nil {
out = pubSubOutput{dispatcher: dispatcher}
// register consumer, as you already do
}
e := &enricher{
lru: unenrichedCache,
clusterEntities: clusterEntities,
indicators: indicators,
metadataCallbackChan: callbackChan,
output: out,
stopper: concurrency.NewStopper(),
}
go e.processLoop(ctx)
return e
}
```
Then `enrich` becomes simpler and no longer needs `publishEnrichedIndicator` or flag checks:
```go
func (e *enricher) enrich(ind *storage.ProcessIndicator, md clusterentities.ContainerMetadata) {
PopulateIndicatorFromContainer(ind, md)
normalize.Indicator(ind)
e.output.Publish(ind)
metrics.IncrementProcessEnrichmentHits()
metrics.SetProcessEnrichmentCacheSize(float64(e.lru.Len()))
}
```
This removes branching from the hot path and centralizes the feature-flag logic in the constructor.
### 2. Split `add` into “try now” vs “cache for later”
Right now `add` mixes null checks, immediate enrichment, and LRU management. You can make the control flow clearer and easier to test with two helpers:
```go
func (e *enricher) add(ind *storage.ProcessIndicator) {
if !e.tryImmediateEnrich(ind) {
e.cacheForLaterEnrichment(ind)
}
}
func (e *enricher) tryImmediateEnrich(ind *storage.ProcessIndicator) bool {
if ind == nil || ind.GetSignal() == nil {
return false
}
sig := ind.GetSignal()
md, ok, _ := e.clusterEntities.LookupByContainerID(sig.GetContainerId())
if !ok {
return false
}
e.enrich(ind, md)
return true
}
func (e *enricher) cacheForLaterEnrichment(ind *storage.ProcessIndicator) {
if ind == nil || ind.GetSignal() == nil {
return
}
sig := ind.GetSignal()
id := sig.GetContainerId()
wrap, ok := e.lru.Get(id)
if !ok {
wrap = &containerWrap{expiration: time.Now().Add(containerExpiration)}
}
wrap.addProcess(ind)
e.lru.Add(id, wrap)
metrics.SetProcessEnrichmentCacheSize(float64(e.lru.Len()))
}
```
`processUnenrichedIndicator` and the legacy pipeline can both call `add`, but tests can target `tryImmediateEnrich` and `cacheForLaterEnrichment` independently, and the responsibilities are split cleanly.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Force-pushed from aa8ffb9 to 8ca2bdb (Compare)
vikin91
left a comment
The code looks good, but I have several comments. Those are not hard blockers, but rather my own insecurities about not yet knowing the PubSub system at an expert level (sorry for that).
- The 3 small detailed comments should be trivial to address.
- For the call `e.pubSubDispatcher.Publish(event)` I would love to see at minimum some more debuggability added (a metric).
- Adding tests involving the new pubsub transport should also not be skipped. I left suggestions on what could be covered (but feel free to decide yourself).
PR description (how tested): I would be curious how you detect that this feature is working correctly. Did you look at a particular log line, a metric, or the result in the UI?
As debuggability and the ability to quickly evaluate that Sensor is working correctly are priorities for the Sensor team, I would appreciate a bit more detailed information about that.
Force-pushed from e1bea35 to b2a3dae (Compare)
/retest
vikin91
left a comment
Posting two more comments while I read through sensor/common/processsignal/pipeline_test.go
vikin91
left a comment
I left several comments in the tests but they are not blocking and fully optional.
Once the error (Failed to register consumer for unenriched...) issue is addressed, this is green to me.
vikin91
left a comment
Looks good, thanks for improving!
You still need to fix the style issue regarding the canceling of the context, but I am marking it green already, as you cannot merge with that check failing.
/retest
Refactor the process signal pipeline and enricher to publish enriched and unenriched process indicators via the internal pub/sub system. This enables downstream consumers (such as the file activity pipeline) to subscribe to process enrichment events. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Return error from NewProcessPipeline on start failure, add pipeline mode metric, downgrade dropped-event log to Info, add pub/sub tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
e0dff7d to
2c26bfa
Compare
@Stringy: The following test failed, say /retest to rerun all failed tests:
Full PR test history. Your PR dashboard.
Details
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
Description
With the new pub/sub system in Sensor, this PR refactors both the process signal pipeline and the file activity pipeline to enrich process indicators with deployment details, using new pub/sub topics.
Both pipelines can now publish unenriched indicators for the enricher to subscribe to, enrich, and publish back on a separate enriched-indicators topic. The pipelines then subscribe to the enriched indicators.
This allows both pipelines to use the enricher without tight coupling between components, and significantly improves enrichment in the file activity pipeline, which previously relied on a "dumb" approach with no enrichment.
Some additional metrics have been added for file activity.
User-facing documentation
Testing and quality
Automated testing
How I validated my change
Tested locally on an OCP cluster, running e2e tests for process indicators. These tests confirm that enrichment has not regressed (processes are correctly correlated to the right deployments).
Confirmed pub/sub registration via new metric and via inspecting Sensor logs.
New unit tests should cover pub/sub registration failures and the happy path with full pub/sub enrichment.
We do not yet have file activity e2e tests (coming in #19452) but I have created file activity policies and triggered file activity violations, ensuring that the embedded process indicators are correctly enriched.