diff --git a/pkg/centralsensor/caps_list.go b/pkg/centralsensor/caps_list.go index 10df7fcc8e46c..fcdb1fda32748 100644 --- a/pkg/centralsensor/caps_list.go +++ b/pkg/centralsensor/caps_list.go @@ -96,4 +96,9 @@ const ( // ComplianceV2TailoredProfiles identifies the capability of Central to handle // tailored profile tracking and scan configurations referencing tailored profiles. ComplianceV2TailoredProfiles = "ComplianceV2TailoredProfiles" + + // TargetedImageCacheInvalidation identifies the capability to forward + // per-image cache invalidation keys to the admission controller instead of + // flushing the entire image cache. + TargetedImageCacheInvalidation SensorCapability = "TargetedImageCacheInvalidation" ) diff --git a/sensor/admission-control/manager/manager.go b/sensor/admission-control/manager/manager.go index d92ce3c83567f..ffd802c69b5d0 100644 --- a/sensor/admission-control/manager/manager.go +++ b/sensor/admission-control/manager/manager.go @@ -20,6 +20,7 @@ type Manager interface { SettingsUpdateC() chan<- *sensor.AdmissionControlSettings ResourceUpdatesC() chan<- *sensor.AdmCtrlUpdateResourceRequest + ImageCacheInvalidationC() chan<- *sensor.AdmCtrlImageCacheInvalidation SettingsStream() concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings] SensorConnStatusFlag() *concurrency.Flag diff --git a/sensor/admission-control/manager/manager_impl.go b/sensor/admission-control/manager/manager_impl.go index 02f499327afe8..d695c8c894fb6 100644 --- a/sensor/admission-control/manager/manager_impl.go +++ b/sensor/admission-control/manager/manager_impl.go @@ -108,6 +108,8 @@ type manager struct { imageNameCacheEnabled bool imageFetchGroup *coalescer.Coalescer[*storage.Image] + imageCacheInvalidationC chan *sensor.AdmCtrlImageCacheInvalidation + depClient sensor.DeploymentServiceClient resourceUpdatesC chan *sensor.AdmCtrlUpdateResourceRequest namespaces *resources.NamespaceStore @@ -160,12 +162,13 @@ func NewManager(namespace string, maxImageCacheSize int64, imageNameCacheEnabled alertsC: make(chan []*storage.Alert), - namespaces: nsStore, - deployments: depStore, - pods: podStore, - resourceUpdatesC: make(chan *sensor.AdmCtrlUpdateResourceRequest), - initialSyncSig: concurrency.NewSignal(), - depClient: deploymentServiceClient, + namespaces: nsStore, + deployments: depStore, + pods: podStore, + resourceUpdatesC: make(chan *sensor.AdmCtrlUpdateResourceRequest), + imageCacheInvalidationC: make(chan *sensor.AdmCtrlImageCacheInvalidation), + initialSyncSig: concurrency.NewSignal(), + depClient: deploymentServiceClient, ownNamespace: namespace, } @@ -247,6 +250,10 @@ func (m *manager) ResourceUpdatesC() chan<- *sensor.AdmCtrlUpdateResourceRequest return m.resourceUpdatesC } +func (m *manager) ImageCacheInvalidationC() chan<- *sensor.AdmCtrlImageCacheInvalidation { + return m.imageCacheInvalidationC +} + func (m *manager) InitialResourceSyncSig() *concurrency.Signal { return &m.initialSyncSig } @@ -264,6 +271,8 @@ func (m *manager) run() { m.ProcessNewSettings(newSettings) case req := <-m.resourceUpdatesC: m.processUpdateResourceRequest(req) + case inv := <-m.imageCacheInvalidationC: + m.processImageCacheInvalidation(inv) default: // Select on syncC only if there is nothing to be read from the main // channels. The duplication of select branches is a bit ugly, but inevitable @@ -276,6 +285,8 @@ func (m *manager) run() { m.ProcessNewSettings(newSettings) case req := <-m.resourceUpdatesC: m.processUpdateResourceRequest(req) + case inv := <-m.imageCacheInvalidationC: + m.processImageCacheInvalidation(inv) case syncSig := <-m.syncC: syncSig.Signal() } @@ -517,3 +528,29 @@ func (m *manager) processUpdateResourceRequest(req *sensor.AdmCtrlUpdateResource func (m *manager) getDeploymentForPod(namespace, podName string) *storage.Deployment { return m.deployments.Get(namespace, m.pods.GetDeploymentID(namespace, podName)) } + +func (m *manager) processImageCacheInvalidation(inv *sensor.AdmCtrlImageCacheInvalidation) { + s := m.currentState() + flatten := s != nil && s.GetFlattenImageData() + + invalidated := 0 + for _, key := range inv.GetImageKeys() { + cacheKey := key.GetImageId() + if flatten && key.GetImageIdV2() != "" { + cacheKey = key.GetImageIdV2() + } + fullName := key.GetImageFullName() + + if cacheKey != "" { + m.imageCache.Remove(cacheKey) + m.imageFetchGroup.Forget(cacheKey) + invalidated++ + } + if fullName != "" { + m.imageNameToImageCacheKey.Remove(fullName) + m.imageFetchGroup.Forget(fullName) + } + } + + log.Infof("Targeted image cache invalidation: invalidated %d entries", invalidated) +} diff --git a/sensor/admission-control/manager/manager_impl_test.go b/sensor/admission-control/manager/manager_impl_test.go index de3a77dbe62d3..6721b63038c51 100644 --- a/sensor/admission-control/manager/manager_impl_test.go +++ b/sensor/admission-control/manager/manager_impl_test.go @@ -3,81 +3,227 @@ package manager import ( "context" "testing" + "time" + lru "github.com/hashicorp/golang-lru/v2" "github.com/stackrox/rox/generated/internalapi/central" + "github.com/stackrox/rox/generated/internalapi/sensor" "github.com/stackrox/rox/generated/storage" + "github.com/stackrox/rox/pkg/coalescer" + "github.com/stackrox/rox/pkg/sizeboundedcache" "github.com/stackrox/rox/sensor/admission-control/resources" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) -func TestManager_GetClusterLabels(t *testing.T) { - ctx := context.Background() +type ManagerImplSuite struct { + suite.Suite + mgr *manager + nsStore *resources.NamespaceStore +} + +func TestManagerImplSuite(t *testing.T) { + suite.Run(t, new(ManagerImplSuite)) +} + +func (s *ManagerImplSuite) SetupSuite() { + cache, err := sizeboundedcache.New(1024*1024, 512*1024, func(key string, value imageCacheEntry) int64 { + return 1024 + }) + require.NoError(s.T(), err) + + nameCache, err := lru.New[string, string](imageNameCacheSize) + require.NoError(s.T(), err) + + s.mgr = &manager{ + imageCache: cache, + imageNameToImageCacheKey: nameCache, + imageFetchGroup: coalescer.New[*storage.Image](), + } +} + +func (s *ManagerImplSuite) SetupTest() { + s.mgr.imageCache.Purge() + s.mgr.imageNameToImageCacheKey.Purge() + s.mgr.clusterLabels.Store(nil) + + depStore := resources.NewDeploymentStore(nil) + podStore := resources.NewPodStore() + s.nsStore = resources.NewNamespaceStore(depStore, podStore) + s.mgr.namespaces = s.nsStore +} - t.Run("nil cluster labels returns nil", func(t *testing.T) { - m := &manager{} - labels, err := m.GetClusterLabels(ctx, "cluster-id") - require.NoError(t, err) - assert.Nil(t, labels) +func (s *ManagerImplSuite) addToImageCache(key string) { + s.mgr.imageCache.Add(key, imageCacheEntry{ + Image: &storage.Image{Id: key}, + timestamp: time.Now(), }) +} + +func (s *ManagerImplSuite) addNameMapping(name, key string) { + s.mgr.imageNameToImageCacheKey.Add(name, key) +} - t.Run("returns cluster labels", func(t *testing.T) { - m := &manager{} - clusterLabels := map[string]string{ - "env": "prod", - "region": "us-east-1", - } - m.clusterLabels.Store(&clusterLabels) - labels, err := m.GetClusterLabels(ctx, "cluster-id") - require.NoError(t, err) - assert.Equal(t, map[string]string{ - "env": "prod", - "region": "us-east-1", - }, labels) +func (s *ManagerImplSuite) assertCached(key string, expected bool, msgAndArgs ...interface{}) { + _, ok := s.mgr.imageCache.Get(key) + s.Equal(expected, ok, msgAndArgs...) +} + +func (s *ManagerImplSuite) assertNameMapped(name string, expected bool, msgAndArgs ...interface{}) { + _, ok := s.mgr.imageNameToImageCacheKey.Get(name) + s.Equal(expected, ok, msgAndArgs...) +} + +func (s *ManagerImplSuite) invalidate(keys ...*central.InvalidateImageCache_ImageKey) { + s.mgr.processImageCacheInvalidation(&sensor.AdmCtrlImageCacheInvalidation{ + ImageKeys: keys, }) } -func TestManager_GetNamespaceLabels(t *testing.T) { - ctx := context.Background() +// --- Cluster labels --- - t.Run("returns labels from namespace store", func(t *testing.T) { - depStore := resources.NewDeploymentStore(nil) - podStore := resources.NewPodStore() - nsStore := resources.NewNamespaceStore(depStore, podStore) +func (s *ManagerImplSuite) TestGetClusterLabelsNil() { + labels, err := s.mgr.GetClusterLabels(context.Background(), "cluster-id") + s.NoError(err) + s.Nil(labels) +} + +func (s *ManagerImplSuite) TestGetClusterLabelsReturnsStored() { + clusterLabels := map[string]string{ + "env": "prod", + "region": "us-east-1", + } + s.mgr.clusterLabels.Store(&clusterLabels) - m := &manager{ - namespaces: nsStore, - } + labels, err := s.mgr.GetClusterLabels(context.Background(), "cluster-id") + s.NoError(err) + s.Equal(map[string]string{ + "env": "prod", + "region": "us-east-1", + }, labels) +} - // Add namespace to store - ns := &storage.NamespaceMetadata{ - Name: "test-namespace", - Labels: map[string]string{ - "team": "backend", - "tier": "app", - }, - } - nsStore.ProcessEvent(central.ResourceAction_CREATE_RESOURCE, ns) +// --- Namespace labels --- - labels, err := m.GetNamespaceLabels(ctx, "cluster-id", "test-namespace") - require.NoError(t, err) - assert.Equal(t, map[string]string{ +func (s *ManagerImplSuite) TestGetNamespaceLabelsReturnsStored() { + s.nsStore.ProcessEvent(central.ResourceAction_CREATE_RESOURCE, &storage.NamespaceMetadata{ + Name: "test-namespace", + Labels: map[string]string{ "team": "backend", "tier": "app", - }, labels) + }, }) - t.Run("returns nil for non-existent namespace", func(t *testing.T) { - depStore := resources.NewDeploymentStore(nil) - podStore := resources.NewPodStore() - nsStore := resources.NewNamespaceStore(depStore, podStore) + labels, err := s.mgr.GetNamespaceLabels(context.Background(), "cluster-id", "test-namespace") + s.NoError(err) + s.Equal(map[string]string{ + "team": "backend", + "tier": "app", + }, labels) +} + +func (s *ManagerImplSuite) TestGetNamespaceLabelsNonExistent() { + labels, err := s.mgr.GetNamespaceLabels(context.Background(), "cluster-id", "nonexistent") + s.NoError(err) + s.Nil(labels) +} + +// --- Image cache invalidation --- - m := &manager{ - namespaces: nsStore, - } +func (s *ManagerImplSuite) TestInvalidateByImageID() { + s.mgr.state.Store(createTestState(false)) + s.addToImageCache("sha256:abc") + s.addNameMapping("nginx:latest", "sha256:abc") - labels, err := m.GetNamespaceLabels(ctx, "cluster-id", "nonexistent") - require.NoError(t, err) - assert.Nil(t, labels) + s.invalidate(¢ral.InvalidateImageCache_ImageKey{ + ImageId: "sha256:abc", + ImageFullName: "nginx:latest", }) + + s.assertCached("sha256:abc", false, "image cache entry should be removed") + s.assertNameMapped("nginx:latest", false, "name-to-key mapping should be removed") +} + +func (s *ManagerImplSuite) TestInvalidateByV2IDWhenFlattenEnabled() { + s.mgr.state.Store(createTestState(true)) + s.addToImageCache("v2-uuid") + s.addNameMapping("nginx:latest", "v2-uuid") + + s.invalidate(¢ral.InvalidateImageCache_ImageKey{ + ImageId: "sha256:abc", + ImageIdV2: "v2-uuid", + ImageFullName: "nginx:latest", + }) + + s.assertCached("v2-uuid", false, "image cache entry should be removed by V2 key") + s.assertCached("sha256:abc", false, "V1 key should not be used when flatten is enabled and V2 key is present") + s.assertNameMapped("nginx:latest", false, "name-to-key mapping should be removed") +} + +func (s *ManagerImplSuite) TestFlattenEnabledV2EmptyFallsBack() { + s.mgr.state.Store(createTestState(true)) + s.addToImageCache("sha256:abc") + + s.invalidate(¢ral.InvalidateImageCache_ImageKey{ + ImageId: "sha256:abc", + ImageIdV2: "", + ImageFullName: "nginx:latest", + }) + + s.assertCached("sha256:abc", false, "image cache entry should be removed using V1 key as fallback") +} + +func (s *ManagerImplSuite) TestOnlyFullNameRemovesNameMappingOnly() { + s.mgr.state.Store(createTestState(false)) + s.addToImageCache("sha256:abc") + s.addNameMapping("nginx:latest", "sha256:abc") + + s.invalidate(¢ral.InvalidateImageCache_ImageKey{ + ImageFullName: "nginx:latest", + }) + + s.assertCached("sha256:abc", true, "image cache entry should NOT be removed when only fullName is provided") + s.assertNameMapped("nginx:latest", false, "name-to-key mapping should be removed") +} + +func (s *ManagerImplSuite) TestOnlyImageIDRemovesCacheEntryOnly() { + s.mgr.state.Store(createTestState(false)) + s.addToImageCache("sha256:abc") + s.addNameMapping("nginx:latest", "sha256:abc") + + s.invalidate(¢ral.InvalidateImageCache_ImageKey{ + ImageId: "sha256:abc", + }) + + s.assertCached("sha256:abc", false, "image cache entry should be removed") + s.assertNameMapped("nginx:latest", true, "name-to-key mapping should NOT be removed when fullName is empty") +} + +func (s *ManagerImplSuite) TestMultipleKeysInOneMessage() { + s.mgr.state.Store(createTestState(false)) + s.addToImageCache("sha256:abc") + s.addToImageCache("sha256:def") + s.addNameMapping("nginx:latest", "sha256:abc") + s.addNameMapping("redis:7", "sha256:def") + + s.invalidate( + ¢ral.InvalidateImageCache_ImageKey{ImageId: "sha256:abc", ImageFullName: "nginx:latest"}, + ¢ral.InvalidateImageCache_ImageKey{ImageId: "sha256:def", ImageFullName: "redis:7"}, + ) + + s.assertCached("sha256:abc", false) + s.assertCached("sha256:def", false) + s.assertNameMapped("nginx:latest", false) + s.assertNameMapped("redis:7", false) +} + +func (s *ManagerImplSuite) TestNilStateDefaultsFlattenToFalse() { + s.addToImageCache("sha256:abc") + + s.invalidate(¢ral.InvalidateImageCache_ImageKey{ + ImageId: "sha256:abc", + ImageIdV2: "v2-uuid", + }) + + s.assertCached("sha256:abc", false, "should use ImageId when state is nil (flatten defaults to false)") } diff --git a/sensor/admission-control/settingswatch/sensor_push.go b/sensor/admission-control/settingswatch/sensor_push.go index ad88c35f18459..0f06bf51891b3 100644 --- a/sensor/admission-control/settingswatch/sensor_push.go +++ b/sensor/admission-control/settingswatch/sensor_push.go @@ -15,20 +15,22 @@ import ( // WatchSensorMessagePush watches for sensor pushes, and forwards them. func WatchSensorMessagePush(mgr manager.Manager, cc *grpc.ClientConn) { w := &sensorPushWatch{ - ctx: mgr.Stopped(), - mgmtServiceClient: sensor.NewAdmissionControlManagementServiceClient(cc), - settingsOutC: mgr.SettingsUpdateC(), - updateResourceReqOutC: mgr.ResourceUpdatesC(), - sensorConnStatus: mgr.SensorConnStatusFlag(), - initialResourceSync: mgr.InitialResourceSyncSig(), + ctx: mgr.Stopped(), + mgmtServiceClient: sensor.NewAdmissionControlManagementServiceClient(cc), + settingsOutC: mgr.SettingsUpdateC(), + updateResourceReqOutC: mgr.ResourceUpdatesC(), + imageCacheInvalidationOutC: mgr.ImageCacheInvalidationC(), + sensorConnStatus: mgr.SensorConnStatusFlag(), + initialResourceSync: mgr.InitialResourceSyncSig(), } go w.run() } type sensorPushWatch struct { - ctx concurrency.ErrorWaitable - settingsOutC chan<- *sensor.AdmissionControlSettings - updateResourceReqOutC chan<- *sensor.AdmCtrlUpdateResourceRequest + ctx concurrency.ErrorWaitable + settingsOutC chan<- *sensor.AdmissionControlSettings + updateResourceReqOutC chan<- *sensor.AdmCtrlUpdateResourceRequest + imageCacheInvalidationOutC chan<- *sensor.AdmCtrlImageCacheInvalidation sensorConnStatus *concurrency.Flag initialResourceSync *concurrency.Signal @@ -103,6 +105,12 @@ func (w *sensorPushWatch) dispatchMsg(msg *sensor.MsgToAdmissionControl) error { return errors.Wrap(w.ctx.Err(), "dispatching update resource request") case w.updateResourceReqOutC <- m.UpdateResourceRequest: } + case *sensor.MsgToAdmissionControl_ImageCacheInvalidation: + select { + case <-w.ctx.Done(): + return errors.Wrap(w.ctx.Err(), "dispatching image cache invalidation") + case w.imageCacheInvalidationOutC <- m.ImageCacheInvalidation: + } default: log.Warnf("Received message of unknown type %T from sensor, not sure what to do with it ...", m) } diff --git a/sensor/common/admissioncontroller/management_service.go b/sensor/common/admissioncontroller/management_service.go index fa265416c83a8..fcbf490db8904 100644 --- a/sensor/common/admissioncontroller/management_service.go +++ b/sensor/common/admissioncontroller/management_service.go @@ -25,8 +25,9 @@ var ( type managementService struct { sensor.UnimplementedAdmissionControlManagementServiceServer - settingsStream concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings] - sensorEventsStream concurrency.ReadOnlyValueStream[*sensor.AdmCtrlUpdateResourceRequest] + settingsStream concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings] + sensorEventsStream concurrency.ReadOnlyValueStream[*sensor.AdmCtrlUpdateResourceRequest] + imageCacheInvalidationStream concurrency.ReadOnlyValueStream[*sensor.AdmCtrlImageCacheInvalidation] alertHandler AlertHandler admCtrlMgr SettingsManager @@ -36,8 +37,9 @@ type managementService struct { // to admission control service replicas. func NewManagementService(mgr SettingsManager, alertHandler AlertHandler) pkgGRPC.APIService { return &managementService{ - settingsStream: mgr.SettingsStream(), - sensorEventsStream: mgr.SensorEventsStream(), + settingsStream: mgr.SettingsStream(), + sensorEventsStream: mgr.SensorEventsStream(), + imageCacheInvalidationStream: mgr.ImageCacheInvalidationStream(), alertHandler: alertHandler, admCtrlMgr: mgr, @@ -100,6 +102,7 @@ func (s *managementService) Communicate(stream sensor.AdmissionControlManagement } settingsIt := s.settingsStream.Iterator(false) + imageCacheInvIt := s.imageCacheInvalidationStream.Iterator(true) if err := s.sendCurrentSettings(stream, settingsIt); err != nil { return errors.Wrap(err, "sending initial settings") @@ -119,6 +122,10 @@ func (s *managementService) Communicate(stream sensor.AdmissionControlManagement if sensorEventIt != nil { sensorEventItrDoneC = sensorEventIt.Done() } + var imageCacheInvDoneC <-chan struct{} + if imageCacheInvIt != nil { + imageCacheInvDoneC = imageCacheInvIt.Done() + } select { case err := <-recvErrC: @@ -138,6 +145,11 @@ func (s *managementService) Communicate(stream sensor.AdmissionControlManagement if err := s.sendSensorEvent(stream, sensorEventIt); err != nil { return errors.Wrap(err, "sending sensor events to admission control service") } + case <-imageCacheInvDoneC: + imageCacheInvIt = imageCacheInvIt.TryNext() + if err := s.sendImageCacheInvalidation(stream, imageCacheInvIt); err != nil { + return errors.Wrap(err, "sending image cache invalidation to admission control service") + } case <-stream.Context().Done(): return errors.Wrap(stream.Context().Err(), "communicating") @@ -167,6 +179,22 @@ func (s *managementService) sendSensorEvent(stream sensor.AdmissionControlManage ) } +func (s *managementService) sendImageCacheInvalidation(stream sensor.AdmissionControlManagementService_CommunicateServer, iter concurrency.ValueStreamIter[*sensor.AdmCtrlImageCacheInvalidation]) error { + obj := iter.Value() + if obj == nil { + return nil + } + + return errors.Wrap( + stream.Send(&sensor.MsgToAdmissionControl{ + Msg: &sensor.MsgToAdmissionControl_ImageCacheInvalidation{ + ImageCacheInvalidation: obj, + }, + }), + "sending image cache invalidation", + ) +} + func (s *managementService) sync(stream sensor.AdmissionControlManagementService_CommunicateServer) error { for _, msg := range s.admCtrlMgr.GetResourcesForSync() { err := stream.Send(&sensor.MsgToAdmissionControl{ diff --git a/sensor/common/admissioncontroller/management_service_test.go b/sensor/common/admissioncontroller/management_service_test.go index 64e03553e3840..cc36f2513900c 100644 --- a/sensor/common/admissioncontroller/management_service_test.go +++ b/sensor/common/admissioncontroller/management_service_test.go @@ -35,11 +35,13 @@ func (s *managementServiceSuite) SetupTest() { func (s *managementServiceSuite) createManagementService() { s.mockSettingsManager.EXPECT().SettingsStream().Times(1) s.mockSettingsManager.EXPECT().SensorEventsStream().Times(1) + s.mockSettingsManager.EXPECT().ImageCacheInvalidationStream().Times(1) s.service = &managementService{ - settingsStream: s.mockSettingsManager.SettingsStream(), - sensorEventsStream: s.mockSettingsManager.SensorEventsStream(), - alertHandler: s.mockAlertHandler, - admCtrlMgr: s.mockSettingsManager, + settingsStream: s.mockSettingsManager.SettingsStream(), + sensorEventsStream: s.mockSettingsManager.SensorEventsStream(), + imageCacheInvalidationStream: s.mockSettingsManager.ImageCacheInvalidationStream(), + alertHandler: s.mockAlertHandler, + admCtrlMgr: s.mockSettingsManager, } } diff --git a/sensor/common/admissioncontroller/mocks/settings_manager.go b/sensor/common/admissioncontroller/mocks/settings_manager.go index 1ffda89030bee..d05b5a635e0a8 100644 --- a/sensor/common/admissioncontroller/mocks/settings_manager.go +++ b/sensor/common/admissioncontroller/mocks/settings_manager.go @@ -69,6 +69,32 @@ func (mr *MockSettingsManagerMockRecorder) GetResourcesForSync() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResourcesForSync", reflect.TypeOf((*MockSettingsManager)(nil).GetResourcesForSync)) } +// ImageCacheInvalidationStream mocks base method. +func (m *MockSettingsManager) ImageCacheInvalidationStream() concurrency.ReadOnlyValueStream[*sensor.AdmCtrlImageCacheInvalidation] { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ImageCacheInvalidationStream") + ret0, _ := ret[0].(concurrency.ReadOnlyValueStream[*sensor.AdmCtrlImageCacheInvalidation]) + return ret0 +} + +// ImageCacheInvalidationStream indicates an expected call of ImageCacheInvalidationStream. +func (mr *MockSettingsManagerMockRecorder) ImageCacheInvalidationStream() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImageCacheInvalidationStream", reflect.TypeOf((*MockSettingsManager)(nil).ImageCacheInvalidationStream)) +} + +// InvalidateImageCache mocks base method. +func (m *MockSettingsManager) InvalidateImageCache(keys []*central.InvalidateImageCache_ImageKey) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "InvalidateImageCache", keys) +} + +// InvalidateImageCache indicates an expected call of InvalidateImageCache. +func (mr *MockSettingsManagerMockRecorder) InvalidateImageCache(keys any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvalidateImageCache", reflect.TypeOf((*MockSettingsManager)(nil).InvalidateImageCache), keys) +} + // SensorEventsStream mocks base method. func (m *MockSettingsManager) SensorEventsStream() concurrency.ReadOnlyValueStream[*sensor.AdmCtrlUpdateResourceRequest] { m.ctrl.T.Helper() diff --git a/sensor/common/admissioncontroller/settings_manager.go b/sensor/common/admissioncontroller/settings_manager.go index ae8f629a19b3d..2894488bd48d7 100644 --- a/sensor/common/admissioncontroller/settings_manager.go +++ b/sensor/common/admissioncontroller/settings_manager.go @@ -18,7 +18,9 @@ type SettingsManager interface { GetResourcesForSync() []*sensor.AdmCtrlUpdateResourceRequest FlushCache() + InvalidateImageCache(keys []*central.InvalidateImageCache_ImageKey) SettingsStream() concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings] SensorEventsStream() concurrency.ReadOnlyValueStream[*sensor.AdmCtrlUpdateResourceRequest] + ImageCacheInvalidationStream() concurrency.ReadOnlyValueStream[*sensor.AdmCtrlImageCacheInvalidation] } diff --git a/sensor/common/admissioncontroller/settings_manager_impl.go b/sensor/common/admissioncontroller/settings_manager_impl.go index 0506700f03916..fa6930a9506b9 100644 --- a/sensor/common/admissioncontroller/settings_manager_impl.go +++ b/sensor/common/admissioncontroller/settings_manager_impl.go @@ -22,6 +22,7 @@ type settingsManager struct { currSettings *sensor.AdmissionControlSettings settingsStream *concurrency.ValueStream[*sensor.AdmissionControlSettings] sensorEventsStream *concurrency.ValueStream[*sensor.AdmCtrlUpdateResourceRequest] + imageCacheInvalidationStream *concurrency.ValueStream[*sensor.AdmCtrlImageCacheInvalidation] hasClusterConfig, hasPolicies bool centralEndpoint string lastClusterLabels map[string]string @@ -45,9 +46,10 @@ type clusterIDWaiter interface { // NewSettingsManager creates a new settings manager for admission control settings. func NewSettingsManager(clusterID clusterIDWaiter, clusterLabels clusterLabelsGetter, deployments store.DeploymentStore, pods store.PodStore, namespaces store.NamespaceStore) SettingsManager { return &settingsManager{ - settingsStream: concurrency.NewValueStream[*sensor.AdmissionControlSettings](nil), - sensorEventsStream: concurrency.NewValueStream[*sensor.AdmCtrlUpdateResourceRequest](nil), - centralEndpoint: env.CentralEndpoint.Setting(), + settingsStream: concurrency.NewValueStream[*sensor.AdmissionControlSettings](nil), + sensorEventsStream: concurrency.NewValueStream[*sensor.AdmCtrlUpdateResourceRequest](nil), + imageCacheInvalidationStream: concurrency.NewValueStream[*sensor.AdmCtrlImageCacheInvalidation](nil), + centralEndpoint: env.CentralEndpoint.Setting(), clusterID: clusterID, clusterLabels: clusterLabels, @@ -178,6 +180,11 @@ func copyMap(m map[string]string) map[string]string { return result } +func (p *settingsManager) InvalidateImageCache(keys []*central.InvalidateImageCache_ImageKey) { + p.imageCacheInvalidationStream.Push(&sensor.AdmCtrlImageCacheInvalidation{ + ImageKeys: keys, + }) +} func (p *settingsManager) SettingsStream() concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings] { return p.settingsStream } @@ -186,6 +193,10 @@ func (p *settingsManager) SensorEventsStream() concurrency.ReadOnlyValueStream[* return p.sensorEventsStream } +func (p *settingsManager) ImageCacheInvalidationStream() concurrency.ReadOnlyValueStream[*sensor.AdmCtrlImageCacheInvalidation] { + return p.imageCacheInvalidationStream +} + func (p *settingsManager) GetResourcesForSync() []*sensor.AdmCtrlUpdateResourceRequest { var ret []*sensor.AdmCtrlUpdateResourceRequest for _, d := range p.deployments.GetAll() { diff --git a/sensor/common/reprocessor/handler.go b/sensor/common/reprocessor/handler.go index 1fd52220eda1d..fef100e80c01b 100644 --- a/sensor/common/reprocessor/handler.go +++ b/sensor/common/reprocessor/handler.go @@ -64,7 +64,9 @@ func (h *handlerImpl) Notify(common.SensorComponentEvent) {} func (h *handlerImpl) Capabilities() []centralsensor.SensorCapability { // A new sensor capability to reprocess deployment has not been added. In case of mismatched upgrades, // the re-processing is discarded, which is fine. - return nil + return []centralsensor.SensorCapability{ + centralsensor.TargetedImageCacheInvalidation, + } } func (h *handlerImpl) ProcessReprocessDeployments(req *central.ReprocessDeployment) error { @@ -86,7 +88,7 @@ func (h *handlerImpl) ProcessInvalidateImageCache(req *central.InvalidateImageCa case <-h.stopSig.Done(): return errors.Wrap(h.stopSig.Err(), "could not fulfill invalidate image cache request") default: - h.admCtrlSettingsMgr.FlushCache() + h.admCtrlSettingsMgr.InvalidateImageCache(req.GetImageKeys()) keysToDelete := make([]cache.Key, 0, len(req.GetImageKeys())) for _, image := range req.GetImageKeys() { diff --git a/sensor/common/reprocessor/handler_test.go b/sensor/common/reprocessor/handler_test.go index 5d8f4aaa7bcbe..786408091930d 100644 --- a/sensor/common/reprocessor/handler_test.go +++ b/sensor/common/reprocessor/handler_test.go @@ -34,7 +34,7 @@ func TestProcessInvalidateImageCache_WithoutFlattenImageData(t *testing.T) { ctrl := gomock.NewController(t) mockAdmCtrlSettingsMgr := mocks.NewMockSettingsManager(ctrl) - mockAdmCtrlSettingsMgr.EXPECT().FlushCache().Times(1) + mockAdmCtrlSettingsMgr.EXPECT().InvalidateImageCache(gomock.Any()).Times(1) imageCache := expiringcache.NewExpiringCache[cache.Key, cache.Value](1 * time.Hour) @@ -118,7 +118,7 @@ func TestProcessInvalidateImageCache_WithFlattenImageData(t *testing.T) { ctrl := gomock.NewController(t) mockAdmCtrlSettingsMgr := mocks.NewMockSettingsManager(ctrl) - mockAdmCtrlSettingsMgr.EXPECT().FlushCache().Times(1) + mockAdmCtrlSettingsMgr.EXPECT().InvalidateImageCache(gomock.Any()).Times(1) imageCache := expiringcache.NewExpiringCache[cache.Key, cache.Value](1 * time.Hour)