ROX-29060: Refactor network flow enrichment #15076
This change is part of a stack managed by git-spice.
Images are ready for the commit at 32b6e2c.
Codecov Report: patch coverage and impacted files.

```
@@            Coverage Diff             @@
##           master   #15076      +/-   ##
==========================================
+ Coverage   49.04%   49.08%   +0.03%
==========================================
  Files        2625     2628       +3
  Lines      193919   193991      +72
==========================================
+ Hits        95117    95214      +97
+ Misses      91350    91326      -24
+ Partials     7452     7451       -1
```
Caution: there are some errors in your PipelineRun template.
Hey @vikin91 - I've reviewed your changes and found some issues that need to be addressed.
Blocking issues:
- purgeHostConns now uses a RWMutex, but the function still mutates the underlying data. (link)
General comments:
- With the introduction of concurrency.WithLock and WithRLock wrappers, please double-check lock acquisition order across methods to prevent potential deadlocks under contention.
- The isUpdated helper nicely consolidates timestamp comparisons—please add a brief comment explaining how InfiniteFuture is used to distinguish open vs closed state for future maintainers.
- There’s similar send-to-central logic in sendConnsEps and sendProcesses; consider extracting a shared helper to reduce boilerplate and ensure consistent metrics handling.
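The open-vs-closed distinction that the `isUpdated` comment refers to can be sketched like this. `MicroTS` and `InfiniteFuture` mirror names mentioned in the review, but the implementation below is an illustrative assumption, not the repository's actual code:

```go
package main

import (
	"fmt"
	"math"
)

// MicroTS stands in for a microsecond timestamp; InfiniteFuture marks a
// still-open connection/endpoint (hypothetical values for this sketch).
type MicroTS int64

const InfiniteFuture MicroTS = math.MaxInt64

// isUpdated sketches the consolidated comparison the review describes:
// an entry counts as updated if it transitioned from open (InfiniteFuture)
// to closed, or if it carries more recent activity than what was last sent.
func isUpdated(prev, curr MicroTS) bool {
	// OPEN -> CLOSED transition: the previous timestamp was InfiniteFuture,
	// the current one is a concrete close time.
	if prev == InfiniteFuture && curr != InfiniteFuture {
		return true
	}
	// More recent (concrete) activity than the previously sent timestamp.
	return curr != InfiniteFuture && curr > prev
}

func main() {
	fmt.Println(isUpdated(InfiniteFuture, 100)) // open -> closed: true
	fmt.Println(isUpdated(100, 200))            // newer activity: true
	fmt.Println(isUpdated(200, 200))            // unchanged: false
}
```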
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `sensor/common/networkflow/manager/manager_impl.go:690` </location>
<code_context>
return conns, conns.currentSequenceID
}
func (m *networkFlowManager) deleteHostConnections(hostname string) {
- m.connectionsByHostMutex.Lock()
- defer m.connectionsByHostMutex.Unlock()
</code_context>
<issue_to_address>
The locking order in deleteHostConnections has changed and could introduce deadlocks.
Please verify that all code paths acquire connectionsByHostMutex and conns.mutex in the same order to prevent potential deadlocks.
</issue_to_address>
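The ordering discipline this comment asks for can be sketched as follows. The types are simplified stand-ins for the manager's real structures; the point is only that every path takes the outer map lock before any per-host lock:

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the manager's two lock levels.
type hostConnections struct {
	mutex sync.Mutex
}

type manager struct {
	connectionsByHostMutex sync.Mutex
	connectionsByHost      map[string]*hostConnections
}

// deleteHostConnections acquires the outer map lock first, then the
// per-host lock: the same order every other code path must use. If any
// path ever locked conns.mutex first and then connectionsByHostMutex,
// two goroutines could each hold one lock and wait on the other (deadlock).
func (m *manager) deleteHostConnections(hostname string) {
	m.connectionsByHostMutex.Lock()
	defer m.connectionsByHostMutex.Unlock()

	conns, ok := m.connectionsByHost[hostname]
	if !ok {
		return
	}
	conns.mutex.Lock()
	defer conns.mutex.Unlock()
	delete(m.connectionsByHost, hostname)
}

func main() {
	m := &manager{connectionsByHost: map[string]*hostConnections{"node-1": {}}}
	m.deleteHostConnections("node-1")
	fmt.Println(len(m.connectionsByHost)) // 0
}
```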
### Comment 2
<location> `sensor/common/networkflow/manager/manager_impl.go:732` </location>
<code_context>
func (h *hostConnections) Process(networkInfo *sensor.NetworkConnectionInfo, nowTimestamp timestamp.MicroTS, sequenceID int64) error {
flowMetrics.NetworkConnectionInfoMessagesRcvd.With(prometheus.Labels{"Hostname": h.hostname}).Inc()
- now := timestamp.Now()
</code_context>
<issue_to_address>
The order of sequenceID checks and locking in hostConnections.Process has changed.
Checking sequenceID before acquiring the lock may introduce a race condition if sequenceID can change concurrently. Please verify if this order is safe, or consider locking before the check.
</issue_to_address>
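The lock-before-check pattern the comment suggests can be sketched as below. The struct is a minimal stand-in, not the real `hostConnections`; the point is that `currentSequenceID` is only read under the same mutex that guards its writes:

```go
package main

import (
	"fmt"
	"sync"
)

// Minimal stand-in illustrating the race the review flags: if
// currentSequenceID can be written concurrently, it must be read under
// the same lock, not checked before acquiring it.
type hostConnections struct {
	mutex             sync.Mutex
	currentSequenceID int64
	pending           int
}

// Process locks first, then checks the sequence ID, so a concurrent bump
// of currentSequenceID cannot slip in between the check and the update.
func (h *hostConnections) Process(sequenceID int64) error {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	if sequenceID != h.currentSequenceID {
		return fmt.Errorf("stale message: got sequence %d, want %d",
			sequenceID, h.currentSequenceID)
	}
	h.pending++
	return nil
}

func main() {
	h := &hostConnections{currentSequenceID: 7}
	fmt.Println(h.Process(7)) // accepted: <nil>
	fmt.Println(h.Process(6)) // rejected as stale
}
```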
### Comment 3
<location> `sensor/common/networkflow/manager/purger.go:155` </location>
<code_context>
numPurgedActiveEp, numPurgedActiveConn, numPurgedHostEp, numPurgedHostConn)
}
-func purgeHostConns(mutex *sync.Mutex, maxAge time.Duration, enrichmentQueue map[string]*hostConnections, store EntityStore) (numPurgedEps, numPurgedConns int) {
+func purgeHostConns(mutex *sync.RWMutex, maxAge time.Duration, enrichmentQueue map[string]*hostConnections, store EntityStore) (numPurgedEps, numPurgedConns int) {
timer := prometheus.NewTimer(flowMetrics.ActiveEndpointsPurgerDuration.WithLabelValues("hostConns"))
</code_context>
<issue_to_address>
purgeHostConns now uses a RWMutex, but the function still mutates the underlying data.
Using RWMutex.RLock here is unsafe because the function mutates enrichmentQueue. Use Lock instead, or ensure all mutations are properly synchronized.
</issue_to_address>
### Comment 4
<location> `sensor/common/networkflow/manager/manager_enrich_endpoint.go:30` </location>
<code_context>
+ EnrichmentReasonEpSuccessInactive EnrichmentReasonEp = "success-inactive"
+)
+
+func (m *networkFlowManager) enrichHostContainerEndpoints(now timestamp.MicroTS, hostConns *hostConnections, enrichedEndpoints map[containerEndpointIndicator]timestamp.MicroTS, processesListening map[processListeningIndicator]timestamp.MicroTS) {
+ hostConns.mutex.Lock()
+ defer hostConns.mutex.Unlock()
</code_context>
<issue_to_address>
The endpoint enrichment logic now updates both endpoint and process metrics, which may cause double-counting.
Since updateEndpointMetric increments both FlowEnrichmentEventsEndpoint and HostProcessesEnrichmentEvents, please confirm if counting the same event in both metrics is intentional or if this could lead to misleading monitoring data.
</issue_to_address>
### Comment 5
<location> `sensor/common/networkflow/manager/manager_sending_test.go:143` </location>
<code_context>
- b.assertOneUpdatedCloseEndpoint()
-}
-
-func (b *sendNetflowsSuite) TestCloseEndpointFailedLookup() {
- b.expectFailedLookup(1)
-
</code_context>
<issue_to_address>
Missing test for endpoint with partial enrichment (e.g., PLOP succeeds, NG fails).
Add a test case where network graph enrichment fails but PLOP enrichment succeeds to verify correct handling of partial enrichment results.
Suggested implementation:
```golang
}
func (b *sendNetflowsSuite) TestCloseEndpoint() {
b.expectContainerLookups(1)
b.updateEp(createEndpointPair(timestamp.Now().Add(-time.Hour), timestamp.Now()).lastSeen(timestamp.Now()))
b.thenTickerTicks()
b.assertOneUpdatedCloseEndpoint()
}
// TestCloseEndpointPartialEnrichment verifies that when PLOP enrichment succeeds but NG enrichment fails,
// the system correctly handles the partial enrichment result.
func (b *sendNetflowsSuite) TestCloseEndpointPartialEnrichment() {
// Simulate PLOP enrichment success
b.expectContainerLookups(1)
// Simulate NG enrichment failure
b.expectFailedLookup(1)
// Update endpoint as in the other tests
b.updateEp(createEndpointPair(timestamp.Now().Add(-time.Hour), timestamp.Now()).lastSeen(timestamp.Now()))
b.thenTickerTicks()
// Assert that the endpoint is updated with partial enrichment (PLOP only)
b.assertOneUpdatedCloseEndpointPartialEnrichment()
}
```
You will need to implement the helper/assertion method `assertOneUpdatedCloseEndpointPartialEnrichment()` in your test suite, or adapt the assertion to match your codebase's conventions for checking partial enrichment results.
</issue_to_address>
### Comment 6
<location> `sensor/common/networkflow/manager/manager_sending_test.go:256` </location>
<code_context>
- b.Assert().NotEqual(int32(0), netflowUpdate.NetworkFlowUpdate.GetUpdatedEndpoints()[0].GetLastActiveTimestamp().GetNanos(), "the endpoint should not be open")
-}
-
-func mustNotRead[T any](t *testing.T, ch chan T) {
- select {
- case <-ch:
</code_context>
<issue_to_address>
Consider adding comments to helper functions for clarity.
Adding brief comments to helper functions like mustNotRead, mustReadTimeout, and mustSendWithoutBlock will help future contributors understand their purpose.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Hey @vikin91 - I've reviewed your changes - here's some feedback:
- The connection and endpoint enrichment logic share many patterns—consider extracting shared code paths (e.g., retry logic and container‐ID grace‐period checks) to reduce duplication and ease future maintenance.
- There’s substantial test duplication in the new enrichment and sending tests—consolidating common setup and assertion helpers would simplify the test suite and make it easier to extend.
- The constant clusterEntityResolutionWaitPeriod is still hardcoded while ContainerIDResolutionGracePeriod is configurable—consider making both durations configurable or clearly document their distinct purposes to avoid confusion.
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `sensor/common/networkflow/manager/manager_impl.go:635` </location>
<code_context>
}
func computeUpdatedProcesses(current map[processListeningIndicator]timestamp.MicroTS, previous map[processListeningIndicator]timestamp.MicroTS, previousMutex *sync.RWMutex) []*storage.ProcessListeningOnPortFromSensor {
+ if !env.ProcessesListeningOnPort.BooleanSetting() {
+ return []*storage.ProcessListeningOnPortFromSensor{}
</code_context>
<issue_to_address>
Early return disables process updates if the feature flag is off, but this may mask issues.
Consider adding a log or metric if 'current' is non-empty when the feature is disabled, to help detect unexpected process data and potential misconfigurations.
Suggested implementation:
```golang
func computeUpdatedProcesses(current map[processListeningIndicator]timestamp.MicroTS, previous map[processListeningIndicator]timestamp.MicroTS, previousMutex *sync.RWMutex) []*storage.ProcessListeningOnPortFromSensor {
if !env.ProcessesListeningOnPort.BooleanSetting() {
if len(current) > 0 {
log.Warnf("Received %d process(es) while ProcessesListeningOnPort feature is disabled. This may indicate a misconfiguration.", len(current))
// Optionally, increment a metric here if a metrics system is available.
// metrics.IncProcessDataWhileFeatureDisabled(len(current))
}
return []*storage.ProcessListeningOnPortFromSensor{}
}
```
- If your codebase uses a custom logger or metrics system, replace `log.Warnf` and the commented metric line with the appropriate call.
- If the `log` package is not imported, add `import "log"` or the relevant logger import at the top of the file.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Hey @vikin91 - I've reviewed your changes - here's some feedback:
- Standardize concurrency idioms by choosing either `concurrency.WithLock` wrappers or explicit `mutex.Lock()`/`Unlock()` calls consistently to improve readability and avoid lock-order confusion.
- Consider consolidating all enrichment result and action enums (`EnrichmentResult`, `PostEnrichmentAction`, `EnrichmentReasonConn`, `EnrichmentReasonEp`) into a single documented file or package doc so they're easier to discover and maintain.
- Add a focused unit test or code comment for the `isUpdated` logic to explicitly cover the OPEN→CLOSED transition and more-recent-activity case, since it's critical and non-obvious behavior.
I addressed the second and third points in 859a669 and 86abf4c. The first point follows the style that we use.
@vikin91: The following test failed; say `/retest` to rerun all failed tests.
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
/retest

Description
- Refactored `sensor/common/networkflow/manager/manager_impl.go` and split it into multiple files (that makes the diff look so large).
- Entries are no longer deleted directly from `activeEndpoints`. Instead, a `CheckRemove` action is executed, where we check before deleting. Benefit: this ensures that the checks for network graph and PLoP are consulted before making a decision about deleting.

User-facing documentation
Testing and quality
Automated testing
How I validated my change
Enrichment of Connections
[metrics screenshot] for fake workloads running overnight, using the dashboard from #16151.
Enrichment of Endpoints
[metrics screenshot] for fake workloads running overnight, using the dashboard from #16151.