Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/centralsensor/caps_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,9 @@ const (
// ComplianceV2TailoredProfiles identifies the capability of Central to handle
// tailored profile tracking and scan configurations referencing tailored profiles.
ComplianceV2TailoredProfiles = "ComplianceV2TailoredProfiles"

// TargetedImageCacheInvalidation identifies the capability to forward
// per-image cache invalidation keys to the admission controller instead of
// flushing the entire image cache.
TargetedImageCacheInvalidation SensorCapability = "TargetedImageCacheInvalidation"
)
1 change: 1 addition & 0 deletions sensor/admission-control/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Manager interface {

SettingsUpdateC() chan<- *sensor.AdmissionControlSettings
ResourceUpdatesC() chan<- *sensor.AdmCtrlUpdateResourceRequest
ImageCacheInvalidationC() chan<- *sensor.AdmCtrlImageCacheInvalidation

SettingsStream() concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings]
SensorConnStatusFlag() *concurrency.Flag
Expand Down
49 changes: 43 additions & 6 deletions sensor/admission-control/manager/manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ type manager struct {
imageNameCacheEnabled bool
imageFetchGroup *coalescer.Coalescer[*storage.Image]

imageCacheInvalidationC chan *sensor.AdmCtrlImageCacheInvalidation

depClient sensor.DeploymentServiceClient
resourceUpdatesC chan *sensor.AdmCtrlUpdateResourceRequest
namespaces *resources.NamespaceStore
Expand Down Expand Up @@ -160,12 +162,13 @@ func NewManager(namespace string, maxImageCacheSize int64, imageNameCacheEnabled

alertsC: make(chan []*storage.Alert),

namespaces: nsStore,
deployments: depStore,
pods: podStore,
resourceUpdatesC: make(chan *sensor.AdmCtrlUpdateResourceRequest),
initialSyncSig: concurrency.NewSignal(),
depClient: deploymentServiceClient,
namespaces: nsStore,
deployments: depStore,
pods: podStore,
resourceUpdatesC: make(chan *sensor.AdmCtrlUpdateResourceRequest),
imageCacheInvalidationC: make(chan *sensor.AdmCtrlImageCacheInvalidation),
initialSyncSig: concurrency.NewSignal(),
depClient: deploymentServiceClient,

ownNamespace: namespace,
}
Expand Down Expand Up @@ -247,6 +250,10 @@ func (m *manager) ResourceUpdatesC() chan<- *sensor.AdmCtrlUpdateResourceRequest
return m.resourceUpdatesC
}

func (m *manager) ImageCacheInvalidationC() chan<- *sensor.AdmCtrlImageCacheInvalidation {
return m.imageCacheInvalidationC
}

func (m *manager) InitialResourceSyncSig() *concurrency.Signal {
return &m.initialSyncSig
}
Expand All @@ -264,6 +271,8 @@ func (m *manager) run() {
m.ProcessNewSettings(newSettings)
case req := <-m.resourceUpdatesC:
m.processUpdateResourceRequest(req)
case inv := <-m.imageCacheInvalidationC:
m.processImageCacheInvalidation(inv)
default:
// Select on syncC only if there is nothing to be read from the main
// channels. The duplication of select branches is a bit ugly, but inevitable
Expand All @@ -276,6 +285,8 @@ func (m *manager) run() {
m.ProcessNewSettings(newSettings)
case req := <-m.resourceUpdatesC:
m.processUpdateResourceRequest(req)
case inv := <-m.imageCacheInvalidationC:
m.processImageCacheInvalidation(inv)
case syncSig := <-m.syncC:
syncSig.Signal()
}
Expand Down Expand Up @@ -517,3 +528,29 @@ func (m *manager) processUpdateResourceRequest(req *sensor.AdmCtrlUpdateResource
func (m *manager) getDeploymentForPod(namespace, podName string) *storage.Deployment {
return m.deployments.Get(namespace, m.pods.GetDeploymentID(namespace, podName))
}

func (m *manager) processImageCacheInvalidation(inv *sensor.AdmCtrlImageCacheInvalidation) {
s := m.currentState()
flatten := s != nil && s.GetFlattenImageData()

invalidated := 0
for _, key := range inv.GetImageKeys() {
cacheKey := key.GetImageId()
if flatten && key.GetImageIdV2() != "" {
cacheKey = key.GetImageIdV2()
}
fullName := key.GetImageFullName()

if cacheKey != "" {
m.imageCache.Remove(cacheKey)
m.imageFetchGroup.Forget(cacheKey)
invalidated++
}
if fullName != "" {
m.imageNameToImageCacheKey.Remove(fullName)
m.imageFetchGroup.Forget(fullName)
}
}

log.Infof("Targeted image cache invalidation: invalidated %d entries", invalidated)
}
252 changes: 199 additions & 53 deletions sensor/admission-control/manager/manager_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,81 +3,227 @@ package manager
import (
"context"
"testing"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/stackrox/rox/generated/internalapi/central"
"github.com/stackrox/rox/generated/internalapi/sensor"
"github.com/stackrox/rox/generated/storage"
"github.com/stackrox/rox/pkg/coalescer"
"github.com/stackrox/rox/pkg/sizeboundedcache"
"github.com/stackrox/rox/sensor/admission-control/resources"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

func TestManager_GetClusterLabels(t *testing.T) {
ctx := context.Background()
type ManagerImplSuite struct {
suite.Suite
mgr *manager
nsStore *resources.NamespaceStore
}

func TestManagerImplSuite(t *testing.T) {
suite.Run(t, new(ManagerImplSuite))
}

func (s *ManagerImplSuite) SetupSuite() {
cache, err := sizeboundedcache.New(1024*1024, 512*1024, func(key string, value imageCacheEntry) int64 {
return 1024
})
require.NoError(s.T(), err)

nameCache, err := lru.New[string, string](imageNameCacheSize)
require.NoError(s.T(), err)

s.mgr = &manager{
imageCache: cache,
imageNameToImageCacheKey: nameCache,
imageFetchGroup: coalescer.New[*storage.Image](),
}
}

func (s *ManagerImplSuite) SetupTest() {
s.mgr.imageCache.Purge()
s.mgr.imageNameToImageCacheKey.Purge()
s.mgr.clusterLabels.Store(nil)

depStore := resources.NewDeploymentStore(nil)
podStore := resources.NewPodStore()
s.nsStore = resources.NewNamespaceStore(depStore, podStore)
s.mgr.namespaces = s.nsStore
}

t.Run("nil cluster labels returns nil", func(t *testing.T) {
m := &manager{}
labels, err := m.GetClusterLabels(ctx, "cluster-id")
require.NoError(t, err)
assert.Nil(t, labels)
func (s *ManagerImplSuite) addToImageCache(key string) {
s.mgr.imageCache.Add(key, imageCacheEntry{
Image: &storage.Image{Id: key},
timestamp: time.Now(),
})
}

func (s *ManagerImplSuite) addNameMapping(name, key string) {
s.mgr.imageNameToImageCacheKey.Add(name, key)
}

t.Run("returns cluster labels", func(t *testing.T) {
m := &manager{}
clusterLabels := map[string]string{
"env": "prod",
"region": "us-east-1",
}
m.clusterLabels.Store(&clusterLabels)
labels, err := m.GetClusterLabels(ctx, "cluster-id")
require.NoError(t, err)
assert.Equal(t, map[string]string{
"env": "prod",
"region": "us-east-1",
}, labels)
func (s *ManagerImplSuite) assertCached(key string, expected bool, msgAndArgs ...interface{}) {
_, ok := s.mgr.imageCache.Get(key)
s.Equal(expected, ok, msgAndArgs...)
}

func (s *ManagerImplSuite) assertNameMapped(name string, expected bool, msgAndArgs ...interface{}) {
_, ok := s.mgr.imageNameToImageCacheKey.Get(name)
s.Equal(expected, ok, msgAndArgs...)
}

func (s *ManagerImplSuite) invalidate(keys ...*central.InvalidateImageCache_ImageKey) {
s.mgr.processImageCacheInvalidation(&sensor.AdmCtrlImageCacheInvalidation{
ImageKeys: keys,
})
}

func TestManager_GetNamespaceLabels(t *testing.T) {
ctx := context.Background()
// --- Cluster labels ---

t.Run("returns labels from namespace store", func(t *testing.T) {
depStore := resources.NewDeploymentStore(nil)
podStore := resources.NewPodStore()
nsStore := resources.NewNamespaceStore(depStore, podStore)
func (s *ManagerImplSuite) TestGetClusterLabelsNil() {
labels, err := s.mgr.GetClusterLabels(context.Background(), "cluster-id")
s.NoError(err)
s.Nil(labels)
}

func (s *ManagerImplSuite) TestGetClusterLabelsReturnsStored() {
clusterLabels := map[string]string{
"env": "prod",
"region": "us-east-1",
}
s.mgr.clusterLabels.Store(&clusterLabels)

m := &manager{
namespaces: nsStore,
}
labels, err := s.mgr.GetClusterLabels(context.Background(), "cluster-id")
s.NoError(err)
s.Equal(map[string]string{
"env": "prod",
"region": "us-east-1",
}, labels)
}

// Add namespace to store
ns := &storage.NamespaceMetadata{
Name: "test-namespace",
Labels: map[string]string{
"team": "backend",
"tier": "app",
},
}
nsStore.ProcessEvent(central.ResourceAction_CREATE_RESOURCE, ns)
// --- Namespace labels ---

labels, err := m.GetNamespaceLabels(ctx, "cluster-id", "test-namespace")
require.NoError(t, err)
assert.Equal(t, map[string]string{
func (s *ManagerImplSuite) TestGetNamespaceLabelsReturnsStored() {
s.nsStore.ProcessEvent(central.ResourceAction_CREATE_RESOURCE, &storage.NamespaceMetadata{
Name: "test-namespace",
Labels: map[string]string{
"team": "backend",
"tier": "app",
}, labels)
},
})

t.Run("returns nil for non-existent namespace", func(t *testing.T) {
depStore := resources.NewDeploymentStore(nil)
podStore := resources.NewPodStore()
nsStore := resources.NewNamespaceStore(depStore, podStore)
labels, err := s.mgr.GetNamespaceLabels(context.Background(), "cluster-id", "test-namespace")
s.NoError(err)
s.Equal(map[string]string{
"team": "backend",
"tier": "app",
}, labels)
}

func (s *ManagerImplSuite) TestGetNamespaceLabelsNonExistent() {
labels, err := s.mgr.GetNamespaceLabels(context.Background(), "cluster-id", "nonexistent")
s.NoError(err)
s.Nil(labels)
}

// --- Image cache invalidation ---

m := &manager{
namespaces: nsStore,
}
func (s *ManagerImplSuite) TestInvalidateByImageID() {
s.mgr.state.Store(createTestState(false))
s.addToImageCache("sha256:abc")
s.addNameMapping("nginx:latest", "sha256:abc")

labels, err := m.GetNamespaceLabels(ctx, "cluster-id", "nonexistent")
require.NoError(t, err)
assert.Nil(t, labels)
s.invalidate(&central.InvalidateImageCache_ImageKey{
ImageId: "sha256:abc",
ImageFullName: "nginx:latest",
})

s.assertCached("sha256:abc", false, "image cache entry should be removed")
s.assertNameMapped("nginx:latest", false, "name-to-key mapping should be removed")
}

func (s *ManagerImplSuite) TestInvalidateByV2IDWhenFlattenEnabled() {
s.mgr.state.Store(createTestState(true))
s.addToImageCache("v2-uuid")
s.addNameMapping("nginx:latest", "v2-uuid")

s.invalidate(&central.InvalidateImageCache_ImageKey{
ImageId: "sha256:abc",
ImageIdV2: "v2-uuid",
ImageFullName: "nginx:latest",
})

s.assertCached("v2-uuid", false, "image cache entry should be removed by V2 key")
s.assertCached("sha256:abc", false, "V1 key should not be used when flatten is enabled and V2 key is present")
s.assertNameMapped("nginx:latest", false, "name-to-key mapping should be removed")
}

func (s *ManagerImplSuite) TestFlattenEnabledV2EmptyFallsBack() {
s.mgr.state.Store(createTestState(true))
s.addToImageCache("sha256:abc")

s.invalidate(&central.InvalidateImageCache_ImageKey{
ImageId: "sha256:abc",
ImageIdV2: "",
ImageFullName: "nginx:latest",
})

s.assertCached("sha256:abc", false, "image cache entry should be removed using V1 key as fallback")
}

func (s *ManagerImplSuite) TestOnlyFullNameRemovesNameMappingOnly() {
s.mgr.state.Store(createTestState(false))
s.addToImageCache("sha256:abc")
s.addNameMapping("nginx:latest", "sha256:abc")

s.invalidate(&central.InvalidateImageCache_ImageKey{
ImageFullName: "nginx:latest",
})

s.assertCached("sha256:abc", true, "image cache entry should NOT be removed when only fullName is provided")
s.assertNameMapped("nginx:latest", false, "name-to-key mapping should be removed")
}

func (s *ManagerImplSuite) TestOnlyImageIDRemovesCacheEntryOnly() {
s.mgr.state.Store(createTestState(false))
s.addToImageCache("sha256:abc")
s.addNameMapping("nginx:latest", "sha256:abc")

s.invalidate(&central.InvalidateImageCache_ImageKey{
ImageId: "sha256:abc",
})

s.assertCached("sha256:abc", false, "image cache entry should be removed")
s.assertNameMapped("nginx:latest", true, "name-to-key mapping should NOT be removed when fullName is empty")
}

func (s *ManagerImplSuite) TestMultipleKeysInOneMessage() {
s.mgr.state.Store(createTestState(false))
s.addToImageCache("sha256:abc")
s.addToImageCache("sha256:def")
s.addNameMapping("nginx:latest", "sha256:abc")
s.addNameMapping("redis:7", "sha256:def")

s.invalidate(
&central.InvalidateImageCache_ImageKey{ImageId: "sha256:abc", ImageFullName: "nginx:latest"},
&central.InvalidateImageCache_ImageKey{ImageId: "sha256:def", ImageFullName: "redis:7"},
)

s.assertCached("sha256:abc", false)
s.assertCached("sha256:def", false)
s.assertNameMapped("nginx:latest", false)
s.assertNameMapped("redis:7", false)
}

func (s *ManagerImplSuite) TestNilStateDefaultsFlattenToFalse() {
s.addToImageCache("sha256:abc")

s.invalidate(&central.InvalidateImageCache_ImageKey{
ImageId: "sha256:abc",
ImageIdV2: "v2-uuid",
})

s.assertCached("sha256:abc", false, "should use ImageId when state is nil (flatten defaults to false)")
}
Loading
Loading