
ROX-29060: Refactor network flow enrichment#15076

Merged
vikin91 merged 52 commits intomasterfrom
piotr/ROX-29060-refactor-enrichment
Aug 6, 2025

Conversation

@vikin91
Contributor

@vikin91 vikin91 commented Apr 24, 2025

Description

  • Merge endpoint enrichment for the network graph and PLoP into one function
  • Gather logging and metrics collection in one place
  • Decouple the result of enrichment from the action to be taken (retry, remove, etc.) to make the process easy to test
  • Reorganize the code among a few files to get rid of the one huge sensor/common/networkflow/manager/manager_impl.go and split it into multiple files (this is what makes the diff look so large)
  • Add unit tests with major coverage
  • NO major CHANGE IN THE ENRICHMENT BEHAVIOR is intended - there are, however, tiny deviations. Here is what I have noticed:
    1. On successful enrichment of closed endpoints, we no longer delete the entry from activeEndpoints. Instead, the CheckRemove action is executed, where we check before deleting. Benefit: this ensures that the checks for the network graph and PLoP are consulted before making a decision about deleting.
    2. When (1) an endpoint lookup is successful, (2) a connection is incoming, and (3) the connection is local, then such a connection will be reported twice to sensor: as incoming and as outgoing. We no longer retry the enrichment of the incoming part of the connection. Instead, we skip enrichment of the incoming part without retrying, while still processing the outgoing part as usual. Benefit: we skip a few loops of retries that would always fail in the same way as long as the connection is active.
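The decoupling of enrichment results from post-enrichment actions can be sketched roughly as below. The type names mirror the PR's `EnrichmentResult` and `PostEnrichmentAction` enums, but the concrete values and the mapping logic here are illustrative assumptions, not the actual implementation:

```go
package main

import "fmt"

// EnrichmentResult describes what happened during enrichment;
// PostEnrichmentAction describes what to do about it. Separating the
// two keeps the decision logic pure and trivially unit-testable.
type EnrichmentResult int

const (
	ResultSuccess EnrichmentResult = iota
	ResultContainerIDMissing
	ResultRotten
)

type PostEnrichmentAction int

const (
	ActionSend PostEnrichmentAction = iota
	ActionRetry
	ActionCheckRemove
)

// actionFor maps an enrichment result to the action to take
// (values chosen for illustration only).
func actionFor(r EnrichmentResult, closed bool) PostEnrichmentAction {
	switch r {
	case ResultSuccess:
		if closed {
			// Closed endpoints are not deleted outright; CheckRemove
			// consults network-graph and PLoP state first (deviation 1).
			return ActionCheckRemove
		}
		return ActionSend
	case ResultContainerIDMissing:
		return ActionRetry
	default:
		return ActionCheckRemove
	}
}

func main() {
	fmt.Println(actionFor(ResultSuccess, true) == ActionCheckRemove)
}
```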

User-facing documentation

  • CHANGELOG is updated OR update is not needed
  • documentation PR is created and is linked above OR is not needed

Testing and quality

  • the change is production ready: the change is GA or otherwise the functionality is gated by a feature flag
  • CI results are inspected

Automated testing

  • added unit tests
  • added e2e tests
  • added regression tests
  • added compatibility tests
  • modified existing tests

How I validated my change

  • Extensive unit test coverage reaching 90% of the enrichment code
  • Manually, on a long-running cluster, by looking at the metrics
  • Full CI suite
Enrichment of Connections

For fake workloads running overnight, using the dashboard from #16151.

[Screenshot: Enrichment-of-Connections-Dashboards-Grafana-07-25-2025_08_25_AM]

Enrichment of Endpoints

For fake workloads running overnight, using the dashboard from #16151.

[Screenshot: Enrichment-of-Endpoints-Dashboards-Grafana-07-25-2025_08_26_AM]

@vikin91
Contributor Author

vikin91 commented Apr 24, 2025

@openshift-ci

openshift-ci bot commented Apr 24, 2025

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@rhacs-bot
Contributor

rhacs-bot commented Apr 24, 2025

Images are ready for the commit at 32b6e2c.

To use with deploy scripts, first export MAIN_IMAGE_TAG=4.9.x-276-g32b6e2c351.

@codecov

codecov bot commented Apr 24, 2025

Codecov Report

❌ Patch coverage is 85.97064% with 86 lines in your changes missing coverage. Please review.
✅ Project coverage is 49.08%. Comparing base (860991a) to head (7eb005c).
⚠️ Report is 16 commits behind head on master.

Files with missing lines Patch % Lines
sensor/common/networkflow/manager/manager_impl.go 61.16% 35 Missing and 5 partials ⚠️
...n/networkflow/manager/manager_enrich_connection.go 88.42% 24 Missing and 4 partials ⚠️
...mon/networkflow/manager/manager_enrich_endpoint.go 90.37% 16 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #15076      +/-   ##
==========================================
+ Coverage   49.04%   49.08%   +0.03%     
==========================================
  Files        2625     2628       +3     
  Lines      193919   193991      +72     
==========================================
+ Hits        95117    95214      +97     
+ Misses      91350    91326      -24     
+ Partials     7452     7451       -1     
Flag Coverage Δ
go-unit-tests 49.08% <85.97%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown.


@vikin91
Contributor Author

vikin91 commented Apr 25, 2025

(WIP) State on Friday

[Screenshot 2025-04-25 at 16 42 56]

Base automatically changed from piotr/ROX-28259-the-sensor-fix to master April 30, 2025 06:24
@vikin91 vikin91 force-pushed the piotr/ROX-29060-refactor-enrichment branch from 2e9f47d to 76efe70 Compare April 30, 2025 06:57
@vikin91 vikin91 added ci-all-qa-tests Tells CI to run all API tests (not just BAT). auto-retest PRs with this label will be automatically retested if prow checks fails labels May 5, 2025
@vikin91 vikin91 force-pushed the piotr/ROX-29060-refactor-enrichment branch from 0815ba1 to d62fd48 Compare July 8, 2025 10:55
@red-hat-konflux
Contributor

Caution

There are some errors in your PipelineRun template.

PipelineRun Error
quay-proxy no kind "ImageDigestMirrorSet" is registered for version "config.openshift.io/v1" in scheme "k8s.io/client-go/kubernetes/scheme/register.go:83"

Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey @vikin91 - I've reviewed your changes and found some issues that need to be addressed.

Blocking issues:

  • purgeHostConns now uses a RWMutex, but the function still mutates the underlying data. (link)

General comments:

  • With the introduction of concurrency.WithLock and WithRLock wrappers, please double-check lock acquisition order across methods to prevent potential deadlocks under contention.
  • The isUpdated helper nicely consolidates timestamp comparisons—please add a brief comment explaining how InfiniteFuture is used to distinguish open vs closed state for future maintainers.
  • There’s similar send-to-central logic in sendConnsEps and sendProcesses; consider extracting a shared helper to reduce boilerplate and ensure consistent metrics handling.
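The `isUpdated` behavior that the reviewer asks to document could look roughly like the following. The `InfiniteFuture` sentinel and the comparison logic are reconstructed from the reviewer's description, not copied from the PR:

```go
package main

import "fmt"

// MicroTS is a microsecond timestamp, mirroring the PR's timestamp.MicroTS.
type MicroTS int64

// InfiniteFuture marks a still-open connection or endpoint; any concrete
// timestamp marks it as closed (illustrative stand-in for the real constant).
const InfiniteFuture MicroTS = 1<<63 - 1

// isUpdated reports whether the new timestamp carries information the
// previous one did not: either the OPEN->CLOSED transition (prev was
// InfiniteFuture, curr is concrete) or strictly more recent activity.
func isUpdated(prev, curr MicroTS) bool {
	if prev == InfiniteFuture && curr != InfiniteFuture {
		return true // an open connection has now been closed
	}
	return curr > prev // strictly newer activity
}

func main() {
	fmt.Println(isUpdated(InfiniteFuture, 100), isUpdated(100, 100), isUpdated(100, 200))
}
```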

## Individual Comments

### Comment 1
<location> `sensor/common/networkflow/manager/manager_impl.go:690` </location>
<code_context>
 	return conns, conns.currentSequenceID
 }

 func (m *networkFlowManager) deleteHostConnections(hostname string) {
-	m.connectionsByHostMutex.Lock()
-	defer m.connectionsByHostMutex.Unlock()
</code_context>

<issue_to_address>
The locking order in deleteHostConnections has changed and could introduce deadlocks.

Please verify that all code paths acquire connectionsByHostMutex and conns.mutex in the same order to prevent potential deadlocks.
</issue_to_address>

### Comment 2
<location> `sensor/common/networkflow/manager/manager_impl.go:732` </location>
<code_context>

 func (h *hostConnections) Process(networkInfo *sensor.NetworkConnectionInfo, nowTimestamp timestamp.MicroTS, sequenceID int64) error {
 	flowMetrics.NetworkConnectionInfoMessagesRcvd.With(prometheus.Labels{"Hostname": h.hostname}).Inc()
-	now := timestamp.Now()
</code_context>

<issue_to_address>
The order of sequenceID checks and locking in hostConnections.Process has changed.

Checking sequenceID before acquiring the lock may introduce a race condition if sequenceID can change concurrently. Please verify if this order is safe, or consider locking before the check.
</issue_to_address>

### Comment 3
<location> `sensor/common/networkflow/manager/purger.go:155` </location>
<code_context>
 		numPurgedActiveEp, numPurgedActiveConn, numPurgedHostEp, numPurgedHostConn)
 }

-func purgeHostConns(mutex *sync.Mutex, maxAge time.Duration, enrichmentQueue map[string]*hostConnections, store EntityStore) (numPurgedEps, numPurgedConns int) {
+func purgeHostConns(mutex *sync.RWMutex, maxAge time.Duration, enrichmentQueue map[string]*hostConnections, store EntityStore) (numPurgedEps, numPurgedConns int) {
 	timer := prometheus.NewTimer(flowMetrics.ActiveEndpointsPurgerDuration.WithLabelValues("hostConns"))
</code_context>

<issue_to_address>
purgeHostConns now uses a RWMutex, but the function still mutates the underlying data.

Using RWMutex.RLock here is unsafe because the function mutates enrichmentQueue. Use Lock instead, or ensure all mutations are properly synchronized.
</issue_to_address>

### Comment 4
<location> `sensor/common/networkflow/manager/manager_enrich_endpoint.go:30` </location>
<code_context>
+	EnrichmentReasonEpSuccessInactive      EnrichmentReasonEp = "success-inactive"
+)
+
+func (m *networkFlowManager) enrichHostContainerEndpoints(now timestamp.MicroTS, hostConns *hostConnections, enrichedEndpoints map[containerEndpointIndicator]timestamp.MicroTS, processesListening map[processListeningIndicator]timestamp.MicroTS) {
+	hostConns.mutex.Lock()
+	defer hostConns.mutex.Unlock()
</code_context>

<issue_to_address>
The endpoint enrichment logic now updates both endpoint and process metrics, which may cause double-counting.

Since updateEndpointMetric increments both FlowEnrichmentEventsEndpoint and HostProcessesEnrichmentEvents, please confirm if counting the same event in both metrics is intentional or if this could lead to misleading monitoring data.
</issue_to_address>

### Comment 5
<location> `sensor/common/networkflow/manager/manager_sending_test.go:143` </location>
<code_context>
-	b.assertOneUpdatedCloseEndpoint()
-}
-
-func (b *sendNetflowsSuite) TestCloseEndpointFailedLookup() {
-	b.expectFailedLookup(1)
-
</code_context>

<issue_to_address>
Missing test for endpoint with partial enrichment (e.g., PLOP succeeds, NG fails).

Add a test case where network graph enrichment fails but PLOP enrichment succeeds to verify correct handling of partial enrichment results.

Suggested implementation:

```golang
}

func (b *sendNetflowsSuite) TestCloseEndpoint() {
	b.expectContainerLookups(1)

	b.updateEp(createEndpointPair(timestamp.Now().Add(-time.Hour), timestamp.Now()).lastSeen(timestamp.Now()))
	b.thenTickerTicks()
	b.assertOneUpdatedCloseEndpoint()
}

// TestCloseEndpointPartialEnrichment verifies that when PLOP enrichment succeeds but NG enrichment fails,
// the system correctly handles the partial enrichment result.
func (b *sendNetflowsSuite) TestCloseEndpointPartialEnrichment() {
	// Simulate PLOP enrichment success
	b.expectContainerLookups(1)
	// Simulate NG enrichment failure
	b.expectFailedLookup(1)

	// Update endpoint as in the other tests
	b.updateEp(createEndpointPair(timestamp.Now().Add(-time.Hour), timestamp.Now()).lastSeen(timestamp.Now()))
	b.thenTickerTicks()

	// Assert that the endpoint is updated with partial enrichment (PLOP only)
	b.assertOneUpdatedCloseEndpointPartialEnrichment()
}

```

You will need to implement the helper/assertion method `assertOneUpdatedCloseEndpointPartialEnrichment()` in your test suite, or adapt the assertion to match your codebase's conventions for checking partial enrichment results.
</issue_to_address>

### Comment 6
<location> `sensor/common/networkflow/manager/manager_sending_test.go:256` </location>
<code_context>
-	b.Assert().NotEqual(int32(0), netflowUpdate.NetworkFlowUpdate.GetUpdatedEndpoints()[0].GetLastActiveTimestamp().GetNanos(), "the endpoint should not be open")
-}
-
-func mustNotRead[T any](t *testing.T, ch chan T) {
-	select {
-	case <-ch:
</code_context>

<issue_to_address>
Consider adding comments to helper functions for clarity.

Adding brief comments to helper functions like mustNotRead, mustReadTimeout, and mustSendWithoutBlock will help future contributors understand their purpose.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@vikin91 vikin91 force-pushed the piotr/ROX-29060-refactor-enrichment branch from 0bd24ad to fc66fda Compare July 17, 2025 14:40
@stackrox stackrox deleted a comment from openshift-ci bot Jul 17, 2025
@vikin91 vikin91 marked this pull request as ready for review July 22, 2025 07:18
@vikin91 vikin91 requested a review from a team as a code owner July 22, 2025 07:18
@vikin91 vikin91 requested review from rhybrillou and removed request for a team July 22, 2025 07:18
Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey @vikin91 - I've reviewed your changes - here's some feedback:

  • The connection and endpoint enrichment logic share many patterns—consider extracting shared code paths (e.g., retry logic and container‐ID grace‐period checks) to reduce duplication and ease future maintenance.
  • There’s substantial test duplication in the new enrichment and sending tests—consolidating common setup and assertion helpers would simplify the test suite and make it easier to extend.
  • The constant clusterEntityResolutionWaitPeriod is still hardcoded while ContainerIDResolutionGracePeriod is configurable—consider making both durations configurable or clearly document their distinct purposes to avoid confusion.

## Individual Comments

### Comment 1
<location> `sensor/common/networkflow/manager/manager_impl.go:635` </location>
<code_context>
 }

 func computeUpdatedProcesses(current map[processListeningIndicator]timestamp.MicroTS, previous map[processListeningIndicator]timestamp.MicroTS, previousMutex *sync.RWMutex) []*storage.ProcessListeningOnPortFromSensor {
+	if !env.ProcessesListeningOnPort.BooleanSetting() {
+		return []*storage.ProcessListeningOnPortFromSensor{}
</code_context>

<issue_to_address>
Early return disables process updates if the feature flag is off, but this may mask issues.

Consider adding a log or metric if 'current' is non-empty when the feature is disabled, to help detect unexpected process data and potential misconfigurations.

Suggested implementation:

```golang
func computeUpdatedProcesses(current map[processListeningIndicator]timestamp.MicroTS, previous map[processListeningIndicator]timestamp.MicroTS, previousMutex *sync.RWMutex) []*storage.ProcessListeningOnPortFromSensor {
	if !env.ProcessesListeningOnPort.BooleanSetting() {
		if len(current) > 0 {
			log.Warnf("Received %d process(es) while ProcessesListeningOnPort feature is disabled. This may indicate a misconfiguration.", len(current))
			// Optionally, increment a metric here if a metrics system is available.
			// metrics.IncProcessDataWhileFeatureDisabled(len(current))
		}
		return []*storage.ProcessListeningOnPortFromSensor{}
	}

```

- If your codebase uses a custom logger or metrics system, replace `log.Warnf` and the commented metric line with the appropriate call.
- If the `log` package is not imported, add `import "log"` or the relevant logger import at the top of the file.
</issue_to_address>


Contributor

@sourcery-ai sourcery-ai bot left a comment


Hey @vikin91 - I've reviewed your changes - here's some feedback:

  • Standardize concurrency idioms by choosing either concurrency.WithLock wrappers or explicit mutex.Lock()/Unlock() calls consistently to improve readability and avoid lock-order confusion.
  • Consider consolidating all enrichment result and action enums (EnrichmentResult, PostEnrichmentAction, EnrichmentReasonConn, EnrichmentReasonEp) into a single documented file or package doc so they’re easier to discover and maintain.
  • Add a focused unit test or code comment for the isUpdated logic to explicitly cover the OPEN→CLOSED transition and more‐recent‐activity case, since it’s critical and non‐obvious behavior.


@vikin91
Contributor Author

vikin91 commented Jul 22, 2025

  • Standardize concurrency idioms by choosing either concurrency.WithLock wrappers or explicit mutex.Lock()/Unlock() calls consistently to improve readability and avoid lock-order confusion.
  • Consider consolidating all enrichment result and action enums (EnrichmentResult, PostEnrichmentAction, EnrichmentReasonConn, EnrichmentReasonEp) into a single documented file or package doc so they’re easier to discover and maintain.
  • Add a focused unit test or code comment for the isUpdated logic to explicitly cover the OPEN→CLOSED transition and more‐recent‐activity case, since it’s critical and non‐obvious behavior.

I addressed the second and third points in 859a669 and 86abf4c.

The first point follows the style that we use - concurrency.WithLock is used for a block of lines within a function, whereas mutex.Lock()/Unlock() is used for the entire function or the rest of a function. I will keep it this way unless there are comments from human reviewers.
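The convention described above can be sketched as follows. The `WithLock` helper is a hypothetical stand-in modeled on stackrox's `pkg/concurrency`, not the actual library code:

```go
package main

import (
	"fmt"
	"sync"
)

// WithLock scopes a lock to a block of lines: the lock is released as
// soon as the closure returns, even if it panics.
func WithLock(mu sync.Locker, do func()) {
	mu.Lock()
	defer mu.Unlock()
	do()
}

type counter struct {
	mu sync.Mutex
	n  int
}

// incBlock locks only the critical block; unlocked work may continue
// after WithLock returns.
func (c *counter) incBlock() {
	WithLock(&c.mu, func() { c.n++ })
}

// value uses the defer-Unlock idiom, holding the lock for the whole
// rest of the function.
func (c *counter) value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

func main() {
	c := &counter{}
	for i := 0; i < 3; i++ {
		c.incBlock()
	}
	fmt.Println(c.value())
}
```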

@vikin91 vikin91 force-pushed the piotr/ROX-29060-refactor-enrichment branch from da96bc3 to 7eb005c Compare August 5, 2025 14:59
@openshift-ci

openshift-ci bot commented Aug 5, 2025

@vikin91: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
ci/prow/ocp-4-18-qa-e2e-tests da96bc3 link false /test ocp-4-18-qa-e2e-tests

Full PR test history. Your PR dashboard.


@rhacs-bot
Contributor

/retest

@vikin91 vikin91 merged commit 9cb0fd9 into master Aug 6, 2025
97 checks passed
@vikin91 vikin91 deleted the piotr/ROX-29060-refactor-enrichment branch August 6, 2025 09:21

Labels

ai-review area/sensor auto-retest PRs with this label will be automatically retested if prow checks fails ci-all-qa-tests Tells CI to run all API tests (not just BAT).
