From d7b5a9e9e48157c1f1488c4cce1b61d3d62ff01d Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Tue, 2 Sep 2025 17:54:45 +0200 Subject: [PATCH 1/5] Limit unrealistic port reuse when generating fake workload --- scale/workloads/10-sensors.yaml | 1 + scale/workloads/active-vulnmgmt.yaml | 1 + scale/workloads/default.yaml | 1 + scale/workloads/high-alert.yaml | 1 + scale/workloads/long-running.yaml | 1 + scale/workloads/np-load.yaml | 1 + scale/workloads/okr-single-load.yaml | 1 + scale/workloads/rbac.yaml | 1 + scale/workloads/sample.yaml | 3 + scale/workloads/scale-test.yaml | 1 + scale/workloads/small.yaml | 1 + scale/workloads/vulnmgmt.yaml | 1 + scale/workloads/xlarge.yaml | 1 + sensor/kubernetes/fake/fake.go | 4 +- sensor/kubernetes/fake/flows.go | 93 ++++++++++++++++++++++++++-- sensor/kubernetes/fake/flows_test.go | 63 ++++++++++++++++++- sensor/kubernetes/fake/workload.go | 4 ++ 17 files changed, 172 insertions(+), 7 deletions(-) diff --git a/scale/workloads/10-sensors.yaml b/scale/workloads/10-sensors.yaml index ca46d92a9a425..c521a027358b4 100644 --- a/scale/workloads/10-sensors.yaml +++ b/scale/workloads/10-sensors.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 5000 flowInterval: 30s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/active-vulnmgmt.yaml b/scale/workloads/active-vulnmgmt.yaml index be575de890a65..c77a4a83b5868 100644 --- a/scale/workloads/active-vulnmgmt.yaml +++ b/scale/workloads/active-vulnmgmt.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 500 rbacWorkload: diff --git a/scale/workloads/default.yaml b/scale/workloads/default.yaml index 1b8210cc49735..68b5505caa031 100755 --- a/scale/workloads/default.yaml +++ b/scale/workloads/default.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 
flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/high-alert.yaml b/scale/workloads/high-alert.yaml index 652f91a9b4cf7..ccfa77e8b7069 100755 --- a/scale/workloads/high-alert.yaml +++ b/scale/workloads/high-alert.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/long-running.yaml b/scale/workloads/long-running.yaml index aee062cd7ca50..752089da52dce 100755 --- a/scale/workloads/long-running.yaml +++ b/scale/workloads/long-running.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/np-load.yaml b/scale/workloads/np-load.yaml index 956cbdc2fa9ac..007645c07a85b 100755 --- a/scale/workloads/np-load.yaml +++ b/scale/workloads/np-load.yaml @@ -41,6 +41,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/okr-single-load.yaml b/scale/workloads/okr-single-load.yaml index 532ad8f542c79..4cfb7cdf956ed 100755 --- a/scale/workloads/okr-single-load.yaml +++ b/scale/workloads/okr-single-load.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 500 flowInterval: 24h generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/rbac.yaml b/scale/workloads/rbac.yaml index 20d5840572e94..c21d15d844058 100755 --- a/scale/workloads/rbac.yaml +++ b/scale/workloads/rbac.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 30s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 100 rbacWorkload: diff --git 
a/scale/workloads/sample.yaml b/scale/workloads/sample.yaml index 0ddc483261440..62010e7348037 100755 --- a/scale/workloads/sample.yaml +++ b/scale/workloads/sample.yaml @@ -58,4 +58,7 @@ networkWorkload: batchSize: 500 # whether to generate endpoints that will never be marked as closed generateUnclosedEndpoints: true + # a probability of reusing an existing open endpoint by a different process without closing the endpoint + # i.e., multiple processes listening on the same port. + openPortReuseProbability: 0.05 diff --git a/scale/workloads/scale-test.yaml b/scale/workloads/scale-test.yaml index aed256d5072d9..1240243b188f5 100755 --- a/scale/workloads/scale-test.yaml +++ b/scale/workloads/scale-test.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/small.yaml b/scale/workloads/small.yaml index 8a8d4aa7c2da4..64727047726a9 100755 --- a/scale/workloads/small.yaml +++ b/scale/workloads/small.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 30s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 100 rbacWorkload: diff --git a/scale/workloads/vulnmgmt.yaml b/scale/workloads/vulnmgmt.yaml index d20a65a3eb8ca..abde1c3420d76 100755 --- a/scale/workloads/vulnmgmt.yaml +++ b/scale/workloads/vulnmgmt.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 0 generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/xlarge.yaml b/scale/workloads/xlarge.yaml index 7d1a2ca291700..0c2b59ec75adc 100755 --- a/scale/workloads/xlarge.yaml +++ b/scale/workloads/xlarge.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 500 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git 
a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index b605275df03ab..abc864107d231 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -101,6 +101,7 @@ type WorkloadManager struct { containerPool *pool registeredHostConnections []manager.HostNetworkInfo workload *Workload + originatorCache *OriginatorCache // signals services servicesInitialized concurrency.Signal @@ -212,12 +213,13 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { mgr := &WorkloadManager{ db: db, workload: &workload, - processPool: config.processPool, + originatorCache: NewOriginatorCache(), // Dependency injection labelsPool: config.labelsPool, endpointPool: config.endpointPool, ipPool: config.ipPool, externalIpPool: config.externalIpPool, containerPool: config.containerPool, + processPool: config.processPool, servicesInitialized: concurrency.NewSignal(), } mgr.initializePreexistingResources() diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go index 39c2e5071979f..62cb60dea2d73 100644 --- a/sensor/kubernetes/fake/flows.go +++ b/sensor/kubernetes/fake/flows.go @@ -2,18 +2,92 @@ package fake import ( "context" + "fmt" "math/rand" "time" "github.com/stackrox/rox/generated/internalapi/sensor" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/net" "github.com/stackrox/rox/pkg/protocompat" + "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/pkg/timestamp" "github.com/stackrox/rox/sensor/common/networkflow/manager" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// OriginatorCache stores originators by endpoint key (IP:port) to provide consistent +// process-to-endpoint mappings with configurable reuse probability. +// This improves test realism by simulating how processes typically bind to consistent endpoints. 
+// +// This cache implements probabilistic reuse to simulate: +// - High probability case: Process consistently binds to the same endpoint (normal operation) +// - Low probability case: New process takes over endpoint (restart/port reuse scenarios) +type OriginatorCache struct { + cache map[string]*storage.NetworkProcessUniqueKey + lock sync.RWMutex +} + +// NewOriginatorCache creates a new cache for storing endpoint-to-originator mappings. +// This constructor enables dependency injection instead of relying on global state. +func NewOriginatorCache() *OriginatorCache { + return &OriginatorCache{ + cache: make(map[string]*storage.NetworkProcessUniqueKey), + } +} + +// GetOrSetOriginator retrieves a cached originator for the endpoint or generates a new one. +// The portSharingProbability parameter controls the probability of reusing an existing open endpoint by a different process +// versus generating a new endpoint for the process (range: 0.0-1.0). +// +// This caching mechanism improves test realism by ensuring that network endpoints +// typically close the endpoint before changing the originator (e.g., 95% default), while still allowing +// for on-the-fly process changes (e.g., 5% for multiple proccesses reusing the same port in parallel). +// +// Note that generating multiple 'open' endpoints with the same IP and Port but different originators +// without a 'close' message in between is not a realistic scenario and occurs very rarely in production +// (practically only when a process deliberately abuses the same IP and Port in parallel to a different process). +// +// Setting the portSharingProbability to an unrealistic value (anything higher than 0.05) would cause additional memory-pressure +// in Sensor enrichment pipeline because the deduping key contains the IP, Port and the originator. 
+// Note that if Sensor sees an open endpoint for and then another open endpoint for +// , then Sensor will keep the nginx-entry forever, as there was no 'close' message in between. +// +// The probability logic is explicit and configurable for different testing scenarios. +func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID string, openPortReuseProbability float32, processPool *ProcessPool) *storage.NetworkProcessUniqueKey { + // Use panic-safe read lock to check cache + originator, exists := concurrency.WithRLock2(&oc.lock, func() (*storage.NetworkProcessUniqueKey, bool) { + originator, exists := oc.cache[endpointKey] + return originator, exists + }) + + if exists && rand.Float32() > openPortReuseProbability { + // Use the same process for the same endpoint + return originator + } + // Generate a new originator with probability `openPortReuseProbability`. + // This simulates when multiple processes listen on the same port + newOriginator := getRandomOriginator(containerID, processPool) + // We update the cache only on cache miss. + // In case of openPortReuse, we keep the original originator in the cache, as this is a rare event. + if !exists { + concurrency.WithLock(&oc.lock, func() { + oc.cache[endpointKey] = newOriginator + }) + } + + return newOriginator +} + +// Clear removes all cached originators. Used for cleanup between test runs. 
+func (oc *OriginatorCache) Clear() { + // Use panic-safe lock for cache reset + concurrency.WithLock(&oc.lock, func() { + oc.cache = make(map[string]*storage.NetworkProcessUniqueKey) + }) +} + func (w *WorkloadManager) getRandomHostConnection(ctx context.Context) (manager.HostNetworkInfo, bool) { // Return false if the network manager hasn't been initialized yet if !w.servicesInitialized.IsDone() { @@ -106,20 +180,29 @@ func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) { return src, dst, ok } -func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.NetworkEndpoint, bool) { - originator := getRandomOriginator(containerID, w.processPool) - +// getRandomNetworkEndpoint generates a network endpoint with consistent originator caching. +// Uses probabilistic caching (configurable via workload.OpenPortReuseProbability) to simulate +// realistic process-to-endpoint binding behavior in containerized environments. +func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string, workload NetworkWorkload) (*sensor.NetworkEndpoint, bool) { ip, ok := w.ipPool.randomElem() if !ok { return nil, false } + port := rand.Uint32() % 63556 + + // Create endpoint key from IP and port for caching + endpointKey := fmt.Sprintf("%s:%d", ip, port) + + // Get or set originator for this endpoint with configurable reuse probability + originator := w.originatorCache.GetOrSetOriginator(endpointKey, containerID, workload.OpenPortReuseProbability, w.processPool) + networkEndpoint := &sensor.NetworkEndpoint{ SocketFamily: sensor.SocketFamily_SOCKET_FAMILY_IPV4, Protocol: storage.L4Protocol_L4_PROTOCOL_TCP, ListenAddress: &sensor.NetworkAddress{ AddressData: net.ParseIP(ip).AsNetIP(), - Port: rand.Uint32() % 63556, + Port: port, }, ContainerId: containerID, CloseTimestamp: nil, @@ -145,7 +228,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) } conn := makeNetworkConnection(src, dst, containerID, 
time.Now().Add(-5*time.Second)) - networkEndpoint, ok := w.getRandomNetworkEndpoint(containerID) + networkEndpoint, ok := w.getRandomNetworkEndpoint(containerID, workload) if !ok { log.Error("Found no IPs in the internal pool") continue diff --git a/sensor/kubernetes/fake/flows_test.go b/sensor/kubernetes/fake/flows_test.go index eb8bda7fdf45e..24beda2ff639e 100644 --- a/sensor/kubernetes/fake/flows_test.go +++ b/sensor/kubernetes/fake/flows_test.go @@ -3,22 +3,36 @@ package fake import ( "testing" + "github.com/stackrox/rox/generated/storage" "github.com/stackrox/rox/pkg/net" "github.com/stretchr/testify/suite" ) type flowsSuite struct { suite.Suite + containerID string + endpointKey string + processPool *ProcessPool } func TestFlowsSuite(t *testing.T) { suite.Run(t, new(flowsSuite)) } +func (s *flowsSuite) SetupSuite() { + // containerID must be 12 chars due to impl details of `getActiveProcesses`. + s.containerID = "container123" + s.endpointKey = "10.0.0.1:8080" + s.processPool = newProcessPool() + for _, process := range getActiveProcesses(s.containerID) { + s.processPool.add(process) + } +} + func (s *flowsSuite) TestGetRandomInternalExternalIP() { w := &WorkloadManager{ endpointPool: newEndpointPool(), - processPool: newProcessPool(), + processPool: s.processPool, ipPool: newPool(), externalIpPool: newPool(), containerPool: newPool(), @@ -46,3 +60,50 @@ func (s *flowsSuite) TestGetRandomInternalExternalIP() { s.True(ok) } } + +func (s *flowsSuite) TestOriginatorCache_BasicCaching() { + cache := NewOriginatorCache() + + // Manually seed the cache with a known originator + seedOriginator := &storage.NetworkProcessUniqueKey{ + ProcessName: "cached-process", + ProcessExecFilePath: "/usr/bin/cached-process", + ProcessArgs: "cached args", + } + cache.cache[s.endpointKey] = seedOriginator + + // With 0.0 probability, it should return the cached originator + for range 10 { + originator := cache.GetOrSetOriginator(s.endpointKey, s.containerID, 0.0, s.processPool) + 
s.NotNil(originator) + s.Equal("cached-process", originator.ProcessName) + s.Equal("/usr/bin/cached-process", originator.ProcessExecFilePath) + s.Equal("cached args", originator.ProcessArgs) + } +} + +func (s *flowsSuite) TestOriginatorCache_ProbabilityCaching() { + cache := NewOriginatorCache() + + seedOriginator := &storage.NetworkProcessUniqueKey{ + ProcessName: "cached-process", + ProcessExecFilePath: "/usr/bin/cached-process", + ProcessArgs: "cached args", + } + cache.cache[s.endpointKey] = seedOriginator + numCacheMisses := 0 + + s.T().Logf("Testing with endpoint key: %s, number of processes in pool: %d", s.endpointKey, len(s.processPool.Processes[s.containerID])) + + for range 100_000 { + originator := cache.GetOrSetOriginator(s.endpointKey, s.containerID, 0.05, s.processPool) + s.Require().NotNil(originator) + if originator != seedOriginator { + numCacheMisses++ + } + } + s.Len(cache.cache, 1, "Cache should have 1 entry") + got := float64(numCacheMisses) / 100_000 + s.T().Logf("Observed probability of reusing port: %f", got) + s.InDelta(0.05, got, 0.02, "Cache miss rate should be close to 0.05 (range <0.03,0.07>)") +} diff --git a/sensor/kubernetes/fake/workload.go b/sensor/kubernetes/fake/workload.go index 5fd866ad5e8b9..3955a16b1c5d6 100644 --- a/sensor/kubernetes/fake/workload.go +++ b/sensor/kubernetes/fake/workload.go @@ -45,6 +45,10 @@ type NetworkWorkload struct { FlowInterval time.Duration `yaml:"flowInterval"` BatchSize int `yaml:"batchSize"` GenerateUnclosedEndpoints bool `yaml:"generateUnclosedEndpoints"` + // OpenPortReuseProbability is the probability of reusing an existing open endpoint + // by a different process without closing the endpoint. + // In releases 4.8 and older, this was not configurable and was always set to1.0. 
+ OpenPortReuseProbability float32 `yaml:"openPortReuseProbability"` } // PodWorkload defines the workload and lifecycle of the pods within a deployment From 72a20e424a9a1dc373f9ec913266e673f99dad86 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Tue, 9 Sep 2025 11:15:06 +0200 Subject: [PATCH 2/5] Self-review: Remove unnecessary func args --- sensor/kubernetes/fake/fake.go | 4 +-- sensor/kubernetes/fake/flows.go | 44 ++++++++++++++++----------------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index abc864107d231..eaef67885a6d4 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -213,7 +213,7 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { mgr := &WorkloadManager{ db: db, workload: &workload, - originatorCache: NewOriginatorCache(), // Dependency injection + originatorCache: NewOriginatorCache(), labelsPool: config.labelsPool, endpointPool: config.endpointPool, ipPool: config.ipPool, @@ -336,5 +336,5 @@ func (w *WorkloadManager) initializePreexistingResources() { go w.manageNetworkPolicy(context.Background(), resource) } - go w.manageFlows(context.Background(), w.workload.NetworkWorkload) + go w.manageFlows(context.Background()) } diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go index 62cb60dea2d73..2fa124a5afe23 100644 --- a/sensor/kubernetes/fake/flows.go +++ b/sensor/kubernetes/fake/flows.go @@ -22,15 +22,16 @@ import ( // This improves test realism by simulating how processes typically bind to consistent endpoints.
// // This cache implements probabilistic reuse to simulate: -// - High probability case: Process consistently binds to the same endpoint (normal operation) -// - Low probability case: New process takes over endpoint (restart/port reuse scenarios) +// - High probability case: Single process consistently binds to the endpoint; a new process can bind +// to the same endpoint only after sending a close message. +// - Low probability case: New process takes over the endpoint without sending a close message +// (port is shared by multiple processes). type OriginatorCache struct { cache map[string]*storage.NetworkProcessUniqueKey lock sync.RWMutex } // NewOriginatorCache creates a new cache for storing endpoint-to-originator mappings. -// This constructor enables dependency injection instead of relying on global state. func NewOriginatorCache() *OriginatorCache { return &OriginatorCache{ cache: make(map[string]*storage.NetworkProcessUniqueKey), @@ -38,21 +39,21 @@ func NewOriginatorCache() *OriginatorCache { } // GetOrSetOriginator retrieves a cached originator for the endpoint or generates a new one. -// The portSharingProbability parameter controls the probability of reusing an existing open endpoint by a different process -// versus generating a new endpoint for the process (range: 0.0-1.0). +// The portSharingProbability parameter controls the probability of reusing an existing open endpoint by +// a different process versus generating a new endpoint for the process (range: 0.0-1.0). // // This caching mechanism improves test realism by ensuring that network endpoints // typically close the endpoint before changing the originator (e.g., 95% default), while still allowing -// for on-the-fly process changes (e.g., 5% for multiple proccesses reusing the same port in parallel). +// for on-the-fly process changes (e.g., 5% for multiple processes reusing the same port in parallel). 
// // Note that generating multiple 'open' endpoints with the same IP and Port but different originators // without a 'close' message in between is not a realistic scenario and occurs very rarely in production // (practically only when a process deliberately abuses the same IP and Port in parallel to a different process). // // Setting the portSharingProbability to an unrealistic value (anything higher than 0.05) would cause additional memory-pressure -// in Sensor enrichment pipeline because the deduping key contains the IP, Port and the originator. +// in Sensor enrichment pipeline because the deduping key for processes contains the IP, Port and the originator. // Note that if Sensor sees an open endpoint for and then another open endpoint for -// , then Sensor will keep the nginx-entry forever, as there was no 'close' message in between. +// , then Sensor will keep the nginx-entry forever, as there was no 'close' message in between. // // The probability logic is explicit and configurable for different testing scenarios. func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID string, openPortReuseProbability float32, processPool *ProcessPool) *storage.NetworkProcessUniqueKey { @@ -63,7 +64,7 @@ func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID st }) if exists && rand.Float32() > openPortReuseProbability { - // Use the same process for the same endpoint + // Use the previously-known process for the same endpoint. return originator } // Generate a new originator with probability `openPortReuseProbability`. @@ -82,7 +83,6 @@ func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID st // Clear removes all cached originators. Used for cleanup between test runs. 
func (oc *OriginatorCache) Clear() { - // Use panic-safe lock for cache reset concurrency.WithLock(&oc.lock, func() { oc.cache = make(map[string]*storage.NetworkProcessUniqueKey) }) @@ -183,7 +183,7 @@ func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) { // getRandomNetworkEndpoint generates a network endpoint with consistent originator caching. // Uses probabilistic caching (configurable via workload.OpenPortReuseProbability) to simulate // realistic process-to-endpoint binding behavior in containerized environments. -func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string, workload NetworkWorkload) (*sensor.NetworkEndpoint, bool) { +func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.NetworkEndpoint, bool) { ip, ok := w.ipPool.randomElem() if !ok { return nil, false @@ -195,7 +195,7 @@ func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string, workload endpointKey := fmt.Sprintf("%s:%d", ip, port) // Get or set originator for this endpoint with configurable reuse probability - originator := w.originatorCache.GetOrSetOriginator(endpointKey, containerID, workload.OpenPortReuseProbability, w.processPool) + originator := w.originatorCache.GetOrSetOriginator(endpointKey, containerID, w.workload.NetworkWorkload.OpenPortReuseProbability, w.processPool) networkEndpoint := &sensor.NetworkEndpoint{ SocketFamily: sensor.SocketFamily_SOCKET_FAMILY_IPV4, @@ -212,10 +212,10 @@ func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string, workload return networkEndpoint, ok } -func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) *sensor.NetworkConnectionInfo { - conns := make([]*sensor.NetworkConnection, 0, workload.BatchSize) - networkEndpoints := make([]*sensor.NetworkEndpoint, 0, workload.BatchSize) - for i := 0; i < workload.BatchSize; i++ { +func (w *WorkloadManager) getFakeNetworkConnectionInfo() *sensor.NetworkConnectionInfo { + conns := 
make([]*sensor.NetworkConnection, 0, w.workload.NetworkWorkload.BatchSize) + networkEndpoints := make([]*sensor.NetworkEndpoint, 0, w.workload.NetworkWorkload.BatchSize) + for i := 0; i < w.workload.NetworkWorkload.BatchSize; i++ { src, dst, ok := w.getRandomSrcDst() if !ok { continue @@ -228,7 +228,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) } conn := makeNetworkConnection(src, dst, containerID, time.Now().Add(-5*time.Second)) - networkEndpoint, ok := w.getRandomNetworkEndpoint(containerID, workload) + networkEndpoint, ok := w.getRandomNetworkEndpoint(containerID) if !ok { log.Error("Found no IPs in the internal pool") continue @@ -238,7 +238,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) if w.endpointPool.Size < w.endpointPool.Capacity { w.endpointPool.add(networkEndpoint) } - if workload.GenerateUnclosedEndpoints { + if w.workload.NetworkWorkload.GenerateUnclosedEndpoints { // These endpoints will not be closed - i.e., CloseTimestamp will be always nil. 
networkEndpoints = append(networkEndpoints, networkEndpoint) } @@ -265,12 +265,12 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) } // manageFlows should be called via `go manageFlows` as it will run forever -func (w *WorkloadManager) manageFlows(ctx context.Context, workload NetworkWorkload) { - if workload.FlowInterval == 0 { +func (w *WorkloadManager) manageFlows(ctx context.Context) { + if w.workload.NetworkWorkload.FlowInterval == 0 { return } // Pick a valid pod - ticker := time.NewTicker(workload.FlowInterval) + ticker := time.NewTicker(w.workload.NetworkWorkload.FlowInterval) defer ticker.Stop() generateExternalIPPool(w.externalIpPool) @@ -282,7 +282,7 @@ func (w *WorkloadManager) manageFlows(ctx context.Context, workload NetworkWorkl case <-ticker.C: } - networkConnectionInfo := w.getFakeNetworkConnectionInfo(workload) + networkConnectionInfo := w.getFakeNetworkConnectionInfo() hostConn, ok := w.getRandomHostConnection(ctx) if !ok { continue From f82abc31a7f4a9bcddbea40f52d7afaeabf46f60 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:26:14 +0200 Subject: [PATCH 3/5] Validate the probability value --- sensor/kubernetes/fake/flows.go | 15 +++++++++++---- sensor/kubernetes/fake/workload.go | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go index 2fa124a5afe23..d448f5d64d6e6 100644 --- a/sensor/kubernetes/fake/flows.go +++ b/sensor/kubernetes/fake/flows.go @@ -3,6 +3,7 @@ package fake import ( "context" "fmt" + "math" "math/rand" "time" @@ -55,15 +56,21 @@ func NewOriginatorCache() *OriginatorCache { // Note that if Sensor sees an open endpoint for and then another open endpoint for // , then Sensor will keep the nginx-entry forever, as there was no 'close' message in between. // -// The probability logic is explicit and configurable for different testing scenarios. 
-func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID string, openPortReuseProbability float32, processPool *ProcessPool) *storage.NetworkProcessUniqueKey { - // Use panic-safe read lock to check cache +// The probability logic is explicit and configurable for differ{ent testing scenarios. +func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID string, openPortReuseProbability float64, processPool *ProcessPool) *storage.NetworkProcessUniqueKey { + // Ensure that the probability is between 0.0 and 1.0. + prob := math.Min(1.0, math.Max(0.0, openPortReuseProbability)) + if openPortReuseProbability < 0.0 || openPortReuseProbability > 1.0 { + log.Warnf("Incorrect probability value %.2f for 'openPortReuseProbability', "+ + "rounding to: %.2f.", openPortReuseProbability, prob) + } + originator, exists := concurrency.WithRLock2(&oc.lock, func() (*storage.NetworkProcessUniqueKey, bool) { originator, exists := oc.cache[endpointKey] return originator, exists }) - if exists && rand.Float32() > openPortReuseProbability { + if exists && rand.Float64() > prob { // Use the previously-known process for the same endpoint. return originator } diff --git a/sensor/kubernetes/fake/workload.go b/sensor/kubernetes/fake/workload.go index 3955a16b1c5d6..2de9d7a4262e6 100644 --- a/sensor/kubernetes/fake/workload.go +++ b/sensor/kubernetes/fake/workload.go @@ -48,7 +48,7 @@ type NetworkWorkload struct { // OpenPortReuseProbability is the probability of reusing an existing open endpoint // by a different process without closing the endpoint. // In releases 4.8 and older, this was not configurable and was always set to1.0. 
- OpenPortReuseProbability float32 `yaml:"openPortReuseProbability"` + OpenPortReuseProbability float64 `yaml:"openPortReuseProbability"` } // PodWorkload defines the workload and lifecycle of the pods within a deployment From e7cdf921710ea0a1d451c018afd828143a378225 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Tue, 16 Sep 2025 12:34:17 +0200 Subject: [PATCH 4/5] Do validation once. Correct typos --- sensor/kubernetes/fake/fake.go | 13 +++++++++++++ sensor/kubernetes/fake/flows.go | 8 ++------ sensor/kubernetes/fake/workload.go | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index eaef67885a6d4..f6ecc632f6cfe 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -2,6 +2,7 @@ package fake import ( "context" + "fmt" "os" "time" @@ -224,12 +225,24 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { } mgr.initializePreexistingResources() + if warn := validateWorkload(workload); warn != nil { + log.Warnf("Validaing workload: %s", warn) + } + log.Info("Created Workload manager for workload") log.Infof("Workload: %s", string(data)) log.Infof("Rendered workload: %+v", workload) return mgr } +func validateWorkload(workload Workload) error { + if workload.NetworkWorkload.OpenPortReuseProbability < 0.0 || workload.NetworkWorkload.OpenPortReuseProbability > 1.0 { + return fmt.Errorf("incorrect probability value %.2f for 'openPortReuseProbability', "+ + "rounding to the nearest value from range <0.0, 1.0>", workload.NetworkWorkload.OpenPortReuseProbability) + } + return nil +} + // SetSignalHandlers sets the handlers that will accept runtime data to be mocked from collector func (w *WorkloadManager) SetSignalHandlers(processPipeline signal.Pipeline, networkManager manager.Manager) { w.processes = processPipeline diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go 
index d448f5d64d6e6..2ce15593fa203 100644 --- a/sensor/kubernetes/fake/flows.go +++ b/sensor/kubernetes/fake/flows.go @@ -56,14 +56,10 @@ func NewOriginatorCache() *OriginatorCache { // Note that if Sensor sees an open endpoint for and then another open endpoint for // , then Sensor will keep the nginx-entry forever, as there was no 'close' message in between. // -// The probability logic is explicit and configurable for differ{ent testing scenarios. +// The probability logic is explicit and configurable for different testing scenarios. func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID string, openPortReuseProbability float64, processPool *ProcessPool) *storage.NetworkProcessUniqueKey { - // Ensure that the probability is between 0.0 and 1.0. + // Ensure that the probability is between 0.0 and 1.0. Warning log has been produced in `validateWorkload`. prob := math.Min(1.0, math.Max(0.0, openPortReuseProbability)) - if openPortReuseProbability < 0.0 || openPortReuseProbability > 1.0 { - log.Warnf("Incorrect probability value %.2f for 'openPortReuseProbability', "+ - "rounding to: %.2f.", openPortReuseProbability, prob) - } originator, exists := concurrency.WithRLock2(&oc.lock, func() (*storage.NetworkProcessUniqueKey, bool) { originator, exists := oc.cache[endpointKey] diff --git a/sensor/kubernetes/fake/workload.go b/sensor/kubernetes/fake/workload.go index 2de9d7a4262e6..799116e289bad 100644 --- a/sensor/kubernetes/fake/workload.go +++ b/sensor/kubernetes/fake/workload.go @@ -47,7 +47,7 @@ type NetworkWorkload struct { GenerateUnclosedEndpoints bool `yaml:"generateUnclosedEndpoints"` // OpenPortReuseProbability is the probability of reusing an existing open endpoint // by a different process without closing the endpoint. - // In releases 4.8 and older, this was not configurable and was always set to1.0. + // In releases 4.8 and older, this was not configurable and was always set to 1.0. 
OpenPortReuseProbability float64 `yaml:"openPortReuseProbability"` } From ddb5b450d06b9d116b0f3e0d2ebdc503abb45323 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Thu, 18 Sep 2025 17:55:23 +0200 Subject: [PATCH 5/5] Round the OpenPortReuseProbability when validating workload --- sensor/kubernetes/fake/fake.go | 10 +++++++--- sensor/kubernetes/fake/flows.go | 6 +----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index f6ecc632f6cfe..84d9bfcf70b92 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -3,6 +3,7 @@ package fake import ( "context" "fmt" + "math" "os" "time" @@ -225,7 +226,7 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { } mgr.initializePreexistingResources() - if warn := validateWorkload(workload); warn != nil { + if warn := validateWorkload(&workload); warn != nil { log.Warnf("Validaing workload: %s", warn) } @@ -235,11 +236,14 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { return mgr } -func validateWorkload(workload Workload) error { +func validateWorkload(workload *Workload) error { if workload.NetworkWorkload.OpenPortReuseProbability < 0.0 || workload.NetworkWorkload.OpenPortReuseProbability > 1.0 { + corrected := math.Min(1.0, math.Max(0.0, workload.NetworkWorkload.OpenPortReuseProbability)) + workload.NetworkWorkload.OpenPortReuseProbability = corrected return fmt.Errorf("incorrect probability value %.2f for 'openPortReuseProbability', "+ - "rounding to the nearest value from range <0.0, 1.0>", workload.NetworkWorkload.OpenPortReuseProbability) + "rounding to %.2f", workload.NetworkWorkload.OpenPortReuseProbability, corrected) } + // More validation checks can be added in the future return nil } diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go index 2ce15593fa203..61d0e6021b44f 100644 --- 
a/sensor/kubernetes/fake/flows.go +++ b/sensor/kubernetes/fake/flows.go @@ -3,7 +3,6 @@ package fake import ( "context" "fmt" - "math" "math/rand" "time" @@ -58,15 +57,12 @@ func NewOriginatorCache() *OriginatorCache { // // The probability logic is explicit and configurable for different testing scenarios. func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID string, openPortReuseProbability float64, processPool *ProcessPool) *storage.NetworkProcessUniqueKey { - // Ensure that the probability is between 0.0 and 1.0. Warning log has been produced in `validateWorkload`. - prob := math.Min(1.0, math.Max(0.0, openPortReuseProbability)) - originator, exists := concurrency.WithRLock2(&oc.lock, func() (*storage.NetworkProcessUniqueKey, bool) { originator, exists := oc.cache[endpointKey] return originator, exists }) - if exists && rand.Float64() > prob { + if exists && rand.Float64() > openPortReuseProbability { // Use the previously-known process for the same endpoint. return originator }