From 19bd59857a92ef959f9b60a5d6af314e9383b6a7 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Tue, 2 Sep 2025 14:31:06 +0200 Subject: [PATCH 1/7] Empty commit From 4c7f2884aade61faf37f195550eacd92055af7ea Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Tue, 2 Sep 2025 14:35:29 +0200 Subject: [PATCH 2/7] Avoid using globals --- sensor/kubernetes/fake/deployment.go | 20 ++++---- sensor/kubernetes/fake/fake.go | 67 ++++++++++++++++++------- sensor/kubernetes/fake/flows.go | 9 ++-- sensor/kubernetes/fake/labels.go | 4 -- sensor/kubernetes/fake/networkpolicy.go | 6 +-- sensor/kubernetes/fake/service.go | 20 ++++---- 6 files changed, 76 insertions(+), 50 deletions(-) diff --git a/sensor/kubernetes/fake/deployment.go b/sensor/kubernetes/fake/deployment.go index 0bccf3515425f..626a79c3f315c 100644 --- a/sensor/kubernetes/fake/deployment.go +++ b/sensor/kubernetes/fake/deployment.go @@ -20,10 +20,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var ( - processPool = newProcessPool() -) - // ProcessPool stores processes by containerID using a map type ProcessPool struct { Processes map[string][]*storage.ProcessSignal @@ -108,7 +104,7 @@ func createDeploymentLabels(random bool, numLabels int) map[string]string { return createMap(numLabels) } -func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, deploymentIDs, replicaSetIDs, podIDs []string) *deploymentResourcesToBeManaged { +func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, deploymentIDs, replicaSetIDs, podIDs []string, lblPool *labelsPoolPerNamespace) *deploymentResourcesToBeManaged { var labels map[string]string if workload.NumLabels == 0 { labels = createDeploymentLabels(workload.RandomLabels, 3) @@ -126,7 +122,7 @@ func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, de namespace = "default" } - labelsPool.add(namespace, labels) + lblPool.add(namespace, labels) namespacesWithDeploymentsPool.add(namespace) var serviceAccount string @@ -397,7 +393,7 @@ func (w *WorkloadManager) manageDeployment(ctx context.Context, resources *deplo // The previous function returning means that the deployments, replicaset and pods were all deleted // Now we recreate the objects again for count := 0; resources.workload.NumLifecycles == 0 || count < resources.workload.NumLifecycles; count++ { - resources = w.getDeployment(resources.workload, 0, nil, nil, nil) + resources = w.getDeployment(resources.workload, 0, nil, nil, nil, w.labelsPool) deployment, replicaSet, pods := resources.deployment, resources.replicaSet, resources.pods if _, err := w.client.Kubernetes().AppsV1().Deployments(deployment.Namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil { log.Errorf("error creating deployment: %v", err) @@ -495,7 +491,11 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre ipPool.remove(pod.Status.PodIP) for _, cs := range pod.Status.ContainerStatuses { - containerPool.remove(getShortContainerID(cs.ContainerID)) + containerID := getShortContainerID(cs.ContainerID) + containerPool.remove(containerID) + // Clean up process and endpoint pools when container is removed + processPool.remove(containerID) + endpointPool.remove(containerID) } podSig.Signal() } @@ -558,7 +558,7 @@ func (w *WorkloadManager) manageProcessesForPod(podSig *concurrency.Signal, podW if processWorkload.ActiveProcesses { for _, process := range getActiveProcesses(containerID) { w.processes.Process(process) - processPool.add(process) + w.processPool.add(process) } } else { // If less than the rate, then it's a bad process @@ -567,7 +567,7 @@ func (w *WorkloadManager) manageProcessesForPod(podSig *concurrency.Signal, podW } else { goodProcess := getGoodProcess(containerID) w.processes.Process(goodProcess) - processPool.add(goodProcess) + w.processPool.add(goodProcess) } } case <-podSig.Done(): diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index 81724166f4566..fe0176778d553 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -90,10 +90,12 @@ func (c *clientSetImpl) OpenshiftOperator() operatorVersioned.Interface { // WorkloadManager encapsulates running a fake Kubernetes client type WorkloadManager struct { - db *pebble.DB - fakeClient *fake.Clientset - client client.Interface - workload *Workload + db *pebble.DB + fakeClient *fake.Clientset + client client.Interface + processPool *ProcessPool + labelsPool *labelsPoolPerNamespace + workload *Workload // signals services servicesInitialized concurrency.Signal @@ -104,12 +106,18 @@ type WorkloadManager struct { // WorkloadManagerConfig WorkloadManager's configuration type WorkloadManagerConfig struct { workloadFile string + labelsPool *labelsPoolPerNamespace + processPool *ProcessPool + storagePath string } // ConfigDefaults default configuration func ConfigDefaults() *WorkloadManagerConfig { return &WorkloadManagerConfig{ workloadFile: workloadPath, + labelsPool: newLabelsPool(), + processPool: newProcessPool(), + storagePath: env.FakeWorkloadStoragePath.Setting(), } } @@ -119,6 +127,24 @@ func (c *WorkloadManagerConfig) WithWorkloadFile(file string) *WorkloadManagerCo return c } +// WithLabelsPool configures the WorkloadManagerConfig's LabelsPool field +func (c *WorkloadManagerConfig) WithLabelsPool(pool *labelsPoolPerNamespace) *WorkloadManagerConfig { + c.labelsPool = pool + return c +} + +// WithProcessPool configures the WorkloadManagerConfig's ProcessPool field +func (c *WorkloadManagerConfig) WithProcessPool(pool *ProcessPool) *WorkloadManagerConfig { + c.processPool = pool + return c +} + +// WithStoragePath configures the WorkloadManagerConfig's StoragePath field +func (c *WorkloadManagerConfig) WithStoragePath(path string) *WorkloadManagerConfig { + c.storagePath = path + return c +} + // Client returns the mock client func (w *WorkloadManager) Client() client.Interface { return w.client @@ -140,19 +166,20 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { } var db *pebble.DB - if storagePath := env.FakeWorkloadStoragePath.Setting(); storagePath != "" { - db, err = pebble.Open(storagePath, &pebble.Options{}) + if config.storagePath != "" { + db, err = pebble.Open(config.storagePath, &pebble.Options{}) if err != nil { log.Panic("could not open id storage") } } - mgr := &WorkloadManager{ db: db, workload: &workload, + processPool: config.processPool, + labelsPool: config.labelsPool, servicesInitialized: concurrency.NewSignal(), } - mgr.initializePreexistingResources() + mgr.initializePreexistingResourcesWithDeps(&workload, config.labelsPool) log.Info("Created Workload manager for workload") log.Infof("Workload: %s", string(data)) @@ -178,10 +205,14 @@ func (w *WorkloadManager) clearActions() { } func (w *WorkloadManager) initializePreexistingResources() { + w.initializePreexistingResourcesWithDeps(w.workload, w.labelsPool) +} + +func (w *WorkloadManager) initializePreexistingResourcesWithDeps(workload *Workload, lblPool *labelsPoolPerNamespace) { var objects []runtime.Object numNamespaces := defaultNamespaceNum - if num := w.workload.NumNamespaces; num != 0 { + if num := workload.NumNamespaces; num != 0 { numNamespaces = num } for _, n := range getNamespaces(numNamespaces, w.getIDsForPrefix(namespacePrefix)) { @@ -189,23 +220,23 @@ func (w *WorkloadManager) initializePreexistingResources() { objects = append(objects, n) } - nodes := w.getNodes(w.workload.NodeWorkload, w.getIDsForPrefix(nodePrefix)) + nodes := w.getNodes(workload.NodeWorkload, w.getIDsForPrefix(nodePrefix)) for _, node := range nodes { w.writeID(nodePrefix, node.UID) objects = append(objects, node) } - labelsPool.matchLabels = w.workload.MatchLabels + lblPool.matchLabels = workload.MatchLabels - objects = append(objects, w.getRBAC(w.workload.RBACWorkload, w.getIDsForPrefix(serviceAccountPrefix), w.getIDsForPrefix(rolesPrefix), w.getIDsForPrefix(rolebindingsPrefix))...) + objects = append(objects, w.getRBAC(workload.RBACWorkload, w.getIDsForPrefix(serviceAccountPrefix), w.getIDsForPrefix(rolesPrefix), w.getIDsForPrefix(rolebindingsPrefix))...) var resources []*deploymentResourcesToBeManaged deploymentIDs := w.getIDsForPrefix(deploymentPrefix) replicaSetIDs := w.getIDsForPrefix(replicaSetPrefix) podIDs := w.getIDsForPrefix(podPrefix) - for _, deploymentWorkload := range w.workload.DeploymentWorkload { + for _, deploymentWorkload := range workload.DeploymentWorkload { for i := 0; i < deploymentWorkload.NumDeployments; i++ { - resource := w.getDeployment(deploymentWorkload, i, deploymentIDs, replicaSetIDs, podIDs) + resource := w.getDeployment(deploymentWorkload, i, deploymentIDs, replicaSetIDs, podIDs, lblPool) resources = append(resources, resource) objects = append(objects, resource.deployment, resource.replicaSet) @@ -215,12 +246,12 @@ func (w *WorkloadManager) initializePreexistingResources() { } } - objects = append(objects, w.getServices(w.workload.ServiceWorkload, w.getIDsForPrefix(servicePrefix))...) + objects = append(objects, w.getServices(workload.ServiceWorkload, w.getIDsForPrefix(servicePrefix), lblPool)...) var npResources []*networkPolicyToBeManaged networkPolicyIDs := w.getIDsForPrefix(networkPolicyPrefix) - for _, npWorkload := range w.workload.NetworkPolicyWorkload { + for _, npWorkload := range workload.NetworkPolicyWorkload { for i := 0; i < npWorkload.NumNetworkPolicies; i++ { - resource := w.getNetworkPolicy(npWorkload, getID(networkPolicyIDs, i)) + resource := w.getNetworkPolicy(npWorkload, getID(networkPolicyIDs, i), lblPool) w.writeID(networkPolicyPrefix, resource.networkPolicy.UID) npResources = append(npResources, resource) @@ -266,5 +297,5 @@ func (w *WorkloadManager) initializePreexistingResources() { go w.manageNetworkPolicy(context.Background(), resource) } - go w.manageFlows(context.Background(), w.workload.NetworkWorkload) + go w.manageFlows(context.Background(), workload.NetworkWorkload) } diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go index 7b550521f0170..6e6a27d5f78fe 100644 --- a/sensor/kubernetes/fake/flows.go +++ b/sensor/kubernetes/fake/flows.go @@ -21,6 +21,7 @@ var ( ipPool = newPool() externalIpPool = newPool() containerPool = newPool() + processPool = newProcessPool() // Global process registry for network flows endpointPool = newEndpointPool() registeredHostConnections []manager.HostNetworkInfo @@ -52,8 +53,6 @@ func (p *pool) remove(val string) { defer p.lock.Unlock() p.pool.Remove(val) - processPool.remove(val) - endpointPool.remove(val) } func (p *pool) randomElem() (string, bool) { @@ -185,7 +184,7 @@ func getNetworkProcessUniqueKeyFromProcess(process *storage.ProcessSignal) *stor return nil } -func getRandomOriginator(containerID string) *storage.NetworkProcessUniqueKey { +func getRandomOriginator(containerID string, pool *ProcessPool) *storage.NetworkProcessUniqueKey { var process *storage.ProcessSignal var percentMatchedProcess float32 = 0.5 p := rand.Float32() @@ -193,7 +192,7 @@ func getRandomOriginator(containerID string) *storage.NetworkProcessUniqueKey { // There is a chance that the process has been filtered out or hasn't gotten to // the central-db for some other reason so this is not a guarantee that the // process is in the central-db - process = processPool.getRandomProcess(containerID) + process = pool.getRandomProcess(containerID) } else { process = getGoodProcess(containerID) } @@ -274,7 +273,7 @@ func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) { } func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.NetworkEndpoint, bool) { - originator := getRandomOriginator(containerID) + originator := getRandomOriginator(containerID, w.processPool) ip, ok := ipPool.randomElem() if !ok { diff --git a/sensor/kubernetes/fake/labels.go b/sensor/kubernetes/fake/labels.go index bf6a275863196..d206cfc27aa5c 100644 --- a/sensor/kubernetes/fake/labels.go +++ b/sensor/kubernetes/fake/labels.go @@ -6,10 +6,6 @@ import ( "github.com/stackrox/rox/pkg/sync" ) -var ( - labelsPool = newLabelsPool() -) - type labelsPoolPerNamespace struct { pool map[string][]map[string]string matchLabels bool diff --git a/sensor/kubernetes/fake/networkpolicy.go b/sensor/kubernetes/fake/networkpolicy.go index dc5df6888ff07..4a37dac46d8a1 100644 --- a/sensor/kubernetes/fake/networkpolicy.go +++ b/sensor/kubernetes/fake/networkpolicy.go @@ -15,9 +15,9 @@ type networkPolicyToBeManaged struct { networkPolicy *networkingV1.NetworkPolicy } -func (w *WorkloadManager) getNetworkPolicy(workload NetworkPolicyWorkload, id string) *networkPolicyToBeManaged { +func (w *WorkloadManager) getNetworkPolicy(workload NetworkPolicyWorkload, id string, lblPool *labelsPoolPerNamespace) *networkPolicyToBeManaged { namespace := namespacesWithDeploymentsPool.mustGetRandomElem() - labels := labelsPool.randomElem(namespace) + labels := lblPool.randomElem(namespace) np := &networkingV1.NetworkPolicy{ TypeMeta: metav1.TypeMeta{ Kind: "NetworkPolicy", @@ -55,7 +55,7 @@ func (w *WorkloadManager) manageNetworkPolicy(ctx context.Context, resources *ne w.manageNetworkPolicyLifecycle(ctx, resources) for count := 0; resources.workload.NumLifecycles == 0 || count < resources.workload.NumLifecycles; count++ { - resources = w.getNetworkPolicy(resources.workload, "") + resources = w.getNetworkPolicy(resources.workload, "", w.labelsPool) if _, err := w.client.Kubernetes().NetworkingV1().NetworkPolicies(resources.networkPolicy.Namespace).Create(ctx, resources.networkPolicy, metav1.CreateOptions{}); err != nil { log.Errorf("error creating networkPolicy: %v", err) } diff --git a/sensor/kubernetes/fake/service.go b/sensor/kubernetes/fake/service.go index 7289baa030b98..5a75a7c5839e1 100644 --- a/sensor/kubernetes/fake/service.go +++ b/sensor/kubernetes/fake/service.go @@ -26,9 +26,9 @@ func getIPFamily() string { return ipFamilies[rand.Intn(len(ipFamilies))] } -func getClusterIP(id string) *v1.Service { +func getClusterIP(id string, lblPool *labelsPoolPerNamespace) *v1.Service { ns := namespacesWithDeploymentsPool.mustGetRandomElem() - labels := labelsPool.randomElem(ns) + labels := lblPool.randomElem(ns) clusterIP := generateIP() return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -56,9 +56,9 @@ func getClusterIP(id string) *v1.Service { } } -func getNodePort(id string) *v1.Service { +func getNodePort(id string, lblPool *labelsPoolPerNamespace) *v1.Service { ns := namespacesWithDeploymentsPool.mustGetRandomElem() - labels := labelsPool.randomElem(ns) + labels := lblPool.randomElem(ns) clusterIP := generateIP() return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -87,9 +87,9 @@ func getNodePort(id string) *v1.Service { } } -func getLoadBalancer(id string) *v1.Service { +func getLoadBalancer(id string, lblPool *labelsPoolPerNamespace) *v1.Service { ns := namespacesWithDeploymentsPool.mustGetRandomElem() - labels := labelsPool.randomElem(ns) + labels := lblPool.randomElem(ns) clusterIP := generateIP() internalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster allocateLoadBalancerNodePorts := true @@ -135,20 +135,20 @@ func getLoadBalancer(id string) *v1.Service { } } -func (w *WorkloadManager) getServices(workload ServiceWorkload, ids []string) []runtime.Object { +func (w *WorkloadManager) getServices(workload ServiceWorkload, ids []string, lblPool *labelsPoolPerNamespace) []runtime.Object { objects := make([]runtime.Object, 0, workload.NumClusterIPs+workload.NumNodePorts+workload.NumLoadBalancers) for i := 0; i < workload.NumClusterIPs; i++ { - clusterIP := getClusterIP(getID(ids, i)) + clusterIP := getClusterIP(getID(ids, i), lblPool) w.writeID(servicePrefix, clusterIP.UID) objects = append(objects, clusterIP) } for i := 0; i < workload.NumNodePorts; i++ { - nodePort := getNodePort(getID(ids, i+workload.NumClusterIPs)) + nodePort := getNodePort(getID(ids, i+workload.NumClusterIPs), lblPool) w.writeID(servicePrefix, nodePort.UID) objects = append(objects, nodePort) } for i := 0; i < workload.NumLoadBalancers; i++ { - loadBalancer := getLoadBalancer(getID(ids, i+workload.NumClusterIPs+workload.NumNodePorts)) + loadBalancer := getLoadBalancer(getID(ids, i+workload.NumClusterIPs+workload.NumNodePorts), lblPool) w.writeID(servicePrefix, loadBalancer.UID) objects = append(objects, loadBalancer) } From d5f6a8e114c04a50ffe572894664e3ecde630779 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Tue, 2 Sep 2025 16:57:16 +0200 Subject: [PATCH 3/7] Move all other globals to WorkloadManager --- sensor/kubernetes/fake/deployment.go | 22 +-- sensor/kubernetes/fake/fake.go | 73 +++++++--- sensor/kubernetes/fake/flows.go | 192 ++------------------------- sensor/kubernetes/fake/flows_test.go | 12 +- sensor/kubernetes/fake/pools.go | 170 ++++++++++++++++++++++++ 5 files changed, 258 insertions(+), 211 deletions(-) create mode 100644 sensor/kubernetes/fake/pools.go diff --git a/sensor/kubernetes/fake/deployment.go b/sensor/kubernetes/fake/deployment.go index 626a79c3f315c..f89b076309b60 100644 --- a/sensor/kubernetes/fake/deployment.go +++ b/sensor/kubernetes/fake/deployment.go @@ -210,7 +210,7 @@ func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, de var pods []*corev1.Pod for i := 0; i < workload.PodWorkload.NumPods; i++ { - pod := getPod(rs, getID(podIDs, i+idx*workload.PodWorkload.NumPods)) + pod := getPod(rs, getID(podIDs, i+idx*workload.PodWorkload.NumPods), w.ipPool, w.containerPool) w.writeID(podPrefix, pod.UID) pods = append(pods, pod) } @@ -256,7 +256,7 @@ func getReplicaSet(deployment *appsv1.Deployment, id string) *appsv1.ReplicaSet } } -func getPod(replicaSet *appsv1.ReplicaSet, id string) *corev1.Pod { +func getPod(replicaSet *appsv1.ReplicaSet, id string, ipPool *pool, containerPool *pool) *corev1.Pod { pod := &corev1.Pod{ TypeMeta: metav1.TypeMeta{ Kind: "Pod", @@ -284,10 +284,10 @@ func getPod(replicaSet *appsv1.ReplicaSet, id string) *corev1.Pod { StartTime: &metav1.Time{ Time: time.Now(), }, - PodIP: generateAndAddIPToPool(), + PodIP: generateAndAddIPToPool(ipPool), }, } - populatePodContainerStatuses(pod) + populatePodContainerStatuses(pod, containerPool) return pod } @@ -458,7 +458,7 @@ func (w *WorkloadManager) manageDeploymentLifecycle(ctx context.Context, resourc } } -func populatePodContainerStatuses(pod *corev1.Pod) { +func populatePodContainerStatuses(pod *corev1.Pod, containerPool *pool) { statuses := make([]corev1.ContainerStatus, 0, len(pod.Spec.Containers)) for _, container := range pod.Spec.Containers { status := corev1.ContainerStatus{ @@ -488,14 +488,14 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre log.Errorf("error deleting pod: %v", err) } w.deleteID(podPrefix, pod.UID) - ipPool.remove(pod.Status.PodIP) + w.ipPool.remove(pod.Status.PodIP) for _, cs := range pod.Status.ContainerStatuses { containerID := getShortContainerID(cs.ContainerID) - containerPool.remove(containerID) + w.containerPool.remove(containerID) // Clean up process and endpoint pools when container is removed - processPool.remove(containerID) - endpointPool.remove(containerID) + w.processPool.remove(containerID) + w.endpointPool.remove(containerID) } podSig.Signal() } @@ -513,8 +513,8 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre // New pod name and UUID pod.Name = randString() pod.UID = newUUID() - pod.Status.PodIP = generateAndAddIPToPool() - populatePodContainerStatuses(pod) + pod.Status.PodIP = generateAndAddIPToPool(w.ipPool) + populatePodContainerStatuses(pod, w.containerPool) if _, err := client.Create(ctx, pod, metav1.CreateOptions{}); err != nil { log.Errorf("error creating pod: %v", err) diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index fe0176778d553..1411f59d64882 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -90,12 +90,17 @@ func (c *clientSetImpl) OpenshiftOperator() operatorVersioned.Interface { // WorkloadManager encapsulates running a fake Kubernetes client type WorkloadManager struct { - db *pebble.DB - fakeClient *fake.Clientset - client client.Interface - processPool *ProcessPool - labelsPool *labelsPoolPerNamespace - workload *Workload + db *pebble.DB + fakeClient *fake.Clientset + client client.Interface + processPool *ProcessPool + labelsPool *labelsPoolPerNamespace + endpointPool *EndpointPool + ipPool *pool + externalIpPool *pool + containerPool *pool + registeredHostConnections []manager.HostNetworkInfo + workload *Workload // signals services servicesInitialized concurrency.Signal @@ -105,19 +110,27 @@ type WorkloadManager struct { // WorkloadManagerConfig WorkloadManager's configuration type WorkloadManagerConfig struct { - workloadFile string - labelsPool *labelsPoolPerNamespace - processPool *ProcessPool - storagePath string + workloadFile string + labelsPool *labelsPoolPerNamespace + processPool *ProcessPool + endpointPool *EndpointPool + ipPool *pool + externalIpPool *pool + containerPool *pool + storagePath string } // ConfigDefaults default configuration func ConfigDefaults() *WorkloadManagerConfig { return &WorkloadManagerConfig{ - workloadFile: workloadPath, - labelsPool: newLabelsPool(), - processPool: newProcessPool(), - storagePath: env.FakeWorkloadStoragePath.Setting(), + workloadFile: workloadPath, + labelsPool: newLabelsPool(), + processPool: newProcessPool(), + endpointPool: newEndpointPool(), + ipPool: newPool(), + externalIpPool: newPool(), + containerPool: newPool(), + storagePath: env.FakeWorkloadStoragePath.Setting(), } } @@ -139,6 +152,30 @@ func (c *WorkloadManagerConfig) WithProcessPool(pool *ProcessPool) *WorkloadMana return c } +// WithEndpointPool configures the WorkloadManagerConfig's EndpointPool field +func (c *WorkloadManagerConfig) WithEndpointPool(pool *EndpointPool) *WorkloadManagerConfig { + c.endpointPool = pool + return c +} + +// WithIpPool configures the WorkloadManagerConfig's IpPool field +func (c *WorkloadManagerConfig) WithIpPool(pool *pool) *WorkloadManagerConfig { + c.ipPool = pool + return c +} + +// WithExternalIpPool configures the WorkloadManagerConfig's ExternalIpPool field +func (c *WorkloadManagerConfig) WithExternalIpPool(pool *pool) *WorkloadManagerConfig { + c.externalIpPool = pool + return c +} + +// WithContainerPool configures the WorkloadManagerConfig's ContainerPool field +func (c *WorkloadManagerConfig) WithContainerPool(pool *pool) *WorkloadManagerConfig { + c.containerPool = pool + return c +} + // WithStoragePath configures the WorkloadManagerConfig's StoragePath field func (c *WorkloadManagerConfig) WithStoragePath(path string) *WorkloadManagerConfig { c.storagePath = path @@ -177,6 +214,10 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { workload: &workload, processPool: config.processPool, labelsPool: config.labelsPool, + endpointPool: config.endpointPool, + ipPool: config.ipPool, + externalIpPool: config.externalIpPool, + containerPool: config.containerPool, servicesInitialized: concurrency.NewSignal(), } mgr.initializePreexistingResourcesWithDeps(&workload, config.labelsPool) @@ -204,10 +245,6 @@ func (w *WorkloadManager) clearActions() { } } -func (w *WorkloadManager) initializePreexistingResources() { - w.initializePreexistingResourcesWithDeps(w.workload, w.labelsPool) -} - func (w *WorkloadManager) initializePreexistingResourcesWithDeps(workload *Workload, lblPool *labelsPoolPerNamespace) { var objects []runtime.Object diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go index 6e6a27d5f78fe..39c2e5071979f 100644 --- a/sensor/kubernetes/fake/flows.go +++ b/sensor/kubernetes/fake/flows.go @@ -2,7 +2,6 @@ package fake import ( "context" - "fmt" "math/rand" "time" @@ -10,154 +9,17 @@ import ( "github.com/stackrox/rox/generated/storage" "github.com/stackrox/rox/pkg/net" "github.com/stackrox/rox/pkg/protocompat" - "github.com/stackrox/rox/pkg/set" - "github.com/stackrox/rox/pkg/sync" "github.com/stackrox/rox/pkg/timestamp" "github.com/stackrox/rox/sensor/common/networkflow/manager" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var ( - ipPool = newPool() - externalIpPool = newPool() - containerPool = newPool() - processPool = newProcessPool() // Global process registry for network flows - endpointPool = newEndpointPool() - - registeredHostConnections []manager.HostNetworkInfo -) - -type pool struct { - pool set.StringSet - lock sync.RWMutex -} - -func newPool() *pool { - return &pool{ - pool: set.NewStringSet(), - } -} - -func (p *pool) add(val string) bool { - p.lock.Lock() - defer p.lock.Unlock() - - if added := p.pool.Add(val); !added { - return false - } - return true -} - -func (p *pool) remove(val string) { - p.lock.Lock() - defer p.lock.Unlock() - - p.pool.Remove(val) -} - -func (p *pool) randomElem() (string, bool) { - p.lock.RLock() - defer p.lock.RUnlock() - val := p.pool.GetArbitraryElem() - return val, val != "" -} - -func (p *pool) mustGetRandomElem() string { - p.lock.RLock() - defer p.lock.RUnlock() - val := p.pool.GetArbitraryElem() - if val == "" { - panic("not expecting an empty pool") - } - return val -} - -// EndpointPool stores endpoints by containerID using a map -type EndpointPool struct { - Endpoints map[string][]*sensor.NetworkEndpoint - EndpointsToBeClosed []*sensor.NetworkEndpoint - Capacity int - Size int - lock sync.RWMutex -} - -func newEndpointPool() *EndpointPool { - return &EndpointPool{ - Endpoints: make(map[string][]*sensor.NetworkEndpoint), - EndpointsToBeClosed: make([]*sensor.NetworkEndpoint, 0), - Capacity: 10000, - Size: 0, - } -} - -func (p *EndpointPool) add(val *sensor.NetworkEndpoint) { - p.lock.Lock() - defer p.lock.Unlock() - - if p.Size < p.Capacity { - p.Endpoints[val.ContainerId] = append(p.Endpoints[val.ContainerId], val) - p.Size++ - } -} - -func (p *EndpointPool) remove(containerID string) { - p.lock.Lock() - defer p.lock.Unlock() - - p.EndpointsToBeClosed = append(p.EndpointsToBeClosed, p.Endpoints[containerID]...) - p.Size -= len(p.Endpoints[containerID]) - delete(p.Endpoints, containerID) -} - -func (p *EndpointPool) clearEndpointsToBeClosed() { - p.lock.Lock() - defer p.lock.Unlock() - - p.EndpointsToBeClosed = []*sensor.NetworkEndpoint{} -} - -func generateIP() string { - return fmt.Sprintf("10.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256)) -} - -// Generate IP addresses from 11.0.0.0 to 99.255.255.255 which are all public -func generateExternalIP() string { - return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(89)+11, rand.Intn(256), rand.Intn(256), rand.Intn(256)) -} - -// We want to reuse some external IPs, so we test the cases where multiple -// entities connect to the same external IP, but we also want many external IPs -// that are only used once. -func generateExternalIPPool() { - ip := []int{11, 0, 0, 0} - for range 1000 { - for j := 3; j >= 0; j-- { - ip[j]++ - if ip[j] > 255 { - ip[j] = 0 - } else { - break - } - } - ipString := fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]) - externalIpPool.add(ipString) - } -} - -func generateAndAddIPToPool() string { - ip := generateIP() - for !ipPool.add(ip) { - ip = generateIP() - } - return ip -} - func (w *WorkloadManager) getRandomHostConnection(ctx context.Context) (manager.HostNetworkInfo, bool) { // Return false if the network manager hasn't been initialized yet if !w.servicesInitialized.IsDone() { return nil, false } - if len(registeredHostConnections) == 0 { + if len(w.registeredHostConnections) == 0 { // Initialize the host connections nodeResp, err := w.fakeClient.CoreV1().Nodes().List(ctx, v1.ListOptions{}) if err != nil { @@ -166,38 +28,10 @@ func (w *WorkloadManager) getRandomHostConnection(ctx context.Context) (manager. } for _, node := range nodeResp.Items { info, _ := w.networkManager.RegisterCollector(node.Name) - registeredHostConnections = append(registeredHostConnections, info) - } - } - return registeredHostConnections[rand.Intn(len(registeredHostConnections))], true -} - -func getNetworkProcessUniqueKeyFromProcess(process *storage.ProcessSignal) *storage.NetworkProcessUniqueKey { - if process != nil { - return &storage.NetworkProcessUniqueKey{ - ProcessName: process.Name, - ProcessExecFilePath: process.ExecFilePath, - ProcessArgs: process.Args, + w.registeredHostConnections = append(w.registeredHostConnections, info) } } - - return nil -} - -func getRandomOriginator(containerID string, pool *ProcessPool) *storage.NetworkProcessUniqueKey { - var process *storage.ProcessSignal - var percentMatchedProcess float32 = 0.5 - p := rand.Float32() - if p < percentMatchedProcess { - // There is a chance that the process has been filtered out or hasn't gotten to - // the central-db for some other reason so this is not a guarantee that the - // process is in the central-db - process = pool.getRandomProcess(containerID) - } else { - process = getGoodProcess(containerID) - } - - return getNetworkProcessUniqueKeyFromProcess(process) + return w.registeredHostConnections[rand.Intn(len(w.registeredHostConnections))], true } func makeNetworkConnection(src string, dst string, containerID string, closeTimestamp time.Time) *sensor.NetworkConnection { @@ -235,10 +69,10 @@ func (w *WorkloadManager) getRandomInternalExternalIP() (string, bool, bool) { internal := rand.Intn(100) < 80 if internal { - ip, ok = ipPool.randomElem() + ip, ok = w.ipPool.randomElem() } else { if rand.Intn(100) < 50 { - ip, ok = externalIpPool.randomElem() + ip, ok = w.externalIpPool.randomElem() } else { ip = generateExternalIP() ok = true @@ -263,7 +97,7 @@ func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) { if internal { dst, _, ok = w.getRandomInternalExternalIP() } else { - dst, ok = ipPool.randomElem() + dst, ok = w.ipPool.randomElem() if !ok { log.Error("Found no IPs in the internal pool") } @@ -275,7 +109,7 @@ func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) { func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.NetworkEndpoint, bool) { originator := getRandomOriginator(containerID, w.processPool) - ip, ok := ipPool.randomElem() + ip, ok := w.ipPool.randomElem() if !ok { return nil, false } @@ -304,7 +138,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) continue } - containerID, ok := containerPool.randomElem() + containerID, ok := w.containerPool.randomElem() if !ok { log.Error("Found no containers in pool") continue @@ -318,8 +152,8 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) } conns = append(conns, conn) - if endpointPool.Size < endpointPool.Capacity { - endpointPool.add(networkEndpoint) + if w.endpointPool.Size < w.endpointPool.Capacity { + w.endpointPool.add(networkEndpoint) } if workload.GenerateUnclosedEndpoints { // These endpoints will not be closed - i.e., CloseTimestamp will be always nil. @@ -327,7 +161,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) } } - for _, endpoint := range endpointPool.EndpointsToBeClosed { + for _, endpoint := range w.endpointPool.EndpointsToBeClosed { networkEndpoint := endpoint closeTS, err := protocompat.ConvertTimeToTimestampOrError(time.Now().Add(-5 * time.Second)) if err != nil { @@ -338,7 +172,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload) } } - endpointPool.clearEndpointsToBeClosed() + w.endpointPool.clearEndpointsToBeClosed() return &sensor.NetworkConnectionInfo{ UpdatedConnections: conns, @@ -356,7 +190,7 @@ func (w *WorkloadManager) manageFlows(ctx context.Context, workload NetworkWorkl ticker := time.NewTicker(workload.FlowInterval) defer ticker.Stop() - generateExternalIPPool() + generateExternalIPPool(w.externalIpPool) for { select { diff --git a/sensor/kubernetes/fake/flows_test.go b/sensor/kubernetes/fake/flows_test.go index 1d8682bb38e6a..eb8bda7fdf45e 100644 --- a/sensor/kubernetes/fake/flows_test.go +++ b/sensor/kubernetes/fake/flows_test.go @@ -16,16 +16,22 @@ func TestFlowsSuite(t *testing.T) { } func (s *flowsSuite) TestGetRandomInternalExternalIP() { - var w WorkloadManager + w := &WorkloadManager{ + endpointPool: newEndpointPool(), + processPool: newProcessPool(), + ipPool: newPool(), + externalIpPool: newPool(), + containerPool: newPool(), + } _, _, ok := w.getRandomSrcDst() s.False(ok) for range 1000 { - generateAndAddIPToPool() + generateAndAddIPToPool(w.ipPool) } - generateExternalIPPool() + generateExternalIPPool(w.externalIpPool) for range 1000 { ip, internal, ok := w.getRandomInternalExternalIP() diff --git a/sensor/kubernetes/fake/pools.go b/sensor/kubernetes/fake/pools.go new file mode 100644 index 0000000000000..ca151f0127889 --- /dev/null +++ b/sensor/kubernetes/fake/pools.go @@ -0,0 +1,170 @@ +package fake + +import ( + "fmt" + "math/rand" + + "github.com/stackrox/rox/generated/internalapi/sensor" + "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/set" + "github.com/stackrox/rox/pkg/sync" +) + +// pool is a thread-safe string pool using set.StringSet +type pool struct { + pool set.StringSet + lock sync.RWMutex +} + +func newPool() *pool { + return &pool{ + pool: set.NewStringSet(), + } +} + +func (p *pool) add(val string) bool { + p.lock.Lock() + defer p.lock.Unlock() + + if added := p.pool.Add(val); !added { + return false + } + return true +} + +func (p *pool) remove(val string) { + p.lock.Lock() + defer p.lock.Unlock() + + p.pool.Remove(val) +} + +func (p *pool) randomElem() (string, bool) { + p.lock.RLock() + defer p.lock.RUnlock() + val := p.pool.GetArbitraryElem() + if val == "" { + return "", false + } + return val, true +} + +func (p *pool) mustGetRandomElem() string { + p.lock.RLock() + defer p.lock.RUnlock() + val := p.pool.GetArbitraryElem() + if val == "" { + panic("not expecting an empty pool") + } + return val +} + +// EndpointPool stores endpoints by containerID using a map +type EndpointPool struct { + Endpoints map[string][]*sensor.NetworkEndpoint + EndpointsToBeClosed []*sensor.NetworkEndpoint + Capacity int + Size int + lock sync.RWMutex +} + +func newEndpointPool() *EndpointPool { + return &EndpointPool{ + Endpoints: make(map[string][]*sensor.NetworkEndpoint), + EndpointsToBeClosed: make([]*sensor.NetworkEndpoint, 0), + Capacity: 10000, + Size: 0, + } +} + +func (p *EndpointPool) add(val *sensor.NetworkEndpoint) { + p.lock.Lock() + defer p.lock.Unlock() + + if p.Size < p.Capacity { + p.Endpoints[val.ContainerId] = append(p.Endpoints[val.ContainerId], val) + p.Size++ + } +} + +func (p *EndpointPool) remove(containerID string) { + p.lock.Lock() + defer p.lock.Unlock() + + p.EndpointsToBeClosed = append(p.EndpointsToBeClosed, p.Endpoints[containerID]...) + p.Size -= len(p.Endpoints[containerID]) + delete(p.Endpoints, containerID) +} + +func (p *EndpointPool) clearEndpointsToBeClosed() { + p.lock.Lock() + defer p.lock.Unlock() + + p.EndpointsToBeClosed = []*sensor.NetworkEndpoint{} +} + +// IP generation and pool manipulation functions + +func generateIP() string { + return fmt.Sprintf("10.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256)) +} + +// Generate IP addresses from 11.0.0.0 to 99.255.255.255 which are all public +func generateExternalIP() string { + return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(89)+11, rand.Intn(256), rand.Intn(256), rand.Intn(256)) +} + +// We want to reuse some external IPs, so we test the cases where multiple +// entities connect to the same external IP, but we also want many external IPs +// that are only used once. +func generateExternalIPPool(pool *pool) { + ip := []int{11, 0, 0, 0} + for range 1000 { + for j := 3; j >= 0; j-- { + ip[j]++ + if ip[j] > 255 { + ip[j] = 0 + } else { + break + } + } + ipString := fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]) + pool.add(ipString) + } +} + +func generateAndAddIPToPool(ipPool *pool) string { + ip := generateIP() + for !ipPool.add(ip) { + ip = generateIP() + } + return ip +} + +func getRandomOriginator(containerID string, pool *ProcessPool) *storage.NetworkProcessUniqueKey { + var process *storage.ProcessSignal + var percentMatchedProcess float32 = 0.5 + p := rand.Float32() + if p < percentMatchedProcess { + // There is a chance that the process has been filtered out or hasn't gotten to + // the central-db for some other reason so this is not a guarantee that the + // process is in the central-db + process = pool.getRandomProcess(containerID) + } else { + process = getGoodProcess(containerID) + } + + return getNetworkProcessUniqueKeyFromProcess(process) +} + +func getNetworkProcessUniqueKeyFromProcess(process *storage.ProcessSignal) *storage.NetworkProcessUniqueKey { + if process != nil { + return &storage.NetworkProcessUniqueKey{ + ProcessName: process.Name, + ProcessExecFilePath: process.ExecFilePath, + ProcessArgs: process.Args, + } + } + + return nil +} From 01f8d8db7e1a0cb145a7ba33ef0dfc6208c7839d Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Thu, 4 Sep 2025 13:06:37 +0200 Subject: [PATCH 4/7] Refactor: Extract func removeContainerAndAssociatedObjects --- sensor/kubernetes/fake/deployment.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sensor/kubernetes/fake/deployment.go b/sensor/kubernetes/fake/deployment.go index f89b076309b60..e6c5a2d9bb8b5 100644 --- a/sensor/kubernetes/fake/deployment.go +++ b/sensor/kubernetes/fake/deployment.go @@ -492,10 +492,7 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre for _, cs := range pod.Status.ContainerStatuses { containerID := getShortContainerID(cs.ContainerID) - w.containerPool.remove(containerID) - // Clean up process and endpoint pools when container is removed - w.processPool.remove(containerID) - w.endpointPool.remove(containerID) + w.removeContainerAndAssociatedObjects(containerID) } podSig.Signal() } @@ -527,6 +524,13 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre } } +func (w *WorkloadManager) removeContainerAndAssociatedObjects(containerID string) { + w.containerPool.remove(containerID) + // Clean up process and endpoint pools when container is removed + w.processPool.remove(containerID) + w.endpointPool.remove(containerID) +} + func getShortContainerID(id string) string { _, runtimeID := k8sutil.ParseContainerRuntimeString(id) return containerid.ShortContainerIDFromInstanceID(runtimeID) From 240636c17f33beaebe6bd0074aea027c1de091eb Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Fri, 5 Sep 2025 11:38:59 +0200 Subject: [PATCH 5/7] Remove duplicated func args from initializePreexistingResourcesWithDeps --- sensor/kubernetes/fake/fake.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index 1411f59d64882..b8c2c4f41894e 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -220,7 +220,7 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { containerPool: config.containerPool, servicesInitialized: concurrency.NewSignal(), } - mgr.initializePreexistingResourcesWithDeps(&workload, config.labelsPool) + mgr.initializePreexistingResourcesWithDeps() log.Info("Created Workload manager for workload") log.Infof("Workload: %s", string(data)) @@ -245,11 +245,11 @@ func (w *WorkloadManager) clearActions() { } } -func (w *WorkloadManager) initializePreexistingResourcesWithDeps(workload *Workload, lblPool *labelsPoolPerNamespace) { +func (w *WorkloadManager) initializePreexistingResourcesWithDeps() { var objects []runtime.Object numNamespaces := defaultNamespaceNum - if num := workload.NumNamespaces; num != 0 { + if num := w.workload.NumNamespaces; num != 0 { numNamespaces = num } for _, n := range getNamespaces(numNamespaces, w.getIDsForPrefix(namespacePrefix)) { @@ -257,23 +257,23 @@ func (w *WorkloadManager) initializePreexistingResourcesWithDeps(workload *Workl objects = append(objects, n) } - nodes := w.getNodes(workload.NodeWorkload, w.getIDsForPrefix(nodePrefix)) + nodes := w.getNodes(w.workload.NodeWorkload, w.getIDsForPrefix(nodePrefix)) for _, node := range nodes { w.writeID(nodePrefix, node.UID) objects = append(objects, node) } - lblPool.matchLabels = workload.MatchLabels + w.labelsPool.matchLabels = w.workload.MatchLabels - objects = append(objects, w.getRBAC(workload.RBACWorkload, w.getIDsForPrefix(serviceAccountPrefix), w.getIDsForPrefix(rolesPrefix), w.getIDsForPrefix(rolebindingsPrefix))...) + objects = append(objects, w.getRBAC(w.workload.RBACWorkload, w.getIDsForPrefix(serviceAccountPrefix), w.getIDsForPrefix(rolesPrefix), w.getIDsForPrefix(rolebindingsPrefix))...) var resources []*deploymentResourcesToBeManaged deploymentIDs := w.getIDsForPrefix(deploymentPrefix) replicaSetIDs := w.getIDsForPrefix(replicaSetPrefix) podIDs := w.getIDsForPrefix(podPrefix) - for _, deploymentWorkload := range workload.DeploymentWorkload { + for _, deploymentWorkload := range w.workload.DeploymentWorkload { for i := 0; i < deploymentWorkload.NumDeployments; i++ { - resource := w.getDeployment(deploymentWorkload, i, deploymentIDs, replicaSetIDs, podIDs, lblPool) + resource := w.getDeployment(deploymentWorkload, i, deploymentIDs, replicaSetIDs, podIDs, w.labelsPool) resources = append(resources, resource) objects = append(objects, resource.deployment, resource.replicaSet) @@ -283,12 +283,12 @@ func (w *WorkloadManager) initializePreexistingResourcesWithDeps(workload *Workl } } - objects = append(objects, w.getServices(workload.ServiceWorkload, w.getIDsForPrefix(servicePrefix), lblPool)...) + objects = append(objects, w.getServices(w.workload.ServiceWorkload, w.getIDsForPrefix(servicePrefix), w.labelsPool)...) var npResources []*networkPolicyToBeManaged networkPolicyIDs := w.getIDsForPrefix(networkPolicyPrefix) - for _, npWorkload := range workload.NetworkPolicyWorkload { + for _, npWorkload := range w.workload.NetworkPolicyWorkload { for i := 0; i < npWorkload.NumNetworkPolicies; i++ { - resource := w.getNetworkPolicy(npWorkload, getID(networkPolicyIDs, i), lblPool) + resource := w.getNetworkPolicy(npWorkload, getID(networkPolicyIDs, i), w.labelsPool) w.writeID(networkPolicyPrefix, resource.networkPolicy.UID) npResources = append(npResources, resource) @@ -334,5 +334,5 @@ func (w *WorkloadManager) initializePreexistingResourcesWithDeps(workload *Workl go w.manageNetworkPolicy(context.Background(), resource) } - go w.manageFlows(context.Background(), workload.NetworkWorkload) + go w.manageFlows(context.Background(), w.workload.NetworkWorkload) } From 7e33a34faaf74026e00cdc44d98a266b145af523 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Fri, 5 Sep 2025 12:19:28 +0200 Subject: [PATCH 6/7] Make funcs use internal fields where possible Changed: func (w *WorkloadManager) getServices(workload ServiceWorkload, ids []string) // was: (..., lblPool *labelsPoolPerNamespace) func (w *WorkloadManager) getNetworkPolicy(workload NetworkPolicyWorkload, id string) // was: (..., lblPool *labelsPoolPerNamespace) func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, deploymentIDs, replicaSetIDs, podIDs []string) // was: (..., lblPool *labelsPoolPerNamespace) Kept: func getClusterIP(id string, lblPool *labelsPoolPerNamespace) func getNodePort(id string, lblPool *labelsPoolPerNamespace) func getLoadBalancer(id string, lblPool *labelsPoolPerNamespace) func getPod(replicaSet *appsv1.ReplicaSet, id string, ipPool *pool, containerPool *pool) func populatePodContainerStatuses(pod *corev1.Pod, containerPool *pool) func generateAndAddIPToPool(ipPool *pool) string --- sensor/kubernetes/fake/deployment.go | 6 +++--- sensor/kubernetes/fake/fake.go | 6 +++--- sensor/kubernetes/fake/networkpolicy.go | 6 +++--- sensor/kubernetes/fake/service.go | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/sensor/kubernetes/fake/deployment.go b/sensor/kubernetes/fake/deployment.go index e6c5a2d9bb8b5..55943382619ca 100644 --- a/sensor/kubernetes/fake/deployment.go +++ b/sensor/kubernetes/fake/deployment.go @@ -104,7 +104,7 @@ func createDeploymentLabels(random bool, numLabels int) map[string]string { return createMap(numLabels) } -func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, deploymentIDs, replicaSetIDs, podIDs []string, lblPool *labelsPoolPerNamespace) *deploymentResourcesToBeManaged { +func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, deploymentIDs, replicaSetIDs, podIDs []string) *deploymentResourcesToBeManaged { var labels map[string]string if workload.NumLabels == 0 { labels = createDeploymentLabels(workload.RandomLabels, 3) @@ -122,7 +122,7 @@ func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, de namespace = "default" } - lblPool.add(namespace, labels) + w.labelsPool.add(namespace, labels) namespacesWithDeploymentsPool.add(namespace) var serviceAccount string @@ -393,7 +393,7 @@ func (w *WorkloadManager) manageDeployment(ctx context.Context, resources *deplo // The previous function returning means that the deployments, replicaset and pods were all deleted // Now we recreate the objects again for count := 0; resources.workload.NumLifecycles == 0 || count < resources.workload.NumLifecycles; count++ { - resources = w.getDeployment(resources.workload, 0, nil, nil, nil, w.labelsPool) + resources = w.getDeployment(resources.workload, 0, nil, nil, nil) deployment, replicaSet, pods := resources.deployment, resources.replicaSet, resources.pods if _, err := w.client.Kubernetes().AppsV1().Deployments(deployment.Namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil { log.Errorf("error creating deployment: %v", err) diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index b8c2c4f41894e..b2e55f7d85b22 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -273,7 +273,7 @@ func (w *WorkloadManager) initializePreexistingResourcesWithDeps() { podIDs := w.getIDsForPrefix(podPrefix) for _, deploymentWorkload := range w.workload.DeploymentWorkload { for i := 0; i < deploymentWorkload.NumDeployments; i++ { - resource := w.getDeployment(deploymentWorkload, i, deploymentIDs, replicaSetIDs, podIDs, w.labelsPool) + resource := w.getDeployment(deploymentWorkload, i, deploymentIDs, replicaSetIDs, podIDs) resources = append(resources, resource) objects = append(objects, resource.deployment, resource.replicaSet) @@ -283,12 +283,12 @@ func (w *WorkloadManager) initializePreexistingResourcesWithDeps() { } } - objects = append(objects, w.getServices(w.workload.ServiceWorkload, w.getIDsForPrefix(servicePrefix), w.labelsPool)...) + objects = append(objects, w.getServices(w.workload.ServiceWorkload, w.getIDsForPrefix(servicePrefix))...) var npResources []*networkPolicyToBeManaged networkPolicyIDs := w.getIDsForPrefix(networkPolicyPrefix) for _, npWorkload := range w.workload.NetworkPolicyWorkload { for i := 0; i < npWorkload.NumNetworkPolicies; i++ { - resource := w.getNetworkPolicy(npWorkload, getID(networkPolicyIDs, i), w.labelsPool) + resource := w.getNetworkPolicy(npWorkload, getID(networkPolicyIDs, i)) w.writeID(networkPolicyPrefix, resource.networkPolicy.UID) npResources = append(npResources, resource) diff --git a/sensor/kubernetes/fake/networkpolicy.go b/sensor/kubernetes/fake/networkpolicy.go index 4a37dac46d8a1..1c86e6ae6d2a3 100644 --- a/sensor/kubernetes/fake/networkpolicy.go +++ b/sensor/kubernetes/fake/networkpolicy.go @@ -15,9 +15,9 @@ type networkPolicyToBeManaged struct { networkPolicy *networkingV1.NetworkPolicy } -func (w *WorkloadManager) getNetworkPolicy(workload NetworkPolicyWorkload, id string, lblPool *labelsPoolPerNamespace) *networkPolicyToBeManaged { +func (w *WorkloadManager) getNetworkPolicy(workload NetworkPolicyWorkload, id string) *networkPolicyToBeManaged { namespace := namespacesWithDeploymentsPool.mustGetRandomElem() - labels := lblPool.randomElem(namespace) + labels := w.labelsPool.randomElem(namespace) np := &networkingV1.NetworkPolicy{ TypeMeta: metav1.TypeMeta{ Kind: "NetworkPolicy", @@ -55,7 +55,7 @@ func (w *WorkloadManager) manageNetworkPolicy(ctx context.Context, resources *ne w.manageNetworkPolicyLifecycle(ctx, resources) for count := 0; resources.workload.NumLifecycles == 0 || count < resources.workload.NumLifecycles; count++ { - resources = w.getNetworkPolicy(resources.workload, "", w.labelsPool) + resources = w.getNetworkPolicy(resources.workload, "") if _, err := w.client.Kubernetes().NetworkingV1().NetworkPolicies(resources.networkPolicy.Namespace).Create(ctx, resources.networkPolicy, metav1.CreateOptions{}); err != nil { log.Errorf("error creating networkPolicy: %v", err) } diff --git a/sensor/kubernetes/fake/service.go b/sensor/kubernetes/fake/service.go index 5a75a7c5839e1..de7b9b4ef79a0 100644 --- a/sensor/kubernetes/fake/service.go +++ b/sensor/kubernetes/fake/service.go @@ -135,20 +135,20 @@ func getLoadBalancer(id string, lblPool *labelsPoolPerNamespace) *v1.Service { } } -func (w *WorkloadManager) getServices(workload ServiceWorkload, ids []string, lblPool *labelsPoolPerNamespace) []runtime.Object { +func (w *WorkloadManager) getServices(workload ServiceWorkload, ids []string) []runtime.Object { objects := make([]runtime.Object, 0, workload.NumClusterIPs+workload.NumNodePorts+workload.NumLoadBalancers) for i := 0; i < workload.NumClusterIPs; i++ { - clusterIP := getClusterIP(getID(ids, i), lblPool) + clusterIP := getClusterIP(getID(ids, i), w.labelsPool) w.writeID(servicePrefix, clusterIP.UID) objects = append(objects, clusterIP) } for i := 0; i < workload.NumNodePorts; i++ { - nodePort := getNodePort(getID(ids, i+workload.NumClusterIPs), lblPool) + nodePort := getNodePort(getID(ids, i+workload.NumClusterIPs), w.labelsPool) w.writeID(servicePrefix, nodePort.UID) objects = append(objects, nodePort) } for i := 0; i < workload.NumLoadBalancers; i++ { - loadBalancer := getLoadBalancer(getID(ids, i+workload.NumClusterIPs+workload.NumNodePorts), lblPool) + loadBalancer := getLoadBalancer(getID(ids, i+workload.NumClusterIPs+workload.NumNodePorts), w.labelsPool) w.writeID(servicePrefix, loadBalancer.UID) objects = append(objects, loadBalancer) } From 5c376e976d39fccbda6aeee2c6276a1d64ce5ca1 Mon Sep 17 00:00:00 2001 From: Piotr Rygielski <114479+vikin91@users.noreply.github.com> Date: Fri, 5 Sep 2025 12:35:51 +0200 Subject: [PATCH 7/7] Self-review --- sensor/kubernetes/fake/deployment.go | 3 +-- sensor/kubernetes/fake/fake.go | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sensor/kubernetes/fake/deployment.go b/sensor/kubernetes/fake/deployment.go index 55943382619ca..d709219724f22 100644 --- a/sensor/kubernetes/fake/deployment.go +++ b/sensor/kubernetes/fake/deployment.go @@ -491,8 +491,7 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre w.ipPool.remove(pod.Status.PodIP) for _, cs := range pod.Status.ContainerStatuses { - containerID := getShortContainerID(cs.ContainerID) - w.removeContainerAndAssociatedObjects(containerID) + w.removeContainerAndAssociatedObjects(getShortContainerID(cs.ContainerID)) } podSig.Signal() } diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go index b2e55f7d85b22..b605275df03ab 100644 --- a/sensor/kubernetes/fake/fake.go +++ b/sensor/kubernetes/fake/fake.go @@ -220,7 +220,7 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager { containerPool: config.containerPool, servicesInitialized: concurrency.NewSignal(), } - mgr.initializePreexistingResourcesWithDeps() + mgr.initializePreexistingResources() log.Info("Created Workload manager for workload") log.Infof("Workload: %s", string(data)) @@ -245,7 +245,7 @@ func (w *WorkloadManager) clearActions() { } } -func (w *WorkloadManager) initializePreexistingResourcesWithDeps() { +func (w *WorkloadManager) initializePreexistingResources() { var objects []runtime.Object numNamespaces := defaultNamespaceNum