Skip to content

Commit cbd7a3e

Browse files
committed
ROX-33333: Sensor targeted cache invalidation changes
1 parent 782897c commit cbd7a3e

File tree

8 files changed

+92
-16
lines changed

8 files changed

+92
-16
lines changed

pkg/centralsensor/caps_list.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,9 @@ const (
9696
// ComplianceV2TailoredProfiles identifies the capability of Central to handle
9797
// tailored profile tracking and scan configurations referencing tailored profiles.
9898
ComplianceV2TailoredProfiles = "ComplianceV2TailoredProfiles"
99+
100+
// TargetedImageCacheInvalidationCap identifies the capability to forward
101+
// per-image cache invalidation keys to the admission controller instead of
102+
// flushing the entire image cache.
103+
TargetedImageCacheInvalidationCap SensorCapability = "TargetedImageCacheInvalidation"
99104
)

sensor/common/admissioncontroller/management_service.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ var (
2525
type managementService struct {
2626
sensor.UnimplementedAdmissionControlManagementServiceServer
2727

28-
settingsStream concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings]
29-
sensorEventsStream concurrency.ReadOnlyValueStream[*sensor.AdmCtrlUpdateResourceRequest]
28+
settingsStream concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings]
29+
sensorEventsStream concurrency.ReadOnlyValueStream[*sensor.AdmCtrlUpdateResourceRequest]
30+
imageCacheInvalidationStream concurrency.ReadOnlyValueStream[*sensor.AdmCtrlImageCacheInvalidation]
3031

3132
alertHandler AlertHandler
3233
admCtrlMgr SettingsManager
@@ -36,8 +37,9 @@ type managementService struct {
3637
// to admission control service replicas.
3738
func NewManagementService(mgr SettingsManager, alertHandler AlertHandler) pkgGRPC.APIService {
3839
return &managementService{
39-
settingsStream: mgr.SettingsStream(),
40-
sensorEventsStream: mgr.SensorEventsStream(),
40+
settingsStream: mgr.SettingsStream(),
41+
sensorEventsStream: mgr.SensorEventsStream(),
42+
imageCacheInvalidationStream: mgr.ImageCacheInvalidationStream(),
4143

4244
alertHandler: alertHandler,
4345
admCtrlMgr: mgr,
@@ -109,6 +111,7 @@ func (s *managementService) Communicate(stream sensor.AdmissionControlManagement
109111
return errors.Wrap(err, "syncing resources")
110112
}
111113
sensorEventIt := s.sensorEventsStream.Iterator(true)
114+
imageCacheInvIt := s.imageCacheInvalidationStream.Iterator(true)
112115

113116
recvdMsgC := make(chan *sensor.MsgFromAdmissionControl)
114117
recvErrC := make(chan error, 1)
@@ -119,6 +122,10 @@ func (s *managementService) Communicate(stream sensor.AdmissionControlManagement
119122
if sensorEventIt != nil {
120123
sensorEventItrDoneC = sensorEventIt.Done()
121124
}
125+
var imageCacheInvDoneC <-chan struct{}
126+
if imageCacheInvIt != nil {
127+
imageCacheInvDoneC = imageCacheInvIt.Done()
128+
}
122129

123130
select {
124131
case err := <-recvErrC:
@@ -138,6 +145,11 @@ func (s *managementService) Communicate(stream sensor.AdmissionControlManagement
138145
if err := s.sendSensorEvent(stream, sensorEventIt); err != nil {
139146
return errors.Wrap(err, "sending sensor events to admission control service")
140147
}
148+
case <-imageCacheInvDoneC:
149+
imageCacheInvIt = imageCacheInvIt.TryNext()
150+
if err := s.sendImageCacheInvalidation(stream, imageCacheInvIt); err != nil {
151+
return errors.Wrap(err, "sending image cache invalidation to admission control service")
152+
}
141153

142154
case <-stream.Context().Done():
143155
return errors.Wrap(stream.Context().Err(), "communicating")
@@ -167,6 +179,22 @@ func (s *managementService) sendSensorEvent(stream sensor.AdmissionControlManage
167179
)
168180
}
169181

182+
func (s *managementService) sendImageCacheInvalidation(stream sensor.AdmissionControlManagementService_CommunicateServer, iter concurrency.ValueStreamIter[*sensor.AdmCtrlImageCacheInvalidation]) error {
183+
obj := iter.Value()
184+
if obj == nil {
185+
return nil
186+
}
187+
188+
return errors.Wrap(
189+
stream.Send(&sensor.MsgToAdmissionControl{
190+
Msg: &sensor.MsgToAdmissionControl_ImageCacheInvalidation{
191+
ImageCacheInvalidation: obj,
192+
},
193+
}),
194+
"sending image cache invalidation",
195+
)
196+
}
197+
170198
func (s *managementService) sync(stream sensor.AdmissionControlManagementService_CommunicateServer) error {
171199
for _, msg := range s.admCtrlMgr.GetResourcesForSync() {
172200
err := stream.Send(&sensor.MsgToAdmissionControl{

sensor/common/admissioncontroller/management_service_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ func (s *managementServiceSuite) SetupTest() {
3535
func (s *managementServiceSuite) createManagementService() {
3636
s.mockSettingsManager.EXPECT().SettingsStream().Times(1)
3737
s.mockSettingsManager.EXPECT().SensorEventsStream().Times(1)
38+
s.mockSettingsManager.EXPECT().ImageCacheInvalidationStream().Times(1)
3839
s.service = &managementService{
39-
settingsStream: s.mockSettingsManager.SettingsStream(),
40-
sensorEventsStream: s.mockSettingsManager.SensorEventsStream(),
41-
alertHandler: s.mockAlertHandler,
42-
admCtrlMgr: s.mockSettingsManager,
40+
settingsStream: s.mockSettingsManager.SettingsStream(),
41+
sensorEventsStream: s.mockSettingsManager.SensorEventsStream(),
42+
imageCacheInvalidationStream: s.mockSettingsManager.ImageCacheInvalidationStream(),
43+
alertHandler: s.mockAlertHandler,
44+
admCtrlMgr: s.mockSettingsManager,
4345
}
4446
}
4547

sensor/common/admissioncontroller/mocks/settings_manager.go

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sensor/common/admissioncontroller/settings_manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ type SettingsManager interface {
1818
GetResourcesForSync() []*sensor.AdmCtrlUpdateResourceRequest
1919

2020
FlushCache()
21+
InvalidateImageCache(keys []*central.InvalidateImageCache_ImageKey)
2122

2223
SettingsStream() concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings]
2324
SensorEventsStream() concurrency.ReadOnlyValueStream[*sensor.AdmCtrlUpdateResourceRequest]
25+
ImageCacheInvalidationStream() concurrency.ReadOnlyValueStream[*sensor.AdmCtrlImageCacheInvalidation]
2426
}

sensor/common/admissioncontroller/settings_manager_impl.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type settingsManager struct {
2222
currSettings *sensor.AdmissionControlSettings
2323
settingsStream *concurrency.ValueStream[*sensor.AdmissionControlSettings]
2424
sensorEventsStream *concurrency.ValueStream[*sensor.AdmCtrlUpdateResourceRequest]
25+
imageCacheInvalidationStream *concurrency.ValueStream[*sensor.AdmCtrlImageCacheInvalidation]
2526
hasClusterConfig, hasPolicies bool
2627
centralEndpoint string
2728
lastClusterLabels map[string]string
@@ -45,9 +46,10 @@ type clusterIDWaiter interface {
4546
// NewSettingsManager creates a new settings manager for admission control settings.
4647
func NewSettingsManager(clusterID clusterIDWaiter, clusterLabels clusterLabelsGetter, deployments store.DeploymentStore, pods store.PodStore, namespaces store.NamespaceStore) SettingsManager {
4748
return &settingsManager{
48-
settingsStream: concurrency.NewValueStream[*sensor.AdmissionControlSettings](nil),
49-
sensorEventsStream: concurrency.NewValueStream[*sensor.AdmCtrlUpdateResourceRequest](nil),
50-
centralEndpoint: env.CentralEndpoint.Setting(),
49+
settingsStream: concurrency.NewValueStream[*sensor.AdmissionControlSettings](nil),
50+
sensorEventsStream: concurrency.NewValueStream[*sensor.AdmCtrlUpdateResourceRequest](nil),
51+
imageCacheInvalidationStream: concurrency.NewValueStream[*sensor.AdmCtrlImageCacheInvalidation](nil),
52+
centralEndpoint: env.CentralEndpoint.Setting(),
5153

5254
clusterID: clusterID,
5355
clusterLabels: clusterLabels,
@@ -178,6 +180,11 @@ func copyMap(m map[string]string) map[string]string {
178180
return result
179181
}
180182

183+
func (p *settingsManager) InvalidateImageCache(keys []*central.InvalidateImageCache_ImageKey) {
184+
p.imageCacheInvalidationStream.Push(&sensor.AdmCtrlImageCacheInvalidation{
185+
ImageKeys: keys,
186+
})
187+
}
181188
func (p *settingsManager) SettingsStream() concurrency.ReadOnlyValueStream[*sensor.AdmissionControlSettings] {
182189
return p.settingsStream
183190
}
@@ -186,6 +193,10 @@ func (p *settingsManager) SensorEventsStream() concurrency.ReadOnlyValueStream[*
186193
return p.sensorEventsStream
187194
}
188195

196+
func (p *settingsManager) ImageCacheInvalidationStream() concurrency.ReadOnlyValueStream[*sensor.AdmCtrlImageCacheInvalidation] {
197+
return p.imageCacheInvalidationStream
198+
}
199+
189200
func (p *settingsManager) GetResourcesForSync() []*sensor.AdmCtrlUpdateResourceRequest {
190201
var ret []*sensor.AdmCtrlUpdateResourceRequest
191202
for _, d := range p.deployments.GetAll() {

sensor/common/reprocessor/handler.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ func (h *handlerImpl) Stop() {
6262
func (h *handlerImpl) Notify(common.SensorComponentEvent) {}
6363

6464
func (h *handlerImpl) Capabilities() []centralsensor.SensorCapability {
65-
// A new sensor capability to reprocess deployment has not been added. In case of mismatched upgrades,
65+
// No capability is advertised for deployment reprocessing. In case of mismatched upgrades,
6666
// the re-processing is discarded, which is fine.
67-
return nil
67+
return []centralsensor.SensorCapability{
68+
centralsensor.TargetedImageCacheInvalidationCap,
69+
}
6870
}
6971

7072
func (h *handlerImpl) ProcessReprocessDeployments(req *central.ReprocessDeployment) error {
@@ -86,7 +88,7 @@ func (h *handlerImpl) ProcessInvalidateImageCache(req *central.InvalidateImageCa
8688
case <-h.stopSig.Done():
8789
return errors.Wrap(h.stopSig.Err(), "could not fulfill invalidate image cache request")
8890
default:
89-
h.admCtrlSettingsMgr.FlushCache()
91+
h.admCtrlSettingsMgr.InvalidateImageCache(req.GetImageKeys())
9092

9193
keysToDelete := make([]cache.Key, 0, len(req.GetImageKeys()))
9294
for _, image := range req.GetImageKeys() {

sensor/common/reprocessor/handler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestProcessInvalidateImageCache_WithoutFlattenImageData(t *testing.T) {
3434

3535
ctrl := gomock.NewController(t)
3636
mockAdmCtrlSettingsMgr := mocks.NewMockSettingsManager(ctrl)
37-
mockAdmCtrlSettingsMgr.EXPECT().FlushCache().Times(1)
37+
mockAdmCtrlSettingsMgr.EXPECT().InvalidateImageCache(gomock.Any()).Times(1)
3838

3939
imageCache := expiringcache.NewExpiringCache[cache.Key, cache.Value](1 * time.Hour)
4040

@@ -118,7 +118,7 @@ func TestProcessInvalidateImageCache_WithFlattenImageData(t *testing.T) {
118118

119119
ctrl := gomock.NewController(t)
120120
mockAdmCtrlSettingsMgr := mocks.NewMockSettingsManager(ctrl)
121-
mockAdmCtrlSettingsMgr.EXPECT().FlushCache().Times(1)
121+
mockAdmCtrlSettingsMgr.EXPECT().InvalidateImageCache(gomock.Any()).Times(1)
122122

123123
imageCache := expiringcache.NewExpiringCache[cache.Key, cache.Value](1 * time.Hour)
124124

0 commit comments

Comments
 (0)