
ROX-30941: Enrich endpoints and processes together#16901

Merged
vikin91 merged 12 commits into master from piotr/ROX-30941-no-deduping-processes on Oct 1, 2025

Conversation

@vikin91
Contributor

@vikin91 vikin91 commented Sep 18, 2025

AI on the contents of this PR

  • Boilerplate (~260 lines changed): type rename/add (ProcessListeningWithTimestamp), switching maps everywhere, wiring ComputeUpdatedEndpointsAndProcesses, adding setLastSeenOrAdd, adjusting call sites & metrics hooks. (Minimal logic beyond safe plumbing.) 
  • New implementation (~350 lines changed): the real change lives in transition_based.go: atomic update of endpoints+processes, close/replace semantics, deduper consistency.
  • Tests (~687 lines changed): regression coverage for the deduper leak and lifecycle scenarios both at updatecomputer and manager layers.

Description

Fixes an issue where the process deduper would grow indefinitely by properly coupling endpoint and process lifecycles. This change ensures that when an endpoint closes or changes its associated process, the corresponding process entries are correctly managed in the deduper, preventing memory leaks and maintaining accurate network flow state.

Problem

The previous implementation had separate ComputeUpdatedEndpoints and ComputeUpdatedProcesses methods, which could lead to the process deduper growing indefinitely when a new process was reported for an endpoint without the endpoint being closed in between, i.e., in this scenario:

  1. open endpoint1 with process1, then
  2. open endpoint1 with process2,
  3. resulted in process1 staying in the deduper forever.
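With the old split computation, the process deduper only ever accumulated keys. A minimal sketch of the leak, using hypothetical simplified types in place of the real indicator API:

```go
package main

import "fmt"

// process is a stand-in for indicator.ProcessListening.
type process string

// oldComputeUpdatedProcesses mimics the leak: a process seen for the first
// time is added to the deduper, but nothing removes the entry when its
// endpoint starts reporting a different process without closing first.
func oldComputeUpdatedProcesses(deduper map[process]bool, current []process) []process {
	var updates []process
	for _, p := range current {
		if !deduper[p] {
			deduper[p] = true // added, never removed while the endpoint lives
			updates = append(updates, p)
		}
	}
	return updates
}

func main() {
	deduper := map[process]bool{}
	oldComputeUpdatedProcesses(deduper, []process{"process1"}) // open endpoint1 with process1
	oldComputeUpdatedProcesses(deduper, []process{"process2"}) // endpoint1 now reports process2
	fmt.Println(len(deduper))                                  // 2: process1 lingers forever
}
```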

Solution

I replaced separate ComputeUpdatedEndpoints and ComputeUpdatedProcesses methods with ComputeUpdatedEndpointsAndProcesses to process endpoints and their associated processes atomically.

  • Enforced 1:1 relationship - each endpoint can have zero or one process (an assumption for this work, documented in Jira)
  • Proper lifecycle coupling - when an endpoint closes, its process is automatically closed; when a new process opens on an existing endpoint, the previous process is removed from the deduper. This is covered by newly added tests.
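The coupled close/replace semantics can be sketched as follows. This is an illustrative simplification with hypothetical types, not the actual TransitionBased implementation:

```go
package main

import "fmt"

// Hypothetical simplified types; the real code uses indicator.ContainerEndpoint
// and indicator.ProcessListeningWithTimestamp.
type endpoint string
type process string

type updateComputer struct {
	state   map[endpoint]process // current process per endpoint (at most one)
	deduper map[process]bool     // processes already reported to Central
}

// observe applies one enrichment result atomically: a close removes both the
// endpoint and its process from the deduper; a new process on a live endpoint
// replaces (and un-dedupes) the previous one, so the deduper cannot grow.
func (u *updateComputer) observe(ep endpoint, proc process, closed bool) {
	prev, seen := u.state[ep]
	if closed {
		if seen {
			delete(u.deduper, prev)
			delete(u.state, ep)
		}
		return
	}
	if seen && prev != proc {
		delete(u.deduper, prev) // replacement: drop the stale process
	}
	u.state[ep] = proc
	u.deduper[proc] = true
}

func main() {
	u := &updateComputer{state: map[endpoint]process{}, deduper: map[process]bool{}}
	u.observe("endpoint1", "process1", false)
	u.observe("endpoint1", "process2", false) // replaces process1 in the deduper
	fmt.Println(len(u.deduper))               // 1, not 2
	u.observe("endpoint1", "process2", true)  // close removes both
	fmt.Println(len(u.deduper))               // 0
}
```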

Note that in the scenario mentioned in the "Problem" section, the behavior is now:

  1. open endpoint1 with process1 =>
    • 2 messages to Central:
      • one with open endpoint,
      • one with open process1
  2. open endpoint1 with process2 =>
    • 1 message to Central:
      • with open process2;
      • there is no automated close of process1

This has been discussed with the Collector team - most probably, both processes are listening in parallel in this case. Central will close process1 when endpoint1 closes in the future.

Technical changes

The enricher and ComputeUpdatedEndpointsAndProcesses now operate on a map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithTimestamp that maps each endpoint to its process and last-seen (or close) timestamp.

// before
endpoints map[indicator.ContainerEndpoint]timestamp.MicroTS
processes map[indicator.ProcessListening]timestamp.MicroTS

// after 
endpointsAndProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithTimestamp 

Example values:

// Endpoint `ep1` with timestamp `now` without a process
value1 := map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithTimestamp{
    ep1: {
        ProcessListening: nil,
        LastSeen:         now,
    },
}

// Endpoint `ep1` with a process. Both with timestamp `closedInThePast`.
value2 := map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithTimestamp{
    ep1: {
        ProcessListening: &indicator.ProcessListening{
            Process: indicator.ProcessInfo{
                ProcessName: "proc-name",
                ProcessArgs: "proc-args",
                ProcessExec: "proc-exec",
            },
            PodID:         "",
            ContainerName: "",
            DeploymentID:  "",
            PodUID:        "",
            Namespace:     "",
            Protocol:      0,
            Port:          0,
        },
        LastSeen: closedInThePast,
    },
}
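The setLastSeenOrAdd helper mentioned in the summary could look roughly like this. The shape below is inferred from its name and the new map type; field names and semantics are assumptions, not the verified implementation:

```go
package main

import "fmt"

// Simplified stand-ins for the real indicator types.
type containerEndpoint struct {
	addr string
	port uint16
}
type processListening struct{ name string }
type processListeningWithTimestamp struct {
	ProcessListening *processListening
	LastSeen         int64 // microsecond-timestamp stand-in for timestamp.MicroTS
}

// setLastSeenOrAdd bumps the timestamp for a known endpoint or inserts a new
// entry, keeping at most one process per endpoint (assumed behavior: a non-nil
// process replaces the previous one; nil leaves the existing process alone).
func setLastSeenOrAdd(
	m map[containerEndpoint]*processListeningWithTimestamp,
	ep containerEndpoint,
	proc *processListening,
	ts int64,
) {
	if existing, ok := m[ep]; ok {
		existing.LastSeen = ts
		if proc != nil {
			existing.ProcessListening = proc // replace, never accumulate
		}
		return
	}
	m[ep] = &processListeningWithTimestamp{ProcessListening: proc, LastSeen: ts}
}

func main() {
	m := map[containerEndpoint]*processListeningWithTimestamp{}
	ep1 := containerEndpoint{addr: "10.0.0.1", port: 8080}
	setLastSeenOrAdd(m, ep1, nil, 100)                              // endpoint without process
	setLastSeenOrAdd(m, ep1, &processListening{name: "nginx"}, 200) // process arrives later
	fmt.Println(len(m), m[ep1].LastSeen)                            // 1 200
}
```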

Testing

Added test scenarios with wide coverage:

  • TestUpdateComputer_ProcessListening demonstrates the lifecycle scenarios for endpoints and processes and serves as the regression test for the deduper issue. It leverages the TestableUpdateComputer to peek into the deduper state.
  • TestComputeUpdatedEndpointsAndProcesses is a package-level test that checks the TransitionBased update computer for the same issues as TestUpdateComputer_ProcessListening.

User-facing documentation

Testing and quality

  • the change is production ready: the change is GA, or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e integration tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

  • Tests executed by CI
  • Running it on the long-running cluster with
    • Fake workloads
    • Jouko's new Berserker load
      and watching the deduper size metrics.

Long-running with fake workload (~2 days long run)

Sensor memory (ignore the yellow chart, it is from a previous run)

Sizes of collections used in enrichment

Long-running cluster with Berserker load from branch jv-ROX-28976-optimize-berserker-load-in-long-running-cluster (~1 day long run)

(The gap in the chart was caused by the monitoring deployment going OOM; giving it more memory fixed the issue.)


@openshift-ci

openshift-ci bot commented Sep 18, 2025

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • The PR title and description should be updated to accurately describe the removal of the process deduper and the related test refactoring, since the current wording doesn’t match the actual changes.
  • Exposing TransitionBased.GetState for tests leaks internal state into the public API—consider moving it to a test‐only helper or interface to keep production code clean.
  • The new info‐level logging in computeUpdatedEntitiesNoPast will be very verbose in production; it’s better suited for debug level to avoid flooding the logs.
## Individual Comments

### Comment 1
<location> `sensor/common/networkflow/updatecomputer/transition_based.go:23` </location>
<code_context>

 type deduperAction int

+func (d *deduperAction) String() string {
+	switch *d {
+	case deduperActionAdd:
</code_context>

<issue_to_address>
**suggestion:** Consider using a value receiver for String() instead of a pointer receiver.

Since String() does not modify deduperAction, defining it with a value receiver is more idiomatic and avoids unnecessary pointer usage.
</issue_to_address>

### Comment 2
<location> `sensor/common/networkflow/manager/manager_process_sending_test.go:238` </location>
<code_context>
+	b.T().Logf("proc deduper: (%s)", dedupers[updatecomputer.ProcessEnrichedEntity].ElementsString(";"))
+}
+
+func (b *sendNetflowsSuite) getUpdates(num int) ([]*storage.ProcessListeningOnPortFromSensor, []*storage.NetworkEndpoint) {
+	p := make([]*storage.ProcessListeningOnPortFromSensor, 0)
+	e := make([]*storage.NetworkEndpoint, 0)
</code_context>

<issue_to_address>
**issue (bug_risk):** Potential issue with range usage in getUpdates helper.

Since 'num' is an integer, replace 'for range num' with 'for i := 0; i < num; i++' to ensure correct iteration.
</issue_to_address>


@rhacs-bot
Contributor

rhacs-bot commented Sep 18, 2025

Images are ready for the commit at ae317c4.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.9.x-798-gae317c4c8a.

@codecov

codecov bot commented Sep 18, 2025

Codecov Report

❌ Patch coverage is 76.76768% with 46 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.79%. Comparing base (85aa9d2) to head (26fee46).
⚠️ Report is 1 commit behind head on master.

Files with missing lines Patch % Lines
...mon/networkflow/updatecomputer/transition_based.go 70.40% 37 Missing ⚠️
...ensor/common/networkflow/updatecomputer/testing.go 0.00% 5 Missing ⚠️
...mon/networkflow/manager/manager_enrich_endpoint.go 85.71% 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #16901      +/-   ##
==========================================
- Coverage   48.79%   48.79%   -0.01%     
==========================================
  Files        2711     2712       +1     
  Lines      202284   202328      +44     
==========================================
+ Hits        98712    98732      +20     
- Misses      95793    95813      +20     
- Partials     7779     7783       +4     
Flag Coverage Δ
go-unit-tests 48.79% <76.76%> (-0.01%) ⬇️


@vikin91 vikin91 changed the title from "ROX-30941: Remove deduper for processes" to "ROX-30941: Enrich endpoints and processes together" on Sep 19, 2025
@vikin91 vikin91 force-pushed the piotr/ROX-30941-no-deduping-processes branch from 484ae5e to 1a2b56c on September 23, 2025 15:34
@vikin91 vikin91 marked this pull request as ready for review September 23, 2025 15:45
@vikin91 vikin91 requested a review from a team as a code owner September 23, 2025 15:45
@vikin91 vikin91 added ci-all-qa-tests Tells CI to run all API tests (not just BAT). do-not-merge/work-in-progress labels Sep 23, 2025
Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey there - I've reviewed your changes - here's some feedback:

  • In the Legacy implementation, OnSuccessfulSendEndpoints and OnSuccessfulSendProcesses appear to write to enrichedConnsLastSentState—verify they’re updating the correct endpoint and process state maps instead of overriding connection state.
  • When replacing a process on an existing endpoint, consider emitting a close event for the old process (instead of silently dropping it) so Central’s state doesn’t retain stale listeners.
  • ComputeUpdatedEndpointsAndProcesses is handling multiple responsibilities; breaking out the transition and deduper logic into smaller helper functions would make it more readable and easier to test.
## Individual Comments

### Comment 1
<location> `sensor/common/networkflow/updatecomputer/legacy.go:107` </location>
<code_context>
-	if currentProcesses != nil {
-		l.enrichedProcessesLastSentState = maps.Clone(currentProcesses)
+// OnSuccessfulSendProcesses contains actions that should be executed after successful sending of processesListening updates to Central.
+func (l *Legacy) OnSuccessfulSendProcesses(enrichedEndpointsProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
+	if enrichedEndpointsProcesses != nil {
+		l.lastSentStateMutex.Lock()
</code_context>

<issue_to_address>
**issue (bug_risk):** OnSuccessfulSendProcesses in Legacy may panic if ProcessListening is nil.

Add a nil check for withClose.ProcessListening before dereferencing to prevent potential panics.
</issue_to_address>

### Comment 2
<location> `sensor/common/networkflow/updatecomputer/update_computer_test.go:471` </location>
<code_context>
+			expectNumUpdatesProc: map[string]int{implLegacy: 0, implTransitionBased: 0},
+		},
+		// Process-specific cases
+		"Should replace process on two open-endpoint messages": {
+			initialMapping: map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose{
+				ep1: {
</code_context>

<issue_to_address>
**suggestion (testing):** Suggestion to add a test for process closure when endpoint is replaced.

Please add a test case where the endpoint is closed after replacing the process, to confirm the deduper removes both the process and the endpoint as expected.
</issue_to_address>

### Comment 3
<location> `sensor/common/networkflow/manager/manager_process_sending_test.go:63` </location>
<code_context>
+				},
+			},
+		},
+		"open-e1p1 followed by open-e1p2 should not keep p1 in deduper (replacing behavior)": {
+			events: []event{
+				{
</code_context>

<issue_to_address>
**suggestion (testing):** Suggestion to add a test for rapid process flapping on the same endpoint.

Add a test simulating rapid endpoint changes to verify the deduper correctly handles frequent replacements and does not leak or miss updates.

Suggested implementation:

```golang
		"open-e1p1 followed by open-e1p2 should not keep p1 in deduper (replacing behavior)": {
			events: []event{
				{
					description:                 "Open endpoint e1 with new process p1",
					input:                       e1p1open,
					expectedNumContainerLookups: 1,
					expectedNumUpdatesEndpoint:  1,
					expectedNumUpdatesProcess:   1,
					expectedDeduperState: map[string]string{
						e1p1open.endpointIndicator(deploymentID).Key(indicator.HashingAlgoHash): e1p1open.processListeningIndicator().Key(indicator.HashingAlgoHash),
					},
					expectedUpdatedProcesses: &p1,
				},
			},
		},

		"rapid endpoint flapping should not leak or miss updates": {
			events: []event{
				{
					description:                 "Open endpoint e1 with process p1",
					input:                       e1p1open,
					expectedNumContainerLookups: 1,
					expectedNumUpdatesEndpoint:  1,
					expectedNumUpdatesProcess:   1,
					expectedDeduperState: map[string]string{
						e1p1open.endpointIndicator(deploymentID).Key(indicator.HashingAlgoHash): e1p1open.processListeningIndicator().Key(indicator.HashingAlgoHash),
					},
					expectedUpdatedProcesses: &p1,
				},
				{
					description:                 "Close endpoint e1 with process p1",
					input:                       e1p1closed,
					expectedNumContainerLookups: 1,
					expectedNumUpdatesEndpoint:  1,
					expectedNumUpdatesProcess:   1,
					expectedDeduperState:        map[string]string{},
					expectedUpdatedProcesses:    &p1,
				},
				{
					description:                 "Open endpoint e1 with process p2",
					input:                       e1p2open,
					expectedNumContainerLookups: 1,
					expectedNumUpdatesEndpoint:  1,
					expectedNumUpdatesProcess:   1,
					expectedDeduperState: map[string]string{
						e1p2open.endpointIndicator(deploymentID).Key(indicator.HashingAlgoHash): e1p2open.processListeningIndicator().Key(indicator.HashingAlgoHash),
					},
					expectedUpdatedProcesses: &p2,
				},
				{
					description:                 "Close endpoint e1 with process p2",
					input:                       e1p2closed,
					expectedNumContainerLookups: 1,
					expectedNumUpdatesEndpoint:  1,
					expectedNumUpdatesProcess:   1,
					expectedDeduperState:        map[string]string{},
					expectedUpdatedProcesses:    &p2,
				},
				{
					description:                 "Open endpoint e1 with process p3",
					input:                       e1p3open,
					expectedNumContainerLookups: 1,
					expectedNumUpdatesEndpoint:  1,
					expectedNumUpdatesProcess:   1,
					expectedDeduperState: map[string]string{
						e1p3open.endpointIndicator(deploymentID).Key(indicator.HashingAlgoHash): e1p3open.processListeningIndicator().Key(indicator.HashingAlgoHash),
					},
					expectedUpdatedProcesses: &p3,
				},
				{
					description:                 "Close endpoint e1 with process p3",
					input:                       e1p3closed,
					expectedNumContainerLookups: 1,
					expectedNumUpdatesEndpoint:  1,
					expectedNumUpdatesProcess:   1,
					expectedDeduperState:        map[string]string{},
					expectedUpdatedProcesses:    &p3,
				},
			},
		},

```

You must ensure that the test data for `e1p2open`, `e1p2closed`, `e1p3open`, `e1p3closed`, `p2`, and `p3` are defined in your test setup, similar to how `e1p1open`, `e1p1closed`, and `p1` are defined.
</issue_to_address>

### Comment 4
<location> `sensor/common/networkflow/manager/enrichment_test.go:296` </location>
<code_context>
-			map[indicator.ProcessListening]timestamp.MicroTS)
+		prePopulateData    func(*testing.T, map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose)
 	}{
 		"Fresh endpoint with no process info should yield result EnrichmentResultSuccess for Network Graph and EnrichmentResultInvalidInput for PLOP": {
 			setupEndpoint: func() (*containerEndpoint, *connStatus) {
</code_context>

<issue_to_address>
**suggestion (testing):** Suggestion to add a test for endpoint with nil process after closure.

Please add a test to verify that closing an endpoint with a nil process does not leave stale entries in enrichment or deduplication logic.

Suggested implementation:

```golang
	}{
		"Fresh endpoint with no process info should yield result EnrichmentResultSuccess for Network Graph and EnrichmentResultInvalidInput for PLOP": {
			setupEndpoint: func() (*containerEndpoint, *connStatus) {
			expectedResultNG:   EnrichmentResultSuccess,
			expectedResultPLOP: EnrichmentResultSuccess,
			expectedReasonNG:   EnrichmentReasonEpDuplicate,
			prePopulateData: func(t *testing.T, data map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
				// Pre-populate with newer timestamp to trigger duplicate detection
				endpointIndicator := indicator.ContainerEndpoint{
					Entity:   networkgraph.EntityForDeployment("test-deployment"),
					Port:     80,
				}
				data[endpointIndicator] = &indicator.ProcessListeningWithClose{
					Process: nil,
					Closed:  false,
				}
			},
		},
		"Endpoint with nil process after closure should not leave stale entries in enrichment or deduplication": {
			setupEndpoint: func() (*containerEndpoint, *connStatus) {
				endpoint := &containerEndpoint{
					indicator: indicator.ContainerEndpoint{
						Entity: networkgraph.EntityForDeployment("test-deployment"),
						Port:   8080,
					},
					process: nil,
				}
				status := &connStatus{
					closed: true,
				}
				return endpoint, status
			},
			expectedResultNG:   EnrichmentResultInvalidInput,
			expectedResultPLOP: EnrichmentResultInvalidInput,
			expectedReasonNG:   EnrichmentReasonEpNoProcess,
			prePopulateData: func(t *testing.T, data map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
				// Simulate closure with nil process
				endpointIndicator := indicator.ContainerEndpoint{
					Entity:   networkgraph.EntityForDeployment("test-deployment"),
					Port:     8080,
				}
				data[endpointIndicator] = &indicator.ProcessListeningWithClose{
					Process: nil,
					Closed:  true,
				}
			},
			verifyCleanup: func(t *testing.T, enrichmentMap map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
				endpointIndicator := indicator.ContainerEndpoint{
					Entity:   networkgraph.EntityForDeployment("test-deployment"),
					Port:     8080,
				}
				_, exists := enrichmentMap[endpointIndicator]
				if exists {
					t.Errorf("Stale entry found for endpoint with nil process after closure")
				}
			},
		},

```

- You may need to ensure that the test runner invokes `verifyCleanup` after the enrichment logic runs, passing the relevant map or state.
- If your test harness does not support a `verifyCleanup` function, add assertions directly after the enrichment logic in the test body.
- Adjust the endpoint and port values if your test setup uses different conventions.
</issue_to_address>

### Comment 5
<location> `sensor/common/networkflow/updatecomputer/transition_based.go:405` </location>
<code_context>
-	eps map[indicator.ContainerEndpoint]timestamp.MicroTS,
-	procs map[indicator.ProcessListening]timestamp.MicroTS,
-) {
+func (c *TransitionBased) OnSuccessfulSendConnections(conns map[indicator.NetworkConn]timestamp.MicroTS) {
 	if conns != nil {
 		c.cachedUpdatesConn = make([]*storage.NetworkFlow, 0)
</code_context>

<issue_to_address>
**issue (complexity):** Consider consolidating the three cache-reset methods into a single OnSuccessfulSend method to centralize logic and reduce duplication.

```suggestion
// The three nearly‐identical methods OnSuccessfulSendConnections, OnSuccessfulSendEndpoints and
// OnSuccessfulSendProcesses can be collapsed into the existing OnSuccessfulSend. This removes
// duplication and keeps all cache‐reset logic in one place.

// 1) Delete these methods entirely:
//
//   func (c *TransitionBased) OnSuccessfulSendConnections(conns map[indicator.NetworkConn]timestamp.MicroTS) { … }
//   func (c *TransitionBased) OnSuccessfulSendEndpoints(enrichedEndpointsProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) { … }
//   func (c *TransitionBased) OnSuccessfulSendProcesses(enrichedEndpointsProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) { … }
//
// 2) Update callers to use the single signature:
//
//   func (c *TransitionBased) OnSuccessfulSend(
//       conns map[indicator.NetworkConn]timestamp.MicroTS,
//       eps map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose,
//       procs map[indicator.ProcessListening]timestamp.MicroTS,
//   ) {
//       if conns != nil {
//           c.cachedUpdatesConn = c.cachedUpdatesConn[:0]
//       }
//       if eps != nil {
//           c.cachedUpdatesEp = c.cachedUpdatesEp[:0]
//       }
//       if procs != nil {
//           c.cachedUpdatesProc = c.cachedUpdatesProc[:0]
//       }
//   }
//
// 3) If you still need the old entrypoints, simply delegate:
//
//   func (c *TransitionBased) OnSuccessfulSendConnections(conns map[indicator.NetworkConn]timestamp.MicroTS) {
//       c.OnSuccessfulSend(conns, nil, nil)
//   }
//
//   func (c *TransitionBased) OnSuccessfulSendEndpoints(eps map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
//       c.OnSuccessfulSend(nil, eps, nil)
//   }
//
//   func (c *TransitionBased) OnSuccessfulSendProcesses(procs map[indicator.ProcessListening]timestamp.MicroTS) {
//       c.OnSuccessfulSend(nil, nil, procs)
//   }
//
// This centralizes the slice‐reset logic and makes future changes easier.
```
</issue_to_address>

### Comment 6
<location> `sensor/common/networkflow/updatecomputer/legacy.go:48` </location>
<code_context>
 }

-func (l *Legacy) ComputeUpdatedEndpoints(current map[indicator.ContainerEndpoint]timestamp.MicroTS) []*storage.NetworkEndpoint {
+func (l *Legacy) ComputeUpdatedEndpointsAndProcesses(enrichedEndpointsProcesses map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) ([]*storage.NetworkEndpoint, []*storage.ProcessListeningOnPortFromSensor) {
+	currentEps := make(map[indicator.ContainerEndpoint]timestamp.MicroTS, len(l.enrichedEndpointsLastSentState))
+	currentProc := make(map[indicator.ProcessListening]timestamp.MicroTS)
</code_context>

<issue_to_address>
**issue (complexity):** Consider introducing generic helper functions to unify and simplify repeated locking and map update patterns.

```suggestion
You can collapse the repeated lock+computeUpdates and lock+clone patterns by introducing two small helpers (using Go-1.18 generics):

1) A `withUpdates` helper to DRY-up all `ComputeUpdated…` methods:

```go
// withUpdates wraps computeUpdates under the same RLock logic.
func (l *Legacy) withUpdates[T comparable, P any](
    current map[T]timestamp.MicroTS,
    lastSent map[T]timestamp.MicroTS,
    toProto func(T, timestamp.MicroTS) P,
) []P {
    return concurrency.WithRLock1(&l.lastSentStateMutex, func() []P {
        return computeUpdates(current, lastSent, toProto)
    })
}

// then each ComputeUpdated… becomes a one-liner:
func (l *Legacy) ComputeUpdatedConns(cur map[indicator.NetworkConn]timestamp.MicroTS) []*storage.NetworkFlow {
    return l.withUpdates(cur, l.enrichedConnsLastSentState,
        func(c indicator.NetworkConn, ts timestamp.MicroTS) *storage.NetworkFlow {
            return (&c).ToProto(ts)
        })
}

func (l *Legacy) ComputeUpdatedEndpoints(cur map[indicator.ContainerEndpoint]timestamp.MicroTS) []*storage.NetworkEndpoint {
    return l.withUpdates(cur, l.enrichedEndpointsLastSentState,
        func(e indicator.ContainerEndpoint, ts timestamp.MicroTS) *storage.NetworkEndpoint {
            return (&e).ToProto(ts)
        })
}

// for processes you still need the env-guard:
func (l *Legacy) ComputeUpdatedProcesses(cur map[indicator.ProcessListening]timestamp.MicroTS) []*storage.ProcessListeningOnPortFromSensor {
    if !env.ProcessesListeningOnPort.BooleanSetting() {
        if len(cur) > 0 {
            logging.GetRateLimitedLogger().Warn( /* … */ )
        }
        return nil
    }
    return l.withUpdates(cur, l.enrichedProcessesLastSentState,
        func(p indicator.ProcessListening, ts timestamp.MicroTS) *storage.ProcessListeningOnPortFromSensor {
            return (&p).ToProto(ts)
        })
}
```

2) An `updateState` helper to unify all `OnSuccessfulSend…` methods:

```go
// updateState clones any map[T]TS under a lock.
func (l *Legacy) updateState[T comparable](
    dest *map[T]timestamp.MicroTS,
    current map[T]timestamp.MicroTS,
) {
    if current == nil {
        return
    }
    l.lastSentStateMutex.Lock()
    defer l.lastSentStateMutex.Unlock()
    *dest = maps.Clone(current)
}

// usage in the three OnSuccessfulSend… methods:
func (l *Legacy) OnSuccessfulSendConnections(cur map[indicator.NetworkConn]timestamp.MicroTS) {
    l.updateState(&l.enrichedConnsLastSentState, cur)
}

func (l *Legacy) OnSuccessfulSendEndpoints(endps map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
    m := make(map[indicator.ContainerEndpoint]timestamp.MicroTS, len(endps))
    for ep, wc := range endps {
        m[ep] = wc.LastSeen
    }
    l.updateState(&l.enrichedEndpointsLastSentState, m)
}

func (l *Legacy) OnSuccessfulSendProcesses(endps map[indicator.ContainerEndpoint]*indicator.ProcessListeningWithClose) {
    m := make(map[indicator.ProcessListening]timestamp.MicroTS, len(endps))
    for _, wc := range endps {
        if wc.ProcessListening != nil {
            m[*wc.ProcessListening] = wc.LastSeen
        }
    }
    l.updateState(&l.enrichedProcessesLastSentState, m)
}
```

This removes the boilerplate lock/clone and split logic without changing any behavior.
</issue_to_address>


@vikin91 vikin91 added the auto-retest PRs with this label will be automatically retested if prow checks fails label Sep 23, 2025
@rhacs-bot
Contributor

rhacs-bot commented Sep 23, 2025

Images are ready for the commit at ebb3641.

To use with deploy scripts, first export MAIN_IMAGE_TAG=0.0.60-5-gebb36412b9.


@vikin91
Contributor Author

vikin91 commented Sep 24, 2025

  • The PR title and description should be updated to accurately describe the removal of the process deduper and the related test refactoring, since the current wording doesn’t match the actual changes.

I believe this is now accurate. The process deduper was actually not removed completely but merged with the endpoints deduper as those entities should be analyzed together.

  • Exposing TransitionBased.GetState for tests leaks internal state into the public API—consider moving it to a test‐only helper or interface to keep production code clean.

This has been removed many commits ago.

  • The new info‐level logging in computeUpdatedEntitiesNoPast will be very verbose in production; it’s better suited for debug level to avoid flooding the logs.

This function has been removed many commits ago.

@rhacs-bot
Contributor

/retest

2 similar comments
@rhacs-bot
Contributor

/retest

@rhacs-bot
Contributor

/retest

@rhacs-bot
Contributor

rhacs-bot commented Sep 24, 2025

Images are ready for the commit at 638b6d4.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.9.x-891-g638b6d47ab.

@vikin91 vikin91 force-pushed the piotr/ROX-30941-no-deduping-processes branch from ebb3641 to 638b6d4 on September 25, 2025 07:22
@vikin91 vikin91 requested a review from lvalerom September 25, 2025 07:34
@rhacs-bot
Contributor

rhacs-bot commented Sep 25, 2025

Images are ready for the commit at 0264519.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.9.x-894-g0264519f93.

@vikin91
Contributor Author

vikin91 commented Sep 26, 2025

Full graphical grafana dump

(Grafana dashboard screenshots from 09-26-2025: Enrichment Update Computer, Enrichment of Endpoints, Core Dashboard, Enrichment of Connections)

Contributor

@lvalerom lvalerom left a comment

Looks really good. I didn't see anything blocking, most of my comments are clarifications or for test coverage.

@vikin91
Contributor Author

vikin91 commented Oct 1, 2025

@lvalerom I resolved the comments and will merge the PR, given that you are OOO and the code-freeze is approaching.
Two discussions could result in small follow-up changes - I am happy to introduce them once we finish the discussion.

@vikin91
Contributor Author

vikin91 commented Oct 1, 2025

Unfortunately, I need to rebase due to #16915 :(

@vikin91 vikin91 force-pushed the piotr/ROX-30941-no-deduping-processes branch from 0264519 to 26fee46 on October 1, 2025 08:01
@rhacs-bot
Contributor

rhacs-bot commented Oct 1, 2025

Images are ready for the commit at 26fee46.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.9.x-945-g26fee4684c.

@vikin91 vikin91 enabled auto-merge (squash) October 1, 2025 09:41
@openshift-ci

openshift-ci bot commented Oct 1, 2025

@vikin91: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
ci/prow/ocp-4-19-nongroovy-e2e-tests 26fee46 link false /test ocp-4-19-nongroovy-e2e-tests


@rhacs-bot
Contributor

/retest

@vikin91 vikin91 merged commit f39b48e into master Oct 1, 2025
164 of 174 checks passed
@vikin91 vikin91 deleted the piotr/ROX-30941-no-deduping-processes branch October 1, 2025 10:33

Labels

ai-review area/sensor auto-retest PRs with this label will be automatically retested if prow checks fails ci-all-qa-tests Tells CI to run all API tests (not just BAT).
