diff --git a/scale/workloads/10-sensors.yaml b/scale/workloads/10-sensors.yaml index ca46d92a9a425..c521a027358b4 100644 --- a/scale/workloads/10-sensors.yaml +++ b/scale/workloads/10-sensors.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 5000 flowInterval: 30s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/active-vulnmgmt.yaml b/scale/workloads/active-vulnmgmt.yaml index be575de890a65..c77a4a83b5868 100644 --- a/scale/workloads/active-vulnmgmt.yaml +++ b/scale/workloads/active-vulnmgmt.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 500 rbacWorkload: diff --git a/scale/workloads/default.yaml b/scale/workloads/default.yaml index 1b8210cc49735..68b5505caa031 100755 --- a/scale/workloads/default.yaml +++ b/scale/workloads/default.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/high-alert.yaml b/scale/workloads/high-alert.yaml index 652f91a9b4cf7..ccfa77e8b7069 100755 --- a/scale/workloads/high-alert.yaml +++ b/scale/workloads/high-alert.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/long-running.yaml b/scale/workloads/long-running.yaml index aee062cd7ca50..752089da52dce 100755 --- a/scale/workloads/long-running.yaml +++ b/scale/workloads/long-running.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/np-load.yaml b/scale/workloads/np-load.yaml index 956cbdc2fa9ac..007645c07a85b 100755 --- 
a/scale/workloads/np-load.yaml +++ b/scale/workloads/np-load.yaml @@ -41,6 +41,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/okr-single-load.yaml b/scale/workloads/okr-single-load.yaml index 532ad8f542c79..4cfb7cdf956ed 100755 --- a/scale/workloads/okr-single-load.yaml +++ b/scale/workloads/okr-single-load.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 500 flowInterval: 24h generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/rbac.yaml b/scale/workloads/rbac.yaml index 20d5840572e94..c21d15d844058 100755 --- a/scale/workloads/rbac.yaml +++ b/scale/workloads/rbac.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 30s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 100 rbacWorkload: diff --git a/scale/workloads/sample.yaml b/scale/workloads/sample.yaml index 0ddc483261440..62010e7348037 100755 --- a/scale/workloads/sample.yaml +++ b/scale/workloads/sample.yaml @@ -58,4 +58,7 @@ networkWorkload: batchSize: 500 # whether to generate endpoints that will never be marked as closed generateUnclosedEndpoints: true + # a probability of reusing an existing open endpoint by a different process without closing the endpoint + # i.e., multiple processes listening on the same port. 
+ openPortReuseProbability: 0.05 diff --git a/scale/workloads/scale-test.yaml b/scale/workloads/scale-test.yaml index aed256d5072d9..1240243b188f5 100755 --- a/scale/workloads/scale-test.yaml +++ b/scale/workloads/scale-test.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/small.yaml b/scale/workloads/small.yaml index 8a8d4aa7c2da4..64727047726a9 100755 --- a/scale/workloads/small.yaml +++ b/scale/workloads/small.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 30s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 100 rbacWorkload: diff --git a/scale/workloads/vulnmgmt.yaml b/scale/workloads/vulnmgmt.yaml index d20a65a3eb8ca..abde1c3420d76 100755 --- a/scale/workloads/vulnmgmt.yaml +++ b/scale/workloads/vulnmgmt.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 100 flowInterval: 0 generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/scale/workloads/xlarge.yaml b/scale/workloads/xlarge.yaml index 7d1a2ca291700..0c2b59ec75adc 100755 --- a/scale/workloads/xlarge.yaml +++ b/scale/workloads/xlarge.yaml @@ -17,6 +17,7 @@ networkWorkload: batchSize: 500 flowInterval: 1s generateUnclosedEndpoints: true + openPortReuseProbability: 0.05 nodeWorkload: numNodes: 1000 rbacWorkload: diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index b605275df03ab..84d9bfcf70b92 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -2,6 +2,8 @@ package fake import ( "context" + "fmt" + "math" "os" "time" @@ -101,6 +103,7 @@ type WorkloadManager struct { containerPool *pool registeredHostConnections []manager.HostNetworkInfo workload *Workload + originatorCache *OriginatorCache // signals services servicesInitialized concurrency.Signal @@ -212,22 +215,38 
@@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { mgr := &WorkloadManager{ db: db, workload: &workload, - processPool: config.processPool, + originatorCache: NewOriginatorCache(), labelsPool: config.labelsPool, endpointPool: config.endpointPool, ipPool: config.ipPool, externalIpPool: config.externalIpPool, containerPool: config.containerPool, + processPool: config.processPool, servicesInitialized: concurrency.NewSignal(), } mgr.initializePreexistingResources() + if warn := validateWorkload(&workload); warn != nil { + log.Warnf("Validating workload: %s", warn) + } + log.Info("Created Workload manager for workload") log.Infof("Workload: %s", string(data)) log.Infof("Rendered workload: %+v", workload) return mgr } +func validateWorkload(workload *Workload) error { + if workload.NetworkWorkload.OpenPortReuseProbability < 0.0 || workload.NetworkWorkload.OpenPortReuseProbability > 1.0 { + original := workload.NetworkWorkload.OpenPortReuseProbability + workload.NetworkWorkload.OpenPortReuseProbability = math.Min(1.0, math.Max(0.0, original)) + return fmt.Errorf("incorrect probability value %.2f for 'openPortReuseProbability', "+ + "clamping to %.2f", original, workload.NetworkWorkload.OpenPortReuseProbability) + } + // More validation checks can be added in the future + return nil +} + // SetSignalHandlers sets the handlers that will accept runtime data to be mocked from collector func (w *WorkloadManager) SetSignalHandlers(processPipeline signal.Pipeline, networkManager manager.Manager) { w.processes = processPipeline @@ -334,5 +353,5 @@ func (w *WorkloadManager) initializePreexistingResources() { go w.manageNetworkPolicy(context.Background(), resource) } - go w.manageFlows(context.Background(), w.workload.NetworkWorkload) + go w.manageFlows(context.Background()) } diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go index 39c2e5071979f..61d0e6021b44f 100644 --- a/sensor/kubernetes/fake/flows.go +++ 
b/sensor/kubernetes/fake/flows.go @@ -2,18 +2,91 @@ package fake import ( "context" + "fmt" "math/rand" "time" "github.com/stackrox/rox/generated/internalapi/sensor" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/concurrency" "github.com/stackrox/rox/pkg/net" "github.com/stackrox/rox/pkg/protocompat" + "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/pkg/timestamp" "github.com/stackrox/rox/sensor/common/networkflow/manager" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// OriginatorCache stores originators by endpoint key (IP:port) to provide consistent +// process-to-endpoint mappings with configurable reuse probability. +// This improves test realism by simulating how processes typically bind to consistent endpoints. +// +// This cache implements probabilistic reuse to simulate: +// - High probability case: Single process consistently binds to the endpoint; a new process can bind +// to the same endpoint only after sending a close message. +// - Low probability case: New process takes over the endpoint without sending a close message +// (port is shared by multiple processes). +type OriginatorCache struct { + cache map[string]*storage.NetworkProcessUniqueKey + lock sync.RWMutex +} + +// NewOriginatorCache creates a new cache for storing endpoint-to-originator mappings. +func NewOriginatorCache() *OriginatorCache { + return &OriginatorCache{ + cache: make(map[string]*storage.NetworkProcessUniqueKey), + } +} + +// GetOrSetOriginator retrieves a cached originator for the endpoint or generates a new one. +// The openPortReuseProbability parameter controls the probability of reusing an existing open endpoint by +// a different process versus generating a new endpoint for the process (range: 0.0-1.0). 
+// +// This caching mechanism improves test realism by ensuring that network endpoints +// typically close the endpoint before changing the originator (e.g., 95% default), while still allowing +// for on-the-fly process changes (e.g., 5% for multiple processes reusing the same port in parallel). +// +// Note that generating multiple 'open' endpoints with the same IP and Port but different originators +// without a 'close' message in between is not a realistic scenario and occurs very rarely in production +// (practically only when a process deliberately abuses the same IP and Port in parallel to a different process). +// +// Setting the openPortReuseProbability to an unrealistic value (anything higher than 0.05) would cause additional memory-pressure +// in Sensor enrichment pipeline because the deduping key for processes contains the IP, Port and the originator. +// Note that if Sensor sees an open endpoint for (IP, Port, nginx) and then another open endpoint for +// (IP, Port, another-process), then Sensor will keep the nginx-entry forever, as there was no 'close' message in between. +// +// The probability logic is explicit and configurable for different testing scenarios. +func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID string, openPortReuseProbability float64, processPool *ProcessPool) *storage.NetworkProcessUniqueKey { + originator, exists := concurrency.WithRLock2(&oc.lock, func() (*storage.NetworkProcessUniqueKey, bool) { + originator, exists := oc.cache[endpointKey] + return originator, exists + }) + + if exists && rand.Float64() > openPortReuseProbability { + // Use the previously-known process for the same endpoint. + return originator + } + // Generate a new originator with probability `openPortReuseProbability`. + // This simulates when multiple processes listen on the same port + newOriginator := getRandomOriginator(containerID, processPool) + // We update the cache only on cache miss. 
+ // In case of openPortReuse, we keep the original originator in the cache, as this is a rare event. + if !exists { + concurrency.WithLock(&oc.lock, func() { + oc.cache[endpointKey] = newOriginator + }) + } + + return newOriginator +} + +// Clear removes all cached originators. Used for cleanup between test runs. +func (oc *OriginatorCache) Clear() { + concurrency.WithLock(&oc.lock, func() { + oc.cache = make(map[string]*storage.NetworkProcessUniqueKey) + }) +} + func (w *WorkloadManager) getRandomHostConnection(ctx context.Context) (manager.HostNetworkInfo, bool) { // Return false if the network manager hasn't been initialized yet if !w.servicesInitialized.IsDone() { @@ -106,20 +179,29 @@ func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) { return src, dst, ok } +// getRandomNetworkEndpoint generates a network endpoint with consistent originator caching. +// Uses probabilistic caching (configurable via workload.OpenPortReuseProbability) to simulate +// realistic process-to-endpoint binding behavior in containerized environments. 
func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.NetworkEndpoint, bool) { - originator := getRandomOriginator(containerID, w.processPool) - ip, ok := w.ipPool.randomElem() if !ok { return nil, false } + port := rand.Uint32() % 63556 + + // Create endpoint key from IP and port for caching + endpointKey := fmt.Sprintf("%s:%d", ip, port) + + // Get or set originator for this endpoint with configurable reuse probability + originator := w.originatorCache.GetOrSetOriginator(endpointKey, containerID, w.workload.NetworkWorkload.OpenPortReuseProbability, w.processPool) + networkEndpoint := &sensor.NetworkEndpoint{ SocketFamily: sensor.SocketFamily_SOCKET_FAMILY_IPV4, Protocol: storage.L4Protocol_L4_PROTOCOL_TCP, ListenAddress: &sensor.NetworkAddress{ AddressData: net.ParseIP(ip).AsNetIP(), - Port: rand.Uint32() % 63556, + Port: port, }, ContainerId: containerID, CloseTimestamp: nil, @@ -129,10 +211,10 @@ func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor. 
return networkEndpoint, ok } -func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) *sensor.NetworkConnectionInfo { - conns := make([]*sensor.NetworkConnection, 0, workload.BatchSize) - networkEndpoints := make([]*sensor.NetworkEndpoint, 0, workload.BatchSize) - for i := 0; i < workload.BatchSize; i++ { +func (w *WorkloadManager) getFakeNetworkConnectionInfo() *sensor.NetworkConnectionInfo { + conns := make([]*sensor.NetworkConnection, 0, w.workload.NetworkWorkload.BatchSize) + networkEndpoints := make([]*sensor.NetworkEndpoint, 0, w.workload.NetworkWorkload.BatchSize) + for i := 0; i < w.workload.NetworkWorkload.BatchSize; i++ { src, dst, ok := w.getRandomSrcDst() if !ok { continue @@ -155,7 +237,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) if w.endpointPool.Size < w.endpointPool.Capacity { w.endpointPool.add(networkEndpoint) } - if workload.GenerateUnclosedEndpoints { + if w.workload.NetworkWorkload.GenerateUnclosedEndpoints { // These endpoints will not be closed - i.e., CloseTimestamp will be always nil. 
networkEndpoints = append(networkEndpoints, networkEndpoint) } @@ -182,12 +264,12 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) } // manageFlows should be called via `go manageFlows` as it will run forever -func (w *WorkloadManager) manageFlows(ctx context.Context, workload NetworkWorkload) { - if workload.FlowInterval == 0 { +func (w *WorkloadManager) manageFlows(ctx context.Context) { + if w.workload.NetworkWorkload.FlowInterval == 0 { return } // Pick a valid pod - ticker := time.NewTicker(workload.FlowInterval) + ticker := time.NewTicker(w.workload.NetworkWorkload.FlowInterval) defer ticker.Stop() generateExternalIPPool(w.externalIpPool) @@ -199,7 +281,7 @@ func (w *WorkloadManager) manageFlows(ctx context.Context, workload NetworkWorkl case <-ticker.C: } - networkConnectionInfo := w.getFakeNetworkConnectionInfo(workload) + networkConnectionInfo := w.getFakeNetworkConnectionInfo() hostConn, ok := w.getRandomHostConnection(ctx) if !ok { continue diff --git a/sensor/kubernetes/fake/flows_test.go b/sensor/kubernetes/fake/flows_test.go index eb8bda7fdf45e..24beda2ff639e 100644 --- a/sensor/kubernetes/fake/flows_test.go +++ b/sensor/kubernetes/fake/flows_test.go @@ -3,22 +3,36 @@ package fake import ( "testing" + "github.com/stackrox/rox/generated/storage" "github.com/stackrox/rox/pkg/net" "github.com/stretchr/testify/suite" ) type flowsSuite struct { suite.Suite + containerID string + endpointKey string + processPool *ProcessPool } func TestFlowsSuite(t *testing.T) { suite.Run(t, new(flowsSuite)) } +func (s *flowsSuite) SetupSuite() { + // containerID must be 12 chars due to impl details of `getActiveProcesses`. 
+ s.containerID = "container123" + s.endpointKey = "10.0.0.1:8080" + s.processPool = newProcessPool() + for _, process := range getActiveProcesses(s.containerID) { + s.processPool.add(process) + } +} + func (s *flowsSuite) TestGetRandomInternalExternalIP() { w := &WorkloadManager{ endpointPool: newEndpointPool(), - processPool: newProcessPool(), + processPool: s.processPool, ipPool: newPool(), externalIpPool: newPool(), containerPool: newPool(), @@ -46,3 +60,50 @@ func (s *flowsSuite) TestGetRandomInternalExternalIP() { s.True(ok) } } + +func (s *flowsSuite) TestOriginatorCache_BasicCaching() { + cache := NewOriginatorCache() + + // Manually seed the cache with a known originator + seedOriginator := &storage.NetworkProcessUniqueKey{ + ProcessName: "cached-process", + ProcessExecFilePath: "/usr/bin/cached-process", + ProcessArgs: "cached args", + } + cache.cache[s.endpointKey] = seedOriginator + + // With 0.0 probability, it should return the cached originator + for range 10 { + originator := cache.GetOrSetOriginator(s.endpointKey, s.containerID, 0.0, s.processPool) + s.NotNil(originator) + s.Equal("cached-process", originator.ProcessName) + s.Equal("/usr/bin/cached-process", originator.ProcessExecFilePath) + s.Equal("cached args", originator.ProcessArgs) + } +} + +func (s *flowsSuite) TestOriginatorCache_ProbabilityCaching() { + cache := NewOriginatorCache() + + seedOriginator := &storage.NetworkProcessUniqueKey{ + ProcessName: "cached-process", + ProcessExecFilePath: "/usr/bin/cached-process", + ProcessArgs: "cached args", + } + cache.cache[s.endpointKey] = seedOriginator + numCacheMisses := 0 + + s.T().Logf("Testing with endpoint key: %s, number of processes in pool: %d", s.endpointKey, len(s.processPool.Processes[s.containerID])) + + for range 100_000 { + originator := cache.GetOrSetOriginator(s.endpointKey, s.containerID, 0.05, s.processPool) + s.Require().NotNil(originator) + if originator != seedOriginator { + numCacheMisses++ + } + } + s.Len(cache.cache, 1, 
"Cache should have 1 entry") + got := float64(numCacheMisses) / 100_000 + s.T().Logf("Observed probability of reusing port: %f", got) + s.InDelta(0.05, got, 0.02, "Cache miss rate should be close to 0.05 (range <0.03,0.07>)") +} diff --git a/sensor/kubernetes/fake/workload.go b/sensor/kubernetes/fake/workload.go index 5fd866ad5e8b9..799116e289bad 100644 --- a/sensor/kubernetes/fake/workload.go +++ b/sensor/kubernetes/fake/workload.go @@ -45,6 +45,10 @@ type NetworkWorkload struct { FlowInterval time.Duration `yaml:"flowInterval"` BatchSize int `yaml:"batchSize"` GenerateUnclosedEndpoints bool `yaml:"generateUnclosedEndpoints"` + // OpenPortReuseProbability is the probability of reusing an existing open endpoint + // by a different process without closing the endpoint. + // In releases 4.8 and older, this was not configurable and was always set to 1.0. + OpenPortReuseProbability float64 `yaml:"openPortReuseProbability"` } // PodWorkload defines the workload and lifecycle of the pods within a deployment