Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scale/workloads/10-sensors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 5000
flowInterval: 30s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/active-vulnmgmt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 100
flowInterval: 1s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 500
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 100
flowInterval: 1s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/high-alert.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 100
flowInterval: 1s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/long-running.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 100
flowInterval: 1s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/np-load.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ networkWorkload:
batchSize: 100
flowInterval: 1s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/okr-single-load.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 500
flowInterval: 24h
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 100
flowInterval: 30s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 100
rbacWorkload:
Expand Down
3 changes: 3 additions & 0 deletions scale/workloads/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ networkWorkload:
batchSize: 500
# whether to generate endpoints that will never be marked as closed
generateUnclosedEndpoints: true
# The probability (0.0-1.0) that a different process reuses an existing open endpoint without the endpoint being closed first,
# i.e., multiple processes listening on the same port.
openPortReuseProbability: 0.05

1 change: 1 addition & 0 deletions scale/workloads/scale-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 100
flowInterval: 1s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/small.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 100
flowInterval: 30s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 100
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/vulnmgmt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 100
flowInterval: 0
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
1 change: 1 addition & 0 deletions scale/workloads/xlarge.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ networkWorkload:
batchSize: 500
flowInterval: 1s
generateUnclosedEndpoints: true
openPortReuseProbability: 0.05
nodeWorkload:
numNodes: 1000
rbacWorkload:
Expand Down
23 changes: 21 additions & 2 deletions sensor/kubernetes/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package fake

import (
"context"
"fmt"
"math"
"os"
"time"

Expand Down Expand Up @@ -101,6 +103,7 @@ type WorkloadManager struct {
containerPool *pool
registeredHostConnections []manager.HostNetworkInfo
workload *Workload
originatorCache *OriginatorCache

// signals services
servicesInitialized concurrency.Signal
Expand Down Expand Up @@ -212,22 +215,38 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager {
mgr := &WorkloadManager{
db: db,
workload: &workload,
processPool: config.processPool,
originatorCache: NewOriginatorCache(),
labelsPool: config.labelsPool,
endpointPool: config.endpointPool,
ipPool: config.ipPool,
externalIpPool: config.externalIpPool,
containerPool: config.containerPool,
processPool: config.processPool,
servicesInitialized: concurrency.NewSignal(),
}
mgr.initializePreexistingResources()

if warn := validateWorkload(&workload); warn != nil {
log.Warnf("Validaing workload: %s", warn)
}

log.Info("Created Workload manager for workload")
log.Infof("Workload: %s", string(data))
log.Infof("Rendered workload: %+v", workload)
return mgr
}

// validateWorkload sanity-checks the workload configuration, clamping
// out-of-range values in place. It returns a non-nil error describing any
// correction that was applied (intended to be logged as a warning), or nil
// if the configuration was already valid.
func validateWorkload(workload *Workload) error {
	// Capture the original value BEFORE clamping so the error message can
	// report what the user actually configured, not the corrected value.
	p := workload.NetworkWorkload.OpenPortReuseProbability
	if p < 0.0 || p > 1.0 {
		corrected := math.Min(1.0, math.Max(0.0, p))
		workload.NetworkWorkload.OpenPortReuseProbability = corrected
		return fmt.Errorf("incorrect probability value %.2f for 'openPortReuseProbability', "+
			"rounding to %.2f", p, corrected)
	}
	// More validation checks can be added in the future
	return nil
}

// SetSignalHandlers sets the handlers that will accept runtime data to be mocked from collector
func (w *WorkloadManager) SetSignalHandlers(processPipeline signal.Pipeline, networkManager manager.Manager) {
w.processes = processPipeline
Expand Down Expand Up @@ -334,5 +353,5 @@ func (w *WorkloadManager) initializePreexistingResources() {
go w.manageNetworkPolicy(context.Background(), resource)
}

go w.manageFlows(context.Background(), w.workload.NetworkWorkload)
go w.manageFlows(context.Background())
}
106 changes: 94 additions & 12 deletions sensor/kubernetes/fake/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,91 @@ package fake

import (
"context"
"fmt"
"math/rand"
"time"

"github.com/stackrox/rox/generated/internalapi/sensor"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/concurrency"
"github.com/stackrox/rox/pkg/net"
"github.com/stackrox/rox/pkg/protocompat"
"github.com/stackrox/rox/pkg/sync"
"github.com/stackrox/rox/pkg/timestamp"
"github.com/stackrox/rox/sensor/common/networkflow/manager"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// OriginatorCache stores originators by endpoint key (IP:port) to provide consistent
// process-to-endpoint mappings with configurable reuse probability.
// This improves test realism by simulating how processes typically bind to consistent endpoints.
//
// This cache implements probabilistic reuse to simulate:
// - High probability case: Single process consistently binds to the endpoint; a new process can bind
// to the same endpoint only after sending a close message.
// - Low probability case: New process takes over the endpoint without sending a close message
// (port is shared by multiple processes).
type OriginatorCache struct {
	// cache maps an endpoint key (formatted as "IP:port") to the process
	// currently considered the owner of that endpoint.
	cache map[string]*storage.NetworkProcessUniqueKey
	// lock guards cache against concurrent access from the flow-generating goroutines.
	lock sync.RWMutex
}

// NewOriginatorCache creates a new cache for storing endpoint-to-originator mappings.
func NewOriginatorCache() *OriginatorCache {
	oc := &OriginatorCache{}
	oc.cache = map[string]*storage.NetworkProcessUniqueKey{}
	return oc
}

// GetOrSetOriginator retrieves a cached originator for the endpoint or generates a new one.
// The openPortReuseProbability parameter controls the probability of reusing an existing open endpoint by
// a different process versus generating a new endpoint for the process (range: 0.0-1.0).
//
// This caching mechanism improves test realism by ensuring that network endpoints
// typically close the endpoint before changing the originator (e.g., 95% default), while still allowing
// for on-the-fly process changes (e.g., 5% for multiple processes reusing the same port in parallel).
//
// Note that generating multiple 'open' endpoints with the same IP and Port but different originators
// without a 'close' message in between is not a realistic scenario and occurs very rarely in production
// (practically only when a process deliberately abuses the same IP and Port in parallel to a different process).
//
// Setting the openPortReuseProbability to an unrealistic value (anything higher than 0.05) would cause additional memory-pressure
// in Sensor enrichment pipeline because the deduping key for processes contains the IP, Port and the originator.
// Note that if Sensor sees an open endpoint for <container1, 1.1.1.1:80, nginx> and then another open endpoint for
// <container1, 1.1.1.1:80, apache2>, then Sensor will keep the nginx-entry forever, as there was no 'close' message in between.
//
// The probability logic is explicit and configurable for different testing scenarios.
func (oc *OriginatorCache) GetOrSetOriginator(endpointKey string, containerID string, openPortReuseProbability float64, processPool *ProcessPool) *storage.NetworkProcessUniqueKey {
	originator, exists := concurrency.WithRLock2(&oc.lock, func() (*storage.NetworkProcessUniqueKey, bool) {
		originator, exists := oc.cache[endpointKey]
		return originator, exists
	})

	if exists && rand.Float64() > openPortReuseProbability {
		// Common case: reuse the previously-known process for the same endpoint.
		return originator
	}
	// Generate a new originator with probability `openPortReuseProbability`.
	// This simulates when multiple processes listen on the same port.
	newOriginator := getRandomOriginator(containerID, processPool)
	// We update the cache only on cache miss.
	// In case of openPortReuse, we keep the original originator in the cache, as this is a rare event.
	if !exists {
		concurrency.WithLock(&oc.lock, func() {
			// Re-check under the write lock: another goroutine may have inserted an
			// originator for this endpoint between our read above and this write.
			// Keep the first inserted value so concurrent callers observe a
			// consistent originator for the endpoint.
			if _, raced := oc.cache[endpointKey]; !raced {
				oc.cache[endpointKey] = newOriginator
			}
		})
	}

	return newOriginator
}

// Clear removes all cached originators. Used for cleanup between test runs.
func (oc *OriginatorCache) Clear() {
	oc.lock.Lock()
	defer oc.lock.Unlock()
	oc.cache = make(map[string]*storage.NetworkProcessUniqueKey)
}

func (w *WorkloadManager) getRandomHostConnection(ctx context.Context) (manager.HostNetworkInfo, bool) {
// Return false if the network manager hasn't been initialized yet
if !w.servicesInitialized.IsDone() {
Expand Down Expand Up @@ -106,20 +179,29 @@ func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) {
return src, dst, ok
}

// getRandomNetworkEndpoint generates a network endpoint with consistent originator caching.
// Uses probabilistic caching (configurable via workload.OpenPortReuseProbability) to simulate
// realistic process-to-endpoint binding behavior in containerized environments.
func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.NetworkEndpoint, bool) {
originator := getRandomOriginator(containerID, w.processPool)

ip, ok := w.ipPool.randomElem()
if !ok {
return nil, false
}

port := rand.Uint32() % 63556

// Create endpoint key from IP and port for caching
endpointKey := fmt.Sprintf("%s:%d", ip, port)

// Get or set originator for this endpoint with configurable reuse probability
originator := w.originatorCache.GetOrSetOriginator(endpointKey, containerID, w.workload.NetworkWorkload.OpenPortReuseProbability, w.processPool)

networkEndpoint := &sensor.NetworkEndpoint{
SocketFamily: sensor.SocketFamily_SOCKET_FAMILY_IPV4,
Protocol: storage.L4Protocol_L4_PROTOCOL_TCP,
ListenAddress: &sensor.NetworkAddress{
AddressData: net.ParseIP(ip).AsNetIP(),
Port: rand.Uint32() % 63556,
Port: port,
},
ContainerId: containerID,
CloseTimestamp: nil,
Expand All @@ -129,10 +211,10 @@ func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.
return networkEndpoint, ok
}

func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) *sensor.NetworkConnectionInfo {
conns := make([]*sensor.NetworkConnection, 0, workload.BatchSize)
networkEndpoints := make([]*sensor.NetworkEndpoint, 0, workload.BatchSize)
for i := 0; i < workload.BatchSize; i++ {
func (w *WorkloadManager) getFakeNetworkConnectionInfo() *sensor.NetworkConnectionInfo {
conns := make([]*sensor.NetworkConnection, 0, w.workload.NetworkWorkload.BatchSize)
networkEndpoints := make([]*sensor.NetworkEndpoint, 0, w.workload.NetworkWorkload.BatchSize)
for i := 0; i < w.workload.NetworkWorkload.BatchSize; i++ {
src, dst, ok := w.getRandomSrcDst()
if !ok {
continue
Expand All @@ -155,7 +237,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload)
if w.endpointPool.Size < w.endpointPool.Capacity {
w.endpointPool.add(networkEndpoint)
}
if workload.GenerateUnclosedEndpoints {
if w.workload.NetworkWorkload.GenerateUnclosedEndpoints {
// These endpoints will not be closed - i.e., CloseTimestamp will be always nil.
networkEndpoints = append(networkEndpoints, networkEndpoint)
}
Expand All @@ -182,12 +264,12 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload)
}

// manageFlows should be called via `go manageFlows` as it will run forever
func (w *WorkloadManager) manageFlows(ctx context.Context, workload NetworkWorkload) {
if workload.FlowInterval == 0 {
func (w *WorkloadManager) manageFlows(ctx context.Context) {
if w.workload.NetworkWorkload.FlowInterval == 0 {
return
}
// Pick a valid pod
ticker := time.NewTicker(workload.FlowInterval)
ticker := time.NewTicker(w.workload.NetworkWorkload.FlowInterval)
defer ticker.Stop()

generateExternalIPPool(w.externalIpPool)
Expand All @@ -199,7 +281,7 @@ func (w *WorkloadManager) manageFlows(ctx context.Context, workload NetworkWorkl
case <-ticker.C:
}

networkConnectionInfo := w.getFakeNetworkConnectionInfo(workload)
networkConnectionInfo := w.getFakeNetworkConnectionInfo()
hostConn, ok := w.getRandomHostConnection(ctx)
if !ok {
continue
Expand Down
Loading
Loading