ROX-30941: Enrich endpoints and processes together #16901
Conversation
This change is part of the following stack. Change managed by git-spice.

Skipping CI for Draft Pull Request.
Hey there - I've reviewed your changes - here's some feedback:
- The PR title and description should be updated to accurately describe the removal of the process deduper and the related test refactoring, since the current wording doesn’t match the actual changes.
- Exposing TransitionBased.GetState for tests leaks internal state into the public API—consider moving it to a test‐only helper or interface to keep production code clean.
- The new info‐level logging in computeUpdatedEntitiesNoPast will be very verbose in production; it’s better suited for debug level to avoid flooding the logs.
## Individual Comments
### Comment 1
<location> `sensor/common/networkflow/updatecomputer/transition_based.go:23` </location>
<code_context>
type deduperAction int
+func (d *deduperAction) String() string {
+ switch *d {
+ case deduperActionAdd:
</code_context>
<issue_to_address>
**suggestion:** Consider using a value receiver for String() instead of a pointer receiver.
Since String() does not modify deduperAction, defining it with a value receiver is more idiomatic and avoids unnecessary pointer usage.
</issue_to_address>
### Comment 2
<location> `sensor/common/networkflow/manager/manager_process_sending_test.go:238` </location>
<code_context>
+ b.T().Logf("proc deduper: (%s)", dedupers[updatecomputer.ProcessEnrichedEntity].ElementsString(";"))
+}
+
+func (b *sendNetflowsSuite) getUpdates(num int) ([]*storage.ProcessListeningOnPortFromSensor, []*storage.NetworkEndpoint) {
+ p := make([]*storage.ProcessListeningOnPortFromSensor, 0)
+ e := make([]*storage.NetworkEndpoint, 0)
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential issue with range usage in getUpdates helper.
Note that ranging directly over an integer (`for range num`) is only valid in Go 1.22 and later; if the module targets an earlier Go version, replace it with `for i := 0; i < num; i++` to keep the code compiling.
</issue_to_address>
Images are ready for the commit at ae317c4.
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

@@            Coverage Diff             @@
##           master   #16901      +/-   ##
==========================================
- Coverage   48.79%   48.79%   -0.01%
==========================================
  Files        2711     2712       +1
  Lines      202284   202328      +44
==========================================
+ Hits        98712    98732      +20
- Misses      95793    95813      +20
- Partials     7779     7783       +4
==========================================

Flags with carried forward coverage won't be shown.
Force-pushed 484ae5e to 1a2b56c.
Hey there - I've reviewed your changes - here's some feedback:
- In the Legacy implementation, OnSuccessfulSendEndpoints and OnSuccessfulSendProcesses appear to write to enrichedConnsLastSentState—verify they’re updating the correct endpoint and process state maps instead of overriding connection state.
- When replacing a process on an existing endpoint, consider emitting a close event for the old process (instead of silently dropping it) so Central’s state doesn’t retain stale listeners.
- ComputeUpdatedEndpointsAndProcesses is handling multiple responsibilities; breaking out the transition and deduper logic into smaller helper functions would make it more readable and easier to test.
## Individual Comments
### Comment 1
<location> `sensor/common/networkflow/updatecomputer/legacy.go:107` </location>
<code_context>
- if currentProcesses != nil {
- l.enrichedProcessesLastSentState = maps.Clone(currentProcesses)
+// OnSuccessfulSendProcesses contains actions that should be executed after successful sending of processesListening updates to Central.
+func (l *Legacy) OnSuccessfulSendProcesses(enrichedEndpointsProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
+ if enrichedEndpointsProcesses != nil {
+ l.lastSentStateMutex.Lock()
</code_context>
<issue_to_address>
**issue (bug_risk):** OnSuccessfulSendProcesses in Legacy may panic if ProcessListening is nil.
Add a nil check for withClose.ProcessListening before dereferencing to prevent potential panics.
</issue_to_address>
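A self-contained sketch of the suggested guard; the struct shapes below are assumptions standing in for the real `indicator` types:

```golang
package main

import "fmt"

// Hypothetical stand-ins for the indicator types.
type ProcessListening struct{ Name string }

type ProcessListeningWithClose struct {
	ProcessListening *ProcessListening
	LastSeen         int64
}

// collectProcesses skips entries whose ProcessListening is nil instead of
// dereferencing them, which is what prevents the potential panic.
func collectProcesses(eps map[string]*ProcessListeningWithClose) map[ProcessListening]int64 {
	out := make(map[ProcessListening]int64, len(eps))
	for _, wc := range eps {
		if wc == nil || wc.ProcessListening == nil {
			continue // closed endpoints may carry no process info
		}
		out[*wc.ProcessListening] = wc.LastSeen
	}
	return out
}

func main() {
	eps := map[string]*ProcessListeningWithClose{
		"ep1": {ProcessListening: &ProcessListening{Name: "nginx"}, LastSeen: 42},
		"ep2": {ProcessListening: nil}, // would panic without the guard
	}
	fmt.Println(len(collectProcesses(eps))) // 1
}
```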
### Comment 2
<location> `sensor/common/networkflow/updatecomputer/update_computer_test.go:471` </location>
<code_context>
+ expectNumUpdatesProc: map[string]int{implLegacy: 0, implTransitionBased: 0},
+ },
+ // Process-specific cases
+ "Should replace process on two open-endpoint messages": {
+ initialMapping: map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose{
+ ep1: {
</code_context>
<issue_to_address>
**suggestion (testing):** Suggestion to add a test for process closure when endpoint is replaced.
Please add a test case where the endpoint is closed after replacing the process, to confirm the deduper removes both the process and the endpoint as expected.
</issue_to_address>
### Comment 3
<location> `sensor/common/networkflow/manager/manager_process_sending_test.go:63` </location>
<code_context>
+ },
+ },
+ },
+ "open-e1p1 followed by open-e1p2 should not keep p1 in deduper (replacing behavior)": {
+ events: []event{
+ {
</code_context>
<issue_to_address>
**suggestion (testing):** Suggestion to add a test for rapid process flapping on the same endpoint.
Add a test simulating rapid endpoint changes to verify the deduper correctly handles frequent replacements and does not leak or miss updates.
Suggested implementation:
```golang
"open-e1p1 followed by open-e1p2 should not keep p1 in deduper (replacing behavior)": {
events: []event{
{
description: "Open endpoint e1 with new process p1",
input: e1p1open,
expectedNumContainerLookups: 1,
expectedNumUpdatesEndpoint: 1,
expectedNumUpdatesProcess: 1,
expectedDeduperState: map[string]string{
e1p1open.endpointIndicator(deploymentID).Key(indicator.HashingAlgoHash): e1p1open.processListeningIndicator().Key(indicator.HashingAlgoHash),
},
expectedUpdatedProcesses: &p1,
},
},
},
"rapid endpoint flapping should not leak or miss updates": {
events: []event{
{
description: "Open endpoint e1 with process p1",
input: e1p1open,
expectedNumContainerLookups: 1,
expectedNumUpdatesEndpoint: 1,
expectedNumUpdatesProcess: 1,
expectedDeduperState: map[string]string{
e1p1open.endpointIndicator(deploymentID).Key(indicator.HashingAlgoHash): e1p1open.processListeningIndicator().Key(indicator.HashingAlgoHash),
},
expectedUpdatedProcesses: &p1,
},
{
description: "Close endpoint e1 with process p1",
input: e1p1closed,
expectedNumContainerLookups: 1,
expectedNumUpdatesEndpoint: 1,
expectedNumUpdatesProcess: 1,
expectedDeduperState: map[string]string{},
expectedUpdatedProcesses: &p1,
},
{
description: "Open endpoint e1 with process p2",
input: e1p2open,
expectedNumContainerLookups: 1,
expectedNumUpdatesEndpoint: 1,
expectedNumUpdatesProcess: 1,
expectedDeduperState: map[string]string{
e1p2open.endpointIndicator(deploymentID).Key(indicator.HashingAlgoHash): e1p2open.processListeningIndicator().Key(indicator.HashingAlgoHash),
},
expectedUpdatedProcesses: &p2,
},
{
description: "Close endpoint e1 with process p2",
input: e1p2closed,
expectedNumContainerLookups: 1,
expectedNumUpdatesEndpoint: 1,
expectedNumUpdatesProcess: 1,
expectedDeduperState: map[string]string{},
expectedUpdatedProcesses: &p2,
},
{
description: "Open endpoint e1 with process p3",
input: e1p3open,
expectedNumContainerLookups: 1,
expectedNumUpdatesEndpoint: 1,
expectedNumUpdatesProcess: 1,
expectedDeduperState: map[string]string{
e1p3open.endpointIndicator(deploymentID).Key(indicator.HashingAlgoHash): e1p3open.processListeningIndicator().Key(indicator.HashingAlgoHash),
},
expectedUpdatedProcesses: &p3,
},
{
description: "Close endpoint e1 with process p3",
input: e1p3closed,
expectedNumContainerLookups: 1,
expectedNumUpdatesEndpoint: 1,
expectedNumUpdatesProcess: 1,
expectedDeduperState: map[string]string{},
expectedUpdatedProcesses: &p3,
},
},
},
```
You must ensure that the test data for `e1p2open`, `e1p2closed`, `e1p3open`, `e1p3closed`, `p2`, and `p3` are defined in your test setup, similar to how `e1p1open`, `e1p1closed`, and `p1` are defined.
</issue_to_address>
### Comment 4
<location> `sensor/common/networkflow/manager/enrichment_test.go:296` </location>
<code_context>
- map[indicator.ProcessListening]timestamp.MicroTS)
+ prePopulateData func(*testing.T, map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose)
}{
"Fresh endpoint with no process info should yield result EnrichmentResultSuccess for Network Graph and EnrichmentResultInvalidInput for PLOP": {
setupEndpoint: func() (*containerEndpoint, *connStatus) {
</code_context>
<issue_to_address>
**suggestion (testing):** Suggestion to add a test for endpoint with nil process after closure.
Please add a test to verify that closing an endpoint with a nil process does not leave stale entries in enrichment or deduplication logic.
Suggested implementation:
```golang
}{
"Fresh endpoint with no process info should yield result EnrichmentResultSuccess for Network Graph and EnrichmentResultInvalidInput for PLOP": {
			setupEndpoint: func() (*containerEndpoint, *connStatus) {
				// ... (original setup body elided in this suggestion)
			},
			expectedResultNG:   EnrichmentResultSuccess,
expectedResultPLOP: EnrichmentResultSuccess,
expectedReasonNG: EnrichmentReasonEpDuplicate,
prePopulateData: func(t *testing.T, data map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
// Pre-populate with newer timestamp to trigger duplicate detection
endpointIndicator := indicator.ContainerEndpoint{
Entity: networkgraph.EntityForDeployment("test-deployment"),
Port: 80,
}
data[endpointIndicator] = &indicator.ProcessListeningWithClose{
Process: nil,
Closed: false,
}
},
},
"Endpoint with nil process after closure should not leave stale entries in enrichment or deduplication": {
setupEndpoint: func() (*containerEndpoint, *connStatus) {
endpoint := &containerEndpoint{
indicator: indicator.ContainerEndpoint{
Entity: networkgraph.EntityForDeployment("test-deployment"),
Port: 8080,
},
process: nil,
}
status := &connStatus{
closed: true,
}
return endpoint, status
},
expectedResultNG: EnrichmentResultInvalidInput,
expectedResultPLOP: EnrichmentResultInvalidInput,
expectedReasonNG: EnrichmentReasonEpNoProcess,
prePopulateData: func(t *testing.T, data map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
// Simulate closure with nil process
endpointIndicator := indicator.ContainerEndpoint{
Entity: networkgraph.EntityForDeployment("test-deployment"),
Port: 8080,
}
data[endpointIndicator] = &indicator.ProcessListeningWithClose{
Process: nil,
Closed: true,
}
},
verifyCleanup: func(t *testing.T, enrichmentMap map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
endpointIndicator := indicator.ContainerEndpoint{
Entity: networkgraph.EntityForDeployment("test-deployment"),
Port: 8080,
}
_, exists := enrichmentMap[endpointIndicator]
if exists {
t.Errorf("Stale entry found for endpoint with nil process after closure")
}
},
},
```
- You may need to ensure that the test runner invokes `verifyCleanup` after the enrichment logic runs, passing the relevant map or state.
- If your test harness does not support a `verifyCleanup` function, add assertions directly after the enrichment logic in the test body.
- Adjust the endpoint and port values if your test setup uses different conventions.
</issue_to_address>
### Comment 5
<location> `sensor/common/networkflow/updatecomputer/transition_based.go:405` </location>
<code_context>
- eps map[indicator.ContainerEndpoint]timestamp.MicroTS,
- procs map[indicator.ProcessListening]timestamp.MicroTS,
-) {
+func (c *TransitionBased) OnSuccessfulSendConnections(conns map[indicator.NetworkConn]timestamp.MicroTS) {
if conns != nil {
c.cachedUpdatesConn = make([]*storage.NetworkFlow, 0)
</code_context>
<issue_to_address>
**issue (complexity):** Consider consolidating the three cache-reset methods into a single OnSuccessfulSend method to centralize logic and reduce duplication.
```suggestion
// The three nearly‐identical methods OnSuccessfulSendConnections, OnSuccessfulSendEndpoints and
// OnSuccessfulSendProcesses can be collapsed into the existing OnSuccessfulSend. This removes
// duplication and keeps all cache‐reset logic in one place.
// 1) Delete these methods entirely:
//
// func (c *TransitionBased) OnSuccessfulSendConnections(conns map[indicator.NetworkConn]timestamp.MicroTS) { … }
// func (c *TransitionBased) OnSuccessfulSendEndpoints(enrichedEndpointsProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) { … }
// func (c *TransitionBased) OnSuccessfulSendProcesses(enrichedEndpointsProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) { … }
//
// 2) Update callers to use the single signature:
//
// func (c *TransitionBased) OnSuccessfulSend(
// conns map[indicator.NetworkConn]timestamp.MicroTS,
// eps map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose,
//     procs map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose,
// ) {
// if conns != nil {
// c.cachedUpdatesConn = c.cachedUpdatesConn[:0]
// }
// if eps != nil {
// c.cachedUpdatesEp = c.cachedUpdatesEp[:0]
// }
// if procs != nil {
// c.cachedUpdatesProc = c.cachedUpdatesProc[:0]
// }
// }
//
// 3) If you still need the old entrypoints, simply delegate:
//
// func (c *TransitionBased) OnSuccessfulSendConnections(conns map[indicator.NetworkConn]timestamp.MicroTS) {
// c.OnSuccessfulSend(conns, nil, nil)
// }
//
// func (c *TransitionBased) OnSuccessfulSendEndpoints(eps map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
// c.OnSuccessfulSend(nil, eps, nil)
// }
//
// func (c *TransitionBased) OnSuccessfulSendProcesses(procs map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
//     c.OnSuccessfulSend(nil, nil, procs)
// }
//
// This centralizes the slice‐reset logic and makes future changes easier.
```
</issue_to_address>
### Comment 6
<location> `sensor/common/networkflow/updatecomputer/legacy.go:48` </location>
<code_context>
}
-func (l *Legacy) ComputeUpdatedEndpoints(current map[indicator.ContainerEndpoint]timestamp.MicroTS) []*storage.NetworkEndpoint {
+func (l *Legacy) ComputeUpdatedEndpointsAndProcesses(enrichedEndpointsProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) ([]*storage.NetworkEndpoint, []*storage.ProcessListeningOnPortFromSensor) {
+ currentEps := make(map[indicator.ContainerEndpoint]timestamp.MicroTS, len(l.enrichedEndpointsLastSentState))
+ currentProc := make(map[indicator.ProcessListening]timestamp.MicroTS)
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing generic helper functions to unify and simplify repeated locking and map update patterns.
```suggestion
You can collapse the repeated lock+computeUpdates and lock+clone patterns by introducing two small helpers (using Go-1.18 generics):
1) A `withUpdates` helper to DRY-up all `ComputeUpdated…` methods:
```go
// withUpdates wraps computeUpdates under the same RLock logic. Note that
// Go methods cannot have type parameters, so this must be a plain generic
// function that takes the receiver as an ordinary parameter.
func withUpdates[T comparable, P any](
	l *Legacy,
	current map[T]timestamp.MicroTS,
	lastSent map[T]timestamp.MicroTS,
	toProto func(T, timestamp.MicroTS) P,
) []P {
	return concurrency.WithRLock1(&l.lastSentStateMutex, func() []P {
		return computeUpdates(current, lastSent, toProto)
	})
}
// then each ComputeUpdated… becomes a one-liner:
func (l *Legacy) ComputeUpdatedConns(cur map[indicator.NetworkConn]timestamp.MicroTS) []*storage.NetworkFlow {
	return withUpdates(l, cur, l.enrichedConnsLastSentState,
		func(c indicator.NetworkConn, ts timestamp.MicroTS) *storage.NetworkFlow {
			return (&c).ToProto(ts)
		})
}
func (l *Legacy) ComputeUpdatedEndpoints(cur map[indicator.ContainerEndpoint]timestamp.MicroTS) []*storage.NetworkEndpoint {
	return withUpdates(l, cur, l.enrichedEndpointsLastSentState,
		func(e indicator.ContainerEndpoint, ts timestamp.MicroTS) *storage.NetworkEndpoint {
			return (&e).ToProto(ts)
		})
}
// for processes you still need the env-guard:
func (l *Legacy) ComputeUpdatedProcesses(cur map[indicator.ProcessListening]timestamp.MicroTS) []*storage.ProcessListeningOnPortFromSensor {
	if !env.ProcessesListeningOnPort.BooleanSetting() {
		if len(cur) > 0 {
			logging.GetRateLimitedLogger().Warn( /* … */ )
		}
		return nil
	}
	return withUpdates(l, cur, l.enrichedProcessesLastSentState,
		func(p indicator.ProcessListening, ts timestamp.MicroTS) *storage.ProcessListeningOnPortFromSensor {
			return (&p).ToProto(ts)
		})
}
```
2) An `updateState` helper to unify all `OnSuccessfulSend…` methods:
```go
// updateState clones any map[T]TS under a lock. As above, Go methods
// cannot be generic, so this takes the receiver as an ordinary parameter.
func updateState[T comparable](
	l *Legacy,
	dest *map[T]timestamp.MicroTS,
	current map[T]timestamp.MicroTS,
) {
	if current == nil {
		return
	}
	l.lastSentStateMutex.Lock()
	defer l.lastSentStateMutex.Unlock()
	*dest = maps.Clone(current)
}
// usage in the three OnSuccessfulSend… methods:
func (l *Legacy) OnSuccessfulSendConnections(cur map[indicator.NetworkConn]timestamp.MicroTS) {
	updateState(l, &l.enrichedConnsLastSentState, cur)
}
func (l *Legacy) OnSuccessfulSendEndpoints(endps map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
	if endps == nil {
		return
	}
	m := make(map[indicator.ContainerEndpoint]timestamp.MicroTS, len(endps))
	for ep, wc := range endps {
		if wc == nil {
			continue
		}
		m[ep] = wc.LastSeen
	}
	updateState(l, &l.enrichedEndpointsLastSentState, m)
}
func (l *Legacy) OnSuccessfulSendProcesses(endps map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
	if endps == nil {
		return
	}
	m := make(map[indicator.ProcessListening]timestamp.MicroTS, len(endps))
	for _, wc := range endps {
		if wc != nil && wc.ProcessListening != nil {
			m[*wc.ProcessListening] = wc.LastSeen
		}
	}
	updateState(l, &l.enrichedProcessesLastSentState, m)
}
```
This removes the boilerplate lock/clone and split logic without changing any behavior.
</issue_to_address>
Images are ready for the commit at ebb3641.
I believe this is now accurate. The process deduper was actually not removed completely but merged with the endpoints deduper, as those entities should be analyzed together.
This was removed many commits ago.
This function was removed many commits ago.
/retest

2 similar comments

/retest

/retest
Images are ready for the commit at 638b6d4.

Force-pushed ebb3641 to 638b6d4.
Images are ready for the commit at 0264519.
lvalerom left a comment:
Looks really good. I didn't see anything blocking; most of my comments are clarifications or for test coverage.
@lvalerom I resolved the comments and will merge the PR, given that you are OOO and the code freeze is approaching.
Unfortunately, I need to rebase due to #16915 :( |
Force-pushed 0264519 to 26fee46.
Images are ready for the commit at 26fee46.
@vikin91: The following test failed.
/retest |




AI on the contents of this PR
ProcessListeningWithTimestamp), switching maps everywhere, wiring `ComputeUpdatedEndpointsAndProcesses`, adding `setLastSeenOrAdd`, adjusting call sites & metrics hooks. (Minimal logic beyond safe plumbing.) `transition_based.go`: atomic update of endpoints+processes, close/replace semantics, deduper consistency.

Description
Fixes an issue where the process deduper would grow indefinitely by properly coupling endpoint and process lifecycles. This change ensures that when an endpoint closes or changes its associated process, the corresponding process entries are correctly managed in the deduper, preventing memory leaks and maintaining accurate network flow state.
Problem
The previous implementation had separate `ComputeUpdatedEndpoints` and `ComputeUpdatedProcesses` methods, which could lead to the process deduper growing indefinitely when new processes were reported for an endpoint without being closed in between.

Solution
I replaced the separate `ComputeUpdatedEndpoints` and `ComputeUpdatedProcesses` methods with `ComputeUpdatedEndpointsAndProcesses` to process endpoints and their associated processes atomically.

Note that in the scenario mentioned in the "Problem" section, the current behavior will be:
This has been discussed with Collector; most probably, in this case both processes are listening in parallel. Central will close process1 when endpoint1 closes in the future.
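The replace semantics can be pictured with a toy deduper (the names and shapes below are assumptions, not the real implementation): keying by endpoint means a newly reported process overwrites the old entry instead of growing the map, and closing the endpoint removes the process entry with it.

```golang
package main

import "fmt"

// deduper is a toy model: each endpoint key maps to at most one process key.
type deduper map[string]string

// observe records the process currently listening on an endpoint,
// replacing any previously recorded process for the same endpoint.
func (d deduper) observe(endpoint, process string) {
	d[endpoint] = process
}

// close removes the endpoint, and with it its process entry, so nothing
// accumulates after the endpoint's lifecycle ends.
func (d deduper) close(endpoint string) {
	delete(d, endpoint)
}

func main() {
	d := deduper{}
	d.observe("e1", "p1")
	d.observe("e1", "p2") // p1 is replaced; the map does not grow
	fmt.Println(len(d), d["e1"]) // 1 p2
	d.close("e1")
	fmt.Println(len(d)) // 0
}
```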
Technical changes
The enricher and `ComputeUpdatedEndpointsAndProcesses` now operate on a map `map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithTimestamp` that maps an endpoint to a process and the close timestamp.
Testing
Added test scenarios with wide coverage:
- `TestUpdateComputer_ProcessListening`, demonstrating the lifecycle scenarios for endpoints and processes. This is also the regression test for the deduper issue. It leverages the `TestableUpdateComputer` to peek into the deduper states.
- `TestComputeUpdatedEndpointsAndProcesses`, a package-level test that checks the TransitionBased update computer for the same issues as `TestUpdateComputer_ProcessListening`.
Testing and quality
Automated testing
- e2e integration tests

How I validated my change
and looking at the deduper size metrics.
Long-running with fake workload (~2 days long run)
Sensor memory (ignore the yellow chart, it is from a previous run)

Sizes of collections used in enrichment

Long-running cluster with Berserker load from branch `jv-ROX-28976-optimize-berserker-load-in-long-running-cluster` (~1 day long run).

(The gap on the chart was caused by the monitoring deployment going OOM. Giving it more memory fixed the issue.)
