diff --git a/sensor/kubernetes/fake/deployment.go b/sensor/kubernetes/fake/deployment.go
index 0bccf3515425f..d709219724f22 100644
--- a/sensor/kubernetes/fake/deployment.go
+++ b/sensor/kubernetes/fake/deployment.go
@@ -20,10 +20,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-var (
-	processPool = newProcessPool()
-)
-
 // ProcessPool stores processes by containerID using a map
 type ProcessPool struct {
 	Processes map[string][]*storage.ProcessSignal
@@ -126,7 +122,7 @@ func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, de
 		namespace = "default"
 	}
 
-	labelsPool.add(namespace, labels)
+	w.labelsPool.add(namespace, labels)
 	namespacesWithDeploymentsPool.add(namespace)
 
 	var serviceAccount string
@@ -214,7 +210,7 @@ func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, de
 
 	var pods []*corev1.Pod
 	for i := 0; i < workload.PodWorkload.NumPods; i++ {
-		pod := getPod(rs, getID(podIDs, i+idx*workload.PodWorkload.NumPods))
+		pod := getPod(rs, getID(podIDs, i+idx*workload.PodWorkload.NumPods), w.ipPool, w.containerPool)
 		w.writeID(podPrefix, pod.UID)
 		pods = append(pods, pod)
 	}
@@ -260,7 +256,7 @@ func getReplicaSet(deployment *appsv1.Deployment, id string) *appsv1.ReplicaSet
 	}
 }
 
-func getPod(replicaSet *appsv1.ReplicaSet, id string) *corev1.Pod {
+func getPod(replicaSet *appsv1.ReplicaSet, id string, ipPool *pool, containerPool *pool) *corev1.Pod {
 	pod := &corev1.Pod{
 		TypeMeta: metav1.TypeMeta{
 			Kind: "Pod",
@@ -288,10 +284,10 @@ func getPod(replicaSet *appsv1.ReplicaSet, id string) *corev1.Pod {
 			StartTime: &metav1.Time{
 				Time: time.Now(),
 			},
-			PodIP: generateAndAddIPToPool(),
+			PodIP: generateAndAddIPToPool(ipPool),
 		},
 	}
-	populatePodContainerStatuses(pod)
+	populatePodContainerStatuses(pod, containerPool)
 	return pod
 }
 
@@ -462,7 +458,7 @@ func (w *WorkloadManager) manageDeploymentLifecycle(ctx context.Context, resourc
 	}
 }
 
-func populatePodContainerStatuses(pod *corev1.Pod) {
+func populatePodContainerStatuses(pod *corev1.Pod, containerPool *pool) {
 	statuses := make([]corev1.ContainerStatus, 0, len(pod.Spec.Containers))
 	for _, container := range pod.Spec.Containers {
 		status := corev1.ContainerStatus{
@@ -492,10 +488,10 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre
 			log.Errorf("error deleting pod: %v", err)
 		}
 		w.deleteID(podPrefix, pod.UID)
-		ipPool.remove(pod.Status.PodIP)
+		w.ipPool.remove(pod.Status.PodIP)
 
 		for _, cs := range pod.Status.ContainerStatuses {
-			containerPool.remove(getShortContainerID(cs.ContainerID))
+			w.removeContainerAndAssociatedObjects(getShortContainerID(cs.ContainerID))
 		}
 		podSig.Signal()
 	}
@@ -513,8 +509,8 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre
 		// New pod name and UUID
 		pod.Name = randString()
 		pod.UID = newUUID()
-		pod.Status.PodIP = generateAndAddIPToPool()
-		populatePodContainerStatuses(pod)
+		pod.Status.PodIP = generateAndAddIPToPool(w.ipPool)
+		populatePodContainerStatuses(pod, w.containerPool)
 
 		if _, err := client.Create(ctx, pod, metav1.CreateOptions{}); err != nil {
 			log.Errorf("error creating pod: %v", err)
@@ -527,6 +523,13 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre
 	}
 }
 
+func (w *WorkloadManager) removeContainerAndAssociatedObjects(containerID string) {
+	w.containerPool.remove(containerID)
+	// Clean up the process and endpoint pools when a container is removed
+	w.processPool.remove(containerID)
+	w.endpointPool.remove(containerID)
+}
+
 func getShortContainerID(id string) string {
 	_, runtimeID := k8sutil.ParseContainerRuntimeString(id)
 	return containerid.ShortContainerIDFromInstanceID(runtimeID)
@@ -558,7 +561,7 @@ func (w *WorkloadManager) manageProcessesForPod(podSig *concurrency.Signal, podW
 			if processWorkload.ActiveProcesses {
 				for _, process := range getActiveProcesses(containerID) {
 					w.processes.Process(process)
-					processPool.add(process)
+					w.processPool.add(process)
 				}
 			} else {
 				// If less than the rate, then it's a bad process
@@ -567,7 +570,7 @@
 			} else {
 				goodProcess := getGoodProcess(containerID)
 				w.processes.Process(goodProcess)
-				processPool.add(goodProcess)
+				w.processPool.add(goodProcess)
 			}
 		}
 	case <-podSig.Done():
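Review note: `removeContainerAndAssociatedObjects` makes the cleanup cascade explicit at the container-removal call site, replacing the implicit cross-pool cleanup that `pool.remove` used to perform (see the `flows.go` hunks below). A minimal in-package sketch of the observable behavior — the `WorkloadManager` literal and the container ID are hypothetical, not from the PR:

```go
w := &WorkloadManager{
	processPool:   newProcessPool(),
	endpointPool:  newEndpointPool(),
	containerPool: newPool(),
}
w.containerPool.add("abc123def456") // hypothetical short container ID
w.removeContainerAndAssociatedObjects("abc123def456")
// The container's endpoints are not dropped silently: EndpointPool.remove
// queues them in EndpointsToBeClosed, and getFakeNetworkConnectionInfo
// later emits close timestamps for them before clearing the list.
```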
diff --git a/sensor/kubernetes/fake/fake.go b/sensor/kubernetes/fake/fake.go
index 81724166f4566..b605275df03ab 100644
--- a/sensor/kubernetes/fake/fake.go
+++ b/sensor/kubernetes/fake/fake.go
@@ -90,10 +90,17 @@ func (c *clientSetImpl) OpenshiftOperator() operatorVersioned.Interface {
 
 // WorkloadManager encapsulates running a fake Kubernetes client
 type WorkloadManager struct {
-	db         *pebble.DB
-	fakeClient *fake.Clientset
-	client     client.Interface
-	workload   *Workload
+	db                        *pebble.DB
+	fakeClient                *fake.Clientset
+	client                    client.Interface
+	processPool               *ProcessPool
+	labelsPool                *labelsPoolPerNamespace
+	endpointPool              *EndpointPool
+	ipPool                    *pool
+	externalIpPool            *pool
+	containerPool             *pool
+	registeredHostConnections []manager.HostNetworkInfo
+	workload                  *Workload
 
 	// signals services
 	servicesInitialized concurrency.Signal
@@ -103,13 +110,27 @@ type WorkloadManager struct {
 
 // WorkloadManagerConfig WorkloadManager's configuration
 type WorkloadManagerConfig struct {
-	workloadFile string
+	workloadFile   string
+	labelsPool     *labelsPoolPerNamespace
+	processPool    *ProcessPool
+	endpointPool   *EndpointPool
+	ipPool         *pool
+	externalIpPool *pool
+	containerPool  *pool
+	storagePath    string
 }
 
 // ConfigDefaults default configuration
 func ConfigDefaults() *WorkloadManagerConfig {
 	return &WorkloadManagerConfig{
-		workloadFile: workloadPath,
+		workloadFile:   workloadPath,
+		labelsPool:     newLabelsPool(),
+		processPool:    newProcessPool(),
+		endpointPool:   newEndpointPool(),
+		ipPool:         newPool(),
+		externalIpPool: newPool(),
+		containerPool:  newPool(),
+		storagePath:    env.FakeWorkloadStoragePath.Setting(),
 	}
 }
 
@@ -119,6 +140,48 @@ func (c *WorkloadManagerConfig) WithWorkloadFile(file string) *WorkloadManagerCo
 	return c
 }
 
+// WithLabelsPool configures the WorkloadManagerConfig's LabelsPool field
+func (c *WorkloadManagerConfig) WithLabelsPool(pool *labelsPoolPerNamespace) *WorkloadManagerConfig {
+	c.labelsPool = pool
+	return c
+}
+
+// WithProcessPool configures the WorkloadManagerConfig's ProcessPool field
+func (c *WorkloadManagerConfig) WithProcessPool(pool *ProcessPool) *WorkloadManagerConfig {
+	c.processPool = pool
+	return c
+}
+
+// WithEndpointPool configures the WorkloadManagerConfig's EndpointPool field
+func (c *WorkloadManagerConfig) WithEndpointPool(pool *EndpointPool) *WorkloadManagerConfig {
+	c.endpointPool = pool
+	return c
+}
+
+// WithIpPool configures the WorkloadManagerConfig's IpPool field
+func (c *WorkloadManagerConfig) WithIpPool(pool *pool) *WorkloadManagerConfig {
+	c.ipPool = pool
+	return c
+}
+
+// WithExternalIpPool configures the WorkloadManagerConfig's ExternalIpPool field
+func (c *WorkloadManagerConfig) WithExternalIpPool(pool *pool) *WorkloadManagerConfig {
+	c.externalIpPool = pool
+	return c
+}
+
+// WithContainerPool configures the WorkloadManagerConfig's ContainerPool field
+func (c *WorkloadManagerConfig) WithContainerPool(pool *pool) *WorkloadManagerConfig {
+	c.containerPool = pool
+	return c
+}
+
+// WithStoragePath configures the WorkloadManagerConfig's StoragePath field
+func (c *WorkloadManagerConfig) WithStoragePath(path string) *WorkloadManagerConfig {
+	c.storagePath = path
+	return c
+}
+
 // Client returns the mock client
 func (w *WorkloadManager) Client() client.Interface {
 	return w.client
@@ -140,16 +203,21 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager {
 	}
 
 	var db *pebble.DB
-	if storagePath := env.FakeWorkloadStoragePath.Setting(); storagePath != "" {
-		db, err = pebble.Open(storagePath, &pebble.Options{})
+	if config.storagePath != "" {
+		db, err = pebble.Open(config.storagePath, &pebble.Options{})
 		if err != nil {
 			log.Panic("could not open id storage")
 		}
 	}
-
 	mgr := &WorkloadManager{
 		db:                  db,
 		workload:            &workload,
+		processPool:         config.processPool,
+		labelsPool:          config.labelsPool,
+		endpointPool:        config.endpointPool,
+		ipPool:              config.ipPool,
+		externalIpPool:      config.externalIpPool,
+		containerPool:       config.containerPool,
 		servicesInitialized: concurrency.NewSignal(),
 	}
 	mgr.initializePreexistingResources()
@@ -195,7 +263,7 @@ func (w *WorkloadManager) initializePreexistingResources() {
 		objects = append(objects, node)
 	}
 
-	labelsPool.matchLabels = w.workload.MatchLabels
+	w.labelsPool.matchLabels = w.workload.MatchLabels
 	objects = append(objects, w.getRBAC(w.workload.RBACWorkload, w.getIDsForPrefix(serviceAccountPrefix), w.getIDsForPrefix(rolesPrefix), w.getIDsForPrefix(rolebindingsPrefix))...)
 
 	var resources []*deploymentResourcesToBeManaged
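For orientation, the builder-style configuration above would be exercised roughly as follows. This is a sketch, not code from the PR; the pool types and `With*` setters are unexported, so it only compiles inside the `fake` package, and the workload file name is made up:

```go
cfg := ConfigDefaults().
	WithWorkloadFile("my-workload.yaml"). // hypothetical file
	WithProcessPool(newProcessPool()).
	WithIpPool(newPool()).
	WithStoragePath("") // an empty path skips opening the pebble DB
mgr := NewWorkloadManager(cfg)
client := mgr.Client()
_ = client
```

One thing worth flagging in review: `ConfigDefaults` now calls `env.FakeWorkloadStoragePath.Setting()` eagerly, so the environment is read when the config is constructed rather than inside `NewWorkloadManager`.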
diff --git a/sensor/kubernetes/fake/flows.go b/sensor/kubernetes/fake/flows.go
index 7b550521f0170..39c2e5071979f 100644
--- a/sensor/kubernetes/fake/flows.go
+++ b/sensor/kubernetes/fake/flows.go
@@ -2,7 +2,6 @@ package fake
 
 import (
 	"context"
-	"fmt"
 	"math/rand"
 	"time"
 
@@ -10,155 +9,17 @@ import (
 	"github.com/stackrox/rox/generated/storage"
 	"github.com/stackrox/rox/pkg/net"
 	"github.com/stackrox/rox/pkg/protocompat"
-	"github.com/stackrox/rox/pkg/set"
-	"github.com/stackrox/rox/pkg/sync"
 	"github.com/stackrox/rox/pkg/timestamp"
 	"github.com/stackrox/rox/sensor/common/networkflow/manager"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-var (
-	ipPool         = newPool()
-	externalIpPool = newPool()
-	containerPool  = newPool()
-	endpointPool   = newEndpointPool()
-
-	registeredHostConnections []manager.HostNetworkInfo
-)
-
-type pool struct {
-	pool set.StringSet
-	lock sync.RWMutex
-}
-
-func newPool() *pool {
-	return &pool{
-		pool: set.NewStringSet(),
-	}
-}
-
-func (p *pool) add(val string) bool {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	if added := p.pool.Add(val); !added {
-		return false
-	}
-	return true
-}
-
-func (p *pool) remove(val string) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	p.pool.Remove(val)
-	processPool.remove(val)
-	endpointPool.remove(val)
-}
-
-func (p *pool) randomElem() (string, bool) {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
-	val := p.pool.GetArbitraryElem()
-	return val, val != ""
-}
-
-func (p *pool) mustGetRandomElem() string {
-	p.lock.RLock()
-	defer p.lock.RUnlock()
-	val := p.pool.GetArbitraryElem()
-	if val == "" {
-		panic("not expecting an empty pool")
-	}
-	return val
-}
-
-// EndpointPool stores endpoints by containerID using a map
-type EndpointPool struct {
-	Endpoints           map[string][]*sensor.NetworkEndpoint
-	EndpointsToBeClosed []*sensor.NetworkEndpoint
-	Capacity            int
-	Size                int
-	lock                sync.RWMutex
-}
-
-func newEndpointPool() *EndpointPool {
-	return &EndpointPool{
-		Endpoints:           make(map[string][]*sensor.NetworkEndpoint),
-		EndpointsToBeClosed: make([]*sensor.NetworkEndpoint, 0),
-		Capacity:            10000,
-		Size:                0,
-	}
-}
-
-func (p *EndpointPool) add(val *sensor.NetworkEndpoint) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	if p.Size < p.Capacity {
-		p.Endpoints[val.ContainerId] = append(p.Endpoints[val.ContainerId], val)
-		p.Size++
-	}
-}
-
-func (p *EndpointPool) remove(containerID string) {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	p.EndpointsToBeClosed = append(p.EndpointsToBeClosed, p.Endpoints[containerID]...)
-	p.Size -= len(p.Endpoints[containerID])
-	delete(p.Endpoints, containerID)
-}
-
-func (p *EndpointPool) clearEndpointsToBeClosed() {
-	p.lock.Lock()
-	defer p.lock.Unlock()
-
-	p.EndpointsToBeClosed = []*sensor.NetworkEndpoint{}
-}
-
-func generateIP() string {
-	return fmt.Sprintf("10.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256))
-}
-
-// Generate IP addresses from 11.0.0.0 to 99.255.255.255 which are all public
-func generateExternalIP() string {
-	return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(89)+11, rand.Intn(256), rand.Intn(256), rand.Intn(256))
-}
-
-// We want to reuse some external IPs, so we test the cases where multiple
-// entities connect to the same external IP, but we also want many external IPs
-// that are only used once.
-func generateExternalIPPool() {
-	ip := []int{11, 0, 0, 0}
-	for range 1000 {
-		for j := 3; j >= 0; j-- {
-			ip[j]++
-			if ip[j] > 255 {
-				ip[j] = 0
-			} else {
-				break
-			}
-		}
-		ipString := fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
-		externalIpPool.add(ipString)
-	}
-}
-
-func generateAndAddIPToPool() string {
-	ip := generateIP()
-	for !ipPool.add(ip) {
-		ip = generateIP()
-	}
-	return ip
-}
-
 func (w *WorkloadManager) getRandomHostConnection(ctx context.Context) (manager.HostNetworkInfo, bool) {
 	// Return false if the network manager hasn't been initialized yet
 	if !w.servicesInitialized.IsDone() {
 		return nil, false
 	}
 
-	if len(registeredHostConnections) == 0 {
+	if len(w.registeredHostConnections) == 0 {
 		// Initialize the host connections
 		nodeResp, err := w.fakeClient.CoreV1().Nodes().List(ctx, v1.ListOptions{})
 		if err != nil {
@@ -167,38 +28,10 @@ func (w *WorkloadManager) getRandomHostConnection(ctx context.Context) (manager.
 		}
 		for _, node := range nodeResp.Items {
 			info, _ := w.networkManager.RegisterCollector(node.Name)
-			registeredHostConnections = append(registeredHostConnections, info)
-		}
-	}
-	return registeredHostConnections[rand.Intn(len(registeredHostConnections))], true
-}
-
-func getNetworkProcessUniqueKeyFromProcess(process *storage.ProcessSignal) *storage.NetworkProcessUniqueKey {
-	if process != nil {
-		return &storage.NetworkProcessUniqueKey{
-			ProcessName:         process.Name,
-			ProcessExecFilePath: process.ExecFilePath,
-			ProcessArgs:         process.Args,
+			w.registeredHostConnections = append(w.registeredHostConnections, info)
 		}
 	}
-
-	return nil
-}
-
-func getRandomOriginator(containerID string) *storage.NetworkProcessUniqueKey {
-	var process *storage.ProcessSignal
-	var percentMatchedProcess float32 = 0.5
-	p := rand.Float32()
-	if p < percentMatchedProcess {
-		// There is a chance that the process has been filtered out or hasn't gotten to
-		// the central-db for some other reason so this is not a guarantee that the
-		// process is in the central-db
-		process = processPool.getRandomProcess(containerID)
-	} else {
-		process = getGoodProcess(containerID)
-	}
-
-	return getNetworkProcessUniqueKeyFromProcess(process)
+	return w.registeredHostConnections[rand.Intn(len(w.registeredHostConnections))], true
 }
 
 func makeNetworkConnection(src string, dst string, containerID string, closeTimestamp time.Time) *sensor.NetworkConnection {
@@ -236,10 +69,10 @@ func (w *WorkloadManager) getRandomInternalExternalIP() (string, bool, bool) {
 	internal := rand.Intn(100) < 80
 
 	if internal {
-		ip, ok = ipPool.randomElem()
+		ip, ok = w.ipPool.randomElem()
 	} else {
 		if rand.Intn(100) < 50 {
-			ip, ok = externalIpPool.randomElem()
+			ip, ok = w.externalIpPool.randomElem()
 		} else {
 			ip = generateExternalIP()
 			ok = true
@@ -264,7 +97,7 @@ func (w *WorkloadManager) getRandomSrcDst() (string, string, bool) {
 	if internal {
 		dst, _, ok = w.getRandomInternalExternalIP()
 	} else {
-		dst, ok = ipPool.randomElem()
+		dst, ok = w.ipPool.randomElem()
 		if !ok {
 			log.Error("Found no IPs in the internal pool")
 		}
@@ -274,9 +107,9 @@
 }
 
 func (w *WorkloadManager) getRandomNetworkEndpoint(containerID string) (*sensor.NetworkEndpoint, bool) {
-	originator := getRandomOriginator(containerID)
+	originator := getRandomOriginator(containerID, w.processPool)
 
-	ip, ok := ipPool.randomElem()
+	ip, ok := w.ipPool.randomElem()
 	if !ok {
 		return nil, false
 	}
@@ -305,7 +138,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload)
 			continue
 		}
 
-		containerID, ok := containerPool.randomElem()
+		containerID, ok := w.containerPool.randomElem()
 		if !ok {
 			log.Error("Found no containers in pool")
 			continue
@@ -319,8 +152,8 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload)
 		}
 		conns = append(conns, conn)
 
-		if endpointPool.Size < endpointPool.Capacity {
-			endpointPool.add(networkEndpoint)
+		if w.endpointPool.Size < w.endpointPool.Capacity {
+			w.endpointPool.add(networkEndpoint)
 		}
 		if workload.GenerateUnclosedEndpoints {
 			// These endpoints will not be closed - i.e., CloseTimestamp will be always nil.
@@ -328,7 +161,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload)
 		}
 	}
 
-	for _, endpoint := range endpointPool.EndpointsToBeClosed {
+	for _, endpoint := range w.endpointPool.EndpointsToBeClosed {
 		networkEndpoint := endpoint
 		closeTS, err := protocompat.ConvertTimeToTimestampOrError(time.Now().Add(-5 * time.Second))
 		if err != nil {
@@ -339,7 +172,7 @@ func (w *WorkloadManager) getFakeNetworkConnectionInfo(workload NetworkWorkload)
 		}
 	}
 
-	endpointPool.clearEndpointsToBeClosed()
+	w.endpointPool.clearEndpointsToBeClosed()
 
 	return &sensor.NetworkConnectionInfo{
 		UpdatedConnections: conns,
@@ -357,7 +190,7 @@ func (w *WorkloadManager) manageFlows(ctx context.Context, workload NetworkWorkl
 	ticker := time.NewTicker(workload.FlowInterval)
 	defer ticker.Stop()
 
-	generateExternalIPPool()
+	generateExternalIPPool(w.externalIpPool)
 
 	for {
 		select {
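Reviewer arithmetic for `getRandomInternalExternalIP` above: 80% of addresses come from the internal pool; the remaining 20% split evenly between reusing a pooled external IP and generating a fresh one, so roughly 10% of results reuse an external address and 10% are fresh, one-off external addresses.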
diff --git a/sensor/kubernetes/fake/flows_test.go b/sensor/kubernetes/fake/flows_test.go
index 1d8682bb38e6a..eb8bda7fdf45e 100644
--- a/sensor/kubernetes/fake/flows_test.go
+++ b/sensor/kubernetes/fake/flows_test.go
@@ -16,16 +16,22 @@ func TestFlowsSuite(t *testing.T) {
 }
 
 func (s *flowsSuite) TestGetRandomInternalExternalIP() {
-	var w WorkloadManager
+	w := &WorkloadManager{
+		endpointPool:   newEndpointPool(),
+		processPool:    newProcessPool(),
+		ipPool:         newPool(),
+		externalIpPool: newPool(),
+		containerPool:  newPool(),
+	}
 	_, _, ok := w.getRandomSrcDst()
 	s.False(ok)
 
 	for range 1000 {
-		generateAndAddIPToPool()
+		generateAndAddIPToPool(w.ipPool)
 	}
 
-	generateExternalIPPool()
+	generateExternalIPPool(w.externalIpPool)
 
 	for range 1000 {
 		ip, internal, ok := w.getRandomInternalExternalIP()
diff --git a/sensor/kubernetes/fake/labels.go b/sensor/kubernetes/fake/labels.go
index bf6a275863196..d206cfc27aa5c 100644
--- a/sensor/kubernetes/fake/labels.go
+++ b/sensor/kubernetes/fake/labels.go
@@ -6,10 +6,6 @@ import (
 	"github.com/stackrox/rox/pkg/sync"
 )
 
-var (
-	labelsPool = newLabelsPool()
-)
-
 type labelsPoolPerNamespace struct {
 	pool        map[string][]map[string]string
 	matchLabels bool
diff --git a/sensor/kubernetes/fake/networkpolicy.go b/sensor/kubernetes/fake/networkpolicy.go
index dc5df6888ff07..1c86e6ae6d2a3 100644
--- a/sensor/kubernetes/fake/networkpolicy.go
+++ b/sensor/kubernetes/fake/networkpolicy.go
@@ -17,7 +17,7 @@ type networkPolicyToBeManaged struct {
 
 func (w *WorkloadManager) getNetworkPolicy(workload NetworkPolicyWorkload, id string) *networkPolicyToBeManaged {
 	namespace := namespacesWithDeploymentsPool.mustGetRandomElem()
-	labels := labelsPool.randomElem(namespace)
+	labels := w.labelsPool.randomElem(namespace)
 	np := &networkingV1.NetworkPolicy{
 		TypeMeta: metav1.TypeMeta{
 			Kind: "NetworkPolicy",
diff --git a/sensor/kubernetes/fake/pools.go b/sensor/kubernetes/fake/pools.go
new file mode 100644
index 0000000000000..ca151f0127889
--- /dev/null
+++ b/sensor/kubernetes/fake/pools.go
@@ -0,0 +1,170 @@
+package fake
+
+import (
+	"fmt"
+	"math/rand"
+
+	"github.com/stackrox/rox/generated/internalapi/sensor"
+	"github.com/stackrox/rox/generated/storage"
+	"github.com/stackrox/rox/pkg/set"
+	"github.com/stackrox/rox/pkg/sync"
+)
+
+// pool is a thread-safe string pool using set.StringSet
+type pool struct {
+	pool set.StringSet
+	lock sync.RWMutex
+}
+
+func newPool() *pool {
+	return &pool{
+		pool: set.NewStringSet(),
+	}
+}
+
+func (p *pool) add(val string) bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	if added := p.pool.Add(val); !added {
+		return false
+	}
+	return true
+}
+
+func (p *pool) remove(val string) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.pool.Remove(val)
+}
+
+func (p *pool) randomElem() (string, bool) {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	val := p.pool.GetArbitraryElem()
+	if val == "" {
+		return "", false
+	}
+	return val, true
+}
+
+func (p *pool) mustGetRandomElem() string {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	val := p.pool.GetArbitraryElem()
+	if val == "" {
+		panic("not expecting an empty pool")
+	}
+	return val
+}
+
+// EndpointPool stores endpoints by containerID using a map
+type EndpointPool struct {
+	Endpoints           map[string][]*sensor.NetworkEndpoint
+	EndpointsToBeClosed []*sensor.NetworkEndpoint
+	Capacity            int
+	Size                int
+	lock                sync.RWMutex
+}
+
+func newEndpointPool() *EndpointPool {
+	return &EndpointPool{
+		Endpoints:           make(map[string][]*sensor.NetworkEndpoint),
+		EndpointsToBeClosed: make([]*sensor.NetworkEndpoint, 0),
+		Capacity:            10000,
+		Size:                0,
+	}
+}
+
+func (p *EndpointPool) add(val *sensor.NetworkEndpoint) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	if p.Size < p.Capacity {
+		p.Endpoints[val.ContainerId] = append(p.Endpoints[val.ContainerId], val)
+		p.Size++
+	}
+}
+
+func (p *EndpointPool) remove(containerID string) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.EndpointsToBeClosed = append(p.EndpointsToBeClosed, p.Endpoints[containerID]...)
+	p.Size -= len(p.Endpoints[containerID])
+	delete(p.Endpoints, containerID)
+}
+
+func (p *EndpointPool) clearEndpointsToBeClosed() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	p.EndpointsToBeClosed = []*sensor.NetworkEndpoint{}
+}
+
+// IP generation and pool manipulation functions
+
+func generateIP() string {
+	return fmt.Sprintf("10.%d.%d.%d", rand.Intn(256), rand.Intn(256), rand.Intn(256))
+}
+
+// Generate IP addresses from 11.0.0.0 to 99.255.255.255, which are all public
+func generateExternalIP() string {
+	return fmt.Sprintf("%d.%d.%d.%d", rand.Intn(89)+11, rand.Intn(256), rand.Intn(256), rand.Intn(256))
+}
+
+// We want to reuse some external IPs, so we test the cases where multiple
+// entities connect to the same external IP, but we also want many external IPs
+// that are only used once.
+func generateExternalIPPool(pool *pool) {
+	ip := []int{11, 0, 0, 0}
+	for range 1000 {
+		for j := 3; j >= 0; j-- {
+			ip[j]++
+			if ip[j] > 255 {
+				ip[j] = 0
+			} else {
+				break
+			}
+		}
+		ipString := fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
+		pool.add(ipString)
+	}
+}
+
+func generateAndAddIPToPool(ipPool *pool) string {
+	ip := generateIP()
+	for !ipPool.add(ip) {
+		ip = generateIP()
+	}
+	return ip
+}
+
+func getRandomOriginator(containerID string, pool *ProcessPool) *storage.NetworkProcessUniqueKey {
+	var process *storage.ProcessSignal
+	var percentMatchedProcess float32 = 0.5
+	p := rand.Float32()
+	if p < percentMatchedProcess {
+		// There is a chance that the process has been filtered out or hasn't gotten to
+		// the central-db for some other reason, so this is not a guarantee that the
+		// process is in the central-db
+		process = pool.getRandomProcess(containerID)
+	} else {
+		process = getGoodProcess(containerID)
+	}
+
+	return getNetworkProcessUniqueKeyFromProcess(process)
+}
+
+func getNetworkProcessUniqueKeyFromProcess(process *storage.ProcessSignal) *storage.NetworkProcessUniqueKey {
+	if process != nil {
+		return &storage.NetworkProcessUniqueKey{
+			ProcessName:         process.Name,
+			ProcessExecFilePath: process.ExecFilePath,
+			ProcessArgs:         process.Args,
+		}
+	}
+
+	return nil
+}
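A quick behavioral sketch of the relocated `pool` type, written as an in-package test. It is illustrative, not part of the PR, and assumes the repo's existing testify dependency; the test name is made up:

```go
package fake

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestPoolSketch(t *testing.T) {
	p := newPool()
	require.True(t, p.add("10.0.0.1"))
	require.False(t, p.add("10.0.0.1")) // duplicate adds report false
	val, ok := p.randomElem()
	require.True(t, ok)
	require.Equal(t, "10.0.0.1", val)

	p.remove("10.0.0.1") // now a pure set removal, no cross-pool cleanup
	_, ok = p.randomElem()
	require.False(t, ok) // randomElem reports ok=false on an empty pool
}
```

Note that the cross-pool cleanup that used to live in `pool.remove` has moved to `WorkloadManager.removeContainerAndAssociatedObjects` in deployment.go, so removing an IP from `ipPool` no longer touches the process and endpoint pools.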
diff --git a/sensor/kubernetes/fake/service.go b/sensor/kubernetes/fake/service.go
index 7289baa030b98..de7b9b4ef79a0 100644
--- a/sensor/kubernetes/fake/service.go
+++ b/sensor/kubernetes/fake/service.go
@@ -26,9 +26,9 @@ func getIPFamily() string {
 	return ipFamilies[rand.Intn(len(ipFamilies))]
 }
 
-func getClusterIP(id string) *v1.Service {
+func getClusterIP(id string, lblPool *labelsPoolPerNamespace) *v1.Service {
 	ns := namespacesWithDeploymentsPool.mustGetRandomElem()
-	labels := labelsPool.randomElem(ns)
+	labels := lblPool.randomElem(ns)
 	clusterIP := generateIP()
 	return &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
@@ -56,9 +56,9 @@ func getClusterIP(id string) *v1.Service {
 	}
 }
 
-func getNodePort(id string) *v1.Service {
+func getNodePort(id string, lblPool *labelsPoolPerNamespace) *v1.Service {
 	ns := namespacesWithDeploymentsPool.mustGetRandomElem()
-	labels := labelsPool.randomElem(ns)
+	labels := lblPool.randomElem(ns)
 	clusterIP := generateIP()
 	return &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
@@ -87,9 +87,9 @@ func getNodePort(id string) *v1.Service {
 	}
 }
 
-func getLoadBalancer(id string) *v1.Service {
+func getLoadBalancer(id string, lblPool *labelsPoolPerNamespace) *v1.Service {
 	ns := namespacesWithDeploymentsPool.mustGetRandomElem()
-	labels := labelsPool.randomElem(ns)
+	labels := lblPool.randomElem(ns)
 	clusterIP := generateIP()
 	internalTrafficPolicy := v1.ServiceInternalTrafficPolicyCluster
 	allocateLoadBalancerNodePorts := true
@@ -138,17 +138,17 @@ func getLoadBalancer(id string) *v1.Service {
 func (w *WorkloadManager) getServices(workload ServiceWorkload, ids []string) []runtime.Object {
 	objects := make([]runtime.Object, 0, workload.NumClusterIPs+workload.NumNodePorts+workload.NumLoadBalancers)
 	for i := 0; i < workload.NumClusterIPs; i++ {
-		clusterIP := getClusterIP(getID(ids, i))
+		clusterIP := getClusterIP(getID(ids, i), w.labelsPool)
 		w.writeID(servicePrefix, clusterIP.UID)
 		objects = append(objects, clusterIP)
 	}
 	for i := 0; i < workload.NumNodePorts; i++ {
-		nodePort := getNodePort(getID(ids, i+workload.NumClusterIPs))
+		nodePort := getNodePort(getID(ids, i+workload.NumClusterIPs), w.labelsPool)
 		w.writeID(servicePrefix, nodePort.UID)
 		objects = append(objects, nodePort)
 	}
 	for i := 0; i < workload.NumLoadBalancers; i++ {
-		loadBalancer := getLoadBalancer(getID(ids, i+workload.NumClusterIPs+workload.NumNodePorts))
+		loadBalancer := getLoadBalancer(getID(ids, i+workload.NumClusterIPs+workload.NumNodePorts), w.labelsPool)
 		w.writeID(servicePrefix, loadBalancer.UID)
 		objects = append(objects, loadBalancer)
 	}
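Last, a note on the `getServices` indexing above: the three loops carve one shared `ids` slice into disjoint ranges via the offset arguments to `getID`. A worked example, assuming a hypothetical workload with two ClusterIPs, one NodePort, and one LoadBalancer:

```go
// ids[0], ids[1] -> getClusterIP(getID(ids, 0..1), w.labelsPool)
// ids[2]         -> getNodePort(getID(ids, 2), w.labelsPool)
// ids[3]         -> getLoadBalancer(getID(ids, 3), w.labelsPool)
```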