Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions sensor/kubernetes/fake/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var (
processPool = newProcessPool()
)

// ProcessPool stores processes by containerID using a map
type ProcessPool struct {
Processes map[string][]*storage.ProcessSignal
Expand Down Expand Up @@ -126,7 +122,7 @@ func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, de
namespace = "default"
}

labelsPool.add(namespace, labels)
w.labelsPool.add(namespace, labels)
namespacesWithDeploymentsPool.add(namespace)

var serviceAccount string
Expand Down Expand Up @@ -214,7 +210,7 @@ func (w *WorkloadManager) getDeployment(workload DeploymentWorkload, idx int, de

var pods []*corev1.Pod
for i := 0; i < workload.PodWorkload.NumPods; i++ {
pod := getPod(rs, getID(podIDs, i+idx*workload.PodWorkload.NumPods))
pod := getPod(rs, getID(podIDs, i+idx*workload.PodWorkload.NumPods), w.ipPool, w.containerPool)
w.writeID(podPrefix, pod.UID)
pods = append(pods, pod)
}
Expand Down Expand Up @@ -260,7 +256,7 @@ func getReplicaSet(deployment *appsv1.Deployment, id string) *appsv1.ReplicaSet
}
}

func getPod(replicaSet *appsv1.ReplicaSet, id string) *corev1.Pod {
func getPod(replicaSet *appsv1.ReplicaSet, id string, ipPool *pool, containerPool *pool) *corev1.Pod {
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
Expand Down Expand Up @@ -288,10 +284,10 @@ func getPod(replicaSet *appsv1.ReplicaSet, id string) *corev1.Pod {
StartTime: &metav1.Time{
Time: time.Now(),
},
PodIP: generateAndAddIPToPool(),
PodIP: generateAndAddIPToPool(ipPool),
},
}
populatePodContainerStatuses(pod)
populatePodContainerStatuses(pod, containerPool)
return pod
}

Expand Down Expand Up @@ -462,7 +458,7 @@ func (w *WorkloadManager) manageDeploymentLifecycle(ctx context.Context, resourc
}
}

func populatePodContainerStatuses(pod *corev1.Pod) {
func populatePodContainerStatuses(pod *corev1.Pod, containerPool *pool) {
statuses := make([]corev1.ContainerStatus, 0, len(pod.Spec.Containers))
for _, container := range pod.Spec.Containers {
status := corev1.ContainerStatus{
Expand Down Expand Up @@ -492,10 +488,10 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre
log.Errorf("error deleting pod: %v", err)
}
w.deleteID(podPrefix, pod.UID)
ipPool.remove(pod.Status.PodIP)
w.ipPool.remove(pod.Status.PodIP)

for _, cs := range pod.Status.ContainerStatuses {
containerPool.remove(getShortContainerID(cs.ContainerID))
w.removeContainerAndAssociatedObjects(getShortContainerID(cs.ContainerID))
}
podSig.Signal()
}
Expand All @@ -513,8 +509,8 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre
// New pod name and UUID
pod.Name = randString()
pod.UID = newUUID()
pod.Status.PodIP = generateAndAddIPToPool()
populatePodContainerStatuses(pod)
pod.Status.PodIP = generateAndAddIPToPool(w.ipPool)
populatePodContainerStatuses(pod, w.containerPool)

if _, err := client.Create(ctx, pod, metav1.CreateOptions{}); err != nil {
log.Errorf("error creating pod: %v", err)
Expand All @@ -527,6 +523,13 @@ func (w *WorkloadManager) managePod(ctx context.Context, deploymentSig *concurre
}
}

func (w *WorkloadManager) removeContainerAndAssociatedObjects(containerID string) {
w.containerPool.remove(containerID)
// Clean up process and endpoint pools when container is removed
w.processPool.remove(containerID)
w.endpointPool.remove(containerID)
}

func getShortContainerID(id string) string {
_, runtimeID := k8sutil.ParseContainerRuntimeString(id)
return containerid.ShortContainerIDFromInstanceID(runtimeID)
Expand Down Expand Up @@ -558,7 +561,7 @@ func (w *WorkloadManager) manageProcessesForPod(podSig *concurrency.Signal, podW
if processWorkload.ActiveProcesses {
for _, process := range getActiveProcesses(containerID) {
w.processes.Process(process)
processPool.add(process)
w.processPool.add(process)
}
} else {
// If less than the rate, then it's a bad process
Expand All @@ -567,7 +570,7 @@ func (w *WorkloadManager) manageProcessesForPod(podSig *concurrency.Signal, podW
} else {
goodProcess := getGoodProcess(containerID)
w.processes.Process(goodProcess)
processPool.add(goodProcess)
w.processPool.add(goodProcess)
}
}
case <-podSig.Done():
Expand Down
88 changes: 78 additions & 10 deletions sensor/kubernetes/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,17 @@ func (c *clientSetImpl) OpenshiftOperator() operatorVersioned.Interface {

// WorkloadManager encapsulates running a fake Kubernetes client
type WorkloadManager struct {
db *pebble.DB
fakeClient *fake.Clientset
client client.Interface
workload *Workload
db *pebble.DB
fakeClient *fake.Clientset
client client.Interface
processPool *ProcessPool
labelsPool *labelsPoolPerNamespace
endpointPool *EndpointPool
ipPool *pool
externalIpPool *pool
containerPool *pool
registeredHostConnections []manager.HostNetworkInfo
workload *Workload

// signals services
servicesInitialized concurrency.Signal
Expand All @@ -103,13 +110,27 @@ type WorkloadManager struct {

// WorkloadManagerConfig WorkloadManager's configuration
type WorkloadManagerConfig struct {
workloadFile string
workloadFile string
labelsPool *labelsPoolPerNamespace
processPool *ProcessPool
endpointPool *EndpointPool
ipPool *pool
externalIpPool *pool
containerPool *pool
storagePath string
}

// ConfigDefaults default configuration
func ConfigDefaults() *WorkloadManagerConfig {
return &WorkloadManagerConfig{
workloadFile: workloadPath,
workloadFile: workloadPath,
labelsPool: newLabelsPool(),
processPool: newProcessPool(),
endpointPool: newEndpointPool(),
ipPool: newPool(),
externalIpPool: newPool(),
containerPool: newPool(),
storagePath: env.FakeWorkloadStoragePath.Setting(),
}
}

Expand All @@ -119,6 +140,48 @@ func (c *WorkloadManagerConfig) WithWorkloadFile(file string) *WorkloadManagerCo
return c
}

// WithLabelsPool configures the WorkloadManagerConfig's LabelsPool field
func (c *WorkloadManagerConfig) WithLabelsPool(pool *labelsPoolPerNamespace) *WorkloadManagerConfig {
c.labelsPool = pool
return c
}

// WithProcessPool configures the WorkloadManagerConfig's ProcessPool field
func (c *WorkloadManagerConfig) WithProcessPool(pool *ProcessPool) *WorkloadManagerConfig {
c.processPool = pool
return c
}

// WithEndpointPool configures the WorkloadManagerConfig's EndpointPool field
func (c *WorkloadManagerConfig) WithEndpointPool(pool *EndpointPool) *WorkloadManagerConfig {
c.endpointPool = pool
return c
}

// WithIpPool configures the WorkloadManagerConfig's IpPool field
func (c *WorkloadManagerConfig) WithIpPool(pool *pool) *WorkloadManagerConfig {
c.ipPool = pool
return c
}

// WithExternalIpPool configures the WorkloadManagerConfig's ExternalIpPool field
func (c *WorkloadManagerConfig) WithExternalIpPool(pool *pool) *WorkloadManagerConfig {
c.externalIpPool = pool
return c
}

// WithContainerPool configures the WorkloadManagerConfig's ContainerPool field
func (c *WorkloadManagerConfig) WithContainerPool(pool *pool) *WorkloadManagerConfig {
c.containerPool = pool
return c
}

// WithStoragePath configures the WorkloadManagerConfig's StoragePath field
func (c *WorkloadManagerConfig) WithStoragePath(path string) *WorkloadManagerConfig {
c.storagePath = path
return c
}

// Client returns the mock client
func (w *WorkloadManager) Client() client.Interface {
return w.client
Expand All @@ -140,16 +203,21 @@ func NewWorkloadManager(config *WorkloadManagerConfig) *WorkloadManager {
}

var db *pebble.DB
if storagePath := env.FakeWorkloadStoragePath.Setting(); storagePath != "" {
db, err = pebble.Open(storagePath, &pebble.Options{})
if config.storagePath != "" {
db, err = pebble.Open(config.storagePath, &pebble.Options{})
if err != nil {
log.Panic("could not open id storage")
}
}

mgr := &WorkloadManager{
db: db,
workload: &workload,
processPool: config.processPool,
labelsPool: config.labelsPool,
endpointPool: config.endpointPool,
ipPool: config.ipPool,
externalIpPool: config.externalIpPool,
containerPool: config.containerPool,
servicesInitialized: concurrency.NewSignal(),
}
mgr.initializePreexistingResources()
Expand Down Expand Up @@ -195,7 +263,7 @@ func (w *WorkloadManager) initializePreexistingResources() {
objects = append(objects, node)
}

labelsPool.matchLabels = w.workload.MatchLabels
w.labelsPool.matchLabels = w.workload.MatchLabels

objects = append(objects, w.getRBAC(w.workload.RBACWorkload, w.getIDsForPrefix(serviceAccountPrefix), w.getIDsForPrefix(rolesPrefix), w.getIDsForPrefix(rolebindingsPrefix))...)
var resources []*deploymentResourcesToBeManaged
Expand Down
Loading
Loading